Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ 215cedbd

History | View | Annotate | Download (18.6 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#ifndef _WIN32
9
#include <arpa/inet.h>
10
#endif
11
#include <unistd.h>
12
#include <stdlib.h>
13
#include <stdio.h>
14
#include <stdbool.h>
15
#include <string.h>
16
#include <assert.h>
17
#include <signal.h>
18

    
19
#include "net_helper.h"
20
#include "ml.h"
21
#include "config.h"
22

    
23
#include "grapes_msg_types.h"
24

    
25

    
26
#ifdef MONL
27
#include "mon.h"
28
#include "repoclient.h"
29
#endif
30

    
31
#include "napa.h"
32
#include "napa_log.h"
33

    
34
/**
35
 * libevent pointer
36
 */
37
struct event_base *base;
38

    
39
#define NH_BUFFER_SIZE 1000
40
#define NH_LOOKUP_SIZE 1000
41
#define NH_PACKET_TIMEOUT {0, 500*1000}
42
#define NH_ML_INIT_TIMEOUT {1, 0}
43

    
44
#define FDSSIZE 16
45

    
46
static bool connect_on_know = false;        //whether to try to connect as soon as we get to know a nodeID
47

    
48
static int sIdx = 0;
49
static int rIdxML = 0;        //reveive from ML to this buffer position
50
static int rIdxUp = 0;        //hand up to layer above at this buffer position
51

    
52
typedef struct nodeID {
53
        socketID_handle addr;
54
        int connID;        // connection associated to this node, -1 if myself
55
        int refcnt;
56
#ifdef MONL
57
        //n quick and dirty static vector for measures TODO: make it dinamic
58
        MonHandler mhs[20];
59
        int n_mhs;
60
#endif
61
//        int addrSize;
62
//        int addrStringSize;
63
} nodeID;
64

    
65
typedef struct msgData_cb {
66
        int bIdx;        // index of the message in the proper buffer
67
        unsigned char msgType; // message type
68
        int mSize;        // message size
69
        bool conn_cb_called;
70
        bool cancelled;
71
} msgData_cb;
72

    
73
static struct nodeID **lookup_array;
74
static int lookup_max = NH_LOOKUP_SIZE;
75
static int lookup_curr = 0;
76

    
77
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
78
static int timeoutFired = 0;
79
static bool fdTriggered = false;
80

    
81
// pointers to the msgs to be send
82
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
83
// pointers to the received msgs + sender nodeID
84
struct receivedB {
85
        struct nodeID *id;
86
        int len;
87
        uint8_t *data;
88
};
89
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
90
/**/ static int recv_counter =0;
91

    
92

    
93
static void connReady_cb (int connectionID, void *arg);
94
static struct nodeID *new_node(socketID_handle peer) {
95
        send_params params = {0,0,0,0};
96
        struct nodeID *res = malloc(sizeof(struct nodeID));
97
        if (!res) {
98
                 fprintf(stderr, "Net-helper : memory error\n");
99
                 return NULL;
100
        }
101
        memset(res, 0, sizeof(struct nodeID));
102

    
103
        res->addr = malloc(SOCKETID_SIZE);
104
        if (! res->addr) {
105
                free (res);
106
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
107
                return NULL;
108
        }
109
        memset(res->addr, 0, SOCKETID_SIZE);
110
        memcpy(res->addr, peer ,SOCKETID_SIZE);
111

    
112
        res->refcnt = 1;
113

    
114
        if (connect_on_know) {
115
                res->connID = mlOpenConnection(peer, &connReady_cb, NULL, params);
116
        }
117

    
118
        return res;
119
}
120

    
121

    
122
static struct nodeID **id_lookup(socketID_handle target) {
123

    
124
        int i,here=-1;
125
        for (i=0;i<lookup_curr;i++) {
126
                if (lookup_array[i] == NULL) {
127
                        if (here < 0) {
128
                                here = i;
129
                        }
130
                } else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
131
                        return &lookup_array[i];
132
                }
133
        }
134

    
135
        if (here == -1) {
136
                here = lookup_curr++;
137
        }
138

    
139
        if (lookup_curr > lookup_max) {
140
                lookup_max *= 2;
141
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
142
        }
143

    
144
        lookup_array[here] = new_node(target);
145
        return &lookup_array[here];
146
}
147

    
148
static struct nodeID *id_lookup_dup(socketID_handle target) {
149
        return nodeid_dup(*id_lookup(target));
150
}
151

    
152

    
153
/**
154
 * Look for a free slot in the received buffer and allocates it for immediate use
155
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
156
 */
157
static int next_R() {
158
        if (receivedBuffer[rIdxML].data==NULL) {
159
                int ret = rIdxML;
160
                rIdxML = (rIdxML+1)%NH_BUFFER_SIZE;
161
                return ret;
162
        } else {
163
                //TODO: handle receive overload situation!
164
                return -1;
165
        }
166
}
167

    
168
/**
169
 * Look for a free slot in the sending buffer and allocates it for immediate use
170
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
171
 */
172
static int next_S() {
173
        if (sendingBuffer[sIdx]) {
174
                int count;
175
                for (count=0;count<NH_BUFFER_SIZE;count++) {
176
                        sIdx = (sIdx+1)%NH_BUFFER_SIZE;
177
                        if (sendingBuffer[sIdx]==NULL)
178
                                break;
179
                }
180
                if (count==NH_BUFFER_SIZE) {
181
                        return -1;
182
                }
183
        }
184
        return sIdx;
185
}
186

    
187

    
188
/**
189
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
190
 * @param local_socketID
191
 * @param errorstatus
192
 */
193
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
194
        static int stun_retry_cnt = 0;
195
        int stun_retries = 2;        //set number of retries (0: no retry)
196

    
197
        switch (errorstatus) {
198
        case 0:
199
                me->addr = malloc(SOCKETID_SIZE);
200
                if (! me->addr) {
201
                        fprintf(stderr, "Net-helper : memory error while creating my new nodeID \n");
202
                        return;
203
                }
204

    
205
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
206
        //        me->addrSize = SOCKETID_SIZE;
207
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
208
                me->connID = -1;
209
                me->refcnt = 1;
210
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
211
                break;
212
        case -1:
213
                //
214
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
215
                exit(1);
216
                break;
217
        case 1:
218
                //
219
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
220
                exit(1);
221
                break;
222
        case 2:
223
                fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
224
                if (++stun_retry_cnt > stun_retries) {
225
                        fprintf(stderr,"Net-helper init : Retrying without STUN\n");
226
                        mlSetStunServer(0,NULL);
227
                }
228
            break;
229
        default :        // should never happen
230
                //
231
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
232
        }
233

    
234
}
235

    
236
/**
237
 * Timeout callback to be set in the eventlib loop as needed
238
 * @param socket
239
 * @param flag
240
 * @param arg
241
 */
242
static void t_out_cb (int socket, short flag, void* arg) {
243

    
244
        timeoutFired = 1;
245
//        fprintf(stderr,"TIMEOUT!!!\n");
246
//        event_base_loopbreak(base);
247
}
248

    
249
/**
250
 * File descriptor readable callback to be set in the eventlib loop as needed
251
 */
252
static void fd_cb (int fd, short flag, void* arg)
253
{
254
  //fprintf(stderr, "\twait4data: fd %d triggered\n", fd);
255
  fdTriggered = true;
256
  *((bool*)arg) = true;
257
}
258

    
259
/**
260
 * Callback called by ml when a remote node ask for a connection
261
 * @param connectionID
262
 * @param arg
263
 */
264
static void receive_conn_cb(int connectionID, void *arg) {
265
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
266

    
267
}
268

    
269
void free_sending_buffer(int i)
270
{
271
        free(sendingBuffer[i]);
272
        sendingBuffer[i] = NULL;
273
}
274

    
275
/**
276
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
277
 * @param connectionID
278
 * @param arg
279
 */
280
static void connReady_cb (int connectionID, void *arg) {
281

    
282
        msgData_cb *p;
283
        p = (msgData_cb *)arg;
284
        if (p == NULL) return;
285
        if (p->cancelled) {
286
            free(p);
287
            return;
288
        }
289
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
290
        free_sending_buffer(p->bIdx);
291
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
292
        //        event_base_loopbreak(base);
293
        p->conn_cb_called = true;
294
}
295

    
296
/**
297
 * Callback called by ml when a connection error occurs
298
 * @param connectionID
299
 * @param arg
300
 */
301
static void connError_cb (int connectionID, void *arg) {
302
        // simply get rid of the msg in the buffer....
303
        msgData_cb *p;
304
        p = (msgData_cb *)arg;
305
        if (p != NULL) {
306
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
307
                if (p->cancelled) {
308
                        free(p);//p->mSize = -1;
309
                } else {
310
                        p->conn_cb_called = true;
311
                }
312
        }
313
        //        event_base_loopbreak(base);
314
}
315

    
316

    
317
/**
318
 * Callback to receive data from ml
319
 * @param buffer
320
 * @param buflen
321
 * @param msgtype
322
 * @param arg
323
 */
324
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
325
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
326
        char str[SOCKETID_STRING_SIZE];
327
//        fprintf(stderr, "Net-helper : called back with some news...\n");
328
/**/ ++recv_counter;
329
        if (arg->remote_socketID != NULL)
330
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
331
        else
332
                sprintf(str,"!Unknown!");
333
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
334
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
335
            fprintf(stderr, "\tMessage # %d -- Missing # %d bytes%s\n",
336
                        recv_counter, arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
337
        }
338
        else {
339
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
340
                // buffering the received message only if possible, otherwise ignore it...
341
                int index = next_R();
342
                if (index<0) {
343
                        fprintf(stderr,"Net-helper: receive buffer full\n ");
344
                        return;
345
                } else {
346
                        receivedBuffer[index].data = malloc(buflen);
347
                        if (receivedBuffer[index].data == NULL) {
348
                                fprintf(stderr,"Net-helper: memory full, can't receive!\n ");
349
                                return;
350
                        }
351
                        receivedBuffer[index].len = buflen;
352
                        memcpy(receivedBuffer[index].data,buffer,buflen);
353
                          // save the socketID of the sender
354
                        receivedBuffer[index].id = id_lookup_dup(arg->remote_socketID);
355
                }
356
  }
357
//        event_base_loopbreak(base);
358
}
359

    
360

    
361
struct nodeID *net_helper_init(const char *IPaddr, int port, const char *config) {
362

    
363
        struct timeval tout = NH_ML_INIT_TIMEOUT;
364
        int s, i;
365
        struct tag *cfg_tags;
366
        const char *res;
367
        const char *stun_server = "130.192.9.140";        //rucola.polito.it
368
        int stun_port = 3478;
369
        const char *repo_address = NULL;
370
        int publish_interval = 60;
371

    
372
        int verbosity = DCLOG_ERROR;
373

    
374
        int bucketsize = 80000; /* this allows a burst of 80000 Bytes [Bytes] */
375
        int rate = 10000000; /* 10Mbit/s [bits/s]*/
376
        int queuesize = 1000000; /* up to 1MB of data will be stored in the shaper transmission queue [Bytes]*/
377
        int RTXqueuesize = 1000000; /* up to 1 MB of data will be stored in the shaper retransmission queue [Bytes] */
378
        double RTXholtdingtime = 1.0; /* [seconds] */
379

    
380
#ifndef _WIN32
381
        signal(SIGPIPE, SIG_IGN); // workaround for a known issue in libevent2 with SIGPIPE on TPC connections
382
#endif
383
        base = event_base_new();
384
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
385

    
386
        cfg_tags = config_parse(config);
387
        if (!cfg_tags) {
388
                return NULL;
389
        }
390

    
391
        res = config_value_str(cfg_tags, "stun_server");
392
        if (res) {
393
                stun_server = res;
394
        }
395
        config_value_int(cfg_tags, "stun_port", &stun_port);
396

    
397
        res = config_value_str(cfg_tags, "repo_address");
398
        if (res) {
399
                repo_address = res;
400
        }
401
        
402
        config_value_int(cfg_tags, "publish_interval", &publish_interval);
403

    
404
        config_value_int(cfg_tags, "verbosity", &verbosity);
405

    
406
        config_value_int(cfg_tags, "bucketsize", &bucketsize);
407
        config_value_int(cfg_tags, "rate", &rate);
408
        config_value_int(cfg_tags, "queuesize", &queuesize);
409
        config_value_int(cfg_tags, "RTXqueuesize", &RTXqueuesize);
410
        config_value_double(cfg_tags, "RTXholtdingtime", &RTXholtdingtime);
411

    
412
        me = malloc(sizeof(nodeID));
413
        if (me == NULL) {
414
                return NULL;
415
        }
416
        memset(me,0,sizeof(nodeID));
417
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
418
        me->refcnt = 1;
419

    
420
        for (i=0;i<NH_BUFFER_SIZE;i++) {
421
                sendingBuffer[i] = NULL;
422
                receivedBuffer[i].data = NULL;
423
        }
424

    
425
        mlRegisterErrorConnectionCb(&connError_cb);
426
        mlRegisterRecvConnectionCb(&receive_conn_cb);
427
        s = mlInit(1, tout, port, IPaddr, stun_port, stun_server, &init_myNodeID_cb, base);
428
        if (s < 0) {
429
                fprintf(stderr, "Net-helper : error initializing ML!\n");
430
                free(me);
431
                return NULL;
432
        }
433

    
434
        mlSetVerbosity(verbosity);
435

    
436
        mlSetRateLimiterParams(bucketsize, rate, queuesize, RTXqueuesize, RTXholtdingtime);
437

    
438
#ifdef MONL
439
{
440
        void *repoclient;
441
        eventbase = base;
442

    
443
        // Initialize logging
444
        napaInitLog(verbosity, NULL, NULL);
445

    
446
        repInit("");
447
        repoclient = repOpen(repo_address, publish_interval);        //repository.napa-wine.eu
448
        // NULL is inow valid for disabled repo 
449
        // if (repoclient == NULL) fatal("Unable to initialize repoclient");
450
        monInit(base, repoclient);
451
}
452
#endif
453
        free(cfg_tags);
454

    
455
        while (me->connID<-1) {
456
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
457
                event_base_loop(base,EVLOOP_ONCE);
458
        }
459
        timeoutFired = 0;
460
//        fprintf(stderr,"Net-helper init : back from init!\n");
461

    
462
        return me;
463
}
464

    
465

    
466
void bind_msg_type (unsigned char msgtype) {
467

    
468
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
469
}
470

    
471

    
472
void send_to_peer_cb(int fd, short event, void *arg)
473
{
474
        msgData_cb *p = (msgData_cb *) arg;
475
        if (p->conn_cb_called) {
476
                free(p);
477
        }
478
        else { //don't send it anymore
479
                free_sending_buffer(p->bIdx);
480
                p->cancelled = true;
481
                // don't free p, the other timeout will do it
482
        }
483
}
484

    
485
/**
486
 * Called by the application to send data to a remote peer
487
 * @param from
488
 * @param to
489
 * @param buffer_ptr
490
 * @param buffer_size
491
 * @return The dimension of the buffer or -1 if a connection error occurred.
492
 */
493
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
494
{
495
        msgData_cb *p;
496
        int current;
497
        send_params params = {0,0,0,0};
498

    
499
        if (buffer_size <= 0) {
500
                fprintf(stderr,"Net-helper: message size problematic: %d\n", buffer_size);
501
                return buffer_size;
502
        }
503

    
504
        // if buffer is full, discard the message and return an error flag
505
        int index = next_S();
506
        if (index<0) {
507
                // free(buffer_ptr);
508
                fprintf(stderr,"Net-helper: send buffer full\n ");
509
                return -1;
510
        }
511
        sendingBuffer[index] = malloc(buffer_size);
512
        if (! sendingBuffer[index]){
513
                fprintf(stderr,"Net-helper: memory full, can't send!\n ");
514
                return -1;
515
        }
516
        memset(sendingBuffer[index],0,buffer_size);
517
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
518
        // free(buffer_ptr);
519
        p = malloc(sizeof(msgData_cb));
520
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
521
        current = p->bIdx;
522

    
523
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
524
        if (to->connID<0) {
525
                free_sending_buffer(current);
526
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
527
                free(p);
528
                return -1;
529
        }
530
        else {
531
                struct timeval timeout = NH_PACKET_TIMEOUT;
532
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
533
                return buffer_size; //p->mSize;
534
        }
535

    
536
}
537

    
538

    
539
/**
540
 * Called by an application to receive data from remote peers
541
 * @param local
542
 * @param remote
543
 * @param buffer_ptr
544
 * @param buffer_size
545
 * @return The number of received bytes or -1 if some error occurred.
546
 */
547
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
548
{
549
        int size;
550
        if (receivedBuffer[rIdxUp].data==NULL) {        //block till first message arrives
551
                wait4data(local, NULL, NULL);
552
        }
553

    
554
        assert(receivedBuffer[rIdxUp].data && receivedBuffer[rIdxUp].id);
555

    
556
        (*remote) = receivedBuffer[rIdxUp].id;
557
        // retrieve a msg from the buffer
558
        size = receivedBuffer[rIdxUp].len;
559
        if (size>buffer_size) {
560
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
561
                return -1;
562
        }
563
        memcpy(buffer_ptr, receivedBuffer[rIdxUp].data, size);
564
        free(receivedBuffer[rIdxUp].data);
565
        receivedBuffer[rIdxUp].data = NULL;
566
        receivedBuffer[rIdxUp].id = NULL;
567

    
568
        rIdxUp = (rIdxUp+1)%NH_BUFFER_SIZE;
569

    
570
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
571

    
572
        return size;
573
}
574

    
575

    
576
int wait4data(const struct nodeID *n, struct timeval *tout, int *fds) {
577

    
578
        struct event *timeout_ev = NULL;
579
        struct event *fd_ev[FDSSIZE];
580
        bool fd_triggered[FDSSIZE] = { false };
581
        int i;
582

    
583
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
584
        if (tout) {        //if tout==NULL, loop wait infinitely
585
          timeout_ev = event_new(base, -1, EV_TIMEOUT, &t_out_cb, NULL);
586
          event_add(timeout_ev, tout);
587
        }
588
        for (i = 0; fds && fds[i] != -1; i ++) {
589
          if (i >= FDSSIZE) {
590
            fprintf(stderr, "Can't listen on more than %d file descriptors!\n", FDSSIZE);
591
            break;
592
          }
593
          fd_ev[i] = event_new(base, fds[i], EV_READ, &fd_cb, &fd_triggered[i]);
594
          event_add(fd_ev[i], NULL);
595
        }
596

    
597
        while(receivedBuffer[rIdxUp].data==NULL && timeoutFired==0 && fdTriggered==0) {
598
        //        event_base_dispatch(base);
599
                event_base_loop(base,EVLOOP_ONCE);
600
        }
601

    
602
        //delete one-time events
603
        if (timeout_ev) {
604
          if (!timeoutFired) event_del(timeout_ev);
605
          event_free(timeout_ev);
606
        }
607
        for (i = 0; fds && fds[i] != -1; i ++) {
608
          if (! fd_triggered[i]) {
609
            fds[i] = -2;
610
            event_del(fd_ev[i]);
611
          //} else {
612
            //fprintf(stderr, "\twait4data: fd %d triggered\n", fds[i]);
613
          }
614
          event_free(fd_ev[i]);
615
        }
616

    
617
        if (fdTriggered) {
618
          fdTriggered = false;
619
          //fprintf(stderr, "\twait4data: fd event\n");
620
          return 2;
621
        } else if (timeoutFired) {
622
          timeoutFired = 0;
623
          //fprintf(stderr, "\twait4data: timed out\n");
624
          return 0;
625
        } else if (receivedBuffer[rIdxUp].data!=NULL) {
626
          //fprintf(stderr, "\twait4data: ML receive\n");
627
          return 1;
628
        } else {
629
          fprintf(stderr, "BUG in wait4data\n");
630
          exit(EXIT_FAILURE);
631
        }
632
}
633

    
634
socketID_handle getRemoteSocketID(const char *ip, int port) {
635
        char str[SOCKETID_STRING_SIZE];
636
        socketID_handle h;
637

    
638
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
639
        h = malloc(SOCKETID_SIZE);
640
        mlStringToSocketID(str, h);
641

    
642
        return h;
643
}
644

    
645
struct nodeID *create_node(const char *rem_IP, int rem_port) {
646
        socketID_handle s;
647
        struct nodeID *remote;
648

    
649
        s = getRemoteSocketID(rem_IP, rem_port);
650
        remote = id_lookup_dup(s);
651
        free(s);
652

    
653
        return remote;
654
}
655

    
656
int node_ip(const struct nodeID *s, char *ip, int size) {
657
        int len;
658
        const char *start, *end;
659
        char tmp[256];
660

    
661
        node_addr(s, tmp, 256);
662

    
663
        start = strstr(tmp, "-") + 1;
664
        end = strstr(start, ":");
665
        len = end - start;
666
        if (len >= size) {
667
                return -1;
668
        }
669
        memcpy(ip, start, len);
670
        ip[len] = 0;
671

    
672
        return 1;
673
}
674

    
675
// TODO: check why closing the connection is annoying for the ML
676
void nodeid_free(struct nodeID *n) {
677
        if (n && (--(n->refcnt) == 1)) {
678
/*
679
                struct nodeID **npos;
680
        //        mlCloseConnection(n->connID);
681
                npos = id_lookup(n->addr);
682
                *npos = NULL;
683
                mlCloseSocket(n->addr);
684
                free(n);
685
*/
686
        }
687
}
688

    
689

    
690
int node_addr(const struct nodeID *s, char *addr, int len)
691
{
692
  // TODO: mlSocketIDToString always return 0 !!!
693
  int r = mlSocketIDToString(s->addr,addr,len);
694
  if (!r)
695
          return 1;
696
  else
697
          return -1;
698
}
699

    
700
struct nodeID *nodeid_dup(struct nodeID *s)
701
{
702
        s->refcnt++;
703
        return s;
704
}
705

    
706
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
707
{
708
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
709
}
710

    
711
int nodeid_cmp(const struct nodeID *s1, const struct nodeID *s2)
712
{
713
        return mlCompareSocketIDs(s1->addr,s2->addr);
714
}
715

    
716
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
717
{
718
  if (max_write_size < SOCKETID_STRING_SIZE) return -1;
719

    
720
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
721
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
722

    
723
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
724
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
725

    
726
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
727
}
728

    
729
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
730
{
731
  uint8_t sid[SOCKETID_SIZE];
732
  socketID_handle h = (socketID_handle) sid;
733
  mlStringToSocketID((char *)b,h);
734
  *len = strlen((char*)b) + 1;
735
  return id_lookup_dup(h);
736
}