Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ 89d4c79d

History | View | Annotate | Download (15.9 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17

    
18
#include "msg_types.h"/**/
19

    
20

    
21
#ifdef MONL
22
#include "mon.h"
23
#include "grapes_log.h"
24
#include "repoclient.h"
25
#include "grapes.h"
26
#endif
27

    
28

    
29
/**
30
 * libevent pointer
31
 */
32
struct event_base *base;
33

    
34
#define NH_BUFFER_SIZE 1000
35
#define NH_PACKET_TIMEOUT {0, 500*1000}
36
#define NH_ML_INIT_TIMEOUT {1, 0}
37

    
38
static int sIdx = 0;
39
static int rIdx = 0;
40

    
41
typedef struct nodeID {
42
        socketID_handle addr;
43
        int connID;        // connection associated to this node, -1 if myself
44
        int refcnt;
45
#ifdef MONL
46
        //n quick and dirty static vector for measures TODO: make it dinamic
47
        MonHandler mhs[20];
48
        int n_mhs;
49
#endif
50
//        int addrSize;
51
//        int addrStringSize;
52
} nodeID;
53

    
54
typedef struct msgData_cb {
55
        int bIdx;        // index of the message in the proper buffer
56
        unsigned char msgType; // message type
57
        int mSize;        // message size
58
        bool conn_cb_called;
59
        bool cancelled;
60
} msgData_cb;
61

    
62
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
63
static int timeoutFired = 0;
64

    
65
// pointers to the msgs to be send
66
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
67
// pointers to the received msgs + sender nodeID
68
struct receivedB {
69
        struct nodeID *id;
70
        int len;
71
        uint8_t *data;
72
};
73
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
74
/**/ static int recv_counter =0; static int snd_counter =0;
75

    
76

    
77
/**
78
 * Look for a free slot in the received buffer and allocates it for immediate use
79
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
80
 */
81
static int next_R() {
82
        const int size = 1024;
83
        if (receivedBuffer[rIdx].data==NULL) {
84
                receivedBuffer[rIdx].data = malloc(size);
85
        }
86
        else {
87
                int count;
88
                for (count=0;count<NH_BUFFER_SIZE;count++) {
89
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
90
                        if (receivedBuffer[rIdx].data==NULL)
91
                                break;
92
                }
93
                if (count==NH_BUFFER_SIZE)
94
                        return -1;
95
                else {
96
                        receivedBuffer[rIdx].data = malloc(size);
97
                }
98
        }
99
        memset(receivedBuffer[rIdx].data,0,size);
100
        return rIdx;
101
}
102

    
103
/**
104
 * Look for a free slot in the sending buffer and allocates it for immediate use
105
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
106
 */
107
static int next_S() {
108
        const int size = 1024;
109
        if (sendingBuffer[sIdx]==NULL) {
110
                sendingBuffer[sIdx] = malloc(size);
111
        }
112
        else {
113
                int count;
114
                for (count=0;count<NH_BUFFER_SIZE;count++) {
115
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
116
                        if (sendingBuffer[sIdx]==NULL)
117
                                break;
118
                }
119
                if (count==NH_BUFFER_SIZE)
120
                        return -1;
121
                else {
122
                        sendingBuffer[sIdx] = malloc(size);
123
                }
124
        }
125
        return sIdx;
126
}
127

    
128

    
129
/**
130
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
131
 * @param local_socketID
132
 * @param errorstatus
133
 */
134
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
135
        switch (errorstatus) {
136
        case 0:
137
                //
138
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
139
        //        me->addrSize = SOCKETID_SIZE;
140
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
141
                me->connID = -1;
142
                me->refcnt = 1;
143
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
144
                break;
145
        case -1:
146
                //
147
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
148
                exit(1);
149
                break;
150
        case 1:
151
                //
152
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
153
                exit(1);
154
                break;
155
        case 2:
156
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
157
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
158
            mlSetStunServer(0,NULL);
159
            break;
160
        default :        // should never happen
161
                //
162
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
163
        }
164

    
165
}
166

    
167
/**
168
 * Timeout callback to be set in the eventlib loop as needed
169
 * @param socket
170
 * @param flag
171
 * @param arg
172
 */
173
static void t_out_cb (int socket, short flag, void* arg) {
174

    
175
        timeoutFired = 1;
176
//        fprintf(stderr,"TIMEOUT!!!\n");
177
//        event_base_loopbreak(base);
178
}
179

    
180
/**
181
 * Callback called by ml when a remote node ask for a connection
182
 * @param connectionID
183
 * @param arg
184
 */
185
static void receive_conn_cb(int connectionID, void *arg) {
186
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
187

    
188
}
189

    
190
void free_sending_buffer(int i)
191
{
192
        free(sendingBuffer[i]);
193
        sendingBuffer[i] = NULL;
194
}
195

    
196
/**
197
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
198
 * @param connectionID
199
 * @param arg
200
 */
201
static void connReady_cb (int connectionID, void *arg) {
202

    
203
        msgData_cb *p;
204
        p = (msgData_cb *)arg;
205
        if (p == NULL) return;
206
        if (p->cancelled) {
207
            free(p);
208
            return;
209
        }
210
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
211
/**/char mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
212
        if (mt!=MSG_TYPE_TOPOLOGY &&
213
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
214
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
215
                                snd_counter,mt+'0', p->mSize);}
216
        free_sending_buffer(p->bIdx);
217
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
218
        //        event_base_loopbreak(base);
219
        p->conn_cb_called = true;
220
}
221

    
222
/**
223
 * Callback called by ml when a connection error occurs
224
 * @param connectionID
225
 * @param arg
226
 */
227
static void connError_cb (int connectionID, void *arg) {
228
        // simply get rid of the msg in the buffer....
229
        msgData_cb *p;
230
        p = (msgData_cb *)arg;
231
        if (p != NULL) {
232
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
233
                if (p->cancelled) {
234
                        free(p);//p->mSize = -1;
235
                } else {
236
                        p->conn_cb_called = true;
237
                }
238
        }
239
        //        event_base_loopbreak(base);
240
}
241

    
242

    
243
/**
244
 * Callback to receive data from ml
245
 * @param buffer
246
 * @param buflen
247
 * @param msgtype
248
 * @param arg
249
 */
250
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
251
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
252
//        fprintf(stderr, "Net-helper : called back with some news...\n");
253
/**/ ++recv_counter;
254
        char str[SOCKETID_STRING_SIZE];
255
        if (arg->remote_socketID != NULL)
256
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
257
        else
258
                sprintf(str,"!Unknown!");
259
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
260
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
261
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
262
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
263
        }
264
        else {
265
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
266
                // buffering the received message only if possible, otherwise ignore it...
267
                int index = next_R();
268
                if (index >=0) {
269
                //        receivedBuffer[index][0] = malloc(buflen);
270
                        if (receivedBuffer[index].data == NULL) {
271
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
272
                                return;
273
                        }
274
                        // creating a new sender nodedID
275
                        receivedBuffer[index].id = malloc(sizeof(nodeID));
276
                        if (receivedBuffer[index].id==NULL) {
277
                                free (receivedBuffer[index].data);
278
                                receivedBuffer[index].data = NULL;
279
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
280
                                return;
281
                        }
282
                        else {
283
                                memset(receivedBuffer[index].id, 0, sizeof(struct nodeID));
284
                                nodeID *remote = receivedBuffer[index].id;
285
                                receivedBuffer[index].data = realloc(receivedBuffer[index].data,buflen);
286
                                memset(receivedBuffer[index].data,0,buflen);
287
                                receivedBuffer[index].len = buflen;
288
                                //*(receivedBuffer[index][0]) = buflen;
289
                                memcpy(receivedBuffer[index].data,buffer,buflen);
290
                                  // get the socketID of the sender
291
                                remote->addr = malloc(SOCKETID_SIZE);
292
                                if (remote->addr == NULL) {
293
                                          free (remote);
294
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
295
                                          return;
296
                                }
297
                                else {
298
                                        memset(remote->addr, 0, SOCKETID_SIZE);
299
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
300
                                //        remote->addrSize = SOCKETID_SIZE;
301
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
302
                                        remote->connID = arg->connectionID;
303
                                        remote->refcnt = 1;
304
                                }
305
                        }
306
                }
307
  }
308
//        event_base_loopbreak(base);
309
}
310

    
311

    
312
struct nodeID *net_helper_init(const char *IPaddr, int port) {
313

    
314
        struct timeval tout = NH_ML_INIT_TIMEOUT;
315
        int s;
316
        base = event_base_new();
317

    
318
        me = malloc(sizeof(nodeID));
319
        if (me == NULL) {
320
                return NULL;
321
        }
322
        memset(me,0,sizeof(nodeID));
323
        me->addr = malloc(SOCKETID_SIZE);
324
        if (me->addr == NULL) {
325
                free(me);
326
                return NULL;
327
        }
328
        memset(me->addr,0,SOCKETID_SIZE);
329
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
330
        me->refcnt = 1;
331

    
332
        int i;
333
        for (i=0;i<NH_BUFFER_SIZE;i++) {
334
                sendingBuffer[i] = NULL;
335
                receivedBuffer[i].data = NULL;
336
        }
337

    
338
        mlRegisterErrorConnectionCb(&connError_cb);
339
        mlRegisterRecvConnectionCb(&receive_conn_cb);
340
        s = mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
341
        if (s < 0) {
342
                fprintf(stderr, "Net-helper : error initializing ML!\n");
343
                free(me);
344
                return NULL;
345
        }
346

    
347
#ifdef MONL
348
        eventbase = base;
349
        void *repoclient;
350

    
351
        // Initialize logging
352
        grapesInitLog(DCLOG_WARNING, NULL, NULL);
353

    
354
        repInit("");
355
        repoclient = repOpen("repository.napa-wine.eu:9832");
356
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
357
        monInit(base, repoclient);
358
#endif
359

    
360
        while (me->connID<-1) {
361
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
362
                event_base_loop(base,EVLOOP_ONCE);
363
        }
364
        timeoutFired = 0;
365
//        fprintf(stderr,"Net-helper init : back from init!\n");
366

    
367
        return me;
368
}
369

    
370

    
371
void bind_msg_type (unsigned char msgtype) {
372

    
373
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
374
}
375

    
376

    
377
void send_to_peer_cb(int fd, short event, void *arg)
378
{
379
        msgData_cb *p = (msgData_cb *) arg;
380
        if (p->conn_cb_called) {
381
                free(p);
382
        }
383
        else { //don't send it anymore
384
                free_sending_buffer(p->bIdx);
385
                p->cancelled = true;
386
                // don't free p, the other timeout will do it
387
        }
388
}
389

    
390
/**
391
 * Called by the application to send data to a remote peer
392
 * @param from
393
 * @param to
394
 * @param buffer_ptr
395
 * @param buffer_size
396
 * @return The dimension of the buffer or -1 if a connection error occurred.
397
 */
398
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
399
{
400
        // if buffer is full, discard the message and return an error flag
401
        int index = next_S();
402
        if (index<0) {
403
                // free(buffer_ptr);
404
                fprintf(stderr,"Net-helper: buffer full\n ");
405
                return -1;
406
        }
407
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
408
        memset(sendingBuffer[index],0,buffer_size);
409
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
410
        // free(buffer_ptr);
411
        msgData_cb *p = malloc(sizeof(msgData_cb));
412
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
413
        int current = p->bIdx;
414
        send_params params = {0,0,0,0};
415
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
416
        if (to->connID<0) {
417
                free_sending_buffer(current);
418
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
419
                free(p);
420
                return -1;
421
        }
422
        else {
423
                struct timeval timeout = NH_PACKET_TIMEOUT;
424
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
425
                return buffer_size; //p->mSize;
426
        }
427

    
428
}
429

    
430

    
431
/**
432
 * Called by an application to receive data from remote peers
433
 * @param local
434
 * @param remote
435
 * @param buffer_ptr
436
 * @param buffer_size
437
 * @return The number of received bytes or -1 if some error occurred.
438
 */
439
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
440
{
441
        int size;
442
        if (receivedBuffer[rIdx].id==NULL) {        //block till first message arrives
443
                wait4data(local, NULL);
444
        }
445

    
446
        (*remote) = receivedBuffer[rIdx].id;
447
        // retrieve a msg from the buffer
448
        size = receivedBuffer[rIdx].len;
449
        if (size>buffer_size) {
450
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
451
                return -1;
452
        }
453
        memcpy(buffer_ptr, receivedBuffer[rIdx].data, size);
454
        free(receivedBuffer[rIdx].data);
455
        receivedBuffer[rIdx].data = NULL;
456
        receivedBuffer[rIdx].id = NULL;
457

    
458
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
459

    
460
        return size;
461
}
462

    
463

    
464
int wait4data(const struct nodeID *n, struct timeval *tout) {
465

    
466
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
467
        if (tout) {        //if tout==NULL, loop wait infinitely
468
                event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
469
        }
470
        while(receivedBuffer[rIdx].data==NULL && timeoutFired==0) {
471
        //        event_base_dispatch(base);
472
                event_base_loop(base,EVLOOP_ONCE);
473
        }
474
        timeoutFired = 0;
475
//        fprintf(stderr,"Back from eventlib loop.\n");
476

    
477
        if (receivedBuffer[rIdx].data!=NULL)
478
                return 1;
479
        else
480
                return 0;
481
}
482

    
483
socketID_handle getRemoteSocketID(const char *ip, int port) {
484
        char str[SOCKETID_STRING_SIZE];
485
        socketID_handle h;
486

    
487
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
488
        h = malloc(SOCKETID_SIZE);
489
        mlStringToSocketID(str, h);
490

    
491
        return h;
492
}
493

    
494
struct nodeID *create_node(const char *rem_IP, int rem_port) {
495
        struct nodeID *remote = malloc(sizeof(nodeID));
496
        if (remote == NULL) {
497
                return NULL;
498
        }
499
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
500
//        if (remote->addr == NULL) {
501
//                free(remote);
502
//                return NULL;
503
//        }
504
//        remote->addrSize = SOCKETID_SIZE;
505
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
506
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
507
        send_params params = {0,0,0,0};
508
        remote->connID = mlOpenConnection(remote->addr,&connReady_cb,NULL, params);
509
        remote->refcnt = 1;
510
        return remote;
511
}
512

    
513
// TODO: check why closing the connection is annoying for the ML
514
void nodeid_free(struct nodeID *n) {
515

    
516
//        mlCloseConnection(n->connID);
517
//        mlCloseSocket(n->addr);
518
//        free(n);
519
        if (n && (--(n->refcnt) == 0)) {
520
        //        mlCloseConnection(n->connID);
521
                mlCloseSocket(n->addr);
522
                free(n);
523
        }
524
}
525

    
526

    
527
const char *node_addr(const struct nodeID *s)
528
{
529
  static char addr[256];
530
  // TODO: mlSocketIDToString always return 0 !!!
531
  int r = mlSocketIDToString(s->addr,addr,256);
532
  if (!r)
533
          return addr;
534
  else
535
          return "";
536
}
537

    
538
struct nodeID *nodeid_dup(struct nodeID *s)
539
{
540
//  struct nodeID *res;
541
//
542
//  res = malloc(sizeof(struct nodeID));
543
//  if (res != NULL) {
544
//          res->addr = malloc(SOCKETID_SIZE);
545
//          if (res->addr != NULL) {
546
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
547
//        //         res->addrSize = SOCKETID_SIZE;
548
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
549
//                 res->connID = s->connID;
550
//          }
551
//          else {
552
//                free(res);
553
//                res = NULL;
554
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
555
//          }
556
//  }
557
//        return res;
558
        s->refcnt++;
559
        return s;
560
}
561

    
562
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
563
{
564
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
565
}
566

    
567
int nodeid_dump(uint8_t *b, const struct nodeID *s)
568
{
569
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
570
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
571

    
572
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
573
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
574

    
575
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
576
}
577

    
578
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
579
{
580
  struct nodeID *res;
581
  res = malloc(sizeof(struct nodeID));
582
  if (res != NULL) {
583
          memset(res,0,sizeof(struct nodeID));
584
          res->addr = malloc(SOCKETID_SIZE);
585
          if (res->addr != NULL) {
586
                  memset(res->addr,0,SOCKETID_SIZE);
587
                  //memcpy(res->addr, b, SOCKETID_SIZE);
588
                  //*len = SOCKETID_SIZE;
589
                  *len = strlen((char*)b) + 1;
590
                  mlStringToSocketID((char *)b,res->addr);
591
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
592
        //          res->addrSize = SOCKETID_SIZE;
593
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
594
                  res->connID = -1;
595
                  res->refcnt = 1;
596
          }
597
          else {
598
                  free(res);
599
                  res = NULL;
600
                  // TODO: what about *len in this case???
601
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
602
          }
603
  }
604

    
605

    
606
  return res;
607
}