Statistics
| Branch: | Revision:

napa-baselibs / ml / util / rateLimiter.c @ 1859b483

History | View | Annotate | Download (3.47 KB)

1
/*
2
 *          
3
 *
4
 *        upgraded rateControl - token bucket
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10
//#include <sys/time.h>
11

    
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <event2/event.h>
15
#include <errno.h>
16

    
17
#include "udpSocket.h"
18
#include "rateLimiter.h"
19
#include "queueManagement.h"
20

    
21

    
22
extern struct event_base *base;
23
static long bucket_size = 0;
24
static int64_t drain_rate = 0;
25

    
26

    
27
static long bytes_in_bucket = 0;
28
struct timeval bib_then = { 0, 0};
29

    
30
void planFreeSpaceInBucketEvent();
31

    
32
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
33

    
34
        /*struct timeval test;
35

36
        gettimeofday(&test,NULL);
37

38
        int us;
39

40
        if(test.tv_usec > (int) arg)
41
                us = test.tv_usec - (int) arg;
42
        else
43
                us = 1000000 + test.tv_usec - (int) arg;
44

45
        fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
46

    
47
        while((!isQueueEmpty()) && (outputRateControl(getFirstPacketSize()) == OK)) {        
48

    
49

    
50
                PacketContainer* packet = takePacketToSend();
51

    
52
                if (packet == NULL) return;
53

    
54
                struct timeval now;
55
                   gettimeofday(&now, NULL);
56
                bib_then = now;
57

    
58
#ifdef RTX
59
                if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
60
#endif
61

    
62
                sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
63
        }
64

    
65
        if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
66
        
67
        return;
68
}
69

    
70
void planFreeSpaceInBucketEvent(int bytes) {                //plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
71
        struct timeval TXtime;
72
        struct event *ev;
73

    
74
        //time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
75
        TXtime.tv_sec = bytes / drain_rate; //seconds
76
        TXtime.tv_usec = (bytes - TXtime.tv_sec * drain_rate) * 1000000 / drain_rate; //microseconds
77

    
78
        ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
79
        event_add(ev, &TXtime);
80
}
81

    
82
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
83
{
84
        PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
85

    
86
        if(!(priority & HP)) {
87
                if (!isQueueEmpty()) {                                                //some packets are already waiting, "I am for sure after them"
88
                        return addPacketTXqueue(newPacket);
89
                }        
90
                else if(outputRateControl(newPacket->pktLen) != OK) {                        //queue is empty, not enough space in bucket - "I will be first in the queue"
91
                        planFreeSpaceInBucketEvent(newPacket->pktLen);                //when there will be enough space in the bucket for the first packet from the queue
92
                        return addPacketTXqueue(newPacket);
93
                }
94
        }
95
#ifdef RTX
96
        if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
97
#endif
98

    
99
        return sendPacket(udpSocket, iov, 4, socketaddr);
100
}
101

    
102
void setOutputRateParams(int bucketsize, int drainrate) { //given in Bytes and Bits/s
103
     bucket_size = bucketsize;
104
     outputRateControl(0);
105
     drain_rate = drainrate >> 3; //now in bytes/s
106
}
107

    
108
int outputRateControl(int len) {
109
        struct timeval now;
110
        gettimeofday(&now, NULL);
111

    
112
        if(drain_rate <= 0) {
113
                bytes_in_bucket = 0;
114
                bib_then = now;
115
                return OK;
116
        } else {
117
                long leaked;
118
                int total_drain_secs = bytes_in_bucket / (drain_rate) + 1;
119

    
120
                if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
121
                                bytes_in_bucket = 0;
122
                } else {
123
                        leaked = drain_rate * (now.tv_sec - bib_then.tv_sec);
124
                        leaked += drain_rate * (now.tv_usec - bib_then.tv_usec) / 1000000; 
125
                        if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
126
                                        else bytes_in_bucket -= leaked;
127
                }
128

    
129
                bib_then = now;
130
                if(bytes_in_bucket + len <= bucket_size) {
131
                        bytes_in_bucket += len;
132
                        return OK;
133
                } else {
134
                        return THROTTLE;
135
                }
136
   }
137
}