Statistics
| Branch: | Revision:

napa-baselibs / ml / util / rateLimiter.c @ d05148bb

History | View | Annotate | Download (3.47 KB)

1
/*
2
 *          
3
 *
4
 *        upgraded rateControl - token bucket
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10
#include <ml_all.h>
11

    
12
extern struct event_base *base;
13
static long bucket_size = 0;
14
static int64_t drain_rate = 0;
15

    
16

    
17
static long bytes_in_bucket = 0;
18
struct timeval bib_then = { 0, 0};
19

    
20
void planFreeSpaceInBucketEvent();
21

    
22
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
23

    
24
        /*struct timeval test;
25

26
        gettimeofday(&test,NULL);
27

28
        int us;
29

30
        if(test.tv_usec > (int) arg)
31
                us = test.tv_usec - (int) arg;
32
        else
33
                us = 1000000 + test.tv_usec - (int) arg;
34

35
        fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
36

    
37
        while((!isQueueEmpty()) && (outputRateControl(getFirstPacketSize()) == OK)) {        
38

    
39

    
40
                PacketContainer* packet = takePacketToSend();
41

    
42
                if (packet == NULL) return;
43

    
44
                struct timeval now;
45
                   gettimeofday(&now, NULL);
46
                bib_then = now;
47

    
48
                sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
49

    
50
#ifdef RTX
51
                if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
52
                else destroyPacketContainer(packet);
53
#else
54
                destroyPacketContainer(packet);
55
#endif
56

    
57
        }
58

    
59
        if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
60
        
61
        return;
62
}
63

    
64
void planFreeSpaceInBucketEvent(int bytes) {                //plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
65
        struct timeval TXtime;
66
        struct event *ev;
67

    
68
        //time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
69
        TXtime.tv_sec = bytes / drain_rate; //seconds
70
        TXtime.tv_usec = (bytes - TXtime.tv_sec * drain_rate) * 1000000 / drain_rate; //microseconds
71

    
72
        ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
73
        event_add(ev, &TXtime);
74
}
75

    
76
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
77
{
78
        PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
79

    
80
        if(!(priority & HP)) {
81
                if (!isQueueEmpty()) {                                                //some packets are already waiting, "I am for sure after them"
82
                        return addPacketTXqueue(newPacket);
83
                }        
84
                else if(outputRateControl(newPacket->pktLen) != OK) {                        //queue is empty, not enough space in bucket - "I will be first in the queue"
85
                        planFreeSpaceInBucketEvent(newPacket->pktLen);                //when there will be enough space in the bucket for the first packet from the queue
86
                        return addPacketTXqueue(newPacket);
87
                }
88
        }
89
#ifdef RTX
90
        if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
91
        else destroyPacketContainer(newPacket);
92
#else
93
        destroyPacketContainer(newPacket);
94
#endif
95

    
96
        return sendPacket(udpSocket, iov, 4, socketaddr);
97
}
98

    
99
void setOutputRateParams(int bucketsize, int drainrate) { //given in Bytes and Bits/s
100
     bucket_size = bucketsize;
101
     outputRateControl(0);
102
     drain_rate = drainrate >> 3; //now in bytes/s
103
}
104

    
105
int outputRateControl(int len) {
106
        struct timeval now;
107
        gettimeofday(&now, NULL);
108

    
109
        if(drain_rate <= 0) {
110
                bytes_in_bucket = 0;
111
                bib_then = now;
112
                return OK;
113
        } else {
114
                unsigned int leaked;
115
                int total_drain_secs = bytes_in_bucket / (drain_rate) + 1;
116

    
117
                if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
118
                                bytes_in_bucket = 0;
119
                } else {
120
                        leaked = drain_rate * (now.tv_sec - bib_then.tv_sec);
121
                        leaked += drain_rate * (now.tv_usec - bib_then.tv_usec) / 1000000; 
122
                        if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
123
                                        else bytes_in_bucket -= leaked;
124
                }
125

    
126
                bib_then = now;
127
                if(bytes_in_bucket + len <= bucket_size) {
128
                        bytes_in_bucket += len;
129
                        return OK;
130
                } else {
131
                        return THROTTLE;
132
                }
133
   }
134
}