Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ ba1166ab

History | View | Annotate | Download (14.2 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17
#include "ml_helpers.h"
18

    
19
/**
20
 * libevent pointer
21
 */
22
struct event_base *base;
23

    
24
#define NH_BUFFER_SIZE 100
25

    
26
static int sIdx = 0;
27
static int rIdx = 0;
28

    
29
typedef struct nodeID {
30
        socketID_handle addr;
31
        int connID;        // connection associated to this node, -1 if myself
32
        int refcnt;
33
//        int addrSize;
34
//        int addrStringSize;
35
} nodeID;
36

    
37
typedef struct msgData_cb {
38
        int bIdx;        // index of the message in the proper buffer
39
        unsigned char msgType; // message type
40
        int mSize;        // message size
41
} msgData_cb;
42

    
43
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
44
static int timeoutFired = 0;
45

    
46
// pointers to the msgs to be send
47
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
48
// pointers to the received msgs + sender nodeID
49
struct receivedB {
50
        struct nodeID *id;
51
        int len;
52
        uint8_t *data;
53
};
54
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
55
/**/ static int recv_counter =0;
56

    
57
/**
58
 * Look for a free slot in the received buffer and allocates it for immediate use
59
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
60
 */
61
static int next_R() {
62
        const int size = 1024;
63
        if (receivedBuffer[rIdx].data==NULL) {
64
                receivedBuffer[rIdx].data = malloc(size);
65
        }
66
        else {
67
                int count;
68
                for (count=0;count<NH_BUFFER_SIZE;count++) {
69
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
70
                        if (receivedBuffer[rIdx].data==NULL)
71
                                break;
72
                }
73
                if (count==NH_BUFFER_SIZE)
74
                        return -1;
75
                else {
76
                        receivedBuffer[rIdx].data = malloc(size);
77
                }
78
        }
79
        memset(receivedBuffer[rIdx].data,0,size);
80
        return rIdx;
81
}
82

    
83
/**
84
 * Look for a free slot in the sending buffer and allocates it for immediate use
85
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
86
 */
87
static int next_S() {
88
        const int size = 1024;
89
        if (sendingBuffer[sIdx]==NULL) {
90
                sendingBuffer[sIdx] = malloc(size);
91
        }
92
        else {
93
                int count;
94
                for (count=0;count<NH_BUFFER_SIZE;count++) {
95
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
96
                        if (sendingBuffer[sIdx]==NULL)
97
                                break;
98
                }
99
                if (count==NH_BUFFER_SIZE)
100
                        return -1;
101
                else {
102
                        sendingBuffer[sIdx] = malloc(size);
103
                }
104
        }
105
        return sIdx;
106
}
107

    
108

    
109
/**
110
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
111
 * @param local_socketID
112
 * @param errorstatus
113
 */
114
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
115
        switch (errorstatus) {
116
        case 0:
117
                //
118
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
119
        //        me->addrSize = SOCKETID_SIZE;
120
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
121
                me->connID = -1;
122
                me->refcnt = 1;
123
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
124
                break;
125
        case -1:
126
                //
127
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
128
                exit(1);
129
                break;
130
        case 1:
131
                //
132
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
133
                exit(1);
134
                break;
135
        case 2:
136
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
137
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
138
            mlSetStunServer(0,NULL);
139
            break;
140
        default :        // should never happen
141
                //
142
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
143
        }
144

    
145
}
146

    
147
/**
148
 * Timeout callback to be set in the eventlib loop as needed
149
 * @param socket
150
 * @param flag
151
 * @param arg
152
 */
153
static void t_out_cb (int socket, short flag, void* arg) {
154

    
155
        timeoutFired = 1;
156
//        fprintf(stderr,"TIMEOUT!!!\n");
157
//        event_base_loopbreak(base);
158
}
159

    
160
/**
161
 * Callback called by ml when a remote node ask for a connection
162
 * @param connectionID
163
 * @param arg
164
 */
165
static void receive_conn_cb(int connectionID, void *arg) {
166
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
167

    
168
}
169

    
170
/**
171
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
172
 * @param connectionID
173
 * @param arg
174
 */
175
static void connReady_cb (int connectionID, void *arg) {
176

    
177
        msgData_cb *p;
178
        p = (msgData_cb *)arg;
179
        if (p == NULL) return;
180
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
181
///**/fprintf(stderr,"Sent message of type # %c and size %d\n",
182
//                ((char*)sendingBuffer[p->bIdx])[0]+'0', p->mSize);
183
        free(sendingBuffer[p->bIdx]);
184
        sendingBuffer[p->bIdx] = NULL;
185
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
186
        //        event_base_loopbreak(base);
187
        free(p);
188
}
189

    
190
/**
191
 * Callback called by ml when a connection error occurs
192
 * @param connectionID
193
 * @param arg
194
 */
195
static void connError_cb (int connectionID, void *arg) {
196
        // simply get rid of the msg in the buffer....
197
        msgData_cb *p;
198
        p = (msgData_cb *)arg;
199
        if (p != NULL) {
200
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
201
                free(sendingBuffer[p->bIdx]);
202
                sendingBuffer[p->bIdx] = NULL;
203
                p->mSize = -1;
204
        }
205
        //        event_base_loopbreak(base);
206
}
207

    
208

    
209
/**
210
 * Callback to receive data from ml
211
 * @param buffer
212
 * @param buflen
213
 * @param msgtype
214
 * @param arg
215
 */
216
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
217
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
218
//        fprintf(stderr, "Net-helper : called back with some news...\n");
219
/**/ ++recv_counter;
220
        char str[SOCKETID_STRING_SIZE];
221
        if (arg->remote_socketID != NULL)
222
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
223
        else
224
                sprintf(str,"!Unknown!");
225
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
226
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
227
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
228
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?" => Missing first!":"");
229
        }
230
        else {
231
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
232
                // buffering the received message only if possible, otherwise ignore it...
233
                int index = next_R();
234
                if (index >=0) {
235
                //        receivedBuffer[index][0] = malloc(buflen);
236
                        if (receivedBuffer[index].data == NULL) {
237
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
238
                                return;
239
                        }
240
                        // creating a new sender nodedID
241
                        receivedBuffer[index].id = malloc(sizeof(nodeID));
242
                        if (receivedBuffer[index].id==NULL) {
243
                                free (receivedBuffer[index].data);
244
                                receivedBuffer[index].data = NULL;
245
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
246
                                return;
247
                        }
248
                        else {
249
                                size_t lenlen = sizeof(int);
250
                                memset(receivedBuffer[index].id, 0, sizeof(struct nodeID));
251
                                nodeID *remote; remote = receivedBuffer[index].id;
252
                                receivedBuffer[index].data = realloc(receivedBuffer[index].data,buflen);
253
                                memset(receivedBuffer[index].data,0,buflen);
254
                                receivedBuffer[index].len = buflen;
255
                                //*(receivedBuffer[index][0]) = buflen;
256
                                memcpy(receivedBuffer[index].data,buffer,buflen);
257
                                  // get the socketID of the sender
258
                                remote->addr = malloc(SOCKETID_SIZE);
259
                                if (remote->addr == NULL) {
260
                                          free (remote);
261
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
262
                                          return;
263
                                }
264
                                else {
265
                                        memset(remote->addr, 0, SOCKETID_SIZE);
266
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
267
                                //        remote->addrSize = SOCKETID_SIZE;
268
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
269
                                        remote->connID = arg->connectionID;
270
                                        remote->refcnt = 1;
271
                                }
272
                        }
273
                }
274
  }
275
//        event_base_loopbreak(base);
276
}
277

    
278

    
279
struct nodeID *net_helper_init(const char *IPaddr, int port) {
280

    
281
        struct timeval tout = {1, 0};
282
        base = event_base_new();
283

    
284
        me = malloc(sizeof(nodeID));
285
        if (me == NULL) {
286
                return NULL;
287
        }
288
        memset(me,0,sizeof(nodeID));
289
        me->addr = malloc(SOCKETID_SIZE);
290
        if (me->addr == NULL) {
291
                free(me);
292
                return NULL;
293
        }
294
        memset(me->addr,0,SOCKETID_SIZE);
295
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
296
        me->refcnt = 1;
297

    
298
        int i;
299
        for (i=0;i<NH_BUFFER_SIZE;i++) {
300
                sendingBuffer[i] = NULL;
301
                receivedBuffer[i].data = NULL;
302
        }
303

    
304
        mlRegisterErrorConnectionCb(&connError_cb);
305
        mlRegisterRecvConnectionCb(&receive_conn_cb);
306
        mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
307
        while (me->connID<-1) {
308
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
309
                event_base_loop(base,EVLOOP_ONCE);
310
        }
311
        timeoutFired = 0;
312
//        fprintf(stderr,"Net-helper init : back from init!\n");
313

    
314
        return me;
315
}
316

    
317

    
318
void bind_msg_type (unsigned char msgtype) {
319

    
320
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
321
}
322

    
323

    
324
/**
325
 * Called by the application to send data to a remote peer
326
 * @param from
327
 * @param to
328
 * @param buffer_ptr
329
 * @param buffer_size
330
 * @return The dimension of the buffer or -1 if a connection error occurred.
331
 */
332
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
333
{
334
        // if buffer is full, discard the message and return an error flag
335
        int index = next_S();
336
        if (index<0) {
337
                // free(buffer_ptr);
338
                return -1;
339
        }
340
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
341
        memset(sendingBuffer[index],0,buffer_size);
342
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
343
        // free(buffer_ptr);
344
        msgData_cb *p = malloc(sizeof(msgData_cb));
345
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0];
346
        int current = p->bIdx;
347
        send_params params = {0,0,0,0};
348
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
349
        if (to->connID<0) {
350
                free(sendingBuffer[current]);
351
                sendingBuffer[current] = NULL;
352
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
353
                free(p);
354
                return -1;
355
        }
356
        else {
357
                return buffer_size; //p->mSize;
358
        }
359

    
360
}
361

    
362

    
363
/**
364
 * Called by an application to receive data from remote peers
365
 * @param local
366
 * @param remote
367
 * @param buffer_ptr
368
 * @param buffer_size
369
 * @return The number of received bytes or -1 if some error occurred.
370
 */
371
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
372
{
373
        int size;
374
        // this should never happen... if it does, index handling is faulty...
375
        if (receivedBuffer[rIdx].id==NULL) {
376
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
377
                return -1;
378
        }
379

    
380
        (*remote) = receivedBuffer[rIdx].id;
381
        // retrieve a msg from the buffer
382
        size = receivedBuffer[rIdx].len;
383
        if (size>buffer_size) {
384
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
385
                return -1;
386
        }
387
        memcpy(buffer_ptr, receivedBuffer[rIdx].data, size);
388
        free(receivedBuffer[rIdx].data);
389
        receivedBuffer[rIdx].data = NULL;
390
        receivedBuffer[rIdx].id = NULL;
391

    
392
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
393

    
394
        return size;
395
}
396

    
397

    
398
int wait4data(const struct nodeID *n, struct timeval *tout) {
399

    
400
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
401
        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
402
        while(receivedBuffer[rIdx].data==NULL && timeoutFired==0) {
403
        //        event_base_dispatch(base);
404
                event_base_loop(base,EVLOOP_ONCE);
405
        }
406
        timeoutFired = 0;
407
//        fprintf(stderr,"Back from eventlib loop.\n");
408

    
409
        if (receivedBuffer[rIdx].data!=NULL)
410
                return 1;
411
        else
412
                return 0;
413
}
414

    
415

    
416

    
417

    
418
struct nodeID *create_node(const char *rem_IP, int rem_port) {
419
        struct nodeID *remote = malloc(sizeof(nodeID));
420
        if (remote == NULL) {
421
                return NULL;
422
        }
423
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
424
//        if (remote->addr == NULL) {
425
//                free(remote);
426
//                return NULL;
427
//        }
428
//        remote->addrSize = SOCKETID_SIZE;
429
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
430
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
431
        send_params params = {0,0,0,0};
432
        remote->connID = mlOpenConnection(remote->addr,&connReady_cb,NULL, params);
433
        remote->refcnt = 1;
434
        return remote;
435
}
436

    
437
// TODO: check why closing the connection is annoying for the ML
438
void nodeid_free(struct nodeID *n) {
439

    
440
//        mlCloseConnection(n->connID);
441
//        mlCloseSocket(n->addr);
442
//        free(n);
443
        if (n && (--(n->refcnt) == 0)) {
444
        //        mlCloseConnection(n->connID);
445
                mlCloseSocket(n->addr);
446
                free(n);
447
        }
448
}
449

    
450

    
451
const char *node_addr(const struct nodeID *s)
452
{
453
  static char addr[256];
454
  // TODO: mlSocketIDToString always return 0 !!!
455
  int r = mlSocketIDToString(s->addr,addr,256);
456
  if (!r)
457
          return addr;
458
  else
459
          return "";
460
}
461

    
462
struct nodeID *nodeid_dup(struct nodeID *s)
463
{
464
//  struct nodeID *res;
465
//
466
//  res = malloc(sizeof(struct nodeID));
467
//  if (res != NULL) {
468
//          res->addr = malloc(SOCKETID_SIZE);
469
//          if (res->addr != NULL) {
470
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
471
//        //         res->addrSize = SOCKETID_SIZE;
472
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
473
//                 res->connID = s->connID;
474
//          }
475
//          else {
476
//                free(res);
477
//                res = NULL;
478
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
479
//          }
480
//  }
481
//        return res;
482
        s->refcnt++;
483
        return s;
484
}
485

    
486
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
487
{
488
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
489
}
490

    
491
int nodeid_dump(uint8_t *b, const struct nodeID *s)
492
{
493
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
494
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
495

    
496
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
497
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
498

    
499
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
500
}
501

    
502
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
503
{
504
  struct nodeID *res;
505
  res = malloc(sizeof(struct nodeID));
506
  if (res != NULL) {
507
          memset(res,0,sizeof(struct nodeID));
508
          res->addr = malloc(SOCKETID_SIZE);
509
          if (res->addr != NULL) {
510
                  memset(res->addr,0,SOCKETID_SIZE);
511
                  //memcpy(res->addr, b, SOCKETID_SIZE);
512
                  //*len = SOCKETID_SIZE;
513
                  *len = strlen((char*)b) + 1;
514
                  mlStringToSocketID((char *)b,res->addr);
515
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
516
        //          res->addrSize = SOCKETID_SIZE;
517
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
518
                  res->connID = -1;
519
                  res->refcnt = 1;
520
          }
521
          else {
522
                  free(res);
523
                  res = NULL;
524
                  // TODO: what about *len in this case???
525
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
526
          }
527
  }
528

    
529

    
530
  return res;
531
}