Revision 5f3adef4 ml/ml.c

View differences:

ml/ml.c
32 32
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
33 33
 */
34 34

  
35
#include <arpa/inet.h>
36
#ifndef WIN32
37
#include <netinet/in.h>
38
#include <sys/socket.h>
39
#endif
40
#include <fcntl.h>
41
#include <event2/event.h>
35 42
#include <stdlib.h>
36 43
#include <unistd.h>
37 44
#include <stdio.h>
......
39 46
#include <stdint.h>
40 47
#include <string.h>
41 48
#include <sys/types.h>
49
#include <arpa/inet.h>
50
#include <netdb.h>
51
#include <errno.h>
42 52
#include <time.h>
43 53
#include <math.h>
44 54
#include <assert.h>
45
#include <errno.h>
46

  
47
#ifndef WIN32
48
#include <arpa/inet.h>
49
#include <netdb.h>
50
#include <netinet/in.h>
51
#include <sys/socket.h>
52
#include <fcntl.h>
53
#else
54

  
55
#include <winsock2.h>
56
#include <ws2tcpip.h>
57
#endif
58 55

  
59 56
#include "util/udpSocket.h"
60 57
#include "util/stun.h"
61 58
#include "transmissionHandler.h"
62
#include "util/rateLimiter.h"
63
#include "util/queueManagement.h"
64 59

  
65 60
#define LOG_MODULE "[ml] "
66 61
#include "ml_log.h"
......
112 107
 */
113 108
#define RECV_TIMEOUT_DEFAULT { 2, 0 }
114 109

  
115
#ifdef RTX
116
/*
117
 * default timeout value for a packet reception
118
 */
119
#define PKT_RECV_TIMEOUT_DEFAULT { 0, 50000 } // 50 ms
120

  
121
/*
122
 * default timeout value for a packet reception
123
 */
124
#define LAST_PKT_RECV_TIMEOUT_DEFAULT { 1, 700000 }
125

  
126
/*
127
 * default fraction of RECV_TIMEOUT_DEFAULT for a last packet(s) reception timeout
128
 */
129
#define LAST_PKT_RECV_TIMEOUT_FRACTION 0.7
130

  
131
#endif
132

  
133

  
134 110
/*
135 111
 * global variables
136 112
 */
137

  
138 113
/*
139 114
 * define a buffer of pointers to connect structures
140 115
 */
......
208 183
/*
209 184
 * helper function to get rid of a warning
210 185
 */
211
#ifndef WIN32
212 186
int min(int a, int b) {
213 187
	if (a > b) return b;
214 188
	return a;
215 189
}
216
#endif
217

  
218
#ifdef RTX
219
//*********Counters**********
220

  
221
struct Counters {
222
	unsigned int receivedCompleteMsgCounter;
223
	unsigned int receivedIncompleteMsgCounter;
224
	unsigned int receivedDataPktCounter;
225
	unsigned int receivedRTXDataPktCounter;
226
	unsigned int receivedNACK1PktCounter;
227
	unsigned int receivedNACKMorePktCounter;
228
	unsigned int sentDataPktCounter;
229
	unsigned int sentRTXDataPktCtr;
230
	unsigned int sentNACK1PktCounter;
231
	unsigned int sentNACKMorePktCounter;
232
} counters;
233

  
234
extern unsigned int sentRTXDataPktCounter;
235

  
236
/*
237
 * receive timeout for a packet
238
 */
239
static struct timeval pkt_recv_timeout = PKT_RECV_TIMEOUT_DEFAULT;
240

  
241

  
242
static struct timeval last_pkt_recv_timeout = LAST_PKT_RECV_TIMEOUT_DEFAULT;
243

  
244
void mlShowCounters() {
245
	counters.sentRTXDataPktCtr = sentRTXDataPktCounter;
246
	fprintf(stderr, "\nreceivedCompleteMsgCounter: %d\nreceivedIncompleteMsgCounter: %d\nreceivedDataPktCounter: %d\nreceivedRTXDataPktCounter: %d\nreceivedNACK1PktCounter: %d\nreceivedNACKMorePktCounter: %d\nsentDataPktCounter: %d\nsentRTXDataPktCtr: %d\nsentNACK1PktCounter: %d\nsentNACKMorePktCounter: %d\n", counters.receivedCompleteMsgCounter, counters.receivedIncompleteMsgCounter, counters.receivedDataPktCounter, counters.receivedRTXDataPktCounter, counters.receivedNACK1PktCounter, counters.receivedNACKMorePktCounter, counters.sentDataPktCounter, counters.sentRTXDataPktCtr, counters.sentNACK1PktCounter, counters.sentNACKMorePktCounter);
247
	return;
248
}
249

  
250
void recv_nack_msg(struct msg_header *msg_h, char *msgbuf, int msg_size)
251
{
252
	struct nack_msg *nackmsg;
253
	
254
	msgbuf += msg_h->len_mon_data_hdr;
255
	msg_size -= msg_h->len_mon_data_hdr;
256
	nackmsg = (struct nack_msg*) msgbuf;
257
	
258
	unsigned int gapSize = nackmsg->offsetTo - nackmsg->offsetFrom;
259
	//if (gapSize == 1349) counters.receivedNACK1PktCounter++;
260
	//else counters.receivedNACKMorePktCounter++;
261

  
262
	rtxPacketsFromTo(nackmsg->con_id, nackmsg->msg_seq_num, nackmsg->offsetFrom, nackmsg->offsetTo);	
263
}
264

  
265
void pkt_recv_timeout_cb(int fd, short event, void *arg){
266
	int recv_id = (long) arg;
267
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
268

  
269
	//check if message still exists	
270
	if (recvdatabuf[recv_id] == NULL) return;
271

  
272
	//check if gap was filled in the meantime
273
	if (recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom == recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo) {
274
		recvdatabuf[recv_id]->firstGap++;
275
		return;	
276
	}
277

  
278
	struct nack_msg nackmsg;
279
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
280
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
281
	nackmsg.offsetFrom = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom;
282
	nackmsg.offsetTo = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo;
283
	recvdatabuf[recv_id]->firstGap++;
284

  
285
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
286
	//if (gapSize == 1349) counters.sentNACK1PktCounter++;
287
	//else counters.sentNACKMorePktCounter++;
288

  
289
	//fprintf(stderr,"Sending NACK <- msg_seq_num: %d. monitoringDataHeaderLen: %d. offsetFrom: %d offsetTo: %d \n", nackmsg.msg_seq_num, recvdatabuf[recv_id]->monitoringDataHeaderLen, nackmsg.offsetFrom, nackmsg.offsetTo);
290

  
291
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, (char *) &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
292
}
293

  
294
void last_pkt_recv_timeout_cb(int fd, short event, void *arg){
295
	int recv_id = (long) arg;
296
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
297

  
298
	if (recvdatabuf[recv_id] == NULL) {
299
		//fprintf(stderr,"Called last_pkt_recv_timeout_cb but there is no slot\n");
300
		return;
301
	}
302

  
303
	//fprintf(stderr,"Starting last_pkt_recv_timeout_cb for msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);
304

  
305
	if (recvdatabuf[recv_id]->expectedOffset == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) return;
306

  
307
	struct nack_msg nackmsg;
308
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
309
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
310
	nackmsg.offsetFrom = recvdatabuf[recv_id]->expectedOffset;
311
	nackmsg.offsetTo = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen;
312

  
313
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
314
	//if (gapSize == 1349) counters.sentNACK1PktCounter++;
315
	//else counters.sentNACKMorePktCounter++;	
316

  
317
	//fprintf(stderr,"last_pkt - Sending NACK <- msg_seq_num: %d. monitoringDataHeaderLen: %d. offsetFrom: %d offsetTo: %d \n", nackmsg.msg_seq_num, recvdatabuf[recv_id]->monitoringDataHeaderLen, nackmsg.offsetFrom, nackmsg.offsetTo);
318

  
319
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
320
}
321

  
322
#endif
323

  
324

  
325 190

  
326 191
/*
327 192
 * convert a socketID to a string. It uses a static buffer, so either strdup is needed, or the string will get lost!
......
437 302
			msg_h.offset = htonl(offset);
438 303
			msg_h.msg_length = htonl(truncable ? pkt_len : msg_len);
439 304

  
305
			//monitoring layer hook
306
			if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
307
				mon_pkt_inf pkt_info;
308

  
309
				memset(h_pkt,0,MON_PKT_HEADER_SPACE);
310

  
311
				pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
312
				pkt_info.buffer = msg + offset;
313
				pkt_info.bufSize = pkt_len;
314
				pkt_info.msgtype = msg_type;
315
				pkt_info.dataID = connectbuf[con_id]->seqnr;
316
				pkt_info.offset = offset;
317
				pkt_info.datasize = msg_len;
318
				pkt_info.monitoringHeaderLen = iov[1].iov_len;
319
				pkt_info.monitoringHeader = iov[1].iov_base;
320
				pkt_info.ttl = -1;
321
				memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
322

  
323
				(get_Send_pkt_inf_cb) ((void *) &pkt_info);
324
			}
440 325

  
441 326
			debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
442
			int priority = 0; 
443
			if ((msg_type == ML_CON_MSG)
444
#ifdef RTX
445
 || (msg_type == ML_NACK_MSG)
446
#endif
447
) priority = HP;
448
			//fprintf(stderr,"*******************************ML.C: Sending packet: msg_h.offset: %d msg_h.msg_seq_num: %d\n",ntohl(msg_h.offset),ntohl(msg_h.msg_seq_num));
449
			switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
327
			switch(sendPacket(socketfd, iov, 4, &udpgen.udpaddr)) {
450 328
				case MSGLEN:
451 329
					info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
452 330
					// TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
......
466 344
					offset = msg_len; // exit the while
467 345
					break;
468 346
				case OK:
469
#ifdef RTX
470
					if (msg_type < 127) counters.sentDataPktCounter++;
471
#endif
472 347
					//update
473 348
					offset += pkt_len;
474 349
					//transmit data header only in the first packet
......
477 352
			}
478 353
		} while(offset != msg_len && !truncable);
479 354
	} while(retry);
480
	//fprintf(stderr, "sentDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentDataPktCounter);
481
	//fprintf(stderr, "sentRTXDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentRTXDataPktCtr);
482 355
}
483 356

  
484 357
void pmtu_timeout_cb(int fd, short event, void *arg);
485 358

  
486
int sendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr) {
487
	//monitoring layer hook
488
	if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
489
		mon_pkt_inf pkt_info;	
490

  
491
		struct msg_header *msg_h  = (struct msg_header *) iov[0].iov_base;
492

  
493
		memset(iov[1].iov_base,0,iov[1].iov_len);
494

  
495
		pkt_info.remote_socketID = &(connectbuf[ntohl(msg_h->local_con_id)]->external_socketID);
496
		pkt_info.buffer = iov[3].iov_base;
497
		pkt_info.bufSize = iov[3].iov_len;
498
		pkt_info.msgtype = msg_h->msg_type;
499
		pkt_info.dataID = ntohl(msg_h->msg_seq_num);
500
		pkt_info.offset = ntohl(msg_h->offset);
501
		pkt_info.datasize = ntohl(msg_h->msg_length);
502
		pkt_info.monitoringHeaderLen = iov[1].iov_len;
503
		pkt_info.monitoringHeader = iov[1].iov_base;
504
		pkt_info.ttl = -1;
505
		memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
506

  
507
		(get_Send_pkt_inf_cb) ((void *) &pkt_info);
508
	}
509

  
510
 	//struct msg_header *msg_h;
511
    //msg_h = (struct msg_header *) iov[0].iov_base;        
512

  
513
	//fprintf(stderr,"*** Sending packet - msgSeqNum: %d offset: %d\n",ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
514

  
515
	return sendPacketFinal(udpSocket, iov, len, socketaddr);
516
}
517

  
518 359
void reschedule_conn_msg(int con_id)
519 360
{
520 361
	if (connectbuf[con_id]->timeout_event) {
......
549 390
	msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
550 391

  
551 392
	memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
552
  {
553
                        char buf[SOCKETID_STRING_SIZE];
554
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
555
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
556
   }
393

  
557 394
	send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
558 395
}
559 396

  
......
667 504
				connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
668 505
				connectbuf[free_con_id]->starttime = time(NULL);
669 506
				memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
670
		//Workaround to support reuse of socketID
671
				connectbuf[free_con_id]->external_socketID.internal_addr.udpaddr.sin_family=AF_INET;
672
				connectbuf[free_con_id]->external_socketID.external_addr.udpaddr.sin_family=AF_INET;
673 507
				connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;	// bootstrap pmtu from the other's size. Not strictly needed, but a good hint
674 508
				connectbuf[free_con_id]->timeout_event = NULL;
675 509
				connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
......
791 625
		NAT_traversal = true;
792 626
		// callback to the upper layer indicating that the socketID is now
793 627
		// ready to use
794
		{
795
                	char buf[SOCKETID_STRING_SIZE];
796
                	mlSocketIDToString(&local_socketID,buf,sizeof(buf));
797
 			debug("received local socket_address: %s\n", buf);
798
		}
799 628
		(receive_SocketID_cb) (&local_socketID, 0);
800 629
	}
801 630
}
......
860 689
			&(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
861 690
		rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
862 691

  
863
#ifdef RTX
864
		counters.receivedIncompleteMsgCounter++;
865
		//mlShowCounters();
866
		//fprintf(stderr,"******Cleaning slot for inclomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);		
867
#endif
868
 		//(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->msgtype, &rParams);
692
// 		(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
693
// 			recvdatabuf[recv_id]->msgtype, &rParams);
869 694

  
870 695
		//clean up
871 696
		if (recvdatabuf[recv_id]->timeout_event) {
......
891 716
		return;
892 717
	}
893 718

  
894
#ifdef RTX
895
	counters.receivedDataPktCounter++;
896
#endif	
897 719
	// check if a recv_data exist and enter data
898 720
	for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++)
899 721
		if (recvdatabuf[recv_id] != NULL) {
......
916 738
		recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
917 739
		recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
918 740
		recvdatabuf[recv_id]->arrivedBytes = 0;	//count this without the Mon headers
919
#ifdef RTX
920
		recvdatabuf[recv_id]->txConnectionID = msg_h->local_con_id;
921
		recvdatabuf[recv_id]->expectedOffset = 0;
922
		recvdatabuf[recv_id]->gapCounter = 0;
923
		recvdatabuf[recv_id]->firstGap = 0;
924
		recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
925
#endif
926

  
927 741
		/*
928 742
		* read the timeout data and set it
929 743
		*/
......
942 756

  
943 757
	//if first packet extract mon data header and advance pointer
944 758
	if (msg_h->offset == 0) {
945
		//fprintf(stderr,"Hoooooray!! We have first packet of some message!!\n");
946 759
		memcpy(recvdatabuf[recv_id]->recvbuf, msgbuf, msg_h->len_mon_data_hdr);
947 760
		msgbuf += msg_h->len_mon_data_hdr;
948 761
		bufsize -= msg_h->len_mon_data_hdr;
......
953 766
	// increment fragmentnr
954 767
	recvdatabuf[recv_id]->recvFragments++;
955 768
	// increment the arrivedBytes
956
	recvdatabuf[recv_id]->arrivedBytes += bufsize; 
957

  
958
	//fprintf(stderr,"Arrived bytes: %d Offset: %d Expected offset: %d\n",recvdatabuf[recv_id]->arrivedBytes/1349,msg_h->offset/1349,recvdatabuf[recv_id]->expectedOffset/1349);
769
	recvdatabuf[recv_id]->arrivedBytes += bufsize;
959 770

  
960 771
	// enter the data into the buffer
961 772
	memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
962
#ifdef RTX
963
	// detecting a new gap	
964
	if (msg_h->offset > recvdatabuf[recv_id]->expectedOffset) {
965
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetFrom = recvdatabuf[recv_id]->expectedOffset;
966
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetTo = msg_h->offset;
967
		if (recvdatabuf[recv_id]->gapCounter < RTX_MAX_GAPS - 1) recvdatabuf[recv_id]->gapCounter++;
968
		evtimer_add(event_new(base, -1, EV_TIMEOUT, &pkt_recv_timeout_cb, (void *) (long)recv_id), &pkt_recv_timeout);
969
	}
970
	
971
	//filling the gap by delayed packets
972
	if (msg_h->offset < recvdatabuf[recv_id]->expectedOffset){
973
		counters.receivedRTXDataPktCounter++;
974
		//skip retransmitted packets
975
		if (recvdatabuf[recv_id]->firstGap < recvdatabuf[recv_id]->gapCounter && msg_h->offset >= recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom) {
976
			int i;
977
			//fprintf(stderr,"firstGap: %d	gapCounter: %d\n", recvdatabuf[recv_id]->firstGap, recvdatabuf[recv_id]->gapCounter);
978
			for (i = recvdatabuf[recv_id]->firstGap; i < recvdatabuf[recv_id]->gapCounter; i++){
979
				if (msg_h->offset == recvdatabuf[recv_id]->gapArray[i].offsetFrom) {
980
					recvdatabuf[recv_id]->gapArray[i].offsetFrom += bufsize;
981
					break;
982
				}
983
				if (msg_h->offset == (recvdatabuf[recv_id]->gapArray[i].offsetTo - bufsize)) {
984
					recvdatabuf[recv_id]->gapArray[i].offsetTo -= bufsize;
985
					break;
986
				}
987
			}
988
		} else {//fprintf(stderr,"Skipping retransmitted packets in filling the gap.\n"); 
989
			//counters.receivedRTXDataPktCounter++;
990
			}
991
	}
992

  
993
	//updating the expectedOffset	
994
	if (msg_h->offset >= recvdatabuf[recv_id]->expectedOffset) recvdatabuf[recv_id]->expectedOffset = msg_h->offset + bufsize;
995
#endif
996 773

  
997 774
	//TODO very basic checkif all fragments arrived: has to be reviewed
998 775
	if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen)
......
1045 822
				debug("ML: received message from conID:%d, %s\n",recvdatabuf[recv_id]->connectionID,str);
1046 823
				rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1047 824

  
1048
#ifdef RTX
1049
				counters.receivedCompleteMsgCounter++;
1050
				//mlShowCounters();
1051
#endif
1052

  
1053 825
				(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
1054 826
					recvdatabuf[recv_id]->msgtype, (void *) &rParams);
1055 827
			} else {
1056 828
			    warn("ML: callback not initialized for this message type: %d!\n",msg_h->msg_type);
1057 829
			}
1058
			
830

  
1059 831
			//clean up
1060 832
			if (recvdatabuf[recv_id]->timeout_event) {
1061
				debug("ML: freeing timeout for %d\n",recv_id);
833
				debug("ML: freeing timeout for %d",recv_id);
1062 834
				event_del(recvdatabuf[recv_id]->timeout_event);
1063 835
				event_free(recvdatabuf[recv_id]->timeout_event);
1064 836
				recvdatabuf[recv_id]->timeout_event = NULL;
1065 837
			} else {
1066 838
				debug("ML: received in 1 packet\n",recv_id);
1067 839
			}
1068
#ifdef RTX
1069
			if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
1070
				debug("ML: freeing last packet timeout for %d",recv_id);
1071
				event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
1072
				event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
1073
				recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1074
			}
1075
			//fprintf(stderr,"******Cleaning slot for clomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);	
1076
#endif
1077 840
			free(recvdatabuf[recv_id]->recvbuf);
1078 841
			free(recvdatabuf[recv_id]);
1079 842
			recvdatabuf[recv_id] = NULL;
......
1083 846
				//TODO make timeout at least a DEFINE
1084 847
				recvdatabuf[recv_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id);
1085 848
				evtimer_add(recvdatabuf[recv_id]->timeout_event, &recv_timeout);
1086
#ifdef RTX
1087
				recvdatabuf[recv_id]->last_pkt_timeout_event = event_new(base, -1, EV_TIMEOUT, &last_pkt_recv_timeout_cb, (void *) (long)recv_id);
1088
				evtimer_add(recvdatabuf[recv_id]->last_pkt_timeout_event, &last_pkt_recv_timeout);
1089
#endif
1090 849
			}
1091 850
		}
1092 851
	}
......
1138 897
	}
1139 898

  
1140 899
	//error in PMTU discovery?
1141
	if (connectbuf[con_id]->pmtusize == P_ERROR) {
900
	if (connectbuf[con_id]->pmtusize == ERROR) {
1142 901
		if (connectbuf[con_id]->internal_connect == true) {
1143 902
			//as of now we tried directly connecting, now let's try trough the NAT
1144 903
			connectbuf[con_id]->internal_connect = false;
......
1191 950
	case BELOWDSL:
1192 951
		return MIN;
1193 952
	case MIN:
1194
		return P_ERROR;
953
		return ERROR;
1195 954
	default:
1196 955
		warn("ML: strange pmtu size encountered:%d, changing to some safe value:%d\n", pmtusize, MIN);
1197 956
		return MIN;
......
1252 1011

  
1253 1012
//    connectbuf[connectionID]->pmtutrysize = new_pmtusize;
1254 1013

  
1255
    if (new_pmtusize == P_ERROR) {
1014
    if (new_pmtusize == ERROR) {
1256 1015
		error("ML:  Could not create connection with connectionID %i !\n",
1257 1016
			connectionID);
1258 1017

  
......
1367 1126
	unsigned short stun_bind_response = 0x0101;
1368 1127
	unsigned short * msgspot = (unsigned short *) msgbuf;
1369 1128
	if (*msgspot == stun_bind_response) {
1370
		debug("ML: recv_pkg: parse stun message called on %d bytes\n", recvSize);
1129
		debug("ML: recv_pkg: parse stun message called\n");
1371 1130
		recv_stun_msg(msgbuf, recvSize);
1372 1131
		return;
1373 1132
	}
......
1429 1188
			debug("ML: received conn pkg\n");
1430 1189
			recv_conn_msg(msg_h, bufptr, msg_size, &recv_addr);
1431 1190
			break;
1432
#ifdef RTX
1433
		case ML_NACK_MSG:
1434
			debug("ML: received nack pkg\n");
1435
			recv_nack_msg(msg_h, bufptr, msg_size);
1436
			break;
1437
#endif
1438 1191
		default:
1439 1192
			if(msg_h->msg_type < 127) {
1440 1193
				debug("ML: received data pkg\n");
......
1464 1217
 */
1465 1218
void nat_traversal_timeout(int fd, short event, void *arg)
1466 1219
{
1467
debug("X. NatTrTo %d\n", NAT_traversal);
1468 1220
	if (NAT_traversal == false) {
1469 1221
		debug("ML: NAT traversal request re-send\n");
1470 1222
		if(receive_SocketID_cb)
1471 1223
			(receive_SocketID_cb) (&local_socketID, 2);
1472 1224
		try_stun();
1473 1225
	}
1474
debug("X. NatTrTo\n");
1475 1226
}
1476 1227

  
1477 1228
//return IP address, or INADDR_NONE if can't resolve
......
1495 1246
{
1496 1247
	struct sockaddr_in udpaddr = {0};
1497 1248
	udpaddr.sin_family = AF_INET;
1498
        debug("X. create_socket %s, %d\n", ipaddr, port);
1499 1249
	if (ipaddr == NULL) {
1500 1250
		/*
1501 1251
		* try to guess the local IP address
......
1570 1320

  
1571 1321
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg){
1572 1322

  
1573
/*X*/ //  fprintf(stderr,"MLINIT1 %s, %d, %s, %d\n", ipaddr, port, stun_ipaddr, stun_port);
1574 1323
	base = (struct event_base *) arg;
1575 1324
	recv_data_callback = recv_data_cb;
1576 1325
	mlSetRecvTimeout(timeout_value);
......
1580 1329

  
1581 1330
	}
1582 1331
	register_recv_localsocketID_cb(local_socketID_cb);
1583
/*X*/ //  fprintf(stderr,"MLINIT1\n");
1584 1332
	return create_socket(port, ipaddr);
1585 1333
}
1586 1334

  
1587
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold) {
1335
void mlSetThrottle(int bucketsize, int drainrate) {
1588 1336
        setOutputRateParams(bucketsize, drainrate);
1589
	setQueuesParams (maxQueueSize, maxQueueSizeRTX, maxTimeToHold);
1590 1337
}
1591 1338
     
1592
void mlSetVerbosity (int log_level) {
1593
	setLogLevel(log_level);
1594
}
1595 1339

  
1596 1340
/* register callbacks  */
1597 1341
void mlRegisterGetRecvPktInf(get_recv_pkt_inf_cb recv_pkt_inf_cb){
......
1652 1396
void mlSetRecvTimeout(struct timeval timeout_value){
1653 1397

  
1654 1398
	recv_timeout = timeout_value;
1655
#ifdef RTX
1656
	unsigned int total_usec = recv_timeout.tv_sec * 1000000 + recv_timeout.tv_usec;
1657
	total_usec = total_usec * LAST_PKT_RECV_TIMEOUT_FRACTION;
1658
	last_pkt_recv_timeout.tv_sec = total_usec / 1000000;
1659
	last_pkt_recv_timeout.tv_usec = total_usec - last_pkt_recv_timeout.tv_sec * 1000000;
1660
	fprintf(stderr,"Timeout for receiving message: %d : %d\n", recv_timeout.tv_sec, recv_timeout.tv_usec);	
1661
	fprintf(stderr,"Timeout for last pkt: %d : %d\n", last_pkt_recv_timeout.tv_sec, last_pkt_recv_timeout.tv_usec);	
1662
#endif
1399

  
1663 1400
}
1664 1401

  
1665 1402
int mlGetStandardTTL(socketID_handle socketID,uint8_t *ttl){

Also available in: Unified diff