Statistics
| Branch: | Revision:

napa-baselibs / ml / util / rateLimiter.c @ 612e856d

History | View | Annotate | Download (3.81 KB)

1
/*
2
 *          
3
 *
4
 *        upgraded rateControl - token bucket
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10
//#include <sys/time.h>
11

    
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <event2/event.h>
15
#include <errno.h>
16

    
17
#include "udpSocket.h"
18
#include "rateLimiter.h"
19
#include "queueManagement.h"
20

    
21

    
22
extern struct event_base *base;
23
static long bucket_size = 0;
24
static int drain_rate = 0;
25

    
26
static long bytes_in_bucket = 0;
27
struct timeval bib_then = { 0, 0};
28

    
29
void planFreeSpaceInBucketEvent();
30

    
31
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
32

    
33
        /*struct timeval test;
34

35
        gettimeofday(&test,NULL);
36

37
        int us;
38

39
        if(test.tv_usec > (int) arg)
40
                us = test.tv_usec - (int) arg;
41
        else
42
                us = 1000000 + test.tv_usec - (int) arg;
43

44
        fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
45

    
46
        while(outputRateControl(getFirstPacketSize()) == OK) {        
47

    
48

    
49
                PacketContainer* packet = takePacketToSend();
50

    
51
                if (packet == NULL) return;
52

    
53
                struct timeval now;
54
                   gettimeofday(&now, NULL);
55
                bib_then = now;
56

    
57
                if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
58

    
59
                sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
60
        }
61

    
62
        if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
63
        
64
        return;
65
}
66

    
67
void planFreeSpaceInBucketEvent(int bytes) {                //plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
68
        
69
        int rate = getDrainRate();
70

    
71
        int TXtime_sec = bytes / rate;                        //seconds
72
        int TXtime_usec = ((bytes - (TXtime_sec * rate) ) * 1000000) / rate;                //microseconds
73

    
74
        struct timeval TXtime = {TXtime_sec, TXtime_usec};        //time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
75

    
76
        //struct timeval test;
77

    
78
        //gettimeofday(&test,NULL);
79

    
80
        //fprintf(stderr,"Planing event for one packet in: %d microseconds\n",TXtime_usec);
81
        struct event *ev;
82
        ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
83
        event_add(ev,&TXtime);
84
}
85

    
86
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
87
{
88
        PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
89

    
90
        if(!(priority & HP)) {
91
                if (!isQueueEmpty()) {                                                //some packets are already waiting, "I am for sure after them"
92
                        return addPacketTXqueue(newPacket);
93
                }        
94
                else if(outputRateControl(newPacket->pktLen) != OK) {                        //queue is empty, not enough space in bucket - "I will be first in the queue"
95
                        planFreeSpaceInBucketEvent(newPacket->pktLen);                //when there will be enough space in the bucket for the first packet from the queue
96
                        return addPacketTXqueue(newPacket);
97
                }
98
        }
99
        if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
100
        
101
        return sendPacket(udpSocket, iov, 4, socketaddr);
102
}
103

    
104
void setOutputRateParams(int bucketsize, int drainrate) {                        //given in kBytes and kBytes/s
105
     bucket_size = bucketsize*1024;                //now it is in Bytes
106
     outputRateControl(0);
107
     drain_rate = drainrate; 
108
}
109

    
110
int getDrainRate () {                        //in Bytes!!!!!!!!!!
111
        return drain_rate * 1024;
112
}
113

    
114
int outputRateControl(int len) {
115
   struct timeval now;
116
   gettimeofday(&now, NULL);
117
   if(drain_rate <= 0) {
118
      bytes_in_bucket = 0;
119
      bib_then = now;
120
      return OK;
121
   }
122
   else {   
123

    
124
        long leaked;
125
        int total_drain_secs = bytes_in_bucket / (drain_rate *1024) + 1; 
126
       if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
127
           bytes_in_bucket = 0;
128
       }
129
       else {
130
          leaked = drain_rate * 1024 * (now.tv_sec - bib_then.tv_sec);
131
          leaked += drain_rate * 1024 * (now.tv_usec - bib_then.tv_usec) / 1000000; 
132
          if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
133
          else bytes_in_bucket -= leaked;
134
       }
135

    
136
       bib_then = now;
137
       if(bytes_in_bucket + len <= bucket_size) {
138
              bytes_in_bucket += len;
139
              return OK;
140
       }
141
       else { 
142
                return THROTTLE;
143
        }
144
   }
145
}
146

    
147