Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ fd2787d5

History | View | Annotate | Download (16.5 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14
#include <assert.h>
15

    
16

    
17
#include "net_helper.h"
18
#include "ml.h"
19

    
20
#include "msg_types.h"/**/
21

    
22

    
23
#ifdef MONL
24
#include "mon.h"
25
#include "grapes_log.h"
26
#include "repoclient.h"
27
#include "grapes.h"
28
#endif
29

    
30

    
31
/**
32
 * libevent pointer
33
 */
34
struct event_base *base;
35

    
36
#define NH_BUFFER_SIZE 1000
37
#define NH_LOOKUP_SIZE 1000
38
#define NH_PACKET_TIMEOUT {0, 500*1000}
39
#define NH_ML_INIT_TIMEOUT {1, 0}
40

    
41
#define FDSSIZE 16
42

    
43
static int sIdx = 0;
44
static int rIdxML = 0;        //reveive from ML to this buffer position
45
static int rIdxUp = 0;        //hand up to layer above at this buffer position
46

    
47
typedef struct nodeID {
48
        socketID_handle addr;
49
        int connID;        // connection associated to this node, -1 if myself
50
        int refcnt;
51
#ifdef MONL
52
        //n quick and dirty static vector for measures TODO: make it dinamic
53
        MonHandler mhs[20];
54
        int n_mhs;
55
#endif
56
//        int addrSize;
57
//        int addrStringSize;
58
} nodeID;
59

    
60
typedef struct msgData_cb {
61
        int bIdx;        // index of the message in the proper buffer
62
        unsigned char msgType; // message type
63
        int mSize;        // message size
64
        bool conn_cb_called;
65
        bool cancelled;
66
} msgData_cb;
67

    
68
static struct nodeID **lookup_array;
69
static int lookup_max = NH_LOOKUP_SIZE;
70
static int lookup_curr = 0;
71

    
72
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
73
static int timeoutFired = 0;
74
static bool fdTriggered = false;
75

    
76
// pointers to the msgs to be send
77
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
78
// pointers to the received msgs + sender nodeID
79
struct receivedB {
80
        struct nodeID *id;
81
        int len;
82
        uint8_t *data;
83
};
84
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
85
/**/ static int recv_counter =0; static int snd_counter =0;
86

    
87

    
88
static void connReady_cb (int connectionID, void *arg);
89
static struct nodeID *new_node(socketID_handle peer) {
90
        send_params params = {0,0,0,0};
91
        struct nodeID *res = malloc(sizeof(struct nodeID));
92
        if (!res) {
93
                 fprintf(stderr, "Net-helper : memory error\n");
94
                 return NULL;
95
        }
96
        memset(res, 0, sizeof(struct nodeID));
97

    
98
        res->addr = malloc(SOCKETID_SIZE);
99
        if (! res->addr) {
100
                free (res);
101
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
102
                return NULL;
103
        }
104
        memset(res->addr, 0, SOCKETID_SIZE);
105
        memcpy(res->addr, peer ,SOCKETID_SIZE);
106

    
107
        res->refcnt = 1;
108

    
109
        res->connID = mlOpenConnection(peer, &connReady_cb, NULL, params);
110

    
111
        return res;
112
}
113

    
114

    
115
static struct nodeID **id_lookup(socketID_handle target) {
116

    
117
        int i,here=-1;
118
        for (i=0;i<lookup_curr;i++) {
119
                if (lookup_array[i] == NULL) {
120
                        if (here < 0) {
121
                                here = i;
122
                        }
123
                } else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
124
                        return &lookup_array[i];
125
                }
126
        }
127

    
128
        if (here == -1) {
129
                here = lookup_curr++;
130
        }
131

    
132
        if (lookup_curr > lookup_max) {
133
                lookup_max *= 2;
134
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
135
        }
136

    
137
        lookup_array[here] = new_node(target);
138
        return &lookup_array[here];
139
}
140

    
141
static struct nodeID *id_lookup_dup(socketID_handle target) {
142
        return nodeid_dup(*id_lookup(target));
143
}
144

    
145

    
146
/**
147
 * Look for a free slot in the received buffer and allocates it for immediate use
148
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
149
 */
150
static int next_R() {
151
        if (receivedBuffer[rIdxML].data==NULL) {
152
                int ret = rIdxML;
153
                rIdxML = (rIdxML+1)%NH_BUFFER_SIZE;
154
                return ret;
155
        } else {
156
                //TODO: handle receive overload situation!
157
                return -1;
158
        }
159
}
160

    
161
/**
162
 * Look for a free slot in the sending buffer and allocates it for immediate use
163
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
164
 */
165
static int next_S() {
166
        if (sendingBuffer[sIdx]) {
167
                int count;
168
                for (count=0;count<NH_BUFFER_SIZE;count++) {
169
                        sIdx = (sIdx+1)%NH_BUFFER_SIZE;
170
                        if (sendingBuffer[sIdx]==NULL)
171
                                break;
172
                }
173
                if (count==NH_BUFFER_SIZE) {
174
                        return -1;
175
                }
176
        }
177
        return sIdx;
178
}
179

    
180

    
181
/**
182
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
183
 * @param local_socketID
184
 * @param errorstatus
185
 */
186
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
187
        switch (errorstatus) {
188
        case 0:
189
                me->addr = malloc(SOCKETID_SIZE);
190
                if (! me->addr) {
191
                        fprintf(stderr, "Net-helper : memory error while creating my new nodeID \n");
192
                        return;
193
                }
194

    
195
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
196
        //        me->addrSize = SOCKETID_SIZE;
197
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
198
                me->connID = -1;
199
                me->refcnt = 1;
200
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
201
                break;
202
        case -1:
203
                //
204
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
205
                exit(1);
206
                break;
207
        case 1:
208
                //
209
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
210
                exit(1);
211
                break;
212
        case 2:
213
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
214
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
215
            mlSetStunServer(0,NULL);
216
            break;
217
        default :        // should never happen
218
                //
219
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
220
        }
221

    
222
}
223

    
224
/**
225
 * Timeout callback to be set in the eventlib loop as needed
226
 * @param socket
227
 * @param flag
228
 * @param arg
229
 */
230
static void t_out_cb (int socket, short flag, void* arg) {
231

    
232
        timeoutFired = 1;
233
//        fprintf(stderr,"TIMEOUT!!!\n");
234
//        event_base_loopbreak(base);
235
}
236

    
237
/**
238
 * File descriptor readable callback to be set in the eventlib loop as needed
239
 */
240
static void fd_cb (int fd, short flag, void* arg)
241
{
242
  //fprintf(stderr, "\twait4data: fd %d triggered\n", fd);
243
  fdTriggered = true;
244
  *((bool*)arg) = true;
245
}
246

    
247
/**
248
 * Callback called by ml when a remote node ask for a connection
249
 * @param connectionID
250
 * @param arg
251
 */
252
static void receive_conn_cb(int connectionID, void *arg) {
253
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
254

    
255
}
256

    
257
void free_sending_buffer(int i)
258
{
259
        free(sendingBuffer[i]);
260
        sendingBuffer[i] = NULL;
261
}
262

    
263
/**
264
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
265
 * @param connectionID
266
 * @param arg
267
 */
268
static void connReady_cb (int connectionID, void *arg) {
269
        char mt;
270
        msgData_cb *p;
271
        p = (msgData_cb *)arg;
272
        if (p == NULL) return;
273
        if (p->cancelled) {
274
            free(p);
275
            return;
276
        }
277
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
278
/**/        mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
279
        if (mt!=MSG_TYPE_TOPOLOGY &&
280
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
281
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
282
                                snd_counter,mt+'0', p->mSize);}
283
        free_sending_buffer(p->bIdx);
284
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
285
        //        event_base_loopbreak(base);
286
        p->conn_cb_called = true;
287
}
288

    
289
/**
290
 * Callback called by ml when a connection error occurs
291
 * @param connectionID
292
 * @param arg
293
 */
294
static void connError_cb (int connectionID, void *arg) {
295
        // simply get rid of the msg in the buffer....
296
        msgData_cb *p;
297
        p = (msgData_cb *)arg;
298
        if (p != NULL) {
299
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
300
                if (p->cancelled) {
301
                        free(p);//p->mSize = -1;
302
                } else {
303
                        p->conn_cb_called = true;
304
                }
305
        }
306
        //        event_base_loopbreak(base);
307
}
308

    
309

    
310
/**
311
 * Callback to receive data from ml
312
 * @param buffer
313
 * @param buflen
314
 * @param msgtype
315
 * @param arg
316
 */
317
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
318
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
319
        char str[SOCKETID_STRING_SIZE];
320
//        fprintf(stderr, "Net-helper : called back with some news...\n");
321
/**/ ++recv_counter;
322
        if (arg->remote_socketID != NULL)
323
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
324
        else
325
                sprintf(str,"!Unknown!");
326
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
327
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
328
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
329
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
330
        }
331
        else {
332
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
333
                // buffering the received message only if possible, otherwise ignore it...
334
                int index = next_R();
335
                if (index<0) {
336
                        fprintf(stderr,"Net-helper: receive buffer full\n ");
337
                        return;
338
                } else {
339
                        receivedBuffer[index].data = malloc(buflen);
340
                        if (receivedBuffer[index].data == NULL) {
341
                                fprintf(stderr,"Net-helper: memory full, can't receive!\n ");
342
                                return;
343
                        }
344
                        receivedBuffer[index].len = buflen;
345
                        memcpy(receivedBuffer[index].data,buffer,buflen);
346
                          // save the socketID of the sender
347
                        receivedBuffer[index].id = id_lookup_dup(arg->remote_socketID);
348
                }
349
  }
350
//        event_base_loopbreak(base);
351
}
352

    
353

    
354
struct nodeID *net_helper_init(const char *IPaddr, int port) {
355

    
356
        struct timeval tout = NH_ML_INIT_TIMEOUT;
357
        int s, i;
358
        base = event_base_new();
359
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
360

    
361
        me = malloc(sizeof(nodeID));
362
        if (me == NULL) {
363
                return NULL;
364
        }
365
        memset(me,0,sizeof(nodeID));
366
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
367
        me->refcnt = 1;
368

    
369
        for (i=0;i<NH_BUFFER_SIZE;i++) {
370
                sendingBuffer[i] = NULL;
371
                receivedBuffer[i].data = NULL;
372
        }
373

    
374
        mlRegisterErrorConnectionCb(&connError_cb);
375
        mlRegisterRecvConnectionCb(&receive_conn_cb);
376
        s = mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
377
        if (s < 0) {
378
                fprintf(stderr, "Net-helper : error initializing ML!\n");
379
                free(me);
380
                return NULL;
381
        }
382

    
383
#ifdef MONL
384
{
385
        void *repoclient;
386
        eventbase = base;
387

    
388
        // Initialize logging
389
        grapesInitLog(DCLOG_WARNING, NULL, NULL);
390

    
391
        repInit("");
392
        repoclient = repOpen("79.120.193.115:9832",60);        //repository.napa-wine.eu
393
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
394
        monInit(base, repoclient);
395
}
396
#endif
397

    
398
        while (me->connID<-1) {
399
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
400
                event_base_loop(base,EVLOOP_ONCE);
401
        }
402
        timeoutFired = 0;
403
//        fprintf(stderr,"Net-helper init : back from init!\n");
404

    
405
        return me;
406
}
407

    
408

    
409
void bind_msg_type (unsigned char msgtype) {
410

    
411
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
412
}
413

    
414

    
415
void send_to_peer_cb(int fd, short event, void *arg)
416
{
417
        msgData_cb *p = (msgData_cb *) arg;
418
        if (p->conn_cb_called) {
419
                free(p);
420
        }
421
        else { //don't send it anymore
422
                free_sending_buffer(p->bIdx);
423
                p->cancelled = true;
424
                // don't free p, the other timeout will do it
425
        }
426
}
427

    
428
/**
429
 * Called by the application to send data to a remote peer
430
 * @param from
431
 * @param to
432
 * @param buffer_ptr
433
 * @param buffer_size
434
 * @return The dimension of the buffer or -1 if a connection error occurred.
435
 */
436
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
437
{
438
        msgData_cb *p;
439
        int current;
440
        send_params params = {0,0,0,0};
441

    
442
        if (buffer_size <= 0) {
443
                fprintf(stderr,"Net-helper: message size problematic: %d\n", buffer_size);
444
                return buffer_size;
445
        }
446

    
447
        // if buffer is full, discard the message and return an error flag
448
        int index = next_S();
449
        if (index<0) {
450
                // free(buffer_ptr);
451
                fprintf(stderr,"Net-helper: send buffer full\n ");
452
                return -1;
453
        }
454
        sendingBuffer[index] = malloc(buffer_size);
455
        if (! sendingBuffer[index]){
456
                fprintf(stderr,"Net-helper: memory full, can't send!\n ");
457
                return -1;
458
        }
459
        memset(sendingBuffer[index],0,buffer_size);
460
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
461
        // free(buffer_ptr);
462
        p = malloc(sizeof(msgData_cb));
463
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
464
        current = p->bIdx;
465

    
466
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
467
        if (to->connID<0) {
468
                free_sending_buffer(current);
469
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
470
                free(p);
471
                return -1;
472
        }
473
        else {
474
                struct timeval timeout = NH_PACKET_TIMEOUT;
475
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
476
                return buffer_size; //p->mSize;
477
        }
478

    
479
}
480

    
481

    
482
/**
483
 * Called by an application to receive data from remote peers
484
 * @param local
485
 * @param remote
486
 * @param buffer_ptr
487
 * @param buffer_size
488
 * @return The number of received bytes or -1 if some error occurred.
489
 */
490
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
491
{
492
        int size;
493
        if (receivedBuffer[rIdxUp].data==NULL) {        //block till first message arrives
494
                wait4data(local, NULL, NULL);
495
        }
496

    
497
        assert(receivedBuffer[rIdxUp].data && receivedBuffer[rIdxUp].id);
498

    
499
        (*remote) = receivedBuffer[rIdxUp].id;
500
        // retrieve a msg from the buffer
501
        size = receivedBuffer[rIdxUp].len;
502
        if (size>buffer_size) {
503
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
504
                return -1;
505
        }
506
        memcpy(buffer_ptr, receivedBuffer[rIdxUp].data, size);
507
        free(receivedBuffer[rIdxUp].data);
508
        receivedBuffer[rIdxUp].data = NULL;
509
        receivedBuffer[rIdxUp].id = NULL;
510

    
511
        rIdxUp = (rIdxUp+1)%NH_BUFFER_SIZE;
512

    
513
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
514

    
515
        return size;
516
}
517

    
518

    
519
int wait4data(const struct nodeID *n, struct timeval *tout, int *fds) {
520

    
521
        struct event *timeout_ev;
522
        struct event *fd_ev[FDSSIZE];
523
        bool fd_triggered[FDSSIZE] = { false };
524
        int i;
525

    
526
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
527
        if (tout) {        //if tout==NULL, loop wait infinitely
528
          timeout_ev = event_new(base, -1, EV_TIMEOUT, &t_out_cb, NULL);
529
          event_add(timeout_ev, tout);
530
        }
531
        for (i = 0; fds && fds[i] != -1; i ++) {
532
          if (i >= FDSSIZE) {
533
            fprintf(stderr, "Can't listen on more than %d file descriptors!\n", FDSSIZE);
534
            break;
535
          }
536
          fd_ev[i] = event_new(base, fds[i], EV_READ, &fd_cb, &fd_triggered[i]);
537
          event_add(fd_ev[i], NULL);
538
        }
539

    
540
        while(receivedBuffer[rIdxUp].data==NULL && timeoutFired==0 && fdTriggered==0) {
541
        //        event_base_dispatch(base);
542
                event_base_loop(base,EVLOOP_ONCE);
543
        }
544

    
545
        //delete one-time events
546
        event_del(timeout_ev);
547
        event_free(timeout_ev);
548
        for (i = 0; fds && fds[i] != -1; i ++) {
549
          if (! fd_triggered[i]) {
550
            fds[i] = -2;
551
            event_del(fd_ev[i]);
552
          //} else {
553
            //fprintf(stderr, "\twait4data: fd %d triggered\n", fds[i]);
554
          }
555
          event_free(fd_ev[i]);
556
        }
557

    
558
        if (fdTriggered) {
559
          fdTriggered = false;
560
          //fprintf(stderr, "\twait4data: fd event\n");
561
          return 2;
562
        } else if (timeoutFired) {
563
          timeoutFired = 0;
564
          //fprintf(stderr, "\twait4data: timed out\n");
565
          return 0;
566
        } else if (receivedBuffer[rIdxUp].data!=NULL) {
567
          //fprintf(stderr, "\twait4data: ML receive\n");
568
          return 1;
569
        } else {
570
          fprintf(stderr, "BUG in wait4data\n");
571
          exit(EXIT_FAILURE);
572
        }
573
}
574

    
575
socketID_handle getRemoteSocketID(const char *ip, int port) {
576
        char str[SOCKETID_STRING_SIZE];
577
        socketID_handle h;
578

    
579
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
580
        h = malloc(SOCKETID_SIZE);
581
        mlStringToSocketID(str, h);
582

    
583
        return h;
584
}
585

    
586
struct nodeID *create_node(const char *rem_IP, int rem_port) {
587
        socketID_handle s;
588
        struct nodeID *remote;
589

    
590
        s = getRemoteSocketID(rem_IP, rem_port);
591
        remote = id_lookup_dup(s);
592
        free(s);
593

    
594
        return remote;
595
}
596

    
597
// TODO: check why closing the connection is annoying for the ML
598
void nodeid_free(struct nodeID *n) {
599
        if (n && (--(n->refcnt) == 1)) {
600
/*
601
                struct nodeID **npos;
602
        //        mlCloseConnection(n->connID);
603
                npos = id_lookup(n->addr);
604
                *npos = NULL;
605
                mlCloseSocket(n->addr);
606
                free(n);
607
*/
608
        }
609
}
610

    
611

    
612
const char *node_addr(const struct nodeID *s)
613
{
614
  static char addr[256];
615
  // TODO: mlSocketIDToString always return 0 !!!
616
  int r = mlSocketIDToString(s->addr,addr,256);
617
  if (!r)
618
          return addr;
619
  else
620
          return "";
621
}
622

    
623
struct nodeID *nodeid_dup(struct nodeID *s)
624
{
625
        s->refcnt++;
626
        return s;
627
}
628

    
629
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
630
{
631
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
632
}
633

    
634
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
635
{
636
  if (max_write_size < SOCKETID_STRING_SIZE) return -1;
637

    
638
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
639
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
640

    
641
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
642
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
643

    
644
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
645
}
646

    
647
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
648
{
649
  uint8_t sid[SOCKETID_SIZE];
650
  socketID_handle h = (socketID_handle) sid;
651
  mlStringToSocketID((char *)b,h);
652
  *len = strlen((char*)b) + 1;
653
  return id_lookup_dup(h);
654
}