Statistics
| Branch: | Revision:

grapes / som / net_helper-ml.c @ 3b4eedf4

History | View | Annotate | Download (15.1 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17
#include "ml_helpers.h"
18

    
19
#include "msg_types.h"/**/
20

    
21
/**
22
 * libevent pointer
23
 */
24
struct event_base *base;
25

    
26
#define NH_BUFFER_SIZE 1000
27

    
28
static int sIdx = 0;
29
static int rIdx = 0;
30

    
31
typedef struct nodeID {
32
        socketID_handle addr;
33
        int connID;        // connection associated to this node, -1 if myself
34
        int refcnt;
35
//        int addrSize;
36
//        int addrStringSize;
37
} nodeID;
38

    
39
typedef struct msgData_cb {
40
        int bIdx;        // index of the message in the proper buffer
41
        unsigned char msgType; // message type
42
        int mSize;        // message size
43
        bool conn_cb_called;
44
        bool cancelled;
45
} msgData_cb;
46

    
47
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
48
static int timeoutFired = 0;
49

    
50
// pointers to the msgs to be send
51
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
52
// pointers to the received msgs + sender nodeID
53
struct receivedB {
54
        struct nodeID *id;
55
        int len;
56
        uint8_t *data;
57
};
58
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
59
/**/ static int recv_counter =0; static int snd_counter =0;
60

    
61

    
62
/**
63
 * Look for a free slot in the received buffer and allocates it for immediate use
64
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
65
 */
66
static int next_R() {
67
        const int size = 1024;
68
        if (receivedBuffer[rIdx].data==NULL) {
69
                receivedBuffer[rIdx].data = malloc(size);
70
        }
71
        else {
72
                int count;
73
                for (count=0;count<NH_BUFFER_SIZE;count++) {
74
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
75
                        if (receivedBuffer[rIdx].data==NULL)
76
                                break;
77
                }
78
                if (count==NH_BUFFER_SIZE)
79
                        return -1;
80
                else {
81
                        receivedBuffer[rIdx].data = malloc(size);
82
                }
83
        }
84
        memset(receivedBuffer[rIdx].data,0,size);
85
        return rIdx;
86
}
87

    
88
/**
89
 * Look for a free slot in the sending buffer and allocates it for immediate use
90
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
91
 */
92
static int next_S() {
93
        const int size = 1024;
94
        if (sendingBuffer[sIdx]==NULL) {
95
                sendingBuffer[sIdx] = malloc(size);
96
        }
97
        else {
98
                int count;
99
                for (count=0;count<NH_BUFFER_SIZE;count++) {
100
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
101
                        if (sendingBuffer[sIdx]==NULL)
102
                                break;
103
                }
104
                if (count==NH_BUFFER_SIZE)
105
                        return -1;
106
                else {
107
                        sendingBuffer[sIdx] = malloc(size);
108
                }
109
        }
110
        return sIdx;
111
}
112

    
113

    
114
/**
115
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
116
 * @param local_socketID
117
 * @param errorstatus
118
 */
119
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
120
        switch (errorstatus) {
121
        case 0:
122
                //
123
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
124
        //        me->addrSize = SOCKETID_SIZE;
125
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
126
                me->connID = -1;
127
                me->refcnt = 1;
128
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
129
                break;
130
        case -1:
131
                //
132
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
133
                exit(1);
134
                break;
135
        case 1:
136
                //
137
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
138
                exit(1);
139
                break;
140
        case 2:
141
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
142
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
143
            mlSetStunServer(0,NULL);
144
            break;
145
        default :        // should never happen
146
                //
147
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
148
        }
149

    
150
}
151

    
152
/**
153
 * Timeout callback to be set in the eventlib loop as needed
154
 * @param socket
155
 * @param flag
156
 * @param arg
157
 */
158
static void t_out_cb (int socket, short flag, void* arg) {
159

    
160
        timeoutFired = 1;
161
//        fprintf(stderr,"TIMEOUT!!!\n");
162
//        event_base_loopbreak(base);
163
}
164

    
165
/**
166
 * Callback called by ml when a remote node ask for a connection
167
 * @param connectionID
168
 * @param arg
169
 */
170
static void receive_conn_cb(int connectionID, void *arg) {
171
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
172

    
173
}
174

    
175
void free_sending_buffer(int i)
176
{
177
        free(sendingBuffer[i]);
178
        sendingBuffer[i] = NULL;
179
}
180

    
181
/**
182
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
183
 * @param connectionID
184
 * @param arg
185
 */
186
static void connReady_cb (int connectionID, void *arg) {
187

    
188
        msgData_cb *p;
189
        p = (msgData_cb *)arg;
190
        if (p == NULL) return;
191
        if (p->cancelled) {
192
            free(p);
193
            return;
194
        }
195
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
196
/**/char mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
197
        if (mt!=MSG_TYPE_TOPOLOGY &&
198
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
199
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
200
                                snd_counter,mt+'0', p->mSize);}
201
        free_sending_buffer(p->bIdx);
202
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
203
        //        event_base_loopbreak(base);
204
        p->conn_cb_called = true;
205
}
206

    
207
/**
208
 * Callback called by ml when a connection error occurs
209
 * @param connectionID
210
 * @param arg
211
 */
212
static void connError_cb (int connectionID, void *arg) {
213
        // simply get rid of the msg in the buffer....
214
        msgData_cb *p;
215
        p = (msgData_cb *)arg;
216
        if (p != NULL) {
217
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
218
                if (p->cancelled) {
219
                        free(p);//p->mSize = -1;
220
                } else {
221
                        p->conn_cb_called = true;
222
                }
223
        }
224
        //        event_base_loopbreak(base);
225
}
226

    
227

    
228
/**
229
 * Callback to receive data from ml
230
 * @param buffer
231
 * @param buflen
232
 * @param msgtype
233
 * @param arg
234
 */
235
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
236
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
237
//        fprintf(stderr, "Net-helper : called back with some news...\n");
238
/**/ ++recv_counter;
239
        char str[SOCKETID_STRING_SIZE];
240
        if (arg->remote_socketID != NULL)
241
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
242
        else
243
                sprintf(str,"!Unknown!");
244
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
245
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
246
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
247
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
248
        }
249
        else {
250
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
251
                // buffering the received message only if possible, otherwise ignore it...
252
                int index = next_R();
253
                if (index >=0) {
254
                //        receivedBuffer[index][0] = malloc(buflen);
255
                        if (receivedBuffer[index].data == NULL) {
256
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
257
                                return;
258
                        }
259
                        // creating a new sender nodedID
260
                        receivedBuffer[index].id = malloc(sizeof(nodeID));
261
                        if (receivedBuffer[index].id==NULL) {
262
                                free (receivedBuffer[index].data);
263
                                receivedBuffer[index].data = NULL;
264
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
265
                                return;
266
                        }
267
                        else {
268
                                memset(receivedBuffer[index].id, 0, sizeof(struct nodeID));
269
                                nodeID *remote = receivedBuffer[index].id;
270
                                receivedBuffer[index].data = realloc(receivedBuffer[index].data,buflen);
271
                                memset(receivedBuffer[index].data,0,buflen);
272
                                receivedBuffer[index].len = buflen;
273
                                //*(receivedBuffer[index][0]) = buflen;
274
                                memcpy(receivedBuffer[index].data,buffer,buflen);
275
                                  // get the socketID of the sender
276
                                remote->addr = malloc(SOCKETID_SIZE);
277
                                if (remote->addr == NULL) {
278
                                          free (remote);
279
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
280
                                          return;
281
                                }
282
                                else {
283
                                        memset(remote->addr, 0, SOCKETID_SIZE);
284
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
285
                                //        remote->addrSize = SOCKETID_SIZE;
286
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
287
                                        remote->connID = arg->connectionID;
288
                                        remote->refcnt = 1;
289
                                }
290
                        }
291
                }
292
  }
293
//        event_base_loopbreak(base);
294
}
295

    
296

    
297
struct nodeID *net_helper_init(const char *IPaddr, int port) {
298

    
299
        struct timeval tout = {1, 0};
300
        int s;
301
        base = event_base_new();
302

    
303
        me = malloc(sizeof(nodeID));
304
        if (me == NULL) {
305
                return NULL;
306
        }
307
        memset(me,0,sizeof(nodeID));
308
        me->addr = malloc(SOCKETID_SIZE);
309
        if (me->addr == NULL) {
310
                free(me);
311
                return NULL;
312
        }
313
        memset(me->addr,0,SOCKETID_SIZE);
314
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
315
        me->refcnt = 1;
316

    
317
        int i;
318
        for (i=0;i<NH_BUFFER_SIZE;i++) {
319
                sendingBuffer[i] = NULL;
320
                receivedBuffer[i].data = NULL;
321
        }
322

    
323
        mlRegisterErrorConnectionCb(&connError_cb);
324
        mlRegisterRecvConnectionCb(&receive_conn_cb);
325
        s = mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
326
        if (s < 0) {
327
                fprintf(stderr, "Net-helper : error initializing ML!\n");
328
                free(me);
329
                return NULL;
330
        }
331

    
332
        while (me->connID<-1) {
333
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
334
                event_base_loop(base,EVLOOP_ONCE);
335
        }
336
        timeoutFired = 0;
337
//        fprintf(stderr,"Net-helper init : back from init!\n");
338

    
339
        return me;
340
}
341

    
342

    
343
void bind_msg_type (unsigned char msgtype) {
344

    
345
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
346
}
347

    
348

    
349
void send_to_peer_cb(int fd, short event, void *arg)
350
{
351
        msgData_cb *p = (msgData_cb *) arg;
352
        if (p->conn_cb_called) {
353
                free(p);
354
        }
355
        else { //don't send it anymore
356
                free_sending_buffer(p->bIdx);
357
                p->cancelled = true;
358
                // don't free p, the other timeout will do it
359
        }
360
}
361

    
362
/**
363
 * Called by the application to send data to a remote peer
364
 * @param from
365
 * @param to
366
 * @param buffer_ptr
367
 * @param buffer_size
368
 * @return The dimension of the buffer or -1 if a connection error occurred.
369
 */
370
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
371
{
372
        // if buffer is full, discard the message and return an error flag
373
        int index = next_S();
374
        if (index<0) {
375
                // free(buffer_ptr);
376
                fprintf(stderr,"Net-helper: buffer full\n ");
377
                return -1;
378
        }
379
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
380
        memset(sendingBuffer[index],0,buffer_size);
381
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
382
        // free(buffer_ptr);
383
        msgData_cb *p = malloc(sizeof(msgData_cb));
384
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
385
        int current = p->bIdx;
386
        send_params params = {0,0,0,0};
387
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
388
        if (to->connID<0) {
389
                free_sending_buffer(current);
390
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
391
                free(p);
392
                return -1;
393
        }
394
        else {
395
                struct timeval timeout = {0, 500*1000};
396
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
397
                return buffer_size; //p->mSize;
398
        }
399

    
400
}
401

    
402

    
403
/**
404
 * Called by an application to receive data from remote peers
405
 * @param local
406
 * @param remote
407
 * @param buffer_ptr
408
 * @param buffer_size
409
 * @return The number of received bytes or -1 if some error occurred.
410
 */
411
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
412
{
413
        int size;
414
        if (receivedBuffer[rIdx].id==NULL) {        //block till first message arrives
415
                wait4data(local, NULL);
416
        }
417

    
418
        (*remote) = receivedBuffer[rIdx].id;
419
        // retrieve a msg from the buffer
420
        size = receivedBuffer[rIdx].len;
421
        if (size>buffer_size) {
422
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
423
                return -1;
424
        }
425
        memcpy(buffer_ptr, receivedBuffer[rIdx].data, size);
426
        free(receivedBuffer[rIdx].data);
427
        receivedBuffer[rIdx].data = NULL;
428
        receivedBuffer[rIdx].id = NULL;
429

    
430
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
431

    
432
        return size;
433
}
434

    
435

    
436
int wait4data(const struct nodeID *n, struct timeval *tout) {
437

    
438
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
439
        if (tout) {        //if tout==NULL, loop wait infinitely
440
                event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
441
        }
442
        while(receivedBuffer[rIdx].data==NULL && timeoutFired==0) {
443
        //        event_base_dispatch(base);
444
                event_base_loop(base,EVLOOP_ONCE);
445
        }
446
        timeoutFired = 0;
447
//        fprintf(stderr,"Back from eventlib loop.\n");
448

    
449
        if (receivedBuffer[rIdx].data!=NULL)
450
                return 1;
451
        else
452
                return 0;
453
}
454

    
455

    
456

    
457

    
458
struct nodeID *create_node(const char *rem_IP, int rem_port) {
459
        struct nodeID *remote = malloc(sizeof(nodeID));
460
        if (remote == NULL) {
461
                return NULL;
462
        }
463
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
464
//        if (remote->addr == NULL) {
465
//                free(remote);
466
//                return NULL;
467
//        }
468
//        remote->addrSize = SOCKETID_SIZE;
469
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
470
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
471
        send_params params = {0,0,0,0};
472
        remote->connID = mlOpenConnection(remote->addr,&connReady_cb,NULL, params);
473
        remote->refcnt = 1;
474
        return remote;
475
}
476

    
477
// TODO: check why closing the connection is annoying for the ML
478
void nodeid_free(struct nodeID *n) {
479

    
480
//        mlCloseConnection(n->connID);
481
//        mlCloseSocket(n->addr);
482
//        free(n);
483
        if (n && (--(n->refcnt) == 0)) {
484
        //        mlCloseConnection(n->connID);
485
                mlCloseSocket(n->addr);
486
                free(n);
487
        }
488
}
489

    
490

    
491
const char *node_addr(const struct nodeID *s)
492
{
493
  static char addr[256];
494
  // TODO: mlSocketIDToString always return 0 !!!
495
  int r = mlSocketIDToString(s->addr,addr,256);
496
  if (!r)
497
          return addr;
498
  else
499
          return "";
500
}
501

    
502
struct nodeID *nodeid_dup(struct nodeID *s)
503
{
504
//  struct nodeID *res;
505
//
506
//  res = malloc(sizeof(struct nodeID));
507
//  if (res != NULL) {
508
//          res->addr = malloc(SOCKETID_SIZE);
509
//          if (res->addr != NULL) {
510
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
511
//        //         res->addrSize = SOCKETID_SIZE;
512
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
513
//                 res->connID = s->connID;
514
//          }
515
//          else {
516
//                free(res);
517
//                res = NULL;
518
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
519
//          }
520
//  }
521
//        return res;
522
        s->refcnt++;
523
        return s;
524
}
525

    
526
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
527
{
528
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
529
}
530

    
531
int nodeid_dump(uint8_t *b, const struct nodeID *s)
532
{
533
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
534
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
535

    
536
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
537
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
538

    
539
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
540
}
541

    
542
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
543
{
544
  struct nodeID *res;
545
  res = malloc(sizeof(struct nodeID));
546
  if (res != NULL) {
547
          memset(res,0,sizeof(struct nodeID));
548
          res->addr = malloc(SOCKETID_SIZE);
549
          if (res->addr != NULL) {
550
                  memset(res->addr,0,SOCKETID_SIZE);
551
                  //memcpy(res->addr, b, SOCKETID_SIZE);
552
                  //*len = SOCKETID_SIZE;
553
                  *len = strlen((char*)b) + 1;
554
                  mlStringToSocketID((char *)b,res->addr);
555
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
556
        //          res->addrSize = SOCKETID_SIZE;
557
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
558
                  res->connID = -1;
559
                  res->refcnt = 1;
560
          }
561
          else {
562
                  free(res);
563
                  res = NULL;
564
                  // TODO: what about *len in this case???
565
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
566
          }
567
  }
568

    
569

    
570
  return res;
571
}