Statistics
| Branch: | Revision:

ml / ml.c @ 66849ce7

History | View | Annotate | Download (72.4 KB)

1
/*
2
 *          Policy Management
3
 *
4
 *          NEC Europe Ltd. PROPRIETARY INFORMATION
5
 *
6
 * This software is supplied under the terms of a license agreement
7
 * or nondisclosure agreement with NEC Europe Ltd. and may not be
8
 * copied or disclosed except in accordance with the terms of that
9
 * agreement.
10
 *
11
 *      Copyright (c) 2009 NEC Europe Ltd. All Rights Reserved.
12
 *
13
 * Authors: Kristian Beckers  <beckers@nw.neclab.eu>
14
 *          Sebastian Kiesel  <kiesel@nw.neclab.eu>
15
 *          
16
 *
17
 * NEC Europe Ltd. DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR IMPLIED,
18
 * INCLUDING BUT NOT LIMITED TO IMPLIED WARRANTIES OF MERCHANTABILITY
19
 * AND FITNESS FOR A PARTICULAR PURPOSE AND THE WARRANTY AGAINST LATENT
20
 * DEFECTS, WITH RESPECT TO THE PROGRAM AND THE ACCOMPANYING
21
 * DOCUMENTATION.
22
 *
23
 * No Liability For Consequential Damages IN NO EVENT SHALL NEC Europe
24
 * Ltd., NEC Corporation OR ANY OF ITS SUBSIDIARIES BE LIABLE FOR ANY
25
 * DAMAGES WHATSOEVER (INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS
26
 * OF BUSINESS PROFITS, BUSINESS INTERRUPTION, LOSS OF INFORMATION, OR
27
 * OTHER PECUNIARY LOSS AND INDIRECT, CONSEQUENTIAL, INCIDENTAL,
28
 * ECONOMIC OR PUNITIVE DAMAGES) ARISING OUT OF THE USE OF OR INABILITY
29
 * TO USE THIS PROGRAM, EVEN IF NEC Europe Ltd. HAS BEEN ADVISED OF THE
30
 * POSSIBILITY OF SUCH DAMAGES.
31
 *
32
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
33
 */
34

    
35
#include <stdlib.h>
36
#include <unistd.h>
37
#include <stdio.h>
38
#include <stddef.h>
39
#include <stdint.h>
40
#include <string.h>
41
#include <sys/types.h>
42
#include <time.h>
43
#include <math.h>
44
#include <assert.h>
45
#include <errno.h>
46

    
47
#ifndef WIN32
48
#include <arpa/inet.h>
49
#include <netdb.h>
50
#include <netinet/in.h>
51
#include <sys/socket.h>
52
#include <fcntl.h>
53
#else
54

    
55
#include <winsock2.h>
56
#include <ws2tcpip.h>
57
#endif
58

    
59
#include "util/udpSocket.h"
60
#include "util/stun.h"
61
#include "transmissionHandler.h"
62
#include "util/rateLimiter.h"
63
#include "util/queueManagement.h"
64

    
65
#define LOG_MODULE "[ml] "
66
#include "ml_log.h"
67

    
68
/**************************** START OF INTERNALS ***********************/
69

    
70

    
71
/*
72
 * reserved message type for internal puposes
73
 */
74
#define MSG_TYPE_ML_KEEPALIVE 0x126        //TODO: check that it is really interpreted as internal
75

    
76
/*
77
 * a pointer to a libevent instance
78
 */
79
struct event_base *base;
80

    
81
/*
82
 * define the nr of connections the messaging layer can handle
83
 */
84
#define CONNECTBUFSIZE 10000
85
/*
86
 * define the nr of data that can be received parallel
87
 */
88
#define RECVDATABUFSIZE 10000
89
/*
90
 * define an array for message multiplexing
91
 */
92
#define MSGMULTIPLEXSIZE 127
93

    
94

    
95
/*
96
 * timeout before thinking that the STUN server can't be connected
97
 */
98
#define NAT_TRAVERSAL_TIMEOUT { 1, 0 }
99

    
100
/*
101
 * timeout before thinking of an mtu problem (check MAX_TRIALS as well)
102
 */
103
#define PMTU_TIMEOUT { 0, 500000 }
104

    
105
/*
106
 * retry sending connection messages this many times before reducing pmtu
107
 */
108
#define MAX_TRIALS 3
109

    
110
/*
111
 * default timeout value between the first and the last received packet of a message
112
 */
113
#define RECV_TIMEOUT_DEFAULT { 2, 0 }
114

    
115
#ifdef RTX
116
/*
117
 * default timeout value for a packet reception
118
 */
119
#define PKT_RECV_TIMEOUT_DEFAULT { 0, 50000 } // 50 ms
120

    
121
/*
122
 * default timeout value for a packet reception
123
 */
124
#define LAST_PKT_RECV_TIMEOUT_DEFAULT { 1, 700000 }
125

    
126
/*
127
 * default fraction of RECV_TIMEOUT_DEFAULT for a last packet(s) reception timeout
128
 */
129
#define LAST_PKT_RECV_TIMEOUT_FRACTION 0.7
130

    
131
#endif
132

    
133

    
134
/*
135
 * global variables
136
 */
137

    
138
/*
139
 * define a buffer of pointers to connect structures
140
 */
141
connect_data *connectbuf[CONNECTBUFSIZE];
142

    
143
/*
144
 * define a pointer buffer with pointers to recv_data structures
145
 */
146
recvdata *recvdatabuf[RECVDATABUFSIZE];
147

    
148
/*
149
 * define a pointer buffer for message multiplexing
150
 */
151
receive_data_cb recvcbbuf[MSGMULTIPLEXSIZE];
152

    
153
/*
154
 * stun server address
155
 */
156
struct sockaddr_in stun_server;
157

    
158
/*
159
 * receive timeout
160
 */
161
static struct timeval recv_timeout = RECV_TIMEOUT_DEFAULT;
162

    
163
/*
164
 * boolean NAT traversal successful if true
165
 */
166
boolean NAT_traversal;
167

    
168
/*
169
 * file descriptor for local socket
170
 */
171
evutil_socket_t socketfd;
172

    
173
/*
174
 * local socketID
175
 */
176
socket_ID local_socketID;
177

    
178
socketID_handle loc_socketID = &local_socketID;
179

    
180
/*
181
 * callback function pointers
182
 */
183
/*
184
 * monitoring module callbacks
185
 */
186
get_recv_pkt_inf_cb get_Recv_pkt_inf_cb = NULL;
187
get_send_pkt_inf_cb get_Send_pkt_inf_cb = NULL;
188
set_monitoring_header_pkt_cb set_Monitoring_header_pkt_cb = NULL;
189
get_recv_data_inf_cb get_Recv_data_inf_cb = NULL;
190
get_send_data_inf_cb get_Send_data_inf_cb = NULL;
191
set_monitoring_header_data_cb set_Monitoring_header_data_cb = NULL;
192
/*
193
 * connection callbacks
194
 */
195
receive_connection_cb receive_Connection_cb = NULL;
196
connection_failed_cb failed_Connection_cb = NULL;
197
/*
198
 * local socketID callback
199
 */
200
receive_localsocketID_cb receive_SocketID_cb;
201

    
202
/*
203
 * boolean that defines if received data is transmitted to the upper layer
204
 * via callback or via upper layer polling
205
 */
206
boolean recv_data_callback;
207

    
208
/*
209
 * helper function to get rid of a warning
210
 */
211
#ifndef WIN32
212
int min(int a, int b) {
213
        if (a > b) return b;
214
        return a;
215
}
216
#endif
217

    
218
#ifdef RTX
219
//*********Counters**********
220

    
221
struct Counters {
222
        unsigned int receivedCompleteMsgCounter;
223
        unsigned int receivedIncompleteMsgCounter;
224
        unsigned int receivedDataPktCounter;
225
        unsigned int receivedRTXDataPktCounter;
226
        unsigned int receivedNACK1PktCounter;
227
        unsigned int receivedNACKMorePktCounter;
228
        unsigned int sentDataPktCounter;
229
        unsigned int sentRTXDataPktCtr;
230
        unsigned int sentNACK1PktCounter;
231
        unsigned int sentNACKMorePktCounter;
232
} counters;
233

    
234
extern unsigned int sentRTXDataPktCounter;
235

    
236
/*
237
 * receive timeout for a packet
238
 */
239
static struct timeval pkt_recv_timeout = PKT_RECV_TIMEOUT_DEFAULT;
240

    
241

    
242
static struct timeval last_pkt_recv_timeout = LAST_PKT_RECV_TIMEOUT_DEFAULT;
243

    
244
void mlShowCounters() {
245
        counters.sentRTXDataPktCtr = sentRTXDataPktCounter;
246
        fprintf(stderr, "\nreceivedCompleteMsgCounter: %d\nreceivedIncompleteMsgCounter: %d\nreceivedDataPktCounter: %d\nreceivedRTXDataPktCounter: %d\nreceivedNACK1PktCounter: %d\nreceivedNACKMorePktCounter: %d\nsentDataPktCounter: %d\nsentRTXDataPktCtr: %d\nsentNACK1PktCounter: %d\nsentNACKMorePktCounter: %d\n", counters.receivedCompleteMsgCounter, counters.receivedIncompleteMsgCounter, counters.receivedDataPktCounter, counters.receivedRTXDataPktCounter, counters.receivedNACK1PktCounter, counters.receivedNACKMorePktCounter, counters.sentDataPktCounter, counters.sentRTXDataPktCtr, counters.sentNACK1PktCounter, counters.sentNACKMorePktCounter);
247
        return;
248
}
249

    
250
void recv_nack_msg(struct msg_header *msg_h, char *msgbuf, int msg_size)
251
{
252
        struct nack_msg *nackmsg;
253
        
254
        msgbuf += msg_h->len_mon_data_hdr;
255
        msg_size -= msg_h->len_mon_data_hdr;
256
        nackmsg = (struct nack_msg*) msgbuf;
257
        
258
        unsigned int gapSize = nackmsg->offsetTo - nackmsg->offsetFrom;
259
        //if (gapSize == 1349) counters.receivedNACK1PktCounter++;
260
        //else counters.receivedNACKMorePktCounter++;
261

    
262
        rtxPacketsFromTo(nackmsg->con_id, nackmsg->msg_seq_num, nackmsg->offsetFrom, nackmsg->offsetTo);        
263
}
264

    
265
void pkt_recv_timeout_cb(int fd, short event, void *arg){
266
        int recv_id = (long) arg;
267
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
268

    
269
        //check if message still exists        
270
        if (recvdatabuf[recv_id] == NULL) return;
271

    
272
        //check if gap was filled in the meantime
273
        if (recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom == recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo) {
274
                recvdatabuf[recv_id]->firstGap++;
275
                return;        
276
        }
277

    
278
        struct nack_msg nackmsg;
279
        nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
280
        nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
281
        nackmsg.offsetFrom = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom;
282
        nackmsg.offsetTo = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo;
283
        recvdatabuf[recv_id]->firstGap++;
284

    
285
        unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
286
        //if (gapSize == 1349) counters.sentNACK1PktCounter++;
287
        //else counters.sentNACKMorePktCounter++;
288

    
289
        //fprintf(stderr,"Sending NACK <- msg_seq_num: %d. monitoringDataHeaderLen: %d. offsetFrom: %d offsetTo: %d \n", nackmsg.msg_seq_num, recvdatabuf[recv_id]->monitoringDataHeaderLen, nackmsg.offsetFrom, nackmsg.offsetTo);
290

    
291
        send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, (char *) &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));        
292
}
293

    
294
void last_pkt_recv_timeout_cb(int fd, short event, void *arg){
295
        int recv_id = (long) arg;
296
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
297

    
298
        if (recvdatabuf[recv_id] == NULL) {
299
                //fprintf(stderr,"Called last_pkt_recv_timeout_cb but there is no slot\n");
300
                return;
301
        }
302

    
303
        //fprintf(stderr,"Starting last_pkt_recv_timeout_cb for msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);
304

    
305
        if (recvdatabuf[recv_id]->expectedOffset == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) return;
306

    
307
        struct nack_msg nackmsg;
308
        nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
309
        nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
310
        nackmsg.offsetFrom = recvdatabuf[recv_id]->expectedOffset;
311
        nackmsg.offsetTo = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen;
312

    
313
        unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
314
        //if (gapSize == 1349) counters.sentNACK1PktCounter++;
315
        //else counters.sentNACKMorePktCounter++;        
316

    
317
        //fprintf(stderr,"last_pkt - Sending NACK <- msg_seq_num: %d. monitoringDataHeaderLen: %d. offsetFrom: %d offsetTo: %d \n", nackmsg.msg_seq_num, recvdatabuf[recv_id]->monitoringDataHeaderLen, nackmsg.offsetFrom, nackmsg.offsetTo);
318

    
319
        send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));        
320
}
321

    
322
#endif
323

    
324

    
325

    
326
/*
327
 * convert a socketID to a string. It uses a static buffer, so either strdup is needed, or the string will get lost!
328
 */
329
const char *conid_to_string(int con_id)
330
{
331
        static char s[INET_ADDRSTRLEN+1+5+1+INET_ADDRSTRLEN+1+5+1];
332
        mlSocketIDToString(&connectbuf[con_id]->external_socketID, s, sizeof(s));
333
        return s;
334
}
335

    
336
void register_recv_localsocketID_cb(receive_localsocketID_cb local_socketID_cb)
337
{
338
        if (local_socketID_cb == NULL) {
339
                error("ML : Register receive_localsocketID_cb: NULL ptr \n");
340
        } else {
341
                receive_SocketID_cb = local_socketID_cb;
342
        }
343
}
344

    
345

    
346
//void keep_connection_alive(const int connectionID)
347
//{
348
//
349
//    // to be done with the NAT traversal
350
//    // send a message over the wire
351
//    printf("\n");
352
//
353
//}
354

    
355
void unsetStunServer()
356
{
357
        stun_server.sin_addr.s_addr = INADDR_NONE;
358
}
359

    
360
bool isStunDefined()
361
{
362
        return stun_server.sin_addr.s_addr != INADDR_NONE;
363
}
364

    
365
void send_msg(int con_id, int msg_type, char* msg, int msg_len, bool truncable, send_params * sParams) {
366
        socketaddrgen udpgen;
367
        bool retry;
368
        int pkt_len, offset;
369
        struct iovec iov[4];
370

    
371
        char h_pkt[MON_PKT_HEADER_SPACE];
372
        char h_data[MON_DATA_HEADER_SPACE];
373

    
374
        struct msg_header msg_h;
375

    
376
        debug("ML: send_msg to %s conID:%d extID:%d\n", conid_to_string(con_id), con_id, connectbuf[con_id]->external_connectionID);
377

    
378
        iov[0].iov_base = &msg_h;
379
        iov[0].iov_len = MSG_HEADER_SIZE;
380

    
381
        msg_h.local_con_id = htonl(con_id);
382
        msg_h.remote_con_id = htonl(connectbuf[con_id]->external_connectionID);
383
        msg_h.msg_type = msg_type;
384
        msg_h.msg_seq_num = htonl(connectbuf[con_id]->seqnr++);
385

    
386

    
387
        iov[1].iov_len = iov[2].iov_len = 0;
388
        iov[1].iov_base = h_pkt;
389
        iov[2].iov_base = h_data;
390

    
391

    
392
        if (connectbuf[con_id]->internal_connect)
393
                udpgen = connectbuf[con_id]->external_socketID.internal_addr;
394
        else
395
                udpgen = connectbuf[con_id]->external_socketID.external_addr;
396

    
397
        do{
398
                offset = 0;
399
                retry = false;
400
                // Monitoring layer hook
401
                if(set_Monitoring_header_data_cb != NULL) {
402
                        iov[2].iov_len = ((set_Monitoring_header_data_cb) (&(connectbuf[con_id]->external_socketID), msg_type));
403
                }
404
                msg_h.len_mon_data_hdr = iov[2].iov_len;
405

    
406
                if(get_Send_data_inf_cb != NULL && iov[2].iov_len != 0) {
407
                        mon_data_inf sd_data_inf;
408

    
409
                        memset(h_data, 0, MON_DATA_HEADER_SPACE);
410

    
411
                        sd_data_inf.remote_socketID = &(connectbuf[con_id]->external_socketID);
412
                        sd_data_inf.buffer = msg;
413
                        sd_data_inf.bufSize = msg_len;
414
                        sd_data_inf.msgtype = msg_type;
415
                        sd_data_inf.monitoringDataHeader = iov[2].iov_base;
416
                        sd_data_inf.monitoringDataHeaderLen = iov[2].iov_len;
417
                        sd_data_inf.priority = sParams->priority;
418
                        sd_data_inf.padding = sParams->padding;
419
                        sd_data_inf.confirmation = sParams->confirmation;
420
                        sd_data_inf.reliable = sParams->reliable;
421
                        memset(&sd_data_inf.arrival_time, 0, sizeof(struct timeval));
422

    
423
                        (get_Send_data_inf_cb) ((void *) &sd_data_inf);
424
                }
425

    
426
                do {
427
                        if(set_Monitoring_header_pkt_cb != NULL) {
428
                                iov[1].iov_len = (set_Monitoring_header_pkt_cb) (&(connectbuf[con_id]->external_socketID), msg_type);
429
                        }
430
                        pkt_len = min(connectbuf[con_id]->pmtusize - iov[2].iov_len - iov[1].iov_len - iov[0].iov_len, msg_len - offset) ;
431

    
432
                        iov[3].iov_len = pkt_len;
433
                        iov[3].iov_base = msg + offset;
434

    
435
                        //fill header
436
                        msg_h.len_mon_packet_hdr = iov[1].iov_len;
437
                        msg_h.offset = htonl(offset);
438
                        msg_h.msg_length = htonl(truncable ? pkt_len : msg_len);
439

    
440
                        //monitoring layer hook
441
                        if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
442
                                mon_pkt_inf pkt_info;
443

    
444
                                memset(h_pkt,0,MON_PKT_HEADER_SPACE);
445

    
446
                                pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
447
                                pkt_info.buffer = msg + offset;
448
                                pkt_info.bufSize = pkt_len;
449
                                pkt_info.msgtype = msg_type;
450
                                pkt_info.dataID = connectbuf[con_id]->seqnr;
451
                                pkt_info.offset = offset;
452
                                pkt_info.datasize = msg_len;
453
                                pkt_info.monitoringHeaderLen = iov[1].iov_len;
454
                                pkt_info.monitoringHeader = iov[1].iov_base;
455
                                pkt_info.ttl = -1;
456
                                memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
457

    
458
                                (get_Send_pkt_inf_cb) ((void *) &pkt_info);
459
                        }
460

    
461
                        debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
462
                        int priority = 0; 
463
                        if ((msg_type == ML_CON_MSG)
464
#ifdef RTX
465
 || (msg_type == ML_NACK_MSG)
466
#endif
467
) priority = HP;
468
                        //fprintf(stderr,"*******************************ML.C: Sending packet: msg_h.offset: %d msg_h.msg_seq_num: %d\n",ntohl(msg_h.offset),ntohl(msg_h.msg_seq_num));
469
                        switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
470
                                case MSGLEN:
471
                                        info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
472
                                        // TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
473
                                        connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
474
                                        if (connectbuf[con_id]->pmtusize > 0) {
475
                                                connectbuf[con_id]->delay = true;
476
                                                retry = true;
477
                                        }
478
                                        offset = msg_len; // exit the while
479
                                        break;
480
                                case FAILURE:
481
                                        info("ML: sending message failed (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
482
                                        offset = msg_len; // exit the while
483
                                        break;
484
                                case THROTTLE:
485
                                    //    debug("THROTTLE on output"); 
486
                                        offset = msg_len; // exit the while
487
                                        break;
488
                                case OK:
489
#ifdef RTX
490
                                        if (msg_type < 127) counters.sentDataPktCounter++;
491
#endif
492
                                        //update
493
                                        offset += pkt_len;
494
                                        //transmit data header only in the first packet
495
                                        iov[2].iov_len = 0;
496
                                        break;
497
                        }
498
                } while(offset != msg_len && !truncable);
499
        } while(retry);
500
        //fprintf(stderr, "sentDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentDataPktCounter);
501
        //fprintf(stderr, "sentRTXDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentRTXDataPktCtr);
502
}
503

    
504
void pmtu_timeout_cb(int fd, short event, void *arg);
505

    
506
void reschedule_conn_msg(int con_id)
507
{
508
        if (connectbuf[con_id]->timeout_event) {
509
                /* delete old timout */        
510
                event_del(connectbuf[con_id]->timeout_event);
511
                event_free(connectbuf[con_id]->timeout_event);
512
        }
513
        connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
514
        evtimer_add(connectbuf[con_id]->timeout_event, &connectbuf[con_id]->timeout_value);
515
}
516

    
517
void send_conn_msg(int con_id, int buf_size, int command_type)
518
{
519
        if (buf_size < sizeof(struct conn_msg)) {
520
                error("ML: requested connection message size is too small\n");
521
                return;
522
        }
523

    
524
        if(connectbuf[con_id]->ctrl_msg_buf == NULL) {
525
                connectbuf[con_id]->ctrl_msg_buf = malloc(buf_size);
526
                memset(connectbuf[con_id]->ctrl_msg_buf, 0, buf_size);
527
        }
528

    
529
        if(connectbuf[con_id]->ctrl_msg_buf == NULL) {
530
                error("ML: can not allocate memory for connection message\n");
531
                return;
532
        }
533

    
534
        struct conn_msg *msg_header = (struct conn_msg*) connectbuf[con_id]->ctrl_msg_buf;
535

    
536
        msg_header->comand_type = command_type;
537
        msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
538

    
539
        memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
540
  {
541
                        char buf[SOCKETID_STRING_SIZE];
542
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
543
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
544
   }
545
        send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
546
}
547

    
548
void send_conn_msg_with_pmtu_discovery(int con_id, int buf_size, int command_type)
549
{
550
        struct timeval tout = PMTU_TIMEOUT;
551
        connectbuf[con_id]->timeout_value = tout;
552
        connectbuf[con_id]->trials = 1;
553
        send_conn_msg(con_id, buf_size, command_type);
554
        reschedule_conn_msg(con_id);
555
}
556

    
557
void resend_conn_msg(int con_id)
558
{
559
        connectbuf[con_id]->trials++;
560
        send_conn_msg(con_id, connectbuf[con_id]->pmtusize, connectbuf[con_id]->status);
561
        reschedule_conn_msg(con_id);
562
}
563

    
564
void recv_conn_msg(struct msg_header *msg_h, char *msgbuf, int msg_size, struct sockaddr_in *recv_addr)
565
{
566
        struct conn_msg *con_msg;
567
        int free_con_id, con_id;
568

    
569
        time_t now = time(NULL);
570
        double timediff = 0.0;
571
        char sock_id_str[1000];
572
        
573
        msgbuf += msg_h->len_mon_data_hdr;
574
        msg_size -= msg_h->len_mon_data_hdr;
575
        con_msg = (struct conn_msg *)msgbuf;
576
        
577
        //verify message validity
578
        if (msg_size < sizeof(struct conn_msg)) {
579
                char recv_addr_str[INET_ADDRSTRLEN];
580
                inet_ntop(AF_INET, &(recv_addr->sin_addr.s_addr), recv_addr_str, INET_ADDRSTRLEN);
581
                info("Invalid conn_msg received from %s\n", recv_addr_str);
582
                return;
583
        }
584

    
585
        //decode sock_id for debug messages
586
        mlSocketIDToString(&con_msg->sock_id,sock_id_str,999);
587

    
588
        if (con_msg->sock_id.internal_addr.udpaddr.sin_addr.s_addr != recv_addr->sin_addr.s_addr &&
589
            con_msg->sock_id.external_addr.udpaddr.sin_addr.s_addr != recv_addr->sin_addr.s_addr   ) {
590
                char recv_addr_str[INET_ADDRSTRLEN];
591
                inet_ntop(AF_INET, &(recv_addr->sin_addr.s_addr), recv_addr_str, INET_ADDRSTRLEN);
592
                info("Conn msg received from %s, but claims to be from %s", recv_addr_str, sock_id_str);
593
                return;
594
        }
595

    
596
        // Monitoring layer hook
597
        if(get_Recv_data_inf_cb != NULL) {
598
                // update pointer to the real data
599
                mon_data_inf recv_data_inf;
600
                recv_data_inf.remote_socketID = &(con_msg->sock_id);
601
                recv_data_inf.buffer = msgbuf;
602
                recv_data_inf.bufSize = msg_size;
603
                recv_data_inf.msgtype = msg_h->msg_type;
604
                recv_data_inf.monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
605
                recv_data_inf.monitoringDataHeader = msg_h->len_mon_data_hdr ? msgbuf : NULL;
606
                gettimeofday(&recv_data_inf.arrival_time, NULL);
607
                recv_data_inf.firstPacketArrived = true;
608
                recv_data_inf.recvFragments = 1;
609
                recv_data_inf.priority = false;
610
                recv_data_inf.padding = false;
611
                recv_data_inf.confirmation = false;
612
                recv_data_inf.reliable = false;
613

    
614
                // send data recv callback to monitoring module
615
                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
616
        }
617

    
618
        // check the connection command type
619
        switch (con_msg->comand_type) {
620
                /*
621
                * if INVITE: enter a new socket make new entry in connect array
622
                * send an ok
623
                */
624
                case INVITE:
625
                        info("ML: received INVITE from %s (size:%d)\n", sock_id_str, msg_size);
626
                        /*
627
                        * check if another connection for the external connectionID exist
628
                        * that was established within the last 2 seconds
629
                        */
630
                        free_con_id = -1;
631
                        for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
632
                                if (connectbuf[con_id] != NULL) {
633
                                        if (mlCompareSocketIDs(&(connectbuf[con_id]->external_socketID), &(con_msg->sock_id)) == 0) {
634
                                                //timediff = difftime(now, connectbuf[con_id]->starttime);        //TODO: why this timeout? Shouldn't the connection be closed instead if there is a timeout?
635
                                                //if (timediff < 2)
636
                                                //update remote connection ID
637
                                                if (connectbuf[con_id]->external_connectionID != msg_h->local_con_id) {
638
                                                        warn("ML: updating remote connection ID for %s: from %d to %d\n",sock_id_str, connectbuf[con_id]->external_connectionID, msg_h->local_con_id);
639
                                                        connectbuf[con_id]->external_connectionID = msg_h->local_con_id;
640
                                                }
641
                                                break;
642
                                        }
643
                                } else if(free_con_id == -1)
644
                                        free_con_id = con_id;
645
                        }
646

    
647
                        if (con_id == CONNECTBUFSIZE) {
648
                                // create an entry in the connecttrybuf
649
                                if(free_con_id == -1) {
650
                                        error("ML: no new connect_buf available\n");
651
                                        return;
652
                                }
653
                                connectbuf[free_con_id] = (connect_data *) malloc(sizeof(connect_data));
654
                                memset(connectbuf[free_con_id],0,sizeof(connect_data));
655
                                connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
656
                                connectbuf[free_con_id]->starttime = time(NULL);
657
                                memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
658
                //Workaround to support reuse of socketID
659
                                connectbuf[free_con_id]->external_socketID.internal_addr.udpaddr.sin_family=AF_INET;
660
                                connectbuf[free_con_id]->external_socketID.external_addr.udpaddr.sin_family=AF_INET;
661
                                connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;        // bootstrap pmtu from the other's size. Not strictly needed, but a good hint
662
                                connectbuf[free_con_id]->timeout_event = NULL;
663
                                connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
664
                                connectbuf[free_con_id]->internal_connect =
665
                                        !compare_external_address_socketIDs(&(con_msg->sock_id), loc_socketID);
666
                                con_id = free_con_id;
667
                        }
668

    
669
                        //if(connectbuf[con_id]->status <= CONNECT) { //TODO: anwer anyway. Why the outher would invite otherwise?
670
                                //update status and send back answer
671
                                connectbuf[con_id]->status = CONNECT;
672
                                send_conn_msg_with_pmtu_discovery(con_id, con_msg->pmtu_size, CONNECT);
673
                        //}
674
                        break;
675
                case CONNECT:
676
                        info("ML: received CONNECT from %s (size:%d)\n", sock_id_str, msg_size);
677

    
678
                        if(msg_h->remote_con_id != -1 && connectbuf[msg_h->remote_con_id] == NULL) {
679
                                error("ML: received CONNECT for inexistent connection rconID:%d\n",msg_h->remote_con_id);
680
                                return;
681
                        }
682

    
683
                        /*
684
                        * check if the connection status is not already 1 or 2
685
                        */
686
                        if (connectbuf[msg_h->remote_con_id]->status == INVITE) {
687
                                // set the external connectionID
688
                                connectbuf[msg_h->remote_con_id]->external_connectionID = msg_h->local_con_id;
689
                                // change status con_msg the connection_data
690
                                connectbuf[msg_h->remote_con_id]->status = READY;
691
                                // change pmtusize in the connection_data: not needed. receiving a CONNECT means our INVITE went through. So why change pmtu?
692
                                //connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
693

    
694
                                // send the READY
695
                                send_conn_msg_with_pmtu_discovery(msg_h->remote_con_id, con_msg->pmtu_size, READY);
696

    
697
                                if (receive_Connection_cb != NULL)
698
                                        (receive_Connection_cb) (msg_h->remote_con_id, NULL);
699

    
700
                                // call all registered callbacks
701
                                while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
702
                                        struct receive_connection_cb_list *temp;
703
                                        temp = connectbuf[msg_h->remote_con_id]->connection_head;
704
                                        (temp->connection_cb) (msg_h->remote_con_id, temp->arg);
705
                                        connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
706
                                        free(temp);
707
                                }
708
                                connectbuf[msg_h->remote_con_id]->connection_head =
709
                                        connectbuf[msg_h->remote_con_id]->connection_last = NULL;
710
                        } else {
711
                                // send the READY
712
                                send_conn_msg_with_pmtu_discovery(msg_h->remote_con_id, con_msg->pmtu_size, READY);
713
                        }
714

    
715
                        debug("ML: active connection established\n");
716
                        break;
717

    
718
                        /*
719
                        * if READY: find the entry in the connection array set the
720
                        * connection active change the pmtu size
721
                        */
722
                case READY:
723
                        info("ML: received READY from %s (size:%d)\n", sock_id_str, msg_size);
724
                        if(connectbuf[msg_h->remote_con_id] == NULL) {
725
                                error("ML: received READY for inexistent connection\n");
726
                                return;
727
                        }
728
                        /*
729
                        * checks if the connection is not already established
730
                        */
731
                        if (connectbuf[msg_h->remote_con_id]->status == CONNECT) {
732
                                // change status of the connection
733
                                connectbuf[msg_h->remote_con_id]->status = READY;
734
                                // change pmtusize: not needed. pmtu doesn't have to be symmetric
735
                                //connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
736

    
737
                                if (receive_Connection_cb != NULL)
738
                                        (receive_Connection_cb) (msg_h->remote_con_id, NULL);
739

    
740
                                while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
741
                                        struct receive_connection_cb_list *temp;
742
                                        temp = connectbuf[msg_h->remote_con_id]->connection_head;
743
                                        (temp->connection_cb) (msg_h->remote_con_id, temp->arg);
744
                                        connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
745
                                        free(temp);
746
                                }
747
                                connectbuf[msg_h->remote_con_id]->connection_head =
748
                                        connectbuf[msg_h->remote_con_id]->connection_last = NULL;
749
                                debug("ML: passive connection established\n");
750
                        }
751
                        break;
752
        }
753
}
754

    
755
void recv_stun_msg(char *msgbuf, int recvSize)
756
{
757
        /*
758
        * create empty stun message struct
759
        */
760
        StunMessage resp;
761
        memset(&resp, 0, sizeof(StunMessage));
762
        /*
763
        * parse the message
764
        */
765
        int returnValue = 0;
766
        returnValue = recv_stun_message(msgbuf, recvSize, &resp);
767

    
768
        if (returnValue == 0) {
769
                /*
770
                * read the reflexive Address into the local_socketID
771
                */
772
                struct sockaddr_in reflexiveAddr = {0};
773
                reflexiveAddr.sin_family = AF_INET;
774
                reflexiveAddr.sin_addr.s_addr = htonl(resp.mappedAddress.ipv4.addr);
775
                reflexiveAddr.sin_port = htons(resp.mappedAddress.ipv4.port);
776
                socketaddrgen reflexiveAddres = {0};
777
                reflexiveAddres.udpaddr = reflexiveAddr;
778
                local_socketID.external_addr = reflexiveAddres;
779
                NAT_traversal = true;
780
                // callback to the upper layer indicating that the socketID is now
781
                // ready to use
782
                {
783
                        char buf[SOCKETID_STRING_SIZE];
784
                        mlSocketIDToString(&local_socketID,buf,sizeof(buf));
785
                         debug("received local socket_address: %s\n", buf);
786
                }
787
                (receive_SocketID_cb) (&local_socketID, 0);
788
        }
789
}
790

    
791
//done
792
void recv_timeout_cb(int fd, short event, void *arg)
793
{
794
        int recv_id = (long) arg;
795
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
796

    
797
        if (recvdatabuf[recv_id] == NULL) {
798
                return;
799
        }
800

    
801

    
802
/*        if(recvdatabuf[recv_id]->status == ACTIVE) {
803
                //TODO make timeout at least a DEFINE
804
                struct timeval timeout = { 4, 0 };
805
                recvdatabuf[recv_id]->status = INACTIVE;
806
                event_base_once(base, -1, EV_TIMEOUT, recv_timeout_cb,
807
                        arg, &timeout);
808
                return;
809
        }
810
*/
811

    
812
        if(recvdatabuf[recv_id]->status == ACTIVE) {
813
                // Monitoring layer hook
814
                if(get_Recv_data_inf_cb != NULL) {
815
                        mon_data_inf recv_data_inf;
816

    
817
                        recv_data_inf.remote_socketID =
818
                                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
819
                        recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
820
                        recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
821
                        recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
822
                        recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
823
                        recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
824
                                recvdatabuf[recv_id]->recvbuf : NULL;
825
                        gettimeofday(&recv_data_inf.arrival_time, NULL);
826
                        recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
827
                        recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
828
                        recv_data_inf.priority = false;
829
                        recv_data_inf.padding = false;
830
                        recv_data_inf.confirmation = false;
831
                        recv_data_inf.reliable = false;
832

    
833
                        // send data recv callback to monitoring module
834

    
835
//                        (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
836
                }
837

    
838
                // Get the right callback
839
                receive_data_cb receive_data_callback = recvcbbuf[recvdatabuf[recv_id]->msgtype];
840

    
841
                recv_params rParams;
842

    
843
                rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->arrivedBytes;
844
                rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
845
                rParams.msgtype = recvdatabuf[recv_id]->msgtype;
846
                rParams.connectionID = recvdatabuf[recv_id]->connectionID;
847
                rParams.remote_socketID =
848
                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
849
                rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
850

    
851
#ifdef RTX
852
                counters.receivedIncompleteMsgCounter++;
853
                //mlShowCounters();
854
                //fprintf(stderr,"******Cleaning slot for inclomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);                
855
#endif
856
                 //(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->msgtype, &rParams);
857

    
858
                //clean up
859
                if (recvdatabuf[recv_id]->timeout_event) {
860
                        event_del(recvdatabuf[recv_id]->timeout_event);
861
                        event_free(recvdatabuf[recv_id]->timeout_event);
862
                        recvdatabuf[recv_id]->timeout_event = NULL;
863
                }
864
                free(recvdatabuf[recv_id]->recvbuf);
865
                free(recvdatabuf[recv_id]);
866
                recvdatabuf[recv_id] = NULL;
867
        }
868
}
869

    
870
// process a single recv data message
871
void recv_data_msg(struct msg_header *msg_h, char *msgbuf, int bufsize)
872
{
873
        debug("ML: received packet of size %d with rconID:%d lconID:%d type:%d offset:%d\n",bufsize,msg_h->remote_con_id,msg_h->local_con_id,msg_h->msg_type,msg_h->offset);
874

    
875
        int recv_id, free_recv_id = -1;
876

    
877
        if(connectbuf[msg_h->remote_con_id] == NULL) {
878
                debug("ML: Received a message not related to any opened connection!\n");
879
                return;
880
        }
881

    
882
#ifdef RTX
883
        counters.receivedDataPktCounter++;
884
#endif        
885
        // check if a recv_data exist and enter data
886
        for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++)
887
                if (recvdatabuf[recv_id] != NULL) {
888
                        if (msg_h->remote_con_id == recvdatabuf[recv_id]->connectionID &&
889
                                        msg_h->msg_seq_num == recvdatabuf[recv_id]->seqnr)
890
                                                break;
891
                } else
892
                        if(free_recv_id == -1)
893
                                free_recv_id = recv_id;
894

    
895

    
896
        if(recv_id == RECVDATABUFSIZE) {
897
                //no recv_data found: create one
898
                recv_id = free_recv_id;
899
                recvdatabuf[recv_id] = (recvdata *) malloc(sizeof(recvdata));
900
                memset(recvdatabuf[recv_id], 0, sizeof(recvdata));
901
                recvdatabuf[recv_id]->connectionID = msg_h->remote_con_id;
902
                recvdatabuf[recv_id]->seqnr = msg_h->msg_seq_num;
903
                recvdatabuf[recv_id]->monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
904
                recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
905
                recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
906
                recvdatabuf[recv_id]->arrivedBytes = 0;        //count this without the Mon headers
907
#ifdef RTX
908
                recvdatabuf[recv_id]->txConnectionID = msg_h->local_con_id;
909
                recvdatabuf[recv_id]->expectedOffset = 0;
910
                recvdatabuf[recv_id]->gapCounter = 0;
911
                recvdatabuf[recv_id]->firstGap = 0;
912
                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
913
#endif
914

    
915
                /*
916
                * read the timeout data and set it
917
                */
918
                recvdatabuf[recv_id]->timeout_value = recv_timeout;
919
                recvdatabuf[recv_id]->timeout_event = NULL;
920
                recvdatabuf[recv_id]->recvID = recv_id;
921
                recvdatabuf[recv_id]->starttime = time(NULL);
922
                recvdatabuf[recv_id]->msgtype = msg_h->msg_type;
923

    
924
                // fill the buffer with zeros
925
                memset(recvdatabuf[recv_id]->recvbuf, 0, recvdatabuf[recv_id]->bufsize);
926
                debug(" new @ id:%d\n",recv_id);
927
        } else {        //message structure already exists, no need to create new
928
                debug(" found @ id:%d (arrived before this packet: bytes:%d fragments%d\n",recv_id, recvdatabuf[recv_id]->arrivedBytes, recvdatabuf[recv_id]->recvFragments);
929
        }
930

    
931
        //if first packet extract mon data header and advance pointer
932
        if (msg_h->offset == 0) {
933
                //fprintf(stderr,"Hoooooray!! We have first packet of some message!!\n");
934
                memcpy(recvdatabuf[recv_id]->recvbuf, msgbuf, msg_h->len_mon_data_hdr);
935
                msgbuf += msg_h->len_mon_data_hdr;
936
                bufsize -= msg_h->len_mon_data_hdr;
937
                recvdatabuf[recv_id]->firstPacketArrived = 1;
938
        }
939

    
940

    
941
        // increment fragmentnr
942
        recvdatabuf[recv_id]->recvFragments++;
943
        // increment the arrivedBytes
944
        recvdatabuf[recv_id]->arrivedBytes += bufsize; 
945

    
946
        //fprintf(stderr,"Arrived bytes: %d Offset: %d Expected offset: %d\n",recvdatabuf[recv_id]->arrivedBytes/1349,msg_h->offset/1349,recvdatabuf[recv_id]->expectedOffset/1349);
947

    
948
        // enter the data into the buffer
949
        memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
950
#ifdef RTX
951
        // detecting a new gap        
952
        if (msg_h->offset > recvdatabuf[recv_id]->expectedOffset) {
953
                recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetFrom = recvdatabuf[recv_id]->expectedOffset;
954
                recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetTo = msg_h->offset;
955
                if (recvdatabuf[recv_id]->gapCounter < RTX_MAX_GAPS - 1) recvdatabuf[recv_id]->gapCounter++;
956
                evtimer_add(event_new(base, -1, EV_TIMEOUT, &pkt_recv_timeout_cb, (void *) (long)recv_id), &pkt_recv_timeout);
957
        }
958
        
959
        //filling the gap by delayed packets
960
        if (msg_h->offset < recvdatabuf[recv_id]->expectedOffset){
961
                counters.receivedRTXDataPktCounter++;
962
                //skip retransmitted packets
963
                if (recvdatabuf[recv_id]->firstGap < recvdatabuf[recv_id]->gapCounter && msg_h->offset >= recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom) {
964
                        int i;
965
                        //fprintf(stderr,"firstGap: %d        gapCounter: %d\n", recvdatabuf[recv_id]->firstGap, recvdatabuf[recv_id]->gapCounter);
966
                        for (i = recvdatabuf[recv_id]->firstGap; i < recvdatabuf[recv_id]->gapCounter; i++){
967
                                if (msg_h->offset == recvdatabuf[recv_id]->gapArray[i].offsetFrom) {
968
                                        recvdatabuf[recv_id]->gapArray[i].offsetFrom += bufsize;
969
                                        break;
970
                                }
971
                                if (msg_h->offset == (recvdatabuf[recv_id]->gapArray[i].offsetTo - bufsize)) {
972
                                        recvdatabuf[recv_id]->gapArray[i].offsetTo -= bufsize;
973
                                        break;
974
                                }
975
                        }
976
                } else {//fprintf(stderr,"Skipping retransmitted packets in filling the gap.\n"); 
977
                        //counters.receivedRTXDataPktCounter++;
978
                        }
979
        }
980

    
981
        //updating the expectedOffset        
982
        if (msg_h->offset >= recvdatabuf[recv_id]->expectedOffset) recvdatabuf[recv_id]->expectedOffset = msg_h->offset + bufsize;
983
#endif
984

    
985
        //TODO very basic checkif all fragments arrived: has to be reviewed
986
        if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen)
987
                recvdatabuf[recv_id]->status = COMPLETE; //buffer full -> msg completly arrived
988
        else
989
                recvdatabuf[recv_id]->status = ACTIVE;
990

    
991
        if (recv_data_callback) {
992
                if(recvdatabuf[recv_id]->status == COMPLETE) {
993
                        // Monitoring layer hook
994
                        if(get_Recv_data_inf_cb != NULL) {
995
                                mon_data_inf recv_data_inf;
996

    
997
                                recv_data_inf.remote_socketID =
998
                                         &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
999
                                recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
1000
                                recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
1001
                                recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
1002
                                recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
1003
                                recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
1004
                                        recvdatabuf[recv_id]->recvbuf : NULL;
1005
                                gettimeofday(&recv_data_inf.arrival_time, NULL);
1006
                                recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1007
                                recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
1008
                                recv_data_inf.priority = false;
1009
                                recv_data_inf.padding = false;
1010
                                recv_data_inf.confirmation = false;
1011
                                recv_data_inf.reliable = false;
1012

    
1013
                                // send data recv callback to monitoring module
1014

    
1015
                                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
1016
                        }
1017

    
1018
                        // Get the right callback
1019
                        receive_data_cb receive_data_callback = recvcbbuf[msg_h->msg_type];
1020
                        if (receive_data_callback) {
1021

    
1022
                                recv_params rParams;
1023

    
1024
                                rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen - recvdatabuf[recv_id]->arrivedBytes;
1025
                                rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
1026
                                rParams.msgtype = recvdatabuf[recv_id]->msgtype;
1027
                                rParams.connectionID = recvdatabuf[recv_id]->connectionID;
1028
                                rParams.remote_socketID =
1029
                                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1030

    
1031
                                char str[1000];
1032
                                mlSocketIDToString(rParams.remote_socketID,str,999);
1033
                                debug("ML: received message from conID:%d, %s\n",recvdatabuf[recv_id]->connectionID,str);
1034
                                rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1035

    
1036
#ifdef RTX
1037
                                counters.receivedCompleteMsgCounter++;
1038
                                //mlShowCounters();
1039
#endif
1040

    
1041
                                (receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
1042
                                        recvdatabuf[recv_id]->msgtype, (void *) &rParams);
1043
                        } else {
1044
                            warn("ML: callback not initialized for this message type: %d!\n",msg_h->msg_type);
1045
                        }
1046
                        
1047
                        //clean up
1048
                        if (recvdatabuf[recv_id]->timeout_event) {
1049
                                debug("ML: freeing timeout for %d\n",recv_id);
1050
                                event_del(recvdatabuf[recv_id]->timeout_event);
1051
                                event_free(recvdatabuf[recv_id]->timeout_event);
1052
                                recvdatabuf[recv_id]->timeout_event = NULL;
1053
                        } else {
1054
                                debug("ML: received in 1 packet\n",recv_id);
1055
                        }
1056
#ifdef RTX
1057
                        if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
1058
                                debug("ML: freeing last packet timeout for %d",recv_id);
1059
                                event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
1060
                                event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
1061
                                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1062
                        }
1063
                        //fprintf(stderr,"******Cleaning slot for clomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);        
1064
#endif
1065
                        free(recvdatabuf[recv_id]->recvbuf);
1066
                        free(recvdatabuf[recv_id]);
1067
                        recvdatabuf[recv_id] = NULL;
1068
                } else { // not COMPLETE
1069
                        if (!recvdatabuf[recv_id]->timeout_event) {
1070
                                //start time out
1071
                                //TODO make timeout at least a DEFINE
1072
                                recvdatabuf[recv_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id);
1073
                                evtimer_add(recvdatabuf[recv_id]->timeout_event, &recv_timeout);
1074
#ifdef RTX
1075
                                recvdatabuf[recv_id]->last_pkt_timeout_event = event_new(base, -1, EV_TIMEOUT, &last_pkt_recv_timeout_cb, (void *) (long)recv_id);
1076
                                evtimer_add(recvdatabuf[recv_id]->last_pkt_timeout_event, &last_pkt_recv_timeout);
1077
#endif
1078
                        }
1079
                }
1080
        }
1081
}
1082

    
1083
//done
1084
void pmtu_timeout_cb(int fd, short event, void *arg)
1085
{
1086

    
1087
        int con_id = (long) arg;
1088
        pmtu new_pmtusize;
1089

    
1090
        debug("ML: pmtu timeout called (lcon:%d)\n",con_id);
1091

    
1092
        if(connectbuf[con_id] == NULL) {
1093
                error("ML: pmtu timeout called on non existing con_id\n");
1094
                return;
1095
        }
1096

    
1097
        if(connectbuf[con_id]->status == READY) {
1098
                // nothing to do anymore
1099
                event_del(connectbuf[con_id]->timeout_event);
1100
                event_free(connectbuf[con_id]->timeout_event);
1101
                connectbuf[con_id]->timeout_event = NULL;
1102
                return;
1103
        }
1104

    
1105
        info("ML: pmtu timeout while connecting(to:%s lcon:%d status:%d size:%d trial:%d tout:%ld.%06ld)\n",conid_to_string(con_id), con_id, connectbuf[con_id]->status, connectbuf[con_id]->pmtusize, connectbuf[con_id]->trials, connectbuf[con_id]->timeout_value.tv_sec, connectbuf[con_id]->timeout_value.tv_usec);
1106

    
1107
        if(connectbuf[con_id]->delay || connectbuf[con_id]->trials == MAX_TRIALS - 1) {
1108
                double delay = connectbuf[con_id]->timeout_value.tv_sec + connectbuf[con_id]->timeout_value.tv_usec / 1000000.0;
1109
                delay = delay * 2;
1110
                info("\tML: increasing pmtu timeout to %f sec\n", delay);
1111
                connectbuf[con_id]->timeout_value.tv_sec = floor(delay);
1112
                connectbuf[con_id]->timeout_value.tv_usec = fmod(delay, 1.0) * 1000000.0;
1113
                if(connectbuf[con_id]->delay) {
1114
                        connectbuf[con_id]->delay = false;
1115
                        reschedule_conn_msg(con_id);
1116
                }
1117
        }
1118

    
1119
        if(connectbuf[con_id]->trials == MAX_TRIALS) {
1120
                // decrement the pmtu size
1121
                struct timeval tout = PMTU_TIMEOUT;
1122
                info("\tML: decreasing pmtu estimate from %d to %d\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize));
1123
                connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
1124
                connectbuf[con_id]->timeout_value = tout; 
1125
                connectbuf[con_id]->trials = 0;
1126
        }
1127

    
1128
        //error in PMTU discovery?
1129
        if (connectbuf[con_id]->pmtusize == P_ERROR) {
1130
                if (connectbuf[con_id]->internal_connect == true) {
1131
                        //as of now we tried directly connecting, now let's try trough the NAT
1132
                        connectbuf[con_id]->internal_connect = false;
1133
                        connectbuf[con_id]->pmtusize = DSLSLIM;
1134
                } else {
1135
                        //nothing to do we have to give up
1136
                        error("ML: Could not create connection with connectionID %i!\n",con_id);
1137
                        // envoke the callback for failed connection establishment
1138
                        if(failed_Connection_cb != NULL)
1139
                                (failed_Connection_cb) (con_id, NULL);
1140
                        // delete the connection entry
1141
                        mlCloseConnection(con_id);
1142
                        return;
1143
                }
1144
        }
1145

    
1146
        //retry
1147
        resend_conn_msg(con_id);
1148
}
1149

    
1150

    
1151
int schedule_pmtu_timeout(int con_id)
1152
{
1153
        if (! connectbuf[con_id]->timeout_event) {
1154
                struct timeval tout = PMTU_TIMEOUT;
1155
                connectbuf[con_id]->timeout_value = tout;
1156
                connectbuf[con_id]->trials = 1;
1157
                connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
1158
                evtimer_add(connectbuf[con_id]->timeout_event, &connectbuf[con_id]->timeout_value);
1159
        }
1160
}
1161

    
1162
/*
1163
 * decrements the mtu size
1164
 */
1165
pmtu pmtu_decrement(pmtu pmtusize)
1166
{
1167
        pmtu pmtu_return_size;
1168
        switch(pmtusize) {
1169
        case MAX:
1170
                //return DSL;
1171
                return DSLSLIM;        //shortcut to use less vales
1172
        case DSL:
1173
                return DSLMEDIUM;
1174
        case DSLMEDIUM:
1175
                return DSLSLIM;
1176
        case DSLSLIM:
1177
                //return BELOWDSL;
1178
                return MIN;        //shortcut to use less vales
1179
        case BELOWDSL:
1180
                return MIN;
1181
        case MIN:
1182
                return P_ERROR;
1183
        default:
1184
                warn("ML: strange pmtu size encountered:%d, changing to some safe value:%d\n", pmtusize, MIN);
1185
                return MIN;
1186
        }
1187
}
1188

    
1189
// called when an ICMP pmtu error message (type 3, code 4) is received
1190
void pmtu_error_cb_th(char *msg, int msglen)
1191
{
1192
        debug("ML: pmtu_error callback called msg_size: %d\n",msglen);
1193
        //TODO debug
1194
        return;
1195

    
1196
    char *msgbufptr = NULL;
1197
    int msgtype;
1198
    int connectionID;
1199
    pmtu pmtusize;
1200
    pmtu new_pmtusize;
1201
    int dead = 0;
1202

    
1203
    // check the packettype
1204
    msgbufptr = &msg[0];
1205

    
1206
    // check the msgtype
1207
    msgbufptr = &msg[1];
1208
    memcpy(&msgtype, msgbufptr, 4);
1209

    
1210
    if (msgtype == 0) {
1211

    
1212
        // get the connectionID
1213
        msgbufptr = &msg[5];
1214
        memcpy(&connectionID, msgbufptr, 4);
1215

    
1216
        int msgtype_c = connectbuf[connectionID]->status;
1217
//        pmtusize = connectbuf[connectionID]->pmtutrysize;
1218

    
1219
        if (msgtype_c != msgtype) {
1220
            dead = 1;
1221
        }
1222

    
1223

    
1224
    } else if (msgtype == 1) {
1225

    
1226
        // read the connectionID
1227
        msgbufptr = &msg[9];
1228
        memcpy(&connectionID, msgbufptr, 4);
1229

    
1230
        int msgtype_c = connectbuf[connectionID]->status;
1231
//        pmtusize = connectbuf[connectionID]->pmtutrysize;
1232

    
1233
        if (msgtype_c != msgtype) {
1234
            dead = 1;
1235
        }
1236

    
1237
    }
1238
    // decrement the pmtu size
1239
    new_pmtusize = pmtu_decrement(pmtusize);
1240

    
1241
//    connectbuf[connectionID]->pmtutrysize = new_pmtusize;
1242

    
1243
    if (new_pmtusize == P_ERROR) {
1244
                error("ML:  Could not create connection with connectionID %i !\n",
1245
                        connectionID);
1246

    
1247
                if(failed_Connection_cb != NULL)
1248
                        (failed_Connection_cb) (connectionID, NULL);
1249
                // set the message type to a non existent message
1250
                msgtype = 2;
1251
                // delete the connection entry
1252
                 mlCloseConnection(connectionID);
1253
        }
1254

    
1255
    if (msgtype == 0 && dead != 1) {
1256

    
1257
        // stop the timeout event
1258
        // timeout_del(connectbuf[connectionID]->timeout);
1259
        /*
1260
         * libevent2
1261
         */
1262

    
1263
        // event_del(connectbuf[connectionID]->timeout);
1264

    
1265

    
1266
        // create and send a connection message
1267
//         create_conn_msg(new_pmtusize, connectionID,
1268
//                         &local_socketID, INVITE);
1269

    
1270
//        send_conn_msg(connectionID, new_pmtusize);
1271

    
1272
        // set a timeout event for the pmtu discovery
1273
        // timeout_set(connectbuf[connectionID]->timeout,pmtu_timeout_cb,(void
1274
        // *)&connectionID);
1275

    
1276
        // timeout_add(connectbuf[connectionID]->timeout,&connectbuf[connectionID]->timeout_value);
1277

    
1278
        /*
1279
         * libevent2
1280
         */
1281

    
1282
        struct event *ev;
1283
        ev = evtimer_new(base, pmtu_timeout_cb,
1284
                         (void *) connectbuf[connectionID]);
1285

    
1286
        // connectbuf[connectionID]->timeout = ev;
1287

    
1288
        event_add(ev, &connectbuf[connectionID]->timeout_value);
1289

    
1290
    } else if (msgtype == 1 && dead != 1) {
1291

    
1292
        // stop the timeout event
1293
        // timeout_del(connectbuf[connectionID]->timeout);
1294

    
1295
        /*
1296
         * libevent2
1297
         */
1298
        // info("still here 11 \n");
1299
        // printf("ev %d \n",connectbuf[connectionID]->timeout);
1300
        // event_del(connectbuf[connectionID]->timeout );
1301
        // evtimer_del(connectbuf[connectionID]->timeout );
1302

    
1303

    
1304
//         // create and send a connection message
1305
//         create_conn_msg(new_pmtusize,
1306
//                         connectbuf[connectionID]->connectionID,
1307
//                         NULL, CONNECT);
1308

    
1309
        //send_conn_msg(connectionID, new_pmtusize);
1310

    
1311
        // set a timeout event for the pmtu discovery
1312
        // timeout_set(connectbuf[connectionID]->timeout,pmtu_timeout_cb,(void
1313
        // *)&connectionID);
1314
        // timeout_add(connectbuf[connectionID]->timeout,&connectbuf[connectionID]->timeout_value);
1315

    
1316
        /*
1317
         * libevent2
1318
         */
1319
        // struct event *ev;
1320
        // ev = evtimer_new(base,pmtu_timeout_cb, (void
1321
        // *)connectbuf[connectionID]);
1322
        // connectbuf[connectionID]->timeout = ev;
1323
        // event_add(ev,&connectbuf[connectionID]->timeout_value);
1324

    
1325
    }
1326
}
1327

    
1328
/*
1329
 * what to do once a packet arrived if it is a conn packet send it to
1330
 * recv_conn handler if it is a data packet send it to the recv_data
1331
 * handler
1332
 */
1333

    
1334
//done --
1335
void recv_pkg(int fd, short event, void *arg)
1336
{
1337
        debug("ML: recv_pkg called\n");
1338

    
1339
        struct msg_header *msg_h;
1340
        char msgbuf[MAX];
1341
        pmtu recvSize = MAX;
1342
        char *bufptr = msgbuf;
1343
        int ttl;
1344
        struct sockaddr_in recv_addr;
1345
        int msg_size;
1346

    
1347
        recvPacket(fd, msgbuf, &recvSize, &recv_addr, pmtu_error_cb_th, &ttl);
1348

    
1349

    
1350
        // check if it is not just an ERROR message
1351
        if(recvSize < 0)
1352
                return;
1353

    
1354
        // @TODO check if this simplistic STUN message recognition really always works, probably not
1355
        unsigned short stun_bind_response = 0x0101;
1356
        unsigned short * msgspot = (unsigned short *) msgbuf;
1357
        if (*msgspot == stun_bind_response) {
1358
                debug("ML: recv_pkg: parse stun message called on %d bytes\n", recvSize);
1359
                recv_stun_msg(msgbuf, recvSize);
1360
                return;
1361
        }
1362

    
1363
        msg_h = (struct msg_header *) msgbuf;
1364

    
1365
        /* convert header from network to host order */
1366
        msg_h->offset = ntohl(msg_h->offset);
1367
        msg_h->msg_length = ntohl(msg_h->msg_length);
1368
        msg_h->local_con_id = ntohl(msg_h->local_con_id);
1369
        msg_h->remote_con_id = ntohl(msg_h->remote_con_id);
1370
        msg_h->msg_seq_num = ntohl(msg_h->msg_seq_num);
1371

    
1372
        //verify minimum size
1373
        if (recvSize < sizeof(struct msg_header)) {
1374
          info("UDP packet too small, can't be an ML packet");
1375
          return;
1376
        }
1377

    
1378
        //TODO add more verifications
1379

    
1380
        bufptr += MSG_HEADER_SIZE + msg_h->len_mon_packet_hdr;
1381
        msg_size = recvSize - MSG_HEADER_SIZE - msg_h->len_mon_packet_hdr;
1382

    
1383
        //verify more fields
1384
        if (msg_size < 0) {
1385
          info("Corrupted UDP packet received");
1386
          return;
1387
        }
1388

    
1389
        if(get_Recv_pkt_inf_cb != NULL) {
1390
                mon_pkt_inf msginfNow;
1391
                msginfNow.monitoringHeaderLen = msg_h->len_mon_packet_hdr;
1392
                msginfNow.monitoringHeader = msg_h->len_mon_packet_hdr ? &msgbuf[0] + MSG_HEADER_SIZE : NULL;
1393
                //TODO rethink this ...
1394
                if(msg_h->msg_type == ML_CON_MSG) {
1395
                        struct conn_msg *c_msg = (struct conn_msg *) bufptr;
1396
                        msginfNow.remote_socketID = &(c_msg->sock_id);
1397
                }
1398
                else if(connectbuf[msg_h->remote_con_id] == NULL) {
1399
                        error("ML: received pkg called with non existent connection\n");
1400
                        return;
1401
                } else
1402
                        msginfNow.remote_socketID = &(connectbuf[msg_h->remote_con_id]->external_socketID);
1403
                msginfNow.buffer = bufptr;
1404
                msginfNow.bufSize = recvSize;
1405
                msginfNow.msgtype = msg_h->msg_type;
1406
                msginfNow.ttl = ttl;
1407
                msginfNow.dataID = msg_h->msg_seq_num;
1408
                msginfNow.offset = msg_h->offset;
1409
                msginfNow.datasize = msg_h->msg_length;
1410
                gettimeofday(&msginfNow.arrival_time, NULL);
1411
                (get_Recv_pkt_inf_cb) ((void *) &msginfNow);
1412
        }
1413

    
1414

    
1415
        switch(msg_h->msg_type) {
1416
                case ML_CON_MSG:
1417
                        debug("ML: received conn pkg\n");
1418
                        recv_conn_msg(msg_h, bufptr, msg_size, &recv_addr);
1419
                        break;
1420
#ifdef RTX
1421
                case ML_NACK_MSG:
1422
                        debug("ML: received nack pkg\n");
1423
                        recv_nack_msg(msg_h, bufptr, msg_size);
1424
                        break;
1425
#endif
1426
                default:
1427
                        if(msg_h->msg_type < 127) {
1428
                                debug("ML: received data pkg\n");
1429
                                recv_data_msg(msg_h, bufptr, msg_size);
1430
                                break;
1431
                        }
1432
                        debug("ML: unrecognised msg_type\n");
1433
                        break;
1434
        }
1435
}
1436

    
1437
/*
1438
 * compare the external IP address of two socketIDs
1439
 */
1440
int
1441
compare_external_address_socketIDs(socketID_handle sock1, socketID_handle sock2)
1442
{
1443
        if( sock1->external_addr.udpaddr.sin_addr.s_addr == sock2->external_addr.udpaddr.sin_addr.s_addr)
1444
                return 0;
1445
        return 1;
1446
}
1447

    
1448
void try_stun();
1449

    
1450
/*
1451
 * the timeout of the NAT traversal
1452
 */
1453
void nat_traversal_timeout(int fd, short event, void *arg)
1454
{
1455
debug("X. NatTrTo %d\n", NAT_traversal);
1456
        if (NAT_traversal == false) {
1457
                debug("ML: NAT traversal request re-send\n");
1458
                if(receive_SocketID_cb)
1459
                        (receive_SocketID_cb) (&local_socketID, 2);
1460
                try_stun();
1461
        }
1462
debug("X. NatTrTo\n");
1463
}
1464

    
1465
//return IP address, or INADDR_NONE if can't resolve
1466
unsigned long resolve(const char *ipaddr)
1467
{
1468
        struct hostent *h = gethostbyname(ipaddr);
1469
        if (!h) {
1470
                error("ML: Unable to resolve host name %s\n", ipaddr);
1471
                return INADDR_NONE;
1472
        }
1473
        unsigned long *addr = (unsigned long *) (h->h_addr);
1474
        return *addr;
1475
}
1476

    
1477

    
1478
/*
1479
 * returns the file descriptor, or <0 on error. The ipaddr can be a null
1480
 * pointer. Then all available ipaddr on the machine are choosen.
1481
 */
1482
int create_socket(const int port, const char *ipaddr)
1483
{
1484
        struct sockaddr_in udpaddr = {0};
1485
        udpaddr.sin_family = AF_INET;
1486
        debug("X. create_socket %s, %d\n", ipaddr, port);
1487
        if (ipaddr == NULL) {
1488
                /*
1489
                * try to guess the local IP address
1490
                */
1491
                const char *ipaddr_iface = mlAutodetectIPAddress();
1492
                if (ipaddr_iface) {
1493
                        udpaddr.sin_addr.s_addr = inet_addr(ipaddr_iface);
1494
                } else {
1495
                        udpaddr.sin_addr.s_addr = INADDR_ANY;
1496
                }
1497
        } else {
1498
                udpaddr.sin_addr.s_addr = inet_addr(ipaddr);
1499
        }
1500
        udpaddr.sin_port = htons(port);
1501

    
1502
        socketaddrgen udpgen;
1503
        memset(&udpgen,0,sizeof(socketaddrgen));        //this will be sent over the net, so set it to 0
1504
        udpgen.udpaddr = udpaddr;
1505
        local_socketID.internal_addr = udpgen;
1506

    
1507
        socketfd = createSocket(port, ipaddr);
1508
        if (socketfd < 0){
1509
                return socketfd;
1510
        }
1511

    
1512
        struct event *ev;
1513
        ev = event_new(base, socketfd, EV_READ | EV_PERSIST, recv_pkg, NULL);
1514

    
1515
        event_add(ev, NULL);
1516

    
1517
        try_stun();
1518

    
1519
        return socketfd;
1520
}
1521

    
1522
/*
1523
 * try to figure out external IP using STUN, if defined
1524
 */
1525
void try_stun()
1526
{
1527
        if (isStunDefined()) {
1528
                /*
1529
                * send the NAT traversal STUN request
1530
                */
1531
                 send_stun_request(socketfd, &stun_server);
1532

    
1533
                /*
1534
                * enter a NAT traversal timeout that takes care of retransmission
1535
                */
1536
                struct event *ev1;
1537
                struct timeval timeout_value_NAT_traversal = NAT_TRAVERSAL_TIMEOUT;
1538
                ev1 = evtimer_new(base, nat_traversal_timeout, NULL);
1539
                event_add(ev1, &timeout_value_NAT_traversal);
1540

    
1541
                NAT_traversal = false;
1542
        } else {
1543
                /*
1544
                * Assume we have accessibility and copy internal address to external one
1545
                */
1546
                local_socketID.external_addr = local_socketID.internal_addr;
1547
                NAT_traversal = true; // @TODO: this is not really NAT traversal, but a flag that init is over
1548
                // callback to the upper layer indicating that the socketID is now
1549
                // ready to use
1550
                if(receive_SocketID_cb)
1551
                        (receive_SocketID_cb) (&local_socketID, 0); //success
1552
        }
1553
}
1554

    
1555
/**************************** END OF INTERNAL ***********************/
1556

    
1557
/**************************** MONL functions *************************/
1558

    
1559
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg){
1560

    
1561
/*X*/ //  fprintf(stderr,"MLINIT1 %s, %d, %s, %d\n", ipaddr, port, stun_ipaddr, stun_port);
1562
        base = (struct event_base *) arg;
1563
        recv_data_callback = recv_data_cb;
1564
        mlSetRecvTimeout(timeout_value);
1565
        if (stun_ipaddr) {
1566
                 mlSetStunServer(stun_port, stun_ipaddr);
1567
        } else {
1568

    
1569
        }
1570
        register_recv_localsocketID_cb(local_socketID_cb);
1571
/*X*/ //  fprintf(stderr,"MLINIT1\n");
1572
        return create_socket(port, ipaddr);
1573
}
1574

    
1575
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold) {
1576
        setOutputRateParams(bucketsize, drainrate);
1577
        setQueuesParams (maxQueueSize, maxQueueSizeRTX, maxTimeToHold);
1578
}
1579
     
1580
void mlSetVerbosity (int log_level) {
1581
        setLogLevel(log_level);
1582
}
1583

    
1584
/* register callbacks  */
1585
void mlRegisterGetRecvPktInf(get_recv_pkt_inf_cb recv_pkt_inf_cb){
1586

    
1587
        if (recv_pkt_inf_cb == NULL) {
1588
                error("ML: Register get_recv_pkt_inf_cb failed: NULL ptr  \n");
1589
        } else {
1590
                get_Recv_pkt_inf_cb = recv_pkt_inf_cb;
1591
        }
1592
}
1593

    
1594
void mlRegisterGetSendPktInf(get_send_pkt_inf_cb  send_pkt_inf_cb){
1595

    
1596
        if (send_pkt_inf_cb == NULL) {
1597
                error("ML: Register get_send_pkt_inf_cb: NULL ptr  \n");
1598
        } else {
1599
                get_Send_pkt_inf_cb = send_pkt_inf_cb;
1600
        }
1601
}
1602

    
1603

    
1604
void mlRegisterSetMonitoringHeaderPktCb(set_monitoring_header_pkt_cb monitoring_header_pkt_cb ){
1605

    
1606
        if (monitoring_header_pkt_cb == NULL) {
1607
                error("ML: Register set_monitoring_header_pkt_cb: NULL ptr  \n");
1608
        } else {
1609
                set_Monitoring_header_pkt_cb = monitoring_header_pkt_cb;
1610
        }
1611
}
1612

    
1613
void mlRegisterGetRecvDataInf(get_recv_data_inf_cb recv_data_inf_cb){
1614

    
1615
        if (recv_data_inf_cb == NULL) {
1616
                error("ML: Register get_recv_data_inf_cb: NULL ptr  \n");
1617
        } else {
1618
                get_Recv_data_inf_cb = recv_data_inf_cb;
1619
        }
1620
}
1621

    
1622
void mlRegisterGetSendDataInf(get_send_data_inf_cb  send_data_inf_cb){
1623

    
1624
        if (send_data_inf_cb == NULL) {
1625
                error("ML: Register get_send_data_inf_cb: NULL ptr  \n");
1626
        } else {
1627
                get_Send_data_inf_cb = send_data_inf_cb;
1628
        }
1629
}
1630

    
1631
void mlRegisterSetMonitoringHeaderDataCb(set_monitoring_header_data_cb monitoring_header_data_cb){
1632

    
1633
        if (monitoring_header_data_cb == NULL) {
1634
                error("ML: Register set_monitoring_header_data_cb : NULL ptr  \n");
1635
        } else {
1636
                set_Monitoring_header_data_cb = monitoring_header_data_cb;
1637
        }
1638
}
1639

    
1640
void mlSetRecvTimeout(struct timeval timeout_value){
1641

    
1642
        recv_timeout = timeout_value;
1643
#ifdef RTX
1644
        unsigned int total_usec = recv_timeout.tv_sec * 1000000 + recv_timeout.tv_usec;
1645
        total_usec = total_usec * LAST_PKT_RECV_TIMEOUT_FRACTION;
1646
        last_pkt_recv_timeout.tv_sec = total_usec / 1000000;
1647
        last_pkt_recv_timeout.tv_usec = total_usec - last_pkt_recv_timeout.tv_sec * 1000000;
1648
        fprintf(stderr,"Timeout for receiving message: %d : %d\n", recv_timeout.tv_sec, recv_timeout.tv_usec);        
1649
        fprintf(stderr,"Timeout for last pkt: %d : %d\n", last_pkt_recv_timeout.tv_sec, last_pkt_recv_timeout.tv_usec);        
1650
#endif
1651
}
1652

    
1653
int mlGetStandardTTL(socketID_handle socketID,uint8_t *ttl){
1654

    
1655
        return getTTL(socketfd, ttl);
1656

    
1657
}
1658

    
1659
socketID_handle mlGetLocalSocketID(int *errorstatus){
1660

    
1661
        if (NAT_traversal == false) {
1662
                *errorstatus = 2;
1663
                return NULL;
1664
        }
1665

    
1666
        *errorstatus = 0;
1667
        return &local_socketID;
1668

    
1669
}
1670

    
1671

    
1672
/**************************** END of MONL functions *************************/
1673

    
1674
/**************************** GENERAL functions *************************/
1675

    
1676
void mlRegisterRecvConnectionCb(receive_connection_cb recv_conn_cb){
1677

    
1678
        if (recv_conn_cb == NULL) {
1679
                error("ML: Register receive_connection_cb: NULL ptr  \n");
1680
        }else {
1681
                receive_Connection_cb = recv_conn_cb;
1682
        }
1683
}
1684

    
1685
void mlRegisterErrorConnectionCb(connection_failed_cb conn_failed){
1686

    
1687
        if (conn_failed == NULL) {
1688
                error("ML: Register connection_failed_cb: NULL ptr  \n");
1689
        } else {
1690
                failed_Connection_cb = conn_failed;
1691
        }
1692
}
1693

    
1694
void mlRegisterRecvDataCb(receive_data_cb data_cb,unsigned char msgtype){
1695

    
1696
    if (msgtype > 126) {
1697

    
1698
            error
1699
            ("ML: Could not register recv_data callback. Msgtype is greater then 126 \n");
1700

    
1701
    }
1702

    
1703
    if (data_cb == NULL) {
1704

    
1705
            error("ML: Register receive data callback: NUll ptr \n ");
1706

    
1707
    } else {
1708

    
1709
        recvcbbuf[msgtype] = data_cb;
1710

    
1711
    }
1712

    
1713
}
1714

    
1715
void mlCloseSocket(socketID_handle socketID){
1716

    
1717
        free(socketID);
1718

    
1719
}
1720

    
1721
void keepalive_fn(evutil_socket_t fd, short what, void *arg) {
1722
        socketID_handle peer = arg;
1723

    
1724
        int con_id = mlConnectionExist(peer, false);
1725
        if (con_id < 0 || connectbuf[con_id]->defaultSendParams.keepalive <= 0) {
1726
                /* Connection fell from under us or keepalive was disabled */
1727
                free(arg);
1728
                return;
1729
        }
1730

    
1731
        /* do what we gotta do */
1732
        if ( connectbuf[con_id]->status == READY) {
1733
                char keepaliveMsg[32] = "";
1734
                sprintf(keepaliveMsg, "KEEPALIVE %d", connectbuf[con_id]->keepalive_seq++);
1735
                send_msg(con_id, MSG_TYPE_ML_KEEPALIVE, keepaliveMsg, 1 + strlen(keepaliveMsg), false, 
1736
                        &(connectbuf[con_id]->defaultSendParams));
1737
        }
1738

    
1739
        /* re-schedule */
1740
        struct timeval t = { 0,0 };
1741
        t.tv_sec = connectbuf[con_id]->defaultSendParams.keepalive;
1742
        if (connectbuf[con_id]->defaultSendParams.keepalive) 
1743
                event_base_once(base, -1, EV_TIMEOUT, keepalive_fn, peer, &t);
1744
}
1745

    
1746
void setupKeepalive(int conn_id) {
1747
        /* Save the peer's address for us */
1748
        socketID_handle peer = malloc(sizeof(socket_ID));
1749
        memcpy(peer, &connectbuf[conn_id]->external_socketID, sizeof(socket_ID));
1750

    
1751
        struct timeval t = { 0,0 };
1752
        t.tv_sec = connectbuf[conn_id]->defaultSendParams.keepalive;
1753

    
1754
        if (connectbuf[conn_id]->defaultSendParams.keepalive) 
1755
                event_base_once(base, -1, EV_TIMEOUT, keepalive_fn, peer, &t);
1756
}
1757

    
1758
/* connection functions */
1759
int mlOpenConnection(socketID_handle external_socketID,receive_connection_cb connection_cb,void *arg, const send_params defaultSendParams){
1760

    
1761
        int con_id;
1762
        if (external_socketID == NULL) {
1763
                error("ML: cannot open connection: one of the socketIDs is NULL\n");
1764
                return -1;
1765
        }
1766
        if (NAT_traversal == false) {
1767
                error("ML: cannot open connection: NAT traversal for socketID still in progress\n");
1768
                return -1;
1769
        }
1770
        if (connection_cb == NULL) {
1771
                error("ML: cannot open connection: connection_cb is NULL\n");
1772
                return -1;
1773
        }
1774

    
1775
        // check if that connection already exist
1776

    
1777
        con_id = mlConnectionExist(external_socketID, false);
1778
        if (con_id >= 0) {
1779
                // overwrite defaultSendParams
1780
                bool newKeepalive = 
1781
                        connectbuf[con_id]->defaultSendParams.keepalive == 0 && defaultSendParams.keepalive != 0;
1782
                connectbuf[con_id]->defaultSendParams = defaultSendParams;
1783
                if (newKeepalive) setupKeepalive(con_id);
1784
                // if so check if it is ready to use
1785
                if (connectbuf[con_id]->status == READY) {
1786
                                // if so use the callback immediately
1787
                                (connection_cb) (con_id, arg);
1788

    
1789
                // otherwise just write the connection cb and the arg pointer
1790
                // into the connection struct
1791
                } else {
1792
                        struct receive_connection_cb_list *temp;
1793
                        temp = malloc(sizeof(struct receive_connection_cb_list));
1794
                        temp->next = NULL;
1795
                        temp->connection_cb = connection_cb;
1796
                        temp->arg = arg;
1797
                        if(connectbuf[con_id]->connection_last != NULL) {
1798
                                connectbuf[con_id]->connection_last->next = temp;
1799
                                connectbuf[con_id]->connection_last = temp;
1800
                        } else
1801
                                connectbuf[con_id]->connection_last = connectbuf[con_id]->connection_head = temp;
1802
                }
1803
                return con_id;
1804
        }
1805
        // make entry in connection_establishment array
1806
        for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
1807
                if (connectbuf[con_id] == NULL) {
1808
                        connectbuf[con_id] = (connect_data *) malloc(sizeof(connect_data));
1809
                        memset(connectbuf[con_id],0,sizeof(connect_data));
1810
                        connectbuf[con_id]->starttime = time(NULL);
1811
                        memcpy(&connectbuf[con_id]->external_socketID, external_socketID, sizeof(socket_ID));
1812
                        connectbuf[con_id]->pmtusize = DSLSLIM;
1813
                        connectbuf[con_id]->timeout_event = NULL;
1814
                        connectbuf[con_id]->status = INVITE;
1815
                        connectbuf[con_id]->seqnr = 0;
1816
                        connectbuf[con_id]->internal_connect = !compare_external_address_socketIDs(external_socketID, &local_socketID);
1817
                        connectbuf[con_id]->connectionID = con_id;
1818

    
1819
                        connectbuf[con_id]->connection_head = connectbuf[con_id]->connection_last = malloc(sizeof(struct receive_connection_cb_list));
1820
                        connectbuf[con_id]->connection_last->next = NULL;
1821
                        connectbuf[con_id]->connection_last->connection_cb = connection_cb;
1822
                        connectbuf[con_id]->connection_last->arg = arg;
1823
                        connectbuf[con_id]->external_connectionID = -1;
1824

    
1825
                        connectbuf[con_id]->defaultSendParams = defaultSendParams;
1826
                        if (defaultSendParams.keepalive) setupKeepalive(con_id);
1827
                        break;
1828
                }
1829
        } //end of for
1830

    
1831
        if (con_id == CONNECTBUFSIZE) {
1832
                error("ML: Could not open connection: connection buffer full\n");
1833
                return -1;
1834
        }
1835

    
1836
        // create and send a connection message
1837
        info("ML:Sending INVITE to %s (lconn:%d)\n",conid_to_string(con_id), con_id);
1838
        send_conn_msg_with_pmtu_discovery(con_id, connectbuf[con_id]->pmtusize, INVITE);
1839

    
1840
        return con_id;
1841

    
1842
}
1843

    
1844
void mlCloseConnection(const int connectionID){
1845

    
1846
        // remove it from the connection array
1847
        if(connectbuf[connectionID]) {
1848
                if(connectbuf[connectionID]->ctrl_msg_buf) {
1849
                        free(connectbuf[connectionID]->ctrl_msg_buf);
1850
                }
1851
                // remove related events
1852
                if (connectbuf[connectionID]->timeout_event) {
1853
                        event_del(connectbuf[connectionID]->timeout_event);
1854
                        event_free(connectbuf[connectionID]->timeout_event);
1855
                        connectbuf[connectionID]->timeout_event = NULL;
1856
                }
1857
                free(connectbuf[connectionID]);
1858
                connectbuf[connectionID] = NULL;
1859
        }
1860

    
1861
}
1862

    
1863
void mlSendData(const int connectionID,char *sendbuf,int bufsize,unsigned char msgtype,send_params *sParams){
1864

    
1865
        if (connectionID < 0) {
1866
                error("ML: send data failed: connectionID does not exist\n");
1867
                return;
1868
        }
1869

    
1870
        if (connectbuf[connectionID] == NULL) {
1871
                error("ML: send data failed: connectionID does not exist\n");
1872
                return;
1873
        }
1874
        if (connectbuf[connectionID]->status != READY) {
1875
            error("ML: send data failed: connection is not active\n");
1876
            return;
1877
        }
1878

    
1879
        if (sParams == NULL) {
1880
                sParams = &(connectbuf[connectionID]->defaultSendParams);
1881
        }
1882

    
1883
        send_msg(connectionID, msgtype, sendbuf, bufsize, false, sParams);
1884

    
1885
}
1886

    
1887
/* transmit data functions  */
1888
int mlSendAllData(const int connectionID,send_all_data_container *container,int nr_entries,unsigned char msgtype,send_params *sParams){
1889

    
1890
    if (nr_entries < 1 || nr_entries > 5) {
1891

    
1892
        error
1893
            ("ML : sendALlData : nr_enties is not between 1 and 5 \n ");
1894
        return 0;
1895

    
1896
    } else {
1897

    
1898
        if (nr_entries == 1) {
1899

    
1900
                mlSendData(connectionID, container->buffer_1,
1901
                      container->length_1, msgtype, sParams);
1902

    
1903
            return 1;
1904

    
1905
        } else if (nr_entries == 2) {
1906

    
1907
            int buflen = container->length_1 + container->length_2;
1908
            char buf[buflen];
1909
            memcpy(buf, container->buffer_1, container->length_1);
1910
            memcpy(&buf[container->length_1], container->buffer_2,
1911
                   container->length_2);
1912
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
1913

    
1914
            return 1;
1915

    
1916
        } else if (nr_entries == 3) {
1917

    
1918
            int buflen =
1919
                container->length_1 + container->length_2 +
1920
                container->length_3;
1921
            char buf[buflen];
1922
            memcpy(buf, container->buffer_1, container->length_1);
1923
            memcpy(&buf[container->length_1], container->buffer_2,
1924
                   container->length_2);
1925
            memcpy(&buf[container->length_2], container->buffer_3,
1926
                   container->length_3);
1927
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
1928

    
1929

    
1930
            return 1;
1931

    
1932
        } else if (nr_entries == 4) {
1933

    
1934
            int buflen =
1935
                container->length_1 + container->length_2 +
1936
                container->length_3 + container->length_4;
1937
            char buf[buflen];
1938
            memcpy(buf, container->buffer_1, container->length_1);
1939
            memcpy(&buf[container->length_1], container->buffer_2,
1940
                   container->length_2);
1941
            memcpy(&buf[container->length_2], container->buffer_3,
1942
                   container->length_3);
1943
            memcpy(&buf[container->length_3], container->buffer_4,
1944
                   container->length_4);
1945
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
1946

    
1947
            return 1;
1948

    
1949
        } else {
1950

    
1951
            int buflen =
1952
                container->length_1 + container->length_2 +
1953
                container->length_3 + container->length_4 +
1954
                container->length_5;
1955
            char buf[buflen];
1956
            memcpy(buf, container->buffer_1, container->length_1);
1957
            memcpy(&buf[container->length_1], container->buffer_2,
1958
                   container->length_2);
1959
            memcpy(&buf[container->length_2], container->buffer_3,
1960
                   container->length_3);
1961
            memcpy(&buf[container->length_3], container->buffer_4,
1962
                   container->length_4);
1963
            memcpy(&buf[container->length_4], container->buffer_5,
1964
                   container->length_5);
1965
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
1966

    
1967
            return 1;
1968
        }
1969

    
1970
    }
1971

    
1972
}
1973

    
1974
int mlRecvData(const int connectionID,char *recvbuf,int *bufsize,recv_params *rParams){
1975

    
1976
        //TODO yet to be converted
1977
        return 0;
1978
#if 0
1979
        if (rParams == NULL) {
1980
                error("ML: recv_data failed: recv_params is a NULL ptr\n");
1981
                return 0;
1982
    } else {
1983

1984
        info("ML: recv data called \n");
1985

1986
        int i = 0;
1987
        int returnValue = 0;
1988
        double timeout = (double) recv_timeout.tv_sec;
1989
        time_t endtime = time(NULL);
1990

1991
        for (i = 0; i < RECVDATABUFSIZE; i++) {
1992

1993
            if (recvdatabuf[i] != NULL) {
1994

1995
                if (recvdatabuf[i]->connectionID == connectionID) {
1996

1997
                    info("ML: recv data has entry  \n");
1998

1999
                    double timepass = difftime(endtime, recvdatabuf[i]->starttime);
2000

2001
                    // check if the specified connection has data and it
2002
                    // is complete
2003
                    // check the data seqnr
2004
                    // if(connectionID == recvdatabuf[i]->connectionID &&
2005
                    // 1 == recvdatabuf[i]->status){
2006

2007
                    if (1 == recvdatabuf[i]->status) {
2008

2009
                        // info("transmissionHandler: recv_data set is
2010
                        // complete \n" );
2011

2012
                        // debug("debud \n");
2013

2014
                        // exchange the pointers
2015
                        int buffersize = 0;
2016
                        buffersize = recvdatabuf[i]->bufsize;
2017
                        *bufsize = buffersize;
2018
                        // recvbuf = recvdatabuf[i]->recvbuf;
2019

2020
                        // info("buffersize %d \n",buffersize);
2021
                        memcpy(recvbuf, recvdatabuf[i]->recvbuf,
2022
                               buffersize);
2023
                        // debug(" recvbuf %s \n",recvbuf );
2024

2025
//                         double nrMissFrags =
2026
//                             (double) recvdatabuf[i]->nrFragments /
2027
//                             (double) recvdatabuf[i]->recvFragments;
2028
//                         int nrMissingFragments = (int) ceil(nrMissFrags);
2029

2030
//                        rParams->nrMissingFragments = nrMissingFragments;
2031
//                         rParams->nrFragments = recvdatabuf[i]->nrFragments;
2032
                        rParams->msgtype = recvdatabuf[i]->msgtype;
2033
                        rParams->connectionID =
2034
                            recvdatabuf[i]->connectionID;
2035

2036
                        // break from the loop
2037
                        // debug(" recvbuf %s \n ",recvbuf);
2038

2039
                        // double nrMissFrags =
2040
                        // (double)recvdatabuf[i]->nrFragments /
2041
                        // (double)recvdatabuf[i]->recvFragments;
2042
                        // int nrMissingFragments =
2043
                        // (int)ceil(nrMissFrags);
2044

2045
                        if(get_Recv_data_inf_cb != NULL) {
2046
                                mon_data_inf recv_data_inf;
2047

2048
                                recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
2049
                                recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
2050
                                recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
2051
                                recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
2052
//                                 recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
2053
//                                 recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
2054
                                gettimeofday(&recv_data_inf.arrival_time, NULL);
2055
                                recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
2056
                                recv_data_inf.nrMissingFragments = nrMissingFragments;
2057
                                recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
2058
                                recv_data_inf.priority = false;
2059
                                recv_data_inf.padding = false;
2060
                                recv_data_inf.confirmation = false;
2061
                                recv_data_inf.reliable = false;
2062

2063
                                // send data recv callback to monitoring module
2064

2065
                                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
2066
                        }
2067

2068

2069
                        // free the allocated memory
2070
                        free(recvdatabuf[i]);
2071
                        recvdatabuf[i] = NULL;
2072

2073
                        returnValue = 1;
2074
                        break;
2075

2076
                    }
2077

2078
                    if (recvdatabuf[i] != NULL) {
2079

2080
                        if (timepass > timeout) {
2081

2082
                            info("ML: recv_data timeout called  \n");
2083

2084
                            // some data about the missing chunks should
2085
                            // be added here
2086
                            // exchange the pointers
2087
                            int buffersize = 0;
2088
                            buffersize = recvdatabuf[i]->bufsize;
2089
                            *bufsize = buffersize;
2090
                            // recvbuf = recvdatabuf[i]->recvbuf;
2091

2092
                            double nrMissFrags =
2093
                                (double) recvdatabuf[i]->nrFragments /
2094
                                (double) recvdatabuf[i]->recvFragments;
2095
                            int nrMissingFragments =
2096
                                (int) ceil(nrMissFrags);
2097

2098
                            // debug(" recvbuf %s \n",recvbuf );
2099

2100
                            memcpy(recvbuf, recvdatabuf[i]->recvbuf,
2101
                                   buffersize);
2102

2103
                            rParams->nrMissingFragments =
2104
                                nrMissingFragments;
2105
                            rParams->nrFragments =
2106
                                recvdatabuf[i]->nrFragments;
2107
                            rParams->msgtype = recvdatabuf[i]->msgtype;
2108
                            rParams->connectionID =
2109
                                recvdatabuf[i]->connectionID;
2110

2111
                                if(get_Recv_data_inf_cb != NULL) {
2112
                                        mon_data_inf recv_data_inf;
2113

2114
                                        recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
2115
                                        recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
2116
                                        recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
2117
                                        recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
2118
                                        recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
2119
                                        recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
2120
                                        gettimeofday(&recv_data_inf.arrival_time, NULL);
2121
                                        recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
2122
                                        recv_data_inf.nrMissingFragments = nrMissingFragments;
2123
                                        recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
2124
                                        recv_data_inf.priority = false;
2125
                                        recv_data_inf.padding = false;
2126
                                        recv_data_inf.confirmation = false;
2127
                                        recv_data_inf.reliable = false;
2128

2129
                                        // send data recv callback to monitoring module
2130

2131
                                        (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
2132
                                }
2133

2134
                            // free the allocated memory
2135
                            free(recvdatabuf[i]);
2136
                            recvdatabuf[i] = NULL;
2137

2138
                            returnValue = 1;
2139
                            break;
2140

2141
                        }
2142
                    }
2143

2144
                }
2145

2146
            }
2147
            // debug("2 recvbuf %s \n ",recvbuf);
2148
        }
2149
        return returnValue;
2150
    }
2151
#endif
2152

    
2153
}
2154

    
2155
int mlSocketIDToString(socketID_handle socketID,char* socketID_string, size_t len){
2156

    
2157
        char internal_addr[INET_ADDRSTRLEN];
2158
        char external_addr[INET_ADDRSTRLEN];
2159

    
2160
        assert(socketID);
2161

    
2162
        inet_ntop(AF_INET, &(socketID->internal_addr.udpaddr.sin_addr.s_addr), internal_addr, INET_ADDRSTRLEN);
2163
        inet_ntop(AF_INET, &(socketID->external_addr.udpaddr.sin_addr.s_addr), external_addr, INET_ADDRSTRLEN);
2164

    
2165
        snprintf(socketID_string,len,"%s:%d-%s:%d", internal_addr, ntohs(socketID->internal_addr.udpaddr.sin_port),
2166
                external_addr,        ntohs(socketID->external_addr.udpaddr.sin_port));
2167
        return 0;
2168

    
2169
}
2170

    
2171
int mlStringToSocketID(const char* socketID_string, socketID_handle socketID){
2172

    
2173
        //@TODO add checks against malformed string
2174
        char external_addr[INET_ADDRSTRLEN];
2175
        int external_port;
2176
        char internal_addr[INET_ADDRSTRLEN];
2177
        int internal_port;
2178

    
2179
        char *pch;
2180
        char *s = strdup(socketID_string);
2181

    
2182
        //replace ':' with a blank
2183
        pch=strchr(s,':');
2184
        while (pch!=NULL){
2185
                                *pch = ' ';
2186
                pch=strchr(pch+1,':');
2187
        }
2188
        pch=strchr(s,'-');
2189
        if(pch) *pch = ' ';
2190

    
2191
        sscanf(s,"%s %d %s %d", internal_addr, &internal_port,
2192
                external_addr, &external_port);
2193

    
2194
        //set structure to 0, we initialize each byte, since it will be sent on the net later
2195
        memset(socketID, 0, sizeof(struct _socket_ID));
2196

    
2197
        if(inet_pton(AF_INET, internal_addr, &(socketID->internal_addr.udpaddr.sin_addr)) == 0)
2198
                return EINVAL;
2199
        socketID->internal_addr.udpaddr.sin_family = AF_INET;
2200
        socketID->internal_addr.udpaddr.sin_port = htons(internal_port);
2201

    
2202

    
2203
        if(inet_pton(AF_INET, external_addr, &(socketID->external_addr.udpaddr.sin_addr)) ==0)
2204
                return EINVAL;
2205
        socketID->external_addr.udpaddr.sin_family = AF_INET;
2206
        socketID->external_addr.udpaddr.sin_port = htons(external_port);
2207

    
2208
        free(s);
2209
        return 0;
2210

    
2211
}
2212

    
2213
int mlGetConnectionStatus(int connectionID){
2214

    
2215
        if(connectbuf[connectionID])
2216
                return connectbuf[connectionID]->status == READY;
2217
        return -1;
2218
    
2219
}
2220

    
2221

    
2222
int mlConnectionExist(socketID_handle socketID, bool ready){
2223

    
2224
    /*
2225
     * check if another connection for the external connectionID exist
2226
     * that was established \ within the last 2 seconds
2227
     */
2228
        int i;
2229
        for (i = 0; i < CONNECTBUFSIZE; i++)
2230
                if (connectbuf[i] != NULL)
2231
                        if (mlCompareSocketIDs(&(connectbuf[i]->external_socketID), socketID) == 0) {
2232
                                if (ready) return (connectbuf[i]->status == READY ? i : -1);;
2233
                                return i;
2234
                                }
2235

    
2236
    return -1;
2237

    
2238
}
2239

    
2240
//Added by Robert Birke as comodity functions
2241

    
2242
//int mlPrintSocketID(socketID_handle socketID) {
2243
//        char str[SOCKETID_STRING_SIZE];
2244
//        mlSocketIDToString(socketID, str, sizeof(str));
2245
//        printf(stderr,"int->%s<-ext\n",str);
2246
//}
2247

    
2248
/*
2249
 * hash code of a socketID
2250
 * TODO might think of a better way
2251
 */
2252
int mlHashSocketID(socketID_handle sock) {
2253
        //assert(sock);
2254
   return sock->internal_addr.udpaddr.sin_port +
2255
                        sock->external_addr.udpaddr.sin_port;
2256
}
2257

    
2258
int mlCompareSocketIDs(socketID_handle sock1, socketID_handle sock2) {
2259

    
2260
        assert(sock1 && sock2);
2261

    
2262
        /*
2263
        * compare internal addr
2264
        */
2265
        if(sock1 == NULL || sock2 == NULL)
2266
                return 1;
2267

    
2268
        if (sock1->internal_addr.udpaddr.sin_addr.s_addr !=
2269
            sock2->internal_addr.udpaddr.sin_addr.s_addr)
2270
                        return 1;
2271

    
2272
        if (sock1->internal_addr.udpaddr.sin_port !=
2273
                 sock2->internal_addr.udpaddr.sin_port)
2274
                        return 1;
2275

    
2276
        /*
2277
        * compare external addr
2278
        */
2279
        if (sock1->external_addr.udpaddr.sin_addr.s_addr !=
2280
            sock2->external_addr.udpaddr.sin_addr.s_addr)
2281
                        return 1;
2282

    
2283
        if (sock1->external_addr.udpaddr.sin_port !=
2284
                 sock2->external_addr.udpaddr.sin_port)
2285
                        return 1;
2286

    
2287
        return 0;
2288
}
2289

    
2290
int mlCompareSocketIDsByPort(socketID_handle sock1, socketID_handle sock2)
2291
{
2292
        if(sock1 == NULL || sock2 == NULL)
2293
                return 1;
2294
 
2295
        if (sock1->internal_addr.udpaddr.sin_port !=
2296
                 sock2->internal_addr.udpaddr.sin_port)
2297
                        return 1;
2298

    
2299
        if (sock1->external_addr.udpaddr.sin_port !=
2300
                 sock2->external_addr.udpaddr.sin_port)
2301
                        return 1;
2302
        return 0;
2303
}
2304

    
2305
int mlGetPathMTU(int ConnectionId) {
2306
        if(ConnectionId < 0 || ConnectionId >= CONNECTBUFSIZE)
2307
                return -1;
2308
        if (connectbuf[ConnectionId] != NULL)
2309
                return connectbuf[ConnectionId]->pmtusize;
2310
        return -1;
2311
}
2312

    
2313
/**************************** END of GENERAL functions *************************/
2314

    
2315
/**************************** NAT functions *************************/
2316

    
2317
/* setter  */
2318
void mlSetStunServer(const int port,const char *ipaddr){
2319

    
2320
        stun_server.sin_family = AF_INET;
2321
        if (ipaddr == NULL)
2322
                stun_server.sin_addr.s_addr = htonl(INADDR_NONE);
2323
        else
2324
                stun_server.sin_addr.s_addr = resolve(ipaddr);
2325
        stun_server.sin_port = htons(port);
2326

    
2327
}
2328

    
2329
int mlGetExternalIP(char* external_addr){
2330

    
2331
        socketaddrgen udpgen;
2332
        struct sockaddr_in udpaddr;
2333

    
2334
        udpgen = local_socketID.external_addr;
2335
        udpaddr = udpgen.udpaddr;
2336

    
2337
        inet_ntop(AF_INET, &(udpaddr.sin_addr), external_addr,
2338
                        INET_ADDRSTRLEN);
2339

    
2340
        if (external_addr == NULL) {
2341

    
2342
        return -1;
2343

    
2344
        } else {
2345

    
2346
        return 0;
2347

    
2348
        }
2349

    
2350
}
2351

    
2352
/**************************** END of NAT functions *************************/