Statistics
| Branch: | Revision:

ml / util / rateLimiter.c @ 6575ae37

History | View | Annotate | Download (3.38 KB)

1
/*
2
 *          
3
 *
4
 *        upgraded rateControl - token bucket
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10
#include <ml_all.h>
11

    
12
extern struct event_base *base;
13
static long bucket_size = 0;
14
static int64_t drain_rate = 0;
15

    
16

    
17
static long bytes_in_bucket = 0;
18
struct timeval bib_then = { 0, 0};
19

    
20
void planFreeSpaceInBucketEvent();
21

    
22
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
23

    
24
        /*struct timeval test;
25

26
        gettimeofday(&test,NULL);
27

28
        int us;
29

30
        if(test.tv_usec > (int) arg)
31
                us = test.tv_usec - (int) arg;
32
        else
33
                us = 1000000 + test.tv_usec - (int) arg;
34

35
        fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
36

    
37
        while((!isQueueEmpty()) && (outputRateControl(getFirstPacketSize()) == OK)) {        
38

    
39

    
40
                PacketContainer* packet = takePacketToSend();
41

    
42
                if (packet == NULL) return;
43

    
44
                struct timeval now;
45
                   gettimeofday(&now, NULL);
46
                bib_then = now;
47

    
48
#ifdef RTX
49
                if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
50
#endif
51

    
52
                sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
53
        }
54

    
55
        if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
56
        
57
        return;
58
}
59

    
60
void planFreeSpaceInBucketEvent(int bytes) {                //plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
61
        struct timeval TXtime;
62
        struct event *ev;
63

    
64
        //time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
65
        TXtime.tv_sec = bytes / drain_rate; //seconds
66
        TXtime.tv_usec = (bytes - TXtime.tv_sec * drain_rate) * 1000000 / drain_rate; //microseconds
67

    
68
        ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
69
        event_add(ev, &TXtime);
70
}
71

    
72
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
73
{
74
        PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
75

    
76
debug("QUUEO1");
77
debug("QUUEO1");
78
        if(!(priority & HP)) {
79
                if (!isQueueEmpty()) {                                                //some packets are already waiting, "I am for sure after them"
80
                        return addPacketTXqueue(newPacket);
81
                }        
82
                else if(outputRateControl(newPacket->pktLen) != OK) {                        //queue is empty, not enough space in bucket - "I will be first in the queue"
83
                        planFreeSpaceInBucketEvent(newPacket->pktLen);                //when there will be enough space in the bucket for the first packet from the queue
84
                        return addPacketTXqueue(newPacket);
85
                }
86
        }
87
debug("QUUEO1c\n");
88
#ifdef RTX
89
        if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
90
#endif
91

    
92
debug("QUUEO2");
93
        return sendPacket(udpSocket, iov, 4, socketaddr);
94
}
95

    
96
void setOutputRateParams(int bucketsize, int drainrate) { //given in Bytes and Bits/s
97
     bucket_size = bucketsize;
98
     outputRateControl(0);
99
     drain_rate = drainrate >> 3; //now in bytes/s
100
}
101

    
102
int outputRateControl(int len) {
103
        struct timeval now;
104
        gettimeofday(&now, NULL);
105

    
106
        if(drain_rate <= 0) {
107
                bytes_in_bucket = 0;
108
                bib_then = now;
109
                return OK;
110
        } else {
111
                unsigned int leaked;
112
                int total_drain_secs = bytes_in_bucket / (drain_rate) + 1;
113

    
114
                if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
115
                                bytes_in_bucket = 0;
116
                } else {
117
                        leaked = drain_rate * (now.tv_sec - bib_then.tv_sec);
118
                        leaked += drain_rate * (now.tv_usec - bib_then.tv_usec) / 1000000; 
119
                        if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
120
                                        else bytes_in_bucket -= leaked;
121
                }
122

    
123
                bib_then = now;
124
                if(bytes_in_bucket + len <= bucket_size) {
125
                        bytes_in_bucket += len;
126
                        return OK;
127
                } else {
128
                        return THROTTLE;
129
                }
130
   }
131
}