Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ 1ef0ee9b

History | View | Annotate | Download (19.7 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#ifndef _WIN32
9
#include <arpa/inet.h>
10
#endif
11
#include <unistd.h>
12
#include <stdlib.h>
13
#include <stdio.h>
14
#include <stdbool.h>
15
#include <string.h>
16
#include <assert.h>
17
#include <signal.h>
18

    
19
#include "net_helper.h"
20
#include "ml.h"
21
#include "config.h"
22

    
23
#include "grapes_msg_types.h"
24

    
25

    
26
#ifdef MONL
27
#include "mon.h"
28
#include "repoclient.h"
29
#endif
30

    
31
#include "napa.h"
32
#include "napa_log.h"
33

    
34
/**
35
 * libevent pointer
36
 */
37
struct event_base *base;
38

    
39
#define NH_BUFFER_SIZE 1000
40
#define NH_LOOKUP_SIZE 1000
41
#define NH_PACKET_TIMEOUT {0, 500*1000}
42
#define NH_ML_INIT_TIMEOUT {1, 0}
43

    
44
#define FDSSIZE 16
45

    
46
#define STUN_SERVER_DEFAULT "77.72.174.163+stun.softjoys.com+stun.ekiga.org"
47
#define STUN_PORT_DEFAULT 3478
48
#define STUN_SERVERS_MAX 32
49
static char *stun_servers[STUN_SERVERS_MAX];
50
static int stun_servers_cnt = 0;
51

    
52
static bool connect_on_know = false;        //whether to try to connect as soon as we get to know a nodeID
53

    
54
static int sIdx = 0;
55
static int rIdxML = 0;        //reveive from ML to this buffer position
56
static int rIdxUp = 0;        //hand up to layer above at this buffer position
57

    
58
typedef struct nodeID {
59
        socketID_handle addr;
60
        int connID;        // connection associated to this node, -1 if myself
61
        int refcnt;
62
#ifdef MONL
63
        //n quick and dirty static vector for measures TODO: make it dinamic
64
        MonHandler mhs[20];
65
        int n_mhs;
66
#endif
67
//        int addrSize;
68
//        int addrStringSize;
69
} nodeID;
70

    
71
typedef struct msgData_cb {
72
        int bIdx;        // index of the message in the proper buffer
73
        unsigned char msgType; // message type
74
        int mSize;        // message size
75
        bool conn_cb_called;
76
        bool cancelled;
77
} msgData_cb;
78

    
79
static struct nodeID **lookup_array;
80
static int lookup_max = NH_LOOKUP_SIZE;
81
static int lookup_curr = 0;
82

    
83
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
84
static int timeoutFired = 0;
85
static bool fdTriggered = false;
86

    
87
// pointers to the msgs to be send
88
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
89
// pointers to the received msgs + sender nodeID
90
struct receivedB {
91
        struct nodeID *id;
92
        int len;
93
        uint8_t *data;
94
};
95
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
96
/**/ static int recv_counter =0;
97

    
98

    
99
static void connReady_cb (int connectionID, void *arg);
100
static struct nodeID *new_node(socketID_handle peer) {
101
        send_params params = {0,0,0,0};
102
        struct nodeID *res = malloc(sizeof(struct nodeID));
103
        if (!res) {
104
                 fprintf(stderr, "Net-helper : memory error\n");
105
                 return NULL;
106
        }
107
        memset(res, 0, sizeof(struct nodeID));
108

    
109
        res->addr = malloc(SOCKETID_SIZE);
110
        if (! res->addr) {
111
                free (res);
112
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
113
                return NULL;
114
        }
115
        memset(res->addr, 0, SOCKETID_SIZE);
116
        memcpy(res->addr, peer ,SOCKETID_SIZE);
117

    
118
        res->refcnt = 1;
119

    
120
        if (connect_on_know) {
121
                res->connID = mlOpenConnection(peer, &connReady_cb, NULL, params);
122
        }
123

    
124
        return res;
125
}
126

    
127

    
128
static struct nodeID **id_lookup(socketID_handle target) {
129

    
130
        int i,here=-1;
131
        for (i=0;i<lookup_curr;i++) {
132
                if (lookup_array[i] == NULL) {
133
                        if (here < 0) {
134
                                here = i;
135
                        }
136
                } else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
137
                        return &lookup_array[i];
138
                }
139
        }
140

    
141
        if (here == -1) {
142
                here = lookup_curr++;
143
        }
144

    
145
        if (lookup_curr > lookup_max) {
146
                lookup_max *= 2;
147
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
148
        }
149

    
150
        lookup_array[here] = new_node(target);
151
        return &lookup_array[here];
152
}
153

    
154
static struct nodeID *id_lookup_dup(socketID_handle target) {
155
        return nodeid_dup(*id_lookup(target));
156
}
157

    
158

    
159
/**
160
 * Look for a free slot in the received buffer and allocates it for immediate use
161
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
162
 */
163
static int next_R() {
164
        if (receivedBuffer[rIdxML].data==NULL) {
165
                int ret = rIdxML;
166
                rIdxML = (rIdxML+1)%NH_BUFFER_SIZE;
167
                return ret;
168
        } else {
169
                //TODO: handle receive overload situation!
170
                return -1;
171
        }
172
}
173

    
174
/**
175
 * Look for a free slot in the sending buffer and allocates it for immediate use
176
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
177
 */
178
static int next_S() {
179
        if (sendingBuffer[sIdx]) {
180
                int count;
181
                for (count=0;count<NH_BUFFER_SIZE;count++) {
182
                        sIdx = (sIdx+1)%NH_BUFFER_SIZE;
183
                        if (sendingBuffer[sIdx]==NULL)
184
                                break;
185
                }
186
                if (count==NH_BUFFER_SIZE) {
187
                        return -1;
188
                }
189
        }
190
        return sIdx;
191
}
192

    
193

    
194
/**
195
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
196
 * @param local_socketID
197
 * @param errorstatus
198
 */
199
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
200
        static int stun_retry_cnt = 0;
201
        int stun_retries = 1;        //set number of retries (0: no retry)
202
        char *stun_server;
203
        char *c;
204
        int stun_port;
205

    
206
        switch (errorstatus) {
207
        case 0:
208
                me->addr = malloc(SOCKETID_SIZE);
209
                if (! me->addr) {
210
                        fprintf(stderr, "Net-helper : memory error while creating my new nodeID \n");
211
                        return;
212
                }
213

    
214
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
215
        //        me->addrSize = SOCKETID_SIZE;
216
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
217
                me->connID = -1;
218
                me->refcnt = 1;
219
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
220
                break;
221
        case -1:
222
                //
223
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
224
                exit(1);
225
                break;
226
        case 1:
227
                //
228
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
229
                exit(1);
230
                break;
231
        case 2:
232
                fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
233

    
234
                if (stun_servers[stun_servers_cnt+1]) {
235
                        stun_servers_cnt++;
236
                } else {
237
                        stun_servers_cnt = 0;
238
                        stun_retry_cnt++;
239
                }
240
                stun_server = strdup(stun_servers[stun_servers_cnt]);
241

    
242
                if ((c = strchr(stun_server,':'))) {
243
                        *c = 0;
244
                        stun_port = atoi(c+1);
245
                } else {
246
                        stun_port = STUN_PORT_DEFAULT;
247
                }
248

    
249
                //fprintf(stderr, "STUN server: %s:%d\n", stun_server, stun_port);
250

    
251
                if (stun_retry_cnt > stun_retries) {
252
                        fprintf(stderr,"Net-helper init : Retrying without STUN\n");
253
                        mlSetStunServer(0,NULL);
254
                } else {
255
                        mlSetStunServer(stun_port, stun_server);
256
                }
257
                free(stun_server);
258
                break;
259
        default :        // should never happen
260
                //
261
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
262
        }
263

    
264
}
265

    
266
/**
267
 * Timeout callback to be set in the eventlib loop as needed
268
 * @param socket
269
 * @param flag
270
 * @param arg
271
 */
272
static void t_out_cb (int socket, short flag, void* arg) {
273

    
274
        timeoutFired = 1;
275
//        fprintf(stderr,"TIMEOUT!!!\n");
276
//        event_base_loopbreak(base);
277
}
278

    
279
/**
280
 * File descriptor readable callback to be set in the eventlib loop as needed
281
 */
282
static void fd_cb (int fd, short flag, void* arg)
283
{
284
  //fprintf(stderr, "\twait4data: fd %d triggered\n", fd);
285
  fdTriggered = true;
286
  *((bool*)arg) = true;
287
}
288

    
289
/**
290
 * Callback called by ml when a remote node ask for a connection
291
 * @param connectionID
292
 * @param arg
293
 */
294
static void receive_conn_cb(int connectionID, void *arg) {
295
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
296

    
297
}
298

    
299
void free_sending_buffer(int i)
300
{
301
        free(sendingBuffer[i]);
302
        sendingBuffer[i] = NULL;
303
}
304

    
305
/**
306
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
307
 * @param connectionID
308
 * @param arg
309
 */
310
static void connReady_cb (int connectionID, void *arg) {
311

    
312
        msgData_cb *p;
313
        p = (msgData_cb *)arg;
314
        if (p == NULL) return;
315
        if (p->cancelled) {
316
            free(p);
317
            return;
318
        }
319
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
320
        free_sending_buffer(p->bIdx);
321
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
322
        //        event_base_loopbreak(base);
323
        p->conn_cb_called = true;
324
}
325

    
326
/**
327
 * Callback called by ml when a connection error occurs
328
 * @param connectionID
329
 * @param arg
330
 */
331
static void connError_cb (int connectionID, void *arg) {
332
        // simply get rid of the msg in the buffer....
333
        msgData_cb *p;
334
        p = (msgData_cb *)arg;
335
        if (p != NULL) {
336
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
337
                if (p->cancelled) {
338
                        free(p);//p->mSize = -1;
339
                } else {
340
                        p->conn_cb_called = true;
341
                }
342
        }
343
        //        event_base_loopbreak(base);
344
}
345

    
346

    
347
/**
348
 * Callback to receive data from ml
349
 * @param buffer
350
 * @param buflen
351
 * @param msgtype
352
 * @param arg
353
 */
354
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
355
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
356
        char str[SOCKETID_STRING_SIZE];
357
//        fprintf(stderr, "Net-helper : called back with some news...\n");
358
/**/ ++recv_counter;
359
        if (arg->remote_socketID != NULL)
360
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
361
        else
362
                sprintf(str,"!Unknown!");
363
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
364
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
365
            fprintf(stderr, "\tMessage # %d -- Missing # %d bytes%s\n",
366
                        recv_counter, arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
367
        }
368
        else {
369
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
370
                // buffering the received message only if possible, otherwise ignore it...
371
                int index = next_R();
372
                if (index<0) {
373
                        fprintf(stderr,"Net-helper: receive buffer full\n ");
374
                        return;
375
                } else {
376
                        receivedBuffer[index].data = malloc(buflen);
377
                        if (receivedBuffer[index].data == NULL) {
378
                                fprintf(stderr,"Net-helper: memory full, can't receive!\n ");
379
                                return;
380
                        }
381
                        receivedBuffer[index].len = buflen;
382
                        memcpy(receivedBuffer[index].data,buffer,buflen);
383
                          // save the socketID of the sender
384
                        receivedBuffer[index].id = id_lookup_dup(arg->remote_socketID);
385
                }
386
  }
387
//        event_base_loopbreak(base);
388
}
389

    
390
struct nodeID *net_helper_init(const char *IPaddr, int port, const char *config) {
391

    
392
        struct timeval tout = NH_ML_INIT_TIMEOUT;
393
        int s, i;
394
        struct tag *cfg_tags;
395
        const char *res;
396
        char *stun_server_str;
397
        char *stun_server;
398
        int stun_port;
399
        char *c;
400
        const char *repo_address = NULL;
401
        int publish_interval = 60;
402

    
403
        int verbosity = DCLOG_ERROR;
404

    
405
        int bucketsize = 80000; /* this allows a burst of 80000 Bytes [Bytes] */
406
        int rate = 10000000; /* 10Mbit/s [bits/s]*/
407
        int queuesize = 1000000; /* up to 1MB of data will be stored in the shaper transmission queue [Bytes]*/
408
        int RTXqueuesize = 1000000; /* up to 1 MB of data will be stored in the shaper retransmission queue [Bytes] */
409
        double RTXholtdingtime = 1.0; /* [seconds] */
410

    
411
#ifndef _WIN32
412
        signal(SIGPIPE, SIG_IGN); // workaround for a known issue in libevent2 with SIGPIPE on TPC connections
413
#endif
414
        base = event_base_new();
415
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
416

    
417
        cfg_tags = config_parse(config);
418
        if (!cfg_tags) {
419
                return NULL;
420
        }
421

    
422
        stun_server_str = config_value_str_default(cfg_tags, "stun_server", STUN_SERVER_DEFAULT);
423

    
424
        res = config_value_str(cfg_tags, "repo_address");
425
        if (res) {
426
                repo_address = res;
427
        }
428
        
429
        config_value_int(cfg_tags, "publish_interval", &publish_interval);
430

    
431
        config_value_int(cfg_tags, "verbosity", &verbosity);
432

    
433
        config_value_int(cfg_tags, "bucketsize", &bucketsize);
434
        config_value_int(cfg_tags, "rate", &rate);
435
        config_value_int(cfg_tags, "queuesize", &queuesize);
436
        config_value_int(cfg_tags, "RTXqueuesize", &RTXqueuesize);
437
        config_value_double(cfg_tags, "RTXholtdingtime", &RTXholtdingtime);
438

    
439
        me = malloc(sizeof(nodeID));
440
        if (me == NULL) {
441
                return NULL;
442
        }
443
        memset(me,0,sizeof(nodeID));
444
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
445
        me->refcnt = 1;
446

    
447
        for (i=0;i<NH_BUFFER_SIZE;i++) {
448
                sendingBuffer[i] = NULL;
449
                receivedBuffer[i].data = NULL;
450
        }
451

    
452
        mlRegisterErrorConnectionCb(&connError_cb);
453
        mlRegisterRecvConnectionCb(&receive_conn_cb);
454

    
455
        for (i = 1, stun_servers[0] = strdup(stun_server_str); i<STUN_SERVERS_MAX ; i++) {
456
                char *next = strchr(stun_servers[i-1], '+');
457
                if (next) {
458
                        *next = 0;
459
                        stun_servers[i] = strdup(next+1);
460
                } else {
461
                        break;
462
                }
463
        }
464

    
465
        stun_servers_cnt = 0;
466
        stun_server = strdup(stun_servers[stun_servers_cnt]);
467
        if ((c = strchr(stun_server,':'))) {
468
                *c = 0;
469
                stun_port = atoi(c+1);
470
        } else {
471
                stun_port = STUN_PORT_DEFAULT;
472
        }
473

    
474
        //fprintf(stderr, "STUN server: %s:%d\n", stun_server, stun_port);
475

    
476
        s = mlInit(1, tout, port, IPaddr, stun_port, stun_server, &init_myNodeID_cb, base);
477
        if (s < 0) {
478
                fprintf(stderr, "Net-helper : error initializing ML!\n");
479
                free(me);
480
                free(stun_server);
481
                return NULL;
482
        }
483

    
484
        mlSetVerbosity(verbosity);
485

    
486
        mlSetRateLimiterParams(bucketsize, rate, queuesize, RTXqueuesize, RTXholtdingtime);
487

    
488
#ifdef MONL
489
{
490
        void *repoclient;
491
        eventbase = base;
492

    
493
        // Initialize logging
494
        napaInitLog(verbosity, NULL, NULL);
495

    
496
        repInit("");
497
        repoclient = repOpen(repo_address, publish_interval);        //repository.napa-wine.eu
498
        // NULL is inow valid for disabled repo 
499
        // if (repoclient == NULL) fatal("Unable to initialize repoclient");
500
        monInit(base, repoclient);
501
}
502
#endif
503
        free(cfg_tags);
504

    
505
        while (me->connID<-1) {
506
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
507
                event_base_loop(base,EVLOOP_ONCE);
508
        }
509
        timeoutFired = 0;
510
//        fprintf(stderr,"Net-helper init : back from init!\n");
511

    
512
        free(stun_server);
513
        return me;
514
}
515

    
516

    
517
void bind_msg_type (unsigned char msgtype) {
518

    
519
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
520
}
521

    
522

    
523
void send_to_peer_cb(int fd, short event, void *arg)
524
{
525
        msgData_cb *p = (msgData_cb *) arg;
526
        if (p->conn_cb_called) {
527
                free(p);
528
        }
529
        else { //don't send it anymore
530
                free_sending_buffer(p->bIdx);
531
                p->cancelled = true;
532
                // don't free p, the other timeout will do it
533
        }
534
}
535

    
536
/**
537
 * Called by the application to send data to a remote peer
538
 * @param from
539
 * @param to
540
 * @param buffer_ptr
541
 * @param buffer_size
542
 * @return The dimension of the buffer or -1 if a connection error occurred.
543
 */
544
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
545
{
546
        msgData_cb *p;
547
        int current;
548
        send_params params = {0,0,0,0};
549

    
550
        if (buffer_size <= 0) {
551
                fprintf(stderr,"Net-helper: message size problematic: %d\n", buffer_size);
552
                return buffer_size;
553
        }
554

    
555
        // if buffer is full, discard the message and return an error flag
556
        int index = next_S();
557
        if (index<0) {
558
                // free(buffer_ptr);
559
                fprintf(stderr,"Net-helper: send buffer full\n ");
560
                return -1;
561
        }
562
        sendingBuffer[index] = malloc(buffer_size);
563
        if (! sendingBuffer[index]){
564
                fprintf(stderr,"Net-helper: memory full, can't send!\n ");
565
                return -1;
566
        }
567
        memset(sendingBuffer[index],0,buffer_size);
568
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
569
        // free(buffer_ptr);
570
        p = malloc(sizeof(msgData_cb));
571
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
572
        current = p->bIdx;
573

    
574
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
575
        if (to->connID<0) {
576
                free_sending_buffer(current);
577
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
578
                free(p);
579
                return -1;
580
        }
581
        else {
582
                struct timeval timeout = NH_PACKET_TIMEOUT;
583
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
584
                return buffer_size; //p->mSize;
585
        }
586

    
587
}
588

    
589

    
590
/**
591
 * Called by an application to receive data from remote peers
592
 * @param local
593
 * @param remote
594
 * @param buffer_ptr
595
 * @param buffer_size
596
 * @return The number of received bytes or -1 if some error occurred.
597
 */
598
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
599
{
600
        int size;
601
        if (receivedBuffer[rIdxUp].data==NULL) {        //block till first message arrives
602
                wait4data(local, NULL, NULL);
603
        }
604

    
605
        assert(receivedBuffer[rIdxUp].data && receivedBuffer[rIdxUp].id);
606

    
607
        (*remote) = receivedBuffer[rIdxUp].id;
608
        // retrieve a msg from the buffer
609
        size = receivedBuffer[rIdxUp].len;
610
        if (size>buffer_size) {
611
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
612
                return -1;
613
        }
614
        memcpy(buffer_ptr, receivedBuffer[rIdxUp].data, size);
615
        free(receivedBuffer[rIdxUp].data);
616
        receivedBuffer[rIdxUp].data = NULL;
617
        receivedBuffer[rIdxUp].id = NULL;
618

    
619
        rIdxUp = (rIdxUp+1)%NH_BUFFER_SIZE;
620

    
621
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
622

    
623
        return size;
624
}
625

    
626

    
627
int wait4data(const struct nodeID *n, struct timeval *tout, int *fds) {
628

    
629
        struct event *timeout_ev = NULL;
630
        struct event *fd_ev[FDSSIZE];
631
        bool fd_triggered[FDSSIZE] = { false };
632
        int i;
633

    
634
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
635
        if (tout) {        //if tout==NULL, loop wait infinitely
636
          timeout_ev = event_new(base, -1, EV_TIMEOUT, &t_out_cb, NULL);
637
          event_add(timeout_ev, tout);
638
        }
639
        for (i = 0; fds && fds[i] != -1; i ++) {
640
          if (i >= FDSSIZE) {
641
            fprintf(stderr, "Can't listen on more than %d file descriptors!\n", FDSSIZE);
642
            break;
643
          }
644
          fd_ev[i] = event_new(base, fds[i], EV_READ, &fd_cb, &fd_triggered[i]);
645
          event_add(fd_ev[i], NULL);
646
        }
647

    
648
        while(receivedBuffer[rIdxUp].data==NULL && timeoutFired==0 && fdTriggered==0) {
649
        //        event_base_dispatch(base);
650
                event_base_loop(base,EVLOOP_ONCE);
651
        }
652

    
653
        //delete one-time events
654
        if (timeout_ev) {
655
          if (!timeoutFired) event_del(timeout_ev);
656
          event_free(timeout_ev);
657
        }
658
        for (i = 0; fds && fds[i] != -1; i ++) {
659
          if (! fd_triggered[i]) {
660
            fds[i] = -2;
661
            event_del(fd_ev[i]);
662
          //} else {
663
            //fprintf(stderr, "\twait4data: fd %d triggered\n", fds[i]);
664
          }
665
          event_free(fd_ev[i]);
666
        }
667

    
668
        if (fdTriggered) {
669
          fdTriggered = false;
670
          //fprintf(stderr, "\twait4data: fd event\n");
671
          return 2;
672
        } else if (timeoutFired) {
673
          timeoutFired = 0;
674
          //fprintf(stderr, "\twait4data: timed out\n");
675
          return 0;
676
        } else if (receivedBuffer[rIdxUp].data!=NULL) {
677
          //fprintf(stderr, "\twait4data: ML receive\n");
678
          return 1;
679
        } else {
680
          fprintf(stderr, "BUG in wait4data\n");
681
          exit(EXIT_FAILURE);
682
        }
683
}
684

    
685
socketID_handle getRemoteSocketID(const char *ip, int port) {
686
        char str[SOCKETID_STRING_SIZE];
687
        socketID_handle h;
688

    
689
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
690
        h = malloc(SOCKETID_SIZE);
691
        mlStringToSocketID(str, h);
692

    
693
        return h;
694
}
695

    
696
struct nodeID *create_node(const char *rem_IP, int rem_port) {
697
        socketID_handle s;
698
        struct nodeID *remote;
699

    
700
        s = getRemoteSocketID(rem_IP, rem_port);
701
        remote = id_lookup_dup(s);
702
        free(s);
703

    
704
        return remote;
705
}
706

    
707
int node_ip(const struct nodeID *s, char *ip, int size) {
708
        int len;
709
        const char *start, *end;
710
        char tmp[256];
711

    
712
        node_addr(s, tmp, 256);
713

    
714
        start = strstr(tmp, "-") + 1;
715
        end = strstr(start, ":");
716
        len = end - start;
717
        if (len >= size) {
718
                return -1;
719
        }
720
        memcpy(ip, start, len);
721
        ip[len] = 0;
722

    
723
        return 1;
724
}
725

    
726
// TODO: check why closing the connection is annoying for the ML
727
void nodeid_free(struct nodeID *n) {
728
        if (n && (--(n->refcnt) == 1)) {
729
/*
730
                struct nodeID **npos;
731
        //        mlCloseConnection(n->connID);
732
                npos = id_lookup(n->addr);
733
                *npos = NULL;
734
                mlCloseSocket(n->addr);
735
                free(n);
736
*/
737
        }
738
}
739

    
740

    
741
int node_addr(const struct nodeID *s, char *addr, int len)
742
{
743
  // TODO: mlSocketIDToString always return 0 !!!
744
  int r = mlSocketIDToString(s->addr,addr,len);
745
  if (!r)
746
          return 1;
747
  else
748
          return -1;
749
}
750

    
751
struct nodeID *nodeid_dup(struct nodeID *s)
752
{
753
        s->refcnt++;
754
        return s;
755
}
756

    
757
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
758
{
759
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
760
}
761

    
762
int nodeid_cmp(const struct nodeID *s1, const struct nodeID *s2)
763
{
764
        return mlCompareSocketIDs(s1->addr,s2->addr);
765
}
766

    
767
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
768
{
769
  if (max_write_size < SOCKETID_STRING_SIZE) return -1;
770

    
771
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
772
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
773

    
774
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
775
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
776

    
777
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
778
}
779

    
780
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
781
{
782
  uint8_t sid[SOCKETID_SIZE];
783
  socketID_handle h = (socketID_handle) sid;
784
  mlStringToSocketID((char *)b,h);
785
  *len = strlen((char*)b) + 1;
786
  return id_lookup_dup(h);
787
}