Revision da5dade7 transmissionHandler.c

View differences:

transmissionHandler.c
1
/*
2
 *          Policy Management
3
 *
4
 *          NEC Europe Ltd. PROPRIETARY INFORMATION
5
 *
6
 * This software is supplied under the terms of a license agreement
7
 * or nondisclosure agreement with NEC Europe Ltd. and may not be
8
 * copied or disclosed except in accordance with the terms of that
9
 * agreement.
10
 *
11
 *      Copyright (c) 2009 NEC Europe Ltd. All Rights Reserved.
12
 *
13
 * Authors: Kristian Beckers  <beckers@nw.neclab.eu>
14
 *          Sebastian Kiesel  <kiesel@nw.neclab.eu>
15
 *          
16
 *
17
 * NEC Europe Ltd. DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR IMPLIED,
18
 * INCLUDING BUT NOT LIMITED TO IMPLIED WARRANTIES OF MERCHANTABILITY
19
 * AND FITNESS FOR A PARTICULAR PURPOSE AND THE WARRANTY AGAINST LATENT
20
 * DEFECTS, WITH RESPECT TO THE PROGRAM AND THE ACCOMPANYING
21
 * DOCUMENTATION.
22
 *
23
 * No Liability For Consequential Damages IN NO EVENT SHALL NEC Europe
24
 * Ltd., NEC Corporation OR ANY OF ITS SUBSIDIARIES BE LIABLE FOR ANY
25
 * DAMAGES WHATSOEVER (INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS
26
 * OF BUSINESS PROFITS, BUSINESS INTERRUPTION, LOSS OF INFORMATION, OR
27
 * OTHER PECUNIARY LOSS AND INDIRECT, CONSEQUENTIAL, INCIDENTAL,
28
 * ECONOMIC OR PUNITIVE DAMAGES) ARISING OUT OF THE USE OF OR INABILITY
29
 * TO USE THIS PROGRAM, EVEN IF NEC Europe Ltd. HAS BEEN ADVISED OF THE
30
 * POSSIBILITY OF SUCH DAMAGES.
31
 *
32
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
33
 */
34
#include <arpa/inet.h>
35
#include <netinet/in.h>
36
#include <sys/socket.h>
37
#include <fcntl.h>
38
#include <event2/event.h>
39
#include <stdlib.h>
40
#include <unistd.h>
41
#include <stdio.h>
42
#include <stddef.h>
43
#include <stdint.h>
44
#include <string.h>
45
#include <sys/types.h>
46
#include <arpa/inet.h>
47
#include <netdb.h>
48
#include <errno.h>
49
#include <time.h>
50
#include <math.h>
51
#include "util/udpSocket.h"
52
#include "util/stun.h"
53
#include "transmissionHandler.h"
54

  
55
#define LOG_MODULE "[ml] "
56
#include "grapes_log.h"
57

  
58
/*
59
 * a pointer to a libevent instance 
60
 */
61
struct event_base *base;
62

  
63
/*
64
 * define the nr of connections the messaging layer can handle 
65
 */
66
#define CONNECTBUFSIZE 10000
67
/*
68
 * define the nr of data that can be received parallel 
69
 */
70
#define RECVDATABUFSIZE 10000
71
/*
72
 * define an array for message multiplexing 
73
 */
74
#define MSGMULTIPLEXSIZE 127
75

  
76
/*
77
 * global variables 
78
 */
79
/*
80
 * define a buffer of pointers to connect structures 
81
 */
82
connect_data *connectbuf[CONNECTBUFSIZE];
83

  
84
/*
85
 * define a pointer buffer with pointers to recv_data structures 
86
 */
87
recvdata *recvdatabuf[RECVDATABUFSIZE];
88

  
89
/*
90
 * define a pointer buffer for message multiplexing 
91
 */
92
receive_data_cb recvcbbuf[MSGMULTIPLEXSIZE];
93

  
94
/*
95
 * stun server address 
96
 */
97
struct sockaddr_in stun_server;
98

  
99
/*
100
 * receive timeout 
101
 */
102
struct timeval recv_timeout;
103

  
104
/*
105
 * boolean NAT traversal successful if true 
106
 */
107
boolean NAT_traversal;
108

  
109
/*
110
 * file descriptor for local socket 
111
 */
112
evutil_socket_t socketfd;
113

  
114
/*
115
 * local socketID 
116
 */
117
socket_ID local_socketID;
118

  
119
socketID_handle loc_socketID = &local_socketID;
120

  
121
/*
122
 * callback function pointers 
123
 */
124
/*
125
 * monitoring module callbacks 
126
 */
127
get_recv_pkt_inf_cb get_Recv_pkt_inf_cb = NULL;
128
get_send_pkt_inf_cb get_Send_pkt_inf_cb = NULL;
129
set_monitoring_header_pkt_cb set_Monitoring_header_pkt_cb = NULL;
130
get_recv_data_inf_cb get_Recv_data_inf_cb = NULL;
131
get_send_data_inf_cb get_Send_data_inf_cb = NULL;
132
set_monitoring_header_data_cb set_Monitoring_header_data_cb = NULL;
133
/*
134
 * connection callbacks 
135
 */
136
receive_connection_cb receive_Connection_cb = NULL;
137
connection_failed_cb failed_Connection_cb = NULL;
138
/*
139
 * local socketID callback 
140
 */
141
receive_localsocketID_cb receive_SocketID_cb;
142

  
143
/*
144
 * boolean that defines if received data is transmitted to the upper layer 
145
 * via callback or via upper layer polling 
146
 */
147
boolean recv_data_callback;
148

  
149
/*
150
 * helper function to get rid of a warning
151
 */
152
int min(int a, int b) {
153
	if (a > b) return b;
154
	return a;
155
}
156

  
157
/*
158
 * functions 
159
 */
160
/*
161
 * register callback functions 
162
 */
163

  
164
void register_get_recv_pkt_inf(get_recv_pkt_inf_cb recv_pkt_inf_cb)
165
{
166
	if (recv_pkt_inf_cb == NULL)
167
		printf("Register get_recv_pkt_inf_cb failed: NULL ptr  \n");
168
	else
169
		get_Recv_pkt_inf_cb = recv_pkt_inf_cb;
170
}
171

  
172
void register_get_send_pkt_inf(get_send_pkt_inf_cb send_pkt_inf_cb)
173
{
174
	if (send_pkt_inf_cb == NULL)
175
		printf("Register get_send_pkt_inf_cb: NULL ptr  \n");
176
	else
177
		get_Send_pkt_inf_cb = send_pkt_inf_cb;
178
}
179

  
180
void
181
register_set_monitoring_header_pkt_cb(set_monitoring_header_pkt_cb
182
				      monitoring_header_pkt_cb)
183
{
184
	if (monitoring_header_pkt_cb == NULL)
185
		printf("Register set_monitoring_header_pkt_cb: NULL ptr  \n");
186
	else
187
		set_Monitoring_header_pkt_cb = monitoring_header_pkt_cb;
188
}
189

  
190
void register_get_recv_data_inf(get_recv_data_inf_cb recv_data_inf_cb)
191
{
192
	if (recv_data_inf_cb == NULL)
193
		printf("Register get_recv_data_inf_cb: NULL ptr  \n");
194
	else
195
		get_Recv_data_inf_cb = recv_data_inf_cb;
196
}
197

  
198
void register_get_send_data_inf(get_send_data_inf_cb send_data_inf_cb)
199
{
200
	if (send_data_inf_cb == NULL)
201
		printf("Register get_send_data_inf_cb: NULL ptr  \n");
202
	else
203
		get_Send_data_inf_cb = send_data_inf_cb;
204
}
205

  
206
void
207
register_set_monitoring_header_data_cb(set_monitoring_header_data_cb
208
				       monitoring_header_data_cb)
209
{
210
	if (monitoring_header_data_cb == NULL)
211
		printf("Register set_monitoring_header_data_cb : NULL ptr  \n");
212
	else
213
	set_Monitoring_header_data_cb = monitoring_header_data_cb;
214
}
215

  
216
void register_recv_connection_cb(receive_connection_cb connection_cb)
217
{
218
	if (connection_cb == NULL)
219
		printf("Register receive_connection_cb: NULL ptr  \n");
220
	else
221
		receive_Connection_cb = connection_cb;
222
}
223

  
224
void
225
register_recv_localsocketID_cb(receive_localsocketID_cb local_socketID_cb)
226
{
227
	if (local_socketID_cb == NULL)
228
		printf("Register receive_localsocketID_cb: NULL ptr \n");
229
	else
230
	receive_SocketID_cb = local_socketID_cb;
231
}
232

  
233

  
234
void register_error_connection_cb(connection_failed_cb connection_fail)
235
{
236
	if (connection_fail == NULL)
237
		printf("Register connection_failed_cb: NULL ptr  \n");
238
	else
239
	failed_Connection_cb = connection_fail;
240
}
241

  
242

  
243
void register_recv_data_cb(receive_data_cb data_cb, unsigned char msgtype)
244
{
245

  
246
    if (msgtype > 126) {
247

  
248
	printf
249
	    ("transmissionHandler: Could not register recv_data callback. Msgtype is greater then 126 \n");
250

  
251
    }
252

  
253
    if (data_cb == NULL) {
254

  
255
	printf("Register receive data callback: NUll ptr \n ");
256

  
257
    } else {
258

  
259
	recvcbbuf[msgtype] = data_cb;
260

  
261
    }
262
}
263

  
264
void
265
init_transmissionHandler(boolean recv_data_cb,
266
			 struct timeval timeout_value, const int port,
267
			 const char *ipaddr, const int stun_port,
268
			 const char *stun_ipaddr,
269
			 receive_localsocketID_cb local_socketID_cb,
270
			 void *arg)
271
{
272
	base = (struct event_base *) arg;
273
	recv_data_callback = recv_data_cb;
274
	setRecvTimeout(timeout_value);
275
	if (stun_ipaddr) {
276
		 setStunServer(stun_port, stun_ipaddr);
277
	} else {
278
	
279
	}
280
	register_recv_localsocketID_cb(local_socketID_cb);
281
	create_socket(port, ipaddr);
282
}
283

  
284
/*
285
 * Sockets 
286
 */
287
/*
288
 * returns a handle to the socketID struct the ipaddr can be a null
289
 * pointer. Then all available ipaddr on the machine are choosen.
290
 */
291
void create_socket(const int port, const char *ipaddr)
292
{
293
	struct sockaddr_in udpaddr;
294
	udpaddr.sin_family = AF_INET;
295
	if (ipaddr == NULL) {
296
		/*
297
		* try to guess the local IP address
298
		*/
299
		const char *ipaddr_iface = autodetect_ipaddress();
300
		if (ipaddr_iface) {
301
			udpaddr.sin_addr.s_addr = inet_addr(ipaddr_iface);
302
		} else {
303
			udpaddr.sin_addr.s_addr = INADDR_ANY;
304
		}
305
	} else {
306
		udpaddr.sin_addr.s_addr = inet_addr(ipaddr);
307
	}
308
	udpaddr.sin_port = htons(port);
309

  
310
	socketaddrgen udpgen;
311
	udpgen.udpaddr = udpaddr;
312
	local_socketID.internal_addr = udpgen;
313
	
314
	socketfd = createSocket(port, ipaddr);
315
	
316
	struct event *ev;
317
	ev = event_new(base, socketfd, EV_READ | EV_PERSIST, recv_pkg, NULL);
318
	
319
	event_add(ev, NULL);
320
	
321
	if (isStunDefined()) {
322
		/*
323
		* send the NAT traversal STUN request 
324
		*/
325
		 send_stun_request(socketfd, &stun_server);
326

  
327
		/*
328
		* enter a NAT traversal timeout that takes care of retransmission 
329
		*/
330
		struct event *ev1;
331
		struct timeval timeout_value_NAT_traversal = { 2, 0 };
332
		ev1 = evtimer_new(base, nat_traversal_timeout, NULL);
333
		event_add(ev1, &timeout_value_NAT_traversal);
334

  
335
		NAT_traversal = false;
336
	} else {
337
		/*
338
		* Assume we have accessibility and copy internal address to external one
339
		*/
340
		local_socketID.external_addr = local_socketID.internal_addr;
341
		NAT_traversal = true; // @TODO: this is not really NAT traversal, but a flag that init is over
342
		// callback to the upper layer indicating that the socketID is now
343
		// ready to use
344
		(receive_SocketID_cb) (&local_socketID, 0);
345
	}
346
}
347

  
348
void close_socket(socketID_handle socketID)
349
{
350
	free(socketID);
351
}
352

  
353
/*
354
 * Connections 
355
 */
356
// the first socketID is the from socketID, the second one is the to
357
// socketID
358

  
359
int
360
open_connection(socketID_handle external_socketID, receive_connection_cb connection_cb, void *arg)
361
{
362
	int con_id;
363
	if (external_socketID == NULL) {
364
		error("ML: cannot open connection: one of the socketIDs is NULL");
365
		return -1;
366
	} 
367
	if (NAT_traversal == false) {
368
		error("ML: cannot open connection: NAT traversal for socketID still in progress");
369
		return -1;
370
	}
371
	if (connection_cb == NULL) {
372
		error("ML: cannot open connection: connection_cb is NULL");
373
		return -1;
374
	}
375

  
376
	// check if that connection already exist
377

  
378
	con_id = connection_exist(external_socketID, false);
379
	if (con_id >= 0) {
380
		// if so check if it is ready to use	
381
		if (connectbuf[con_id]->status == READY) {
382
				// if so use the callback immidiatley
383
				(connection_cb) (con_id, arg);
384

  
385
		// otherwise just write the connection cb and the arg pointer
386
		// into the connection struct
387
		} else {
388
			struct receive_connection_cb_list *temp;
389
			temp = malloc(sizeof(struct receive_connection_cb_list));
390
			temp->next = NULL;
391
			temp->connection_cb = connection_cb;
392
			temp->arg = arg;
393
			if(connectbuf[con_id]->connection_last != NULL) {
394
				connectbuf[con_id]->connection_last->next = temp;
395
				connectbuf[con_id]->connection_last = temp;
396
			} else
397
				connectbuf[con_id]->connection_last = connectbuf[con_id]->connection_head = temp;
398
		}
399
		return con_id;
400
	}
401
	// make entry in connection_establishment array
402
	for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
403
		if (connectbuf[con_id] == NULL) {
404
			connectbuf[con_id] = (connect_data *) malloc(sizeof(connect_data));
405
			memset(connectbuf[con_id],0,sizeof(connect_data));
406
			connectbuf[con_id]->starttime = time(NULL);
407
			memcpy(&connectbuf[con_id]->external_socketID, external_socketID, sizeof(socket_ID));
408
			connectbuf[con_id]->pmtusize = MAX;
409
			connectbuf[con_id]->status = INVITE;
410
			connectbuf[con_id]->seqnr = 0;
411
			connectbuf[con_id]->internal_connect = !compare_external_address_socketIDs(external_socketID, &local_socketID);
412
			/*
413
			* timeout values for the pmtu discovery 
414
			*/
415
			connectbuf[con_id]->timeout_value.tv_sec = 15;
416
			connectbuf[con_id]->timeout_value.tv_usec = 0;
417
			connectbuf[con_id]->connectionID = con_id;
418

  
419
			connectbuf[con_id]->connection_head = connectbuf[con_id]->connection_last = malloc(sizeof(struct receive_connection_cb_list));
420
			connectbuf[con_id]->connection_last->next = NULL;
421
			connectbuf[con_id]->connection_last->connection_cb = connection_cb;
422
			connectbuf[con_id]->connection_last->arg = arg;
423
			connectbuf[con_id]->external_connectionID = -1;
424
			break;
425
		}
426
	} //end of for
427

  
428
	if (con_id == CONNECTBUFSIZE) {
429
		error("ML: Could not open connection: connbuffer full");
430
		return -1;
431
	}
432

  
433
	// create and send a connection message
434
	send_conn_msg(con_id, MAX, INVITE);
435

  
436
	struct event *ev;
437
	ev = evtimer_new(base, pmtu_timeout_cb, (void *) (long)con_id);
438
	event_add(ev, &connectbuf[con_id]->timeout_value);
439

  
440
	return con_id;
441
}
442

  
443
//done
444
void close_connection(const int connectionID)
445
{
446
	// remove it from the connection array
447
	if(connectbuf[connectionID]) {
448
		if(connectbuf[connectionID]->ctrl_msg_buf)
449
			free(connectbuf[connectionID]->ctrl_msg_buf);
450
		free(connectbuf[connectionID]);
451
		connectbuf[connectionID] = NULL;
452
	}
453
}
454

  
455
void keep_connection_alive(const int connectionID)
456
{
457

  
458
    // to be done with the NAT traversal
459
    // send a message over the wire
460
    printf("\n");
461

  
462
}
463

  
464
int
465
send_all_data(const int connectionID, send_all_data_container * container,
466
	      int nr_entries, unsigned char msgtype, send_params * sParams)
467
{
468

  
469

  
470

  
471
    if (nr_entries < 1 || nr_entries > 5) {
472

  
473
	printf
474
	    ("send_all_data ERROR: nr_enties is not between 1 and 5 \n ");
475
	return 0;
476

  
477
    } else {
478

  
479
	if (nr_entries == 1) {
480

  
481
	    send_data(connectionID, container->buffer_1,
482
		      container->length_1, msgtype, sParams);
483

  
484
	    return 1;
485

  
486
	} else if (nr_entries == 2) {
487

  
488
	    int buflen = container->length_1 + container->length_2;
489
	    char buf[buflen];
490
	    memcpy(buf, container->buffer_1, container->length_1);
491
	    memcpy(&buf[container->length_1], container->buffer_2,
492
		   container->length_2);
493
	    send_data(connectionID, buf, buflen, msgtype, sParams);
494

  
495
	    return 1;
496

  
497
	} else if (nr_entries == 3) {
498

  
499
	    int buflen =
500
		container->length_1 + container->length_2 +
501
		container->length_3;
502
	    char buf[buflen];
503
	    memcpy(buf, container->buffer_1, container->length_1);
504
	    memcpy(&buf[container->length_1], container->buffer_2,
505
		   container->length_2);
506
	    memcpy(&buf[container->length_2], container->buffer_3,
507
		   container->length_3);
508
	    send_data(connectionID, buf, buflen, msgtype, sParams);
509

  
510

  
511
	    return 1;
512

  
513
	} else if (nr_entries == 4) {
514

  
515
	    int buflen =
516
		container->length_1 + container->length_2 +
517
		container->length_3 + container->length_4;
518
	    char buf[buflen];
519
	    memcpy(buf, container->buffer_1, container->length_1);
520
	    memcpy(&buf[container->length_1], container->buffer_2,
521
		   container->length_2);
522
	    memcpy(&buf[container->length_2], container->buffer_3,
523
		   container->length_3);
524
	    memcpy(&buf[container->length_3], container->buffer_4,
525
		   container->length_4);
526
	    send_data(connectionID, buf, buflen, msgtype, sParams);
527

  
528
	    return 1;
529

  
530
	} else {
531

  
532
	    int buflen =
533
		container->length_1 + container->length_2 +
534
		container->length_3 + container->length_4 +
535
		container->length_5;
536
	    char buf[buflen];
537
	    memcpy(buf, container->buffer_1, container->length_1);
538
	    memcpy(&buf[container->length_1], container->buffer_2,
539
		   container->length_2);
540
	    memcpy(&buf[container->length_2], container->buffer_3,
541
		   container->length_3);
542
	    memcpy(&buf[container->length_3], container->buffer_4,
543
		   container->length_4);
544
	    memcpy(&buf[container->length_4], container->buffer_5,
545
		   container->length_5);
546
	    send_data(connectionID, buf, buflen, msgtype, sParams);
547

  
548
	    return 1;
549
	}
550

  
551
    }
552

  
553

  
554
}
555

  
556
/*
557
 * send an entire block of data 
558
 */
559
void send_data(int con_id, char *sendbuf, int bufsize, int msgtype, send_params * sParams)
560
{
561
	if (sParams == NULL) {
562
		error("ML: send data failed: send_params is a NULL ptr");
563
		return;
564
	}
565

  
566
	if (con_id < 0) {
567
		error("ML: send data failed: connectionID does not exist");
568
		return;
569
	}
570

  
571
	if (connectbuf[con_id] == NULL) {
572
		error("ML: send data failed: connectionID does not exist");
573
		return;
574
	}
575
	if (connectbuf[con_id]->status != READY) {
576
	    error("ML: send data failed: connection is not active");
577
	    return;
578
	}
579

  
580
	send_msg(con_id, msgtype, sendbuf, bufsize, false, sParams);
581
}
582

  
583
/*
584
 * recv data with polling 
585
 */
586
int
587
recv_data(const int connectionID, char *recvbuf, int *bufsize,
588
	  recv_params * rParams)
589
{
590
	//TODO yet to be converted
591
	return 0;
592
#if 0
593
	if (rParams == NULL) {
594
		error("ML: recv_data failed: recv_params is a NULL ptr");
595
		return 0;
596
    } else {
597

  
598
	printf("transmissionhandler: recv data called \n");
599

  
600
	int i = 0;
601
	int returnValue = 0;
602
	double timeout = (double) recv_timeout.tv_sec;
603
	time_t endtime = time(NULL);
604

  
605
	for (i = 0; i < RECVDATABUFSIZE; i++) {
606

  
607
	    if (recvdatabuf[i] != NULL) {
608

  
609
		if (recvdatabuf[i]->connectionID == connectionID) {
610

  
611
		    printf("transmissionhandler: recv data has entry  \n");
612

  
613
		    double timepass = difftime(endtime, recvdatabuf[i]->starttime);
614

  
615
		    // check if the specified connection has data and it
616
		    // is complete
617
		    // check the data seqnr
618
		    // if(connectionID == recvdatabuf[i]->connectionID &&
619
		    // 1 == recvdatabuf[i]->status){
620

  
621
		    if (1 == recvdatabuf[i]->status) {
622

  
623
			// printf("transmissionHandler: recv_data set is
624
			// complete \n" );
625

  
626
			// printf("debud \n");
627

  
628
			// exchange the pointers
629
			int buffersize = 0;
630
			buffersize = recvdatabuf[i]->bufsize;
631
			*bufsize = buffersize;
632
			// recvbuf = recvdatabuf[i]->recvbuf;
633

  
634
			// printf("buffersize %d \n",buffersize);
635
			memcpy(recvbuf, recvdatabuf[i]->recvbuf,
636
			       buffersize);
637
			// printf(" recvbuf %s \n",recvbuf );
638

  
639
// 			double nrMissFrags =
640
// 			    (double) recvdatabuf[i]->nrFragments /
641
// 			    (double) recvdatabuf[i]->recvFragments;
642
// 			int nrMissingFragments = (int) ceil(nrMissFrags);
643

  
644
//			rParams->nrMissingFragments = nrMissingFragments;
645
// 			rParams->nrFragments = recvdatabuf[i]->nrFragments;
646
			rParams->msgtype = recvdatabuf[i]->msgtype;
647
			rParams->connectionID =
648
			    recvdatabuf[i]->connectionID;
649

  
650
			// break from the loop
651
			// printf(" recvbuf %s \n ",recvbuf);
652

  
653
			// double nrMissFrags =
654
			// (double)recvdatabuf[i]->nrFragments /
655
			// (double)recvdatabuf[i]->recvFragments;
656
			// int nrMissingFragments =
657
			// (int)ceil(nrMissFrags);
658

  
659
			if(get_Recv_data_inf_cb != NULL) {
660
				mon_data_inf recv_data_inf;
661

  
662
				recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
663
				recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
664
				recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
665
				recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
666
// 				recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
667
// 				recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
668
				gettimeofday(&recv_data_inf.arrival_time, NULL);
669
				recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
670
				recv_data_inf.nrMissingFragments = nrMissingFragments;
671
				recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
672
				recv_data_inf.priority = false;
673
				recv_data_inf.padding = false;
674
				recv_data_inf.confirmation = false;
675
				recv_data_inf.reliable = false;
676

  
677
				// send data recv callback to monitoring module
678

  
679
				(get_Recv_data_inf_cb) ((void *) &recv_data_inf);
680
			}
681

  
682

  
683
			// free the allocated memory
684
			free(recvdatabuf[i]);
685
			recvdatabuf[i] = NULL;
686

  
687
			returnValue = 1;
688
			break;
689

  
690
		    }
691

  
692
		    if (recvdatabuf[i] != NULL) {
693

  
694
			if (timepass > timeout) {
695

  
696
			    printf("transmissionHandler: recv_data timeout called  \n");
697

  
698
			    // some data about the missing chunks should
699
			    // be added here
700
			    // exchange the pointers 
701
			    int buffersize = 0;
702
			    buffersize = recvdatabuf[i]->bufsize;
703
			    *bufsize = buffersize;
704
			    // recvbuf = recvdatabuf[i]->recvbuf;
705

  
706
			    double nrMissFrags =
707
				(double) recvdatabuf[i]->nrFragments /
708
				(double) recvdatabuf[i]->recvFragments;
709
			    int nrMissingFragments =
710
				(int) ceil(nrMissFrags);
711

  
712
			    // printf(" recvbuf %s \n",recvbuf );
713

  
714
			    memcpy(recvbuf, recvdatabuf[i]->recvbuf,
715
				   buffersize);
716

  
717
			    rParams->nrMissingFragments =
718
				nrMissingFragments;
719
			    rParams->nrFragments =
720
				recvdatabuf[i]->nrFragments;
721
			    rParams->msgtype = recvdatabuf[i]->msgtype;
722
			    rParams->connectionID =
723
				recvdatabuf[i]->connectionID;
724

  
725
				if(get_Recv_data_inf_cb != NULL) {
726
					mon_data_inf recv_data_inf;
727

  
728
					recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
729
					recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
730
					recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
731
					recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
732
					recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
733
					recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
734
					gettimeofday(&recv_data_inf.arrival_time, NULL);
735
					recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
736
					recv_data_inf.nrMissingFragments = nrMissingFragments;
737
					recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
738
					recv_data_inf.priority = false;
739
					recv_data_inf.padding = false;
740
					recv_data_inf.confirmation = false;
741
					recv_data_inf.reliable = false;
742

  
743
					// send data recv callback to monitoring module
744

  
745
					(get_Recv_data_inf_cb) ((void *) &recv_data_inf);
746
				}
747

  
748
			    // free the allocated memory 
749
			    free(recvdatabuf[i]);
750
			    recvdatabuf[i] = NULL;
751

  
752
			    returnValue = 1;
753
			    break;
754

  
755
			}
756
		    }
757

  
758
		}
759

  
760
	    }
761
	    // printf("2 recvbuf %s \n ",recvbuf);
762
	}
763
	return returnValue;
764
    }
765
#endif
766
}
767

  
768
void setStunServer(const int port, const char *ipaddr)
769
{
770
	stun_server.sin_family = AF_INET;
771
	if (ipaddr == NULL)
772
		stun_server.sin_addr.s_addr = htonl(INADDR_ANY);
773
	else
774
		stun_server.sin_addr.s_addr = resolve(ipaddr);
775
	stun_server.sin_port = htons(port);
776
}
777

  
778
void unsetStunServer()
779
{
780
	stun_server.sin_addr.s_addr = 0;
781
}
782

  
783
bool isStunDefined()
784
{
785
	return stun_server.sin_addr.s_addr;
786
}
787

  
788
void setRecvTimeout(struct timeval timeout_value) {
789
	recv_timeout = timeout_value;
790
}
791
 
792
void send_msg(int con_id, int msg_type, char* msg, int msg_len, bool truncable, send_params * sParams) {
793
	socketaddrgen udpgen;
794
	bool retry;
795
	int pkt_len, offset;
796
	struct iovec iov[4];
797

  
798
	char h_pkt[MON_HEADER_SPACE];
799
	char h_data[MON_HEADER_SPACE];
800

  
801
	struct msg_header msg_h;
802

  
803
	iov[0].iov_base = &msg_h;
804
	iov[0].iov_len = MSG_HEADER_SIZE;
805

  
806
	msg_h.local_con_id = con_id;
807
	msg_h.remote_con_id = connectbuf[con_id]->external_connectionID;
808
	msg_h.msg_type = msg_type;
809
	msg_h.msg_seq_num = connectbuf[con_id]->seqnr++;
810

  
811

  
812
	iov[1].iov_len = iov[2].iov_len = 0;
813
	iov[1].iov_base = h_pkt;
814
	iov[2].iov_base = h_data;
815

  
816

  
817
	if (connectbuf[con_id]->internal_connect)
818
		udpgen = connectbuf[con_id]->external_socketID.internal_addr;
819
	else
820
		udpgen = connectbuf[con_id]->external_socketID.external_addr;
821

  
822
	do{
823
		offset = 0;
824
		retry = false;
825
		// Monitoring layer hook
826
		if(set_Monitoring_header_data_cb != NULL) {
827
			iov[2].iov_len = ((set_Monitoring_header_data_cb) (&(connectbuf[con_id]->external_socketID), msg_type));
828
		}
829
		msg_h.len_mon_data_hdr = iov[2].iov_len;
830

  
831
		if(get_Send_data_inf_cb != NULL && iov[2].iov_len != 0) {
832
			mon_data_inf sd_data_inf;
833
			
834
			sd_data_inf.remote_socketID = &(connectbuf[con_id]->external_socketID);
835
			sd_data_inf.buffer = msg;
836
			sd_data_inf.bufSize = msg_len;
837
			sd_data_inf.msgtype = msg_type;
838
			sd_data_inf.monitoringDataHeader = iov[2].iov_base;
839
			sd_data_inf.monitoringDataHeaderLen = iov[2].iov_len;
840
			sd_data_inf.priority = sParams->priority;
841
			sd_data_inf.padding = sParams->padding;
842
			sd_data_inf.confirmation = sParams->confirmation;
843
			sd_data_inf.reliable = sParams->reliable;
844
			memset(&sd_data_inf.arrival_time, 0, sizeof(struct timeval));
845
	
846
			(get_Send_data_inf_cb) ((void *) &sd_data_inf);
847
		}
848

  
849
		do {
850
			if(set_Monitoring_header_pkt_cb != NULL) {
851
				iov[1].iov_len = (set_Monitoring_header_pkt_cb) (&(connectbuf[con_id]->external_socketID), msg_type);
852
			}
853
			pkt_len = min(connectbuf[con_id]->pmtusize - iov[2].iov_len - iov[1].iov_len - iov[0].iov_len, msg_len - offset) ;
854

  
855
			iov[3].iov_len = pkt_len;
856
			iov[3].iov_base = msg + offset;
857

  
858
			//fill header
859
			msg_h.len_mon_packet_hdr = iov[1].iov_len;
860
			msg_h.offset = offset;
861
			msg_h.msg_length = truncable ? pkt_len : msg_len;
862
			
863
			//monitoring layer hook
864
			if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
865
				mon_pkt_inf pkt_info;
866

  
867
				pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
868
				pkt_info.buffer = msg + offset;
869
				pkt_info.bufSize = pkt_len;
870
				pkt_info.msgtype = msg_type;
871
				pkt_info.dataID = connectbuf[con_id]->seqnr;
872
				pkt_info.offset = offset;
873
				pkt_info.datasize = msg_len;
874
				pkt_info.monitoringHeaderLen = iov[1].iov_len;
875
				pkt_info.monitoringHeader = iov[1].iov_base;
876
				pkt_info.ttl = -1;
877
				memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
878
	
879
				(get_Send_pkt_inf_cb) ((void *) &pkt_info);
880
			}
881

  
882

  
883
			switch(sendPacket(socketfd, iov, 4, &udpgen.udpaddr)) {
884
				case MSGLEN:
885
					connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
886
					connectbuf[con_id]->delay = true;
887
					retry = true;
888
					offset = msg_len; // exit the while
889
					break;
890
				case FAILURE:
891
					offset = msg_len; // exit the while
892
					break;
893
				case OK:
894
					//update
895
					offset += pkt_len + iov[2].iov_len;
896
					//transmit data header only in the first packet
897
					iov[2].iov_len = 0;
898
					break;
899
			}
900
		} while(offset != msg_len + msg_h.len_mon_data_hdr && !truncable);
901
	} while(retry);
902
}
903

  
904
void send_conn_msg(int con_id, int buf_size, int command_type)
905
{
906
	send_params sparams;
907
	if(connectbuf[con_id]->ctrl_msg_buf == NULL)
908
		connectbuf[con_id]->ctrl_msg_buf = malloc(MAX);
909

  
910
	if(connectbuf[con_id]->ctrl_msg_buf == NULL) {
911
		error("ML: can not allocate memory for connection message");
912
		return;
913
	}
914

  
915
	struct conn_msg *msg_header = (struct conn_msg*) connectbuf[con_id]->ctrl_msg_buf;
916

  
917
	msg_header->comand_type = command_type;
918
	msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
919

  
920
	memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
921

  
922
	send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &sparams);
923
}
924

  
925
void
926
recv_conn_msg(struct msg_header *msg_h, char *msgbuf, int bufsize)
927
{
928
	struct conn_msg *con_msg;
929
	int free_con_id, con_id;
930

  
931
	time_t now = time(NULL);
932
	double timediff = 0.0;
933

  
934
	// Monitoring layer hook
935
	if(get_Recv_data_inf_cb != NULL && msg_h->len_mon_data_hdr != 0) {
936
		// update pointer to the real data
937
		msgbuf += msg_h->len_mon_data_hdr;
938
		bufsize -= msg_h->len_mon_data_hdr;
939
		con_msg = (struct conn_msg *)msgbuf;
940

  
941
		mon_data_inf recv_data_inf;
942
		recv_data_inf.remote_socketID = &(con_msg->sock_id);
943
		recv_data_inf.buffer = msgbuf;
944
		recv_data_inf.bufSize = bufsize;
945
		recv_data_inf.msgtype = msg_h->msg_type;
946
		recv_data_inf.monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
947
		recv_data_inf.monitoringDataHeader = msg_h->len_mon_data_hdr ? msgbuf : NULL;
948
		gettimeofday(&recv_data_inf.arrival_time, NULL);
949
		recv_data_inf.firstPacketArrived = true;
950
		recv_data_inf.recvFragments = 1;
951
		recv_data_inf.priority = false;
952
		recv_data_inf.padding = false;
953
		recv_data_inf.confirmation = false;
954
		recv_data_inf.reliable = false;
955

  
956
		// send data recv callback to monitoring module
957
		(get_Recv_data_inf_cb) ((void *) &recv_data_inf);
958
	} else
959
		con_msg = (struct conn_msg *) msgbuf;
960

  
961
	// check the connection command type
962
	switch (con_msg->comand_type) {
963
		/*
964
		* if INVITE: enter a new socket make new entry in connect array
965
		* send an ok 
966
		*/
967
		case INVITE:
968
			debug("ML: received INVITE");
969
			/*
970
			* check if another connection for the external connectionID exist 
971
			* that was established within the last 2 seconds 
972
			*/
973
			free_con_id = -1;
974
			for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
975
				if (connectbuf[con_id] != NULL) {
976
					if (compare_socketIDs(&(connectbuf[con_id]->external_socketID), &(con_msg->sock_id)) == 0) {
977
						timediff = difftime(now, connectbuf[con_id]->starttime);
978
						if (timediff < 2)
979
							break;
980
					}
981
				} else if(free_con_id == -1)
982
					free_con_id = con_id;
983
			}
984

  
985
			if (con_id == CONNECTBUFSIZE) {
986
				// create an entry in the connecttrybuf
987
				if(free_con_id == -1) {
988
					error("ML: no new connect_buf available");
989
					return;
990
				}
991
				connectbuf[free_con_id] = (connect_data *) malloc(sizeof(connect_data));
992
				memset(connectbuf[free_con_id],0,sizeof(connect_data));
993
				connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
994
				connectbuf[free_con_id]->starttime = time(NULL);
995
				memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
996
				connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;
997
				connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
998
				connectbuf[free_con_id]->internal_connect =  
999
					!compare_external_address_socketIDs(&(con_msg->sock_id), loc_socketID);
1000
				con_id = free_con_id;
1001
			}
1002

  
1003
			if(connectbuf[con_id]->status <= CONNECT) {
1004
				//update status and send back answer
1005
				connectbuf[con_id]->status = CONNECT;
1006
				send_conn_msg(con_id, con_msg->pmtu_size, CONNECT);
1007
			}
1008
			break;
1009
		case CONNECT:
1010
			debug("ML: received CONNECT");
1011

  
1012
			if(msg_h->remote_con_id != -1 && connectbuf[msg_h->remote_con_id] == NULL) {
1013
				error("ML: received CONNECT for unexistent connection");
1014
				return;
1015
			}
1016

  
1017
			/*
1018
			* check if the connection status is not already 1 or 2 
1019
			*/
1020
			if (connectbuf[msg_h->remote_con_id]->status == INVITE) {
1021
				// set the external connectionID
1022
				connectbuf[msg_h->remote_con_id]->external_connectionID = msg_h->local_con_id;
1023
				// change status con_msg the connection_data
1024
				connectbuf[msg_h->remote_con_id]->status = READY;
1025
				// change pmtusize in the connection_data
1026
				connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
1027

  
1028
				// send the READY
1029
				send_conn_msg(msg_h->remote_con_id, con_msg->pmtu_size, READY);
1030

  
1031
				if (receive_Connection_cb != NULL)
1032
					(receive_Connection_cb) (msg_h->remote_con_id, NULL);
1033

  
1034
				// call all registered callbacks
1035
				while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
1036
					struct receive_connection_cb_list *temp;
1037
					temp = connectbuf[msg_h->remote_con_id]->connection_head;
1038
					(temp->connection_cb) (msg_h->remote_con_id, temp->arg);
1039
					connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
1040
					free(temp);
1041
				}
1042
				connectbuf[msg_h->remote_con_id]->connection_head =
1043
					connectbuf[msg_h->remote_con_id]->connection_last = NULL;
1044
			} else
1045
				// send the READY
1046
				send_conn_msg(msg_h->remote_con_id, con_msg->pmtu_size, READY);
1047

  
1048
			debug("ML: active connection established");
1049
			break;
1050

  
1051
			/*
1052
			* if READY: find the entry in the connection array set the
1053
			* connection active change the pmtu size 
1054
			*/
1055
		case READY:
1056
			debug("ML: received READY");
1057
			if(connectbuf[msg_h->remote_con_id] == NULL) {
1058
				error("ML: received READY for unexistent connection");
1059
				return;
1060
			}
1061
			/*
1062
			* checks if the connection is not already established 
1063
			*/
1064
			if (connectbuf[msg_h->remote_con_id]->status == CONNECT) {
1065
				// change status of the connection
1066
				connectbuf[msg_h->remote_con_id]->status = 2;
1067
				// change pmtusize
1068
				connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
1069

  
1070
				if (receive_Connection_cb != NULL)
1071
					(receive_Connection_cb) (msg_h->remote_con_id, NULL);
1072
			
1073
				while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
1074
					struct receive_connection_cb_list *temp;
1075
					temp = connectbuf[msg_h->remote_con_id]->connection_head;
1076
					(temp->connection_cb) (msg_h->remote_con_id, temp->arg);
1077
					connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
1078
					free(temp);
1079
				}
1080
				connectbuf[msg_h->remote_con_id]->connection_head =
1081
					connectbuf[msg_h->remote_con_id]->connection_last = NULL;
1082
				debug("ML: passiv connection established");
1083
			}
1084
			break;
1085
	}
1086
}
1087

  
1088
/*
1089
 * what to do once a packet arrived if it is a conn packet send it to
1090
 * recv_conn handler if it is a data packet send it to the recv_data
1091
 * handler 
1092
 */
1093

  
1094
//done --
1095
void recv_pkg(int fd, short event, void *arg)
1096
{
1097
	debug("ML: recv_pkg called");
1098

  
1099
	struct msg_header *msg_h;
1100
	char msgbuf[MAX];
1101
	char *bufptr = msgbuf;
1102
	int ttl;
1103
	struct sockaddr_in recv_addr;
1104
	pmtu recvSize = MAX;
1105
	int msg_size;
1106

  
1107
	recvPacket(fd, msgbuf, &recvSize, &recv_addr, pmtu_error_cb_th, &ttl);
1108

  
1109

  
1110
	// check if it is not just an ERROR message
1111
	if(recvSize < 0)
1112
		return;
1113

  
1114
	// @TODO check if this simplistic STUN message recognition really always works, probably not
1115
	unsigned short stun_bind_response = 0x0101;
1116
	unsigned short * msgspot = (unsigned short *) msgbuf;
1117
	if (*msgspot == stun_bind_response) {
1118
		debug("ML: recv_pkg: parse stun message called");
1119
		recv_stun_msg(msgbuf, recvSize);
1120
		return;
1121
	}
1122

  
1123
	msg_h = (struct msg_header *) msgbuf;
1124
	bufptr += MSG_HEADER_SIZE + msg_h->len_mon_packet_hdr;
1125
	msg_size = recvSize - MSG_HEADER_SIZE - msg_h->len_mon_packet_hdr;
1126

  
1127

  
1128
	if(get_Recv_pkt_inf_cb != NULL && msg_h->len_mon_packet_hdr != 0) {
1129
		mon_pkt_inf msginfNow;
1130
		msginfNow.monitoringHeaderLen = msg_h->len_mon_packet_hdr;
1131
		msginfNow.monitoringHeader = msg_h->len_mon_packet_hdr ? &msgbuf[0] + MSG_HEADER_SIZE : NULL;
1132
		//TODO rethink this ...
1133
		if(msg_h->msg_type == ML_CON_MSG) {
1134
			struct conn_msg *c_msg = (struct conn_msg *) bufptr;
1135
			msginfNow.remote_socketID = &(c_msg->sock_id);
1136
		}
1137
		else if(connectbuf[msg_h->remote_con_id] == NULL) {
1138
			error("ML: received pkg called with non existent connection");
1139
			return;
1140
		} else
1141
			msginfNow.remote_socketID = &(connectbuf[msg_h->remote_con_id]->external_socketID);
1142
		msginfNow.buffer = bufptr;
1143
		msginfNow.bufSize = recvSize;
1144
		msginfNow.msgtype = msg_h->msg_type;
1145
		msginfNow.ttl = ttl;
1146
		msginfNow.dataID = msg_h->msg_seq_num;
1147
		msginfNow.offset = msg_h->offset;
1148
		msginfNow.datasize = msg_h->msg_length;
1149
		gettimeofday(&msginfNow.arrival_time, NULL);
1150
		(get_Recv_pkt_inf_cb) ((void *) &msginfNow);
1151
	}
1152

  
1153

  
1154
	switch(msg_h->msg_type) {
1155
		case ML_CON_MSG:
1156
			debug("ML: received conn pkg");
1157
			recv_conn_msg(msg_h, bufptr, msg_size);
1158
			break;
1159
		default:
1160
			if(msg_h->msg_type < 127) {
1161
				debug("ML: received data pkg");
1162
				recv_data_msg(msg_h, bufptr, msg_size);
1163
				break;
1164
			}
1165
			debug("ML: unrecognised msg_type");
1166
			break;
1167
	}
1168
}
1169

  
1170
void recv_stun_msg(char *msgbuf, int recvSize)
1171
{
1172
	/*
1173
	* create empty stun message struct 
1174
	*/
1175
	StunMessage resp;
1176
	memset(&resp, 0, sizeof(StunMessage));
1177
	/*
1178
	* parse the message 
1179
	*/
1180
	int returnValue = 0;
1181
	returnValue = recv_stun_message(msgbuf, recvSize, &resp);
1182

  
1183
	if (returnValue == 0) {
1184
		/*
1185
		* read the reflexive Address into the local_socketID 
1186
		*/
1187
		struct sockaddr_in reflexiveAddr;
1188
		reflexiveAddr.sin_family = AF_INET;
1189
		reflexiveAddr.sin_addr.s_addr = htonl(resp.mappedAddress.ipv4.addr);
1190
		reflexiveAddr.sin_port = htons(resp.mappedAddress.ipv4.port);
1191
		socketaddrgen reflexiveAddres;
1192
		reflexiveAddres.udpaddr = reflexiveAddr;
1193
		local_socketID.external_addr = reflexiveAddres;
1194
		NAT_traversal = true;
1195
		// callback to the upper layer indicating that the socketID is now
1196
		// ready to use
1197
		(receive_SocketID_cb) (&local_socketID, 0);
1198
	}
1199
}
1200

  
1201
// process a single recv data message
1202
void recv_data_msg(struct msg_header *msg_h, char *msgbuf, int bufsize)
1203
{
1204
	debug("transmissionHandler: received data message called");
1205

  
1206
	int recv_id, free_recv_id = -1;
1207

  
1208
	if(connectbuf[msg_h->remote_con_id] == NULL) {
1209
		debug("ML: Received a message not related to any opened connection!");
1210
		return;
1211
	}
1212

  
1213
	// check if a recv_data exist and enter data
1214
	for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++)
1215
		if (recvdatabuf[recv_id] != NULL) {
1216
			if (msg_h->remote_con_id == recvdatabuf[recv_id]->connectionID && 
1217
					msg_h->msg_seq_num == recvdatabuf[recv_id]->seqnr)
1218
						break;
1219
		} else
1220
			if(free_recv_id == -1)
1221
				free_recv_id = recv_id;
1222

  
1223

  
1224
	if(recv_id == RECVDATABUFSIZE) { 
1225
		//no recv_data found: create one
1226
		recv_id = free_recv_id;
1227
		recvdatabuf[recv_id] = (recvdata *) malloc(sizeof(recvdata));
1228
		memset(recvdatabuf[recv_id], 0, sizeof(recvdata));
1229
		recvdatabuf[recv_id]->connectionID = msg_h->remote_con_id;
1230
		recvdatabuf[recv_id]->seqnr = msg_h->msg_seq_num;
1231
		recvdatabuf[recv_id]->monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
1232
		recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
1233
		recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
1234
		/*
1235
		* read the timeout data and set it 
1236
		*/
1237
		recvdatabuf[recv_id]->timeout_value.tv_sec = recv_timeout.tv_sec;
1238
		recvdatabuf[recv_id]->timeout_value.tv_usec = recv_timeout.tv_usec;
1239
		recvdatabuf[recv_id]->recvID = recv_id;
1240
		recvdatabuf[recv_id]->starttime = time(NULL);
1241
		recvdatabuf[recv_id]->msgtype = msg_h->msg_type;
1242

  
1243
		// fill the buffer with zeros
1244
		memset(recvdatabuf[recv_id]->recvbuf, 0, msg_h->msg_length);
1245
	}
1246

  
1247
	if (msg_h->offset == 0)
1248
		recvdatabuf[recv_id]->firstPacketArrived = 1;
1249

  
1250

  
1251
	// increment fragmentnr
1252
	recvdatabuf[recv_id]->recvFragments++;
1253
	// increment the arrivedBytes
1254
	recvdatabuf[recv_id]->arrivedBytes += bufsize;
1255

  
1256
	// enter the data into the buffer
1257
	memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->offset, msgbuf, bufsize);
1258
	
1259
	//TODO very basic checkif all fragments arrived: has to be reviewed
1260
	if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize)
1261
		recvdatabuf[recv_id]->status = COMPLETE; //buffer full -> msg completly arrived
1262
	else
1263
		recvdatabuf[recv_id]->status = ACTIVE;
1264

  
1265
	if (recv_data_callback) {
1266
		if(recvdatabuf[recv_id]->status == COMPLETE) {
1267
			// Monitoring layer hook
1268
			if(get_Recv_data_inf_cb != NULL) {
1269
				mon_data_inf recv_data_inf;
1270
	
1271
				recv_data_inf.remote_socketID =
1272
					 &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1273
				recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
1274
				recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
1275
				recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
1276
				recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
1277
				recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
1278
					recvdatabuf[recv_id]->recvbuf : NULL;
1279
				gettimeofday(&recv_data_inf.arrival_time, NULL);
1280
				recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1281
				recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
1282
				recv_data_inf.priority = false;
1283
				recv_data_inf.padding = false;
1284
				recv_data_inf.confirmation = false;
1285
				recv_data_inf.reliable = false;
1286

  
1287
				// send data recv callback to monitoring module
1288

  
1289
				(get_Recv_data_inf_cb) ((void *) &recv_data_inf);
1290
			}
1291

  
1292
			// Get the right callback
1293
			receive_data_cb receive_data_callback = recvcbbuf[msg_h->msg_type];
1294
			if (receive_data_callback) {
1295
	
1296
				recv_params rParams;
1297
	
1298
				rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->arrivedBytes;
1299
				rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
1300
				rParams.msgtype = recvdatabuf[recv_id]->msgtype;
1301
				rParams.connectionID = recvdatabuf[recv_id]->connectionID;
1302
				rParams.remote_socketID = 
1303
					&(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1304
				rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1305
	
1306
				(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
1307
					recvdatabuf[recv_id]->msgtype, (void *) &rParams);
1308
			} else {
1309
			    fprintf(stderr,"ML: Warning: callback not initialized for this message type: %d!\n",msg_h->msg_type);
1310
			}
1311
			
1312
			//clean up
1313
			free(recvdatabuf[recv_id]->recvbuf);
1314
			free(recvdatabuf[recv_id]);
1315
			recvdatabuf[recv_id] = NULL;
1316
		} else { // not COMPLETE
1317
			//start time out
1318
			//TODO make timeout at least a DEFINE
1319
			struct timeval timeout = { 4, 0 };
1320
			event_base_once(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id, &timeout);
1321
		}
1322
	}
1323
}
1324

  
1325
//done
1326
void recv_timeout_cb(int fd, short event, void *arg)
1327
{
1328
	int recv_id = (long) arg;
1329
	debug("ML: recv_timeout_cb called");
1330

  
1331
	if (recvdatabuf[recv_id] == NULL) {
1332
		return;
1333
	}
1334

  
1335

  
1336
	if(recvdatabuf[recv_id]->status == ACTIVE) {
1337
		//TODO make timeout at least a DEFINE
1338
		struct timeval timeout = { 4, 0 };
1339
		recvdatabuf[recv_id]->status = INACTIVE;
1340
		event_base_once(base, -1, EV_TIMEOUT, recv_timeout_cb,
1341
			arg, &timeout);
1342
		return;
1343
	}
1344

  
1345
	if(recvdatabuf[recv_id]->status == INACTIVE) {
1346
		// Monitoring layer hook
1347
		if(get_Recv_data_inf_cb != NULL) {
1348
			mon_data_inf recv_data_inf;
1349

  
1350
			recv_data_inf.remote_socketID =
1351
					&(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1352
			recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
1353
			recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
1354
			recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
1355
			recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
1356
			recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
1357
				recvdatabuf[recv_id]->recvbuf : NULL;
1358
			gettimeofday(&recv_data_inf.arrival_time, NULL);
1359
			recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1360
			recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
1361
			recv_data_inf.priority = false;
1362
			recv_data_inf.padding = false;
1363
			recv_data_inf.confirmation = false;
1364
			recv_data_inf.reliable = false;
1365

  
1366
			// send data recv callback to monitoring module
1367

  
1368
			(get_Recv_data_inf_cb) ((void *) &recv_data_inf);
1369
		}
1370

  
1371
		// Get the right callback
1372
		receive_data_cb receive_data_callback = recvcbbuf[recvdatabuf[recv_id]->msgtype];
1373

  
1374
		recv_params rParams;
1375

  
1376
		rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->arrivedBytes;
1377
		rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
1378
		rParams.msgtype = recvdatabuf[recv_id]->msgtype;
1379
		rParams.connectionID = recvdatabuf[recv_id]->connectionID;
1380
		rParams.remote_socketID = 
1381
			&(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1382
		rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1383

  
1384
		(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
1385
			recvdatabuf[recv_id]->msgtype, &rParams);
1386

  
1387
		//clean up
1388
		free(recvdatabuf[recv_id]->recvbuf);
1389
		free(recvdatabuf[recv_id]);
1390
		recvdatabuf[recv_id] = NULL;
1391
	}
1392
}
1393

  
1394
//done
1395
void pmtu_timeout_cb(int fd, short event, void *arg)
1396
{
1397
	debug("ML: pmtu timeout called");
1398

  
1399
	int con_id = (long) arg;
1400
	pmtu new_pmtusize;
1401
	struct timeval timeout;
1402
	
1403
	if(connectbuf[con_id] == NULL) {
1404
		error("ML: pmtu timeout called on non existing con_id");
1405
		return;
1406
	}
1407

  
1408
	if(connectbuf[con_id]->status == READY) {
1409
		// nothing to do anymore
1410
		return;
1411
	}
1412

  
1413
	timeout = connectbuf[con_id]->timeout_value;
1414

  
1415
	if(connectbuf[con_id]->delay || connectbuf[con_id]->trials == MAX_TRIALS - 1) {
1416
		double delay = timeout.tv_sec + timeout.tv_usec / 1000000.0;
1417
		delay = delay * 5;
1418
		timeout.tv_sec = floor(delay);
1419
		timeout.tv_usec = fmod(delay, 1.0) * 1000000.0;
1420
		if(connectbuf[con_id]->delay) {
1421
			connectbuf[con_id]->delay = false;
1422
			goto reschedule;
1423
		}
1424
	}
1425

  
1426
	if(connectbuf[con_id]->trials == MAX_TRIALS) {
1427
		// decrement the pmtu size
1428
		connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
1429
		connectbuf[con_id]->trials = 0;
1430
	}
1431

  
1432
	//error in PMTU discovery?
1433
	if (connectbuf[con_id]->pmtusize == ERROR) {
1434
		if (connectbuf[con_id]->internal_connect == true) {
1435
			//as of now we tried directly connecting, now let's try trough the NAT
1436
			connectbuf[con_id]->internal_connect = false;
1437
			connectbuf[con_id]->pmtusize = MAX;
1438
		} else {
1439
			//nothing to do we have to give up
1440
			error("ML: Could not create connection with connectionID %i!",con_id);
1441
			// envoke the callback for failed connection establishment
1442
			if(failed_Connection_cb != NULL)
1443
				(failed_Connection_cb) (con_id, NULL);
1444
			// delete the connection entry
1445
			close_connection(con_id);
1446
			return;
1447
		}
1448
	}
1449

  
1450
	//retry with new pmtu size
1451
	connectbuf[con_id]->trials++;
1452
	send_conn_msg(con_id, connectbuf[con_id]->pmtusize, connectbuf[con_id]->status);
1453

  
1454
reschedule:
1455
	/* reschedule */
1456
	event_base_once(base, -1, EV_TIMEOUT, pmtu_timeout_cb, (void *) (long)con_id, &timeout);
1457
}
1458

  
1459
/*
1460
 * decrements the mtu size
1461
 */
1462
pmtu pmtu_decrement(pmtu pmtusize)
1463
{
1464
	pmtu pmtu_return_size;
1465
	switch(pmtusize) {
1466
	case MAX:
1467
		return DSL;
1468
	case DSL:
1469
		return DSLMEDIUM;
1470
	case DSLMEDIUM:
1471
		return DSLSLIM;
1472
	case DSLSLIM:
1473
		return BELOWDSL;
1474
	case BELOWDSL:
1475
		return MIN;
1476
	default:
1477
		return ERROR;
1478
	}
1479
}
1480

  
1481
void pmtu_error_cb_th(char *msg, int msglen)
1482
{
1483
	debug("ML: pmtu_error callback called msg_size: %d",msglen);
1484
	//TODO debug
1485
	return;
1486

  
1487
    char *msgbufptr = NULL;
1488
    int msgtype;
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff