Revision 6575ae37

View differences:

Makefile.am
5 5
INCLUDES = -I$(top_srcdir)/include/  #-I$(top_srcdir)/dclog
6 6
noinst_LIBRARIES = libml.a
7 7

  
8
libml_a_SOURCES = BUGS.txt ml.c util/stun.c \
9
	util/udpSocket.c util/rateControl.c
10
# transmissionHandler.c
8
libml_a_SOURCES = BUGS.txt ml.c ml_log.c util/stun.c \
9
	util/udpSocket.c util/rateLimiter.c util/queueManagement.c
11 10

  
12 11
# testMessaginglayer: echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c
13
#        ${COMPILER} -o echoServer.o echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c util/rateControl.c ${EVENT} ${MATH} ${GDB}
12
#        ${COMPILER} -o echoServer.o echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c ${EVENT} ${MATH} ${GDB}
14 13

  
15 14
#libml_a_LIBADD = $(top_builddir)/dclog/libdclog.a
include/ml.h
51 51
#include <stdint.h>
52 52
#include <sys/time.h>
53 53

  
54

  
55 54
/**
56 55
 * @brief The size of a socketID
57 56
 */
......
262 261
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg);
263 262

  
264 263
/**
265
  * Configure the parameters for output rate control.
266
  * These values may also be set while packets are being transmitted.
267
  * @param bucketsize The size of the bucket in kbytes
268
  * @param drainrate The amount of kbytes draining in a second. If drainrate is <=0, then rateControl is completely disabled (all packets are passed).
264
  * Configure the verbosity of messages
265
  * @param log_level [0-4] the lower the less messages are printed out
269 266
*/
270
void mlSetThrottle(int bucketsize, int drainrate);
267
void mlSetVerbosity(int log_level);
271 268

  
272 269
/**
273 270
 * @brief Register a received packet callback.
......
517 514
 */
518 515
int mlGetPathMTU(int ConnectionId);
519 516

  
517

  
518

  
519
/**
520
  * Configure the parameters for output rate control.
521
  * @param bucketsize The size of the bucket in kbytes
522
  * @param drainrate The amount of kbytes draining in a second. If drainrate is <=0, then rateControl is completely disabled (all packets are passed).
523
  * @param maxQueueSize In kbytes. Max data stored while limiting the output rate. If 0 packets limitted by drainrate are dropped.
524
  * @param maxQueueSizeRTX In kbytes. Max data waiting for the retransmission if needed.
525
  * @param maxTimeToHold. Time for which sent packets are stored in RTX queue in seconds.
526
*/
527
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold);
528

  
529

  
520 530
#ifdef __cplusplus
521 531
}
522 532
#endif
ml.c
32 32
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
33 33
 */
34 34

  
35
#include <arpa/inet.h>
36
#ifndef WIN32
37
#include <netinet/in.h>
38
#include <sys/socket.h>
39
#endif
40
#include <fcntl.h>
41
#include <event2/event.h>
42
#include <stdlib.h>
43
#include <unistd.h>
44
#include <stdio.h>
45
#include <stddef.h>
46
#include <stdint.h>
47
#include <string.h>
48
#include <sys/types.h>
49
#include <arpa/inet.h>
50
#include <netdb.h>
51
#include <errno.h>
52
#include <time.h>
53
#include <math.h>
54
#include <assert.h>
55

  
56
#include "util/udpSocket.h"
57
#include "util/stun.h"
58
#include "transmissionHandler.h"
59

  
60
#define LOG_MODULE "[ml] "
61
#include "ml_log.h"
35
#include <ml_all.h>
62 36

  
63 37
/**************************** START OF INTERNALS ***********************/
64 38

  
......
95 69
/*
96 70
 * timeout before thinking of an mtu problem (check MAX_TRIALS as well)
97 71
 */
98
#define PMTU_TIMEOUT { 0, 500000 }
72
#define PMTU_TIMEOUT 500000 // in usec
99 73

  
100 74
/*
101 75
 * retry sending connection messages this many times before reducing pmtu
......
107 81
 */
108 82
#define RECV_TIMEOUT_DEFAULT { 2, 0 }
109 83

  
84
#ifdef RTX
85
/*
86
 * default timeout value for a packet reception
87
 */
88
#define PKT_RECV_TIMEOUT_DEFAULT { 0, 50000 } // 50 ms
89

  
90
/*
91
 * default timeout value for a packet reception
92
 */
93
#define LAST_PKT_RECV_TIMEOUT_DEFAULT { 1, 700000 }
94

  
95
/*
96
 * default fraction of RECV_TIMEOUT_DEFAULT for a last packet(s) reception timeout
97
 */
98
#define LAST_PKT_RECV_TIMEOUT_FRACTION 0.7
99

  
100
#endif
101

  
102

  
110 103
/*
111 104
 * global variables
112 105
 */
106

  
113 107
/*
114 108
 * define a buffer of pointers to connect structures
115 109
 */
......
183 177
/*
184 178
 * helper function to get rid of a warning
185 179
 */
180
#ifndef WIN32
186 181
int min(int a, int b) {
187 182
	if (a > b) return b;
188 183
	return a;
189 184
}
185
#endif
186

  
187
#ifdef RTX
188
//*********Counters**********
189

  
190
struct Counters {
191
	unsigned int receivedCompleteMsgCounter;
192
	unsigned int receivedIncompleteMsgCounter;
193
	unsigned int receivedDataPktCounter;
194
	unsigned int receivedRTXDataPktCounter;
195
	unsigned int receivedNACK1PktCounter;
196
	unsigned int receivedNACKMorePktCounter;
197
	unsigned int sentDataPktCounter;
198
	unsigned int sentRTXDataPktCtr;
199
	unsigned int sentNACK1PktCounter;
200
	unsigned int sentNACKMorePktCounter;
201
} counters;
202

  
203
extern unsigned int sentRTXDataPktCounter;
204

  
205
/*
206
 * receive timeout for a packet
207
 */
208
static struct timeval pkt_recv_timeout = PKT_RECV_TIMEOUT_DEFAULT;
209

  
210

  
211
static struct timeval last_pkt_recv_timeout = LAST_PKT_RECV_TIMEOUT_DEFAULT;
212

  
213
void mlShowCounters() {
214
	counters.sentRTXDataPktCtr = sentRTXDataPktCounter;
215
	fprintf(stderr, "\nreceivedCompleteMsgCounter: %d\nreceivedIncompleteMsgCounter: %d\nreceivedDataPktCounter: %d\nreceivedRTXDataPktCounter: %d\nreceivedNACK1PktCounter: %d\nreceivedNACKMorePktCounter: %d\nsentDataPktCounter: %d\nsentRTXDataPktCtr: %d\nsentNACK1PktCounter: %d\nsentNACKMorePktCounter: %d\n", counters.receivedCompleteMsgCounter, counters.receivedIncompleteMsgCounter, counters.receivedDataPktCounter, counters.receivedRTXDataPktCounter, counters.receivedNACK1PktCounter, counters.receivedNACKMorePktCounter, counters.sentDataPktCounter, counters.sentRTXDataPktCtr, counters.sentNACK1PktCounter, counters.sentNACKMorePktCounter);
216
	return;
217
}
218

  
219
void recv_nack_msg(struct msg_header *msg_h, char *msgbuf, int msg_size)
220
{
221
	struct nack_msg *nackmsg;
222
	
223
	msgbuf += msg_h->len_mon_data_hdr;
224
	msg_size -= msg_h->len_mon_data_hdr;
225
	nackmsg = (struct nack_msg*) msgbuf;
226
	
227
	unsigned int gapSize = nackmsg->offsetTo - nackmsg->offsetFrom;
228
	//if (gapSize == 1349) counters.receivedNACK1PktCounter++;
229
	//else counters.receivedNACKMorePktCounter++;
230

  
231
	rtxPacketsFromTo(nackmsg->con_id, nackmsg->msg_seq_num, nackmsg->offsetFrom, nackmsg->offsetTo);	
232
}
233

  
234
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams);
235

  
236
void pkt_recv_timeout_cb(int fd, short event, void *arg){
237
	int recv_id = (long) arg;
238
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
239

  
240
	//check if message still exists	
241
	if (recvdatabuf[recv_id] == NULL) return;
242

  
243
	//check if gap was filled in the meantime
244
	if (recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom == recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo) {
245
		recvdatabuf[recv_id]->firstGap++;
246
		return;	
247
	}
248

  
249
	struct nack_msg nackmsg;
250
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
251
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
252
	nackmsg.offsetFrom = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom;
253
	nackmsg.offsetTo = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo;
254
	recvdatabuf[recv_id]->firstGap++;
255

  
256
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
257

  
258
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, (char *) &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
259
}
260

  
261
void last_pkt_recv_timeout_cb(int fd, short event, void *arg){
262
	int recv_id = (long) arg;
263
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
264

  
265
	if (recvdatabuf[recv_id] == NULL) {
266
		return;
267
	}
268

  
269
	if (recvdatabuf[recv_id]->expectedOffset == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) return;
270

  
271
	struct nack_msg nackmsg;
272
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
273
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
274
	nackmsg.offsetFrom = recvdatabuf[recv_id]->expectedOffset;
275
	nackmsg.offsetTo = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen;
276

  
277
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
278

  
279
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
280
}
281

  
282
#endif
283

  
284

  
190 285

  
191 286
/*
192 287
 * convert a socketID to a string. It uses a static buffer, so either strdup is needed, or the string will get lost!
......
227 322
	return stun_server.sin_addr.s_addr != INADDR_NONE;
228 323
}
229 324

  
230
void send_msg(int con_id, int msg_type, char* msg, int msg_len, bool truncable, send_params * sParams) {
325
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams) {
231 326
	socketaddrgen udpgen;
232 327
	bool retry;
233 328
	int pkt_len, offset;
......
302 397
			msg_h.offset = htonl(offset);
303 398
			msg_h.msg_length = htonl(truncable ? pkt_len : msg_len);
304 399

  
305
			//monitoring layer hook
306
			if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
307
				mon_pkt_inf pkt_info;
308

  
309
				memset(h_pkt,0,MON_PKT_HEADER_SPACE);
310

  
311
				pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
312
				pkt_info.buffer = msg + offset;
313
				pkt_info.bufSize = pkt_len;
314
				pkt_info.msgtype = msg_type;
315
				pkt_info.dataID = connectbuf[con_id]->seqnr;
316
				pkt_info.offset = offset;
317
				pkt_info.datasize = msg_len;
318
				pkt_info.monitoringHeaderLen = iov[1].iov_len;
319
				pkt_info.monitoringHeader = iov[1].iov_base;
320
				pkt_info.ttl = -1;
321
				memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
322

  
323
				(get_Send_pkt_inf_cb) ((void *) &pkt_info);
324
			}
325 400

  
326 401
			debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
327
			switch(sendPacket(socketfd, iov, 4, &udpgen.udpaddr)) {
402
			int priority = 0; 
403
			if ((msg_type == ML_CON_MSG)
404
#ifdef RTX
405
 || (msg_type == ML_NACK_MSG)
406
#endif
407
) priority = HP;
408
			//fprintf(stderr,"*******************************ML.C: Sending packet: msg_h.offset: %d msg_h.msg_seq_num: %d\n",ntohl(msg_h.offset),ntohl(msg_h.msg_seq_num));
409
			switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
328 410
				case MSGLEN:
329 411
					info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
330 412
					// TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
......
340 422
					offset = msg_len; // exit the while
341 423
					break;
342 424
                                case THROTTLE:
343
                                    //    debug("THROTTLE on output"); 
425
                                        debug("THROTTLE on output"); 
344 426
					offset = msg_len; // exit the while
345 427
					break;
346 428
				case OK:
429
#ifdef RTX
430
					if (msg_type < 127) counters.sentDataPktCounter++;
431
#endif
347 432
					//update
348 433
					offset += pkt_len;
349 434
					//transmit data header only in the first packet
......
352 437
			}
353 438
		} while(offset != msg_len && !truncable);
354 439
	} while(retry);
440
	//fprintf(stderr, "sentDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentDataPktCounter);
441
	//fprintf(stderr, "sentRTXDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentRTXDataPktCtr);
355 442
}
356 443

  
357 444
void pmtu_timeout_cb(int fd, short event, void *arg);
358 445

  
446
int sendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr) {
447
	//monitoring layer hook
448
debug("SENDP1");
449
	if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
450
		mon_pkt_inf pkt_info;	
451

  
452
		struct msg_header *msg_h  = (struct msg_header *) iov[0].iov_base;
453

  
454
		memset(iov[1].iov_base,0,iov[1].iov_len);
455

  
456
		pkt_info.remote_socketID = &(connectbuf[ntohl(msg_h->local_con_id)]->external_socketID);
457
		pkt_info.buffer = iov[3].iov_base;
458
		pkt_info.bufSize = iov[3].iov_len;
459
		pkt_info.msgtype = msg_h->msg_type;
460
		pkt_info.dataID = ntohl(msg_h->msg_seq_num);
461
		pkt_info.offset = ntohl(msg_h->offset);
462
		pkt_info.datasize = ntohl(msg_h->msg_length);
463
		pkt_info.monitoringHeaderLen = iov[1].iov_len;
464
		pkt_info.monitoringHeader = iov[1].iov_base;
465
		pkt_info.ttl = -1;
466
		memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
467

  
468
		(get_Send_pkt_inf_cb) ((void *) &pkt_info);
469
	}
470

  
471
 	//struct msg_header *msg_h;
472
    //msg_h = (struct msg_header *) iov[0].iov_base;        
473

  
474
	//fprintf(stderr,"*** Sending packet - msgSeqNum: %d offset: %d\n",ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
475
debug("SENDP2");
476

  
477
	return sendPacketFinal(udpSocket, iov, len, socketaddr);
478
}
479

  
359 480
void reschedule_conn_msg(int con_id)
360 481
{
361 482
	if (connectbuf[con_id]->timeout_event) {
......
390 511
	msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
391 512

  
392 513
	memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
393

  
514
  {
515
                        char buf[SOCKETID_STRING_SIZE];
516
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
517
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
518
   }
394 519
	send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
395 520
}
396 521

  
397 522
void send_conn_msg_with_pmtu_discovery(int con_id, int buf_size, int command_type)
398 523
{
399
	struct timeval tout = PMTU_TIMEOUT;
524
	struct timeval tout = {0,0};
525
	tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
400 526
	connectbuf[con_id]->timeout_value = tout;
401 527
	connectbuf[con_id]->trials = 1;
402 528
	send_conn_msg(con_id, buf_size, command_type);
......
504 630
				connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
505 631
				connectbuf[free_con_id]->starttime = time(NULL);
506 632
				memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
633
		//Workaround to support reuse of socketID
634
				connectbuf[free_con_id]->external_socketID.internal_addr.udpaddr.sin_family=AF_INET;
635
				connectbuf[free_con_id]->external_socketID.external_addr.udpaddr.sin_family=AF_INET;
507 636
				connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;	// bootstrap pmtu from the other's size. Not strictly needed, but a good hint
508 637
				connectbuf[free_con_id]->timeout_event = NULL;
509 638
				connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
......
625 754
		NAT_traversal = true;
626 755
		// callback to the upper layer indicating that the socketID is now
627 756
		// ready to use
757
		{
758
                	char buf[SOCKETID_STRING_SIZE];
759
                	mlSocketIDToString(&local_socketID,buf,sizeof(buf));
760
 			debug("received local socket_address: %s\n", buf);
761
		}
628 762
		(receive_SocketID_cb) (&local_socketID, 0);
629 763
	}
630 764
}
......
689 823
			&(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
690 824
		rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
691 825

  
692
// 		(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
693
// 			recvdatabuf[recv_id]->msgtype, &rParams);
826
#ifdef RTX
827
		counters.receivedIncompleteMsgCounter++;
828
		//mlShowCounters();
829
		//fprintf(stderr,"******Cleaning slot for inclomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);		
830
#endif
831
 		//(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->msgtype, &rParams);
694 832

  
695 833
		//clean up
696 834
		if (recvdatabuf[recv_id]->timeout_event) {
......
707 845
// process a single recv data message
708 846
void recv_data_msg(struct msg_header *msg_h, char *msgbuf, int bufsize)
709 847
{
710
	debug("ML: received packet of size %d with rconID:%d lconID:%d type:%d offset:%d\n",bufsize,msg_h->remote_con_id,msg_h->local_con_id,msg_h->msg_type,msg_h->offset);
848
	debug("ML: received packet of size %d with rconID:%d lconID:%d type:%d offset:%d inlength: %d\n",bufsize,msg_h->remote_con_id,msg_h->local_con_id,msg_h->msg_type,msg_h->offset, msg_h->msg_length);
711 849

  
712 850
	int recv_id, free_recv_id = -1;
713 851

  
......
716 854
		return;
717 855
	}
718 856

  
857
#ifdef RTX
858
	counters.receivedDataPktCounter++;
859
#endif	
719 860
	// check if a recv_data exist and enter data
720
	for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++)
861
	for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++) {
721 862
		if (recvdatabuf[recv_id] != NULL) {
722 863
			if (msg_h->remote_con_id == recvdatabuf[recv_id]->connectionID &&
723 864
					msg_h->msg_seq_num == recvdatabuf[recv_id]->seqnr)
......
725 866
		} else
726 867
			if(free_recv_id == -1)
727 868
				free_recv_id = recv_id;
728

  
869
  }
729 870

  
730 871
	if(recv_id == RECVDATABUFSIZE) {
872
		debug(" recv id not found (free found: %d)\n", free_recv_id);
731 873
		//no recv_data found: create one
732 874
		recv_id = free_recv_id;
733 875
		recvdatabuf[recv_id] = (recvdata *) malloc(sizeof(recvdata));
......
738 880
		recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
739 881
		recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
740 882
		recvdatabuf[recv_id]->arrivedBytes = 0;	//count this without the Mon headers
883
#ifdef RTX
884
		recvdatabuf[recv_id]->txConnectionID = msg_h->local_con_id;
885
		recvdatabuf[recv_id]->expectedOffset = 0;
886
		recvdatabuf[recv_id]->gapCounter = 0;
887
		recvdatabuf[recv_id]->firstGap = 0;
888
		recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
889
#endif
890

  
741 891
		/*
742 892
		* read the timeout data and set it
743 893
		*/
......
756 906

  
757 907
	//if first packet extract mon data header and advance pointer
758 908
	if (msg_h->offset == 0) {
909
		//fprintf(stderr,"Hoooooray!! We have first packet of some message!!\n");
759 910
		memcpy(recvdatabuf[recv_id]->recvbuf, msgbuf, msg_h->len_mon_data_hdr);
760 911
		msgbuf += msg_h->len_mon_data_hdr;
761 912
		bufsize -= msg_h->len_mon_data_hdr;
......
766 917
	// increment fragmentnr
767 918
	recvdatabuf[recv_id]->recvFragments++;
768 919
	// increment the arrivedBytes
769
	recvdatabuf[recv_id]->arrivedBytes += bufsize;
920
	recvdatabuf[recv_id]->arrivedBytes += bufsize; 
921

  
922
	//fprintf(stderr,"Arrived bytes: %d Offset: %d Expected offset: %d\n",recvdatabuf[recv_id]->arrivedBytes/1349,msg_h->offset/1349,recvdatabuf[recv_id]->expectedOffset/1349);
770 923

  
771 924
	// enter the data into the buffer
772 925
	memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
926
#ifdef RTX
927
	// detecting a new gap	
928
	if (msg_h->offset > recvdatabuf[recv_id]->expectedOffset) {
929
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetFrom = recvdatabuf[recv_id]->expectedOffset;
930
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetTo = msg_h->offset;
931
		if (recvdatabuf[recv_id]->gapCounter < RTX_MAX_GAPS - 1) recvdatabuf[recv_id]->gapCounter++;
932
		evtimer_add(event_new(base, -1, EV_TIMEOUT, &pkt_recv_timeout_cb, (void *) (long)recv_id), &pkt_recv_timeout);
933
	}
934
	
935
	//filling the gap by delayed packets
936
	if (msg_h->offset < recvdatabuf[recv_id]->expectedOffset){
937
		counters.receivedRTXDataPktCounter++;
938
		//skip retransmitted packets
939
		if (recvdatabuf[recv_id]->firstGap < recvdatabuf[recv_id]->gapCounter && msg_h->offset >= recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom) {
940
			int i;
941
			//fprintf(stderr,"firstGap: %d	gapCounter: %d\n", recvdatabuf[recv_id]->firstGap, recvdatabuf[recv_id]->gapCounter);
942
			for (i = recvdatabuf[recv_id]->firstGap; i < recvdatabuf[recv_id]->gapCounter; i++){
943
				if (msg_h->offset == recvdatabuf[recv_id]->gapArray[i].offsetFrom) {
944
					recvdatabuf[recv_id]->gapArray[i].offsetFrom += bufsize;
945
					break;
946
				}
947
				if (msg_h->offset == (recvdatabuf[recv_id]->gapArray[i].offsetTo - bufsize)) {
948
					recvdatabuf[recv_id]->gapArray[i].offsetTo -= bufsize;
949
					break;
950
				}
951
			}
952
		} else {//fprintf(stderr,"Skipping retransmitted packets in filling the gap.\n"); 
953
			//counters.receivedRTXDataPktCounter++;
954
			}
955
	}
956

  
957
	//updating the expectedOffset	
958
	if (msg_h->offset >= recvdatabuf[recv_id]->expectedOffset) recvdatabuf[recv_id]->expectedOffset = msg_h->offset + bufsize;
959
#endif
773 960

  
774 961
	//TODO very basic checkif all fragments arrived: has to be reviewed
775 962
	if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen)
......
822 1009
				debug("ML: received message from conID:%d, %s\n",recvdatabuf[recv_id]->connectionID,str);
823 1010
				rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
824 1011

  
1012
#ifdef RTX
1013
				counters.receivedCompleteMsgCounter++;
1014
				//mlShowCounters();
1015
#endif
1016

  
825 1017
				(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
826 1018
					recvdatabuf[recv_id]->msgtype, (void *) &rParams);
827 1019
			} else {
828 1020
			    warn("ML: callback not initialized for this message type: %d!\n",msg_h->msg_type);
829 1021
			}
830

  
1022
			
831 1023
			//clean up
832 1024
			if (recvdatabuf[recv_id]->timeout_event) {
833 1025
				debug("ML: freeing timeout for %d",recv_id);
......
837 1029
			} else {
838 1030
				debug("ML: received in 1 packet\n",recv_id);
839 1031
			}
1032
#ifdef RTX
1033
			if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
1034
				debug("ML: freeing last packet timeout for %d",recv_id);
1035
				event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
1036
				event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
1037
				recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1038
			}
1039
			//fprintf(stderr,"******Cleaning slot for clomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);	
1040
#endif
840 1041
			free(recvdatabuf[recv_id]->recvbuf);
841 1042
			free(recvdatabuf[recv_id]);
842 1043
			recvdatabuf[recv_id] = NULL;
......
846 1047
				//TODO make timeout at least a DEFINE
847 1048
				recvdatabuf[recv_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id);
848 1049
				evtimer_add(recvdatabuf[recv_id]->timeout_event, &recv_timeout);
1050
#ifdef RTX
1051
				recvdatabuf[recv_id]->last_pkt_timeout_event = event_new(base, -1, EV_TIMEOUT, &last_pkt_recv_timeout_cb, (void *) (long)recv_id);
1052
				evtimer_add(recvdatabuf[recv_id]->last_pkt_timeout_event, &last_pkt_recv_timeout);
1053
#endif
849 1054
			}
850 1055
		}
851 1056
	}
......
889 1094

  
890 1095
	if(connectbuf[con_id]->trials == MAX_TRIALS) {
891 1096
		// decrement the pmtu size
892
		struct timeval tout = PMTU_TIMEOUT;
1097
		struct timeval tout = {0,0};
1098
		tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
893 1099
		info("\tML: decreasing pmtu estimate from %d to %d\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize));
894 1100
		connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
895 1101
		connectbuf[con_id]->timeout_value = tout; 
......
897 1103
	}
898 1104

  
899 1105
	//error in PMTU discovery?
900
	if (connectbuf[con_id]->pmtusize == ERROR) {
1106
	if (connectbuf[con_id]->pmtusize == P_ERROR) {
901 1107
		if (connectbuf[con_id]->internal_connect == true) {
902 1108
			//as of now we tried directly connecting, now let's try trough the NAT
903 1109
			connectbuf[con_id]->internal_connect = false;
......
922 1128
int schedule_pmtu_timeout(int con_id)
923 1129
{
924 1130
	if (! connectbuf[con_id]->timeout_event) {
925
		struct timeval tout = PMTU_TIMEOUT;
1131
		struct timeval tout = {0,0};
1132
		tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
926 1133
		connectbuf[con_id]->timeout_value = tout;
927 1134
		connectbuf[con_id]->trials = 1;
928 1135
		connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
......
950 1157
	case BELOWDSL:
951 1158
		return MIN;
952 1159
	case MIN:
953
		return ERROR;
1160
		return P_ERROR;
954 1161
	default:
955 1162
		warn("ML: strange pmtu size encountered:%d, changing to some safe value:%d\n", pmtusize, MIN);
956 1163
		return MIN;
......
1011 1218

  
1012 1219
//    connectbuf[connectionID]->pmtutrysize = new_pmtusize;
1013 1220

  
1014
    if (new_pmtusize == ERROR) {
1221
    if (new_pmtusize == P_ERROR) {
1015 1222
		error("ML:  Could not create connection with connectionID %i !\n",
1016 1223
			connectionID);
1017 1224

  
......
1126 1333
	unsigned short stun_bind_response = 0x0101;
1127 1334
	unsigned short * msgspot = (unsigned short *) msgbuf;
1128 1335
	if (*msgspot == stun_bind_response) {
1129
		debug("ML: recv_pkg: parse stun message called\n");
1336
		debug("ML: recv_pkg: parse stun message called on %d bytes\n", recvSize);
1130 1337
		recv_stun_msg(msgbuf, recvSize);
1131 1338
		return;
1132 1339
	}
1133 1340

  
1134 1341
	msg_h = (struct msg_header *) msgbuf;
1135 1342

  
1343
        uint32_t inlen = ntohl(msg_h->msg_length);
1344
        if(inlen > 0x20000 || inlen < ntohl(msg_h->offset) || inlen == 0) {
1345
            warn("ML: BAD PACKET received from: %s:%d (len: %d < %d [=%08X] o:%d)", 
1346
                  inet_ntoa(recv_addr.sin_addr), recv_addr.sin_port,
1347
                               recvSize, inlen, inlen, ntohl(msg_h->offset));
1348
 warn("ML: received %d: %02X %02X %02X %02X %02X %02X - %02X %02X %02X %02X %02X %02X", recvSize,
1349
            msgbuf[0], msgbuf[1],msgbuf[2],msgbuf[3],msgbuf[4],msgbuf[5],
1350
            msgbuf[6],msgbuf[7],msgbuf[8],msgbuf[9],msgbuf[10],msgbuf[11]);
1351

  
1352
            return;
1353
       }
1354

  
1136 1355
	/* convert header from network to host order */
1137 1356
	msg_h->offset = ntohl(msg_h->offset);
1138 1357
	msg_h->msg_length = ntohl(msg_h->msg_length);
......
1166 1385
			struct conn_msg *c_msg = (struct conn_msg *) bufptr;
1167 1386
			msginfNow.remote_socketID = &(c_msg->sock_id);
1168 1387
		}
1169
		else if(connectbuf[msg_h->remote_con_id] == NULL) {
1388
                else if(msg_h->remote_con_id < 0 || 
1389
                       msg_h->remote_con_id >= CONNECTBUFSIZE || 
1390
                       connectbuf[msg_h->remote_con_id] == NULL) {
1170 1391
			error("ML: received pkg called with non existent connection\n");
1171 1392
			return;
1172 1393
		} else
......
1188 1409
			debug("ML: received conn pkg\n");
1189 1410
			recv_conn_msg(msg_h, bufptr, msg_size, &recv_addr);
1190 1411
			break;
1412
#ifdef RTX
1413
		case ML_NACK_MSG:
1414
			debug("ML: received nack pkg\n");
1415
			recv_nack_msg(msg_h, bufptr, msg_size);
1416
			break;
1417
#endif
1191 1418
		default:
1192 1419
			if(msg_h->msg_type < 127) {
1193 1420
				debug("ML: received data pkg\n");
......
1217 1444
 */
1218 1445
void nat_traversal_timeout(int fd, short event, void *arg)
1219 1446
{
1447
debug("X. NatTrTo %d\n", NAT_traversal);
1220 1448
	if (NAT_traversal == false) {
1221 1449
		debug("ML: NAT traversal request re-send\n");
1222 1450
		if(receive_SocketID_cb)
1223 1451
			(receive_SocketID_cb) (&local_socketID, 2);
1224 1452
		try_stun();
1225 1453
	}
1454
debug("X. NatTrTo\n");
1226 1455
}
1227 1456

  
1228 1457
//return IP address, or INADDR_NONE if can't resolve
......
1246 1475
{
1247 1476
	struct sockaddr_in udpaddr = {0};
1248 1477
	udpaddr.sin_family = AF_INET;
1478
        debug("X. create_socket %s, %d\n", ipaddr, port);
1249 1479
	if (ipaddr == NULL) {
1250 1480
		/*
1251 1481
		* try to guess the local IP address
......
1320 1550

  
1321 1551
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg){
1322 1552

  
1553
/*X*/ //  fprintf(stderr,"MLINIT1 %s, %d, %s, %d\n", ipaddr, port, stun_ipaddr, stun_port);
1323 1554
	base = (struct event_base *) arg;
1324 1555
	recv_data_callback = recv_data_cb;
1325 1556
	mlSetRecvTimeout(timeout_value);
......
1329 1560

  
1330 1561
	}
1331 1562
	register_recv_localsocketID_cb(local_socketID_cb);
1563
/*X*/ //  fprintf(stderr,"MLINIT1\n");
1332 1564
	return create_socket(port, ipaddr);
1333 1565
}
1334 1566

  
1335
void mlSetThrottle(int bucketsize, int drainrate) {
1567
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold) {
1336 1568
        setOutputRateParams(bucketsize, drainrate);
1569
	setQueuesParams (maxQueueSize, maxQueueSizeRTX, maxTimeToHold);
1337 1570
}
1338 1571
     
1572
void mlSetVerbosity (int log_level) {
1573
	setLogLevel(log_level);
1574
}
1339 1575

  
1340 1576
/* register callbacks  */
1341 1577
void mlRegisterGetRecvPktInf(get_recv_pkt_inf_cb recv_pkt_inf_cb){
......
1396 1632
void mlSetRecvTimeout(struct timeval timeout_value){
1397 1633

  
1398 1634
	recv_timeout = timeout_value;
1399

  
1635
#ifdef RTX
1636
	unsigned int total_usec = recv_timeout.tv_sec * 1000000 + recv_timeout.tv_usec;
1637
	total_usec = total_usec * LAST_PKT_RECV_TIMEOUT_FRACTION;
1638
	last_pkt_recv_timeout.tv_sec = total_usec / 1000000;
1639
	last_pkt_recv_timeout.tv_usec = total_usec - last_pkt_recv_timeout.tv_sec * 1000000;
1640
	fprintf(stderr,"Timeout for receiving message: %d : %d\n", recv_timeout.tv_sec, recv_timeout.tv_usec);	
1641
	fprintf(stderr,"Timeout for last pkt: %d : %d\n", last_pkt_recv_timeout.tv_sec, last_pkt_recv_timeout.tv_usec);	
1642
#endif
1400 1643
}
1401 1644

  
1402 1645
int mlGetStandardTTL(socketID_handle socketID,uint8_t *ttl){
ml_all.h
1
#include <stdlib.h>
2
#include <unistd.h>
3
#include <stdio.h>
4
#include <stddef.h>
5
#include <stdint.h>
6
#include <string.h>
7
#include <sys/types.h>
8
#include <time.h>
9
#include <math.h>
10
#include <assert.h>
11
#include <errno.h>
12

  
13
#ifndef WIN32
14
#include <arpa/inet.h>
15
#include <netdb.h>
16
#include <netinet/in.h>
17
#include <sys/socket.h>
18
#include <fcntl.h>
19
#else
20

  
21
#include <winsock2.h>
22
#include <ws2tcpip.h>
23
#endif
24

  
25
#include "util/udpSocket.h"
26
#include "util/stun.h"
27
#include "transmissionHandler.h"
28
#include "util/rateLimiter.h"
29
#include "util/queueManagement.h"
30

  
31
#define LOG_MODULE "[ml] "
32
#include "ml_log.h"
33

  
ml_log.c
1
/* Set defualt log level */
2
int ml_log_level = 2; //2 -> WARNING
3

  
4
void setLogLevel(int ll) {
5
	ml_log_level = ll;
6
}
ml_log.h
4 4
#include <stdlib.h>
5 5
#include <stdio.h>
6 6

  
7
//#include "grapes_log.h"
7
extern int ml_log_level;
8 8

  
9
#ifndef _GRAPES_LOG_H
9
extern void setLogLevel(int ll);
10 10

  
11
#define DPRINT(format, ... )  {struct timeval tnow; gettimeofday(&tnow,NULL); fprintf(stderr, "%ld.%03ld "format, tnow.tv_sec, tnow.tv_usec/1000, ##__VA_ARGS__ );}
11
#define DPRINT(ll, format, ... )  {struct timeval tnow; if(ll <= ml_log_level) {gettimeofday(&tnow,NULL); fprintf(stderr, "%ld.%03ld "format, tnow.tv_sec, tnow.tv_usec/1000, ##__VA_ARGS__ );fprintf(stderr,format[strlen(format)-1] == '\n'?"":"\n"); fflush(stderr);}}
12 12

  
13
#define debug(format, ... ) //DPRINT(format, ##__VA_ARGS__ )
13
#define debug(format, ... ) DPRINT(4 ,format, ##__VA_ARGS__ )
14 14
/** Convenience macro to log LOG_INFO messages */
15
#define info(format, ... )  DPRINT(format, ##__VA_ARGS__ )
15
#define info(format, ... )  DPRINT(3, format, ##__VA_ARGS__ )
16 16
/** Convenience macro to log LOG_WARN messages */
17
#define warn(format, ... )  DPRINT(format, ##__VA_ARGS__ )
17
#define warn(format, ... )  DPRINT(2, format, ##__VA_ARGS__ )
18 18
/** Convenience macro to log LOG_ERROR messages */
19
#define error(format, ... )  DPRINT(format, ##__VA_ARGS__ )
19
#define error(format, ... )  DPRINT(1, format, ##__VA_ARGS__ )
20 20
/**  Convenience macro to log LOG_CRITICAL messages and crash the program */
21
#define fatal(format, ... )  { DPRINT(format, ##__VA_ARGS__ ); exit(-1); }
21
#define fatal(format, ... )  { DPRINT(0, format, ##__VA_ARGS__ ); exit(-1); }
22 22

  
23 23
#endif
24
#endif
transmissionHandler.h
46 46
#ifndef TRANSMISSIONHANDLER_H
47 47
#define TRANSMISSIONHANDLER_H
48 48
#include <sys/time.h>
49
#ifndef WIN32
49 50
#include <netinet/in.h>
50 51
#include <sys/socket.h>
52
#include <arpa/inet.h>
53
#include <sys/uio.h>
54
#endif
51 55
#include <fcntl.h>
52 56
#include <event2/event.h>
53 57
#include <sys/types.h>
54
#include <netinet/in.h>
55
#include <arpa/inet.h>
56
#include <sys/uio.h>
57 58
#include "util/udpSocket.h"
58 59
#include "util/stun.h"
59 60
#include "ml.h"
60 61

  
62
#ifndef WIN32
61 63
#ifndef boolean
62 64
typedef bool boolean;
63 65
#endif
66
#endif
67

  
64 68
#ifndef TRUE
65 69
#define TRUE ((bool)1)
66 70
#endif
......
68 72
#define FALSE ((bool)0)
69 73
#endif
70 74

  
75
#ifdef RTX
76
/**
77
 * This is the maximum number of gaps RTX can keep track of inside one message
78
 */
79
#define RTX_MAX_GAPS 25
80

  
81
#define ML_NACK_MSG 128
82
#endif
71 83
/**
72 84
 * This is the maximum size of the monitoring module header that can be added to the messaging layer header
73 85
 */
......
87 99
/**
88 100
 * Defined a mtu size - IP size - UDP size
89 101
 */
90
typedef enum {MAX = 1472, DSL = 1464, DSLMEDIUM = 1422, DSLSLIM = 1372, BELOWDSL = 1172, MIN = 472,ERROR = -1} pmtu;
102
typedef enum {MAX = 1472, DSL = 1464, DSLMEDIUM = 1422, DSLSLIM = 1372, BELOWDSL = 1172, MIN = 472, P_ERROR = -1} pmtu;
91 103

  
92 104
/**
93 105
 * Define connection command types
......
119 131
  socketaddrgen external_addr; ///< external or reflexive address
120 132
} socket_ID;
121 133

  
134
#ifdef RTX
135
struct gap {
136
	int offsetFrom;
137
	int offsetTo;
138
};
139
#endif
140

  
122 141
/**
123 142
  * A struct that contains information about data that is being received
124 143
  */
......
138 157
  struct event *timeout_event; ///< a timeout event
139 158
  struct timeval timeout_value; ///< the value for a libevent timeout
140 159
  time_t starttime; ///< the start time
160
#ifdef RTX
161
  struct event* last_pkt_timeout_event;
162
  int txConnectionID;
163
  int expectedOffset;
164
  int gapCounter; //index of the first "free slot"
165
  int firstGap;	//first gap which hasn't been handled yet (for which the NACK hasn't been sent yet)
166
  struct gap gapArray[RTX_MAX_GAPS];
167
#endif
141 168
} recvdata;
142 169

  
143 170
struct receive_connection_cb_list{
......
172 199

  
173 200
#define ML_CON_MSG 127
174 201

  
202

  
175 203
/**
176 204
 * A struct with the messaging layer header for connection handling messages
177 205
 */
......
183 211
	socket_ID sock_id;	/// the socketId of the sender
184 212
} __attribute__((packed));
185 213

  
214
#ifdef RTX
215
/************modifications-START************/
216

  
217
struct nack_msg {
218
	int32_t con_id;		///local connectionID of the transmitter
219
	int32_t msg_seq_num;
220
	uint32_t offsetFrom;
221
	uint32_t offsetTo;
222
} __attribute__((packed));
223

  
224
/************modifications-END**************/
225
#endif
226

  
186 227
struct msg_header {
187 228
	uint32_t offset;
188 229
	uint32_t msg_length;
......
196 237
#define MSG_HEADER_SIZE (sizeof(struct msg_header))
197 238
#pragma pack(pop)   /* restore original alignment from stack */
198 239

  
240
int sendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr);
241

  
199 242
//
200 243
///**
201 244
// * The init method needs to be called to initialise the transmissionHandler
util/queueManagement.c
1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10

  
11
#include "../ml_all.h"
12

  
13
FILE *file = NULL;
14

  
15
PacketQueue TXqueue;
16
PacketQueue RTXqueue;
17

  
18
unsigned int sentRTXDataPktCounter = 0;
19

  
20
int TXmaxSize = 6000*1500;			//bytes
21
int RTXmaxSize = 6000*1500;			//bytes
22

  
23
struct timeval maxTimeToHold = {5,0};
24

  
25
PacketContainer* createPacketContainer(const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior) {
26
	
27
	PacketContainer *packet = malloc(sizeof(PacketContainer));
28
	packet->udpSocket = uSoc;
29
	packet->iovlen = iovlen;
30
	packet->next = NULL;
31
	packet->pktLen = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
32

  
33
	packet->priority = prior;
34

  
35
 	int i;
36
        packet->iov = malloc(sizeof(struct iovec) * iovlen);
37
        for (i=0; i<iovlen; i++){
38
                packet->iov[i].iov_len = ioVector[i].iov_len;
39
                packet->iov[i].iov_base = malloc(packet->iov[i].iov_len);
40
                memcpy(packet->iov[i].iov_base, ioVector[i].iov_base, packet->iov[i].iov_len);
41
        }
42

  
43
        packet->socketaddr = malloc(sizeof(struct sockaddr_in));
44
        memcpy(packet->socketaddr, sockAddress, sizeof(struct sockaddr_in));
45

  
46
	int packet_len = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
47
	return packet;
48
}
49

  
50
void destroyPacketContainer(PacketContainer* pktContainer){
51

  
52
	if (pktContainer != NULL){
53
	        int i;
54
        	for (i=0; i < pktContainer->iovlen; i++) free(pktContainer->iov[i].iov_base);
55
        	free(pktContainer->iov);
56
        	free(pktContainer->socketaddr);
57
        	free(pktContainer);
58
	}
59
}
60

  
61
int addPacketTXqueue(PacketContainer *packet) {
62
	if ((TXqueue.size + packet->pktLen) > TXmaxSize) {
63
		fprintf(stderr,"[queueManagement::addPacketTXqueue] -- Sorry! Max size for TX queue. Packet will be discarded. \n");
64
		destroyPacketContainer(packet);
65
		return THROTTLE;
66
	}
67
	TXqueue.size += packet->pktLen;	
68

  
69
	if (TXqueue.head == NULL) {			//adding first element
70
		TXqueue.head = packet;
71
		TXqueue.tail = packet;		
72
	} else {					//adding at the end of the queue
73
		TXqueue.tail->next = packet;
74
		TXqueue.tail = packet;
75
	}
76
	
77
	return OK;
78
}
79

  
80
PacketContainer* takePacketToSend() {			//returns pointer to packet or NULL if queue is empty
81

  
82
	if (TXqueue.head != NULL) {
83
		PacketContainer *packet = TXqueue.head;
84
		
85
		TXqueue.size -= packet->pktLen;
86
		TXqueue.head = TXqueue.head->next;
87
		packet->next = NULL;
88
		if (TXqueue.head == NULL) TXqueue.tail = NULL;					
89
		return packet;
90
	}
91
	else return NULL;	
92
}
93

  
94
#ifdef RTX
95
void addPacketRTXqueue(PacketContainer *packet) {
96
	//removing old packets - because of maxTimeToHold
97
	struct timeval now, age;
98
	gettimeofday(&now, NULL);
99
 
100
	PacketContainer *tmp = RTXqueue.head;
101
	while (tmp != NULL) {
102
		age.tv_sec = now.tv_sec - tmp->timeStamp.tv_sec;
103
		age.tv_usec = now.tv_usec - tmp->timeStamp.tv_usec;
104

  
105
		if (age.tv_sec > maxTimeToHold.tv_sec) {
106

  
107
			tmp = tmp->next;
108
			removeOldestPacket();
109
		}
110
		else break;
111
	}
112

  
113
	while ((RTXqueue.size + packet->pktLen) > RTXmaxSize) {
114
		removeOldestPacket();
115
	}
116

  
117
	//adding timeStamp
118
	gettimeofday(&(packet->timeStamp), NULL);
119

  
120
	//finally - adding packet
121
	RTXqueue.size += packet->pktLen;
122
	packet->next = NULL;
123

  
124
	if (RTXqueue.head == NULL) {			//adding first element
125
		RTXqueue.head = packet;
126
		RTXqueue.tail = packet;
127
		return;	
128
	} else {					//adding at the end of the queue
129
		RTXqueue.tail->next = packet;
130
		RTXqueue.tail = packet;
131
		return;
132
	}
133
}
134

  
135
int removeOldestPacket() {			//return 0 if success, else (queue empty) return 1
136
	if (RTXqueue.head != NULL) {
137
		RTXqueue.size -= RTXqueue.head->pktLen;
138
		PacketContainer *pointer = RTXqueue.head;		
139
		RTXqueue.head = RTXqueue.head->next;
140
		destroyPacketContainer(pointer);
141
		return 0;
142
	}
143
	return 1;	
144
}
145
#endif
146

  
147
void setQueuesParams (int TXsize, int RTXsize, double maxTTHold) { //in bytes, bytes, sexonds
148
	TXmaxSize = TXsize;
149
	RTXmaxSize = RTXsize;
150
	maxTimeToHold.tv_sec = (int)floor(maxTTHold);
151
	maxTimeToHold.tv_usec = (int)(1000000.0 * fmod(maxTTHold, 1.0));
152
}
153

  
154
int isQueueEmpty() {
155
	
156
	if (TXqueue.head == NULL) return 1;
157
	return 0;
158
}
159

  
160
int getFirstPacketSize() {
161

  
162
	if (TXqueue.head != NULL) return TXqueue.head->pktLen;
163
	return 0;
164
}
165

  
166
#ifdef RTX
167
PacketContainer* searchPacketInRTX(int connID, int msgSeqNum, int offset) {
168
	
169
	connID = ntohl(connID);
170
	msgSeqNum = ntohl(msgSeqNum);
171
	offset = ntohl(offset);
172

  
173
	//fprintf(stderr,"***************Searching for packet... connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),ntohl(offset));
174

  
175
	PacketContainer *tmp = RTXqueue.head;
176
	PacketContainer *prev = NULL;
177

  
178
	while (tmp != NULL) {
179
		struct msg_header *msg_h;
180
		
181
		msg_h = (struct msg_header *) tmp->iov[0].iov_base;
182

  
183
		//fprintf(stderr,"^^^^^^^^^Searching^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),offset);
184
		//fprintf(stderr,"^^^^^^^^^In queue^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",msg_h->local_con_id,ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
185

  
186
		if ((msg_h->local_con_id == connID) && (msg_h->msg_seq_num == msgSeqNum) && (msg_h->offset == offset)) {
187
		//fprintf(stderr,"*****************Packet found: ConID: %d  MsgseqNum: %d Offset: %d \n", ntohl(connID),ntohl(msgSeqNum),ntohl(offset));
188
			if (tmp == RTXqueue.head) RTXqueue.head = tmp->next;
189
			else if (tmp == RTXqueue.tail) { RTXqueue.tail = prev; prev->next == NULL; }
190
			else prev->next = tmp->next;
191
			RTXqueue.size -= tmp->pktLen;
192
			return tmp;
193
		}
194
		prev = tmp;
195
		tmp = tmp->next;
196
	}
197

  
198
return NULL;
199
}
200

  
201
int rtxPacketsFromTo(int connID, int msgSeqNum, int offsetFrom, int offsetTo) {
202
        int offset = offsetFrom;
203
        //fprintf(stderr,"\t\t\t\t Retransmission request From: %d To: %d\n",offsetFrom, offsetTo);
204

  
205
        while (offset < offsetTo) {
206
                PacketContainer *packetToRTX = searchPacketInRTX(connID,msgSeqNum,offset);
207
                if (packetToRTX == NULL) return 1;
208

  
209
                //sending packet
210
                //fprintf(stderr,"\t\t\t\t\t Retransmitting packet: %d of msg_seq_num %d.\n",offset/1349,msgSeqNum);
211
                sendPacket(packetToRTX->udpSocket, packetToRTX->iov, 4, packetToRTX->socketaddr);
212
                sentRTXDataPktCounter++;
213
                offset += packetToRTX->iov[3].iov_len;
214
		destroyPacketContainer(packetToRTX);
215
        }
216
        return 0;
217
}
218
#endif
util/queueManagement.h
1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10

  
11
#include <stdio.h>
12
#include <stdlib.h>
13

  
14
#include <unistd.h>
15

  
16
#ifndef WIN32
17
#include <netinet/in.h>
18
#else
19
#include <winsock2.h>
20
#endif
21

  
22
typedef struct PktContainer {
23
	int udpSocket; 
24
	struct iovec *iov; 
25
	int iovlen; 
26
	struct sockaddr_in *socketaddr;
27

  
28
	int pktLen;		//kB
29
	struct timeval timeStamp;
30
	struct PktContainer *next;
31
	unsigned char priority;
32
} PacketContainer;
33

  
34

  
35
typedef struct PktQueue { 
36
	PacketContainer *head;
37
	PacketContainer *tail;
38

  
39
	int size;		//kB
40

  
41
} PacketQueue;
42

  
43

  
44
PacketContainer* createPacketContainer (const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior);
45

  
46
int addPacketTXqueue(PacketContainer *packet);
47

  
48
PacketContainer* takePacketToSend();
49

  
50
int removeOldestPacket() ;
51

  
52
int isQueueEmpty();
53

  
54
int getFirstPacketSize();
55

  
56
void setQueuesParams (int TXsize, int RTXsize, double maxTimeToHold); //in  bytes, bytes, seconds
57

  
58
#ifdef RTX
59
void addPacketRTXqueue(PacketContainer *packet);
60

  
61
int rtxPacketsFromTo(int connID, int msgSeqNum, int offsetFrom, int offsetTo);
62
#endif
util/rateControl.c
1
/*
2
 *          Policy Management
3
 *
4
 *
5
 * This software was created by arpad.bakay@netvisor.hu
6
 *
7
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
8
 */
9

  
10
#include <sys/time.h>
11

  
12
#include "udpSocket.h"
13

  
14
static long bucket_size = 0;
15
static int drain_rate = 0;
16

  
17
static long bytes_in_bucket = 0;
18
struct timeval bib_when = { 0, 0};
19

  
20
int outputRateControl(int len) {
21
   struct timeval now;
22
   gettimeofday(&now, NULL);
23
   if(drain_rate <= 0) {
24
      bytes_in_bucket = 0;
25
      bib_when = now;
26
      return OK;
27
   }
28
   else {
29
       int total_drain_secs = bytes_in_bucket / drain_rate + 1; 
30
       if(now.tv_sec - bib_when.tv_sec - 1 < total_drain_secs) {
31
           bytes_in_bucket = 0;
32
       }
33
       else {
34
          long leaked = drain_rate * 1024 * (now.tv_sec - bib_when.tv_sec);
35
          leaked += drain_rate * (now.tv_usec - bib_when.tv_usec) / (1000000 / 1024); 
36
	  if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
37
          else bytes_in_bucket -= leaked;
38
       }
39
       bib_when = now;
40
       if(bytes_in_bucket + len <= bucket_size) {
41
              bytes_in_bucket += len;
42
              return OK;
43
       }
44
       else  return THROTTLE;
45
   }
46
}
47

  
48
void setOutputRateParams(int bucketsize, int drainrate) {
49
     bucket_size = bucketsize * 1024;
50
     outputRateControl(0);
51
     drain_rate = drainrate; 
52
}
53

  
54

  
util/rateLimiter.c
1
/*
2
 *          
3
 *
4
 *	upgraded rateControl - token bucket
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10
#include <ml_all.h>
11

  
12
extern struct event_base *base;
13
static long bucket_size = 0;
14
static int64_t drain_rate = 0;
15

  
16

  
17
static long bytes_in_bucket = 0;
18
struct timeval bib_then = { 0, 0};
19

  
20
void planFreeSpaceInBucketEvent();
21

  
22
void freeSpaceInBucket_cb (int fd, short event,void *arg) {
23

  
24
	/*struct timeval test;
25

  
26
	gettimeofday(&test,NULL);
27

  
28
	int us;
29

  
30
	if(test.tv_usec > (int) arg)
31
		us = test.tv_usec - (int) arg;
32
	else
33
		us = 1000000 + test.tv_usec - (int) arg;
34

  
35
	fprintf(stderr,"Event scheduled in: %d microseconds\n",us);*/
36

  
37
	while((!isQueueEmpty()) && (outputRateControl(getFirstPacketSize()) == OK)) {	
38

  
39

  
40
		PacketContainer* packet = takePacketToSend();
41

  
42
		if (packet == NULL) return;
43

  
44
		struct timeval now;
45
   		gettimeofday(&now, NULL);
46
		bib_then = now;
47

  
48
#ifdef RTX
49
		if (!(packet->priority & NO_RTX)) addPacketRTXqueue(packet);
50
#endif
51

  
52
		sendPacket(packet->udpSocket, packet->iov, 4, packet->socketaddr);
53
	}
54

  
55
	if (!isQueueEmpty()) planFreeSpaceInBucketEvent(getFirstPacketSize());
56
	
57
	return;
58
}
59

  
60
void planFreeSpaceInBucketEvent(int bytes) {		//plan the event for time when there will be free space in bucket (for the first packet from the TXqueue)
61
	struct timeval TXtime;
62
	struct event *ev;
63

  
64
	//time needed to send data = firstPacketFromQueue.size, will free space for this packet in the bucket
65
	TXtime.tv_sec = bytes / drain_rate; //seconds
66
	TXtime.tv_usec = (bytes - TXtime.tv_sec * drain_rate) * 1000000 / drain_rate; //microseconds
67

  
68
	ev = evtimer_new(base, freeSpaceInBucket_cb, NULL);
69
	event_add(ev, &TXtime);
70
}
71

  
72
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority)
73
{
74
	PacketContainer *newPacket = (PacketContainer*) createPacketContainer(udpSocket,iov,len,socketaddr,priority);
75

  
76
debug("QUUEO1");
77
debug("QUUEO1");
78
	if(!(priority & HP)) {
79
		if (!isQueueEmpty()) {						//some packets are already waiting, "I am for sure after them"
80
			return addPacketTXqueue(newPacket);
81
		}	
82
		else if(outputRateControl(newPacket->pktLen) != OK) {			//queue is empty, not enough space in bucket - "I will be first in the queue"
83
			planFreeSpaceInBucketEvent(newPacket->pktLen);		//when there will be enough space in the bucket for the first packet from the queue
84
			return addPacketTXqueue(newPacket);
85
		}
86
	}
87
debug("QUUEO1c\n");
88
#ifdef RTX
89
	if (!(priority & NO_RTX)) addPacketRTXqueue(newPacket);
90
#endif
91

  
92
debug("QUUEO2");
93
	return sendPacket(udpSocket, iov, 4, socketaddr);
94
}
95

  
96
void setOutputRateParams(int bucketsize, int drainrate) { //given in Bytes and Bits/s
97
     bucket_size = bucketsize;
98
     outputRateControl(0);
99
     drain_rate = drainrate >> 3; //now in bytes/s
100
}
101

  
102
int outputRateControl(int len) {
103
	struct timeval now;
104
	gettimeofday(&now, NULL);
105

  
106
	if(drain_rate <= 0) {
107
		bytes_in_bucket = 0;
108
		bib_then = now;
109
		return OK;
110
	} else {
111
		unsigned int leaked;
112
		int total_drain_secs = bytes_in_bucket / (drain_rate) + 1;
113

  
114
		if(now.tv_sec - bib_then.tv_sec - 1 > total_drain_secs) {
115
				bytes_in_bucket = 0;
116
		} else {
117
			leaked = drain_rate * (now.tv_sec - bib_then.tv_sec);
118
			leaked += drain_rate * (now.tv_usec - bib_then.tv_usec) / 1000000; 
119
			if(leaked > bytes_in_bucket) bytes_in_bucket = 0;
120
					else bytes_in_bucket -= leaked;
121
		}
122

  
123
		bib_then = now;
124
		if(bytes_in_bucket + len <= bucket_size) {
125
			bytes_in_bucket += len;
126
			return OK;
127
		} else {
128
			return THROTTLE;
129
		}
130
   }
131
}
util/rateLimiter.h
1
/*
2
 *          
3
 *
4
 *	upgraded rateControl - token bucket
5
 * 
6
 *	Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

  
10
//#include <sys/time.h>
11

  
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <event2/event.h>
15
#include <errno.h>
16

  
17

  
18
#define HP 1
19
#define NO_RTX 2
20

  
21
void planFreeSpaceInBucketEvent();
22

  
23
void freeSpaceInBucket_cb (int fd, short event,void *arg);
24

  
25
int queueOrSendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr, unsigned char priority);
26

  
util/stun.c
41 41
#include <stdio.h> 
42 42
#include <unistd.h>
43 43

  
44
#include <sys/ioctl.h>
45
#include <sys/socket.h>
46 44
#include <sys/time.h>
47 45
#include <sys/types.h> 
48
#include <arpa/inet.h>
49 46

  
50 47
#include <fcntl.h>
48

  
49
#ifndef WIN32
50
#include <sys/ioctl.h>
51
#include <sys/socket.h>
51 52
#include <netdb.h>
52 53
#include <netinet/in.h>
53 54
//#include <arpa/nameser.h>
54 55
//#include <resolv.h>
55 56
#include <net/if.h>
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff