Statistics
| Branch: | Revision:

ml / util / rateLimiter.c @ b71dda24

History | View | Annotate | Download (3.84 KB)

1
/*
2
 *          
3
 *
4
 *        upgraded rateControl - token bucket
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10
//#include <sys/time.h>
11

    
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <event2/event.h>
15
#include <errno.h>
16

    
17
#include "udpSocket.h"
18
#include "rateLimiter.h"
19
#include "queueManagement.h"
20

    
21

    
22
extern struct event_base *base;
23
static long bucket_size = 0;
24
static int drain_rate = 0;
25

    
26
static long bytes_in_bucket = 0;
27
struct timeval bib_then = { 0, 0};
28

    
29
void planFreeSpaceInBucketEvent();
30

    
31
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
32

    
33
        /*struct timeval test;
34

35
        gettimeofday(&test,NULL);
36

37
        int us;
38

39
        if(test.tv_usec > (int) arg)
40
                us = test.tv_usec - (int) arg;
41
        else
42
                us = 1000000 + test.tv_usec - (int) arg;
43

44
        fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
45

    
46
        while(outputRateControl(getFirstPacketSize()) == OK) {        
47

    
48

    
49
                PacketContainer* packet = takePacketToSend();
50

    
51
                if (packet == NULL) return;
52

    
53
                struct timeval now;
54
                   gettimeofday(&now, NULL);
55
                bib_then = now;
56

    
57
#ifdef RTX
58
                if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
59
#endif
60

    
61
                sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
62
        }
63

    
64
        if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
65
        
66
        return;
67
}
68

    
69
void planFreeSpaceInBucketEvent(int bytes) {                //plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
70
        
71
        int rate = getDrainRate();
72

    
73
        int TXtime_sec = bytes / rate;                        //seconds
74
        int TXtime_usec = ((bytes - (TXtime_sec * rate) ) * 1000000) / rate;                //microseconds
75

    
76
        struct timeval TXtime = {TXtime_sec, TXtime_usec};        //time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
77

    
78
        //struct timeval test;
79

    
80
        //gettimeofday(&test,NULL);
81

    
82
        //fprintf(stderr,"Planing event for one packet in: %d microseconds\n",TXtime_usec);
83
        struct event *ev;
84
        ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
85
        event_add(ev,&TXtime);
86
}
87

    
88
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
89
{
90
        PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
91

    
92
        if(!(priority & HP)) {
93
                if (!isQueueEmpty()) {                                                //some packets are already waiting, "I am for sure after them"
94
                        return addPacketTXqueue(newPacket);
95
                }        
96
                else if(outputRateControl(newPacket->pktLen) != OK) {                        //queue is empty, not enough space in bucket - "I will be first in the queue"
97
                        planFreeSpaceInBucketEvent(newPacket->pktLen);                //when there will be enough space in the bucket for the first packet from the queue
98
                        return addPacketTXqueue(newPacket);
99
                }
100
        }
101
#ifdef RTX
102
        if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
103
#endif
104

    
105
        return sendPacket(udpSocket, iov, 4, socketaddr);
106
}
107

    
108
void setOutputRateParams(int bucketsize, int drainrate) {                        //given in kBytes and kBytes/s
109
     bucket_size = bucketsize*1024;                //now it is in Bytes
110
     outputRateControl(0);
111
     drain_rate = drainrate; 
112
}
113

    
114
int getDrainRate () {                        //in Bytes!!!!!!!!!!
115
        return drain_rate * 1024;
116
}
117

    
118
int outputRateControl(int len) {
119
   struct timeval now;
120
   gettimeofday(&now, NULL);
121
   if(drain_rate <= 0) {
122
      bytes_in_bucket = 0;
123
      bib_then = now;
124
      return OK;
125
   }
126
   else {   
127

    
128
        long leaked;
129
        int total_drain_secs = bytes_in_bucket / (drain_rate *1024) + 1; 
130
       if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
131
           bytes_in_bucket = 0;
132
       }
133
       else {
134
          leaked = drain_rate * 1024 * (now.tv_sec - bib_then.tv_sec);
135
          leaked += drain_rate * 1024 * (now.tv_usec - bib_then.tv_usec) / 1000000; 
136
          if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
137
          else bytes_in_bucket -= leaked;
138
       }
139

    
140
       bib_then = now;
141
       if(bytes_in_bucket + len <= bucket_size) {
142
              bytes_in_bucket += len;
143
              return OK;
144
       }
145
       else { 
146
                return THROTTLE;
147
        }
148
   }
149
}
150

    
151