Revision 612e856d

View differences:

ml/Makefile.am
6 6
noinst_LIBRARIES = libml.a
7 7

  
8 8
libml_a_SOURCES = BUGS.txt ml.c ml_log.c util/stun.c \
9
	util/udpSocket.c util/rateControl.c
9
	util/udpSocket.c util/rateLimiter.c util/queueManagement.c
10 10
# transmissionHandler.c
11 11

  
12 12
# testMessaginglayer: echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c
13
#        ${COMPILER} -o echoServer.o echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c util/rateControl.c ${EVENT} ${MATH} ${GDB}
13
#        ${COMPILER} -o echoServer.o echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c ${EVENT} ${MATH} ${GDB}
14 14

  
15 15
#libml_a_LIBADD = $(top_builddir)/dclog/libdclog.a
ml/include/ml.h
51 51
#include <stdint.h>
52 52
#include <sys/time.h>
53 53

  
54

  
55 54
/**
56 55
 * @brief The size of a socketID
57 56
 */
......
262 261
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg);
263 262

  
264 263
/**
265
  * Configure the parameters for output rate control.
266
  * These values may also be set while packets are being transmitted.
267
  * @param bucketsize The size of the bucket in kbytes
268
  * @param drainrate The amount of kbytes draining in a second. If drainrate is <=0, then rateControl is completely disabled (all packets are passed).
269
*/
270
void mlSetThrottle(int bucketsize, int drainrate);
271

  
272
/**
273 264
  * Configure the verbosity of messages
274 265
  * @param log_level [0-4] the lower the less messages are printed out
275 266
*/
......
523 514
 */
524 515
int mlGetPathMTU(int ConnectionId);
525 516

  
517

  
518

  
519
/**
520
  * Configure the parameters for output rate control.
521
  * @param bucketsize The size of the bucket in kbytes
522
  * @param drainrate The amount of kbytes draining in a second. If drainrate is <=0, then rateControl is completely disabled (all packets are passed).
523
  * @param maxQueueSize In kbytes. Max data stored while limiting the output rate. If 0 packets limitted by drainrate are dropped.
524
  * @param maxQueueSizeRTX In kbytes. Max data waiting for the retransmission if needed.
525
  * @param maxTimeToHold. Time for which sent packets are stored in RTX queue in seconds.
526
*/
527
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold);
528

  
529

  
526 530
#ifdef __cplusplus
527 531
}
528 532
#endif
ml/ml.c
39 39
#include <stdint.h>
40 40
#include <string.h>
41 41
#include <sys/types.h>
42
#include <event2/event.h>
43 42
#include <time.h>
44 43
#include <math.h>
45 44
#include <assert.h>
......
60 59
#include "util/udpSocket.h"
61 60
#include "util/stun.h"
62 61
#include "transmissionHandler.h"
62
#include "util/rateLimiter.h"
63
#include "util/queueManagement.h"
63 64

  
64 65
#define LOG_MODULE "[ml] "
65 66
#include "ml_log.h"
......
330 331
			}
331 332

  
332 333
			debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
333
			switch(sendPacket(socketfd, iov, 4, &udpgen.udpaddr)) {
334
			int priority = 0; 
335
			if (msg_type == ML_CON_MSG) priority = HP;
336
			switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
334 337
				case MSGLEN:
335 338
					info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
336 339
					// TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
......
399 402
  {
400 403
                        char buf[SOCKETID_STRING_SIZE];
401 404
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
402
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %d\n", buf, sizeof(struct conn_msg));
405
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
403 406
   }
404 407
	send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
405 408
}
......
1355 1358
	return create_socket(port, ipaddr);
1356 1359
}
1357 1360

  
1358
void mlSetThrottle(int bucketsize, int drainrate) {
1361
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold) {
1359 1362
        setOutputRateParams(bucketsize, drainrate);
1363
	setQueuesParams (maxQueueSize, maxQueueSizeRTX, maxTimeToHold);
1360 1364
}
1361

  
1365
     
1362 1366
void mlSetVerbosity (int log_level) {
1363 1367
	setLogLevel(log_level);
1364 1368
}
ml/util/queueManagement.c
1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10

  
11
#include <stdio.h>
12
#include <stdlib.h>
13
#include <math.h>
14

  
15
#include <unistd.h>
16
#include <string.h>
17
#include <sys/uio.h>
18
#include <netinet/in.h>
19

  
20
#include "queueManagement.h"
21
#include "udpSocket.h"
22

  
23
	FILE *file = NULL;
24

  
25
PacketQueue TXqueue;
26
PacketQueue RTXqueue;
27

  
28
int TXmaxSize = 6000*1500;			//bytes
29
int RTXmaxSize = 6000*1500;			//bytes
30

  
31
struct timeval maxTimeToHold = {5,0};
32

  
33
PacketContainer* createPacketContainer(const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior) {
34
	
35
	PacketContainer *packet = malloc(sizeof(PacketContainer));
36
	packet->udpSocket = uSoc;
37
	packet->iovlen = iovlen;
38
	packet->next = NULL;
39
	packet->pktLen = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
40

  
41
	packet->priority = prior;
42

  
43
 	int i;
44
        packet->iov = malloc(sizeof(struct iovec) * iovlen);
45
        for (i=0; i<iovlen; i++){
46
                packet->iov[i].iov_len = ioVector[i].iov_len;
47
                packet->iov[i].iov_base = malloc(packet->iov[i].iov_len);
48
                memcpy(packet->iov[i].iov_base, ioVector[i].iov_base, packet->iov[i].iov_len);
49
        }
50

  
51
        packet->socketaddr = malloc(sizeof(struct sockaddr_in));
52
        memcpy(packet->socketaddr, sockAddress, sizeof(struct sockaddr_in));
53

  
54
	int packet_len = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
55
	return packet;
56
}
57

  
58
void destroyPacketContainer(PacketContainer* pktContainer){
59

  
60
        int i;
61
        for (i=0; i < pktContainer->iovlen; i++) free(pktContainer->iov[i].iov_base);
62
        free(pktContainer->iov);
63
        free(pktContainer->socketaddr);
64
        free(pktContainer);
65
}
66

  
67
int addPacketTXqueue(PacketContainer *packet) {
68
	if ((TXqueue.size + packet->pktLen) > TXmaxSize) {
69
		fprintf(stderr,"[queueManagement::addPacketTXqueue] -- Sorry! Max size for TX queue. Packet will be discarded. \n");
70
		destroyPacketContainer(packet);
71
		return THROTTLE;
72
	}
73
	TXqueue.size += packet->pktLen;	
74

  
75
	if (TXqueue.head == NULL) {			//adding first element
76
		TXqueue.head = packet;
77
		TXqueue.tail = packet;		
78
	} else {					//adding at the end of the queue
79
		TXqueue.tail->next = packet;
80
		TXqueue.tail = packet;
81
	}
82
	
83
	//logQueueOccupancy();
84
	return OK;
85
}
86

  
87
PacketContainer* takePacketToSend() {			//returns pointer to packet or NULL if queue is empty
88
	if (TXqueue.head != NULL) {
89
		PacketContainer *packet = TXqueue.head;
90
		
91
		TXqueue.size -= packet->pktLen;
92
		TXqueue.head = TXqueue.head->next;
93
		packet->next = NULL;
94
					
95
		return packet;
96
	}
97
	else return NULL;	
98
}
99

  
100

  
101
void addPacketRTXqueue(PacketContainer *packet) {
102
	//removing old packets - because of maxTimeToHold
103
	struct timeval now, age;
104
	gettimeofday(&now, NULL);
105
 
106
	PacketContainer *tmp = RTXqueue.head;
107
	while (tmp != NULL) {
108
		age.tv_sec = now.tv_sec - tmp->timeStamp.tv_sec;
109
		age.tv_usec = now.tv_usec - tmp->timeStamp.tv_usec;
110

  
111
		if (age.tv_sec > maxTimeToHold.tv_sec) {
112

  
113
			tmp = tmp->next;
114
			removeOldestPacket();
115
		}
116
		else break;
117
	}
118

  
119

  
120
	if ((RTXqueue.size + packet->pktLen) > RTXmaxSize) {
121
		removeOldestPacket();
122
	}
123

  
124
	//adding timeStamp
125
	gettimeofday(&(packet->timeStamp), NULL);
126

  
127
	//finally - adding packet
128
	RTXqueue.size += packet->pktLen;
129

  
130
	if (RTXqueue.head == NULL) {			//adding first element
131
		RTXqueue.head = packet;
132
		RTXqueue.tail = packet;
133
		return;	
134
	} else {					//adding at the end of the queue
135
		RTXqueue.tail->next = packet;
136
		RTXqueue.tail = packet;
137
		return;
138
	}
139
}
140

  
141
int removeOldestPacket() {			//return 0 if success, else (queue empty) return 1
142
	if (RTXqueue.head != NULL) {
143
		RTXqueue.size -= RTXqueue.head->pktLen;
144
		PacketContainer *pointer = RTXqueue.head;		
145
		RTXqueue.head = RTXqueue.head->next;
146
		destroyPacketContainer(pointer);
147
		return 0;
148
	}
149
	return 1;	
150
}
151

  
152
void setQueuesParams (int TXsize, int RTXsize, double maxTTHold) {
153
	TXmaxSize = TXsize;
154
	RTXmaxSize = RTXsize;
155
	maxTimeToHold.tv_sec = (int)floor(maxTTHold);
156
	maxTimeToHold.tv_usec = (int)(1000000.0 * fmod(maxTTHold, 1.0));
157
}
158

  
159
int isQueueEmpty() {
160
	
161
	if (TXqueue.head == NULL) return 1;
162
	return 0;
163
}
164

  
165
int getFirstPacketSize() {
166

  
167
	if (TXqueue.head != NULL) return TXqueue.head->pktLen;
168
	return 0;
169
}
170

  
171
void logQueueOccupancy() {
172
	struct timeval now;
173
	gettimeofday(&now, NULL);
174
	double time = now.tv_sec + ((double) now.tv_usec)/1000000;
175
	file = fopen("./queue.txt", "a+");
176
	//char *str;
177
	//sprintf(str,"%f",time); 
178
	//fputs(str, p);
179
	fprintf(file,"%f \t %d \n",time, TXqueue.size);	
180
	fprintf(stderr,"[queueManagement::logQueueOccupancy] -- Time: %f \t queueOccupancy: %d \n", time, TXqueue.size);
181
	fclose(file);
182
}
ml/util/queueManagement.h
1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10

  
11
#include <stdio.h>
12
#include <stdlib.h>
13

  
14
#include <unistd.h>
15

  
16
#include <netinet/in.h>
17

  
18

  
19
typedef struct PktContainer {
20
	int udpSocket; 
21
	struct iovec *iov; 
22
	int iovlen; 
23
	struct sockaddr_in *socketaddr;
24

  
25
	int pktLen;		//kB
26
	struct timeval timeStamp;
27
	struct PktContainer *next;
28
	unsigned char priority;
29
} PacketContainer;
30

  
31

  
32
typedef struct PktQueue { 
33
	PacketContainer *head;
34
	PacketContainer *tail;
35

  
36
	int size;		//kB
37

  
38
} PacketQueue;
39

  
40

  
41
PacketContainer* createPacketContainer (const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior);
42

  
43
int addPacketTXqueue(PacketContainer *packet);
44

  
45
PacketContainer* takePacketToSend();
46

  
47
void addPacketRTXqueue(PacketContainer *packet);
48

  
49
int removeOldestPacket() ;
50

  
51
void setQueueSizes (int TXsize, int RTXsize);
52

  
53
int isQueueEmpty();
54

  
55
int getFirstPacketSize();
56

  
57
void setQueuesParams (int TXsize, int RTXsize, double maxTimeToHold);
ml/util/rateControl.c
1
/*
2
 *          Policy Management
3
 *
4
 *
5
 * This software was created by arpad.bakay@netvisor.hu
6
 *
7
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
8
 */
9

  
10
#include <sys/time.h>
11

  
12
#include "udpSocket.h"
13

  
14
static long bucket_size = 0;
15
static int drain_rate = 0;
16

  
17
static long bytes_in_bucket = 0;
18
struct timeval bib_when = { 0, 0};
19

  
20
int outputRateControl(int len) {
21
   struct timeval now;
22
   gettimeofday(&now, NULL);
23
   if(drain_rate <= 0) {
24
      bytes_in_bucket = 0;
25
      bib_when = now;
26
      return OK;
27
   }
28
   else {
29
       int total_drain_secs = bytes_in_bucket / drain_rate + 1; 
30
       if(now.tv_sec - bib_when.tv_sec - 1 < total_drain_secs) {
31
           bytes_in_bucket = 0;
32
       }
33
       else {
34
          long leaked = drain_rate * 1024 * (now.tv_sec - bib_when.tv_sec);
35
          leaked += drain_rate * (now.tv_usec - bib_when.tv_usec) / (1000000 / 1024); 
36
	  if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
37
          else bytes_in_bucket -= leaked;
38
       }
39
       bib_when = now;
40
       if(bytes_in_bucket + len <= bucket_size) {
41
              bytes_in_bucket += len;
42
              return OK;
43
       }
44
       else  return THROTTLE;
45
   }
46
}
47

  
48
void setOutputRateParams(int bucketsize, int drainrate) {
49
     bucket_size = bucketsize * 1024;
50
     outputRateControl(0);
51
     drain_rate = drainrate; 
52
}
53

  
54

  
ml/util/rateLimiter.c
1
/*
2
 *          
3
 *
4
 *	upgraded rateControl - token bucket
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10
//#include <sys/time.h>
11

  
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <event2/event.h>
15
#include <errno.h>
16

  
17
#include "udpSocket.h"
18
#include "rateLimiter.h"
19
#include "queueManagement.h"
20

  
21

  
22
extern struct event_base *base;
23
static long bucket_size = 0;
24
static int drain_rate = 0;
25

  
26
static long bytes_in_bucket = 0;
27
struct timeval bib_then = { 0, 0};
28

  
29
void planFreeSpaceInBucketEvent();
30

  
31
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
32

  
33
	/*struct timeval test;
34

  
35
	gettimeofday(&test,NULL);
36

  
37
	int us;
38

  
39
	if(test.tv_usec > (int) arg)
40
		us = test.tv_usec - (int) arg;
41
	else
42
		us = 1000000 + test.tv_usec - (int) arg;
43

  
44
	fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
45

  
46
	while(outputRateControl(getFirstPacketSize()) == OK) {	
47

  
48

  
49
		PacketContainer* packet = takePacketToSend();
50

  
51
		if (packet == NULL) return;
52

  
53
		struct timeval now;
54
   		gettimeofday(&now, NULL);
55
		bib_then = now;
56

  
57
		if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
58

  
59
		sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
60
	}
61

  
62
	if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
63
	
64
	return;
65
}
66

  
67
void planFreeSpaceInBucketEvent(int bytes) {		//plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
68
	
69
	int rate = getDrainRate();
70

  
71
	int TXtime_sec = bytes / rate;			//seconds
72
	int TXtime_usec = ((bytes - (TXtime_sec * rate) ) * 1000000) / rate;		//microseconds
73

  
74
	struct timeval TXtime = {TXtime_sec, TXtime_usec};	//time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
75

  
76
	//struct timeval test;
77

  
78
	//gettimeofday(&test,NULL);
79

  
80
	//fprintf(stderr,"Planing event for one packet in: %d microseconds\n",TXtime_usec);
81
	struct event *ev;
82
	ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
83
	event_add(ev,&TXtime);
84
}
85

  
86
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
87
{
88
	PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
89

  
90
	if(!(priority & HP)) {
91
		if (!isQueueEmpty()) {						//some packets are already waiting, "I am for sure after them"
92
			return addPacketTXqueue(newPacket);
93
		}	
94
		else if(outputRateControl(newPacket->pktLen) != OK) {			//queue is empty, not enough space in bucket - "I will be first in the queue"
95
			planFreeSpaceInBucketEvent(newPacket->pktLen);		//when there will be enough space in the bucket for the first packet from the queue
96
			return addPacketTXqueue(newPacket);
97
		}
98
	}
99
	if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
100
	
101
	return sendPacket(udpSocket, iov, 4, socketaddr);
102
}
103

  
104
void setOutputRateParams(int bucketsize, int drainrate) {			//given in kBytes and kBytes/s
105
     bucket_size = bucketsize*1024;		//now it is in Bytes
106
     outputRateControl(0);
107
     drain_rate = drainrate; 
108
}
109

  
110
int getDrainRate () {			//in Bytes!!!!!!!!!!
111
	return drain_rate * 1024;
112
}
113

  
114
int outputRateControl(int len) {
115
   struct timeval now;
116
   gettimeofday(&now, NULL);
117
   if(drain_rate <= 0) {
118
      bytes_in_bucket = 0;
119
      bib_then = now;
120
      return OK;
121
   }
122
   else {   
123

  
124
	long leaked;
125
	int total_drain_secs = bytes_in_bucket / (drain_rate *1024) + 1; 
126
       if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
127
           bytes_in_bucket = 0;
128
       }
129
       else {
130
          leaked = drain_rate * 1024 * (now.tv_sec - bib_then.tv_sec);
131
          leaked += drain_rate * 1024 * (now.tv_usec - bib_then.tv_usec) / 1000000; 
132
	  if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
133
          else bytes_in_bucket -= leaked;
134
       }
135

  
136
       bib_then = now;
137
       if(bytes_in_bucket + len <= bucket_size) {
138
              bytes_in_bucket += len;
139
              return OK;
140
       }
141
       else { 
142
		return THROTTLE;
143
	}
144
   }
145
}
146

  
147

  
ml/util/rateLimiter.h
1
/*
2
 *          
3
 *
4
 *	upgraded rateControl - token bucket
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10
//#include <sys/time.h>
11

  
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <event2/event.h>
15
#include <errno.h>
16

  
17

  
18
#define HP 1
19
#define NO_RTX 2
20

  
21
void planFreeSpaceInBucketEvent();
22

  
23
void freeSpaceInBucket_cb (int fd, short event,void *arg);
24

  
25
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority);
26

  
ml/util/udpSocket.c
19 19
 * AND FITNESS FOR A PARTICULAR PURPOSE AND THE WARRANTY AGAINST LATENT
20 20
 * DEFECTS, WITH RESPECT TO THE PROGRAM AND THE ACCOMPANYING
21 21
 * DOCUMENTATION.
22
 /*
22
 *
23 23
 * No Liability For Consequential Damages IN NO EVENT SHALL NEC Europe
24 24
 * Ltd., NEC Corporation OR ANY OF ITS SUBSIDIARIES BE LIABLE FOR ANY
25 25
 * DAMAGES WHATSOEVER (INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS
......
74 74
#define LOG_MODULE "[ml] "
75 75
#include "ml_log.h"
76 76

  
77
#include "rateLimiter.h"
78
#include "queueManagement.h"
79

  
77 80
/* debug varible: set to 1 if you want debug output  */
78 81
int verbose = 0;
79 82

  
......
191 194
{
192 195
	int error, ret;
193 196
	struct msghdr msgh;
194
        
195
        if(outputRateControl(len) != OK) return THROTTLE;
196 197

  
197 198
	msgh.msg_name = socketaddr;
198 199
	msgh.msg_namelen = sizeof(struct sockaddr_in);

Also available in: Unified diff