Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ b712fc10

History | View | Annotate | Download (15.3 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17

    
18
#include "msg_types.h"/**/
19

    
20
/**
21
 * libevent pointer
22
 */
23
struct event_base *base;
24

    
25
#define NH_BUFFER_SIZE 1000
26

    
27
static int sIdx = 0;
28
static int rIdx = 0;
29

    
30
typedef struct nodeID {
31
        socketID_handle addr;
32
        int connID;        // connection associated to this node, -1 if myself
33
        int refcnt;
34
//        int addrSize;
35
//        int addrStringSize;
36
} nodeID;
37

    
38
typedef struct msgData_cb {
39
        int bIdx;        // index of the message in the proper buffer
40
        unsigned char msgType; // message type
41
        int mSize;        // message size
42
        bool conn_cb_called;
43
        bool cancelled;
44
} msgData_cb;
45

    
46
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
47
static int timeoutFired = 0;
48

    
49
// pointers to the msgs to be send
50
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
51
// pointers to the received msgs + sender nodeID
52
struct receivedB {
53
        struct nodeID *id;
54
        int len;
55
        uint8_t *data;
56
};
57
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
58
/**/ static int recv_counter =0; static int snd_counter =0;
59

    
60

    
61
/**
62
 * Look for a free slot in the received buffer and allocates it for immediate use
63
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
64
 */
65
static int next_R() {
66
        const int size = 1024;
67
        if (receivedBuffer[rIdx].data==NULL) {
68
                receivedBuffer[rIdx].data = malloc(size);
69
        }
70
        else {
71
                int count;
72
                for (count=0;count<NH_BUFFER_SIZE;count++) {
73
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
74
                        if (receivedBuffer[rIdx].data==NULL)
75
                                break;
76
                }
77
                if (count==NH_BUFFER_SIZE)
78
                        return -1;
79
                else {
80
                        receivedBuffer[rIdx].data = malloc(size);
81
                }
82
        }
83
        memset(receivedBuffer[rIdx].data,0,size);
84
        return rIdx;
85
}
86

    
87
/**
88
 * Look for a free slot in the sending buffer and allocates it for immediate use
89
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
90
 */
91
static int next_S() {
92
        const int size = 1024;
93
        if (sendingBuffer[sIdx]==NULL) {
94
                sendingBuffer[sIdx] = malloc(size);
95
        }
96
        else {
97
                int count;
98
                for (count=0;count<NH_BUFFER_SIZE;count++) {
99
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
100
                        if (sendingBuffer[sIdx]==NULL)
101
                                break;
102
                }
103
                if (count==NH_BUFFER_SIZE)
104
                        return -1;
105
                else {
106
                        sendingBuffer[sIdx] = malloc(size);
107
                }
108
        }
109
        return sIdx;
110
}
111

    
112

    
113
/**
114
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
115
 * @param local_socketID
116
 * @param errorstatus
117
 */
118
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
119
        switch (errorstatus) {
120
        case 0:
121
                //
122
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
123
        //        me->addrSize = SOCKETID_SIZE;
124
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
125
                me->connID = -1;
126
                me->refcnt = 1;
127
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
128
                break;
129
        case -1:
130
                //
131
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
132
                exit(1);
133
                break;
134
        case 1:
135
                //
136
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
137
                exit(1);
138
                break;
139
        case 2:
140
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
141
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
142
            mlSetStunServer(0,NULL);
143
            break;
144
        default :        // should never happen
145
                //
146
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
147
        }
148

    
149
}
150

    
151
/**
152
 * Timeout callback to be set in the eventlib loop as needed
153
 * @param socket
154
 * @param flag
155
 * @param arg
156
 */
157
static void t_out_cb (int socket, short flag, void* arg) {
158

    
159
        timeoutFired = 1;
160
//        fprintf(stderr,"TIMEOUT!!!\n");
161
//        event_base_loopbreak(base);
162
}
163

    
164
/**
165
 * Callback called by ml when a remote node ask for a connection
166
 * @param connectionID
167
 * @param arg
168
 */
169
static void receive_conn_cb(int connectionID, void *arg) {
170
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
171

    
172
}
173

    
174
void free_sending_buffer(int i)
175
{
176
        free(sendingBuffer[i]);
177
        sendingBuffer[i] = NULL;
178
}
179

    
180
/**
181
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
182
 * @param connectionID
183
 * @param arg
184
 */
185
static void connReady_cb (int connectionID, void *arg) {
186

    
187
        msgData_cb *p;
188
        p = (msgData_cb *)arg;
189
        if (p == NULL) return;
190
        if (p->cancelled) {
191
            free(p);
192
            return;
193
        }
194
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
195
/**/char mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
196
        if (mt!=MSG_TYPE_TOPOLOGY &&
197
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
198
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
199
                                snd_counter,mt+'0', p->mSize);}
200
        free_sending_buffer(p->bIdx);
201
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
202
        //        event_base_loopbreak(base);
203
        p->conn_cb_called = true;
204
}
205

    
206
/**
207
 * Callback called by ml when a connection error occurs
208
 * @param connectionID
209
 * @param arg
210
 */
211
static void connError_cb (int connectionID, void *arg) {
212
        // simply get rid of the msg in the buffer....
213
        msgData_cb *p;
214
        p = (msgData_cb *)arg;
215
        if (p != NULL) {
216
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
217
                if (p->cancelled) {
218
                        free(p);//p->mSize = -1;
219
                } else {
220
                        p->conn_cb_called = true;
221
                }
222
        }
223
        //        event_base_loopbreak(base);
224
}
225

    
226

    
227
/**
228
 * Callback to receive data from ml
229
 * @param buffer
230
 * @param buflen
231
 * @param msgtype
232
 * @param arg
233
 */
234
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
235
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
236
//        fprintf(stderr, "Net-helper : called back with some news...\n");
237
/**/ ++recv_counter;
238
        char str[SOCKETID_STRING_SIZE];
239
        if (arg->remote_socketID != NULL)
240
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
241
        else
242
                sprintf(str,"!Unknown!");
243
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
244
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
245
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
246
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
247
        }
248
        else {
249
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
250
                // buffering the received message only if possible, otherwise ignore it...
251
                int index = next_R();
252
                if (index >=0) {
253
                //        receivedBuffer[index][0] = malloc(buflen);
254
                        if (receivedBuffer[index].data == NULL) {
255
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
256
                                return;
257
                        }
258
                        // creating a new sender nodedID
259
                        receivedBuffer[index].id = malloc(sizeof(nodeID));
260
                        if (receivedBuffer[index].id==NULL) {
261
                                free (receivedBuffer[index].data);
262
                                receivedBuffer[index].data = NULL;
263
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
264
                                return;
265
                        }
266
                        else {
267
                                memset(receivedBuffer[index].id, 0, sizeof(struct nodeID));
268
                                nodeID *remote = receivedBuffer[index].id;
269
                                receivedBuffer[index].data = realloc(receivedBuffer[index].data,buflen);
270
                                memset(receivedBuffer[index].data,0,buflen);
271
                                receivedBuffer[index].len = buflen;
272
                                //*(receivedBuffer[index][0]) = buflen;
273
                                memcpy(receivedBuffer[index].data,buffer,buflen);
274
                                  // get the socketID of the sender
275
                                remote->addr = malloc(SOCKETID_SIZE);
276
                                if (remote->addr == NULL) {
277
                                          free (remote);
278
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
279
                                          return;
280
                                }
281
                                else {
282
                                        memset(remote->addr, 0, SOCKETID_SIZE);
283
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
284
                                //        remote->addrSize = SOCKETID_SIZE;
285
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
286
                                        remote->connID = arg->connectionID;
287
                                        remote->refcnt = 1;
288
                                }
289
                        }
290
                }
291
  }
292
//        event_base_loopbreak(base);
293
}
294

    
295

    
296
struct nodeID *net_helper_init(const char *IPaddr, int port) {
297

    
298
        struct timeval tout = {1, 0};
299
        int s;
300
        base = event_base_new();
301

    
302
        me = malloc(sizeof(nodeID));
303
        if (me == NULL) {
304
                return NULL;
305
        }
306
        memset(me,0,sizeof(nodeID));
307
        me->addr = malloc(SOCKETID_SIZE);
308
        if (me->addr == NULL) {
309
                free(me);
310
                return NULL;
311
        }
312
        memset(me->addr,0,SOCKETID_SIZE);
313
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
314
        me->refcnt = 1;
315

    
316
        int i;
317
        for (i=0;i<NH_BUFFER_SIZE;i++) {
318
                sendingBuffer[i] = NULL;
319
                receivedBuffer[i].data = NULL;
320
        }
321

    
322
        mlRegisterErrorConnectionCb(&connError_cb);
323
        mlRegisterRecvConnectionCb(&receive_conn_cb);
324
        s = mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
325
        if (s < 0) {
326
                fprintf(stderr, "Net-helper : error initializing ML!\n");
327
                free(me);
328
                return NULL;
329
        }
330

    
331
        while (me->connID<-1) {
332
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
333
                event_base_loop(base,EVLOOP_ONCE);
334
        }
335
        timeoutFired = 0;
336
//        fprintf(stderr,"Net-helper init : back from init!\n");
337

    
338
        return me;
339
}
340

    
341

    
342
void bind_msg_type (unsigned char msgtype) {
343

    
344
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
345
}
346

    
347

    
348
void send_to_peer_cb(int fd, short event, void *arg)
349
{
350
        msgData_cb *p = (msgData_cb *) arg;
351
        if (p->conn_cb_called) {
352
                free(p);
353
        }
354
        else { //don't send it anymore
355
                free_sending_buffer(p->bIdx);
356
                p->cancelled = true;
357
                // don't free p, the other timeout will do it
358
        }
359
}
360

    
361
/**
362
 * Called by the application to send data to a remote peer
363
 * @param from
364
 * @param to
365
 * @param buffer_ptr
366
 * @param buffer_size
367
 * @return The dimension of the buffer or -1 if a connection error occurred.
368
 */
369
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
370
{
371
        // if buffer is full, discard the message and return an error flag
372
        int index = next_S();
373
        if (index<0) {
374
                // free(buffer_ptr);
375
                fprintf(stderr,"Net-helper: buffer full\n ");
376
                return -1;
377
        }
378
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
379
        memset(sendingBuffer[index],0,buffer_size);
380
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
381
        // free(buffer_ptr);
382
        msgData_cb *p = malloc(sizeof(msgData_cb));
383
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
384
        int current = p->bIdx;
385
        send_params params = {0,0,0,0};
386
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
387
        if (to->connID<0) {
388
                free_sending_buffer(current);
389
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
390
                free(p);
391
                return -1;
392
        }
393
        else {
394
                struct timeval timeout = {0, 500*1000};
395
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
396
                return buffer_size; //p->mSize;
397
        }
398

    
399
}
400

    
401

    
402
/**
403
 * Called by an application to receive data from remote peers
404
 * @param local
405
 * @param remote
406
 * @param buffer_ptr
407
 * @param buffer_size
408
 * @return The number of received bytes or -1 if some error occurred.
409
 */
410
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
411
{
412
        int size;
413
        if (receivedBuffer[rIdx].id==NULL) {        //block till first message arrives
414
                wait4data(local, NULL);
415
        }
416

    
417
        (*remote) = receivedBuffer[rIdx].id;
418
        // retrieve a msg from the buffer
419
        size = receivedBuffer[rIdx].len;
420
        if (size>buffer_size) {
421
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
422
                return -1;
423
        }
424
        memcpy(buffer_ptr, receivedBuffer[rIdx].data, size);
425
        free(receivedBuffer[rIdx].data);
426
        receivedBuffer[rIdx].data = NULL;
427
        receivedBuffer[rIdx].id = NULL;
428

    
429
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
430

    
431
        return size;
432
}
433

    
434

    
435
int wait4data(const struct nodeID *n, struct timeval *tout) {
436

    
437
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
438
        if (tout) {        //if tout==NULL, loop wait infinitely
439
                event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
440
        }
441
        while(receivedBuffer[rIdx].data==NULL && timeoutFired==0) {
442
        //        event_base_dispatch(base);
443
                event_base_loop(base,EVLOOP_ONCE);
444
        }
445
        timeoutFired = 0;
446
//        fprintf(stderr,"Back from eventlib loop.\n");
447

    
448
        if (receivedBuffer[rIdx].data!=NULL)
449
                return 1;
450
        else
451
                return 0;
452
}
453

    
454
socketID_handle getRemoteSocketID(const char *ip, int port) {
455
        char str[SOCKETID_STRING_SIZE];
456
        socketID_handle h;
457

    
458
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
459
        h = malloc(SOCKETID_SIZE);
460
        mlStringToSocketID(str, h);
461

    
462
        return h;
463
}
464

    
465
struct nodeID *create_node(const char *rem_IP, int rem_port) {
466
        struct nodeID *remote = malloc(sizeof(nodeID));
467
        if (remote == NULL) {
468
                return NULL;
469
        }
470
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
471
//        if (remote->addr == NULL) {
472
//                free(remote);
473
//                return NULL;
474
//        }
475
//        remote->addrSize = SOCKETID_SIZE;
476
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
477
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
478
        send_params params = {0,0,0,0};
479
        remote->connID = mlOpenConnection(remote->addr,&connReady_cb,NULL, params);
480
        remote->refcnt = 1;
481
        return remote;
482
}
483

    
484
// TODO: check why closing the connection is annoying for the ML
485
void nodeid_free(struct nodeID *n) {
486

    
487
//        mlCloseConnection(n->connID);
488
//        mlCloseSocket(n->addr);
489
//        free(n);
490
        if (n && (--(n->refcnt) == 0)) {
491
        //        mlCloseConnection(n->connID);
492
                mlCloseSocket(n->addr);
493
                free(n);
494
        }
495
}
496

    
497

    
498
const char *node_addr(const struct nodeID *s)
499
{
500
  static char addr[256];
501
  // TODO: mlSocketIDToString always return 0 !!!
502
  int r = mlSocketIDToString(s->addr,addr,256);
503
  if (!r)
504
          return addr;
505
  else
506
          return "";
507
}
508

    
509
struct nodeID *nodeid_dup(struct nodeID *s)
510
{
511
//  struct nodeID *res;
512
//
513
//  res = malloc(sizeof(struct nodeID));
514
//  if (res != NULL) {
515
//          res->addr = malloc(SOCKETID_SIZE);
516
//          if (res->addr != NULL) {
517
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
518
//        //         res->addrSize = SOCKETID_SIZE;
519
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
520
//                 res->connID = s->connID;
521
//          }
522
//          else {
523
//                free(res);
524
//                res = NULL;
525
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
526
//          }
527
//  }
528
//        return res;
529
        s->refcnt++;
530
        return s;
531
}
532

    
533
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
534
{
535
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
536
}
537

    
538
int nodeid_dump(uint8_t *b, const struct nodeID *s)
539
{
540
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
541
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
542

    
543
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
544
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
545

    
546
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
547
}
548

    
549
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
550
{
551
  struct nodeID *res;
552
  res = malloc(sizeof(struct nodeID));
553
  if (res != NULL) {
554
          memset(res,0,sizeof(struct nodeID));
555
          res->addr = malloc(SOCKETID_SIZE);
556
          if (res->addr != NULL) {
557
                  memset(res->addr,0,SOCKETID_SIZE);
558
                  //memcpy(res->addr, b, SOCKETID_SIZE);
559
                  //*len = SOCKETID_SIZE;
560
                  *len = strlen((char*)b) + 1;
561
                  mlStringToSocketID((char *)b,res->addr);
562
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
563
        //          res->addrSize = SOCKETID_SIZE;
564
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
565
                  res->connID = -1;
566
                  res->refcnt = 1;
567
          }
568
          else {
569
                  free(res);
570
                  res = NULL;
571
                  // TODO: what about *len in this case???
572
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
573
          }
574
  }
575

    
576

    
577
  return res;
578
}