Statistics
| Branch: | Revision:

napa-baselibs / ml / util / rateLimiter.c @ 32c04c21

History | View | Annotate | Download (3.31 KB)

1
/*
2
 *          
3
 *
4
 *        upgraded rateControl - token bucket
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10
#include <ml_all.h>
11

    
12
extern struct event_base *base;
13
static long bucket_size = 0;
14
static int64_t drain_rate = 0;
15

    
16

    
17
static long bytes_in_bucket = 0;
18
struct timeval bib_then = { 0, 0};
19

    
20
void planFreeSpaceInBucketEvent();
21

    
22
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
23

    
24
        /*struct timeval test;
25

26
        gettimeofday(&test,NULL);
27

28
        int us;
29

30
        if(test.tv_usec > (int) arg)
31
                us = test.tv_usec - (int) arg;
32
        else
33
                us = 1000000 + test.tv_usec - (int) arg;
34

35
        fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
36

    
37
        while((!isQueueEmpty()) && (outputRateControl(getFirstPacketSize()) == OK)) {        
38

    
39

    
40
                PacketContainer* packet = takePacketToSend();
41

    
42
                if (packet == NULL) return;
43

    
44
                struct timeval now;
45
                   gettimeofday(&now, NULL);
46
                bib_then = now;
47

    
48
#ifdef RTX
49
                if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
50
#endif
51

    
52
                sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
53
        }
54

    
55
        if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
56
        
57
        return;
58
}
59

    
60
void planFreeSpaceInBucketEvent(int bytes) {                //plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
61
        struct timeval TXtime;
62
        struct event *ev;
63

    
64
        //time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
65
        TXtime.tv_sec = bytes / drain_rate; //seconds
66
        TXtime.tv_usec = (bytes - TXtime.tv_sec * drain_rate) * 1000000 / drain_rate; //microseconds
67

    
68
        ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
69
        event_add(ev, &TXtime);
70
}
71

    
72
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
73
{
74
        PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
75

    
76
        if(!(priority & HP)) {
77
                if (!isQueueEmpty()) {                                                //some packets are already waiting, "I am for sure after them"
78
                        return addPacketTXqueue(newPacket);
79
                }        
80
                else if(outputRateControl(newPacket->pktLen) != OK) {                        //queue is empty, not enough space in bucket - "I will be first in the queue"
81
                        planFreeSpaceInBucketEvent(newPacket->pktLen);                //when there will be enough space in the bucket for the first packet from the queue
82
                        return addPacketTXqueue(newPacket);
83
                }
84
        }
85
#ifdef RTX
86
        if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
87
#endif
88

    
89
        return sendPacket(udpSocket, iov, 4, socketaddr);
90
}
91

    
92
void setOutputRateParams(int bucketsize, int drainrate) { //given in Bytes and Bits/s
93
     bucket_size = bucketsize;
94
     outputRateControl(0);
95
     drain_rate = drainrate >> 3; //now in bytes/s
96
}
97

    
98
int outputRateControl(int len) {
99
        struct timeval now;
100
        gettimeofday(&now, NULL);
101

    
102
        if(drain_rate <= 0) {
103
                bytes_in_bucket = 0;
104
                bib_then = now;
105
                return OK;
106
        } else {
107
                unsigned int leaked;
108
                int total_drain_secs = bytes_in_bucket / (drain_rate) + 1;
109

    
110
                if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
111
                                bytes_in_bucket = 0;
112
                } else {
113
                        leaked = drain_rate * (now.tv_sec - bib_then.tv_sec);
114
                        leaked += drain_rate * (now.tv_usec - bib_then.tv_usec) / 1000000; 
115
                        if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
116
                                        else bytes_in_bucket -= leaked;
117
                }
118

    
119
                bib_then = now;
120
                if(bytes_in_bucket + len <= bucket_size) {
121
                        bytes_in_bucket += len;
122
                        return OK;
123
                } else {
124
                        return THROTTLE;
125
                }
126
   }
127
}