Statistics
| Branch: | Revision:

napa-baselibs / ml / ml.c @ cc96298a

History | View | Annotate | Download (80.1 KB)

1
/*
2
 * Copyright (c) 2009 NEC Europe Ltd. All Rights Reserved.
3
 * Authors: Kristian Beckers  <beckers@nw.neclab.eu>
4
 *          Sebastian Kiesel  <kiesel@nw.neclab.eu>
5
 * Copyright (c) 2011 Csaba Kiraly <kiraly@disi.unitn.it>
6
 *
7
 * This file is part of the Messaging Library.
8
 *
9
 * The Messaging Library is free software: you can redistribute it and/or
10
 * modify it under the terms of the GNU Lesser General Public License as
11
 * published by the Free Software Foundation, either version 3 of the
12
 * License, or (at your option) any later version.
13
 *
14
 * The Messaging Library is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
17
 * General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public License
20
 * along with the Messaging Library.  If not, see <http://www.gnu.org/licenses/>.
21
 *
22
 */
23

    
24
#include <ml_all.h>
25

    
26
#ifdef FEC
27
#include "fec/RSfec.h"
28
int prev_sqnr=0;  //used to track the previous seq nrs, so that not to consider extra pakts >k.
29
int *pix;         //keep track of packet indexes.
30
int nix=0;        //counter incrimented after the arrivel of each packet. pix[nix]
31
int cnt=0;
32
int chk_complete[5000]={0};
33
#endif
34

    
35
/**************************** START OF INTERNALS ***********************/
36

    
37

    
38
/*
39
 * reserved message type for internal puposes
40
 */
41
#define MSG_TYPE_ML_KEEPALIVE 0x126        //TODO: check that it is really interpreted as internal
42

    
43
/*
44
 * a pointer to a libevent instance
45
 */
46
struct event_base *base;
47

    
48
/*
49
 * define the nr of connections the messaging layer can handle
50
 */
51
#define CONNECTBUFSIZE 10000
52
/*
53
 * define the nr of data that can be received parallel
54
 */
55
#define RECVDATABUFSIZE 10000
56
/*
57
 * define an array for message multiplexing
58
 */
59
#define MSGMULTIPLEXSIZE 127
60

    
61

    
62
/*
63
 * timeout before thinking that the STUN server can't be connected
64
 */
65
#define NAT_TRAVERSAL_TIMEOUT { 1, 0 }
66

    
67
/*
68
 * timeout before thinking of an mtu problem (check MAX_TRIALS as well)
69
 */
70
#define PMTU_TIMEOUT 1000000 // in usec
71

    
72
/*
73
 * retry sending connection messages this many times before reducing pmtu
74
 */
75
#define MAX_TRIALS 3
76

    
77
/*
78
 * default timeout value between the first and the last received packet of a message
79
 */
80
#define RECV_TIMEOUT_DEFAULT { 2, 0 }
81

    
82
#ifdef RTX
83
/*
84
 * default timeout value for a packet reception
85
 */
86
#define PKT_RECV_TIMEOUT_DEFAULT { 0, 50000 } // 50 ms
87

    
88
/*
89
 * default timeout value for a packet reception
90
 */
91
#define LAST_PKT_RECV_TIMEOUT_DEFAULT { 1, 700000 }
92

    
93
/*
94
 * default fraction of RECV_TIMEOUT_DEFAULT for a last packet(s) reception timeout
95
 */
96
#define LAST_PKT_RECV_TIMEOUT_FRACTION 0.7
97

    
98
#endif
99

    
100

    
101
/*
102
 * global variables
103
 */
104

    
105
/*
106
 * define a buffer of pointers to connect structures
107
 */
108
connect_data *connectbuf[CONNECTBUFSIZE];
109

    
110
/*
111
 * define a pointer buffer with pointers to recv_data structures
112
 */
113
recvdata *recvdatabuf[RECVDATABUFSIZE];
114

    
115
/*
116
 * define a pointer buffer for message multiplexing
117
 */
118
receive_data_cb recvcbbuf[MSGMULTIPLEXSIZE];
119

    
120
/*
121
 * stun server address
122
 */
123
struct sockaddr_in stun_server;
124

    
125
/*
126
 * receive timeout
127
 */
128
static struct timeval recv_timeout = RECV_TIMEOUT_DEFAULT;
129

    
130
/*
131
 * boolean NAT traversal successful if true
132
 */
133
boolean NAT_traversal;
134

    
135
/*
136
 * file descriptor for local socket
137
 */
138
evutil_socket_t socketfd;
139

    
140
/*
141
 * local socketID
142
 */
143
socket_ID local_socketID;
144

    
145
socketID_handle loc_socketID = &local_socketID;
146

    
147
/*
148
 * callback function pointers
149
 */
150
/*
151
 * monitoring module callbacks
152
 */
153
get_recv_pkt_inf_cb get_Recv_pkt_inf_cb = NULL;
154
get_send_pkt_inf_cb get_Send_pkt_inf_cb = NULL;
155
set_monitoring_header_pkt_cb set_Monitoring_header_pkt_cb = NULL;
156
get_recv_data_inf_cb get_Recv_data_inf_cb = NULL;
157
get_send_data_inf_cb get_Send_data_inf_cb = NULL;
158
set_monitoring_header_data_cb set_Monitoring_header_data_cb = NULL;
159
/*
160
 * connection callbacks
161
 */
162
receive_connection_cb receive_Connection_cb = NULL;
163
connection_failed_cb failed_Connection_cb = NULL;
164
/*
165
 * local socketID callback
166
 */
167
receive_localsocketID_cb receive_SocketID_cb;
168

    
169
/*
170
 * boolean that defines if received data is transmitted to the upper layer
171
 * via callback or via upper layer polling
172
 */
173
boolean recv_data_callback;
174

    
175
/*
176
 * helper function to get rid of a warning
177
 */
178
#ifndef _WIN32
179
int min(int a, int b) {
180
        if (a > b) return b;
181
        return a;
182
}
183
#endif
184

    
185
#ifdef RTX
186
//*********Counters**********
187

    
188
struct Counters {
189
        unsigned int receivedCompleteMsgCounter;
190
        unsigned int receivedIncompleteMsgCounter;
191
        unsigned int receivedDataPktCounter;
192
        unsigned int receivedRTXDataPktCounter;
193
        unsigned int receivedNACK1PktCounter;
194
        unsigned int receivedNACKMorePktCounter;
195
        unsigned int sentDataPktCounter;
196
        unsigned int sentRTXDataPktCtr;
197
        unsigned int sentNACK1PktCounter;
198
        unsigned int sentNACKMorePktCounter;
199
} counters;
200

    
201
extern unsigned int sentRTXDataPktCounter;
202

    
203
/*
204
 * receive timeout for a packet
205
 */
206
static struct timeval pkt_recv_timeout = PKT_RECV_TIMEOUT_DEFAULT;
207

    
208

    
209
static struct timeval last_pkt_recv_timeout = LAST_PKT_RECV_TIMEOUT_DEFAULT;
210

    
211
void mlShowCounters() {
212
        counters.sentRTXDataPktCtr = sentRTXDataPktCounter;
213
        fprintf(stderr, "\nreceivedCompleteMsgCounter: %d\nreceivedIncompleteMsgCounter: %d\nreceivedDataPktCounter: %d\nreceivedRTXDataPktCounter: %d\nreceivedNACK1PktCounter: %d\nreceivedNACKMorePktCounter: %d\nsentDataPktCounter: %d\nsentRTXDataPktCtr: %d\nsentNACK1PktCounter: %d\nsentNACKMorePktCounter: %d\n", counters.receivedCompleteMsgCounter, counters.receivedIncompleteMsgCounter, counters.receivedDataPktCounter, counters.receivedRTXDataPktCounter, counters.receivedNACK1PktCounter, counters.receivedNACKMorePktCounter, counters.sentDataPktCounter, counters.sentRTXDataPktCtr, counters.sentNACK1PktCounter, counters.sentNACKMorePktCounter);
214
        return;
215
}
216

    
217
void recv_nack_msg(struct msg_header *msg_h, char *msgbuf, int msg_size)
218
{
219
        struct nack_msg *nackmsg;
220
        
221
        msgbuf += msg_h->len_mon_data_hdr;
222
        msg_size -= msg_h->len_mon_data_hdr;
223
        nackmsg = (struct nack_msg*) msgbuf;
224
        
225
        unsigned int gapSize = nackmsg->offsetTo - nackmsg->offsetFrom;
226
        //if (gapSize == 1349) counters.receivedNACK1PktCounter++;
227
        //else counters.receivedNACKMorePktCounter++;
228

    
229
        rtxPacketsFromTo(nackmsg->con_id, nackmsg->msg_seq_num, nackmsg->offsetFrom, nackmsg->offsetTo);        
230
}
231

    
232
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams);
233

    
234
void pkt_recv_timeout_cb(int fd, short event, void *arg){
235
        int recv_id = (long) arg;
236
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
237

    
238
        //check if message still exists        
239
        if (recvdatabuf[recv_id] == NULL) return;
240

    
241
        //check if gap was filled in the meantime
242
        if (recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom == recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo) {
243
                recvdatabuf[recv_id]->firstGap++;
244
                return;        
245
        }
246

    
247
        struct nack_msg nackmsg;
248
        nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
249
        nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
250
        nackmsg.offsetFrom = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom;
251
        nackmsg.offsetTo = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo;
252
        recvdatabuf[recv_id]->firstGap++;
253

    
254
        unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
255

    
256
        send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, (char *) &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));        
257
}
258

    
259
void last_pkt_recv_timeout_cb(int fd, short event, void *arg){
260
        int recv_id = (long) arg;
261
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
262

    
263
        if (recvdatabuf[recv_id] == NULL) {
264
                return;
265
        }
266

    
267
        if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
268
                debug("ML: freeing last packet timeout for %d",recv_id);
269
                event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
270
                event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
271
                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
272
        }
273

    
274
        if (recvdatabuf[recv_id]->expectedOffset == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) return;
275

    
276
        struct nack_msg nackmsg;
277
        nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
278
        nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
279
        nackmsg.offsetFrom = recvdatabuf[recv_id]->expectedOffset;
280
        nackmsg.offsetTo = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen;
281

    
282
        unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
283

    
284
        send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));        
285
}
286

    
287
#endif
288

    
289
/*
290
 * compare the external IP address of two socketIDs
291
 */
292
bool compare_address(struct sockaddr_in *sock1, struct sockaddr_in *sock2, int netmaskbits, bool port)
293
{
294
        int i;
295
        uint32_t mask = 0;
296

    
297
        for (i = 0; i < netmaskbits; i ++) {
298
                mask += 1<<i;
299
        }
300
        mask = htonl(mask);
301

    
302
        return ((sock1->sin_addr.s_addr | mask )  == (sock2->sin_addr.s_addr | mask)) &&
303
               (!port || (sock1->sin_port == sock2->sin_port) );
304
}
305

    
306
/*
307
 * decide whether it is worth trying to connect with internal connect
308
 */
309
bool internal_connect_plausible(socketID_handle sock1, socketID_handle sock2)
310
{
311
        return compare_address (&sock1->external_addr.udpaddr, &sock2->external_addr.udpaddr, 0, false);
312
}
313

    
314
/*
315
 * convert a socketID to a string. It uses a static buffer, so either strdup is needed, or the string will get lost!
316
 */
317
const char *conid_to_string(int con_id)
318
{
319
        static char s[INET_ADDRSTRLEN+1+5+1+INET_ADDRSTRLEN+1+5+1];
320
        mlSocketIDToString(&connectbuf[con_id]->external_socketID, s, sizeof(s));
321
        return s;
322
}
323

    
324
void register_recv_localsocketID_cb(receive_localsocketID_cb local_socketID_cb)
325
{
326
        if (local_socketID_cb == NULL) {
327
                error("ML : Register receive_localsocketID_cb: NULL ptr \n");
328
        } else {
329
                receive_SocketID_cb = local_socketID_cb;
330
        }
331
}
332

    
333

    
334
//void keep_connection_alive(const int connectionID)
335
//{
336
//
337
//    // to be done with the NAT traversal
338
//    // send a message over the wire
339
//    printf("\n");
340
//
341
//}
342

    
343
void unsetStunServer()
344
{
345
        stun_server.sin_addr.s_addr = INADDR_NONE;
346
}
347

    
348
bool isStunDefined()
349
{
350
        return stun_server.sin_addr.s_addr != INADDR_NONE;
351
}
352

    
353
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams) {
354
#ifdef FEC
355
        int chk_msg_len=msg_len;
356
        int n=4;
357
        int k=64;
358
        int lcnt=0;
359
        int icnt=0;
360
        int i=0;
361
#endif
362
        socketaddrgen udpgen;
363
        bool retry;
364
        int pkt_len, offset;
365
        struct iovec iov[4];
366

    
367
        char h_pkt[MON_PKT_HEADER_SPACE];
368
        char h_data[MON_DATA_HEADER_SPACE];
369

    
370
        struct msg_header msg_h;
371

    
372
        debug("ML: send_msg to %s conID:%d extID:%d\n", conid_to_string(con_id), con_id, connectbuf[con_id]->external_connectionID);
373

    
374
        iov[0].iov_base = &msg_h;
375
        iov[0].iov_len = MSG_HEADER_SIZE;
376

    
377
        msg_h.local_con_id = htonl(con_id);
378
        msg_h.remote_con_id = htonl(connectbuf[con_id]->external_connectionID);
379
        msg_h.msg_type = msg_type;
380
        msg_h.msg_seq_num = htonl(connectbuf[con_id]->seqnr++);
381

    
382

    
383
        iov[1].iov_len = iov[2].iov_len = 0;
384
        iov[1].iov_base = h_pkt;
385
        iov[2].iov_base = h_data;
386

    
387

    
388
        if (connectbuf[con_id]->internal_connect)
389
                udpgen = connectbuf[con_id]->external_socketID.internal_addr;
390
        else
391
                udpgen = connectbuf[con_id]->external_socketID.external_addr;
392

    
393
        do{
394
#ifdef FEC
395
                char *Pmsg = NULL;
396
                int npaksX2 = 0;
397
                void *code;
398
                char **src = NULL;
399
                char **pkt = NULL;
400
#endif
401
                offset = 0;
402
                retry = false;
403
                // Monitoring layer hook
404
                if(set_Monitoring_header_data_cb != NULL) {
405
                        iov[2].iov_len = ((set_Monitoring_header_data_cb) (&(connectbuf[con_id]->external_socketID), msg_type));
406
                }
407
                msg_h.len_mon_data_hdr = iov[2].iov_len;
408

    
409
                if(get_Send_data_inf_cb != NULL && iov[2].iov_len != 0) {
410
                        mon_data_inf sd_data_inf;
411

    
412
                        memset(h_data, 0, MON_DATA_HEADER_SPACE);
413

    
414
                        sd_data_inf.remote_socketID = &(connectbuf[con_id]->external_socketID);
415
#ifdef FEC
416
                        if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize){
417
                           //@add padding bits to msg!
418
                           int npaks=0;
419
                           int toffset=0;
420
                           int tpkt_len=connectbuf[con_id]->pmtusize;
421
                           int ipad = (connectbuf[con_id]->pmtusize-(msg_len%(connectbuf[con_id]->pmtusize)));
422
                           Pmsg = (char*) malloc((msg_len + ipad)*sizeof ( char* ));
423
                            for(i=0; i<msg_len; i++){
424
                                *(Pmsg+i)=*(((char *)msg)+i);
425
                                icnt++;
426
                            }
427
                            for(i=msg_len; i<(msg_len+ipad); i++){
428
                                *(Pmsg+i)=0;
429
                                icnt++;
430
                            }
431
                            msg=Pmsg;
432
                            msg_len=(msg_len+ipad);
433
                            npaks=(int)(msg_len/connectbuf[con_id]->pmtusize);
434
                            npaksX2=2*npaks; //2 times.
435
                            src = ( char ** ) malloc ( npaksX2 * sizeof ( char* ));
436
                            pkt = ( char ** ) malloc ( npaksX2 * sizeof ( char* ));
437
                            code = fec_new(npaks,256);
438
                            for(i=0; i<npaks; i++){
439
                              src[i]= (msg + toffset);
440
                              toffset += tpkt_len;
441
                            }
442
                            for(i=npaks; i<npaksX2; i++){//X2
443
                              src[i] = malloc( tpkt_len * sizeof ( char ) );
444
                            }
445
                            for(i=0; i<npaksX2; i++){//X2
446
                             pkt[i] = ( char* )malloc( tpkt_len * sizeof ( char ) );
447
                             fec_encode(code, src, pkt[i], i, tpkt_len) ;
448
                            }
449
                            for(i=npaks; i<npaksX2; i++){//X2
450
                              free(src[i]);
451
                            }
452
                        }
453
#endif
454
                        sd_data_inf.buffer = msg;
455
                        sd_data_inf.bufSize = msg_len;
456
                        sd_data_inf.msgtype = msg_type;
457
                        sd_data_inf.monitoringDataHeader = iov[2].iov_base;
458
                        sd_data_inf.monitoringDataHeaderLen = iov[2].iov_len;
459
                        sd_data_inf.priority = sParams->priority;
460
                        sd_data_inf.padding = sParams->padding;
461
                        sd_data_inf.confirmation = sParams->confirmation;
462
                        sd_data_inf.reliable = sParams->reliable;
463
                        memset(&sd_data_inf.arrival_time, 0, sizeof(struct timeval));
464

    
465
                        (get_Send_data_inf_cb) ((void *) &sd_data_inf);
466
                }
467

    
468
                do {
469
                        bool break2 = false;
470

    
471
                        if(set_Monitoring_header_pkt_cb != NULL) {
472
                                iov[1].iov_len = (set_Monitoring_header_pkt_cb) (&(connectbuf[con_id]->external_socketID), msg_type);
473
                        }
474
#ifdef FEC
475
                        pkt_len = min(connectbuf[con_id]->pmtusize, chk_msg_len - offset) ;
476
#else
477
                        pkt_len = min(connectbuf[con_id]->pmtusize - iov[2].iov_len - iov[1].iov_len - iov[0].iov_len, msg_len - offset) ;
478
#endif
479
                        iov[3].iov_len = pkt_len;
480
#ifdef FEC
481
                        if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize && lcnt<((msg_len*2)/connectbuf[con_id]->pmtusize)){ //&& lcnt<(msg_len/connectbuf[con_id]->pmtusize)
482
                              iov[3].iov_base = pkt[lcnt];
483
                              chk_msg_len=(msg_len*2); //half-rate.
484
                        } else {
485
                              iov[3].iov_base = msg + offset;
486
                              chk_msg_len=msg_len;
487
                        }
488
#else        
489
                        iov[3].iov_base = msg + offset;
490
#endif
491

    
492
                        //fill header
493
                        msg_h.len_mon_packet_hdr = iov[1].iov_len;
494
                        msg_h.offset = htonl(offset);
495
                        msg_h.msg_length = htonl(truncable ? pkt_len : msg_len);
496

    
497
#ifdef FEC
498
                        if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
499
                                mon_pkt_inf pkt_info;
500
                                memset(h_pkt,0,MON_PKT_HEADER_SPACE);
501
                                pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
502
                                if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize && lcnt<((msg_len*2)/connectbuf[con_id]->pmtusize)){
503
                                    pkt_info.buffer = pkt[lcnt];
504
                                    chk_msg_len=(msg_len*2);
505
                                } else {
506
                                    pkt_info.buffer = msg + offset;
507
                                    chk_msg_len=msg_len;
508
                                }
509

    
510
                                pkt_info.bufSize = pkt_len;
511
                                pkt_info.msgtype = msg_type;
512
                                pkt_info.dataID = connectbuf[con_id]->seqnr;
513
                                pkt_info.offset = offset;
514
                                pkt_info.datasize = msg_len;
515
                                pkt_info.monitoringHeaderLen = iov[1].iov_len;
516
                                pkt_info.monitoringHeader = iov[1].iov_base;
517
                                pkt_info.ttl = -1;
518
                                memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
519
                                (get_Send_pkt_inf_cb) ((void *) &pkt_info);
520
                        }
521
#endif
522

    
523

    
524
                        debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
525
                        int priority = 0; 
526
                        if ((msg_type == ML_CON_MSG)
527
#ifdef RTX
528
 || (msg_type == ML_NACK_MSG)
529
#endif
530
) priority = HP;
531
                        //fprintf(stderr,"*******************************ML.C: Sending packet: msg_h.offset: %d msg_h.msg_seq_num: %d\n",ntohl(msg_h.offset),ntohl(msg_h.msg_seq_num));
532
                        switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
533
                                case MSGLEN:
534
                                        info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
535
                                        // TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
536
                                        connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
537
                                        if (connectbuf[con_id]->pmtusize > 0) {
538
                                                connectbuf[con_id]->delay = true;
539
                                                retry = true;
540
                                        }
541
                                        break2 = true;
542
                                        break;
543
                                case FAILURE:
544
                                        info("ML: sending message failed (to:%s conID:%d lconID:%d msgsize:%d msgtype:%d offset:%d)\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, msg_h.msg_type, offset);
545
                                        break2 = true;
546
                                        break;
547
                                case THROTTLE:
548
                                        debug("THROTTLE on output"); 
549
                                        break2 = true;
550
                                        break;
551
                                case OK:
552
#ifdef RTX
553
                                        if (msg_type < 127) counters.sentDataPktCounter++;
554
#endif
555
                                        //update
556
                                        offset += pkt_len;
557
#ifdef FEC
558
                                        if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize && lcnt<((msg_len*2)/connectbuf[con_id]->pmtusize)){
559
                                          lcnt++;
560
                                        }
561
#endif
562
                                        //transmit data header only in the first packet
563
                                        iov[2].iov_len = 0;
564
                                        break;
565
                        }
566
                        if (break2) break;
567
#ifdef FEC
568
                } while(offset != chk_msg_len && !truncable);
569
                if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize){ //free the pointers.
570
                        free(Pmsg);
571
                        free(src);
572
                        for(i=0; i<npaksX2; i++) {
573
                          free(pkt[i]);
574
                        }
575
                        free(pkt);
576
                        fec_free(code);
577
                }
578
#else
579
                } while(offset != msg_len && !truncable);
580
#endif
581
        } while(retry);
582
        //fprintf(stderr, "sentDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentDataPktCounter);
583
        //fprintf(stderr, "sentRTXDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentRTXDataPktCtr);
584
}
585

    
586
void pmtu_timeout_cb(int fd, short event, void *arg);
587

    
588
int sendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr) {
589
        //monitoring layer hook
590
        if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
591
                mon_pkt_inf pkt_info;        
592

    
593
                struct msg_header *msg_h  = (struct msg_header *) iov[0].iov_base;
594

    
595
                memset(iov[1].iov_base,0,iov[1].iov_len);
596

    
597
                pkt_info.remote_socketID = &(connectbuf[ntohl(msg_h->local_con_id)]->external_socketID);
598
                pkt_info.buffer = iov[3].iov_base;
599
                pkt_info.bufSize = iov[3].iov_len;
600
                pkt_info.msgtype = msg_h->msg_type;
601
                pkt_info.dataID = ntohl(msg_h->msg_seq_num);
602
                pkt_info.offset = ntohl(msg_h->offset);
603
                pkt_info.datasize = ntohl(msg_h->msg_length);
604
                pkt_info.monitoringHeaderLen = iov[1].iov_len;
605
                pkt_info.monitoringHeader = iov[1].iov_base;
606
                pkt_info.ttl = -1;
607
                memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
608

    
609
                (get_Send_pkt_inf_cb) ((void *) &pkt_info);
610
        }
611

    
612
         //struct msg_header *msg_h;
613
    //msg_h = (struct msg_header *) iov[0].iov_base;        
614

    
615
        //fprintf(stderr,"*** Sending packet - msgSeqNum: %d offset: %d\n",ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
616

    
617
        return sendPacketFinal(udpSocket, iov, len, socketaddr);
618
}
619

    
620
void reschedule_conn_msg(int con_id)
621
{
622
        if (connectbuf[con_id]->timeout_event) {
623
                /* delete old timout */        
624
                event_del(connectbuf[con_id]->timeout_event);
625
                event_free(connectbuf[con_id]->timeout_event);
626
        }
627
        connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
628
        evtimer_add(connectbuf[con_id]->timeout_event, &connectbuf[con_id]->timeout_value);
629
}
630

    
631
void send_conn_msg(int con_id, int buf_size, int command_type)
632
{
633
        if (buf_size < sizeof(struct conn_msg)) {
634
                error("ML: requested connection message size is too small\n");
635
                return;
636
        }
637

    
638
        if(connectbuf[con_id]->ctrl_msg_buf == NULL) {
639
                connectbuf[con_id]->ctrl_msg_buf = malloc(buf_size);
640
                memset(connectbuf[con_id]->ctrl_msg_buf, 0, buf_size);
641
        }
642

    
643
        if(connectbuf[con_id]->ctrl_msg_buf == NULL) {
644
                error("ML: can not allocate memory for connection message\n");
645
                return;
646
        }
647

    
648
        struct conn_msg *msg_header = (struct conn_msg*) connectbuf[con_id]->ctrl_msg_buf;
649

    
650
        msg_header->comand_type = command_type;
651
        msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
652

    
653
        memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
654
  {
655
                        char buf[SOCKETID_STRING_SIZE];
656
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
657
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
658
   }
659
        send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
660
}
661

    
662
void send_conn_msg_with_pmtu_discovery(int con_id, int buf_size, int command_type)
663
{
664
        struct timeval tout = {0,0};
665
        tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
666
        connectbuf[con_id]->timeout_value = tout;
667
        connectbuf[con_id]->trials = 1;
668
        send_conn_msg(con_id, buf_size, command_type);
669
        reschedule_conn_msg(con_id);
670
}
671

    
672
void resend_conn_msg(int con_id)
673
{
674
        connectbuf[con_id]->trials++;
675
        send_conn_msg(con_id, connectbuf[con_id]->pmtusize, connectbuf[con_id]->status);
676
        reschedule_conn_msg(con_id);
677
}
678

    
679
void recv_conn_msg(struct msg_header *msg_h, char *msgbuf, int msg_size, struct sockaddr_in *recv_addr)
680
{
681
        struct conn_msg *con_msg;
682
        int free_con_id, con_id;
683

    
684
        time_t now = time(NULL);
685
        double timediff = 0.0;
686
        char sock_id_str[1000];
687
        
688
        msgbuf += msg_h->len_mon_data_hdr;
689
        msg_size -= msg_h->len_mon_data_hdr;
690
        con_msg = (struct conn_msg *)msgbuf;
691
        
692
        //verify message validity
693
        if (msg_size < sizeof(struct conn_msg)) {
694
                char recv_addr_str[INET_ADDRSTRLEN];
695
                inet_ntop(AF_INET, &(recv_addr->sin_addr.s_addr), recv_addr_str, INET_ADDRSTRLEN);
696
                info("Invalid conn_msg received from %s\n", recv_addr_str);
697
                return;
698
        }
699

    
700
        //decode sock_id for debug messages
701
        mlSocketIDToString(&con_msg->sock_id,sock_id_str,999);
702

    
703
        if (con_msg->sock_id.internal_addr.udpaddr.sin_addr.s_addr != recv_addr->sin_addr.s_addr &&
704
            con_msg->sock_id.external_addr.udpaddr.sin_addr.s_addr != recv_addr->sin_addr.s_addr   ) {
705
                char recv_addr_str[INET_ADDRSTRLEN];
706
                inet_ntop(AF_INET, &(recv_addr->sin_addr.s_addr), recv_addr_str, INET_ADDRSTRLEN);
707
                info("Conn msg received from %s, but claims to be from %s", recv_addr_str, sock_id_str);
708
                return;
709
        }
710

    
711
        // Monitoring layer hook
712
        if(get_Recv_data_inf_cb != NULL) {
713
                // update pointer to the real data
714
                mon_data_inf recv_data_inf;
715
                recv_data_inf.remote_socketID = &(con_msg->sock_id);
716
                recv_data_inf.buffer = msgbuf;
717
                recv_data_inf.bufSize = msg_size;
718
                recv_data_inf.msgtype = msg_h->msg_type;
719
                recv_data_inf.monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
720
                recv_data_inf.monitoringDataHeader = msg_h->len_mon_data_hdr ? msgbuf : NULL;
721
                gettimeofday(&recv_data_inf.arrival_time, NULL);
722
                recv_data_inf.firstPacketArrived = true;
723
                recv_data_inf.recvFragments = 1;
724
                recv_data_inf.priority = false;
725
                recv_data_inf.padding = false;
726
                recv_data_inf.confirmation = false;
727
                recv_data_inf.reliable = false;
728

    
729
                // send data recv callback to monitoring module
730
                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
731
        }
732

    
733
        // check the connection command type
734
        switch (con_msg->comand_type) {
735
                /*
736
                * if INVITE: enter a new socket make new entry in connect array
737
                * send an ok
738
                */
739
                case INVITE:
740
                        info("ML: received INVITE from %s (size:%d)\n", sock_id_str, msg_size);
741
                        /*
742
                        * check if another connection for the external connectionID exist
743
                        * that was established within the last 2 seconds
744
                        */
745
                        free_con_id = -1;
746
                        for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
747
                                if (connectbuf[con_id] != NULL) {
748
                                        if (mlCompareSocketIDs(&(connectbuf[con_id]->external_socketID), &(con_msg->sock_id)) == 0) {
749
                                                //timediff = difftime(now, connectbuf[con_id]->starttime);        //TODO: why this timeout? Shouldn't the connection be closed instead if there is a timeout?
750
                                                //if (timediff < 2)
751
                                                //update remote connection ID
752
                                                if (connectbuf[con_id]->external_connectionID != msg_h->local_con_id) {
753
                                                        warn("ML: updating remote connection ID for %s: from %d to %d\n",sock_id_str, connectbuf[con_id]->external_connectionID, msg_h->local_con_id);
754
                                                        connectbuf[con_id]->external_connectionID = msg_h->local_con_id;
755
                                                }
756
                                                break;
757
                                        }
758
                                } else if(free_con_id == -1)
759
                                        free_con_id = con_id;
760
                        }
761

    
762
                        if (con_id == CONNECTBUFSIZE) {
763
                                // create an entry in the connecttrybuf
764
                                if(free_con_id == -1) {
765
                                        error("ML: no new connect_buf available\n");
766
                                        return;
767
                                }
768
                                connectbuf[free_con_id] = (connect_data *) malloc(sizeof(connect_data));
769
                                memset(connectbuf[free_con_id],0,sizeof(connect_data));
770
                                connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
771
                                connectbuf[free_con_id]->starttime = time(NULL);
772
                                memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
773
                //Workaround to support reuse of socketID
774
                                connectbuf[free_con_id]->external_socketID.internal_addr.udpaddr.sin_family=AF_INET;
775
                                connectbuf[free_con_id]->external_socketID.external_addr.udpaddr.sin_family=AF_INET;
776
                                connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;        // bootstrap pmtu from the other's size. Not strictly needed, but a good hint
777
                                connectbuf[free_con_id]->timeout_event = NULL;
778
                                connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
779
                                connectbuf[free_con_id]->internal_connect =
780
                                        (con_msg->sock_id.internal_addr.udpaddr.sin_addr.s_addr == recv_addr->sin_addr.s_addr);
781
                                con_id = free_con_id;
782
                        }
783

    
784
                        //if(connectbuf[con_id]->status <= CONNECT) { //TODO: anwer anyway. Why the outher would invite otherwise?
785
                                //update status and send back answer
786
                                connectbuf[con_id]->status = CONNECT;
787
                                send_conn_msg_with_pmtu_discovery(con_id, con_msg->pmtu_size, CONNECT);
788
                        //}
789
                        break;
790
                case CONNECT:
791
                        info("ML: received CONNECT from %s (size:%d)\n", sock_id_str, msg_size);
792

    
793
                        if(msg_h->remote_con_id != -1 && connectbuf[msg_h->remote_con_id] == NULL) {
794
                                error("ML: received CONNECT for inexistent connection rconID:%d\n",msg_h->remote_con_id);
795
                                return;
796
                        }
797

    
798
                        /*
799
                        * check if the connection status is not already 1 or 2
800
                        */
801
                        if (connectbuf[msg_h->remote_con_id]->status == INVITE) {
802
                                // set the external connectionID
803
                                connectbuf[msg_h->remote_con_id]->external_connectionID = msg_h->local_con_id;
804
                                // change status con_msg the connection_data
805
                                connectbuf[msg_h->remote_con_id]->status = READY;
806
                                // change pmtusize in the connection_data: not needed. receiving a CONNECT means our INVITE went through. So why change pmtu?
807
                                //connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
808

    
809
                                // send the READY
810
                                send_conn_msg_with_pmtu_discovery(msg_h->remote_con_id, con_msg->pmtu_size, READY);
811

    
812
                                if (receive_Connection_cb != NULL)
813
                                        (receive_Connection_cb) (msg_h->remote_con_id, NULL);
814

    
815
                                // call all registered callbacks
816
                                while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
817
                                        struct receive_connection_cb_list *temp;
818
                                        temp = connectbuf[msg_h->remote_con_id]->connection_head;
819
                                        (temp->connection_cb) (msg_h->remote_con_id, temp->arg);
820
                                        connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
821
                                        free(temp);
822
                                }
823
                                connectbuf[msg_h->remote_con_id]->connection_head =
824
                                        connectbuf[msg_h->remote_con_id]->connection_last = NULL;
825
                        } else {
826
                                // send the READY
827
                                send_conn_msg_with_pmtu_discovery(msg_h->remote_con_id, con_msg->pmtu_size, READY);
828
                        }
829

    
830
                        debug("ML: active connection established\n");
831
                        break;
832

    
833
                        /*
834
                        * if READY: find the entry in the connection array set the
835
                        * connection active change the pmtu size
836
                        */
837
                case READY:
838
                        info("ML: received READY from %s (size:%d)\n", sock_id_str, msg_size);
839
                        if(connectbuf[msg_h->remote_con_id] == NULL) {
840
                                error("ML: received READY for inexistent connection\n");
841
                                return;
842
                        }
843
                        /*
844
                        * checks if the connection is not already established
845
                        */
846
                        if (connectbuf[msg_h->remote_con_id]->status == CONNECT) {
847
                                // change status of the connection
848
                                connectbuf[msg_h->remote_con_id]->status = READY;
849
                                // change pmtusize: not needed. pmtu doesn't have to be symmetric
850
                                //connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
851

    
852
                                if (receive_Connection_cb != NULL)
853
                                        (receive_Connection_cb) (msg_h->remote_con_id, NULL);
854

    
855
                                while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
856
                                        struct receive_connection_cb_list *temp;
857
                                        temp = connectbuf[msg_h->remote_con_id]->connection_head;
858
                                        (temp->connection_cb) (msg_h->remote_con_id, temp->arg);
859
                                        connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
860
                                        free(temp);
861
                                }
862
                                connectbuf[msg_h->remote_con_id]->connection_head =
863
                                        connectbuf[msg_h->remote_con_id]->connection_last = NULL;
864
                                debug("ML: passive connection established\n");
865
                        }
866
                        break;
867
        }
868
}
869

    
870
void recv_stun_msg(char *msgbuf, int recvSize)
871
{
872
        /*
873
        * create empty stun message struct
874
        */
875
        StunMessage resp;
876
        memset(&resp, 0, sizeof(StunMessage));
877
        /*
878
        * parse the message
879
        */
880
        int returnValue = 0;
881
        returnValue = recv_stun_message(msgbuf, recvSize, &resp);
882

    
883
        if (returnValue == 0) {
884
                /*
885
                * read the reflexive Address into the local_socketID
886
                */
887
                struct sockaddr_in reflexiveAddr = {0};
888
                reflexiveAddr.sin_family = AF_INET;
889
                reflexiveAddr.sin_addr.s_addr = htonl(resp.mappedAddress.ipv4.addr);
890
                reflexiveAddr.sin_port = htons(resp.mappedAddress.ipv4.port);
891
                socketaddrgen reflexiveAddres = {0};
892
                reflexiveAddres.udpaddr = reflexiveAddr;
893
                local_socketID.external_addr = reflexiveAddres;
894
                NAT_traversal = true;
895
                // callback to the upper layer indicating that the socketID is now
896
                // ready to use
897
                {
898
                        char buf[SOCKETID_STRING_SIZE];
899
                        mlSocketIDToString(&local_socketID,buf,sizeof(buf));
900
                         debug("received local socket_address: %s\n", buf);
901
                }
902
                (receive_SocketID_cb) (&local_socketID, 0);
903
        }
904
}
905

    
906
//done
907
void recv_timeout_cb(int fd, short event, void *arg)
908
{
909
        int recv_id = (long) arg;
910
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
911

    
912
        if (recvdatabuf[recv_id] == NULL) {
913
                return;
914
        }
915

    
916

    
917
/*        if(recvdatabuf[recv_id]->status == ACTIVE) {
918
                //TODO make timeout at least a DEFINE
919
                struct timeval timeout = { 4, 0 };
920
                recvdatabuf[recv_id]->status = INACTIVE;
921
                event_base_once(base, -1, EV_TIMEOUT, recv_timeout_cb,
922
                        arg, &timeout);
923
                return;
924
        }
925
*/
926

    
927
        if(recvdatabuf[recv_id]->status == ACTIVE) {
928
                // Monitoring layer hook
929
                if(get_Recv_data_inf_cb != NULL) {
930
                        mon_data_inf recv_data_inf;
931

    
932
                        recv_data_inf.remote_socketID =
933
                                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
934
                        recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
935
                        recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
936
                        recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
937
                        recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
938
                        recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
939
                                recvdatabuf[recv_id]->recvbuf : NULL;
940
                        gettimeofday(&recv_data_inf.arrival_time, NULL);
941
                        recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
942
                        recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
943
                        recv_data_inf.priority = false;
944
                        recv_data_inf.padding = false;
945
                        recv_data_inf.confirmation = false;
946
                        recv_data_inf.reliable = false;
947

    
948
                        // send data recv callback to monitoring module
949

    
950
//                        (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
951
                }
952

    
953
                // Get the right callback
954
                receive_data_cb receive_data_callback = recvcbbuf[recvdatabuf[recv_id]->msgtype];
955

    
956
                recv_params rParams;
957

    
958
                rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->arrivedBytes;
959
                rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
960
                rParams.msgtype = recvdatabuf[recv_id]->msgtype;
961
                rParams.connectionID = recvdatabuf[recv_id]->connectionID;
962
                rParams.remote_socketID =
963
                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
964
                rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
965

    
966
#ifdef RTX
967
                counters.receivedIncompleteMsgCounter++;
968
                //mlShowCounters();
969
                //fprintf(stderr,"******Cleaning slot for inclomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);                
970
#endif
971
                 //(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->msgtype, &rParams);
972

    
973
                //clean up
974
                if (recvdatabuf[recv_id]->timeout_event) {
975
                        event_del(recvdatabuf[recv_id]->timeout_event);
976
                        event_free(recvdatabuf[recv_id]->timeout_event);
977
                        recvdatabuf[recv_id]->timeout_event = NULL;
978
                }
979
                free(recvdatabuf[recv_id]->recvbuf);
980
#ifdef FEC
981
                free(recvdatabuf[recv_id]->pix);
982
                free(recvdatabuf[recv_id]->pix_chk);
983
#endif
984
                free(recvdatabuf[recv_id]);
985
                recvdatabuf[recv_id] = NULL;
986
        }
987
}
988

    
989
// process a single recv data message
990
void recv_data_msg(struct msg_header *msg_h, char *msgbuf, int bufsize)
991
{
992
#ifdef FEC
993
        void *code;
994
        int n, k, i, j;
995
        n=4;
996
        k=64;
997
        char **src;
998
#endif
999
        debug("ML: received packet of size %d with rconID:%d lconID:%d type:%d offset:%d inlength: %d\n",bufsize,msg_h->remote_con_id,msg_h->local_con_id,msg_h->msg_type,msg_h->offset, msg_h->msg_length);
1000

    
1001
        int recv_id, free_recv_id = -1;
1002
        int pmtusize;
1003

    
1004
        if(connectbuf[msg_h->remote_con_id] == NULL) {
1005
                debug("ML: Received a message not related to any opened connection!\n");
1006
                return;
1007
        }
1008
        pmtusize = connectbuf[msg_h->remote_con_id]->pmtusize;
1009

    
1010
#ifdef RTX
1011
        counters.receivedDataPktCounter++;
1012
#endif        
1013
        // check if a recv_data exist and enter data
1014
        for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++) {
1015
                if (recvdatabuf[recv_id] != NULL) {
1016
                        if (msg_h->remote_con_id == recvdatabuf[recv_id]->connectionID &&
1017
                                        msg_h->msg_seq_num == recvdatabuf[recv_id]->seqnr)
1018
                                                break;
1019
                } else
1020
                        if(free_recv_id == -1)
1021
                                free_recv_id = recv_id;
1022
  }
1023

    
1024
#ifdef FEC
1025
        // Consider only first k packets and decline the rest >k packets.
1026
        if((msg_h->msg_type==17) && ((msg_h->msg_length + msg_h->len_mon_data_hdr)>pmtusize) && (chk_complete[msg_h->msg_seq_num]==1)){
1027
          return;
1028
        }
1029
#endif
1030

    
1031
        if(recv_id == RECVDATABUFSIZE) {
1032
                debug(" recv id not found (free found: %d)\n", free_recv_id);
1033
                //no recv_data found: create one
1034
                recv_id = free_recv_id;
1035
                recvdatabuf[recv_id] = (recvdata *) malloc(sizeof(recvdata));
1036
                memset(recvdatabuf[recv_id], 0, sizeof(recvdata));
1037
                recvdatabuf[recv_id]->connectionID = msg_h->remote_con_id;
1038
                recvdatabuf[recv_id]->seqnr = msg_h->msg_seq_num;
1039
                recvdatabuf[recv_id]->monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
1040
                recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
1041
                recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
1042
                recvdatabuf[recv_id]->arrivedBytes = 0;        //count this without the Mon headers
1043
#ifdef RTX
1044
                recvdatabuf[recv_id]->txConnectionID = msg_h->local_con_id;
1045
                recvdatabuf[recv_id]->expectedOffset = 0;
1046
                recvdatabuf[recv_id]->gapCounter = 0;
1047
                recvdatabuf[recv_id]->firstGap = 0;
1048
                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1049
#endif
1050

    
1051
                /*
1052
                * read the timeout data and set it
1053
                */
1054
                recvdatabuf[recv_id]->timeout_value = recv_timeout;
1055
                recvdatabuf[recv_id]->timeout_event = NULL;
1056
                recvdatabuf[recv_id]->recvID = recv_id;
1057
                recvdatabuf[recv_id]->starttime = time(NULL);
1058
                recvdatabuf[recv_id]->msgtype = msg_h->msg_type;
1059

    
1060
#ifdef FEC
1061
                if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize){
1062
                  chk_complete[msg_h->msg_seq_num]=0;
1063
                  recvdatabuf[recv_id]->nix=0;
1064
                  recvdatabuf[recv_id]->pix = ( int * ) malloc ( (recvdatabuf[recv_id]->bufsize/pmtusize) * sizeof ( int ));
1065
                  recvdatabuf[recv_id]->pix_chk = ( int * ) malloc ( (recvdatabuf[recv_id]->bufsize/pmtusize) * sizeof ( int ));
1066
                  for(i=0;i<(recvdatabuf[recv_id]->bufsize/pmtusize);i++)
1067
                      recvdatabuf[recv_id]->pix_chk[i] = 0;
1068
                }
1069
#endif
1070

    
1071
                // fill the buffer with zeros
1072
                memset(recvdatabuf[recv_id]->recvbuf, 0, recvdatabuf[recv_id]->bufsize);
1073
                debug(" new @ id:%d\n",recv_id);
1074
        } else {        //message structure already exists, no need to create new
1075
                debug(" found @ id:%d (arrived before this packet: bytes:%d fragments%d\n",recv_id, recvdatabuf[recv_id]->arrivedBytes, recvdatabuf[recv_id]->recvFragments);
1076
        }
1077

    
1078
        //if first packet extract mon data header and advance pointer
1079
        if (msg_h->offset == 0) {
1080
                //fprintf(stderr,"Hoooooray!! We have first packet of some message!!\n");
1081
                memcpy(recvdatabuf[recv_id]->recvbuf, msgbuf, msg_h->len_mon_data_hdr);
1082
                msgbuf += msg_h->len_mon_data_hdr;
1083
                bufsize -= msg_h->len_mon_data_hdr;
1084
                recvdatabuf[recv_id]->firstPacketArrived = 1;
1085
        }
1086

    
1087

    
1088
        // increment fragmentnr
1089
        recvdatabuf[recv_id]->recvFragments++;
1090
        // increment the arrivedBytes
1091
        recvdatabuf[recv_id]->arrivedBytes += bufsize; 
1092

    
1093
        //fprintf(stderr,"Arrived bytes: %d Offset: %d Expected offset: %d\n",recvdatabuf[recv_id]->arrivedBytes/1349,msg_h->offset/1349,recvdatabuf[recv_id]->expectedOffset/1349);
1094

    
1095
        // enter the data into the buffer
1096
#ifdef FEC
1097
        if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize && recvdatabuf[recv_id]->pix_chk[recvdatabuf[recv_id]->nix]==0)
1098
          memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + (recvdatabuf[recv_id]->nix)*pmtusize, msgbuf, bufsize);
1099
        else
1100
          memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
1101
        //TODO very basic checkif all fragments arrived: has to be reviewed
1102
#else
1103
        memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
1104
#endif
1105

    
1106
#ifdef RTX
1107
        // detecting a new gap        
1108
        if (msg_h->offset > recvdatabuf[recv_id]->expectedOffset) {
1109
                recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetFrom = recvdatabuf[recv_id]->expectedOffset;
1110
                recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetTo = msg_h->offset;
1111
                if (recvdatabuf[recv_id]->gapCounter < RTX_MAX_GAPS - 1) recvdatabuf[recv_id]->gapCounter++;
1112
                event_base_once(base, -1, EV_TIMEOUT, &pkt_recv_timeout_cb, (void *) (long)recv_id, &pkt_recv_timeout);
1113
        }
1114
        
1115
        //filling the gap by delayed packets
1116
        if (msg_h->offset < recvdatabuf[recv_id]->expectedOffset){
1117
                counters.receivedRTXDataPktCounter++;
1118
                //skip retransmitted packets
1119
                if (recvdatabuf[recv_id]->firstGap < recvdatabuf[recv_id]->gapCounter && msg_h->offset >= recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom) {
1120
                        int i;
1121
                        //fprintf(stderr,"firstGap: %d        gapCounter: %d\n", recvdatabuf[recv_id]->firstGap, recvdatabuf[recv_id]->gapCounter);
1122
                        for (i = recvdatabuf[recv_id]->firstGap; i < recvdatabuf[recv_id]->gapCounter; i++){
1123
                                if (msg_h->offset == recvdatabuf[recv_id]->gapArray[i].offsetFrom) {
1124
                                        recvdatabuf[recv_id]->gapArray[i].offsetFrom += bufsize;
1125
                                        break;
1126
                                }
1127
                                if (msg_h->offset == (recvdatabuf[recv_id]->gapArray[i].offsetTo - bufsize)) {
1128
                                        recvdatabuf[recv_id]->gapArray[i].offsetTo -= bufsize;
1129
                                        break;
1130
                                }
1131
                        }
1132
                } else {//fprintf(stderr,"Skipping retransmitted packets in filling the gap.\n"); 
1133
                        //counters.receivedRTXDataPktCounter++;
1134
                        }
1135
        }
1136

    
1137
        //updating the expectedOffset        
1138
        if (msg_h->offset >= recvdatabuf[recv_id]->expectedOffset) recvdatabuf[recv_id]->expectedOffset = msg_h->offset + bufsize;
1139
#endif
1140

    
1141
        //TODO very basic checkif all fragments arrived: has to be reviewed
1142
        if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) {
1143
                recvdatabuf[recv_id]->status = COMPLETE; //buffer full -> msg completly arrived
1144
#ifdef FEC
1145
                if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize){
1146
                  chk_complete[msg_h->msg_seq_num]=1;
1147
                  prev_sqnr=recvdatabuf[recv_id]->seqnr;
1148
                  int npaks=0;
1149
                  int toffset=20;
1150
                  int tpkt_len=pmtusize;
1151
                  npaks=(int)(recvdatabuf[recv_id]->bufsize/pmtusize);
1152
                  src = ( char ** )malloc ( npaks * sizeof ( char * ));
1153
                  code = fec_new(npaks,256);
1154
                  for(i=0; i<npaks; i++){
1155
                      src[i] = ( char * )malloc(tpkt_len * sizeof ( char ) );
1156
                      for(j=0; j<tpkt_len; j++){
1157
                        if (toffset+j < recvdatabuf[recv_id]->bufsize) {
1158
                          *(src[i]+j)=*(recvdatabuf[recv_id]->recvbuf+toffset+j);
1159
                        }else {
1160
                          *(src[i]+j)=0;
1161
                        }
1162
                      }
1163
                      toffset += tpkt_len;
1164
                  }
1165
                  recvdatabuf[recv_id]->pix[recvdatabuf[recv_id]->nix]=(int)(msg_h->offset/pmtusize);
1166
                  fec_decode(code, src, recvdatabuf[recv_id]->pix, tpkt_len);
1167
                  toffset=20;
1168
                  for(i=0; i<npaks; i++){
1169
                    for(j=0; j<tpkt_len; j++){
1170
                      if (toffset+j < recvdatabuf[recv_id]->bufsize) {
1171
                        *(recvdatabuf[recv_id]->recvbuf+toffset+j)=*(src[i]+j);
1172
                      }
1173
                    }
1174
                    toffset+=tpkt_len;
1175
                  }
1176
                    fec_free(code);
1177
                    for(i=0; i<npaks; i++){
1178
                      free(src[i]);
1179
                    }
1180
                    free(src);
1181
                    free(recvdatabuf[recv_id]->pix);
1182
                    free(recvdatabuf[recv_id]->pix_chk);
1183
                    nix=0;
1184
                    recvdatabuf[recv_id]->nix=0;
1185
                }
1186
#endif
1187
        } else {
1188
                recvdatabuf[recv_id]->status = ACTIVE;
1189
#ifdef FEC
1190
                if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize && recvdatabuf[recv_id]->pix_chk[recvdatabuf[recv_id]->nix]==0){
1191
                  recvdatabuf[recv_id]->pix[recvdatabuf[recv_id]->nix]=(int)(msg_h->offset/pmtusize);
1192
                  recvdatabuf[recv_id]->pix_chk[recvdatabuf[recv_id]->nix]=1;
1193
                  recvdatabuf[recv_id]->nix++;
1194
                }
1195
#endif
1196
        }
1197

    
1198
        if (recv_data_callback) {
1199
                if(recvdatabuf[recv_id]->status == COMPLETE) {
1200
                        // Monitoring layer hook
1201
                        if(get_Recv_data_inf_cb != NULL) {
1202
                                mon_data_inf recv_data_inf;
1203

    
1204
                                recv_data_inf.remote_socketID =
1205
                                         &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1206
                                recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
1207
                                recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
1208
                                recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
1209
                                recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
1210
                                recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
1211
                                        recvdatabuf[recv_id]->recvbuf : NULL;
1212
                                gettimeofday(&recv_data_inf.arrival_time, NULL);
1213
                                recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1214
                                recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
1215
                                recv_data_inf.priority = false;
1216
                                recv_data_inf.padding = false;
1217
                                recv_data_inf.confirmation = false;
1218
                                recv_data_inf.reliable = false;
1219

    
1220
                                // send data recv callback to monitoring module
1221

    
1222
                                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
1223
                        }
1224

    
1225
                        // Get the right callback
1226
                        receive_data_cb receive_data_callback = recvcbbuf[msg_h->msg_type];
1227
                        if (receive_data_callback) {
1228

    
1229
                                recv_params rParams;
1230

    
1231
                                rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen - recvdatabuf[recv_id]->arrivedBytes;
1232
                                rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
1233
                                rParams.msgtype = recvdatabuf[recv_id]->msgtype;
1234
                                rParams.connectionID = recvdatabuf[recv_id]->connectionID;
1235
                                rParams.remote_socketID =
1236
                                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1237

    
1238
                                char str[1000];
1239
                                mlSocketIDToString(rParams.remote_socketID,str,999);
1240
                                debug("ML: received message from conID:%d, %s\n",recvdatabuf[recv_id]->connectionID,str);
1241
                                rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1242

    
1243
#ifdef RTX
1244
                                counters.receivedCompleteMsgCounter++;
1245
                                //mlShowCounters();
1246
#endif
1247

    
1248
                                (receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
1249
                                        recvdatabuf[recv_id]->msgtype, (void *) &rParams);
1250
                        } else {
1251
                            warn("ML: callback not initialized for this message type: %d!\n",msg_h->msg_type);
1252
                        }
1253
                        
1254
                        //clean up
1255
                        if (recvdatabuf[recv_id]->timeout_event) {
1256
                                debug("ML: freeing timeout for %d",recv_id);
1257
                                event_del(recvdatabuf[recv_id]->timeout_event);
1258
                                event_free(recvdatabuf[recv_id]->timeout_event);
1259
                                recvdatabuf[recv_id]->timeout_event = NULL;
1260
                        } else {
1261
                                debug("ML: received in 1 packet\n",recv_id);
1262
                        }
1263
#ifdef RTX
1264
                        if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
1265
                                debug("ML: freeing last packet timeout for %d",recv_id);
1266
                                event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
1267
                                event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
1268
                                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1269
                        }
1270
                        //fprintf(stderr,"******Cleaning slot for clomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);        
1271
#endif
1272
                        free(recvdatabuf[recv_id]->recvbuf);
1273
                        free(recvdatabuf[recv_id]);
1274
                        recvdatabuf[recv_id] = NULL;
1275
                } else { // not COMPLETE
1276
                        if (!recvdatabuf[recv_id]->timeout_event) {
1277
                                //start time out
1278
                                //TODO make timeout at least a DEFINE
1279
                                recvdatabuf[recv_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id);
1280
                                evtimer_add(recvdatabuf[recv_id]->timeout_event, &recv_timeout);
1281
#ifdef RTX
1282
                                recvdatabuf[recv_id]->last_pkt_timeout_event = event_new(base, -1, EV_TIMEOUT, &last_pkt_recv_timeout_cb, (void *) (long)recv_id);
1283
                                evtimer_add(recvdatabuf[recv_id]->last_pkt_timeout_event, &last_pkt_recv_timeout);
1284
#endif
1285
                        }
1286
                }
1287
        }
1288
}
1289

    
1290
//done
1291
void pmtu_timeout_cb(int fd, short event, void *arg)
1292
{
1293

    
1294
        int con_id = (long) arg;
1295
        pmtu new_pmtusize;
1296

    
1297
        debug("ML: pmtu timeout called (lcon:%d)\n",con_id);
1298

    
1299
        if(connectbuf[con_id] == NULL) {
1300
                error("ML: pmtu timeout called on non existing con_id\n");
1301
                return;
1302
        }
1303

    
1304
        if(connectbuf[con_id]->status == READY) {
1305
                // nothing to do anymore
1306
                event_del(connectbuf[con_id]->timeout_event);
1307
                event_free(connectbuf[con_id]->timeout_event);
1308
                connectbuf[con_id]->timeout_event = NULL;
1309
                return;
1310
        }
1311

    
1312
        info("ML: pmtu timeout while connecting(to:%s lcon:%d status:%d size:%d trial:%d tout:%ld.%06ld)\n",conid_to_string(con_id), con_id, connectbuf[con_id]->status, connectbuf[con_id]->pmtusize, connectbuf[con_id]->trials, connectbuf[con_id]->timeout_value.tv_sec, connectbuf[con_id]->timeout_value.tv_usec);
1313

    
1314
        if(connectbuf[con_id]->delay || connectbuf[con_id]->trials == MAX_TRIALS - 1) {
1315
                double delay = connectbuf[con_id]->timeout_value.tv_sec + connectbuf[con_id]->timeout_value.tv_usec / 1000000.0;
1316
                delay = delay * 2;
1317
                info("\tML: increasing pmtu timeout to %f sec\n", delay);
1318
                connectbuf[con_id]->timeout_value.tv_sec = floor(delay);
1319
                connectbuf[con_id]->timeout_value.tv_usec = fmod(delay, 1.0) * 1000000.0;
1320
                if(connectbuf[con_id]->delay) {
1321
                        connectbuf[con_id]->delay = false;
1322
                        reschedule_conn_msg(con_id);
1323
                }
1324
        }
1325

    
1326
        if(connectbuf[con_id]->trials == MAX_TRIALS) {
1327
                // decrement the pmtu size
1328
                struct timeval tout = {0,0};
1329
                tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
1330
                info("\tML: decreasing pmtu estimate from %d to %d\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize));
1331
                connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
1332
                connectbuf[con_id]->timeout_value = tout; 
1333
                connectbuf[con_id]->trials = 0;
1334
        }
1335

    
1336
        //error in PMTU discovery?
1337
        if (connectbuf[con_id]->pmtusize == P_ERROR) {
1338
                if (connectbuf[con_id]->internal_connect == true) {
1339
                        //as of now we tried directly connecting, now let's try trough the NAT
1340
                        connectbuf[con_id]->internal_connect = false;
1341
                        connectbuf[con_id]->pmtusize = DSLSLIM;
1342
                } else {
1343
                        //nothing to do we have to give up
1344
                        error("ML: Could not create connection with connectionID %i!\n",con_id);
1345
                        // envoke the callback for failed connection establishment
1346
                        if(failed_Connection_cb != NULL)
1347
                                (failed_Connection_cb) (con_id, NULL);
1348
                        // delete the connection entry
1349
                        mlCloseConnection(con_id);
1350
                        return;
1351
                }
1352
        }
1353

    
1354
        //retry
1355
        resend_conn_msg(con_id);
1356
}
1357

    
1358

    
1359
int schedule_pmtu_timeout(int con_id)
1360
{
1361
        if (! connectbuf[con_id]->timeout_event) {
1362
                struct timeval tout = {0,0};
1363
                tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
1364
                connectbuf[con_id]->timeout_value = tout;
1365
                connectbuf[con_id]->trials = 1;
1366
                connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
1367
                evtimer_add(connectbuf[con_id]->timeout_event, &connectbuf[con_id]->timeout_value);
1368
        }
1369
}
1370

    
1371
/*
1372
 * decrements the mtu size
1373
 */
1374
pmtu pmtu_decrement(pmtu pmtusize)
1375
{
1376
        pmtu pmtu_return_size;
1377
        switch(pmtusize) {
1378
        case MAX:
1379
                //return DSL;
1380
                return DSLSLIM;        //shortcut to use less vales
1381
        case DSL:
1382
                return DSLMEDIUM;
1383
        case DSLMEDIUM:
1384
                return DSLSLIM;
1385
        case DSLSLIM:
1386
                //return BELOWDSL;
1387
                return MIN;        //shortcut to use less vales
1388
        case BELOWDSL:
1389
                return MIN;
1390
        case MIN:
1391
                return P_ERROR;
1392
        default:
1393
                warn("ML: strange pmtu size encountered:%d, changing to some safe value:%d\n", pmtusize, MIN);
1394
                return MIN;
1395
        }
1396
}
1397

    
1398
// called when an ICMP pmtu error message (type 3, code 4) is received
1399
void pmtu_error_cb_th(char *msg, int msglen)
1400
{
1401
        debug("ML: pmtu_error callback called msg_size: %d\n",msglen);
1402
        //TODO debug
1403
        return;
1404

    
1405
    char *msgbufptr = NULL;
1406
    int msgtype;
1407
    int connectionID;
1408
    pmtu pmtusize;
1409
    pmtu new_pmtusize;
1410
    int dead = 0;
1411

    
1412
    // check the packettype
1413
    msgbufptr = &msg[0];
1414

    
1415
    // check the msgtype
1416
    msgbufptr = &msg[1];
1417
    memcpy(&msgtype, msgbufptr, 4);
1418

    
1419
    if (msgtype == 0) {
1420

    
1421
        // get the connectionID
1422
        msgbufptr = &msg[5];
1423
        memcpy(&connectionID, msgbufptr, 4);
1424

    
1425
        int msgtype_c = connectbuf[connectionID]->status;
1426
//        pmtusize = connectbuf[connectionID]->pmtutrysize;
1427

    
1428
        if (msgtype_c != msgtype) {
1429
            dead = 1;
1430
        }
1431

    
1432

    
1433
    } else if (msgtype == 1) {
1434

    
1435
        // read the connectionID
1436
        msgbufptr = &msg[9];
1437
        memcpy(&connectionID, msgbufptr, 4);
1438

    
1439
        int msgtype_c = connectbuf[connectionID]->status;
1440
//        pmtusize = connectbuf[connectionID]->pmtutrysize;
1441

    
1442
        if (msgtype_c != msgtype) {
1443
            dead = 1;
1444
        }
1445

    
1446
    }
1447
    // decrement the pmtu size
1448
    new_pmtusize = pmtu_decrement(pmtusize);
1449

    
1450
//    connectbuf[connectionID]->pmtutrysize = new_pmtusize;
1451

    
1452
    if (new_pmtusize == P_ERROR) {
1453
                error("ML:  Could not create connection with connectionID %i !\n",
1454
                        connectionID);
1455

    
1456
                if(failed_Connection_cb != NULL)
1457
                        (failed_Connection_cb) (connectionID, NULL);
1458
                // set the message type to a non existent message
1459
                msgtype = 2;
1460
                // delete the connection entry
1461
                 mlCloseConnection(connectionID);
1462
        }
1463

    
1464
    if (msgtype == 0 && dead != 1) {
1465

    
1466
        // stop the timeout event
1467
        // timeout_del(connectbuf[connectionID]->timeout);
1468
        /*
1469
         * libevent2
1470
         */
1471

    
1472
        // event_del(connectbuf[connectionID]->timeout);
1473

    
1474

    
1475
        // create and send a connection message
1476
//         create_conn_msg(new_pmtusize, connectionID,
1477
//                         &local_socketID, INVITE);
1478

    
1479
//        send_conn_msg(connectionID, new_pmtusize);
1480

    
1481
        // set a timeout event for the pmtu discovery
1482
        // timeout_set(connectbuf[connectionID]->timeout,pmtu_timeout_cb,(void
1483
        // *)&connectionID);
1484

    
1485
        // timeout_add(connectbuf[connectionID]->timeout,&connectbuf[connectionID]->timeout_value);
1486

    
1487
        /*
1488
         * libevent2
1489
         */
1490

    
1491
        struct event *ev;
1492
        ev = evtimer_new(base, pmtu_timeout_cb,
1493
                         (void *) connectbuf[connectionID]);
1494

    
1495
        // connectbuf[connectionID]->timeout = ev;
1496

    
1497
        event_add(ev, &connectbuf[connectionID]->timeout_value);
1498

    
1499
    } else if (msgtype == 1 && dead != 1) {
1500

    
1501
        // stop the timeout event
1502
        // timeout_del(connectbuf[connectionID]->timeout);
1503

    
1504
        /*
1505
         * libevent2
1506
         */
1507
        // info("still here 11 \n");
1508
        // printf("ev %d \n",connectbuf[connectionID]->timeout);
1509
        // event_del(connectbuf[connectionID]->timeout );
1510
        // evtimer_del(connectbuf[connectionID]->timeout );
1511

    
1512

    
1513
//         // create and send a connection message
1514
//         create_conn_msg(new_pmtusize,
1515
//                         connectbuf[connectionID]->connectionID,
1516
//                         NULL, CONNECT);
1517

    
1518
        //send_conn_msg(connectionID, new_pmtusize);
1519

    
1520
        // set a timeout event for the pmtu discovery
1521
        // timeout_set(connectbuf[connectionID]->timeout,pmtu_timeout_cb,(void
1522
        // *)&connectionID);
1523
        // timeout_add(connectbuf[connectionID]->timeout,&connectbuf[connectionID]->timeout_value);
1524

    
1525
        /*
1526
         * libevent2
1527
         */
1528
        // struct event *ev;
1529
        // ev = evtimer_new(base,pmtu_timeout_cb, (void
1530
        // *)connectbuf[connectionID]);
1531
        // connectbuf[connectionID]->timeout = ev;
1532
        // event_add(ev,&connectbuf[connectionID]->timeout_value);
1533

    
1534
    }
1535
}
1536

    
1537
/*
1538
 * what to do once a packet arrived if it is a conn packet send it to
1539
 * recv_conn handler if it is a data packet send it to the recv_data
1540
 * handler
1541
 */
1542

    
1543
//done --
1544
void recv_pkg(int fd, short event, void *arg)
1545
{
1546
        debug("ML: recv_pkg called\n");
1547

    
1548
        struct msg_header *msg_h;
1549
        char msgbuf[MAX];
1550
        pmtu recvSize = MAX;
1551
        char *bufptr = msgbuf;
1552
        int ttl;
1553
        struct sockaddr_in recv_addr;
1554
        int msg_size;
1555

    
1556
        recvPacket(fd, msgbuf, &recvSize, &recv_addr, pmtu_error_cb_th, &ttl);
1557

    
1558

    
1559
        // check if it is not just an ERROR message
1560
        if(recvSize < 0)
1561
                return;
1562

    
1563
        // @TODO check if this simplistic STUN message recognition really always works, probably not
1564
        unsigned short stun_bind_response = 0x0101;
1565
        unsigned short * msgspot = (unsigned short *) msgbuf;
1566
        if (*msgspot == stun_bind_response) {
1567
                debug("ML: recv_pkg: parse stun message called on %d bytes\n", recvSize);
1568
                recv_stun_msg(msgbuf, recvSize);
1569
                return;
1570
        }
1571

    
1572
        msg_h = (struct msg_header *) msgbuf;
1573

    
1574
        uint32_t inlen = ntohl(msg_h->msg_length);
1575
        if(inlen > 0x20000 || inlen < ntohl(msg_h->offset) || inlen == 0) {
1576
            warn("ML: BAD PACKET received from: %s:%d (len: %d < %d [=%08X] o:%d)", 
1577
                  inet_ntoa(recv_addr.sin_addr), recv_addr.sin_port,
1578
                               recvSize, inlen, inlen, ntohl(msg_h->offset));
1579
 warn("ML: received %d: %02X %02X %02X %02X %02X %02X - %02X %02X %02X %02X %02X %02X", recvSize,
1580
            msgbuf[0], msgbuf[1],msgbuf[2],msgbuf[3],msgbuf[4],msgbuf[5],
1581
            msgbuf[6],msgbuf[7],msgbuf[8],msgbuf[9],msgbuf[10],msgbuf[11]);
1582

    
1583
            return;
1584
       }
1585

    
1586
        /* convert header from network to host order */
1587
        msg_h->offset = ntohl(msg_h->offset);
1588
        msg_h->msg_length = ntohl(msg_h->msg_length);
1589
        msg_h->local_con_id = ntohl(msg_h->local_con_id);
1590
        msg_h->remote_con_id = ntohl(msg_h->remote_con_id);
1591
        msg_h->msg_seq_num = ntohl(msg_h->msg_seq_num);
1592

    
1593
        //verify minimum size
1594
        if (recvSize < sizeof(struct msg_header)) {
1595
          info("UDP packet too small, can't be an ML packet");
1596
          return;
1597
        }
1598

    
1599
        //TODO add more verifications
1600

    
1601
        bufptr += MSG_HEADER_SIZE + msg_h->len_mon_packet_hdr;
1602
        msg_size = recvSize - MSG_HEADER_SIZE - msg_h->len_mon_packet_hdr;
1603

    
1604
        //verify more fields
1605
        if (msg_size < 0) {
1606
          info("Corrupted UDP packet received");
1607
          return;
1608
        }
1609

    
1610
        if(get_Recv_pkt_inf_cb != NULL) {
1611
                mon_pkt_inf msginfNow;
1612
                msginfNow.monitoringHeaderLen = msg_h->len_mon_packet_hdr;
1613
                msginfNow.monitoringHeader = msg_h->len_mon_packet_hdr ? &msgbuf[0] + MSG_HEADER_SIZE : NULL;
1614
                //TODO rethink this ...
1615
                if(msg_h->msg_type == ML_CON_MSG) {
1616
                        struct conn_msg *c_msg = (struct conn_msg *) bufptr;
1617
                        msginfNow.remote_socketID = &(c_msg->sock_id);
1618
                }
1619
                else if(msg_h->remote_con_id < 0 || 
1620
                       msg_h->remote_con_id >= CONNECTBUFSIZE || 
1621
                       connectbuf[msg_h->remote_con_id] == NULL) {
1622
                        error("ML: received pkg called with non existent connection\n");
1623
                        return;
1624
                } else
1625
                        msginfNow.remote_socketID = &(connectbuf[msg_h->remote_con_id]->external_socketID);
1626
                msginfNow.buffer = bufptr;
1627
                msginfNow.bufSize = recvSize;
1628
                msginfNow.msgtype = msg_h->msg_type;
1629
                msginfNow.ttl = ttl;
1630
                msginfNow.dataID = msg_h->msg_seq_num;
1631
                msginfNow.offset = msg_h->offset;
1632
                msginfNow.datasize = msg_h->msg_length;
1633
                gettimeofday(&msginfNow.arrival_time, NULL);
1634
                (get_Recv_pkt_inf_cb) ((void *) &msginfNow);
1635
        }
1636

    
1637

    
1638
        switch(msg_h->msg_type) {
1639
                case ML_CON_MSG:
1640
                        debug("ML: received conn pkg\n");
1641
                        recv_conn_msg(msg_h, bufptr, msg_size, &recv_addr);
1642
                        break;
1643
#ifdef RTX
1644
                case ML_NACK_MSG:
1645
                        debug("ML: received nack pkg\n");
1646
                        recv_nack_msg(msg_h, bufptr, msg_size);
1647
                        break;
1648
#endif
1649
                default:
1650
                        if(msg_h->msg_type < 127) {
1651
                                debug("ML: received data pkg\n");
1652
                                recv_data_msg(msg_h, bufptr, msg_size);
1653
                                break;
1654
                        }
1655
                        debug("ML: unrecognised msg_type\n");
1656
                        break;
1657
        }
1658
}
1659

    
1660

    
1661
void try_stun();
1662

    
1663
/*
1664
 * the timeout of the NAT traversal
1665
 */
1666
void nat_traversal_timeout(int fd, short event, void *arg)
1667
{
1668
debug("X. NatTrTo %d\n", NAT_traversal);
1669
        if (NAT_traversal == false) {
1670
                debug("ML: NAT traversal request re-send\n");
1671
                if(receive_SocketID_cb)
1672
                        (receive_SocketID_cb) (&local_socketID, 2);
1673
                try_stun();
1674
        }
1675
debug("X. NatTrTo\n");
1676
}
1677

    
1678
//return IP address, or INADDR_NONE if can't resolve
1679
unsigned long resolve(const char *ipaddr)
1680
{
1681
        struct hostent *h = gethostbyname(ipaddr);
1682
        if (!h) {
1683
                error("ML: Unable to resolve host name %s\n", ipaddr);
1684
                return INADDR_NONE;
1685
        }
1686
        unsigned long *addr = (unsigned long *) (h->h_addr);
1687
        return *addr;
1688
}
1689

    
1690

    
1691
/*
1692
 * returns the file descriptor, or <0 on error. The ipaddr can be a null
1693
 * pointer. Then all available ipaddr on the machine are choosen.
1694
 */
1695
int create_socket(const int port, const char *ipaddr)
1696
{
1697
        struct sockaddr_in udpaddr = {0};
1698
        udpaddr.sin_family = AF_INET;
1699
        debug("X. create_socket %s, %d\n", ipaddr, port);
1700
        if (ipaddr == NULL) {
1701
                /*
1702
                * try to guess the local IP address
1703
                */
1704
                const char *ipaddr_iface = mlAutodetectIPAddress();
1705
                if (ipaddr_iface) {
1706
                        udpaddr.sin_addr.s_addr = inet_addr(ipaddr_iface);
1707
                } else {
1708
                        udpaddr.sin_addr.s_addr = INADDR_ANY;
1709
                }
1710
        } else {
1711
                udpaddr.sin_addr.s_addr = inet_addr(ipaddr);
1712
        }
1713
        udpaddr.sin_port = htons(port);
1714

    
1715
        socketaddrgen udpgen;
1716
        memset(&udpgen,0,sizeof(socketaddrgen));        //this will be sent over the net, so set it to 0
1717
        udpgen.udpaddr = udpaddr;
1718
        local_socketID.internal_addr = udpgen;
1719

    
1720
        socketfd = createSocket(port, ipaddr);
1721
        if (socketfd < 0){
1722
                return socketfd;
1723
        }
1724

    
1725
        struct event *ev;
1726
        ev = event_new(base, socketfd, EV_READ | EV_PERSIST, recv_pkg, NULL);
1727

    
1728
        event_add(ev, NULL);
1729

    
1730
        try_stun();
1731

    
1732
        return socketfd;
1733
}
1734

    
1735
/*
1736
 * try to figure out external IP using STUN, if defined
1737
 */
1738
void try_stun()
1739
{
1740
        if (isStunDefined()) {
1741
                struct timeval timeout_value_NAT_traversal = NAT_TRAVERSAL_TIMEOUT;
1742

    
1743
                /*
1744
                * send the NAT traversal STUN request
1745
                */
1746
                 send_stun_request(socketfd, &stun_server);
1747

    
1748
                /*
1749
                * enter a NAT traversal timeout that takes care of retransmission
1750
                */
1751
                event_base_once(base, -1, EV_TIMEOUT, &nat_traversal_timeout, NULL, &timeout_value_NAT_traversal);
1752

    
1753
                NAT_traversal = false;
1754
        } else {
1755
                /*
1756
                * Assume we have accessibility and copy internal address to external one
1757
                */
1758
                local_socketID.external_addr = local_socketID.internal_addr;
1759
                NAT_traversal = true; // @TODO: this is not really NAT traversal, but a flag that init is over
1760
                // callback to the upper layer indicating that the socketID is now
1761
                // ready to use
1762
                if(receive_SocketID_cb)
1763
                        (receive_SocketID_cb) (&local_socketID, 0); //success
1764
        }
1765
}
1766

    
1767
/**************************** END OF INTERNAL ***********************/
1768

    
1769
/**************************** MONL functions *************************/
1770

    
1771
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg){
1772

    
1773
/*X*/ //  fprintf(stderr,"MLINIT1 %s, %d, %s, %d\n", ipaddr, port, stun_ipaddr, stun_port);
1774
        base = (struct event_base *) arg;
1775
        recv_data_callback = recv_data_cb;
1776
        mlSetRecvTimeout(timeout_value);
1777
        if (stun_ipaddr) {
1778
                 mlSetStunServer(stun_port, stun_ipaddr);
1779
        } else {
1780

    
1781
        }
1782
        register_recv_localsocketID_cb(local_socketID_cb);
1783
/*X*/ //  fprintf(stderr,"MLINIT1\n");
1784
        return create_socket(port, ipaddr);
1785
}
1786

    
1787
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold) {
1788
        setOutputRateParams(bucketsize, drainrate);
1789
        setQueuesParams (maxQueueSize, maxQueueSizeRTX, maxTimeToHold);
1790
}
1791
     
1792
void mlSetVerbosity (int log_level) {
1793
        setLogLevel(log_level);
1794
}
1795

    
1796
/* register callbacks  */
1797
void mlRegisterGetRecvPktInf(get_recv_pkt_inf_cb recv_pkt_inf_cb){
1798

    
1799
        if (recv_pkt_inf_cb == NULL) {
1800
                error("ML: Register get_recv_pkt_inf_cb failed: NULL ptr  \n");
1801
        } else {
1802
                get_Recv_pkt_inf_cb = recv_pkt_inf_cb;
1803
        }
1804
}
1805

    
1806
void mlRegisterGetSendPktInf(get_send_pkt_inf_cb  send_pkt_inf_cb){
1807

    
1808
        if (send_pkt_inf_cb == NULL) {
1809
                error("ML: Register get_send_pkt_inf_cb: NULL ptr  \n");
1810
        } else {
1811
                get_Send_pkt_inf_cb = send_pkt_inf_cb;
1812
        }
1813
}
1814

    
1815

    
1816
void mlRegisterSetMonitoringHeaderPktCb(set_monitoring_header_pkt_cb monitoring_header_pkt_cb ){
1817

    
1818
        if (monitoring_header_pkt_cb == NULL) {
1819
                error("ML: Register set_monitoring_header_pkt_cb: NULL ptr  \n");
1820
        } else {
1821
                set_Monitoring_header_pkt_cb = monitoring_header_pkt_cb;
1822
        }
1823
}
1824

    
1825
void mlRegisterGetRecvDataInf(get_recv_data_inf_cb recv_data_inf_cb){
1826

    
1827
        if (recv_data_inf_cb == NULL) {
1828
                error("ML: Register get_recv_data_inf_cb: NULL ptr  \n");
1829
        } else {
1830
                get_Recv_data_inf_cb = recv_data_inf_cb;
1831
        }
1832
}
1833

    
1834
void mlRegisterGetSendDataInf(get_send_data_inf_cb  send_data_inf_cb){
1835

    
1836
        if (send_data_inf_cb == NULL) {
1837
                error("ML: Register get_send_data_inf_cb: NULL ptr  \n");
1838
        } else {
1839
                get_Send_data_inf_cb = send_data_inf_cb;
1840
        }
1841
}
1842

    
1843
void mlRegisterSetMonitoringHeaderDataCb(set_monitoring_header_data_cb monitoring_header_data_cb){
1844

    
1845
        if (monitoring_header_data_cb == NULL) {
1846
                error("ML: Register set_monitoring_header_data_cb : NULL ptr  \n");
1847
        } else {
1848
                set_Monitoring_header_data_cb = monitoring_header_data_cb;
1849
        }
1850
}
1851

    
1852
void mlSetRecvTimeout(struct timeval timeout_value){
1853

    
1854
        recv_timeout = timeout_value;
1855
#ifdef RTX
1856
        unsigned int total_usec = recv_timeout.tv_sec * 1000000 + recv_timeout.tv_usec;
1857
        total_usec = total_usec * LAST_PKT_RECV_TIMEOUT_FRACTION;
1858
        last_pkt_recv_timeout.tv_sec = total_usec / 1000000;
1859
        last_pkt_recv_timeout.tv_usec = total_usec - last_pkt_recv_timeout.tv_sec * 1000000;
1860
        fprintf(stderr,"Timeout for receiving message: %d : %d\n", recv_timeout.tv_sec, recv_timeout.tv_usec);        
1861
        fprintf(stderr,"Timeout for last pkt: %d : %d\n", last_pkt_recv_timeout.tv_sec, last_pkt_recv_timeout.tv_usec);        
1862
#endif
1863
}
1864

    
1865
int mlGetStandardTTL(socketID_handle socketID,uint8_t *ttl){
1866

    
1867
        return getTTL(socketfd, ttl);
1868

    
1869
}
1870

    
1871
socketID_handle mlGetLocalSocketID(int *errorstatus){
1872

    
1873
        if (NAT_traversal == false) {
1874
                *errorstatus = 2;
1875
                return NULL;
1876
        }
1877

    
1878
        *errorstatus = 0;
1879
        return &local_socketID;
1880

    
1881
}
1882

    
1883

    
1884
/**************************** END of MONL functions *************************/
1885

    
1886
/**************************** GENERAL functions *************************/
1887

    
1888
void mlRegisterRecvConnectionCb(receive_connection_cb recv_conn_cb){
1889

    
1890
        if (recv_conn_cb == NULL) {
1891
                error("ML: Register receive_connection_cb: NULL ptr  \n");
1892
        }else {
1893
                receive_Connection_cb = recv_conn_cb;
1894
        }
1895
}
1896

    
1897
void mlRegisterErrorConnectionCb(connection_failed_cb conn_failed){
1898

    
1899
        if (conn_failed == NULL) {
1900
                error("ML: Register connection_failed_cb: NULL ptr  \n");
1901
        } else {
1902
                failed_Connection_cb = conn_failed;
1903
        }
1904
}
1905

    
1906
void mlRegisterRecvDataCb(receive_data_cb data_cb,unsigned char msgtype){
1907

    
1908
    if (msgtype > 126) {
1909

    
1910
            error
1911
            ("ML: Could not register recv_data callback. Msgtype is greater then 126 \n");
1912

    
1913
    }
1914

    
1915
    if (data_cb == NULL) {
1916

    
1917
            error("ML: Register receive data callback: NUll ptr \n ");
1918

    
1919
    } else {
1920

    
1921
        recvcbbuf[msgtype] = data_cb;
1922

    
1923
    }
1924

    
1925
}
1926

    
1927
void mlCloseSocket(socketID_handle socketID){
1928

    
1929
        free(socketID);
1930

    
1931
}
1932

    
1933
void keepalive_fn(evutil_socket_t fd, short what, void *arg) {
1934
        socketID_handle peer = arg;
1935

    
1936
        int con_id = mlConnectionExist(peer, false);
1937
        if (con_id < 0 || connectbuf[con_id]->defaultSendParams.keepalive <= 0) {
1938
                /* Connection fell from under us or keepalive was disabled */
1939
                free(arg);
1940
                return;
1941
        }
1942

    
1943
        /* do what we gotta do */
1944
        if ( connectbuf[con_id]->status == READY) {
1945
                char keepaliveMsg[32] = "";
1946
                sprintf(keepaliveMsg, "KEEPALIVE %d", connectbuf[con_id]->keepalive_seq++);
1947
                send_msg(con_id, MSG_TYPE_ML_KEEPALIVE, keepaliveMsg, 1 + strlen(keepaliveMsg), false, 
1948
                        &(connectbuf[con_id]->defaultSendParams));
1949
        }
1950

    
1951
        /* re-schedule */
1952
        struct timeval t = { 0,0 };
1953
        t.tv_sec = connectbuf[con_id]->defaultSendParams.keepalive;
1954
        if (connectbuf[con_id]->defaultSendParams.keepalive) 
1955
                event_base_once(base, -1, EV_TIMEOUT, keepalive_fn, peer, &t);
1956
}
1957

    
1958
void setupKeepalive(int conn_id) {
1959
        /* Save the peer's address for us */
1960
        socketID_handle peer = malloc(sizeof(socket_ID));
1961
        memcpy(peer, &connectbuf[conn_id]->external_socketID, sizeof(socket_ID));
1962

    
1963
        struct timeval t = { 0,0 };
1964
        t.tv_sec = connectbuf[conn_id]->defaultSendParams.keepalive;
1965

    
1966
        if (connectbuf[conn_id]->defaultSendParams.keepalive) 
1967
                event_base_once(base, -1, EV_TIMEOUT, keepalive_fn, peer, &t);
1968
}
1969

    
1970
/* connection functions */
1971
int mlOpenConnection(socketID_handle external_socketID,receive_connection_cb connection_cb,void *arg, const send_params defaultSendParams){
1972

    
1973
        int con_id;
1974
        if (external_socketID == NULL) {
1975
                error("ML: cannot open connection: one of the socketIDs is NULL\n");
1976
                return -1;
1977
        }
1978
        if (NAT_traversal == false) {
1979
                error("ML: cannot open connection: NAT traversal for socketID still in progress\n");
1980
                return -1;
1981
        }
1982
        if (connection_cb == NULL) {
1983
                error("ML: cannot open connection: connection_cb is NULL\n");
1984
                return -1;
1985
        }
1986

    
1987
        // check if that connection already exist
1988

    
1989
        con_id = mlConnectionExist(external_socketID, false);
1990
        if (con_id >= 0) {
1991
                // overwrite defaultSendParams
1992
                bool newKeepalive = 
1993
                        connectbuf[con_id]->defaultSendParams.keepalive == 0 && defaultSendParams.keepalive != 0;
1994
                connectbuf[con_id]->defaultSendParams = defaultSendParams;
1995
                if (newKeepalive) setupKeepalive(con_id);
1996
                // if so check if it is ready to use
1997
                if (connectbuf[con_id]->status == READY) {
1998
                                // if so use the callback immediately
1999
                                (connection_cb) (con_id, arg);
2000

    
2001
                // otherwise just write the connection cb and the arg pointer
2002
                // into the connection struct
2003
                } else {
2004
                        struct receive_connection_cb_list *temp;
2005
                        temp = malloc(sizeof(struct receive_connection_cb_list));
2006
                        temp->next = NULL;
2007
                        temp->connection_cb = connection_cb;
2008
                        temp->arg = arg;
2009
                        if(connectbuf[con_id]->connection_last != NULL) {
2010
                                connectbuf[con_id]->connection_last->next = temp;
2011
                                connectbuf[con_id]->connection_last = temp;
2012
                        } else
2013
                                connectbuf[con_id]->connection_last = connectbuf[con_id]->connection_head = temp;
2014
                }
2015
                return con_id;
2016
        }
2017
        // make entry in connection_establishment array
2018
        for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
2019
                if (connectbuf[con_id] == NULL) {
2020
                        connectbuf[con_id] = (connect_data *) malloc(sizeof(connect_data));
2021
                        memset(connectbuf[con_id],0,sizeof(connect_data));
2022
                        connectbuf[con_id]->starttime = time(NULL);
2023
                        memcpy(&connectbuf[con_id]->external_socketID, external_socketID, sizeof(socket_ID));
2024
                        connectbuf[con_id]->pmtusize = DSLSLIM;
2025
                        connectbuf[con_id]->timeout_event = NULL;
2026
                        connectbuf[con_id]->status = INVITE;
2027
                        connectbuf[con_id]->seqnr = 0;
2028
                        connectbuf[con_id]->internal_connect = internal_connect_plausible(external_socketID, &local_socketID);
2029
                        connectbuf[con_id]->connectionID = con_id;
2030

    
2031
                        connectbuf[con_id]->connection_head = connectbuf[con_id]->connection_last = malloc(sizeof(struct receive_connection_cb_list));
2032
                        connectbuf[con_id]->connection_last->next = NULL;
2033
                        connectbuf[con_id]->connection_last->connection_cb = connection_cb;
2034
                        connectbuf[con_id]->connection_last->arg = arg;
2035
                        connectbuf[con_id]->external_connectionID = -1;
2036

    
2037
                        connectbuf[con_id]->defaultSendParams = defaultSendParams;
2038
                        if (defaultSendParams.keepalive) setupKeepalive(con_id);
2039
                        break;
2040
                }
2041
        } //end of for
2042

    
2043
        if (con_id == CONNECTBUFSIZE) {
2044
                error("ML: Could not open connection: connection buffer full\n");
2045
                return -1;
2046
        }
2047

    
2048
        // create and send a connection message
2049
        info("ML:Sending INVITE to %s (lconn:%d)\n",conid_to_string(con_id), con_id);
2050
        send_conn_msg_with_pmtu_discovery(con_id, connectbuf[con_id]->pmtusize, INVITE);
2051

    
2052
        return con_id;
2053

    
2054
}
2055

    
2056
void mlCloseConnection(const int connectionID){
2057

    
2058
        // remove it from the connection array
2059
        if(connectbuf[connectionID]) {
2060
                if(connectbuf[connectionID]->ctrl_msg_buf) {
2061
                        free(connectbuf[connectionID]->ctrl_msg_buf);
2062
                }
2063
                // remove related events
2064
                if (connectbuf[connectionID]->timeout_event) {
2065
                        event_del(connectbuf[connectionID]->timeout_event);
2066
                        event_free(connectbuf[connectionID]->timeout_event);
2067
                        connectbuf[connectionID]->timeout_event = NULL;
2068
                }
2069
                free(connectbuf[connectionID]);
2070
                connectbuf[connectionID] = NULL;
2071
        }
2072

    
2073
}
2074

    
2075
void mlSendData(const int connectionID,char *sendbuf,int bufsize,unsigned char msgtype,send_params *sParams){
2076

    
2077
        if (connectionID < 0) {
2078
                error("ML: send data failed: connectionID does not exist\n");
2079
                return;
2080
        }
2081

    
2082
        if (connectbuf[connectionID] == NULL) {
2083
                error("ML: send data failed: connectionID does not exist\n");
2084
                return;
2085
        }
2086
        if (connectbuf[connectionID]->status != READY) {
2087
            error("ML: send data failed: connection is not active\n");
2088
            return;
2089
        }
2090

    
2091
        if (sParams == NULL) {
2092
                sParams = &(connectbuf[connectionID]->defaultSendParams);
2093
        }
2094

    
2095
        send_msg(connectionID, msgtype, sendbuf, bufsize, false, sParams);
2096

    
2097
}
2098

    
2099
/* transmit data functions  */
2100
int mlSendAllData(const int connectionID,send_all_data_container *container,int nr_entries,unsigned char msgtype,send_params *sParams){
2101

    
2102
    if (nr_entries < 1 || nr_entries > 5) {
2103

    
2104
        error
2105
            ("ML : sendALlData : nr_enties is not between 1 and 5 \n ");
2106
        return 0;
2107

    
2108
    } else {
2109

    
2110
        if (nr_entries == 1) {
2111

    
2112
                mlSendData(connectionID, container->buffer_1,
2113
                      container->length_1, msgtype, sParams);
2114

    
2115
            return 1;
2116

    
2117
        } else if (nr_entries == 2) {
2118

    
2119
            int buflen = container->length_1 + container->length_2;
2120
            char buf[buflen];
2121
            memcpy(buf, container->buffer_1, container->length_1);
2122
            memcpy(&buf[container->length_1], container->buffer_2,
2123
                   container->length_2);
2124
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2125

    
2126
            return 1;
2127

    
2128
        } else if (nr_entries == 3) {
2129

    
2130
            int buflen =
2131
                container->length_1 + container->length_2 +
2132
                container->length_3;
2133
            char buf[buflen];
2134
            memcpy(buf, container->buffer_1, container->length_1);
2135
            memcpy(&buf[container->length_1], container->buffer_2,
2136
                   container->length_2);
2137
            memcpy(&buf[container->length_2], container->buffer_3,
2138
                   container->length_3);
2139
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2140

    
2141

    
2142
            return 1;
2143

    
2144
        } else if (nr_entries == 4) {
2145

    
2146
            int buflen =
2147
                container->length_1 + container->length_2 +
2148
                container->length_3 + container->length_4;
2149
            char buf[buflen];
2150
            memcpy(buf, container->buffer_1, container->length_1);
2151
            memcpy(&buf[container->length_1], container->buffer_2,
2152
                   container->length_2);
2153
            memcpy(&buf[container->length_2], container->buffer_3,
2154
                   container->length_3);
2155
            memcpy(&buf[container->length_3], container->buffer_4,
2156
                   container->length_4);
2157
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2158

    
2159
            return 1;
2160

    
2161
        } else {
2162

    
2163
            int buflen =
2164
                container->length_1 + container->length_2 +
2165
                container->length_3 + container->length_4 +
2166
                container->length_5;
2167
            char buf[buflen];
2168
            memcpy(buf, container->buffer_1, container->length_1);
2169
            memcpy(&buf[container->length_1], container->buffer_2,
2170
                   container->length_2);
2171
            memcpy(&buf[container->length_2], container->buffer_3,
2172
                   container->length_3);
2173
            memcpy(&buf[container->length_3], container->buffer_4,
2174
                   container->length_4);
2175
            memcpy(&buf[container->length_4], container->buffer_5,
2176
                   container->length_5);
2177
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2178

    
2179
            return 1;
2180
        }
2181

    
2182
    }
2183

    
2184
}
2185

    
2186
int mlRecvData(const int connectionID,char *recvbuf,int *bufsize,recv_params *rParams){
2187

    
2188
        //TODO yet to be converted
2189
        return 0;
2190
#if 0
2191
        if (rParams == NULL) {
2192
                error("ML: recv_data failed: recv_params is a NULL ptr\n");
2193
                return 0;
2194
    } else {
2195

2196
        info("ML: recv data called \n");
2197

2198
        int i = 0;
2199
        int returnValue = 0;
2200
        double timeout = (double) recv_timeout.tv_sec;
2201
        time_t endtime = time(NULL);
2202

2203
        for (i = 0; i < RECVDATABUFSIZE; i++) {
2204

2205
            if (recvdatabuf[i] != NULL) {
2206

2207
                if (recvdatabuf[i]->connectionID == connectionID) {
2208

2209
                    info("ML: recv data has entry  \n");
2210

2211
                    double timepass = difftime(endtime, recvdatabuf[i]->starttime);
2212

2213
                    // check if the specified connection has data and it
2214
                    // is complete
2215
                    // check the data seqnr
2216
                    // if(connectionID == recvdatabuf[i]->connectionID &&
2217
                    // 1 == recvdatabuf[i]->status){
2218

2219
                    if (1 == recvdatabuf[i]->status) {
2220

2221
                        // info("transmissionHandler: recv_data set is
2222
                        // complete \n" );
2223

2224
                        // debug("debud \n");
2225

2226
                        // exchange the pointers
2227
                        int buffersize = 0;
2228
                        buffersize = recvdatabuf[i]->bufsize;
2229
                        *bufsize = buffersize;
2230
                        // recvbuf = recvdatabuf[i]->recvbuf;
2231

2232
                        // info("buffersize %d \n",buffersize);
2233
                        memcpy(recvbuf, recvdatabuf[i]->recvbuf,
2234
                               buffersize);
2235
                        // debug(" recvbuf %s \n",recvbuf );
2236

2237
//                         double nrMissFrags =
2238
//                             (double) recvdatabuf[i]->nrFragments /
2239
//                             (double) recvdatabuf[i]->recvFragments;
2240
//                         int nrMissingFragments = (int) ceil(nrMissFrags);
2241

2242
//                        rParams->nrMissingFragments = nrMissingFragments;
2243
//                         rParams->nrFragments = recvdatabuf[i]->nrFragments;
2244
                        rParams->msgtype = recvdatabuf[i]->msgtype;
2245
                        rParams->connectionID =
2246
                            recvdatabuf[i]->connectionID;
2247

2248
                        // break from the loop
2249
                        // debug(" recvbuf %s \n ",recvbuf);
2250

2251
                        // double nrMissFrags =
2252
                        // (double)recvdatabuf[i]->nrFragments /
2253
                        // (double)recvdatabuf[i]->recvFragments;
2254
                        // int nrMissingFragments =
2255
                        // (int)ceil(nrMissFrags);
2256

2257
                        if(get_Recv_data_inf_cb != NULL) {
2258
                                mon_data_inf recv_data_inf;
2259

2260
                                recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
2261
                                recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
2262
                                recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
2263
                                recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
2264
//                                 recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
2265
//                                 recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
2266
                                gettimeofday(&recv_data_inf.arrival_time, NULL);
2267
                                recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
2268
                                recv_data_inf.nrMissingFragments = nrMissingFragments;
2269
                                recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
2270
                                recv_data_inf.priority = false;
2271
                                recv_data_inf.padding = false;
2272
                                recv_data_inf.confirmation = false;
2273
                                recv_data_inf.reliable = false;
2274

2275
                                // send data recv callback to monitoring module
2276

2277
                                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
2278
                        }
2279

2280

2281
                        // free the allocated memory
2282
                        free(recvdatabuf[i]);
2283
                        recvdatabuf[i] = NULL;
2284

2285
                        returnValue = 1;
2286
                        break;
2287

2288
                    }
2289

2290
                    if (recvdatabuf[i] != NULL) {
2291

2292
                        if (timepass > timeout) {
2293

2294
                            info("ML: recv_data timeout called  \n");
2295

2296
                            // some data about the missing chunks should
2297
                            // be added here
2298
                            // exchange the pointers
2299
                            int buffersize = 0;
2300
                            buffersize = recvdatabuf[i]->bufsize;
2301
                            *bufsize = buffersize;
2302
                            // recvbuf = recvdatabuf[i]->recvbuf;
2303

2304
                            double nrMissFrags =
2305
                                (double) recvdatabuf[i]->nrFragments /
2306
                                (double) recvdatabuf[i]->recvFragments;
2307
                            int nrMissingFragments =
2308
                                (int) ceil(nrMissFrags);
2309

2310
                            // debug(" recvbuf %s \n",recvbuf );
2311

2312
                            memcpy(recvbuf, recvdatabuf[i]->recvbuf,
2313
                                   buffersize);
2314

2315
                            rParams->nrMissingFragments =
2316
                                nrMissingFragments;
2317
                            rParams->nrFragments =
2318
                                recvdatabuf[i]->nrFragments;
2319
                            rParams->msgtype = recvdatabuf[i]->msgtype;
2320
                            rParams->connectionID =
2321
                                recvdatabuf[i]->connectionID;
2322

2323
                                if(get_Recv_data_inf_cb != NULL) {
2324
                                        mon_data_inf recv_data_inf;
2325

2326
                                        recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
2327
                                        recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
2328
                                        recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
2329
                                        recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
2330
                                        recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
2331
                                        recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
2332
                                        gettimeofday(&recv_data_inf.arrival_time, NULL);
2333
                                        recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
2334
                                        recv_data_inf.nrMissingFragments = nrMissingFragments;
2335
                                        recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
2336
                                        recv_data_inf.priority = false;
2337
                                        recv_data_inf.padding = false;
2338
                                        recv_data_inf.confirmation = false;
2339
                                        recv_data_inf.reliable = false;
2340

2341
                                        // send data recv callback to monitoring module
2342

2343
                                        (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
2344
                                }
2345

2346
                            // free the allocated memory
2347
                            free(recvdatabuf[i]);
2348
                            recvdatabuf[i] = NULL;
2349

2350
                            returnValue = 1;
2351
                            break;
2352

2353
                        }
2354
                    }
2355

2356
                }
2357

2358
            }
2359
            // debug("2 recvbuf %s \n ",recvbuf);
2360
        }
2361
        return returnValue;
2362
    }
2363
#endif
2364

    
2365
}
2366

    
2367
int mlSocketIDToString(socketID_handle socketID,char* socketID_string, size_t len){
2368

    
2369
        char internal_addr[INET_ADDRSTRLEN];
2370
        char external_addr[INET_ADDRSTRLEN];
2371

    
2372
        assert(socketID);
2373

    
2374
        inet_ntop(AF_INET, &(socketID->internal_addr.udpaddr.sin_addr.s_addr), internal_addr, INET_ADDRSTRLEN);
2375
        inet_ntop(AF_INET, &(socketID->external_addr.udpaddr.sin_addr.s_addr), external_addr, INET_ADDRSTRLEN);
2376

    
2377
        snprintf(socketID_string,len,"%s:%d-%s:%d", internal_addr, ntohs(socketID->internal_addr.udpaddr.sin_port),
2378
                external_addr,        ntohs(socketID->external_addr.udpaddr.sin_port));
2379
        return 0;
2380

    
2381
}
2382

    
2383
int mlStringToSocketID(const char* socketID_string, socketID_handle socketID){
2384

    
2385
        //@TODO add checks against malformed string
2386
        char external_addr[INET_ADDRSTRLEN];
2387
        int external_port;
2388
        char internal_addr[INET_ADDRSTRLEN];
2389
        int internal_port;
2390

    
2391
        char *pch;
2392
        char *s = strdup(socketID_string);
2393

    
2394
        //replace ':' with a blank
2395
        pch=strchr(s,':');
2396
        while (pch!=NULL){
2397
                                *pch = ' ';
2398
                pch=strchr(pch+1,':');
2399
        }
2400
        pch=strchr(s,'-');
2401
        if(pch) *pch = ' ';
2402

    
2403
        sscanf(s,"%s %d %s %d", internal_addr, &internal_port,
2404
                external_addr, &external_port);
2405

    
2406
        //set structure to 0, we initialize each byte, since it will be sent on the net later
2407
        memset(socketID, 0, sizeof(struct _socket_ID));
2408

    
2409
        if(inet_pton(AF_INET, internal_addr, &(socketID->internal_addr.udpaddr.sin_addr)) == 0)
2410
                return EINVAL;
2411
        socketID->internal_addr.udpaddr.sin_family = AF_INET;
2412
        socketID->internal_addr.udpaddr.sin_port = htons(internal_port);
2413

    
2414

    
2415
        if(inet_pton(AF_INET, external_addr, &(socketID->external_addr.udpaddr.sin_addr)) ==0)
2416
                return EINVAL;
2417
        socketID->external_addr.udpaddr.sin_family = AF_INET;
2418
        socketID->external_addr.udpaddr.sin_port = htons(external_port);
2419

    
2420
        free(s);
2421
        return 0;
2422

    
2423
}
2424

    
2425
int mlGetConnectionStatus(int connectionID){
2426

    
2427
        if(connectbuf[connectionID])
2428
                return connectbuf[connectionID]->status == READY;
2429
        return -1;
2430
    
2431
}
2432

    
2433

    
2434
int mlConnectionExist(socketID_handle socketID, bool ready){
2435

    
2436
    /*
2437
     * check if another connection for the external connectionID exist
2438
     * that was established \ within the last 2 seconds
2439
     */
2440
        int i;
2441
        for (i = 0; i < CONNECTBUFSIZE; i++)
2442
                if (connectbuf[i] != NULL)
2443
                        if (mlCompareSocketIDs(&(connectbuf[i]->external_socketID), socketID) == 0) {
2444
                                if (ready) return (connectbuf[i]->status == READY ? i : -1);;
2445
                                return i;
2446
                                }
2447

    
2448
    return -1;
2449

    
2450
}
2451

    
2452
//Added by Robert Birke as comodity functions
2453

    
2454
//int mlPrintSocketID(socketID_handle socketID) {
2455
//        char str[SOCKETID_STRING_SIZE];
2456
//        mlSocketIDToString(socketID, str, sizeof(str));
2457
//        printf(stderr,"int->%s<-ext\n",str);
2458
//}
2459

    
2460
/*
2461
 * hash code of a socketID
2462
 * TODO might think of a better way
2463
 */
2464
int mlHashSocketID(socketID_handle sock) {
2465
        //assert(sock);
2466
   return sock->internal_addr.udpaddr.sin_port +
2467
                        sock->external_addr.udpaddr.sin_port;
2468
}
2469

    
2470
int mlCompareSocketIDs(socketID_handle sock1, socketID_handle sock2) {
2471

    
2472
        assert(sock1 && sock2);
2473

    
2474
        /*
2475
        * compare internal addr
2476
        */
2477
        if(sock1 == NULL || sock2 == NULL)
2478
                return 1;
2479

    
2480
        if (sock1->internal_addr.udpaddr.sin_addr.s_addr !=
2481
            sock2->internal_addr.udpaddr.sin_addr.s_addr)
2482
                        return memcmp(&sock1->internal_addr.udpaddr.sin_addr.s_addr, &sock2->internal_addr.udpaddr.sin_addr.s_addr, sizeof(sock1->internal_addr.udpaddr.sin_addr.s_addr));
2483

    
2484
        if (sock1->internal_addr.udpaddr.sin_port !=
2485
                 sock2->internal_addr.udpaddr.sin_port)
2486
                        return memcmp(&sock1->internal_addr.udpaddr.sin_port, &sock2->internal_addr.udpaddr.sin_port, sizeof(sock1->internal_addr.udpaddr.sin_port));
2487

    
2488
        /*
2489
        * compare external addr
2490
        */
2491
        if (sock1->external_addr.udpaddr.sin_addr.s_addr !=
2492
            sock2->external_addr.udpaddr.sin_addr.s_addr)
2493
                        return memcmp(&sock1->external_addr.udpaddr.sin_addr.s_addr, &sock2->external_addr.udpaddr.sin_addr.s_addr, sizeof(sock1->external_addr.udpaddr.sin_addr.s_addr));
2494

    
2495
        if (sock1->external_addr.udpaddr.sin_port !=
2496
                 sock2->external_addr.udpaddr.sin_port)
2497
                        return memcmp(&sock1->external_addr.udpaddr.sin_port, &sock2->external_addr.udpaddr.sin_port, sizeof(sock1->external_addr.udpaddr.sin_port));
2498

    
2499
        return 0;
2500
}
2501

    
2502
int mlCompareSocketIDsByPort(socketID_handle sock1, socketID_handle sock2)
2503
{
2504
        if(sock1 == NULL || sock2 == NULL)
2505
                return 1;
2506
 
2507
        if (sock1->internal_addr.udpaddr.sin_port !=
2508
                 sock2->internal_addr.udpaddr.sin_port)
2509
                        return 1;
2510

    
2511
        if (sock1->external_addr.udpaddr.sin_port !=
2512
                 sock2->external_addr.udpaddr.sin_port)
2513
                        return 1;
2514
        return 0;
2515
}
2516

    
2517
int mlGetPathMTU(int ConnectionId) {
2518
        if(ConnectionId < 0 || ConnectionId >= CONNECTBUFSIZE)
2519
                return -1;
2520
        if (connectbuf[ConnectionId] != NULL)
2521
                return connectbuf[ConnectionId]->pmtusize;
2522
        return -1;
2523
}
2524

    
2525
/**************************** END of GENERAL functions *************************/
2526

    
2527
/**************************** NAT functions *************************/
2528

    
2529
/* setter  */
2530
void mlSetStunServer(const int port,const char *ipaddr){
2531

    
2532
        stun_server.sin_family = AF_INET;
2533
        if (ipaddr == NULL)
2534
                stun_server.sin_addr.s_addr = htonl(INADDR_NONE);
2535
        else
2536
                stun_server.sin_addr.s_addr = resolve(ipaddr);
2537
        stun_server.sin_port = htons(port);
2538

    
2539
}
2540

    
2541
int mlGetExternalIP(char* external_addr){
2542

    
2543
        socketaddrgen udpgen;
2544
        struct sockaddr_in udpaddr;
2545

    
2546
        udpgen = local_socketID.external_addr;
2547
        udpaddr = udpgen.udpaddr;
2548

    
2549
        inet_ntop(AF_INET, &(udpaddr.sin_addr), external_addr,
2550
                        INET_ADDRSTRLEN);
2551

    
2552
        if (external_addr == NULL) {
2553

    
2554
        return -1;
2555

    
2556
        } else {
2557

    
2558
        return 0;
2559

    
2560
        }
2561

    
2562
}
2563

    
2564
/**************************** END of NAT functions *************************/