Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ c2fe2492

History | View | Annotate | Download (15.9 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17

    
18
#include "msg_types.h"/**/
19

    
20

    
21
#ifdef MONL
22
#include "mon.h"
23
#include "grapes_log.h"
24
#include "repoclient.h"
25
#include "grapes.h"
26
#endif
27

    
28

    
29
/**
30
 * libevent pointer
31
 */
32
struct event_base *base;
33

    
34
#define NH_BUFFER_SIZE 1000
35
#define NH_LOOKUP_SIZE 1000
36
#define NH_PACKET_TIMEOUT {0, 500*1000}
37
#define NH_ML_INIT_TIMEOUT {1, 0}
38

    
39
static int sIdx = 0;
40
static int rIdx = 0;
41

    
42
typedef struct nodeID {
43
        socketID_handle addr;
44
        int connID;        // connection associated to this node, -1 if myself
45
        int refcnt;
46
#ifdef MONL
47
        //n quick and dirty static vector for measures TODO: make it dinamic
48
        MonHandler mhs[20];
49
        int n_mhs;
50
#endif
51
//        int addrSize;
52
//        int addrStringSize;
53
} nodeID;
54

    
55
typedef struct msgData_cb {
56
        int bIdx;        // index of the message in the proper buffer
57
        unsigned char msgType; // message type
58
        int mSize;        // message size
59
        bool conn_cb_called;
60
        bool cancelled;
61
} msgData_cb;
62

    
63
static struct nodeID **lookup_array;
64
static int lookup_max = 1000;
65
static int lookup_curr = 0;
66

    
67
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
68
static int timeoutFired = 0;
69

    
70
// pointers to the msgs to be send
71
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
72
// pointers to the received msgs + sender nodeID
73
struct receivedB {
74
        struct nodeID *id;
75
        int len;
76
        uint8_t *data;
77
};
78
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
79
/**/ static int recv_counter =0; static int snd_counter =0;
80

    
81

    
82
static struct nodeID *new_node(socketID_handle peer, int conn_id) {
83
         struct nodeID *res = malloc(sizeof(struct nodeID));
84
         if (!res)
85
                 return NULL;
86
         memset(res, 0, sizeof(struct nodeID));
87
         res->addr = malloc(SOCKETID_SIZE);
88
         if (res->addr) {
89
                 memset(res->addr, 0, SOCKETID_SIZE);
90
                 memcpy(res->addr, peer ,SOCKETID_SIZE);
91
                 //        remote->addrSize = SOCKETID_SIZE;
92
                 //        remote->addrStringSize = SOCKETID_STRING_SIZE;
93
                 res->connID = conn_id;
94
                 res->refcnt = 1;
95
         }
96
         else {
97
                 free (res);
98
                 fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
99
                 return NULL;
100
         }
101

    
102
         return res;
103
}
104

    
105

    
106
static struct nodeID *id_lookup(socketID_handle target, int conn_id) {
107

    
108
        int i,here=-1;
109
        for (i=0;i<lookup_curr;i++) {
110
                if (lookup_array[i] == NULL) {
111
                        if (here < 0)
112
                                here = i;
113
                }
114
                else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
115
                        return nodeid_dup(lookup_array[i]);
116
                }
117
                else {
118
                        if (lookup_array[i]->refcnt == 1) {
119
                                nodeid_free(lookup_array[i]);
120
                                lookup_array[i] = NULL;
121
                                if (here < 0)
122
                                        here = i;
123
                        }
124
                }
125
        }
126

    
127
        if (here >= 0) {
128
                lookup_array[here] = new_node(target,conn_id);
129
                return nodeid_dup(lookup_array[here]);
130
        }
131

    
132
        if (lookup_curr == lookup_max) {
133
                lookup_max *= 2;
134
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
135
        }
136

    
137
        lookup_array[lookup_curr] = new_node(target,conn_id);
138
        return nodeid_dup(lookup_array[lookup_curr++]);
139

    
140
}
141

    
142

    
143
/**
144
 * Look for a free slot in the received buffer and allocates it for immediate use
145
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
146
 */
147
static int next_R() {
148
        const int size = 1024;
149
        if (receivedBuffer[rIdx].data==NULL) {
150
                receivedBuffer[rIdx].data = malloc(size);
151
        }
152
        else {
153
                int count;
154
                for (count=0;count<NH_BUFFER_SIZE;count++) {
155
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
156
                        if (receivedBuffer[rIdx].data==NULL)
157
                                break;
158
                }
159
                if (count==NH_BUFFER_SIZE)
160
                        return -1;
161
                else {
162
                        receivedBuffer[rIdx].data = malloc(size);
163
                }
164
        }
165
        memset(receivedBuffer[rIdx].data,0,size);
166
        return rIdx;
167
}
168

    
169
/**
170
 * Look for a free slot in the sending buffer and allocates it for immediate use
171
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
172
 */
173
static int next_S() {
174
        const int size = 1024;
175
        if (sendingBuffer[sIdx]==NULL) {
176
                sendingBuffer[sIdx] = malloc(size);
177
        }
178
        else {
179
                int count;
180
                for (count=0;count<NH_BUFFER_SIZE;count++) {
181
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
182
                        if (sendingBuffer[sIdx]==NULL)
183
                                break;
184
                }
185
                if (count==NH_BUFFER_SIZE)
186
                        return -1;
187
                else {
188
                        sendingBuffer[sIdx] = malloc(size);
189
                }
190
        }
191
        return sIdx;
192
}
193

    
194

    
195
/**
196
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
197
 * @param local_socketID
198
 * @param errorstatus
199
 */
200
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
201
        switch (errorstatus) {
202
        case 0:
203
                //
204
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
205
        //        me->addrSize = SOCKETID_SIZE;
206
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
207
                me->connID = -1;
208
                me->refcnt = 1;
209
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
210
                break;
211
        case -1:
212
                //
213
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
214
                exit(1);
215
                break;
216
        case 1:
217
                //
218
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
219
                exit(1);
220
                break;
221
        case 2:
222
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
223
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
224
            mlSetStunServer(0,NULL);
225
            break;
226
        default :        // should never happen
227
                //
228
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
229
        }
230

    
231
}
232

    
233
/**
234
 * Timeout callback to be set in the eventlib loop as needed
235
 * @param socket
236
 * @param flag
237
 * @param arg
238
 */
239
static void t_out_cb (int socket, short flag, void* arg) {
240

    
241
        timeoutFired = 1;
242
//        fprintf(stderr,"TIMEOUT!!!\n");
243
//        event_base_loopbreak(base);
244
}
245

    
246
/**
247
 * Callback called by ml when a remote node ask for a connection
248
 * @param connectionID
249
 * @param arg
250
 */
251
static void receive_conn_cb(int connectionID, void *arg) {
252
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
253

    
254
}
255

    
256
void free_sending_buffer(int i)
257
{
258
        free(sendingBuffer[i]);
259
        sendingBuffer[i] = NULL;
260
}
261

    
262
/**
263
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
264
 * @param connectionID
265
 * @param arg
266
 */
267
static void connReady_cb (int connectionID, void *arg) {
268

    
269
        msgData_cb *p;
270
        p = (msgData_cb *)arg;
271
        if (p == NULL) return;
272
        if (p->cancelled) {
273
            free(p);
274
            return;
275
        }
276
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
277
/**/char mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
278
        if (mt!=MSG_TYPE_TOPOLOGY &&
279
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
280
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
281
                                snd_counter,mt+'0', p->mSize);}
282
        free_sending_buffer(p->bIdx);
283
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
284
        //        event_base_loopbreak(base);
285
        p->conn_cb_called = true;
286
}
287

    
288
/**
289
 * Callback called by ml when a connection error occurs
290
 * @param connectionID
291
 * @param arg
292
 */
293
static void connError_cb (int connectionID, void *arg) {
294
        // simply get rid of the msg in the buffer....
295
        msgData_cb *p;
296
        p = (msgData_cb *)arg;
297
        if (p != NULL) {
298
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
299
                if (p->cancelled) {
300
                        free(p);//p->mSize = -1;
301
                } else {
302
                        p->conn_cb_called = true;
303
                }
304
        }
305
        //        event_base_loopbreak(base);
306
}
307

    
308

    
309
/**
310
 * Callback to receive data from ml
311
 * @param buffer
312
 * @param buflen
313
 * @param msgtype
314
 * @param arg
315
 */
316
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
317
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
318
//        fprintf(stderr, "Net-helper : called back with some news...\n");
319
/**/ ++recv_counter;
320
        char str[SOCKETID_STRING_SIZE];
321
        if (arg->remote_socketID != NULL)
322
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
323
        else
324
                sprintf(str,"!Unknown!");
325
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
326
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
327
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
328
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
329
        }
330
        else {
331
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
332
                // buffering the received message only if possible, otherwise ignore it...
333
                int index = next_R();
334
                if (index >=0) {
335
                //        receivedBuffer[index][0] = malloc(buflen);
336
                        if (receivedBuffer[index].data == NULL) {
337
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
338
                                return;
339
                        }
340
                        // creating a new sender nodedID
341
                        receivedBuffer[index].id = id_lookup(arg->remote_socketID, arg->connectionID);
342
                                receivedBuffer[index].data = realloc(receivedBuffer[index].data,buflen);
343
                                memset(receivedBuffer[index].data,0,buflen);
344
                                receivedBuffer[index].len = buflen;
345
                                //*(receivedBuffer[index][0]) = buflen;
346
                                memcpy(receivedBuffer[index].data,buffer,buflen);
347
                                  // get the socketID of the sender
348
                }
349
  }
350
//        event_base_loopbreak(base);
351
}
352

    
353

    
354
struct nodeID *net_helper_init(const char *IPaddr, int port) {
355

    
356
        struct timeval tout = NH_ML_INIT_TIMEOUT;
357
        int s;
358
        base = event_base_new();
359
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
360

    
361
        me = malloc(sizeof(nodeID));
362
        if (me == NULL) {
363
                return NULL;
364
        }
365
        memset(me,0,sizeof(nodeID));
366
        me->addr = malloc(SOCKETID_SIZE);
367
        if (me->addr == NULL) {
368
                free(me);
369
                return NULL;
370
        }
371
        memset(me->addr,0,SOCKETID_SIZE);
372
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
373
        me->refcnt = 1;
374

    
375
        int i;
376
        for (i=0;i<NH_BUFFER_SIZE;i++) {
377
                sendingBuffer[i] = NULL;
378
                receivedBuffer[i].data = NULL;
379
        }
380

    
381
        mlRegisterErrorConnectionCb(&connError_cb);
382
        mlRegisterRecvConnectionCb(&receive_conn_cb);
383
        s = mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
384
        if (s < 0) {
385
                fprintf(stderr, "Net-helper : error initializing ML!\n");
386
                free(me);
387
                return NULL;
388
        }
389

    
390
#ifdef MONL
391
        eventbase = base;
392
        void *repoclient;
393

    
394
        // Initialize logging
395
        grapesInitLog(DCLOG_WARNING, NULL, NULL);
396

    
397
        repInit("");
398
        repoclient = repOpen("repository.napa-wine.eu:9832",60);
399
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
400
        monInit(base, repoclient);
401
#endif
402

    
403
        while (me->connID<-1) {
404
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
405
                event_base_loop(base,EVLOOP_ONCE);
406
        }
407
        timeoutFired = 0;
408
//        fprintf(stderr,"Net-helper init : back from init!\n");
409

    
410
        return me;
411
}
412

    
413

    
414
void bind_msg_type (unsigned char msgtype) {
415

    
416
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
417
}
418

    
419

    
420
void send_to_peer_cb(int fd, short event, void *arg)
421
{
422
        msgData_cb *p = (msgData_cb *) arg;
423
        if (p->conn_cb_called) {
424
                free(p);
425
        }
426
        else { //don't send it anymore
427
                free_sending_buffer(p->bIdx);
428
                p->cancelled = true;
429
                // don't free p, the other timeout will do it
430
        }
431
}
432

    
433
/**
434
 * Called by the application to send data to a remote peer
435
 * @param from
436
 * @param to
437
 * @param buffer_ptr
438
 * @param buffer_size
439
 * @return The dimension of the buffer or -1 if a connection error occurred.
440
 */
441
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
442
{
443
        // if buffer is full, discard the message and return an error flag
444
        int index = next_S();
445
        if (index<0) {
446
                // free(buffer_ptr);
447
                fprintf(stderr,"Net-helper: buffer full\n ");
448
                return -1;
449
        }
450
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
451
        memset(sendingBuffer[index],0,buffer_size);
452
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
453
        // free(buffer_ptr);
454
        msgData_cb *p = malloc(sizeof(msgData_cb));
455
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
456
        int current = p->bIdx;
457
        send_params params = {0,0,0,0};
458
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
459
        if (to->connID<0) {
460
                free_sending_buffer(current);
461
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
462
                free(p);
463
                return -1;
464
        }
465
        else {
466
                struct timeval timeout = NH_PACKET_TIMEOUT;
467
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
468
                return buffer_size; //p->mSize;
469
        }
470

    
471
}
472

    
473

    
474
/**
475
 * Called by an application to receive data from remote peers
476
 * @param local
477
 * @param remote
478
 * @param buffer_ptr
479
 * @param buffer_size
480
 * @return The number of received bytes or -1 if some error occurred.
481
 */
482
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
483
{
484
        int size;
485
        if (receivedBuffer[rIdx].id==NULL) {        //block till first message arrives
486
                wait4data(local, NULL, NULL);
487
        }
488

    
489
        (*remote) = receivedBuffer[rIdx].id;
490
        // retrieve a msg from the buffer
491
        size = receivedBuffer[rIdx].len;
492
        if (size>buffer_size) {
493
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
494
                return -1;
495
        }
496
        memcpy(buffer_ptr, receivedBuffer[rIdx].data, size);
497
        free(receivedBuffer[rIdx].data);
498
        receivedBuffer[rIdx].data = NULL;
499
        receivedBuffer[rIdx].id = NULL;
500

    
501
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
502

    
503
        return size;
504
}
505

    
506

    
507
int wait4data(const struct nodeID *n, struct timeval *tout, fd_set *dummy) {
508

    
509
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
510
        if (tout) {        //if tout==NULL, loop wait infinitely
511
                event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
512
        }
513
        while(receivedBuffer[rIdx].data==NULL && timeoutFired==0) {
514
        //        event_base_dispatch(base);
515
                event_base_loop(base,EVLOOP_ONCE);
516
        }
517
        timeoutFired = 0;
518
//        fprintf(stderr,"Back from eventlib loop.\n");
519

    
520
        if (receivedBuffer[rIdx].data!=NULL)
521
                return 1;
522
        else
523
                return 0;
524
}
525

    
526
socketID_handle getRemoteSocketID(const char *ip, int port) {
527
        char str[SOCKETID_STRING_SIZE];
528
        socketID_handle h;
529

    
530
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
531
        h = malloc(SOCKETID_SIZE);
532
        mlStringToSocketID(str, h);
533

    
534
        return h;
535
}
536

    
537
struct nodeID *create_node(const char *rem_IP, int rem_port) {
538
        struct nodeID *remote = malloc(sizeof(nodeID));
539
        if (remote == NULL) {
540
                return NULL;
541
        }
542
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
543
        send_params params = {0,0,0,0};
544
        remote->connID = mlOpenConnection(remote->addr,&connReady_cb,NULL, params);
545
        remote->refcnt = 1;
546
        return remote;
547
}
548

    
549
// TODO: check why closing the connection is annoying for the ML
550
void nodeid_free(struct nodeID *n) {
551

    
552
//        mlCloseConnection(n->connID);
553
//        mlCloseSocket(n->addr);
554
//        free(n);
555
        if (n && (--(n->refcnt) == 0)) {
556
        //        mlCloseConnection(n->connID);
557
                mlCloseSocket(n->addr);
558
                free(n);
559
        }
560
}
561

    
562

    
563
const char *node_addr(const struct nodeID *s)
564
{
565
  static char addr[256];
566
  // TODO: mlSocketIDToString always return 0 !!!
567
  int r = mlSocketIDToString(s->addr,addr,256);
568
  if (!r)
569
          return addr;
570
  else
571
          return "";
572
}
573

    
574
struct nodeID *nodeid_dup(struct nodeID *s)
575
{
576
//  struct nodeID *res;
577
//
578
//  res = malloc(sizeof(struct nodeID));
579
//  if (res != NULL) {
580
//          res->addr = malloc(SOCKETID_SIZE);
581
//          if (res->addr != NULL) {
582
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
583
//        //         res->addrSize = SOCKETID_SIZE;
584
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
585
//                 res->connID = s->connID;
586
//          }
587
//          else {
588
//                free(res);
589
//                res = NULL;
590
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
591
//          }
592
//  }
593
//        return res;
594
        s->refcnt++;
595
        return s;
596
}
597

    
598
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
599
{
600
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
601
}
602

    
603
int nodeid_dump(uint8_t *b, const struct nodeID *s)
604
{
605
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
606
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
607

    
608
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
609
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
610

    
611
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
612
}
613

    
614
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
615
{
616
  uint8_t sid[SOCKETID_SIZE];
617
  socketID_handle h = (socketID_handle) sid;
618
  mlStringToSocketID((char *)b,h);
619
  *len = strlen((char*)b) + 1;
620
  return id_lookup(h,-1);
621
}