Statistics
| Branch: | Revision:

napa-baselibs / ml / ml.c @ 1401ea55

History | View | Annotate | Download (79.1 KB)

1
/*
2
 * Copyright (c) 2009 NEC Europe Ltd. All Rights Reserved.
3
 * Authors: Kristian Beckers  <beckers@nw.neclab.eu>
4
 *          Sebastian Kiesel  <kiesel@nw.neclab.eu>
5
 * Copyright (c) 2011 Csaba Kiraly <kiraly@disi.unitn.it>
6
 *
7
 * This file is part of the Messaging Library.
8
 *
9
 * The Messaging Library is free software: you can redistribute it and/or
10
 * modify it under the terms of the GNU Lesser General Public License as
11
 * published by the Free Software Foundation, either version 3 of the
12
 * License, or (at your option) any later version.
13
 *
14
 * The Messaging Library is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
17
 * General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public License
20
 * along with the Messaging Library.  If not, see <http://www.gnu.org/licenses/>.
21
 *
22
 */
23

    
24
#include <ml_all.h>
25

    
26
#ifdef FEC
27
#include "fec/RSfec.h"
28
int prev_sqnr=0;  //used to track the previous seq nrs, so that not to consider extra pakts >k.
29
int *pix;         //keep track of packet indexes.
30
int nix=0;        //counter incrimented after the arrivel of each packet. pix[nix]
31
int cnt=0;
32
#endif
33

    
34
/**************************** START OF INTERNALS ***********************/
35

    
36

    
37
/*
38
 * reserved message type for internal puposes
39
 */
40
#define MSG_TYPE_ML_KEEPALIVE 0x126        //TODO: check that it is really interpreted as internal
41

    
42
/*
43
 * a pointer to a libevent instance
44
 */
45
struct event_base *base;
46

    
47
/*
48
 * define the nr of connections the messaging layer can handle
49
 */
50
#define CONNECTBUFSIZE 10000
51
/*
52
 * define the nr of data that can be received parallel
53
 */
54
#define RECVDATABUFSIZE 10000
55
/*
56
 * define an array for message multiplexing
57
 */
58
#define MSGMULTIPLEXSIZE 127
59

    
60

    
61
/*
62
 * timeout before thinking that the STUN server can't be connected
63
 */
64
#define NAT_TRAVERSAL_TIMEOUT { 1, 0 }
65

    
66
/*
67
 * timeout before thinking of an mtu problem (check MAX_TRIALS as well)
68
 */
69
#define PMTU_TIMEOUT 1000000 // in usec
70

    
71
/*
72
 * retry sending connection messages this many times before reducing pmtu
73
 */
74
#define MAX_TRIALS 3
75

    
76
/*
77
 * default timeout value between the first and the last received packet of a message
78
 */
79
#define RECV_TIMEOUT_DEFAULT { 2, 0 }
80

    
81
#ifdef RTX
82
/*
83
 * default timeout value for a packet reception
84
 */
85
#define PKT_RECV_TIMEOUT_DEFAULT { 0, 50000 } // 50 ms
86

    
87
/*
88
 * default timeout value for a packet reception
89
 */
90
#define LAST_PKT_RECV_TIMEOUT_DEFAULT { 1, 700000 }
91

    
92
/*
93
 * default fraction of RECV_TIMEOUT_DEFAULT for a last packet(s) reception timeout
94
 */
95
#define LAST_PKT_RECV_TIMEOUT_FRACTION 0.7
96

    
97
#endif
98

    
99

    
100
/*
101
 * global variables
102
 */
103

    
104
/*
105
 * define a buffer of pointers to connect structures
106
 */
107
connect_data *connectbuf[CONNECTBUFSIZE];
108

    
109
/*
110
 * define a pointer buffer with pointers to recv_data structures
111
 */
112
recvdata *recvdatabuf[RECVDATABUFSIZE];
113

    
114
/*
115
 * define a pointer buffer for message multiplexing
116
 */
117
receive_data_cb recvcbbuf[MSGMULTIPLEXSIZE];
118

    
119
/*
120
 * stun server address
121
 */
122
struct sockaddr_in stun_server;
123

    
124
/*
125
 * receive timeout
126
 */
127
static struct timeval recv_timeout = RECV_TIMEOUT_DEFAULT;
128

    
129
/*
130
 * boolean NAT traversal successful if true
131
 */
132
boolean NAT_traversal;
133

    
134
/*
135
 * file descriptor for local socket
136
 */
137
evutil_socket_t socketfd;
138

    
139
/*
140
 * local socketID
141
 */
142
socket_ID local_socketID;
143

    
144
socketID_handle loc_socketID = &local_socketID;
145

    
146
/*
147
 * callback function pointers
148
 */
149
/*
150
 * monitoring module callbacks
151
 */
152
get_recv_pkt_inf_cb get_Recv_pkt_inf_cb = NULL;
153
get_send_pkt_inf_cb get_Send_pkt_inf_cb = NULL;
154
set_monitoring_header_pkt_cb set_Monitoring_header_pkt_cb = NULL;
155
get_recv_data_inf_cb get_Recv_data_inf_cb = NULL;
156
get_send_data_inf_cb get_Send_data_inf_cb = NULL;
157
set_monitoring_header_data_cb set_Monitoring_header_data_cb = NULL;
158
/*
159
 * connection callbacks
160
 */
161
receive_connection_cb receive_Connection_cb = NULL;
162
connection_failed_cb failed_Connection_cb = NULL;
163
/*
164
 * local socketID callback
165
 */
166
receive_localsocketID_cb receive_SocketID_cb;
167

    
168
/*
169
 * boolean that defines if received data is transmitted to the upper layer
170
 * via callback or via upper layer polling
171
 */
172
boolean recv_data_callback;
173

    
174
/*
175
 * helper function to get rid of a warning
176
 */
177
#ifndef _WIN32
178
int min(int a, int b) {
179
        if (a > b) return b;
180
        return a;
181
}
182
#endif
183

    
184
#ifdef RTX
185
//*********Counters**********
186

    
187
struct Counters {
188
        unsigned int receivedCompleteMsgCounter;
189
        unsigned int receivedIncompleteMsgCounter;
190
        unsigned int receivedDataPktCounter;
191
        unsigned int receivedRTXDataPktCounter;
192
        unsigned int receivedNACK1PktCounter;
193
        unsigned int receivedNACKMorePktCounter;
194
        unsigned int sentDataPktCounter;
195
        unsigned int sentRTXDataPktCtr;
196
        unsigned int sentNACK1PktCounter;
197
        unsigned int sentNACKMorePktCounter;
198
} counters;
199

    
200
extern unsigned int sentRTXDataPktCounter;
201

    
202
/*
203
 * receive timeout for a packet
204
 */
205
static struct timeval pkt_recv_timeout = PKT_RECV_TIMEOUT_DEFAULT;
206

    
207

    
208
static struct timeval last_pkt_recv_timeout = LAST_PKT_RECV_TIMEOUT_DEFAULT;
209

    
210
void mlShowCounters() {
211
        counters.sentRTXDataPktCtr = sentRTXDataPktCounter;
212
        fprintf(stderr, "\nreceivedCompleteMsgCounter: %d\nreceivedIncompleteMsgCounter: %d\nreceivedDataPktCounter: %d\nreceivedRTXDataPktCounter: %d\nreceivedNACK1PktCounter: %d\nreceivedNACKMorePktCounter: %d\nsentDataPktCounter: %d\nsentRTXDataPktCtr: %d\nsentNACK1PktCounter: %d\nsentNACKMorePktCounter: %d\n", counters.receivedCompleteMsgCounter, counters.receivedIncompleteMsgCounter, counters.receivedDataPktCounter, counters.receivedRTXDataPktCounter, counters.receivedNACK1PktCounter, counters.receivedNACKMorePktCounter, counters.sentDataPktCounter, counters.sentRTXDataPktCtr, counters.sentNACK1PktCounter, counters.sentNACKMorePktCounter);
213
        return;
214
}
215

    
216
void recv_nack_msg(struct msg_header *msg_h, char *msgbuf, int msg_size)
217
{
218
        struct nack_msg *nackmsg;
219
        
220
        msgbuf += msg_h->len_mon_data_hdr;
221
        msg_size -= msg_h->len_mon_data_hdr;
222
        nackmsg = (struct nack_msg*) msgbuf;
223
        
224
        unsigned int gapSize = nackmsg->offsetTo - nackmsg->offsetFrom;
225
        //if (gapSize == 1349) counters.receivedNACK1PktCounter++;
226
        //else counters.receivedNACKMorePktCounter++;
227

    
228
        rtxPacketsFromTo(nackmsg->con_id, nackmsg->msg_seq_num, nackmsg->offsetFrom, nackmsg->offsetTo);        
229
}
230

    
231
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams);
232

    
233
void pkt_recv_timeout_cb(int fd, short event, void *arg){
234
        int recv_id = (long) arg;
235
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
236

    
237
        //check if message still exists        
238
        if (recvdatabuf[recv_id] == NULL) return;
239

    
240
        //check if gap was filled in the meantime
241
        if (recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom == recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo) {
242
                recvdatabuf[recv_id]->firstGap++;
243
                return;        
244
        }
245

    
246
        struct nack_msg nackmsg;
247
        nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
248
        nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
249
        nackmsg.offsetFrom = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom;
250
        nackmsg.offsetTo = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo;
251
        recvdatabuf[recv_id]->firstGap++;
252

    
253
        unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
254

    
255
        send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, (char *) &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));        
256
}
257

    
258
void last_pkt_recv_timeout_cb(int fd, short event, void *arg){
259
        int recv_id = (long) arg;
260
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
261

    
262
        if (recvdatabuf[recv_id] == NULL) {
263
                return;
264
        }
265

    
266
        if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
267
                debug("ML: freeing last packet timeout for %d",recv_id);
268
                event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
269
                event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
270
                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
271
        }
272

    
273
        if (recvdatabuf[recv_id]->expectedOffset == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) return;
274

    
275
        struct nack_msg nackmsg;
276
        nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
277
        nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
278
        nackmsg.offsetFrom = recvdatabuf[recv_id]->expectedOffset;
279
        nackmsg.offsetTo = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen;
280

    
281
        unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
282

    
283
        send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));        
284
}
285

    
286
#endif
287

    
288
/*
289
 * compare the external IP address of two socketIDs
290
 */
291
bool compare_address(struct sockaddr_in *sock1, struct sockaddr_in *sock2, int netmaskbits, bool port)
292
{
293
        int i;
294
        uint32_t mask = 0;
295

    
296
        for (i = 0; i < netmaskbits; i ++) {
297
                mask += 1<<i;
298
        }
299
        mask = htonl(mask);
300

    
301
        return ((sock1->sin_addr.s_addr | mask )  == (sock2->sin_addr.s_addr | mask)) &&
302
               (!port || (sock1->sin_port == sock2->sin_port) );
303
}
304

    
305
/*
306
 * decide whether it is worth trying to connect with internal connect
307
 */
308
bool internal_connect_plausible(socketID_handle sock1, socketID_handle sock2)
309
{
310
        return compare_address (&sock1->external_addr.udpaddr, &sock2->external_addr.udpaddr, 0, false);
311
}
312

    
313
/*
314
 * convert a socketID to a string. It uses a static buffer, so either strdup is needed, or the string will get lost!
315
 */
316
const char *conid_to_string(int con_id)
317
{
318
        static char s[INET_ADDRSTRLEN+1+5+1+INET_ADDRSTRLEN+1+5+1];
319
        mlSocketIDToString(&connectbuf[con_id]->external_socketID, s, sizeof(s));
320
        return s;
321
}
322

    
323
void register_recv_localsocketID_cb(receive_localsocketID_cb local_socketID_cb)
324
{
325
        if (local_socketID_cb == NULL) {
326
                error("ML : Register receive_localsocketID_cb: NULL ptr \n");
327
        } else {
328
                receive_SocketID_cb = local_socketID_cb;
329
        }
330
}
331

    
332

    
333
//void keep_connection_alive(const int connectionID)
334
//{
335
//
336
//    // to be done with the NAT traversal
337
//    // send a message over the wire
338
//    printf("\n");
339
//
340
//}
341

    
342
void unsetStunServer()
343
{
344
        stun_server.sin_addr.s_addr = INADDR_NONE;
345
}
346

    
347
bool isStunDefined()
348
{
349
        return stun_server.sin_addr.s_addr != INADDR_NONE;
350
}
351

    
352
void send_msg(int con_id, int msg_type, void* msg, int msg_len, bool truncable, send_params * sParams) {
353
#ifdef FEC
354
        int chk_msg_len=msg_len;
355
        int n=4;
356
        int k=64;
357
        int lcnt=0;
358
        int icnt=0;
359
        int i=0;
360
#endif
361
        socketaddrgen udpgen;
362
        bool retry;
363
        int pkt_len, offset;
364
        struct iovec iov[4];
365

    
366
        char h_pkt[MON_PKT_HEADER_SPACE];
367
        char h_data[MON_DATA_HEADER_SPACE];
368

    
369
        struct msg_header msg_h;
370

    
371
        debug("ML: send_msg to %s conID:%d extID:%d\n", conid_to_string(con_id), con_id, connectbuf[con_id]->external_connectionID);
372

    
373
        iov[0].iov_base = &msg_h;
374
        iov[0].iov_len = MSG_HEADER_SIZE;
375

    
376
        msg_h.local_con_id = htonl(con_id);
377
        msg_h.remote_con_id = htonl(connectbuf[con_id]->external_connectionID);
378
        msg_h.msg_type = msg_type;
379
        msg_h.msg_seq_num = htonl(connectbuf[con_id]->seqnr++);
380

    
381

    
382
        iov[1].iov_len = iov[2].iov_len = 0;
383
        iov[1].iov_base = h_pkt;
384
        iov[2].iov_base = h_data;
385

    
386

    
387
        if (connectbuf[con_id]->internal_connect)
388
                udpgen = connectbuf[con_id]->external_socketID.internal_addr;
389
        else
390
                udpgen = connectbuf[con_id]->external_socketID.external_addr;
391

    
392
        do{
393
#ifdef FEC
394
                char *Pmsg = NULL;
395
                int npaksX2 = 0;
396
                void *code;
397
                char **src = NULL;
398
                char **pkt = NULL;
399
#endif
400
                offset = 0;
401
                retry = false;
402
                // Monitoring layer hook
403
                if(set_Monitoring_header_data_cb != NULL) {
404
                        iov[2].iov_len = ((set_Monitoring_header_data_cb) (&(connectbuf[con_id]->external_socketID), msg_type));
405
                }
406
                msg_h.len_mon_data_hdr = iov[2].iov_len;
407

    
408
                if(get_Send_data_inf_cb != NULL && iov[2].iov_len != 0) {
409
                        mon_data_inf sd_data_inf;
410

    
411
                        memset(h_data, 0, MON_DATA_HEADER_SPACE);
412

    
413
                        sd_data_inf.remote_socketID = &(connectbuf[con_id]->external_socketID);
414
#ifdef FEC
415
                        if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize){
416
                           //@add padding bits to msg!
417
                           int npaks=0;
418
                           int toffset=0;
419
                           int tpkt_len=connectbuf[con_id]->pmtusize;
420
                           int ipad = (connectbuf[con_id]->pmtusize-(msg_len%(connectbuf[con_id]->pmtusize)));
421
                           Pmsg = (char*) malloc((msg_len + ipad)*sizeof ( char* ));
422
                            for(i=0; i<msg_len; i++){
423
                                *(Pmsg+i)=*(((char *)msg)+i);
424
                                icnt++;
425
                            }
426
                            for(i=msg_len; i<(msg_len+ipad); i++){
427
                                *(Pmsg+i)=0;
428
                                icnt++;
429
                            }
430
                            msg=Pmsg;
431
                            msg_len=(msg_len+ipad);
432
                            npaks=(int)(msg_len/connectbuf[con_id]->pmtusize);
433
                            npaksX2=2*npaks; //2 times.
434
                            src = ( char ** ) malloc ( npaksX2 * sizeof ( char* ));
435
                            pkt = ( char ** ) malloc ( npaksX2 * sizeof ( char* ));
436
                            code = fec_new(npaks,256);
437
                            for(i=0; i<npaks; i++){
438
                              src[i]= (msg + toffset);
439
                              toffset += tpkt_len;
440
                            }
441
                            for(i=npaks; i<npaksX2; i++){//X2
442
                              src[i] = malloc( tpkt_len * sizeof ( char ) );
443
                            }
444
                            for(i=0; i<npaksX2; i++){//X2
445
                             pkt[i] = ( char* )malloc( tpkt_len * sizeof ( char ) );
446
                             fec_encode(code, src, pkt[i], i, tpkt_len) ;
447
                            }
448
                            for(i=npaks; i<npaksX2; i++){//X2
449
                              free(src[i]);
450
                            }
451
                        }
452
#endif
453
                        sd_data_inf.buffer = msg;
454
                        sd_data_inf.bufSize = msg_len;
455
                        sd_data_inf.msgtype = msg_type;
456
                        sd_data_inf.monitoringDataHeader = iov[2].iov_base;
457
                        sd_data_inf.monitoringDataHeaderLen = iov[2].iov_len;
458
                        sd_data_inf.priority = sParams->priority;
459
                        sd_data_inf.padding = sParams->padding;
460
                        sd_data_inf.confirmation = sParams->confirmation;
461
                        sd_data_inf.reliable = sParams->reliable;
462
                        memset(&sd_data_inf.arrival_time, 0, sizeof(struct timeval));
463

    
464
                        (get_Send_data_inf_cb) ((void *) &sd_data_inf);
465
                }
466

    
467
                do {
468
                        bool break2 = false;
469

    
470
                        if(set_Monitoring_header_pkt_cb != NULL) {
471
                                iov[1].iov_len = (set_Monitoring_header_pkt_cb) (&(connectbuf[con_id]->external_socketID), msg_type);
472
                        }
473
#ifdef FEC
474
                        pkt_len = min(connectbuf[con_id]->pmtusize, chk_msg_len - offset) ;
475
#else
476
                        pkt_len = min(connectbuf[con_id]->pmtusize - iov[2].iov_len - iov[1].iov_len - iov[0].iov_len, msg_len - offset) ;
477
#endif
478
                        iov[3].iov_len = pkt_len;
479
#ifdef FEC
480
                        if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize && lcnt<((msg_len*2)/connectbuf[con_id]->pmtusize)){ //&& lcnt<(msg_len/connectbuf[con_id]->pmtusize)
481
                              iov[3].iov_base = pkt[lcnt];
482
                              chk_msg_len=(msg_len*2); //half-rate.
483
                        } else {
484
                              iov[3].iov_base = msg + offset;
485
                              chk_msg_len=msg_len;
486
                        }
487
#else        
488
                        iov[3].iov_base = msg + offset;
489
#endif
490

    
491
                        //fill header
492
                        msg_h.len_mon_packet_hdr = iov[1].iov_len;
493
                        msg_h.offset = htonl(offset);
494
                        msg_h.msg_length = htonl(truncable ? pkt_len : msg_len);
495

    
496
#ifdef FEC
497
                        if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
498
                                mon_pkt_inf pkt_info;
499
                                memset(h_pkt,0,MON_PKT_HEADER_SPACE);
500
                                pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
501
                                if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize && lcnt<((msg_len*2)/connectbuf[con_id]->pmtusize)){
502
                                    pkt_info.buffer = pkt[lcnt];
503
                                    chk_msg_len=(msg_len*2);
504
                                } else {
505
                                    pkt_info.buffer = msg + offset;
506
                                    chk_msg_len=msg_len;
507
                                }
508

    
509
                                pkt_info.bufSize = pkt_len;
510
                                pkt_info.msgtype = msg_type;
511
                                pkt_info.dataID = connectbuf[con_id]->seqnr;
512
                                pkt_info.offset = offset;
513
                                pkt_info.datasize = msg_len;
514
                                pkt_info.monitoringHeaderLen = iov[1].iov_len;
515
                                pkt_info.monitoringHeader = iov[1].iov_base;
516
                                pkt_info.ttl = -1;
517
                                memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
518
                                (get_Send_pkt_inf_cb) ((void *) &pkt_info);
519
                        }
520
#endif
521

    
522

    
523
                        debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
524
                        int priority = 0; 
525
                        if ((msg_type == ML_CON_MSG)
526
#ifdef RTX
527
 || (msg_type == ML_NACK_MSG)
528
#endif
529
) priority = HP;
530
                        //fprintf(stderr,"*******************************ML.C: Sending packet: msg_h.offset: %d msg_h.msg_seq_num: %d\n",ntohl(msg_h.offset),ntohl(msg_h.msg_seq_num));
531
                        switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
532
                                case MSGLEN:
533
                                        info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
534
                                        // TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
535
                                        connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
536
                                        if (connectbuf[con_id]->pmtusize > 0) {
537
                                                connectbuf[con_id]->delay = true;
538
                                                retry = true;
539
                                        }
540
                                        break2 = true;
541
                                        break;
542
                                case FAILURE:
543
                                        info("ML: sending message failed (to:%s conID:%d lconID:%d msgsize:%d msgtype:%d offset:%d)\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, msg_h.msg_type, offset);
544
                                        break2 = true;
545
                                        break;
546
                                case THROTTLE:
547
                                        debug("THROTTLE on output"); 
548
                                        break2 = true;
549
                                        break;
550
                                case OK:
551
#ifdef RTX
552
                                        if (msg_type < 127) counters.sentDataPktCounter++;
553
#endif
554
                                        //update
555
                                        offset += pkt_len;
556
#ifdef FEC
557
                                        if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize && lcnt<((msg_len*2)/connectbuf[con_id]->pmtusize)){
558
                                          lcnt++;
559
                                        }
560
#endif
561
                                        //transmit data header only in the first packet
562
                                        iov[2].iov_len = 0;
563
                                        break;
564
                        }
565
                        if (break2) break;
566
#ifdef FEC
567
                } while(offset != chk_msg_len && !truncable);
568
                if(msg_type==17 && msg_len>connectbuf[con_id]->pmtusize){ //free the pointers.
569
                        free(Pmsg);
570
                        free(src);
571
                        for(i=0; i<npaksX2; i++) {
572
                          free(pkt[i]);
573
                        }
574
                        free(pkt);
575
                        fec_free(code);
576
                }
577
#else
578
                } while(offset != msg_len && !truncable);
579
#endif
580
        } while(retry);
581
        //fprintf(stderr, "sentDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentDataPktCounter);
582
        //fprintf(stderr, "sentRTXDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentRTXDataPktCtr);
583
}
584

    
585
void pmtu_timeout_cb(int fd, short event, void *arg);
586

    
587
int sendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr) {
588
        //monitoring layer hook
589
        if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
590
                mon_pkt_inf pkt_info;        
591

    
592
                struct msg_header *msg_h  = (struct msg_header *) iov[0].iov_base;
593

    
594
                memset(iov[1].iov_base,0,iov[1].iov_len);
595

    
596
                pkt_info.remote_socketID = &(connectbuf[ntohl(msg_h->local_con_id)]->external_socketID);
597
                pkt_info.buffer = iov[3].iov_base;
598
                pkt_info.bufSize = iov[3].iov_len;
599
                pkt_info.msgtype = msg_h->msg_type;
600
                pkt_info.dataID = ntohl(msg_h->msg_seq_num);
601
                pkt_info.offset = ntohl(msg_h->offset);
602
                pkt_info.datasize = ntohl(msg_h->msg_length);
603
                pkt_info.monitoringHeaderLen = iov[1].iov_len;
604
                pkt_info.monitoringHeader = iov[1].iov_base;
605
                pkt_info.ttl = -1;
606
                memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
607

    
608
                (get_Send_pkt_inf_cb) ((void *) &pkt_info);
609
        }
610

    
611
         //struct msg_header *msg_h;
612
    //msg_h = (struct msg_header *) iov[0].iov_base;        
613

    
614
        //fprintf(stderr,"*** Sending packet - msgSeqNum: %d offset: %d\n",ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
615

    
616
        return sendPacketFinal(udpSocket, iov, len, socketaddr);
617
}
618

    
619
void reschedule_conn_msg(int con_id)
620
{
621
        if (connectbuf[con_id]->timeout_event) {
622
                /* delete old timout */        
623
                event_del(connectbuf[con_id]->timeout_event);
624
                event_free(connectbuf[con_id]->timeout_event);
625
        }
626
        connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
627
        evtimer_add(connectbuf[con_id]->timeout_event, &connectbuf[con_id]->timeout_value);
628
}
629

    
630
void send_conn_msg(int con_id, int buf_size, int command_type)
631
{
632
        if (buf_size < sizeof(struct conn_msg)) {
633
                error("ML: requested connection message size is too small\n");
634
                return;
635
        }
636

    
637
        if(connectbuf[con_id]->ctrl_msg_buf == NULL) {
638
                connectbuf[con_id]->ctrl_msg_buf = malloc(buf_size);
639
                memset(connectbuf[con_id]->ctrl_msg_buf, 0, buf_size);
640
        }
641

    
642
        if(connectbuf[con_id]->ctrl_msg_buf == NULL) {
643
                error("ML: can not allocate memory for connection message\n");
644
                return;
645
        }
646

    
647
        struct conn_msg *msg_header = (struct conn_msg*) connectbuf[con_id]->ctrl_msg_buf;
648

    
649
        msg_header->comand_type = command_type;
650
        msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
651

    
652
        memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
653
  {
654
                        char buf[SOCKETID_STRING_SIZE];
655
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
656
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
657
   }
658
        send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
659
}
660

    
661
void send_conn_msg_with_pmtu_discovery(int con_id, int buf_size, int command_type)
662
{
663
        struct timeval tout = {0,0};
664
        tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
665
        connectbuf[con_id]->timeout_value = tout;
666
        connectbuf[con_id]->trials = 1;
667
        send_conn_msg(con_id, buf_size, command_type);
668
        reschedule_conn_msg(con_id);
669
}
670

    
671
void resend_conn_msg(int con_id)
672
{
673
        connectbuf[con_id]->trials++;
674
        send_conn_msg(con_id, connectbuf[con_id]->pmtusize, connectbuf[con_id]->status);
675
        reschedule_conn_msg(con_id);
676
}
677

    
678
void recv_conn_msg(struct msg_header *msg_h, char *msgbuf, int msg_size, struct sockaddr_in *recv_addr)
679
{
680
        struct conn_msg *con_msg;
681
        int free_con_id, con_id;
682

    
683
        time_t now = time(NULL);
684
        double timediff = 0.0;
685
        char sock_id_str[1000];
686
        
687
        msgbuf += msg_h->len_mon_data_hdr;
688
        msg_size -= msg_h->len_mon_data_hdr;
689
        con_msg = (struct conn_msg *)msgbuf;
690
        
691
        //verify message validity
692
        if (msg_size < sizeof(struct conn_msg)) {
693
                char recv_addr_str[INET_ADDRSTRLEN];
694
                inet_ntop(AF_INET, &(recv_addr->sin_addr.s_addr), recv_addr_str, INET_ADDRSTRLEN);
695
                info("Invalid conn_msg received from %s\n", recv_addr_str);
696
                return;
697
        }
698

    
699
        //decode sock_id for debug messages
700
        mlSocketIDToString(&con_msg->sock_id,sock_id_str,999);
701

    
702
        if (con_msg->sock_id.internal_addr.udpaddr.sin_addr.s_addr != recv_addr->sin_addr.s_addr &&
703
            con_msg->sock_id.external_addr.udpaddr.sin_addr.s_addr != recv_addr->sin_addr.s_addr   ) {
704
                char recv_addr_str[INET_ADDRSTRLEN];
705
                inet_ntop(AF_INET, &(recv_addr->sin_addr.s_addr), recv_addr_str, INET_ADDRSTRLEN);
706
                info("Conn msg received from %s, but claims to be from %s", recv_addr_str, sock_id_str);
707
                return;
708
        }
709

    
710
        // Monitoring layer hook
711
        if(get_Recv_data_inf_cb != NULL) {
712
                // update pointer to the real data
713
                mon_data_inf recv_data_inf;
714
                recv_data_inf.remote_socketID = &(con_msg->sock_id);
715
                recv_data_inf.buffer = msgbuf;
716
                recv_data_inf.bufSize = msg_size;
717
                recv_data_inf.msgtype = msg_h->msg_type;
718
                recv_data_inf.monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
719
                recv_data_inf.monitoringDataHeader = msg_h->len_mon_data_hdr ? msgbuf : NULL;
720
                gettimeofday(&recv_data_inf.arrival_time, NULL);
721
                recv_data_inf.firstPacketArrived = true;
722
                recv_data_inf.recvFragments = 1;
723
                recv_data_inf.priority = false;
724
                recv_data_inf.padding = false;
725
                recv_data_inf.confirmation = false;
726
                recv_data_inf.reliable = false;
727

    
728
                // send data recv callback to monitoring module
729
                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
730
        }
731

    
732
        // check the connection command type
733
        switch (con_msg->comand_type) {
734
                /*
735
                * if INVITE: enter a new socket make new entry in connect array
736
                * send an ok
737
                */
738
                case INVITE:
739
                        info("ML: received INVITE from %s (size:%d)\n", sock_id_str, msg_size);
740
                        /*
741
                        * check if another connection for the external connectionID exist
742
                        * that was established within the last 2 seconds
743
                        */
744
                        free_con_id = -1;
745
                        for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
746
                                if (connectbuf[con_id] != NULL) {
747
                                        if (mlCompareSocketIDs(&(connectbuf[con_id]->external_socketID), &(con_msg->sock_id)) == 0) {
748
                                                //timediff = difftime(now, connectbuf[con_id]->starttime);        //TODO: why this timeout? Shouldn't the connection be closed instead if there is a timeout?
749
                                                //if (timediff < 2)
750
                                                //update remote connection ID
751
                                                if (connectbuf[con_id]->external_connectionID != msg_h->local_con_id) {
752
                                                        warn("ML: updating remote connection ID for %s: from %d to %d\n",sock_id_str, connectbuf[con_id]->external_connectionID, msg_h->local_con_id);
753
                                                        connectbuf[con_id]->external_connectionID = msg_h->local_con_id;
754
                                                }
755
                                                break;
756
                                        }
757
                                } else if(free_con_id == -1)
758
                                        free_con_id = con_id;
759
                        }
760

    
761
                        if (con_id == CONNECTBUFSIZE) {
762
                                // create an entry in the connecttrybuf
763
                                if(free_con_id == -1) {
764
                                        error("ML: no new connect_buf available\n");
765
                                        return;
766
                                }
767
                                connectbuf[free_con_id] = (connect_data *) malloc(sizeof(connect_data));
768
                                memset(connectbuf[free_con_id],0,sizeof(connect_data));
769
                                connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
770
                                connectbuf[free_con_id]->starttime = time(NULL);
771
                                memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
772
                //Workaround to support reuse of socketID
773
                                connectbuf[free_con_id]->external_socketID.internal_addr.udpaddr.sin_family=AF_INET;
774
                                connectbuf[free_con_id]->external_socketID.external_addr.udpaddr.sin_family=AF_INET;
775
                                connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;        // bootstrap pmtu from the other's size. Not strictly needed, but a good hint
776
                                connectbuf[free_con_id]->timeout_event = NULL;
777
                                connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
778
                                connectbuf[free_con_id]->internal_connect =
779
                                        (con_msg->sock_id.internal_addr.udpaddr.sin_addr.s_addr == recv_addr->sin_addr.s_addr);
780
                                con_id = free_con_id;
781
                        }
782

    
783
                        //if(connectbuf[con_id]->status <= CONNECT) { //TODO: anwer anyway. Why the outher would invite otherwise?
784
                                //update status and send back answer
785
                                connectbuf[con_id]->status = CONNECT;
786
                                send_conn_msg_with_pmtu_discovery(con_id, con_msg->pmtu_size, CONNECT);
787
                        //}
788
                        break;
789
                case CONNECT:
790
                        info("ML: received CONNECT from %s (size:%d)\n", sock_id_str, msg_size);
791

    
792
                        if(msg_h->remote_con_id != -1 && connectbuf[msg_h->remote_con_id] == NULL) {
793
                                error("ML: received CONNECT for inexistent connection rconID:%d\n",msg_h->remote_con_id);
794
                                return;
795
                        }
796

    
797
                        /*
798
                        * check if the connection status is not already 1 or 2
799
                        */
800
                        if (connectbuf[msg_h->remote_con_id]->status == INVITE) {
801
                                // set the external connectionID
802
                                connectbuf[msg_h->remote_con_id]->external_connectionID = msg_h->local_con_id;
803
                                // change status con_msg the connection_data
804
                                connectbuf[msg_h->remote_con_id]->status = READY;
805
                                // change pmtusize in the connection_data: not needed. receiving a CONNECT means our INVITE went through. So why change pmtu?
806
                                //connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
807

    
808
                                // send the READY
809
                                send_conn_msg_with_pmtu_discovery(msg_h->remote_con_id, con_msg->pmtu_size, READY);
810

    
811
                                if (receive_Connection_cb != NULL)
812
                                        (receive_Connection_cb) (msg_h->remote_con_id, NULL);
813

    
814
                                // call all registered callbacks
815
                                while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
816
                                        struct receive_connection_cb_list *temp;
817
                                        temp = connectbuf[msg_h->remote_con_id]->connection_head;
818
                                        (temp->connection_cb) (msg_h->remote_con_id, temp->arg);
819
                                        connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
820
                                        free(temp);
821
                                }
822
                                connectbuf[msg_h->remote_con_id]->connection_head =
823
                                        connectbuf[msg_h->remote_con_id]->connection_last = NULL;
824
                        } else {
825
                                // send the READY
826
                                send_conn_msg_with_pmtu_discovery(msg_h->remote_con_id, con_msg->pmtu_size, READY);
827
                        }
828

    
829
                        debug("ML: active connection established\n");
830
                        break;
831

    
832
                        /*
833
                        * if READY: find the entry in the connection array set the
834
                        * connection active change the pmtu size
835
                        */
836
                case READY:
837
                        info("ML: received READY from %s (size:%d)\n", sock_id_str, msg_size);
838
                        if(connectbuf[msg_h->remote_con_id] == NULL) {
839
                                error("ML: received READY for inexistent connection\n");
840
                                return;
841
                        }
842
                        /*
843
                        * checks if the connection is not already established
844
                        */
845
                        if (connectbuf[msg_h->remote_con_id]->status == CONNECT) {
846
                                // change status of the connection
847
                                connectbuf[msg_h->remote_con_id]->status = READY;
848
                                // change pmtusize: not needed. pmtu doesn't have to be symmetric
849
                                //connectbuf[msg_h->remote_con_id]->pmtusize = con_msg->pmtu_size;
850

    
851
                                if (receive_Connection_cb != NULL)
852
                                        (receive_Connection_cb) (msg_h->remote_con_id, NULL);
853

    
854
                                while(connectbuf[msg_h->remote_con_id]->connection_head != NULL) {
855
                                        struct receive_connection_cb_list *temp;
856
                                        temp = connectbuf[msg_h->remote_con_id]->connection_head;
857
                                        (temp->connection_cb) (msg_h->remote_con_id, temp->arg);
858
                                        connectbuf[msg_h->remote_con_id]->connection_head = temp->next;
859
                                        free(temp);
860
                                }
861
                                connectbuf[msg_h->remote_con_id]->connection_head =
862
                                        connectbuf[msg_h->remote_con_id]->connection_last = NULL;
863
                                debug("ML: passive connection established\n");
864
                        }
865
                        break;
866
        }
867
}
868

    
869
void recv_stun_msg(char *msgbuf, int recvSize)
870
{
871
        /*
872
        * create empty stun message struct
873
        */
874
        StunMessage resp;
875
        memset(&resp, 0, sizeof(StunMessage));
876
        /*
877
        * parse the message
878
        */
879
        int returnValue = 0;
880
        returnValue = recv_stun_message(msgbuf, recvSize, &resp);
881

    
882
        if (returnValue == 0) {
883
                /*
884
                * read the reflexive Address into the local_socketID
885
                */
886
                struct sockaddr_in reflexiveAddr = {0};
887
                reflexiveAddr.sin_family = AF_INET;
888
                reflexiveAddr.sin_addr.s_addr = htonl(resp.mappedAddress.ipv4.addr);
889
                reflexiveAddr.sin_port = htons(resp.mappedAddress.ipv4.port);
890
                socketaddrgen reflexiveAddres = {0};
891
                reflexiveAddres.udpaddr = reflexiveAddr;
892
                local_socketID.external_addr = reflexiveAddres;
893
                NAT_traversal = true;
894
                // callback to the upper layer indicating that the socketID is now
895
                // ready to use
896
                {
897
                        char buf[SOCKETID_STRING_SIZE];
898
                        mlSocketIDToString(&local_socketID,buf,sizeof(buf));
899
                         debug("received local socket_address: %s\n", buf);
900
                }
901
                (receive_SocketID_cb) (&local_socketID, 0);
902
        }
903
}
904

    
905
//done
906
void recv_timeout_cb(int fd, short event, void *arg)
907
{
908
        int recv_id = (long) arg;
909
        debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
910

    
911
        if (recvdatabuf[recv_id] == NULL) {
912
                return;
913
        }
914

    
915
        if(recvdatabuf[recv_id]->status == ACTIVE) {
916
                // Monitoring layer hook
917
                if(get_Recv_data_inf_cb != NULL) {
918
                        mon_data_inf recv_data_inf;
919

    
920
                        recv_data_inf.remote_socketID =
921
                                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
922
                        recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
923
                        recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
924
                        recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
925
                        recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
926
                        recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
927
                                recvdatabuf[recv_id]->recvbuf : NULL;
928
                        gettimeofday(&recv_data_inf.arrival_time, NULL);
929
                        recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
930
                        recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
931
                        recv_data_inf.priority = false;
932
                        recv_data_inf.padding = false;
933
                        recv_data_inf.confirmation = false;
934
                        recv_data_inf.reliable = false;
935

    
936
                        // send data recv callback to monitoring module
937

    
938
//                        (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
939
                }
940

    
941
                // Get the right callback
942
                receive_data_cb receive_data_callback = recvcbbuf[recvdatabuf[recv_id]->msgtype];
943

    
944
                recv_params rParams;
945

    
946
                rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->arrivedBytes;
947
                rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
948
                rParams.msgtype = recvdatabuf[recv_id]->msgtype;
949
                rParams.connectionID = recvdatabuf[recv_id]->connectionID;
950
                rParams.remote_socketID =
951
                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
952
                rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
953

    
954
#ifdef RTX
955
                counters.receivedIncompleteMsgCounter++;
956
                //mlShowCounters();
957
                //fprintf(stderr,"******Cleaning slot for inclomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);                
958
#endif
959
                 //(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->msgtype, &rParams);
960
        }
961

    
962
        //clean up
963
        if (recvdatabuf[recv_id]->timeout_event) {
964
                debug("ML: freeing timeout for %d",recv_id);
965
                event_del(recvdatabuf[recv_id]->timeout_event);
966
                event_free(recvdatabuf[recv_id]->timeout_event);
967
                recvdatabuf[recv_id]->timeout_event = NULL;
968
        }
969
        free(recvdatabuf[recv_id]->recvbuf);
970
#ifdef RTX
971
        if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
972
                debug("ML: freeing last packet timeout for %d",recv_id);
973
                event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
974
                event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
975
                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
976
        }
977
#endif
978
#ifdef FEC
979
        free(recvdatabuf[recv_id]->pix);
980
        free(recvdatabuf[recv_id]->pix_chk);
981
#endif
982
        free(recvdatabuf[recv_id]);
983
        recvdatabuf[recv_id] = NULL;
984
}
985

    
986
// process a single recv data message
987
void recv_data_msg(struct msg_header *msg_h, char *msgbuf, int bufsize)
988
{
989
#ifdef FEC
990
        void *code;
991
        int n, k, i, j;
992
        n=4;
993
        k=64;
994
        char **src;
995
#endif
996
        debug("ML: received packet of size %d with rconID:%d lconID:%d type:%d offset:%d inlength: %d\n",bufsize,msg_h->remote_con_id,msg_h->local_con_id,msg_h->msg_type,msg_h->offset, msg_h->msg_length);
997

    
998
        int recv_id, free_recv_id = -1;
999
        int pmtusize;
1000

    
1001
        if(connectbuf[msg_h->remote_con_id] == NULL) {
1002
                debug("ML: Received a message not related to any opened connection!\n");
1003
                return;
1004
        }
1005
        pmtusize = connectbuf[msg_h->remote_con_id]->pmtusize;
1006

    
1007
#ifdef RTX
1008
        counters.receivedDataPktCounter++;
1009
#endif        
1010
        // check if a recv_data exist and enter data
1011
        for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++) {
1012
                if (recvdatabuf[recv_id] != NULL) {
1013
                        if (msg_h->remote_con_id == recvdatabuf[recv_id]->connectionID &&
1014
                                        msg_h->msg_seq_num == recvdatabuf[recv_id]->seqnr)
1015
                                                break;
1016
                } else
1017
                        if(free_recv_id == -1)
1018
                                free_recv_id = recv_id;
1019
  }
1020

    
1021
        if(recv_id == RECVDATABUFSIZE) {
1022
                debug(" recv id not found (free found: %d)\n", free_recv_id);
1023
                //no recv_data found: create one
1024
                recv_id = free_recv_id;
1025
                recvdatabuf[recv_id] = (recvdata *) malloc(sizeof(recvdata));
1026
                memset(recvdatabuf[recv_id], 0, sizeof(recvdata));
1027
                recvdatabuf[recv_id]->connectionID = msg_h->remote_con_id;
1028
                recvdatabuf[recv_id]->seqnr = msg_h->msg_seq_num;
1029
                recvdatabuf[recv_id]->monitoringDataHeaderLen = msg_h->len_mon_data_hdr;
1030
                recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
1031
                recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
1032
                recvdatabuf[recv_id]->arrivedBytes = 0;        //count this without the Mon headers
1033
#ifdef RTX
1034
                recvdatabuf[recv_id]->txConnectionID = msg_h->local_con_id;
1035
                recvdatabuf[recv_id]->expectedOffset = 0;
1036
                recvdatabuf[recv_id]->gapCounter = 0;
1037
                recvdatabuf[recv_id]->firstGap = 0;
1038
                recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1039
#endif
1040

    
1041
                /*
1042
                * read the timeout data and set it
1043
                */
1044
                recvdatabuf[recv_id]->timeout_value = recv_timeout;
1045
                recvdatabuf[recv_id]->timeout_event = NULL;
1046
                recvdatabuf[recv_id]->recvID = recv_id;
1047
                recvdatabuf[recv_id]->starttime = time(NULL);
1048
                recvdatabuf[recv_id]->msgtype = msg_h->msg_type;
1049

    
1050
#ifdef FEC
1051
                if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize){
1052
                  recvdatabuf[recv_id]->nix=0;
1053
                  recvdatabuf[recv_id]->pix = ( int * ) malloc ( (recvdatabuf[recv_id]->bufsize/pmtusize) * sizeof ( int ));
1054
                  recvdatabuf[recv_id]->pix_chk = ( int * ) malloc ( (recvdatabuf[recv_id]->bufsize/pmtusize) * sizeof ( int ));
1055
                  for(i=0;i<(recvdatabuf[recv_id]->bufsize/pmtusize);i++)
1056
                      recvdatabuf[recv_id]->pix_chk[i] = 0;
1057
                }
1058
#endif
1059

    
1060
                // fill the buffer with zeros
1061
                memset(recvdatabuf[recv_id]->recvbuf, 0, recvdatabuf[recv_id]->bufsize);
1062
                debug(" new @ id:%d\n",recv_id);
1063
        } else {        //message structure already exists, no need to create new
1064
                debug(" found @ id:%d (arrived before this packet: bytes:%d fragments%d\n",recv_id, recvdatabuf[recv_id]->arrivedBytes, recvdatabuf[recv_id]->recvFragments);
1065
                if(recvdatabuf[recv_id]->status == COMPLETE) {
1066
                  return;
1067
                }
1068
        }
1069

    
1070
        //if first packet extract mon data header and advance pointer
1071
        if (msg_h->offset == 0) {
1072
                //fprintf(stderr,"Hoooooray!! We have first packet of some message!!\n");
1073
                memcpy(recvdatabuf[recv_id]->recvbuf, msgbuf, msg_h->len_mon_data_hdr);
1074
                msgbuf += msg_h->len_mon_data_hdr;
1075
                bufsize -= msg_h->len_mon_data_hdr;
1076
                recvdatabuf[recv_id]->firstPacketArrived = 1;
1077
        }
1078

    
1079

    
1080
        // increment fragmentnr
1081
        recvdatabuf[recv_id]->recvFragments++;
1082
        // increment the arrivedBytes
1083
        recvdatabuf[recv_id]->arrivedBytes += bufsize; 
1084

    
1085
        //fprintf(stderr,"Arrived bytes: %d Offset: %d Expected offset: %d\n",recvdatabuf[recv_id]->arrivedBytes/1349,msg_h->offset/1349,recvdatabuf[recv_id]->expectedOffset/1349);
1086

    
1087
        // enter the data into the buffer
1088
#ifdef FEC
1089
        if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize && recvdatabuf[recv_id]->pix_chk[recvdatabuf[recv_id]->nix]==0)
1090
          memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + (recvdatabuf[recv_id]->nix)*pmtusize, msgbuf, bufsize);
1091
        else
1092
          memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
1093
        //TODO very basic checkif all fragments arrived: has to be reviewed
1094
#else
1095
        memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
1096
#endif
1097

    
1098
#ifdef RTX
1099
        // detecting a new gap        
1100
        if (msg_h->offset > recvdatabuf[recv_id]->expectedOffset) {
1101
                recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetFrom = recvdatabuf[recv_id]->expectedOffset;
1102
                recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetTo = msg_h->offset;
1103
                if (recvdatabuf[recv_id]->gapCounter < RTX_MAX_GAPS - 1) recvdatabuf[recv_id]->gapCounter++;
1104
                event_base_once(base, -1, EV_TIMEOUT, &pkt_recv_timeout_cb, (void *) (long)recv_id, &pkt_recv_timeout);
1105
        }
1106
        
1107
        //filling the gap by delayed packets
1108
        if (msg_h->offset < recvdatabuf[recv_id]->expectedOffset){
1109
                counters.receivedRTXDataPktCounter++;
1110
                //skip retransmitted packets
1111
                if (recvdatabuf[recv_id]->firstGap < recvdatabuf[recv_id]->gapCounter && msg_h->offset >= recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom) {
1112
                        int i;
1113
                        //fprintf(stderr,"firstGap: %d        gapCounter: %d\n", recvdatabuf[recv_id]->firstGap, recvdatabuf[recv_id]->gapCounter);
1114
                        for (i = recvdatabuf[recv_id]->firstGap; i < recvdatabuf[recv_id]->gapCounter; i++){
1115
                                if (msg_h->offset == recvdatabuf[recv_id]->gapArray[i].offsetFrom) {
1116
                                        recvdatabuf[recv_id]->gapArray[i].offsetFrom += bufsize;
1117
                                        break;
1118
                                }
1119
                                if (msg_h->offset == (recvdatabuf[recv_id]->gapArray[i].offsetTo - bufsize)) {
1120
                                        recvdatabuf[recv_id]->gapArray[i].offsetTo -= bufsize;
1121
                                        break;
1122
                                }
1123
                        }
1124
                } else {//fprintf(stderr,"Skipping retransmitted packets in filling the gap.\n"); 
1125
                        //counters.receivedRTXDataPktCounter++;
1126
                        }
1127
        }
1128

    
1129
        //updating the expectedOffset        
1130
        if (msg_h->offset >= recvdatabuf[recv_id]->expectedOffset) recvdatabuf[recv_id]->expectedOffset = msg_h->offset + bufsize;
1131
#endif
1132

    
1133
        //TODO very basic checkif all fragments arrived: has to be reviewed
1134
        if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) {
1135
                recvdatabuf[recv_id]->status = COMPLETE; //buffer full -> msg completly arrived
1136
#ifdef FEC
1137
                if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize){
1138
                  prev_sqnr=recvdatabuf[recv_id]->seqnr;
1139
                  int npaks=0;
1140
                  int toffset=20;
1141
                  int tpkt_len=pmtusize;
1142
                  npaks=(int)(recvdatabuf[recv_id]->bufsize/pmtusize);
1143
                  src = ( char ** )malloc ( npaks * sizeof ( char * ));
1144
                  code = fec_new(npaks,256);
1145
                  for(i=0; i<npaks; i++){
1146
                      src[i] = ( char * )malloc(tpkt_len * sizeof ( char ) );
1147
                      for(j=0; j<tpkt_len; j++){
1148
                        if (toffset+j < recvdatabuf[recv_id]->bufsize) {
1149
                          *(src[i]+j)=*(recvdatabuf[recv_id]->recvbuf+toffset+j);
1150
                        }else {
1151
                          *(src[i]+j)=0;
1152
                        }
1153
                      }
1154
                      toffset += tpkt_len;
1155
                  }
1156
                  recvdatabuf[recv_id]->pix[recvdatabuf[recv_id]->nix]=(int)(msg_h->offset/pmtusize);
1157
                  fec_decode(code, src, recvdatabuf[recv_id]->pix, tpkt_len);
1158
                  toffset=20;
1159
                  for(i=0; i<npaks; i++){
1160
                    for(j=0; j<tpkt_len; j++){
1161
                      if (toffset+j < recvdatabuf[recv_id]->bufsize) {
1162
                        *(recvdatabuf[recv_id]->recvbuf+toffset+j)=*(src[i]+j);
1163
                      }
1164
                    }
1165
                    toffset+=tpkt_len;
1166
                  }
1167
                  recvdatabuf[recv_id]->firstPacketArrived = 1;        //we've decoded the first packet as well
1168
                    fec_free(code);
1169
                    for(i=0; i<npaks; i++){
1170
                      free(src[i]);
1171
                    }
1172
                    free(src);
1173
                    nix=0;
1174
                    recvdatabuf[recv_id]->nix=0;
1175
                }
1176
#endif
1177
        } else {
1178
                recvdatabuf[recv_id]->status = ACTIVE;
1179
#ifdef FEC
1180
                if(recvdatabuf[recv_id]->msgtype==17 && recvdatabuf[recv_id]->bufsize>pmtusize && recvdatabuf[recv_id]->pix_chk[recvdatabuf[recv_id]->nix]==0){
1181
                  recvdatabuf[recv_id]->pix[recvdatabuf[recv_id]->nix]=(int)(msg_h->offset/pmtusize);
1182
                  recvdatabuf[recv_id]->pix_chk[recvdatabuf[recv_id]->nix]=1;
1183
                  recvdatabuf[recv_id]->nix++;
1184
                }
1185
#endif
1186
        }
1187

    
1188
        if (recv_data_callback) {
1189
                if(recvdatabuf[recv_id]->status == COMPLETE) {
1190
                        // Monitoring layer hook
1191
                        if(get_Recv_data_inf_cb != NULL) {
1192
                                mon_data_inf recv_data_inf;
1193

    
1194
                                recv_data_inf.remote_socketID =
1195
                                         &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1196
                                recv_data_inf.buffer = recvdatabuf[recv_id]->recvbuf;
1197
                                recv_data_inf.bufSize = recvdatabuf[recv_id]->bufsize;
1198
                                recv_data_inf.msgtype = recvdatabuf[recv_id]->msgtype;
1199
                                recv_data_inf.monitoringDataHeaderLen = recvdatabuf[recv_id]->monitoringDataHeaderLen;
1200
                                recv_data_inf.monitoringDataHeader = recvdatabuf[recv_id]->monitoringDataHeaderLen ?
1201
                                        recvdatabuf[recv_id]->recvbuf : NULL;
1202
                                gettimeofday(&recv_data_inf.arrival_time, NULL);
1203
                                recv_data_inf.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1204
                                recv_data_inf.recvFragments = recvdatabuf[recv_id]->recvFragments;
1205
                                recv_data_inf.priority = false;
1206
                                recv_data_inf.padding = false;
1207
                                recv_data_inf.confirmation = false;
1208
                                recv_data_inf.reliable = false;
1209

    
1210
                                // send data recv callback to monitoring module
1211

    
1212
                                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
1213
                        }
1214

    
1215
                        // Get the right callback
1216
                        receive_data_cb receive_data_callback = recvcbbuf[msg_h->msg_type];
1217
                        if (receive_data_callback) {
1218

    
1219
                                recv_params rParams;
1220

    
1221
                                rParams.nrMissingBytes = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen - recvdatabuf[recv_id]->arrivedBytes;
1222
                                rParams.recvFragments = recvdatabuf[recv_id]->recvFragments;
1223
                                rParams.msgtype = recvdatabuf[recv_id]->msgtype;
1224
                                rParams.connectionID = recvdatabuf[recv_id]->connectionID;
1225
                                rParams.remote_socketID =
1226
                                        &(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
1227

    
1228
                                char str[1000];
1229
                                mlSocketIDToString(rParams.remote_socketID,str,999);
1230
                                debug("ML: received message from conID:%d, %s\n",recvdatabuf[recv_id]->connectionID,str);
1231
                                rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1232

    
1233
#ifdef RTX
1234
                                counters.receivedCompleteMsgCounter++;
1235
                                //mlShowCounters();
1236
#endif
1237

    
1238
                                (receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
1239
                                        recvdatabuf[recv_id]->msgtype, (void *) &rParams);
1240
                        } else {
1241
                            warn("ML: callback not initialized for this message type: %d!\n",msg_h->msg_type);
1242
                        }
1243
                        
1244
                        //clean up after a timeout
1245
                }
1246

    
1247
                //start time out for cleaning up this slot
1248
                if (!recvdatabuf[recv_id]->timeout_event) {
1249
                        //TODO make timeout at least a DEFINE
1250
                        recvdatabuf[recv_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id);
1251
                        evtimer_add(recvdatabuf[recv_id]->timeout_event, &recv_timeout);
1252
#ifdef RTX
1253
                        recvdatabuf[recv_id]->last_pkt_timeout_event = event_new(base, -1, EV_TIMEOUT, &last_pkt_recv_timeout_cb, (void *) (long)recv_id);
1254
                        evtimer_add(recvdatabuf[recv_id]->last_pkt_timeout_event, &last_pkt_recv_timeout);
1255
#endif
1256
                }
1257
        }
1258
}
1259

    
1260
//done
1261
void pmtu_timeout_cb(int fd, short event, void *arg)
1262
{
1263

    
1264
        int con_id = (long) arg;
1265
        pmtu new_pmtusize;
1266

    
1267
        debug("ML: pmtu timeout called (lcon:%d)\n",con_id);
1268

    
1269
        if(connectbuf[con_id] == NULL) {
1270
                error("ML: pmtu timeout called on non existing con_id\n");
1271
                return;
1272
        }
1273

    
1274
        if(connectbuf[con_id]->status == READY) {
1275
                // nothing to do anymore
1276
                event_del(connectbuf[con_id]->timeout_event);
1277
                event_free(connectbuf[con_id]->timeout_event);
1278
                connectbuf[con_id]->timeout_event = NULL;
1279
                return;
1280
        }
1281

    
1282
        info("ML: pmtu timeout while connecting(to:%s lcon:%d status:%d size:%d trial:%d tout:%ld.%06ld)\n",conid_to_string(con_id), con_id, connectbuf[con_id]->status, connectbuf[con_id]->pmtusize, connectbuf[con_id]->trials, connectbuf[con_id]->timeout_value.tv_sec, connectbuf[con_id]->timeout_value.tv_usec);
1283

    
1284
        if(connectbuf[con_id]->delay || connectbuf[con_id]->trials == MAX_TRIALS - 1) {
1285
                double delay = connectbuf[con_id]->timeout_value.tv_sec + connectbuf[con_id]->timeout_value.tv_usec / 1000000.0;
1286
                delay = delay * 2;
1287
                info("\tML: increasing pmtu timeout to %f sec\n", delay);
1288
                connectbuf[con_id]->timeout_value.tv_sec = floor(delay);
1289
                connectbuf[con_id]->timeout_value.tv_usec = fmod(delay, 1.0) * 1000000.0;
1290
                if(connectbuf[con_id]->delay) {
1291
                        connectbuf[con_id]->delay = false;
1292
                        reschedule_conn_msg(con_id);
1293
                }
1294
        }
1295

    
1296
        if(connectbuf[con_id]->trials == MAX_TRIALS) {
1297
                // decrement the pmtu size
1298
                struct timeval tout = {0,0};
1299
                tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
1300
                info("\tML: decreasing pmtu estimate from %d to %d\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize));
1301
                connectbuf[con_id]->pmtusize = pmtu_decrement(connectbuf[con_id]->pmtusize);
1302
                connectbuf[con_id]->timeout_value = tout; 
1303
                connectbuf[con_id]->trials = 0;
1304
        }
1305

    
1306
        //error in PMTU discovery?
1307
        if (connectbuf[con_id]->pmtusize == P_ERROR) {
1308
                if (connectbuf[con_id]->internal_connect == true) {
1309
                        //as of now we tried directly connecting, now let's try trough the NAT
1310
                        connectbuf[con_id]->internal_connect = false;
1311
                        connectbuf[con_id]->pmtusize = DSLSLIM;
1312
                } else {
1313
                        //nothing to do we have to give up
1314
                        error("ML: Could not create connection with connectionID %i!\n",con_id);
1315
                        // envoke the callback for failed connection establishment
1316
                        if(failed_Connection_cb != NULL)
1317
                                (failed_Connection_cb) (con_id, NULL);
1318
                        // delete the connection entry
1319
                        mlCloseConnection(con_id);
1320
                        return;
1321
                }
1322
        }
1323

    
1324
        //retry
1325
        resend_conn_msg(con_id);
1326
}
1327

    
1328

    
1329
int schedule_pmtu_timeout(int con_id)
1330
{
1331
        if (! connectbuf[con_id]->timeout_event) {
1332
                struct timeval tout = {0,0};
1333
                tout.tv_usec = PMTU_TIMEOUT * (1.0+ 0.1 *((double)rand()/(double)RAND_MAX-0.5));
1334
                connectbuf[con_id]->timeout_value = tout;
1335
                connectbuf[con_id]->trials = 1;
1336
                connectbuf[con_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &pmtu_timeout_cb, (void *) (long)con_id);
1337
                evtimer_add(connectbuf[con_id]->timeout_event, &connectbuf[con_id]->timeout_value);
1338
        }
1339
}
1340

    
1341
/*
1342
 * decrements the mtu size
1343
 */
1344
pmtu pmtu_decrement(pmtu pmtusize)
1345
{
1346
        pmtu pmtu_return_size;
1347
        switch(pmtusize) {
1348
        case MAX:
1349
                //return DSL;
1350
                return DSLSLIM;        //shortcut to use less vales
1351
        case DSL:
1352
                return DSLMEDIUM;
1353
        case DSLMEDIUM:
1354
                return DSLSLIM;
1355
        case DSLSLIM:
1356
                //return BELOWDSL;
1357
                return MIN;        //shortcut to use less vales
1358
        case BELOWDSL:
1359
                return MIN;
1360
        case MIN:
1361
                return P_ERROR;
1362
        default:
1363
                warn("ML: strange pmtu size encountered:%d, changing to some safe value:%d\n", pmtusize, MIN);
1364
                return MIN;
1365
        }
1366
}
1367

    
1368
// called when an ICMP pmtu error message (type 3, code 4) is received
1369
void pmtu_error_cb_th(char *msg, int msglen)
1370
{
1371
        debug("ML: pmtu_error callback called msg_size: %d\n",msglen);
1372
        //TODO debug
1373
        return;
1374

    
1375
    char *msgbufptr = NULL;
1376
    int msgtype;
1377
    int connectionID;
1378
    pmtu pmtusize;
1379
    pmtu new_pmtusize;
1380
    int dead = 0;
1381

    
1382
    // check the packettype
1383
    msgbufptr = &msg[0];
1384

    
1385
    // check the msgtype
1386
    msgbufptr = &msg[1];
1387
    memcpy(&msgtype, msgbufptr, 4);
1388

    
1389
    if (msgtype == 0) {
1390

    
1391
        // get the connectionID
1392
        msgbufptr = &msg[5];
1393
        memcpy(&connectionID, msgbufptr, 4);
1394

    
1395
        int msgtype_c = connectbuf[connectionID]->status;
1396
//        pmtusize = connectbuf[connectionID]->pmtutrysize;
1397

    
1398
        if (msgtype_c != msgtype) {
1399
            dead = 1;
1400
        }
1401

    
1402

    
1403
    } else if (msgtype == 1) {
1404

    
1405
        // read the connectionID
1406
        msgbufptr = &msg[9];
1407
        memcpy(&connectionID, msgbufptr, 4);
1408

    
1409
        int msgtype_c = connectbuf[connectionID]->status;
1410
//        pmtusize = connectbuf[connectionID]->pmtutrysize;
1411

    
1412
        if (msgtype_c != msgtype) {
1413
            dead = 1;
1414
        }
1415

    
1416
    }
1417
    // decrement the pmtu size
1418
    new_pmtusize = pmtu_decrement(pmtusize);
1419

    
1420
//    connectbuf[connectionID]->pmtutrysize = new_pmtusize;
1421

    
1422
    if (new_pmtusize == P_ERROR) {
1423
                error("ML:  Could not create connection with connectionID %i !\n",
1424
                        connectionID);
1425

    
1426
                if(failed_Connection_cb != NULL)
1427
                        (failed_Connection_cb) (connectionID, NULL);
1428
                // set the message type to a non existent message
1429
                msgtype = 2;
1430
                // delete the connection entry
1431
                 mlCloseConnection(connectionID);
1432
        }
1433

    
1434
    if (msgtype == 0 && dead != 1) {
1435

    
1436
        // stop the timeout event
1437
        // timeout_del(connectbuf[connectionID]->timeout);
1438
        /*
1439
         * libevent2
1440
         */
1441

    
1442
        // event_del(connectbuf[connectionID]->timeout);
1443

    
1444

    
1445
        // create and send a connection message
1446
//         create_conn_msg(new_pmtusize, connectionID,
1447
//                         &local_socketID, INVITE);
1448

    
1449
//        send_conn_msg(connectionID, new_pmtusize);
1450

    
1451
        // set a timeout event for the pmtu discovery
1452
        // timeout_set(connectbuf[connectionID]->timeout,pmtu_timeout_cb,(void
1453
        // *)&connectionID);
1454

    
1455
        // timeout_add(connectbuf[connectionID]->timeout,&connectbuf[connectionID]->timeout_value);
1456

    
1457
        /*
1458
         * libevent2
1459
         */
1460

    
1461
        struct event *ev;
1462
        ev = evtimer_new(base, pmtu_timeout_cb,
1463
                         (void *) connectbuf[connectionID]);
1464

    
1465
        // connectbuf[connectionID]->timeout = ev;
1466

    
1467
        event_add(ev, &connectbuf[connectionID]->timeout_value);
1468

    
1469
    } else if (msgtype == 1 && dead != 1) {
1470

    
1471
        // stop the timeout event
1472
        // timeout_del(connectbuf[connectionID]->timeout);
1473

    
1474
        /*
1475
         * libevent2
1476
         */
1477
        // info("still here 11 \n");
1478
        // printf("ev %d \n",connectbuf[connectionID]->timeout);
1479
        // event_del(connectbuf[connectionID]->timeout );
1480
        // evtimer_del(connectbuf[connectionID]->timeout );
1481

    
1482

    
1483
//         // create and send a connection message
1484
//         create_conn_msg(new_pmtusize,
1485
//                         connectbuf[connectionID]->connectionID,
1486
//                         NULL, CONNECT);
1487

    
1488
        //send_conn_msg(connectionID, new_pmtusize);
1489

    
1490
        // set a timeout event for the pmtu discovery
1491
        // timeout_set(connectbuf[connectionID]->timeout,pmtu_timeout_cb,(void
1492
        // *)&connectionID);
1493
        // timeout_add(connectbuf[connectionID]->timeout,&connectbuf[connectionID]->timeout_value);
1494

    
1495
        /*
1496
         * libevent2
1497
         */
1498
        // struct event *ev;
1499
        // ev = evtimer_new(base,pmtu_timeout_cb, (void
1500
        // *)connectbuf[connectionID]);
1501
        // connectbuf[connectionID]->timeout = ev;
1502
        // event_add(ev,&connectbuf[connectionID]->timeout_value);
1503

    
1504
    }
1505
}
1506

    
1507
/*
1508
 * what to do once a packet arrived if it is a conn packet send it to
1509
 * recv_conn handler if it is a data packet send it to the recv_data
1510
 * handler
1511
 */
1512

    
1513
//done --
1514
void recv_pkg(int fd, short event, void *arg)
1515
{
1516
        debug("ML: recv_pkg called\n");
1517

    
1518
        struct msg_header *msg_h;
1519
        char msgbuf[MAX];
1520
        pmtu recvSize = MAX;
1521
        char *bufptr = msgbuf;
1522
        int ttl;
1523
        struct sockaddr_in recv_addr;
1524
        int msg_size;
1525

    
1526
        recvPacket(fd, msgbuf, &recvSize, &recv_addr, pmtu_error_cb_th, &ttl);
1527

    
1528

    
1529
        // check if it is not just an ERROR message
1530
        if(recvSize < 0)
1531
                return;
1532

    
1533
        // @TODO check if this simplistic STUN message recognition really always works, probably not
1534
        unsigned short stun_bind_response = 0x0101;
1535
        unsigned short * msgspot = (unsigned short *) msgbuf;
1536
        if (*msgspot == stun_bind_response) {
1537
                debug("ML: recv_pkg: parse stun message called on %d bytes\n", recvSize);
1538
                recv_stun_msg(msgbuf, recvSize);
1539
                return;
1540
        }
1541

    
1542
        msg_h = (struct msg_header *) msgbuf;
1543

    
1544
        uint32_t inlen = ntohl(msg_h->msg_length);
1545
        if(inlen > 0x20000 || inlen < ntohl(msg_h->offset) || inlen == 0) {
1546
            warn("ML: BAD PACKET received from: %s:%d (len: %d < %d [=%08X] o:%d)", 
1547
                  inet_ntoa(recv_addr.sin_addr), recv_addr.sin_port,
1548
                               recvSize, inlen, inlen, ntohl(msg_h->offset));
1549
 warn("ML: received %d: %02X %02X %02X %02X %02X %02X - %02X %02X %02X %02X %02X %02X", recvSize,
1550
            msgbuf[0], msgbuf[1],msgbuf[2],msgbuf[3],msgbuf[4],msgbuf[5],
1551
            msgbuf[6],msgbuf[7],msgbuf[8],msgbuf[9],msgbuf[10],msgbuf[11]);
1552

    
1553
            return;
1554
       }
1555

    
1556
        /* convert header from network to host order */
1557
        msg_h->offset = ntohl(msg_h->offset);
1558
        msg_h->msg_length = ntohl(msg_h->msg_length);
1559
        msg_h->local_con_id = ntohl(msg_h->local_con_id);
1560
        msg_h->remote_con_id = ntohl(msg_h->remote_con_id);
1561
        msg_h->msg_seq_num = ntohl(msg_h->msg_seq_num);
1562

    
1563
        //verify minimum size
1564
        if (recvSize < sizeof(struct msg_header)) {
1565
          info("UDP packet too small, can't be an ML packet");
1566
          return;
1567
        }
1568

    
1569
        //TODO add more verifications
1570

    
1571
        bufptr += MSG_HEADER_SIZE + msg_h->len_mon_packet_hdr;
1572
        msg_size = recvSize - MSG_HEADER_SIZE - msg_h->len_mon_packet_hdr;
1573

    
1574
        //verify more fields
1575
        if (msg_size < 0) {
1576
          info("Corrupted UDP packet received");
1577
          return;
1578
        }
1579

    
1580
        if(get_Recv_pkt_inf_cb != NULL) {
1581
                mon_pkt_inf msginfNow;
1582
                msginfNow.monitoringHeaderLen = msg_h->len_mon_packet_hdr;
1583
                msginfNow.monitoringHeader = msg_h->len_mon_packet_hdr ? &msgbuf[0] + MSG_HEADER_SIZE : NULL;
1584
                //TODO rethink this ...
1585
                if(msg_h->msg_type == ML_CON_MSG) {
1586
                        struct conn_msg *c_msg = (struct conn_msg *) bufptr;
1587
                        msginfNow.remote_socketID = &(c_msg->sock_id);
1588
                }
1589
                else if(msg_h->remote_con_id < 0 || 
1590
                       msg_h->remote_con_id >= CONNECTBUFSIZE || 
1591
                       connectbuf[msg_h->remote_con_id] == NULL) {
1592
                        error("ML: received pkg called with non existent connection\n");
1593
                        return;
1594
                } else
1595
                        msginfNow.remote_socketID = &(connectbuf[msg_h->remote_con_id]->external_socketID);
1596
                msginfNow.buffer = bufptr;
1597
                msginfNow.bufSize = recvSize;
1598
                msginfNow.msgtype = msg_h->msg_type;
1599
                msginfNow.ttl = ttl;
1600
                msginfNow.dataID = msg_h->msg_seq_num;
1601
                msginfNow.offset = msg_h->offset;
1602
                msginfNow.datasize = msg_h->msg_length;
1603
                gettimeofday(&msginfNow.arrival_time, NULL);
1604
                (get_Recv_pkt_inf_cb) ((void *) &msginfNow);
1605
        }
1606

    
1607

    
1608
        switch(msg_h->msg_type) {
1609
                case ML_CON_MSG:
1610
                        debug("ML: received conn pkg\n");
1611
                        recv_conn_msg(msg_h, bufptr, msg_size, &recv_addr);
1612
                        break;
1613
#ifdef RTX
1614
                case ML_NACK_MSG:
1615
                        debug("ML: received nack pkg\n");
1616
                        recv_nack_msg(msg_h, bufptr, msg_size);
1617
                        break;
1618
#endif
1619
                default:
1620
                        if(msg_h->msg_type < 127) {
1621
                                debug("ML: received data pkg\n");
1622
                                recv_data_msg(msg_h, bufptr, msg_size);
1623
                                break;
1624
                        }
1625
                        debug("ML: unrecognised msg_type\n");
1626
                        break;
1627
        }
1628
}
1629

    
1630

    
1631
void try_stun();
1632

    
1633
/*
1634
 * the timeout of the NAT traversal
1635
 */
1636
void nat_traversal_timeout(int fd, short event, void *arg)
1637
{
1638
debug("X. NatTrTo %d\n", NAT_traversal);
1639
        if (NAT_traversal == false) {
1640
                debug("ML: NAT traversal request re-send\n");
1641
                if(receive_SocketID_cb)
1642
                        (receive_SocketID_cb) (&local_socketID, 2);
1643
                try_stun();
1644
        }
1645
debug("X. NatTrTo\n");
1646
}
1647

    
1648
//return IP address, or INADDR_NONE if can't resolve
1649
unsigned long resolve(const char *ipaddr)
1650
{
1651
        struct hostent *h = gethostbyname(ipaddr);
1652
        if (!h) {
1653
                error("ML: Unable to resolve host name %s\n", ipaddr);
1654
                return INADDR_NONE;
1655
        }
1656
        unsigned long *addr = (unsigned long *) (h->h_addr);
1657
        return *addr;
1658
}
1659

    
1660

    
1661
/*
1662
 * returns the file descriptor, or <0 on error. The ipaddr can be a null
1663
 * pointer. Then all available ipaddr on the machine are choosen.
1664
 */
1665
int create_socket(const int port, const char *ipaddr)
1666
{
1667
        struct sockaddr_in udpaddr = {0};
1668
        udpaddr.sin_family = AF_INET;
1669
        debug("X. create_socket %s, %d\n", ipaddr, port);
1670
        if (ipaddr == NULL) {
1671
                /*
1672
                * try to guess the local IP address
1673
                */
1674
                const char *ipaddr_iface = mlAutodetectIPAddress();
1675
                if (ipaddr_iface) {
1676
                        udpaddr.sin_addr.s_addr = inet_addr(ipaddr_iface);
1677
                } else {
1678
                        udpaddr.sin_addr.s_addr = INADDR_ANY;
1679
                }
1680
        } else {
1681
                udpaddr.sin_addr.s_addr = inet_addr(ipaddr);
1682
        }
1683
        udpaddr.sin_port = htons(port);
1684

    
1685
        socketaddrgen udpgen;
1686
        memset(&udpgen,0,sizeof(socketaddrgen));        //this will be sent over the net, so set it to 0
1687
        udpgen.udpaddr = udpaddr;
1688
        local_socketID.internal_addr = udpgen;
1689

    
1690
        socketfd = createSocket(port, ipaddr);
1691
        if (socketfd < 0){
1692
                return socketfd;
1693
        }
1694

    
1695
        struct event *ev;
1696
        ev = event_new(base, socketfd, EV_READ | EV_PERSIST, recv_pkg, NULL);
1697

    
1698
        event_add(ev, NULL);
1699

    
1700
        try_stun();
1701

    
1702
        return socketfd;
1703
}
1704

    
1705
/*
1706
 * try to figure out external IP using STUN, if defined
1707
 */
1708
void try_stun()
1709
{
1710
        if (isStunDefined()) {
1711
                struct timeval timeout_value_NAT_traversal = NAT_TRAVERSAL_TIMEOUT;
1712

    
1713
                /*
1714
                * send the NAT traversal STUN request
1715
                */
1716
                 send_stun_request(socketfd, &stun_server);
1717

    
1718
                /*
1719
                * enter a NAT traversal timeout that takes care of retransmission
1720
                */
1721
                event_base_once(base, -1, EV_TIMEOUT, &nat_traversal_timeout, NULL, &timeout_value_NAT_traversal);
1722

    
1723
                NAT_traversal = false;
1724
        } else {
1725
                /*
1726
                * Assume we have accessibility and copy internal address to external one
1727
                */
1728
                local_socketID.external_addr = local_socketID.internal_addr;
1729
                NAT_traversal = true; // @TODO: this is not really NAT traversal, but a flag that init is over
1730
                // callback to the upper layer indicating that the socketID is now
1731
                // ready to use
1732
                if(receive_SocketID_cb)
1733
                        (receive_SocketID_cb) (&local_socketID, 0); //success
1734
        }
1735
}
1736

    
1737
/**************************** END OF INTERNAL ***********************/
1738

    
1739
/**************************** MONL functions *************************/
1740

    
1741
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg){
1742

    
1743
/*X*/ //  fprintf(stderr,"MLINIT1 %s, %d, %s, %d\n", ipaddr, port, stun_ipaddr, stun_port);
1744
        base = (struct event_base *) arg;
1745
        recv_data_callback = recv_data_cb;
1746
        mlSetRecvTimeout(timeout_value);
1747
        if (stun_ipaddr) {
1748
                 mlSetStunServer(stun_port, stun_ipaddr);
1749
        } else {
1750

    
1751
        }
1752
        register_recv_localsocketID_cb(local_socketID_cb);
1753
/*X*/ //  fprintf(stderr,"MLINIT1\n");
1754
        return create_socket(port, ipaddr);
1755
}
1756

    
1757
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold) {
1758
        setOutputRateParams(bucketsize, drainrate);
1759
        setQueuesParams (maxQueueSize, maxQueueSizeRTX, maxTimeToHold);
1760
}
1761
     
1762
void mlSetVerbosity (int log_level) {
1763
        setLogLevel(log_level);
1764
}
1765

    
1766
/* register callbacks  */
1767
void mlRegisterGetRecvPktInf(get_recv_pkt_inf_cb recv_pkt_inf_cb){
1768

    
1769
        if (recv_pkt_inf_cb == NULL) {
1770
                error("ML: Register get_recv_pkt_inf_cb failed: NULL ptr  \n");
1771
        } else {
1772
                get_Recv_pkt_inf_cb = recv_pkt_inf_cb;
1773
        }
1774
}
1775

    
1776
void mlRegisterGetSendPktInf(get_send_pkt_inf_cb  send_pkt_inf_cb){
1777

    
1778
        if (send_pkt_inf_cb == NULL) {
1779
                error("ML: Register get_send_pkt_inf_cb: NULL ptr  \n");
1780
        } else {
1781
                get_Send_pkt_inf_cb = send_pkt_inf_cb;
1782
        }
1783
}
1784

    
1785

    
1786
void mlRegisterSetMonitoringHeaderPktCb(set_monitoring_header_pkt_cb monitoring_header_pkt_cb ){
1787

    
1788
        if (monitoring_header_pkt_cb == NULL) {
1789
                error("ML: Register set_monitoring_header_pkt_cb: NULL ptr  \n");
1790
        } else {
1791
                set_Monitoring_header_pkt_cb = monitoring_header_pkt_cb;
1792
        }
1793
}
1794

    
1795
void mlRegisterGetRecvDataInf(get_recv_data_inf_cb recv_data_inf_cb){
1796

    
1797
        if (recv_data_inf_cb == NULL) {
1798
                error("ML: Register get_recv_data_inf_cb: NULL ptr  \n");
1799
        } else {
1800
                get_Recv_data_inf_cb = recv_data_inf_cb;
1801
        }
1802
}
1803

    
1804
void mlRegisterGetSendDataInf(get_send_data_inf_cb  send_data_inf_cb){
1805

    
1806
        if (send_data_inf_cb == NULL) {
1807
                error("ML: Register get_send_data_inf_cb: NULL ptr  \n");
1808
        } else {
1809
                get_Send_data_inf_cb = send_data_inf_cb;
1810
        }
1811
}
1812

    
1813
void mlRegisterSetMonitoringHeaderDataCb(set_monitoring_header_data_cb monitoring_header_data_cb){
1814

    
1815
        if (monitoring_header_data_cb == NULL) {
1816
                error("ML: Register set_monitoring_header_data_cb : NULL ptr  \n");
1817
        } else {
1818
                set_Monitoring_header_data_cb = monitoring_header_data_cb;
1819
        }
1820
}
1821

    
1822
void mlSetRecvTimeout(struct timeval timeout_value){
1823

    
1824
        recv_timeout = timeout_value;
1825
#ifdef RTX
1826
        unsigned int total_usec = recv_timeout.tv_sec * 1000000 + recv_timeout.tv_usec;
1827
        total_usec = total_usec * LAST_PKT_RECV_TIMEOUT_FRACTION;
1828
        last_pkt_recv_timeout.tv_sec = total_usec / 1000000;
1829
        last_pkt_recv_timeout.tv_usec = total_usec - last_pkt_recv_timeout.tv_sec * 1000000;
1830
        fprintf(stderr,"Timeout for receiving message: %d : %d\n", recv_timeout.tv_sec, recv_timeout.tv_usec);        
1831
        fprintf(stderr,"Timeout for last pkt: %d : %d\n", last_pkt_recv_timeout.tv_sec, last_pkt_recv_timeout.tv_usec);        
1832
#endif
1833
}
1834

    
1835
int mlGetStandardTTL(socketID_handle socketID,uint8_t *ttl){
1836

    
1837
        return getTTL(socketfd, ttl);
1838

    
1839
}
1840

    
1841
socketID_handle mlGetLocalSocketID(int *errorstatus){
1842

    
1843
        if (NAT_traversal == false) {
1844
                *errorstatus = 2;
1845
                return NULL;
1846
        }
1847

    
1848
        *errorstatus = 0;
1849
        return &local_socketID;
1850

    
1851
}
1852

    
1853

    
1854
/**************************** END of MONL functions *************************/
1855

    
1856
/**************************** GENERAL functions *************************/
1857

    
1858
void mlRegisterRecvConnectionCb(receive_connection_cb recv_conn_cb){
1859

    
1860
        if (recv_conn_cb == NULL) {
1861
                error("ML: Register receive_connection_cb: NULL ptr  \n");
1862
        }else {
1863
                receive_Connection_cb = recv_conn_cb;
1864
        }
1865
}
1866

    
1867
void mlRegisterErrorConnectionCb(connection_failed_cb conn_failed){
1868

    
1869
        if (conn_failed == NULL) {
1870
                error("ML: Register connection_failed_cb: NULL ptr  \n");
1871
        } else {
1872
                failed_Connection_cb = conn_failed;
1873
        }
1874
}
1875

    
1876
void mlRegisterRecvDataCb(receive_data_cb data_cb,unsigned char msgtype){
1877

    
1878
    if (msgtype > 126) {
1879

    
1880
            error
1881
            ("ML: Could not register recv_data callback. Msgtype is greater then 126 \n");
1882

    
1883
    }
1884

    
1885
    if (data_cb == NULL) {
1886

    
1887
            error("ML: Register receive data callback: NUll ptr \n ");
1888

    
1889
    } else {
1890

    
1891
        recvcbbuf[msgtype] = data_cb;
1892

    
1893
    }
1894

    
1895
}
1896

    
1897
void mlCloseSocket(socketID_handle socketID){
1898

    
1899
        free(socketID);
1900

    
1901
}
1902

    
1903
void keepalive_fn(evutil_socket_t fd, short what, void *arg) {
1904
        socketID_handle peer = arg;
1905

    
1906
        int con_id = mlConnectionExist(peer, false);
1907
        if (con_id < 0 || connectbuf[con_id]->defaultSendParams.keepalive <= 0) {
1908
                /* Connection fell from under us or keepalive was disabled */
1909
                free(arg);
1910
                return;
1911
        }
1912

    
1913
        /* do what we gotta do */
1914
        if ( connectbuf[con_id]->status == READY) {
1915
                char keepaliveMsg[32] = "";
1916
                sprintf(keepaliveMsg, "KEEPALIVE %d", connectbuf[con_id]->keepalive_seq++);
1917
                send_msg(con_id, MSG_TYPE_ML_KEEPALIVE, keepaliveMsg, 1 + strlen(keepaliveMsg), false, 
1918
                        &(connectbuf[con_id]->defaultSendParams));
1919
        }
1920

    
1921
        /* re-schedule */
1922
        struct timeval t = { 0,0 };
1923
        t.tv_sec = connectbuf[con_id]->defaultSendParams.keepalive;
1924
        if (connectbuf[con_id]->defaultSendParams.keepalive) 
1925
                event_base_once(base, -1, EV_TIMEOUT, keepalive_fn, peer, &t);
1926
}
1927

    
1928
void setupKeepalive(int conn_id) {
1929
        /* Save the peer's address for us */
1930
        socketID_handle peer = malloc(sizeof(socket_ID));
1931
        memcpy(peer, &connectbuf[conn_id]->external_socketID, sizeof(socket_ID));
1932

    
1933
        struct timeval t = { 0,0 };
1934
        t.tv_sec = connectbuf[conn_id]->defaultSendParams.keepalive;
1935

    
1936
        if (connectbuf[conn_id]->defaultSendParams.keepalive) 
1937
                event_base_once(base, -1, EV_TIMEOUT, keepalive_fn, peer, &t);
1938
}
1939

    
1940
/* connection functions */
1941
int mlOpenConnection(socketID_handle external_socketID,receive_connection_cb connection_cb,void *arg, const send_params defaultSendParams){
1942

    
1943
        int con_id;
1944
        if (external_socketID == NULL) {
1945
                error("ML: cannot open connection: one of the socketIDs is NULL\n");
1946
                return -1;
1947
        }
1948
        if (NAT_traversal == false) {
1949
                error("ML: cannot open connection: NAT traversal for socketID still in progress\n");
1950
                return -1;
1951
        }
1952
        if (connection_cb == NULL) {
1953
                error("ML: cannot open connection: connection_cb is NULL\n");
1954
                return -1;
1955
        }
1956

    
1957
        // check if that connection already exist
1958

    
1959
        con_id = mlConnectionExist(external_socketID, false);
1960
        if (con_id >= 0) {
1961
                // overwrite defaultSendParams
1962
                bool newKeepalive = 
1963
                        connectbuf[con_id]->defaultSendParams.keepalive == 0 && defaultSendParams.keepalive != 0;
1964
                connectbuf[con_id]->defaultSendParams = defaultSendParams;
1965
                if (newKeepalive) setupKeepalive(con_id);
1966
                // if so check if it is ready to use
1967
                if (connectbuf[con_id]->status == READY) {
1968
                                // if so use the callback immediately
1969
                                (connection_cb) (con_id, arg);
1970

    
1971
                // otherwise just write the connection cb and the arg pointer
1972
                // into the connection struct
1973
                } else {
1974
                        struct receive_connection_cb_list *temp;
1975
                        temp = malloc(sizeof(struct receive_connection_cb_list));
1976
                        temp->next = NULL;
1977
                        temp->connection_cb = connection_cb;
1978
                        temp->arg = arg;
1979
                        if(connectbuf[con_id]->connection_last != NULL) {
1980
                                connectbuf[con_id]->connection_last->next = temp;
1981
                                connectbuf[con_id]->connection_last = temp;
1982
                        } else
1983
                                connectbuf[con_id]->connection_last = connectbuf[con_id]->connection_head = temp;
1984
                }
1985
                return con_id;
1986
        }
1987
        // make entry in connection_establishment array
1988
        for (con_id = 0; con_id < CONNECTBUFSIZE; con_id++) {
1989
                if (connectbuf[con_id] == NULL) {
1990
                        connectbuf[con_id] = (connect_data *) malloc(sizeof(connect_data));
1991
                        memset(connectbuf[con_id],0,sizeof(connect_data));
1992
                        connectbuf[con_id]->starttime = time(NULL);
1993
                        memcpy(&connectbuf[con_id]->external_socketID, external_socketID, sizeof(socket_ID));
1994
                        connectbuf[con_id]->pmtusize = DSLSLIM;
1995
                        connectbuf[con_id]->timeout_event = NULL;
1996
                        connectbuf[con_id]->status = INVITE;
1997
                        connectbuf[con_id]->seqnr = 0;
1998
                        connectbuf[con_id]->internal_connect = internal_connect_plausible(external_socketID, &local_socketID);
1999
                        connectbuf[con_id]->connectionID = con_id;
2000

    
2001
                        connectbuf[con_id]->connection_head = connectbuf[con_id]->connection_last = malloc(sizeof(struct receive_connection_cb_list));
2002
                        connectbuf[con_id]->connection_last->next = NULL;
2003
                        connectbuf[con_id]->connection_last->connection_cb = connection_cb;
2004
                        connectbuf[con_id]->connection_last->arg = arg;
2005
                        connectbuf[con_id]->external_connectionID = -1;
2006

    
2007
                        connectbuf[con_id]->defaultSendParams = defaultSendParams;
2008
                        if (defaultSendParams.keepalive) setupKeepalive(con_id);
2009
                        break;
2010
                }
2011
        } //end of for
2012

    
2013
        if (con_id == CONNECTBUFSIZE) {
2014
                error("ML: Could not open connection: connection buffer full\n");
2015
                return -1;
2016
        }
2017

    
2018
        // create and send a connection message
2019
        info("ML:Sending INVITE to %s (lconn:%d)\n",conid_to_string(con_id), con_id);
2020
        send_conn_msg_with_pmtu_discovery(con_id, connectbuf[con_id]->pmtusize, INVITE);
2021

    
2022
        return con_id;
2023

    
2024
}
2025

    
2026
void mlCloseConnection(const int connectionID){
2027

    
2028
        // remove it from the connection array
2029
        if(connectbuf[connectionID]) {
2030
                if(connectbuf[connectionID]->ctrl_msg_buf) {
2031
                        free(connectbuf[connectionID]->ctrl_msg_buf);
2032
                }
2033
                // remove related events
2034
                if (connectbuf[connectionID]->timeout_event) {
2035
                        event_del(connectbuf[connectionID]->timeout_event);
2036
                        event_free(connectbuf[connectionID]->timeout_event);
2037
                        connectbuf[connectionID]->timeout_event = NULL;
2038
                }
2039
                free(connectbuf[connectionID]);
2040
                connectbuf[connectionID] = NULL;
2041
        }
2042

    
2043
}
2044

    
2045
void mlSendData(const int connectionID,char *sendbuf,int bufsize,unsigned char msgtype,send_params *sParams){
2046

    
2047
        if (connectionID < 0) {
2048
                error("ML: send data failed: connectionID does not exist\n");
2049
                return;
2050
        }
2051

    
2052
        if (connectbuf[connectionID] == NULL) {
2053
                error("ML: send data failed: connectionID does not exist\n");
2054
                return;
2055
        }
2056
        if (connectbuf[connectionID]->status != READY) {
2057
            error("ML: send data failed: connection is not active\n");
2058
            return;
2059
        }
2060

    
2061
        if (sParams == NULL) {
2062
                sParams = &(connectbuf[connectionID]->defaultSendParams);
2063
        }
2064

    
2065
        send_msg(connectionID, msgtype, sendbuf, bufsize, false, sParams);
2066

    
2067
}
2068

    
2069
/* transmit data functions  */
2070
int mlSendAllData(const int connectionID,send_all_data_container *container,int nr_entries,unsigned char msgtype,send_params *sParams){
2071

    
2072
    if (nr_entries < 1 || nr_entries > 5) {
2073

    
2074
        error
2075
            ("ML : sendALlData : nr_enties is not between 1 and 5 \n ");
2076
        return 0;
2077

    
2078
    } else {
2079

    
2080
        if (nr_entries == 1) {
2081

    
2082
                mlSendData(connectionID, container->buffer_1,
2083
                      container->length_1, msgtype, sParams);
2084

    
2085
            return 1;
2086

    
2087
        } else if (nr_entries == 2) {
2088

    
2089
            int buflen = container->length_1 + container->length_2;
2090
            char buf[buflen];
2091
            memcpy(buf, container->buffer_1, container->length_1);
2092
            memcpy(&buf[container->length_1], container->buffer_2,
2093
                   container->length_2);
2094
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2095

    
2096
            return 1;
2097

    
2098
        } else if (nr_entries == 3) {
2099

    
2100
            int buflen =
2101
                container->length_1 + container->length_2 +
2102
                container->length_3;
2103
            char buf[buflen];
2104
            memcpy(buf, container->buffer_1, container->length_1);
2105
            memcpy(&buf[container->length_1], container->buffer_2,
2106
                   container->length_2);
2107
            memcpy(&buf[container->length_2], container->buffer_3,
2108
                   container->length_3);
2109
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2110

    
2111

    
2112
            return 1;
2113

    
2114
        } else if (nr_entries == 4) {
2115

    
2116
            int buflen =
2117
                container->length_1 + container->length_2 +
2118
                container->length_3 + container->length_4;
2119
            char buf[buflen];
2120
            memcpy(buf, container->buffer_1, container->length_1);
2121
            memcpy(&buf[container->length_1], container->buffer_2,
2122
                   container->length_2);
2123
            memcpy(&buf[container->length_2], container->buffer_3,
2124
                   container->length_3);
2125
            memcpy(&buf[container->length_3], container->buffer_4,
2126
                   container->length_4);
2127
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2128

    
2129
            return 1;
2130

    
2131
        } else {
2132

    
2133
            int buflen =
2134
                container->length_1 + container->length_2 +
2135
                container->length_3 + container->length_4 +
2136
                container->length_5;
2137
            char buf[buflen];
2138
            memcpy(buf, container->buffer_1, container->length_1);
2139
            memcpy(&buf[container->length_1], container->buffer_2,
2140
                   container->length_2);
2141
            memcpy(&buf[container->length_2], container->buffer_3,
2142
                   container->length_3);
2143
            memcpy(&buf[container->length_3], container->buffer_4,
2144
                   container->length_4);
2145
            memcpy(&buf[container->length_4], container->buffer_5,
2146
                   container->length_5);
2147
            mlSendData(connectionID, buf, buflen, msgtype, sParams);
2148

    
2149
            return 1;
2150
        }
2151

    
2152
    }
2153

    
2154
}
2155

    
2156
int mlRecvData(const int connectionID,char *recvbuf,int *bufsize,recv_params *rParams){
2157

    
2158
        //TODO yet to be converted
2159
        return 0;
2160
#if 0
2161
        if (rParams == NULL) {
2162
                error("ML: recv_data failed: recv_params is a NULL ptr\n");
2163
                return 0;
2164
    } else {
2165

2166
        info("ML: recv data called \n");
2167

2168
        int i = 0;
2169
        int returnValue = 0;
2170
        double timeout = (double) recv_timeout.tv_sec;
2171
        time_t endtime = time(NULL);
2172

2173
        for (i = 0; i < RECVDATABUFSIZE; i++) {
2174

2175
            if (recvdatabuf[i] != NULL) {
2176

2177
                if (recvdatabuf[i]->connectionID == connectionID) {
2178

2179
                    info("ML: recv data has entry  \n");
2180

2181
                    double timepass = difftime(endtime, recvdatabuf[i]->starttime);
2182

2183
                    // check if the specified connection has data and it
2184
                    // is complete
2185
                    // check the data seqnr
2186
                    // if(connectionID == recvdatabuf[i]->connectionID &&
2187
                    // 1 == recvdatabuf[i]->status){
2188

2189
                    if (1 == recvdatabuf[i]->status) {
2190

2191
                        // info("transmissionHandler: recv_data set is
2192
                        // complete \n" );
2193

2194
                        // debug("debud \n");
2195

2196
                        // exchange the pointers
2197
                        int buffersize = 0;
2198
                        buffersize = recvdatabuf[i]->bufsize;
2199
                        *bufsize = buffersize;
2200
                        // recvbuf = recvdatabuf[i]->recvbuf;
2201

2202
                        // info("buffersize %d \n",buffersize);
2203
                        memcpy(recvbuf, recvdatabuf[i]->recvbuf,
2204
                               buffersize);
2205
                        // debug(" recvbuf %s \n",recvbuf );
2206

2207
//                         double nrMissFrags =
2208
//                             (double) recvdatabuf[i]->nrFragments /
2209
//                             (double) recvdatabuf[i]->recvFragments;
2210
//                         int nrMissingFragments = (int) ceil(nrMissFrags);
2211

2212
//                        rParams->nrMissingFragments = nrMissingFragments;
2213
//                         rParams->nrFragments = recvdatabuf[i]->nrFragments;
2214
                        rParams->msgtype = recvdatabuf[i]->msgtype;
2215
                        rParams->connectionID =
2216
                            recvdatabuf[i]->connectionID;
2217

2218
                        // break from the loop
2219
                        // debug(" recvbuf %s \n ",recvbuf);
2220

2221
                        // double nrMissFrags =
2222
                        // (double)recvdatabuf[i]->nrFragments /
2223
                        // (double)recvdatabuf[i]->recvFragments;
2224
                        // int nrMissingFragments =
2225
                        // (int)ceil(nrMissFrags);
2226

2227
                        if(get_Recv_data_inf_cb != NULL) {
2228
                                mon_data_inf recv_data_inf;
2229

2230
                                recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
2231
                                recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
2232
                                recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
2233
                                recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
2234
//                                 recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
2235
//                                 recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
2236
                                gettimeofday(&recv_data_inf.arrival_time, NULL);
2237
                                recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
2238
                                recv_data_inf.nrMissingFragments = nrMissingFragments;
2239
                                recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
2240
                                recv_data_inf.priority = false;
2241
                                recv_data_inf.padding = false;
2242
                                recv_data_inf.confirmation = false;
2243
                                recv_data_inf.reliable = false;
2244

2245
                                // send data recv callback to monitoring module
2246

2247
                                (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
2248
                        }
2249

2250

2251
                        // free the allocated memory
2252
                        free(recvdatabuf[i]);
2253
                        recvdatabuf[i] = NULL;
2254

2255
                        returnValue = 1;
2256
                        break;
2257

2258
                    }
2259

2260
                    if (recvdatabuf[i] != NULL) {
2261

2262
                        if (timepass > timeout) {
2263

2264
                            info("ML: recv_data timeout called  \n");
2265

2266
                            // some data about the missing chunks should
2267
                            // be added here
2268
                            // exchange the pointers
2269
                            int buffersize = 0;
2270
                            buffersize = recvdatabuf[i]->bufsize;
2271
                            *bufsize = buffersize;
2272
                            // recvbuf = recvdatabuf[i]->recvbuf;
2273

2274
                            double nrMissFrags =
2275
                                (double) recvdatabuf[i]->nrFragments /
2276
                                (double) recvdatabuf[i]->recvFragments;
2277
                            int nrMissingFragments =
2278
                                (int) ceil(nrMissFrags);
2279

2280
                            // debug(" recvbuf %s \n",recvbuf );
2281

2282
                            memcpy(recvbuf, recvdatabuf[i]->recvbuf,
2283
                                   buffersize);
2284

2285
                            rParams->nrMissingFragments =
2286
                                nrMissingFragments;
2287
                            rParams->nrFragments =
2288
                                recvdatabuf[i]->nrFragments;
2289
                            rParams->msgtype = recvdatabuf[i]->msgtype;
2290
                            rParams->connectionID =
2291
                                recvdatabuf[i]->connectionID;
2292

2293
                                if(get_Recv_data_inf_cb != NULL) {
2294
                                        mon_data_inf recv_data_inf;
2295

2296
                                        recv_data_inf.remote_socketID = &(connectbuf[connectionID]->external_socketID);
2297
                                        recv_data_inf.buffer = recvdatabuf[i]->recvbuf;
2298
                                        recv_data_inf.bufSize = recvdatabuf[i]->bufsize;
2299
                                        recv_data_inf.msgtype = recvdatabuf[i]->msgtype;
2300
                                        recv_data_inf.monitoringHeaderType = recvdatabuf[i]->monitoringHeaderType;
2301
                                        recv_data_inf.monitoringDataHeader = recvdatabuf[i]->monitoringDataHeader;
2302
                                        gettimeofday(&recv_data_inf.arrival_time, NULL);
2303
                                        recv_data_inf.firstPacketArrived = recvdatabuf[i]->firstPacketArrived;
2304
                                        recv_data_inf.nrMissingFragments = nrMissingFragments;
2305
                                        recv_data_inf.nrFragments = recvdatabuf[i]->nrFragments;
2306
                                        recv_data_inf.priority = false;
2307
                                        recv_data_inf.padding = false;
2308
                                        recv_data_inf.confirmation = false;
2309
                                        recv_data_inf.reliable = false;
2310

2311
                                        // send data recv callback to monitoring module
2312

2313
                                        (get_Recv_data_inf_cb) ((void *) &recv_data_inf);
2314
                                }
2315

2316
                            // free the allocated memory
2317
                            free(recvdatabuf[i]);
2318
                            recvdatabuf[i] = NULL;
2319

2320
                            returnValue = 1;
2321
                            break;
2322

2323
                        }
2324
                    }
2325

2326
                }
2327

2328
            }
2329
            // debug("2 recvbuf %s \n ",recvbuf);
2330
        }
2331
        return returnValue;
2332
    }
2333
#endif
2334

    
2335
}
2336

    
2337
int mlSocketIDToString(socketID_handle socketID,char* socketID_string, size_t len){
2338

    
2339
        char internal_addr[INET_ADDRSTRLEN];
2340
        char external_addr[INET_ADDRSTRLEN];
2341

    
2342
        assert(socketID);
2343

    
2344
        inet_ntop(AF_INET, &(socketID->internal_addr.udpaddr.sin_addr.s_addr), internal_addr, INET_ADDRSTRLEN);
2345
        inet_ntop(AF_INET, &(socketID->external_addr.udpaddr.sin_addr.s_addr), external_addr, INET_ADDRSTRLEN);
2346

    
2347
        snprintf(socketID_string,len,"%s:%d-%s:%d", internal_addr, ntohs(socketID->internal_addr.udpaddr.sin_port),
2348
                external_addr,        ntohs(socketID->external_addr.udpaddr.sin_port));
2349
        return 0;
2350

    
2351
}
2352

    
2353
int mlStringToSocketID(const char* socketID_string, socketID_handle socketID){
2354

    
2355
        //@TODO add checks against malformed string
2356
        char external_addr[INET_ADDRSTRLEN];
2357
        int external_port;
2358
        char internal_addr[INET_ADDRSTRLEN];
2359
        int internal_port;
2360

    
2361
        char *pch;
2362
        char *s = strdup(socketID_string);
2363

    
2364
        //replace ':' with a blank
2365
        pch=strchr(s,':');
2366
        while (pch!=NULL){
2367
                                *pch = ' ';
2368
                pch=strchr(pch+1,':');
2369
        }
2370
        pch=strchr(s,'-');
2371
        if(pch) *pch = ' ';
2372

    
2373
        sscanf(s,"%s %d %s %d", internal_addr, &internal_port,
2374
                external_addr, &external_port);
2375

    
2376
        //set structure to 0, we initialize each byte, since it will be sent on the net later
2377
        memset(socketID, 0, sizeof(struct _socket_ID));
2378

    
2379
        if(inet_pton(AF_INET, internal_addr, &(socketID->internal_addr.udpaddr.sin_addr)) == 0)
2380
                return EINVAL;
2381
        socketID->internal_addr.udpaddr.sin_family = AF_INET;
2382
        socketID->internal_addr.udpaddr.sin_port = htons(internal_port);
2383

    
2384

    
2385
        if(inet_pton(AF_INET, external_addr, &(socketID->external_addr.udpaddr.sin_addr)) ==0)
2386
                return EINVAL;
2387
        socketID->external_addr.udpaddr.sin_family = AF_INET;
2388
        socketID->external_addr.udpaddr.sin_port = htons(external_port);
2389

    
2390
        free(s);
2391
        return 0;
2392

    
2393
}
2394

    
2395
int mlGetConnectionStatus(int connectionID){
2396

    
2397
        if(connectbuf[connectionID])
2398
                return connectbuf[connectionID]->status == READY;
2399
        return -1;
2400
    
2401
}
2402

    
2403

    
2404
int mlConnectionExist(socketID_handle socketID, bool ready){
2405

    
2406
    /*
2407
     * check if another connection for the external connectionID exist
2408
     * that was established \ within the last 2 seconds
2409
     */
2410
        int i;
2411
        for (i = 0; i < CONNECTBUFSIZE; i++)
2412
                if (connectbuf[i] != NULL)
2413
                        if (mlCompareSocketIDs(&(connectbuf[i]->external_socketID), socketID) == 0) {
2414
                                if (ready) return (connectbuf[i]->status == READY ? i : -1);;
2415
                                return i;
2416
                                }
2417

    
2418
    return -1;
2419

    
2420
}
2421

    
2422
//Added by Robert Birke as comodity functions
2423

    
2424
//int mlPrintSocketID(socketID_handle socketID) {
2425
//        char str[SOCKETID_STRING_SIZE];
2426
//        mlSocketIDToString(socketID, str, sizeof(str));
2427
//        printf(stderr,"int->%s<-ext\n",str);
2428
//}
2429

    
2430
/*
2431
 * hash code of a socketID
2432
 * TODO might think of a better way
2433
 */
2434
int mlHashSocketID(socketID_handle sock) {
2435
        //assert(sock);
2436
   return sock->internal_addr.udpaddr.sin_port +
2437
                        sock->external_addr.udpaddr.sin_port;
2438
}
2439

    
2440
int mlCompareSocketIDs(socketID_handle sock1, socketID_handle sock2) {
2441

    
2442
        assert(sock1 && sock2);
2443

    
2444
        /*
2445
        * compare internal addr
2446
        */
2447
        if(sock1 == NULL || sock2 == NULL)
2448
                return 1;
2449

    
2450
        if (sock1->internal_addr.udpaddr.sin_addr.s_addr !=
2451
            sock2->internal_addr.udpaddr.sin_addr.s_addr)
2452
                        return memcmp(&sock1->internal_addr.udpaddr.sin_addr.s_addr, &sock2->internal_addr.udpaddr.sin_addr.s_addr, sizeof(sock1->internal_addr.udpaddr.sin_addr.s_addr));
2453

    
2454
        if (sock1->internal_addr.udpaddr.sin_port !=
2455
                 sock2->internal_addr.udpaddr.sin_port)
2456
                        return memcmp(&sock1->internal_addr.udpaddr.sin_port, &sock2->internal_addr.udpaddr.sin_port, sizeof(sock1->internal_addr.udpaddr.sin_port));
2457

    
2458
        /*
2459
        * compare external addr
2460
        */
2461
        if (sock1->external_addr.udpaddr.sin_addr.s_addr !=
2462
            sock2->external_addr.udpaddr.sin_addr.s_addr)
2463
                        return memcmp(&sock1->external_addr.udpaddr.sin_addr.s_addr, &sock2->external_addr.udpaddr.sin_addr.s_addr, sizeof(sock1->external_addr.udpaddr.sin_addr.s_addr));
2464

    
2465
        if (sock1->external_addr.udpaddr.sin_port !=
2466
                 sock2->external_addr.udpaddr.sin_port)
2467
                        return memcmp(&sock1->external_addr.udpaddr.sin_port, &sock2->external_addr.udpaddr.sin_port, sizeof(sock1->external_addr.udpaddr.sin_port));
2468

    
2469
        return 0;
2470
}
2471

    
2472
int mlCompareSocketIDsByPort(socketID_handle sock1, socketID_handle sock2)
2473
{
2474
        if(sock1 == NULL || sock2 == NULL)
2475
                return 1;
2476
 
2477
        if (sock1->internal_addr.udpaddr.sin_port !=
2478
                 sock2->internal_addr.udpaddr.sin_port)
2479
                        return 1;
2480

    
2481
        if (sock1->external_addr.udpaddr.sin_port !=
2482
                 sock2->external_addr.udpaddr.sin_port)
2483
                        return 1;
2484
        return 0;
2485
}
2486

    
2487
int mlGetPathMTU(int ConnectionId) {
2488
        if(ConnectionId < 0 || ConnectionId >= CONNECTBUFSIZE)
2489
                return -1;
2490
        if (connectbuf[ConnectionId] != NULL)
2491
                return connectbuf[ConnectionId]->pmtusize;
2492
        return -1;
2493
}
2494

    
2495
/**************************** END of GENERAL functions *************************/
2496

    
2497
/**************************** NAT functions *************************/
2498

    
2499
/* setter  */
2500
void mlSetStunServer(const int port,const char *ipaddr){
2501

    
2502
        stun_server.sin_family = AF_INET;
2503
        if (ipaddr == NULL)
2504
                stun_server.sin_addr.s_addr = htonl(INADDR_NONE);
2505
        else
2506
                stun_server.sin_addr.s_addr = resolve(ipaddr);
2507
        stun_server.sin_port = htons(port);
2508

    
2509
}
2510

    
2511
int mlGetExternalIP(char* external_addr){
2512

    
2513
        socketaddrgen udpgen;
2514
        struct sockaddr_in udpaddr;
2515

    
2516
        udpgen = local_socketID.external_addr;
2517
        udpaddr = udpgen.udpaddr;
2518

    
2519
        inet_ntop(AF_INET, &(udpaddr.sin_addr), external_addr,
2520
                        INET_ADDRSTRLEN);
2521

    
2522
        if (external_addr == NULL) {
2523

    
2524
        return -1;
2525

    
2526
        } else {
2527

    
2528
        return 0;
2529

    
2530
        }
2531

    
2532
}
2533

    
2534
/**************************** END of NAT functions *************************/