Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ df781cb0

History | View | Annotate | Download (14.4 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17
#include "ml_helpers.h"
18

    
19
#include "msg_types.h"/**/
20

    
21
/**
22
 * libevent pointer
23
 */
24
struct event_base *base;
25

    
26
#define NH_BUFFER_SIZE 100
27

    
28
static int sIdx = 0;
29
static int rIdx = 0;
30

    
31
typedef struct nodeID {
32
        socketID_handle addr;
33
        int connID;        // connection associated to this node, -1 if myself
34
        int refcnt;
35
//        int addrSize;
36
//        int addrStringSize;
37
} nodeID;
38

    
39
typedef struct msgData_cb {
40
        int bIdx;        // index of the message in the proper buffer
41
        unsigned char msgType; // message type
42
        int mSize;        // message size
43
} msgData_cb;
44

    
45
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
46
static int timeoutFired = 0;
47

    
48
// pointers to the msgs to be send
49
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
50
// pointers to the received msgs + sender nodeID
51
struct receivedB {
52
        struct nodeID *id;
53
        int len;
54
        uint8_t *data;
55
};
56
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
57
/**/ static int recv_counter =0; static int snd_counter =0;
58

    
59

    
60
/**
61
 * Look for a free slot in the received buffer and allocates it for immediate use
62
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
63
 */
64
static int next_R() {
65
        const int size = 1024;
66
        if (receivedBuffer[rIdx].data==NULL) {
67
                receivedBuffer[rIdx].data = malloc(size);
68
        }
69
        else {
70
                int count;
71
                for (count=0;count<NH_BUFFER_SIZE;count++) {
72
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
73
                        if (receivedBuffer[rIdx].data==NULL)
74
                                break;
75
                }
76
                if (count==NH_BUFFER_SIZE)
77
                        return -1;
78
                else {
79
                        receivedBuffer[rIdx].data = malloc(size);
80
                }
81
        }
82
        memset(receivedBuffer[rIdx].data,0,size);
83
        return rIdx;
84
}
85

    
86
/**
87
 * Look for a free slot in the sending buffer and allocates it for immediate use
88
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
89
 */
90
static int next_S() {
91
        const int size = 1024;
92
        if (sendingBuffer[sIdx]==NULL) {
93
                sendingBuffer[sIdx] = malloc(size);
94
        }
95
        else {
96
                int count;
97
                for (count=0;count<NH_BUFFER_SIZE;count++) {
98
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
99
                        if (sendingBuffer[sIdx]==NULL)
100
                                break;
101
                }
102
                if (count==NH_BUFFER_SIZE)
103
                        return -1;
104
                else {
105
                        sendingBuffer[sIdx] = malloc(size);
106
                }
107
        }
108
        return sIdx;
109
}
110

    
111

    
112
/**
113
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
114
 * @param local_socketID
115
 * @param errorstatus
116
 */
117
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
118
        switch (errorstatus) {
119
        case 0:
120
                //
121
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
122
        //        me->addrSize = SOCKETID_SIZE;
123
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
124
                me->connID = -1;
125
                me->refcnt = 1;
126
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
127
                break;
128
        case -1:
129
                //
130
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
131
                exit(1);
132
                break;
133
        case 1:
134
                //
135
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
136
                exit(1);
137
                break;
138
        case 2:
139
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
140
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
141
            mlSetStunServer(0,NULL);
142
            break;
143
        default :        // should never happen
144
                //
145
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
146
        }
147

    
148
}
149

    
150
/**
151
 * Timeout callback to be set in the eventlib loop as needed
152
 * @param socket
153
 * @param flag
154
 * @param arg
155
 */
156
static void t_out_cb (int socket, short flag, void* arg) {
157

    
158
        timeoutFired = 1;
159
//        fprintf(stderr,"TIMEOUT!!!\n");
160
//        event_base_loopbreak(base);
161
}
162

    
163
/**
164
 * Callback called by ml when a remote node ask for a connection
165
 * @param connectionID
166
 * @param arg
167
 */
168
static void receive_conn_cb(int connectionID, void *arg) {
169
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
170

    
171
}
172

    
173
/**
174
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
175
 * @param connectionID
176
 * @param arg
177
 */
178
static void connReady_cb (int connectionID, void *arg) {
179

    
180
        msgData_cb *p;
181
        p = (msgData_cb *)arg;
182
        if (p == NULL) return;
183
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
184
/**/char mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
185
        if (mt!=MSG_TYPE_TOPOLOGY &&
186
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
187
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
188
                                snd_counter,mt+'0', p->mSize);}
189
        free(sendingBuffer[p->bIdx]);
190
        sendingBuffer[p->bIdx] = NULL;
191
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
192
        //        event_base_loopbreak(base);
193
        free(p);
194
}
195

    
196
/**
197
 * Callback called by ml when a connection error occurs
198
 * @param connectionID
199
 * @param arg
200
 */
201
static void connError_cb (int connectionID, void *arg) {
202
        // simply get rid of the msg in the buffer....
203
        msgData_cb *p;
204
        p = (msgData_cb *)arg;
205
        if (p != NULL) {
206
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
207
                free(sendingBuffer[p->bIdx]);
208
                sendingBuffer[p->bIdx] = NULL;
209
                free(p);//p->mSize = -1;
210
        }
211
        //        event_base_loopbreak(base);
212
}
213

    
214

    
215
/**
216
 * Callback to receive data from ml
217
 * @param buffer
218
 * @param buflen
219
 * @param msgtype
220
 * @param arg
221
 */
222
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
223
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
224
//        fprintf(stderr, "Net-helper : called back with some news...\n");
225
/**/ ++recv_counter;
226
        char str[SOCKETID_STRING_SIZE];
227
        if (arg->remote_socketID != NULL)
228
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
229
        else
230
                sprintf(str,"!Unknown!");
231
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
232
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
233
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
234
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?" => Missing first!":"");
235
        }
236
        else {
237
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
238
                // buffering the received message only if possible, otherwise ignore it...
239
                int index = next_R();
240
                if (index >=0) {
241
                //        receivedBuffer[index][0] = malloc(buflen);
242
                        if (receivedBuffer[index].data == NULL) {
243
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
244
                                return;
245
                        }
246
                        // creating a new sender nodedID
247
                        receivedBuffer[index].id = malloc(sizeof(nodeID));
248
                        if (receivedBuffer[index].id==NULL) {
249
                                free (receivedBuffer[index].data);
250
                                receivedBuffer[index].data = NULL;
251
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
252
                                return;
253
                        }
254
                        else {
255
                                size_t lenlen = sizeof(int);
256
                                memset(receivedBuffer[index].id, 0, sizeof(struct nodeID));
257
                                nodeID *remote; remote = receivedBuffer[index].id;
258
                                receivedBuffer[index].data = realloc(receivedBuffer[index].data,buflen);
259
                                memset(receivedBuffer[index].data,0,buflen);
260
                                receivedBuffer[index].len = buflen;
261
                                //*(receivedBuffer[index][0]) = buflen;
262
                                memcpy(receivedBuffer[index].data,buffer,buflen);
263
                                  // get the socketID of the sender
264
                                remote->addr = malloc(SOCKETID_SIZE);
265
                                if (remote->addr == NULL) {
266
                                          free (remote);
267
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
268
                                          return;
269
                                }
270
                                else {
271
                                        memset(remote->addr, 0, SOCKETID_SIZE);
272
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
273
                                //        remote->addrSize = SOCKETID_SIZE;
274
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
275
                                        remote->connID = arg->connectionID;
276
                                        remote->refcnt = 1;
277
                                }
278
                        }
279
                }
280
  }
281
//        event_base_loopbreak(base);
282
}
283

    
284

    
285
struct nodeID *net_helper_init(const char *IPaddr, int port) {
286

    
287
        struct timeval tout = {1, 0};
288
        base = event_base_new();
289

    
290
        me = malloc(sizeof(nodeID));
291
        if (me == NULL) {
292
                return NULL;
293
        }
294
        memset(me,0,sizeof(nodeID));
295
        me->addr = malloc(SOCKETID_SIZE);
296
        if (me->addr == NULL) {
297
                free(me);
298
                return NULL;
299
        }
300
        memset(me->addr,0,SOCKETID_SIZE);
301
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
302
        me->refcnt = 1;
303

    
304
        int i;
305
        for (i=0;i<NH_BUFFER_SIZE;i++) {
306
                sendingBuffer[i] = NULL;
307
                receivedBuffer[i].data = NULL;
308
        }
309

    
310
        mlRegisterErrorConnectionCb(&connError_cb);
311
        mlRegisterRecvConnectionCb(&receive_conn_cb);
312
        mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
313
        while (me->connID<-1) {
314
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
315
                event_base_loop(base,EVLOOP_ONCE);
316
        }
317
        timeoutFired = 0;
318
//        fprintf(stderr,"Net-helper init : back from init!\n");
319

    
320
        return me;
321
}
322

    
323

    
324
void bind_msg_type (unsigned char msgtype) {
325

    
326
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
327
}
328

    
329

    
330
/**
331
 * Called by the application to send data to a remote peer
332
 * @param from
333
 * @param to
334
 * @param buffer_ptr
335
 * @param buffer_size
336
 * @return The dimension of the buffer or -1 if a connection error occurred.
337
 */
338
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
339
{
340
        // if buffer is full, discard the message and return an error flag
341
        int index = next_S();
342
        if (index<0) {
343
                // free(buffer_ptr);
344
                return -1;
345
        }
346
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
347
        memset(sendingBuffer[index],0,buffer_size);
348
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
349
        // free(buffer_ptr);
350
        msgData_cb *p = malloc(sizeof(msgData_cb));
351
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0];
352
        int current = p->bIdx;
353
        send_params params = {0,0,0,0};
354
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
355
        if (to->connID<0) {
356
                free(sendingBuffer[current]);
357
                sendingBuffer[current] = NULL;
358
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
359
                free(p);
360
                return -1;
361
        }
362
        else {
363
                return buffer_size; //p->mSize;
364
        }
365

    
366
}
367

    
368

    
369
/**
370
 * Called by an application to receive data from remote peers
371
 * @param local
372
 * @param remote
373
 * @param buffer_ptr
374
 * @param buffer_size
375
 * @return The number of received bytes or -1 if some error occurred.
376
 */
377
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
378
{
379
        int size;
380
        // this should never happen... if it does, index handling is faulty...
381
        if (receivedBuffer[rIdx].id==NULL) {
382
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
383
                return -1;
384
        }
385

    
386
        (*remote) = receivedBuffer[rIdx].id;
387
        // retrieve a msg from the buffer
388
        size = receivedBuffer[rIdx].len;
389
        if (size>buffer_size) {
390
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
391
                return -1;
392
        }
393
        memcpy(buffer_ptr, receivedBuffer[rIdx].data, size);
394
        free(receivedBuffer[rIdx].data);
395
        receivedBuffer[rIdx].data = NULL;
396
        receivedBuffer[rIdx].id = NULL;
397

    
398
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
399

    
400
        return size;
401
}
402

    
403

    
404
int wait4data(const struct nodeID *n, struct timeval *tout) {
405

    
406
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
407
        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
408
        while(receivedBuffer[rIdx].data==NULL && timeoutFired==0) {
409
        //        event_base_dispatch(base);
410
                event_base_loop(base,EVLOOP_ONCE);
411
        }
412
        timeoutFired = 0;
413
//        fprintf(stderr,"Back from eventlib loop.\n");
414

    
415
        if (receivedBuffer[rIdx].data!=NULL)
416
                return 1;
417
        else
418
                return 0;
419
}
420

    
421

    
422

    
423

    
424
struct nodeID *create_node(const char *rem_IP, int rem_port) {
425
        struct nodeID *remote = malloc(sizeof(nodeID));
426
        if (remote == NULL) {
427
                return NULL;
428
        }
429
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
430
//        if (remote->addr == NULL) {
431
//                free(remote);
432
//                return NULL;
433
//        }
434
//        remote->addrSize = SOCKETID_SIZE;
435
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
436
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
437
        send_params params = {0,0,0,0};
438
        remote->connID = mlOpenConnection(remote->addr,&connReady_cb,NULL, params);
439
        remote->refcnt = 1;
440
        return remote;
441
}
442

    
443
// TODO: check why closing the connection is annoying for the ML
444
void nodeid_free(struct nodeID *n) {
445

    
446
//        mlCloseConnection(n->connID);
447
//        mlCloseSocket(n->addr);
448
//        free(n);
449
        if (n && (--(n->refcnt) == 0)) {
450
        //        mlCloseConnection(n->connID);
451
                mlCloseSocket(n->addr);
452
                free(n);
453
        }
454
}
455

    
456

    
457
const char *node_addr(const struct nodeID *s)
458
{
459
  static char addr[256];
460
  // TODO: mlSocketIDToString always return 0 !!!
461
  int r = mlSocketIDToString(s->addr,addr,256);
462
  if (!r)
463
          return addr;
464
  else
465
          return "";
466
}
467

    
468
struct nodeID *nodeid_dup(struct nodeID *s)
469
{
470
//  struct nodeID *res;
471
//
472
//  res = malloc(sizeof(struct nodeID));
473
//  if (res != NULL) {
474
//          res->addr = malloc(SOCKETID_SIZE);
475
//          if (res->addr != NULL) {
476
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
477
//        //         res->addrSize = SOCKETID_SIZE;
478
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
479
//                 res->connID = s->connID;
480
//          }
481
//          else {
482
//                free(res);
483
//                res = NULL;
484
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
485
//          }
486
//  }
487
//        return res;
488
        s->refcnt++;
489
        return s;
490
}
491

    
492
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
493
{
494
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
495
}
496

    
497
int nodeid_dump(uint8_t *b, const struct nodeID *s)
498
{
499
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
500
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
501

    
502
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
503
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
504

    
505
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
506
}
507

    
508
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
509
{
510
  struct nodeID *res;
511
  res = malloc(sizeof(struct nodeID));
512
  if (res != NULL) {
513
          memset(res,0,sizeof(struct nodeID));
514
          res->addr = malloc(SOCKETID_SIZE);
515
          if (res->addr != NULL) {
516
                  memset(res->addr,0,SOCKETID_SIZE);
517
                  //memcpy(res->addr, b, SOCKETID_SIZE);
518
                  //*len = SOCKETID_SIZE;
519
                  *len = strlen((char*)b) + 1;
520
                  mlStringToSocketID((char *)b,res->addr);
521
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
522
        //          res->addrSize = SOCKETID_SIZE;
523
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
524
                  res->connID = -1;
525
                  res->refcnt = 1;
526
          }
527
          else {
528
                  free(res);
529
                  res = NULL;
530
                  // TODO: what about *len in this case???
531
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
532
          }
533
  }
534

    
535

    
536
  return res;
537
}