Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ b833ea12

History | View | Annotate | Download (15.2 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17

    
18
#include "msg_types.h"/**/
19

    
20

    
21
#ifdef MONL
22
#include "mon.h"
23
#include "grapes_log.h"
24
#include "repoclient.h"
25
#include "grapes.h"
26
#endif
27

    
28

    
29
/**
30
 * libevent pointer
31
 */
32
struct event_base *base;
33

    
34
#define NH_BUFFER_SIZE 1000
35
#define NH_LOOKUP_SIZE 1000
36
#define NH_PACKET_TIMEOUT {0, 500*1000}
37
#define NH_ML_INIT_TIMEOUT {1, 0}
38

    
39
static int sIdx = 0;
40
static int rIdx = 0;
41

    
42
typedef struct nodeID {
43
        socketID_handle addr;
44
        int connID;        // connection associated to this node, -1 if myself
45
        int refcnt;
46
#ifdef MONL
47
        //n quick and dirty static vector for measures TODO: make it dinamic
48
        MonHandler mhs[20];
49
        int n_mhs;
50
#endif
51
//        int addrSize;
52
//        int addrStringSize;
53
} nodeID;
54

    
55
typedef struct msgData_cb {
56
        int bIdx;        // index of the message in the proper buffer
57
        unsigned char msgType; // message type
58
        int mSize;        // message size
59
        bool conn_cb_called;
60
        bool cancelled;
61
} msgData_cb;
62

    
63
static struct nodeID **lookup_array;
64
static int lookup_max = NH_LOOKUP_SIZE;
65
static int lookup_curr = 0;
66

    
67
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
68
static int timeoutFired = 0;
69

    
70
// pointers to the msgs to be send
71
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
72
// pointers to the received msgs + sender nodeID
73
struct receivedB {
74
        struct nodeID *id;
75
        int len;
76
        uint8_t *data;
77
};
78
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
79
/**/ static int recv_counter =0; static int snd_counter =0;
80

    
81

    
82
static void connReady_cb (int connectionID, void *arg);
83
static struct nodeID *new_node(socketID_handle peer) {
84
        send_params params = {0,0,0,0};
85
        struct nodeID *res = malloc(sizeof(struct nodeID));
86
        if (!res) {
87
                 fprintf(stderr, "Net-helper : memory error\n");
88
                 return NULL;
89
        }
90
        memset(res, 0, sizeof(struct nodeID));
91

    
92
        res->addr = malloc(SOCKETID_SIZE);
93
        if (! res->addr) {
94
                free (res);
95
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
96
                return NULL;
97
        }
98
        memset(res->addr, 0, SOCKETID_SIZE);
99
        memcpy(res->addr, peer ,SOCKETID_SIZE);
100

    
101
        res->refcnt = 1;
102

    
103
        res->connID = mlOpenConnection(peer, &connReady_cb, NULL, params);
104

    
105
        return res;
106
}
107

    
108

    
109
static struct nodeID **id_lookup(socketID_handle target) {
110

    
111
        int i,here=-1;
112
        for (i=0;i<lookup_curr;i++) {
113
                if (lookup_array[i] == NULL) {
114
                        if (here < 0) {
115
                                here = i;
116
                        }
117
                } else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
118
                        return &lookup_array[i];
119
                }
120
        }
121

    
122
        if (here == -1) {
123
                here = lookup_curr++;
124
        }
125

    
126
        if (lookup_curr > lookup_max) {
127
                lookup_max *= 2;
128
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
129
        }
130

    
131
        lookup_array[here] = new_node(target);
132
        return &lookup_array[here];
133
}
134

    
135
static struct nodeID *id_lookup_dup(socketID_handle target) {
136
        return nodeid_dup(*id_lookup(target));
137
}
138

    
139

    
140
/**
141
 * Look for a free slot in the received buffer and allocates it for immediate use
142
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
143
 */
144
static int next_R() {
145
        const int size = 1024;
146
        if (receivedBuffer[rIdx].data==NULL) {
147
                receivedBuffer[rIdx].data = malloc(size);
148
        }
149
        else {
150
                int count;
151
                for (count=0;count<NH_BUFFER_SIZE;count++) {
152
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
153
                        if (receivedBuffer[rIdx].data==NULL)
154
                                break;
155
                }
156
                if (count==NH_BUFFER_SIZE)
157
                        return -1;
158
                else {
159
                        receivedBuffer[rIdx].data = malloc(size);
160
                }
161
        }
162
        memset(receivedBuffer[rIdx].data,0,size);
163
        return rIdx;
164
}
165

    
166
/**
167
 * Look for a free slot in the sending buffer and allocates it for immediate use
168
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
169
 */
170
static int next_S() {
171
        const int size = 1024;
172
        if (sendingBuffer[sIdx]==NULL) {
173
                sendingBuffer[sIdx] = malloc(size);
174
        }
175
        else {
176
                int count;
177
                for (count=0;count<NH_BUFFER_SIZE;count++) {
178
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
179
                        if (sendingBuffer[sIdx]==NULL)
180
                                break;
181
                }
182
                if (count==NH_BUFFER_SIZE)
183
                        return -1;
184
                else {
185
                        sendingBuffer[sIdx] = malloc(size);
186
                }
187
        }
188
        return sIdx;
189
}
190

    
191

    
192
/**
193
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
194
 * @param local_socketID
195
 * @param errorstatus
196
 */
197
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
198
        switch (errorstatus) {
199
        case 0:
200
                me->addr = malloc(SOCKETID_SIZE);
201
                if (! me->addr) {
202
                        fprintf(stderr, "Net-helper : memory error while creating my new nodeID \n");
203
                        return;
204
                }
205

    
206
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
207
        //        me->addrSize = SOCKETID_SIZE;
208
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
209
                me->connID = -1;
210
                me->refcnt = 1;
211
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
212
                break;
213
        case -1:
214
                //
215
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
216
                exit(1);
217
                break;
218
        case 1:
219
                //
220
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
221
                exit(1);
222
                break;
223
        case 2:
224
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
225
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
226
            mlSetStunServer(0,NULL);
227
            break;
228
        default :        // should never happen
229
                //
230
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
231
        }
232

    
233
}
234

    
235
/**
236
 * Timeout callback to be set in the eventlib loop as needed
237
 * @param socket
238
 * @param flag
239
 * @param arg
240
 */
241
static void t_out_cb (int socket, short flag, void* arg) {
242

    
243
        timeoutFired = 1;
244
//        fprintf(stderr,"TIMEOUT!!!\n");
245
//        event_base_loopbreak(base);
246
}
247

    
248
/**
249
 * Callback called by ml when a remote node ask for a connection
250
 * @param connectionID
251
 * @param arg
252
 */
253
static void receive_conn_cb(int connectionID, void *arg) {
254
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
255

    
256
}
257

    
258
void free_sending_buffer(int i)
259
{
260
        free(sendingBuffer[i]);
261
        sendingBuffer[i] = NULL;
262
}
263

    
264
/**
265
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
266
 * @param connectionID
267
 * @param arg
268
 */
269
static void connReady_cb (int connectionID, void *arg) {
270
        char mt;
271
        msgData_cb *p;
272
        p = (msgData_cb *)arg;
273
        if (p == NULL) return;
274
        if (p->cancelled) {
275
            free(p);
276
            return;
277
        }
278
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
279
/**/        mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
280
        if (mt!=MSG_TYPE_TOPOLOGY &&
281
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
282
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
283
                                snd_counter,mt+'0', p->mSize);}
284
        free_sending_buffer(p->bIdx);
285
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
286
        //        event_base_loopbreak(base);
287
        p->conn_cb_called = true;
288
}
289

    
290
/**
291
 * Callback called by ml when a connection error occurs
292
 * @param connectionID
293
 * @param arg
294
 */
295
static void connError_cb (int connectionID, void *arg) {
296
        // simply get rid of the msg in the buffer....
297
        msgData_cb *p;
298
        p = (msgData_cb *)arg;
299
        if (p != NULL) {
300
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
301
                if (p->cancelled) {
302
                        free(p);//p->mSize = -1;
303
                } else {
304
                        p->conn_cb_called = true;
305
                }
306
        }
307
        //        event_base_loopbreak(base);
308
}
309

    
310

    
311
/**
312
 * Callback to receive data from ml
313
 * @param buffer
314
 * @param buflen
315
 * @param msgtype
316
 * @param arg
317
 */
318
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
319
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
320
        char str[SOCKETID_STRING_SIZE];
321
//        fprintf(stderr, "Net-helper : called back with some news...\n");
322
/**/ ++recv_counter;
323
        if (arg->remote_socketID != NULL)
324
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
325
        else
326
                sprintf(str,"!Unknown!");
327
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
328
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
329
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
330
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
331
        }
332
        else {
333
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
334
                // buffering the received message only if possible, otherwise ignore it...
335
                int index = next_R();
336
                if (index >=0) {
337
                //        receivedBuffer[index][0] = malloc(buflen);
338
                        if (receivedBuffer[index].data == NULL) {
339
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
340
                                return;
341
                        }
342
                        // creating a new sender nodedID
343
                        receivedBuffer[index].id = id_lookup_dup(arg->remote_socketID);
344
                                receivedBuffer[index].data = realloc(receivedBuffer[index].data,buflen);
345
                                memset(receivedBuffer[index].data,0,buflen);
346
                                receivedBuffer[index].len = buflen;
347
                                //*(receivedBuffer[index][0]) = buflen;
348
                                memcpy(receivedBuffer[index].data,buffer,buflen);
349
                                  // get the socketID of the sender
350
                }
351
  }
352
//        event_base_loopbreak(base);
353
}
354

    
355

    
356
struct nodeID *net_helper_init(const char *IPaddr, int port) {
357

    
358
        struct timeval tout = NH_ML_INIT_TIMEOUT;
359
        int s, i;
360
        base = event_base_new();
361
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
362

    
363
        me = malloc(sizeof(nodeID));
364
        if (me == NULL) {
365
                return NULL;
366
        }
367
        memset(me,0,sizeof(nodeID));
368
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
369
        me->refcnt = 1;
370

    
371
        for (i=0;i<NH_BUFFER_SIZE;i++) {
372
                sendingBuffer[i] = NULL;
373
                receivedBuffer[i].data = NULL;
374
        }
375

    
376
        mlRegisterErrorConnectionCb(&connError_cb);
377
        mlRegisterRecvConnectionCb(&receive_conn_cb);
378
        s = mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
379
        if (s < 0) {
380
                fprintf(stderr, "Net-helper : error initializing ML!\n");
381
                free(me);
382
                return NULL;
383
        }
384

    
385
#ifdef MONL
386
        eventbase = base;
387
        void *repoclient;
388

    
389
        // Initialize logging
390
        grapesInitLog(DCLOG_WARNING, NULL, NULL);
391

    
392
        repInit("");
393
        repoclient = repOpen("repository.napa-wine.eu:9832",60);
394
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
395
        monInit(base, repoclient);
396
#endif
397

    
398
        while (me->connID<-1) {
399
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
400
                event_base_loop(base,EVLOOP_ONCE);
401
        }
402
        timeoutFired = 0;
403
//        fprintf(stderr,"Net-helper init : back from init!\n");
404

    
405
        return me;
406
}
407

    
408

    
409
void bind_msg_type (unsigned char msgtype) {
410

    
411
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
412
}
413

    
414

    
415
void send_to_peer_cb(int fd, short event, void *arg)
416
{
417
        msgData_cb *p = (msgData_cb *) arg;
418
        if (p->conn_cb_called) {
419
                free(p);
420
        }
421
        else { //don't send it anymore
422
                free_sending_buffer(p->bIdx);
423
                p->cancelled = true;
424
                // don't free p, the other timeout will do it
425
        }
426
}
427

    
428
/**
429
 * Called by the application to send data to a remote peer
430
 * @param from
431
 * @param to
432
 * @param buffer_ptr
433
 * @param buffer_size
434
 * @return The dimension of the buffer or -1 if a connection error occurred.
435
 */
436
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
437
{
438
        msgData_cb *p;
439
        int current;
440
        send_params params = {0,0,0,0};
441

    
442
        // if buffer is full, discard the message and return an error flag
443
        int index = next_S();
444
        if (index<0) {
445
                // free(buffer_ptr);
446
                fprintf(stderr,"Net-helper: buffer full\n ");
447
                return -1;
448
        }
449
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
450
        memset(sendingBuffer[index],0,buffer_size);
451
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
452
        // free(buffer_ptr);
453
        p = malloc(sizeof(msgData_cb));
454
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
455
        current = p->bIdx;
456

    
457
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
458
        if (to->connID<0) {
459
                free_sending_buffer(current);
460
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
461
                free(p);
462
                return -1;
463
        }
464
        else {
465
                struct timeval timeout = NH_PACKET_TIMEOUT;
466
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
467
                return buffer_size; //p->mSize;
468
        }
469

    
470
}
471

    
472

    
473
/**
474
 * Called by an application to receive data from remote peers
475
 * @param local
476
 * @param remote
477
 * @param buffer_ptr
478
 * @param buffer_size
479
 * @return The number of received bytes or -1 if some error occurred.
480
 */
481
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
482
{
483
        int size;
484
        if (receivedBuffer[rIdx].id==NULL) {        //block till first message arrives
485
                wait4data(local, NULL, NULL);
486
        }
487

    
488
        (*remote) = receivedBuffer[rIdx].id;
489
        // retrieve a msg from the buffer
490
        size = receivedBuffer[rIdx].len;
491
        if (size>buffer_size) {
492
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
493
                return -1;
494
        }
495
        memcpy(buffer_ptr, receivedBuffer[rIdx].data, size);
496
        free(receivedBuffer[rIdx].data);
497
        receivedBuffer[rIdx].data = NULL;
498
        receivedBuffer[rIdx].id = NULL;
499

    
500
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
501

    
502
        return size;
503
}
504

    
505

    
506
int wait4data(const struct nodeID *n, struct timeval *tout, fd_set *dummy) {
507

    
508
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
509
        if (tout) {        //if tout==NULL, loop wait infinitely
510
                event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
511
        }
512
        while(receivedBuffer[rIdx].data==NULL && timeoutFired==0) {
513
        //        event_base_dispatch(base);
514
                event_base_loop(base,EVLOOP_ONCE);
515
        }
516
        timeoutFired = 0;
517
//        fprintf(stderr,"Back from eventlib loop.\n");
518

    
519
        if (receivedBuffer[rIdx].data!=NULL)
520
                return 1;
521
        else
522
                return 0;
523
}
524

    
525
socketID_handle getRemoteSocketID(const char *ip, int port) {
526
        char str[SOCKETID_STRING_SIZE];
527
        socketID_handle h;
528

    
529
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
530
        h = malloc(SOCKETID_SIZE);
531
        mlStringToSocketID(str, h);
532

    
533
        return h;
534
}
535

    
536
struct nodeID *create_node(const char *rem_IP, int rem_port) {
537
        socketID_handle s;
538
        struct nodeID *remote;
539

    
540
        s = getRemoteSocketID(rem_IP, rem_port);
541
        remote = id_lookup_dup(s);
542
        free(s);
543

    
544
        return remote;
545
}
546

    
547
// TODO: check why closing the connection is annoying for the ML
548
void nodeid_free(struct nodeID *n) {
549
        if (n && (--(n->refcnt) == 1)) {
550
                struct nodeID **npos;
551

    
552
/*
553
        //        mlCloseConnection(n->connID);
554
                npos = id_lookup(n->addr);
555
                *npos = NULL;
556
                mlCloseSocket(n->addr);
557
                free(n);
558
*/
559
        }
560
}
561

    
562

    
563
const char *node_addr(const struct nodeID *s)
564
{
565
  static char addr[256];
566
  // TODO: mlSocketIDToString always return 0 !!!
567
  int r = mlSocketIDToString(s->addr,addr,256);
568
  if (!r)
569
          return addr;
570
  else
571
          return "";
572
}
573

    
574
struct nodeID *nodeid_dup(struct nodeID *s)
575
{
576
        s->refcnt++;
577
        return s;
578
}
579

    
580
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
581
{
582
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
583
}
584

    
585
int nodeid_dump(uint8_t *b, const struct nodeID *s)
586
{
587
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
588
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
589

    
590
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
591
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
592

    
593
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
594
}
595

    
596
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
597
{
598
  uint8_t sid[SOCKETID_SIZE];
599
  socketID_handle h = (socketID_handle) sid;
600
  mlStringToSocketID((char *)b,h);
601
  *len = strlen((char*)b) + 1;
602
  return id_lookup_dup(h);
603
}