Revision 6575ae37 ml.c

View differences:

ml.c
32 32
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
33 33
 */
34 34

  
35
#include <arpa/inet.h>
36
#ifndef WIN32
37
#include <netinet/in.h>
38
#include <sys/socket.h>
39
#endif
40
#include <fcntl.h>
41
#include <event2/event.h>
42
#include <stdlib.h>
43
#include <unistd.h>
44
#include <stdio.h>
45
#include <stddef.h>
46
#include <stdint.h>
47
#include <string.h>
48
#include <sys/types.h>
49
#include <arpa/inet.h>
50
#include <netdb.h>
51
#include <errno.h>
52
#include <time.h>
53
#include <math.h>
54
#include <assert.h>
55

  
56
#include "util/udpSocket.h"
57
#include "util/stun.h"
58
#include "transmissionHandler.h"
59

  
60
#define LOG_MODULE "[ml] "
61
#include "ml_log.h"
35
#include <ml_all.h>
62 36

  
63 37
/**************************** START OF INTERNALS ***********************/
64 38

  
......
95 69
/*
96 70
 * timeout before thinking of an mtu problem (check MAX_TRIALS as well)
97 71
 */
98
#define PMTU_TIMEOUT { 0, 500000 }
72
#define PMTU_TIMEOUT 500000 // in usec
99 73

  
100 74
/*
101 75
 * retry sending connection messages this many times before reducing pmtu
......
107 81
 */
108 82
#define RECV_TIMEOUT_DEFAULT { 2, 0 }
109 83

  
84
#ifdef RTX
85
/*
86
 * default timeout value for a packet reception
87
 */
88
#define PKT_RECV_TIMEOUT_DEFAULT { 0, 50000 } // 50 ms
89

  
90
/*
91
 * default timeout value for a packet reception
92
 */
93
#define LAST_PKT_RECV_TIMEOUT_DEFAULT { 1, 700000 }
94

  
95
/*
96
 * default fraction of RECV_TIMEOUT_DEFAULT for a last packet(s) reception timeout
97
 */
98
#define LAST_PKT_RECV_TIMEOUT_FRACTION 0.7
99

  
100
#endif
101

  
102

  
110 103
/*
111 104
 * global variables
112 105
 */
106

  
113 107
/*
114 108
 * define a buffer of pointers to connect structures
115 109
 */
......
183 177
/*
184 178
 * helper function to get rid of a warning
185 179
 */
180
#ifndef WIN32
186 181
int min(int a, int b) {
187 182
	if (a > b) return b;
188 183
	return a;
189 184
}
185
#endif
186

  
187
#ifdef RTX
188
//*********Counters**********
189

  
190
struct Counters {
191
	unsigned int receivedCompleteMsgCounter;
192
	unsigned int receivedIncompleteMsgCounter;
193
	unsigned int receivedDataPktCounter;
194
	unsigned int receivedRTXDataPktCounter;
195
	unsigned int receivedNACK1PktCounter;
196
	unsigned int receivedNACKMorePktCounter;
197
	unsigned int sentDataPktCounter;
198
	unsigned int sentRTXDataPktCtr;
199
	unsigned int sentNACK1PktCounter;
200
	unsigned int sentNACKMorePktCounter;
201
} counters;
202

  
203
extern unsigned int sentRTXDataPktCounter;
204

  
205
/*
206
 * receive timeout for a packet
207
 */
208
static struct timeval pkt_recv_timeout = PKT_RECV_TIMEOUT_DEFAULT;
209

  
210

  
211
static struct timeval last_pkt_recv_timeout = LAST_PKT_RECV_TIMEOUT_DEFAULT;
212

  
213
void mlShowCounters() {
214
	counters.sentRTXDataPktCtr = sentRTXDataPktCounter;
215
	fprintf(stderr, "\nreceivedCompleteMsgCounter: %d\nreceivedIncompleteMsgCounter: %d\nreceivedDataPktCounter: %d\nreceivedRTXDataPktCounter: %d\nreceivedNACK1PktCounter: %d\nreceivedNACKMorePktCounter: %d\nsentDataPktCounter: %d\nsentRTXDataPktCtr: %d\nsentNACK1PktCounter: %d\nsentNACKMorePktCounter: %d\n", counters.receivedCompleteMsgCounter, counters.receivedIncompleteMsgCounter, counters.receivedDataPktCounter, counters.receivedRTXDataPktCounter, counters.receivedNACK1PktCounter, counters.receivedNACKMorePktCounter, counters.sentDataPktCounter, counters.sentRTXDataPktCtr, counters.sentNACK1PktCounter, counters.sentNACKMorePktCounter);
216
	return;
217
}
218

  
219
void recv_nack_msg(struct msg_header *msg_h, char *msgbuf, int msg_size)
220
{
221
	struct nack_msg *nackmsg;
222
	
223
	msgbuf += msg_h->len_mon_data_hdr;
224
	msg_size -= msg_h->len_mon_data_hdr;
225
	nackmsg = (struct nack_msg*) msgbuf;
226
	
227
	unsigned int gapSize = nackmsg->offsetTo - nackmsg->offsetFrom;
228
	//if (gapSize == 1349) counters.receivedNACK1PktCounter++;
229
	//else counters.receivedNACKMorePktCounter++;
230

  
231
	rtxPacketsFromTo(nackmsg->con_id, nackmsg->msg_seq_num, nackmsg->offsetFrom, nackmsg->offsetTo);	
232
}
233

  
234
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams);
235

  
236
void pkt_recv_timeout_cb(int fd, short event, void *arg){
237
	int recv_id = (long) arg;
238
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
239

  
240
	//check if message still exists	
241
	if (recvdatabuf[recv_id] == NULL) return;
242

  
243
	//check if gap was filled in the meantime
244
	if (recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom == recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo) {
245
		recvdatabuf[recv_id]->firstGap++;
246
		return;	
247
	}
248

  
249
	struct nack_msg nackmsg;
250
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
251
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
252
	nackmsg.offsetFrom = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom;
253
	nackmsg.offsetTo = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo;
254
	recvdatabuf[recv_id]->firstGap++;
255

  
256
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
257

  
258
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, (char *) &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
259
}
260

  
261
void last_pkt_recv_timeout_cb(int fd, short event, void *arg){
262
	int recv_id = (long) arg;
263
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
264

  
265
	if (recvdatabuf[recv_id] == NULL) {
266
		return;
267
	}
268

  
269
	if (recvdatabuf[recv_id]->expectedOffset == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) return;
270

  
271
	struct nack_msg nackmsg;
272
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
273
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
274
	nackmsg.offsetFrom = recvdatabuf[recv_id]->expectedOffset;
275
	nackmsg.offsetTo = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen;
276

  
277
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
278

  
279
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
280
}
281

  
282
#endif
283

  
284

  
190 285

  
191 286
/*
192 287
 * convert a socketID to a string. It uses a static buffer, so either strdup is needed, or the string will get lost!
......
227 322
	return stun_server.sin_addr.s_addr != INADDR_NONE;
228 323
}
229 324

  
230
void send_msg(int con_id, int msg_type, char* msg, int msg_len, bool truncable, send_params * sParams) {
325
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams) {
231 326
	socketaddrgen udpgen;
232 327
	bool retry;
233 328
	int pkt_len, offset;
......
302 397
			msg_h.offset = htonl(offset);
303 398
			msg_h.msg_length = htonl(truncable ? pkt_len : msg_len);
304 399

  
305
			//monitoring layer hook
306
			if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
307
				mon_pkt_inf pkt_info;
308

  
309
				memset(h_pkt,0,MON_PKT_HEADER_SPACE);
310

  
311
				pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
312
				pkt_info.buffer = msg + offset;
313
				pkt_info.bufSize = pkt_len;
314
				pkt_info.msgtype = msg_type;
315
				pkt_info.dataID = connectbuf[con_id]->seqnr;
316
				pkt_info.offset = offset;
317
				pkt_info.datasize = msg_len;
318
				pkt_info.monitoringHeaderLen = iov[1].iov_len;
319
				pkt_info.monitoringHeader = iov[1].iov_base;
320
				pkt_info.ttl = -1;
321
				memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
322

  
323
				(get_Send_pkt_inf_cb) ((void *) &pkt_info);
324
			}
325 400

  
326 401
			debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
327
			switch(sendPacket(socketfd, iov, 4, &udpgen.udpaddr)) {
402
			int priority = 0; 
403
			if ((msg_type == ML_CON_MSG)
404
#ifdef RTX
405
 || (msg_type == ML_NACK_MSG)
406
#endif
407
) priority = HP;
408
			//fprintf(stderr,"*******************************ML.C: Sending packet: msg_h.offset: %d msg_h.msg_seq_num: %d\n",ntohl(msg_h.offset),ntohl(msg_h.msg_seq_num));
409
			switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
328 410
				case MSGLEN:
329 411
					info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
330 412
					// TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
......
340 422
					offset = msg_len; // exit the while
341 423
					break;
342 424
                                case THROTTLE:
343
                                    //    debug("THROTTLE on output"); 
425
                                        debug("THROTTLE on output"); 
344 426
					offset = msg_len; // exit the while
345 427
					break;
346 428
				case OK:
429
#ifdef RTX
430
					if (msg_type < 127) counters.sentDataPktCounter++;
431
#endif
347 432
					//update
348 433
					offset += pkt_len;
349 434
					//transmit data header only in the first packet
......
352 437
			}
353 438
		} while(offset != msg_len && !truncable);
354 439
	} while(retry);
440
	//fprintf(stderr, "sentDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentDataPktCounter);
441
	//fprintf(stderr, "sentRTXDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentRTXDataPktCtr);
355 442
}
356 443

  
357 444
void pmtu_timeout_cb(int fd, short event, void *arg);
358 445

  
446
int sendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr) {
447
	//monitoring layer hook
448
debug("SENDP1");
449
	if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
450
		mon_pkt_inf pkt_info;	
451

  
452
		struct msg_header *msg_h  = (struct msg_header *) iov[0].iov_base;
453

  
454
		memset(iov[1].iov_base,0,iov[1].iov_len);
455

  
456
		pkt_info.remote_socketID = &(connectbuf[ntohl(msg_h->local_con_id)]->external_socketID);
457
		pkt_info.buffer = iov[3].iov_base;
458
		pkt_info.bufSize = iov[3].iov_len;
459
		pkt_info.msgtype = msg_h->msg_type;
460
		pkt_info.dataID = ntohl(msg_h->msg_seq_num);
461
		pkt_info.offset = ntohl(msg_h->offset);
462
		pkt_info.datasize = ntohl(msg_h->msg_length);
463
		pkt_info.monitoringHeaderLen = iov[1].iov_len;
464
		pkt_info.monitoringHeader = iov[1].iov_base;
465
		pkt_info.ttl = -1;
466
		memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
467

  
468
		(get_Send_pkt_inf_cb) ((void *) &pkt_info);
469
	}
470

  
471
 	//struct msg_header *msg_h;
472
    //msg_h = (struct msg_header *) iov[0].iov_base;        
473

  
474
	//fprintf(stderr,"*** Sending packet - msgSeqNum: %d offset: %d\n",ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
475
debug("SENDP2");
476

  
477
	return sendPacketFinal(udpSocket, iov, len, socketaddr);
478
}
479

  
359 480
void reschedule_conn_msg(int con_id)
360 481
{
361 482
	if (connectbuf[con_id]->timeout_event) {
......
390 511
	msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
391 512

  
392 513
	memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
393

  
514
  {
515
                        char buf[SOCKETID_STRING_SIZE];
516
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
517
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
518
   }
394 519
	send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
395 520
}
396 521

  
397 522
void send_conn_msg_with_pmtu_discovery(int con_id, int buf_size, int command_type)
398 523
{
399
	struct timeval tout = PMTU_TIMEOUT;
524
	struct timeval tout = {0,0};
525
	tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
400 526
	connectbuf[con_id]->timeout_value = tout;
401 527
	connectbuf[con_id]->trials = 1;
402 528
	send_conn_msg(con_id, buf_size, command_type);
......
504 630
				connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
505 631
				connectbuf[free_con_id]->starttime = time(NULL);
506 632
				memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
633
		//Workaround to support reuse of socketID
634
				connectbuf[free_con_id]->external_socketID.internal_addr.udpaddr.sin_family=AF_INET;
635
				connectbuf[free_con_id]->external_socketID.external_addr.udpaddr.sin_family=AF_INET;
507 636
				connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;	// bootstrap pmtu from the other's size. Not strictly needed, but a good hint
508 637
				connectbuf[free_con_id]->timeout_event = NULL;
509 638
				connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
......
625 754
		NAT_traversal = true;
626 755
		// callback to the upper layer indicating that the socketID is now
627 756
		// ready to use
757
		{
758
                	char buf[SOCKETID_STRING_SIZE];
759
                	mlSocketIDToString(&local_socketID,buf,sizeof(buf));
760
 			debug("received local socket_address: %s\n", buf);
761
		}
628 762
		(receive_SocketID_cb) (&local_socketID, 0);
629 763
	}
630 764
}
......
689 823
			&(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
690 824
		rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
691 825

  
692
// 		(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
693
// 			recvdatabuf[recv_id]->msgtype, &rParams);
826
#ifdef RTX
827
		counters.receivedIncompleteMsgCounter++;
828
		//mlShowCounters();
829
		//fprintf(stderr,"******Cleaning slot for inclomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);		
830
#endif
831
 		//(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->msgtype, &rParams);
694 832

  
695 833
		//clean up
696 834
		if (recvdatabuf[recv_id]->timeout_event) {
......
707 845
// process a single recv data message
708 846
void recv_data_msg(struct msg_header *msg_h, char *msgbuf, int bufsize)
709 847
{
710
	debug("ML: received packet of size %d with rconID:%d lconID:%d type:%d offset:%d\n",bufsize,msg_h->remote_con_id,msg_h->local_con_id,msg_h->msg_type,msg_h->offset);
848
	debug("ML: received packet of size %d with rconID:%d lconID:%d type:%d offset:%d inlength: %d\n",bufsize,msg_h->remote_con_id,msg_h->local_con_id,msg_h->msg_type,msg_h->offset, msg_h->msg_length);
711 849

  
712 850
	int recv_id, free_recv_id = -1;
713 851

  
......
716 854
		return;
717 855
	}
718 856

  
857
#ifdef RTX
858
	counters.receivedDataPktCounter++;
859
#endif	
719 860
	// check if a recv_data exist and enter data
720
	for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++)
861
	for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++) {
721 862
		if (recvdatabuf[recv_id] != NULL) {
722 863
			if (msg_h->remote_con_id == recvdatabuf[recv_id]->connectionID &&
723 864
					msg_h->msg_seq_num == recvdatabuf[recv_id]->seqnr)
......
725 866
		} else
726 867
			if(free_recv_id == -1)
727 868
				free_recv_id = recv_id;
728

  
869
  }
729 870

  
730 871
	if(recv_id == RECVDATABUFSIZE) {
872
		debug(" recv id not found (free found: %d)\n", free_recv_id);
731 873
		//no recv_data found: create one
732 874
		recv_id = free_recv_id;
733 875
		recvdatabuf[recv_id] = (recvdata *) malloc(sizeof(recvdata));
......
738 880
		recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
739 881
		recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
740 882
		recvdatabuf[recv_id]->arrivedBytes = 0;	//count this without the Mon headers
883
#ifdef RTX
884
		recvdatabuf[recv_id]->txConnectionID = msg_h->local_con_id;
885
		recvdatabuf[recv_id]->expectedOffset = 0;
886
		recvdatabuf[recv_id]->gapCounter = 0;
887
		recvdatabuf[recv_id]->firstGap = 0;
888
		recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
889
#endif
890

  
741 891
		/*
742 892
		* read the timeout data and set it
743 893
		*/
......
756 906

  
757 907
	//if first packet extract mon data header and advance pointer
758 908
	if (msg_h->offset == 0) {
909
		//fprintf(stderr,"Hoooooray!! We have first packet of some message!!\n");
759 910
		memcpy(recvdatabuf[recv_id]->recvbuf, msgbuf, msg_h->len_mon_data_hdr);
760 911
		msgbuf += msg_h->len_mon_data_hdr;
761 912
		bufsize -= msg_h->len_mon_data_hdr;
......
766 917
	// increment fragmentnr
767 918
	recvdatabuf[recv_id]->recvFragments++;
768 919
	// increment the arrivedBytes
769
	recvdatabuf[recv_id]->arrivedBytes += bufsize;
920
	recvdatabuf[recv_id]->arrivedBytes += bufsize; 
921

  
922
	//fprintf(stderr,"Arrived bytes: %d Offset: %d Expected offset: %d\n",recvdatabuf[recv_id]->arrivedBytes/1349,msg_h->offset/1349,recvdatabuf[recv_id]->expectedOffset/1349);
770 923

  
771 924
	// enter the data into the buffer
772 925
	memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
926
#ifdef RTX
927
	// detecting a new gap	
928
	if (msg_h->offset > recvdatabuf[recv_id]->expectedOffset) {
929
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetFrom = recvdatabuf[recv_id]->expectedOffset;
930
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetTo = msg_h->offset;
931
		if (recvdatabuf[recv_id]->gapCounter < RTX_MAX_GAPS - 1) recvdatabuf[recv_id]->gapCounter++;
932
		evtimer_add(event_new(base, -1, EV_TIMEOUT, &pkt_recv_timeout_cb, (void *) (long)recv_id), &pkt_recv_timeout);
933
	}
934
	
935
	//filling the gap by delayed packets
936
	if (msg_h->offset < recvdatabuf[recv_id]->expectedOffset){
937
		counters.receivedRTXDataPktCounter++;
938
		//skip retransmitted packets
939
		if (recvdatabuf[recv_id]->firstGap < recvdatabuf[recv_id]->gapCounter && msg_h->offset >= recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom) {
940
			int i;
941
			//fprintf(stderr,"firstGap: %d	gapCounter: %d\n", recvdatabuf[recv_id]->firstGap, recvdatabuf[recv_id]->gapCounter);
942
			for (i = recvdatabuf[recv_id]->firstGap; i < recvdatabuf[recv_id]->gapCounter; i++){
943
				if (msg_h->offset == recvdatabuf[recv_id]->gapArray[i].offsetFrom) {
944
					recvdatabuf[recv_id]->gapArray[i].offsetFrom += bufsize;
945
					break;
946
				}
947
				if (msg_h->offset == (recvdatabuf[recv_id]->gapArray[i].offsetTo - bufsize)) {
948
					recvdatabuf[recv_id]->gapArray[i].offsetTo -= bufsize;
949
					break;
950
				}
951
			}
952
		} else {//fprintf(stderr,"Skipping retransmitted packets in filling the gap.\n"); 
953
			//counters.receivedRTXDataPktCounter++;
954
			}
955
	}
956

  
957
	//updating the expectedOffset	
958
	if (msg_h->offset >= recvdatabuf[recv_id]->expectedOffset) recvdatabuf[recv_id]->expectedOffset = msg_h->offset + bufsize;
959
#endif
773 960

  
774 961
	//TODO very basic checkif all fragments arrived: has to be reviewed
775 962
	if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen)
......
822 1009
				debug("ML: received message from conID:%d, %s\n",recvdatabuf[recv_id]->connectionID,str);
823 1010
				rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
824 1011

  
1012
#ifdef RTX
1013
				counters.receivedCompleteMsgCounter++;
1014
				//mlShowCounters();
1015
#endif
1016

  
825 1017
				(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
826 1018
					recvdatabuf[recv_id]->msgtype, (void *) &rParams);
827 1019
			} else {
828 1020
			    warn("ML: callback not initialized for this message type: %d!\n",msg_h->msg_type);
829 1021
			}
830

  
1022
			
831 1023
			//clean up
832 1024
			if (recvdatabuf[recv_id]->timeout_event) {
833 1025
				debug("ML: freeing timeout for %d",recv_id);
......
837 1029
			} else {
838 1030
				debug("ML: received in 1 packet\n",recv_id);
839 1031
			}
1032
#ifdef RTX
1033
			if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
1034
				debug("ML: freeing last packet timeout for %d",recv_id);
1035
				event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
1036
				event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
1037
				recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1038
			}
1039
			//fprintf(stderr,"******Cleaning slot for clomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);	
1040
#endif
840 1041
			free(recvdatabuf[recv_id]->recvbuf);
841 1042
			free(recvdatabuf[recv_id]);
842 1043
			recvdatabuf[recv_id] = NULL;
......
846 1047
				//TODO make timeout at least a DEFINE
847 1048
				recvdatabuf[recv_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id);
848 1049
				evtimer_add(recvdatabuf[recv_id]->timeout_event, &recv_timeout);
1050
#ifdef RTX
1051
				recvdatabuf[recv_id]->last_pkt_timeout_event = event_new(base, -1, EV_TIMEOUT, &last_pkt_recv_timeout_cb, (void *) (long)recv_id);
1052
				evtimer_add(recvdatabuf[recv_id]->last_pkt_timeout_event, &last_pkt_recv_timeout);
1053
#endif
849 1054
			}
850 1055
		}
851 1056
	}
......
889 1094

  
890 1095
	if(connectbuf[con_id]->trials == MAX_TRIALS) {
891 1096
		// decrement the pmtu size
892
		struct timeval tout = PMTU_TIMEOUT;
1097
		struct timeval tout = {0,0};
1098
		tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
893 1099
		info("\tML: decreasing pmtu estimate from %d to %d\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize));
894 1100
		connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
895 1101
		connectbuf[con_id]->timeout_value = tout; 
......
897 1103
	}
898 1104

  
899 1105
	//error in PMTU discovery?
900
	if (connectbuf[con_id]->pmtusize == ERROR) {
1106
	if (connectbuf[con_id]->pmtusize == P_ERROR) {
901 1107
		if (connectbuf[con_id]->internal_connect == true) {
902 1108
			//as of now we tried directly connecting, now let's try trough the NAT
903 1109
			connectbuf[con_id]->internal_connect = false;
......
922 1128
int schedule_pmtu_timeout(int con_id)
923 1129
{
924 1130
	if (! connectbuf[con_id]->timeout_event) {
925
		struct timeval tout = PMTU_TIMEOUT;
1131
		struct timeval tout = {0,0};
1132
		tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
926 1133
		connectbuf[con_id]->timeout_value = tout;
927 1134
		connectbuf[con_id]->trials = 1;
928 1135
		connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
......
950 1157
	case BELOWDSL:
951 1158
		return MIN;
952 1159
	case MIN:
953
		return ERROR;
1160
		return P_ERROR;
954 1161
	default:
955 1162
		warn("ML: strange pmtu size encountered:%d, changing to some safe value:%d\n", pmtusize, MIN);
956 1163
		return MIN;
......
1011 1218

  
1012 1219
//    connectbuf[connectionID]->pmtutrysize = new_pmtusize;
1013 1220

  
1014
    if (new_pmtusize == ERROR) {
1221
    if (new_pmtusize == P_ERROR) {
1015 1222
		error("ML:  Could not create connection with connectionID %i !\n",
1016 1223
			connectionID);
1017 1224

  
......
1126 1333
	unsigned short stun_bind_response = 0x0101;
1127 1334
	unsigned short * msgspot = (unsigned short *) msgbuf;
1128 1335
	if (*msgspot == stun_bind_response) {
1129
		debug("ML: recv_pkg: parse stun message called\n");
1336
		debug("ML: recv_pkg: parse stun message called on %d bytes\n", recvSize);
1130 1337
		recv_stun_msg(msgbuf, recvSize);
1131 1338
		return;
1132 1339
	}
1133 1340

  
1134 1341
	msg_h = (struct msg_header *) msgbuf;
1135 1342

  
1343
        uint32_t inlen = ntohl(msg_h->msg_length);
1344
        if(inlen > 0x20000 || inlen < ntohl(msg_h->offset) || inlen == 0) {
1345
            warn("ML: BAD PACKET received from: %s:%d (len: %d < %d [=%08X] o:%d)", 
1346
                  inet_ntoa(recv_addr.sin_addr), recv_addr.sin_port,
1347
                               recvSize, inlen, inlen, ntohl(msg_h->offset));
1348
 warn("ML: received %d: %02X %02X %02X %02X %02X %02X - %02X %02X %02X %02X %02X %02X", recvSize,
1349
            msgbuf[0], msgbuf[1],msgbuf[2],msgbuf[3],msgbuf[4],msgbuf[5],
1350
            msgbuf[6],msgbuf[7],msgbuf[8],msgbuf[9],msgbuf[10],msgbuf[11]);
1351

  
1352
            return;
1353
       }
1354

  
1136 1355
	/* convert header from network to host order */
1137 1356
	msg_h->offset = ntohl(msg_h->offset);
1138 1357
	msg_h->msg_length = ntohl(msg_h->msg_length);
......
1166 1385
			struct conn_msg *c_msg = (struct conn_msg *) bufptr;
1167 1386
			msginfNow.remote_socketID = &(c_msg->sock_id);
1168 1387
		}
1169
		else if(connectbuf[msg_h->remote_con_id] == NULL) {
1388
                else if(msg_h->remote_con_id < 0 || 
1389
                       msg_h->remote_con_id >= CONNECTBUFSIZE || 
1390
                       connectbuf[msg_h->remote_con_id] == NULL) {
1170 1391
			error("ML: received pkg called with non existent connection\n");
1171 1392
			return;
1172 1393
		} else
......
1188 1409
			debug("ML: received conn pkg\n");
1189 1410
			recv_conn_msg(msg_h, bufptr, msg_size, &recv_addr);
1190 1411
			break;
1412
#ifdef RTX
1413
		case ML_NACK_MSG:
1414
			debug("ML: received nack pkg\n");
1415
			recv_nack_msg(msg_h, bufptr, msg_size);
1416
			break;
1417
#endif
1191 1418
		default:
1192 1419
			if(msg_h->msg_type < 127) {
1193 1420
				debug("ML: received data pkg\n");
......
1217 1444
 */
1218 1445
void nat_traversal_timeout(int fd, short event, void *arg)
1219 1446
{
1447
debug("X. NatTrTo %d\n", NAT_traversal);
1220 1448
	if (NAT_traversal == false) {
1221 1449
		debug("ML: NAT traversal request re-send\n");
1222 1450
		if(receive_SocketID_cb)
1223 1451
			(receive_SocketID_cb) (&local_socketID, 2);
1224 1452
		try_stun();
1225 1453
	}
1454
debug("X. NatTrTo\n");
1226 1455
}
1227 1456

  
1228 1457
//return IP address, or INADDR_NONE if can't resolve
......
1246 1475
{
1247 1476
	struct sockaddr_in udpaddr = {0};
1248 1477
	udpaddr.sin_family = AF_INET;
1478
        debug("X. create_socket %s, %d\n", ipaddr, port);
1249 1479
	if (ipaddr == NULL) {
1250 1480
		/*
1251 1481
		* try to guess the local IP address
......
1320 1550

  
1321 1551
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg){
1322 1552

  
1553
/*X*/ //  fprintf(stderr,"MLINIT1 %s, %d, %s, %d\n", ipaddr, port, stun_ipaddr, stun_port);
1323 1554
	base = (struct event_base *) arg;
1324 1555
	recv_data_callback = recv_data_cb;
1325 1556
	mlSetRecvTimeout(timeout_value);
......
1329 1560

  
1330 1561
	}
1331 1562
	register_recv_localsocketID_cb(local_socketID_cb);
1563
/*X*/ //  fprintf(stderr,"MLINIT1\n");
1332 1564
	return create_socket(port, ipaddr);
1333 1565
}
1334 1566

  
1335
void mlSetThrottle(int bucketsize, int drainrate) {
1567
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold) {
1336 1568
        setOutputRateParams(bucketsize, drainrate);
1569
	setQueuesParams (maxQueueSize, maxQueueSizeRTX, maxTimeToHold);
1337 1570
}
1338 1571
     
1572
void mlSetVerbosity (int log_level) {
1573
	setLogLevel(log_level);
1574
}
1339 1575

  
1340 1576
/* register callbacks  */
1341 1577
void mlRegisterGetRecvPktInf(get_recv_pkt_inf_cb recv_pkt_inf_cb){
......
1396 1632
void mlSetRecvTimeout(struct timeval timeout_value){
1397 1633

  
1398 1634
	recv_timeout = timeout_value;
1399

  
1635
#ifdef RTX
1636
	unsigned int total_usec = recv_timeout.tv_sec * 1000000 + recv_timeout.tv_usec;
1637
	total_usec = total_usec * LAST_PKT_RECV_TIMEOUT_FRACTION;
1638
	last_pkt_recv_timeout.tv_sec = total_usec / 1000000;
1639
	last_pkt_recv_timeout.tv_usec = total_usec - last_pkt_recv_timeout.tv_sec * 1000000;
1640
	fprintf(stderr,"Timeout for receiving message: %d : %d\n", recv_timeout.tv_sec, recv_timeout.tv_usec);	
1641
	fprintf(stderr,"Timeout for last pkt: %d : %d\n", last_pkt_recv_timeout.tv_sec, last_pkt_recv_timeout.tv_usec);	
1642
#endif
1400 1643
}
1401 1644

  
1402 1645
int mlGetStandardTTL(socketID_handle socketID,uint8_t *ttl){

Also available in: Unified diff