Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ 1deeaf91

History | View | Annotate | Download (15.3 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13
#include <assert.h>
14

    
15

    
16
#include "net_helper.h"
17
#include "ml.h"
18

    
19
#include "msg_types.h"/**/
20

    
21

    
22
#ifdef MONL
23
#include "mon.h"
24
#include "grapes_log.h"
25
#include "repoclient.h"
26
#include "grapes.h"
27
#endif
28

    
29

    
30
/**
31
 * libevent pointer
32
 */
33
struct event_base *base;
34

    
35
#define NH_BUFFER_SIZE 1000
36
#define NH_LOOKUP_SIZE 1000
37
#define NH_PACKET_TIMEOUT {0, 500*1000}
38
#define NH_ML_INIT_TIMEOUT {1, 0}
39

    
40
static int sIdx = 0;
41
static int rIdxML = 0;        //reveive from ML to this buffer position
42
static int rIdxUp = 0;        //hand up to layer above at this buffer position
43

    
44
typedef struct nodeID {
45
        socketID_handle addr;
46
        int connID;        // connection associated to this node, -1 if myself
47
        int refcnt;
48
#ifdef MONL
49
        //n quick and dirty static vector for measures TODO: make it dinamic
50
        MonHandler mhs[20];
51
        int n_mhs;
52
#endif
53
//        int addrSize;
54
//        int addrStringSize;
55
} nodeID;
56

    
57
typedef struct msgData_cb {
58
        int bIdx;        // index of the message in the proper buffer
59
        unsigned char msgType; // message type
60
        int mSize;        // message size
61
        bool conn_cb_called;
62
        bool cancelled;
63
} msgData_cb;
64

    
65
static struct nodeID **lookup_array;
66
static int lookup_max = NH_LOOKUP_SIZE;
67
static int lookup_curr = 0;
68

    
69
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
70
static int timeoutFired = 0;
71

    
72
// pointers to the msgs to be send
73
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
74
// pointers to the received msgs + sender nodeID
75
struct receivedB {
76
        struct nodeID *id;
77
        int len;
78
        uint8_t *data;
79
};
80
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
81
/**/ static int recv_counter =0; static int snd_counter =0;
82

    
83

    
84
static void connReady_cb (int connectionID, void *arg);
85
static struct nodeID *new_node(socketID_handle peer) {
86
        send_params params = {0,0,0,0};
87
        struct nodeID *res = malloc(sizeof(struct nodeID));
88
        if (!res) {
89
                 fprintf(stderr, "Net-helper : memory error\n");
90
                 return NULL;
91
        }
92
        memset(res, 0, sizeof(struct nodeID));
93

    
94
        res->addr = malloc(SOCKETID_SIZE);
95
        if (! res->addr) {
96
                free (res);
97
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
98
                return NULL;
99
        }
100
        memset(res->addr, 0, SOCKETID_SIZE);
101
        memcpy(res->addr, peer ,SOCKETID_SIZE);
102

    
103
        res->refcnt = 1;
104

    
105
        res->connID = mlOpenConnection(peer, &connReady_cb, NULL, params);
106

    
107
        return res;
108
}
109

    
110

    
111
static struct nodeID **id_lookup(socketID_handle target) {
112

    
113
        int i,here=-1;
114
        for (i=0;i<lookup_curr;i++) {
115
                if (lookup_array[i] == NULL) {
116
                        if (here < 0) {
117
                                here = i;
118
                        }
119
                } else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
120
                        return &lookup_array[i];
121
                }
122
        }
123

    
124
        if (here == -1) {
125
                here = lookup_curr++;
126
        }
127

    
128
        if (lookup_curr > lookup_max) {
129
                lookup_max *= 2;
130
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
131
        }
132

    
133
        lookup_array[here] = new_node(target);
134
        return &lookup_array[here];
135
}
136

    
137
static struct nodeID *id_lookup_dup(socketID_handle target) {
138
        return nodeid_dup(*id_lookup(target));
139
}
140

    
141

    
142
/**
143
 * Look for a free slot in the received buffer and allocates it for immediate use
144
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
145
 */
146
static int next_R() {
147
        if (receivedBuffer[rIdxML].data==NULL) {
148
                int ret = rIdxML;
149
                rIdxML = (rIdxML+1)%NH_BUFFER_SIZE;
150
                return ret;
151
        } else {
152
                //TODO: handle receive overload situation!
153
                return -1;
154
        }
155
}
156

    
157
/**
158
 * Look for a free slot in the sending buffer and allocates it for immediate use
159
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
160
 */
161
static int next_S() {
162
        if (sendingBuffer[sIdx]) {
163
                int count;
164
                for (count=0;count<NH_BUFFER_SIZE;count++) {
165
                        sIdx = (sIdx+1)%NH_BUFFER_SIZE;
166
                        if (sendingBuffer[sIdx]==NULL)
167
                                break;
168
                }
169
                if (count==NH_BUFFER_SIZE) {
170
                        return -1;
171
                }
172
        }
173
        return sIdx;
174
}
175

    
176

    
177
/**
178
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
179
 * @param local_socketID
180
 * @param errorstatus
181
 */
182
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
183
        switch (errorstatus) {
184
        case 0:
185
                me->addr = malloc(SOCKETID_SIZE);
186
                if (! me->addr) {
187
                        fprintf(stderr, "Net-helper : memory error while creating my new nodeID \n");
188
                        return;
189
                }
190

    
191
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
192
        //        me->addrSize = SOCKETID_SIZE;
193
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
194
                me->connID = -1;
195
                me->refcnt = 1;
196
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
197
                break;
198
        case -1:
199
                //
200
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
201
                exit(1);
202
                break;
203
        case 1:
204
                //
205
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
206
                exit(1);
207
                break;
208
        case 2:
209
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
210
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
211
            mlSetStunServer(0,NULL);
212
            break;
213
        default :        // should never happen
214
                //
215
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
216
        }
217

    
218
}
219

    
220
/**
221
 * Timeout callback to be set in the eventlib loop as needed
222
 * @param socket
223
 * @param flag
224
 * @param arg
225
 */
226
static void t_out_cb (int socket, short flag, void* arg) {
227

    
228
        timeoutFired = 1;
229
//        fprintf(stderr,"TIMEOUT!!!\n");
230
//        event_base_loopbreak(base);
231
}
232

    
233
/**
234
 * Callback called by ml when a remote node ask for a connection
235
 * @param connectionID
236
 * @param arg
237
 */
238
static void receive_conn_cb(int connectionID, void *arg) {
239
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
240

    
241
}
242

    
243
void free_sending_buffer(int i)
244
{
245
        free(sendingBuffer[i]);
246
        sendingBuffer[i] = NULL;
247
}
248

    
249
/**
250
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
251
 * @param connectionID
252
 * @param arg
253
 */
254
static void connReady_cb (int connectionID, void *arg) {
255
        char mt;
256
        msgData_cb *p;
257
        p = (msgData_cb *)arg;
258
        if (p == NULL) return;
259
        if (p->cancelled) {
260
            free(p);
261
            return;
262
        }
263
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
264
/**/        mt = ((char*)sendingBuffer[p->bIdx])[0]; ++snd_counter;
265
        if (mt!=MSG_TYPE_TOPOLOGY &&
266
                mt!=MSG_TYPE_CHUNK && mt!=MSG_TYPE_SIGNALLING) {
267
                        fprintf(stderr,"Net-helper ERROR! Sent message # %d of type %c and size %d\n",
268
                                snd_counter,mt+'0', p->mSize);}
269
        free_sending_buffer(p->bIdx);
270
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
271
        //        event_base_loopbreak(base);
272
        p->conn_cb_called = true;
273
}
274

    
275
/**
276
 * Callback called by ml when a connection error occurs
277
 * @param connectionID
278
 * @param arg
279
 */
280
static void connError_cb (int connectionID, void *arg) {
281
        // simply get rid of the msg in the buffer....
282
        msgData_cb *p;
283
        p = (msgData_cb *)arg;
284
        if (p != NULL) {
285
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
286
                if (p->cancelled) {
287
                        free(p);//p->mSize = -1;
288
                } else {
289
                        p->conn_cb_called = true;
290
                }
291
        }
292
        //        event_base_loopbreak(base);
293
}
294

    
295

    
296
/**
297
 * Callback to receive data from ml
298
 * @param buffer
299
 * @param buflen
300
 * @param msgtype
301
 * @param arg
302
 */
303
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
304
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
305
        char str[SOCKETID_STRING_SIZE];
306
//        fprintf(stderr, "Net-helper : called back with some news...\n");
307
/**/ ++recv_counter;
308
        if (arg->remote_socketID != NULL)
309
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
310
        else
311
                sprintf(str,"!Unknown!");
312
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
313
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
314
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
315
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
316
        }
317
        else {
318
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
319
                // buffering the received message only if possible, otherwise ignore it...
320
                int index = next_R();
321
                if (index<0) {
322
                        fprintf(stderr,"Net-helper: receive buffer full\n ");
323
                        return;
324
                } else {
325
                        receivedBuffer[index].data = malloc(buflen);
326
                        if (receivedBuffer[index].data == NULL) {
327
                                fprintf(stderr,"Net-helper: memory full, can't receive!\n ");
328
                                return;
329
                        }
330
                        receivedBuffer[index].len = buflen;
331
                        memcpy(receivedBuffer[index].data,buffer,buflen);
332
                          // save the socketID of the sender
333
                        receivedBuffer[index].id = id_lookup_dup(arg->remote_socketID);
334
                }
335
  }
336
//        event_base_loopbreak(base);
337
}
338

    
339

    
340
struct nodeID *net_helper_init(const char *IPaddr, int port) {
341

    
342
        struct timeval tout = NH_ML_INIT_TIMEOUT;
343
        int s, i;
344
        base = event_base_new();
345
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
346

    
347
        me = malloc(sizeof(nodeID));
348
        if (me == NULL) {
349
                return NULL;
350
        }
351
        memset(me,0,sizeof(nodeID));
352
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
353
        me->refcnt = 1;
354

    
355
        for (i=0;i<NH_BUFFER_SIZE;i++) {
356
                sendingBuffer[i] = NULL;
357
                receivedBuffer[i].data = NULL;
358
        }
359

    
360
        mlRegisterErrorConnectionCb(&connError_cb);
361
        mlRegisterRecvConnectionCb(&receive_conn_cb);
362
        s = mlInit(1,tout,port,IPaddr,3478,"stun.ekiga.net",&init_myNodeID_cb,base);
363
        if (s < 0) {
364
                fprintf(stderr, "Net-helper : error initializing ML!\n");
365
                free(me);
366
                return NULL;
367
        }
368

    
369
#ifdef MONL
370
{
371
        void *repoclient;
372
        eventbase = base;
373

    
374
        // Initialize logging
375
        grapesInitLog(DCLOG_WARNING, NULL, NULL);
376

    
377
        repInit("");
378
        repoclient = repOpen("79.120.193.115:9832",60);        //repository.napa-wine.eu
379
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
380
        monInit(base, repoclient);
381
}
382
#endif
383

    
384
        while (me->connID<-1) {
385
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
386
                event_base_loop(base,EVLOOP_ONCE);
387
        }
388
        timeoutFired = 0;
389
//        fprintf(stderr,"Net-helper init : back from init!\n");
390

    
391
        return me;
392
}
393

    
394

    
395
void bind_msg_type (unsigned char msgtype) {
396

    
397
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
398
}
399

    
400

    
401
void send_to_peer_cb(int fd, short event, void *arg)
402
{
403
        msgData_cb *p = (msgData_cb *) arg;
404
        if (p->conn_cb_called) {
405
                free(p);
406
        }
407
        else { //don't send it anymore
408
                free_sending_buffer(p->bIdx);
409
                p->cancelled = true;
410
                // don't free p, the other timeout will do it
411
        }
412
}
413

    
414
/**
415
 * Called by the application to send data to a remote peer
416
 * @param from
417
 * @param to
418
 * @param buffer_ptr
419
 * @param buffer_size
420
 * @return The dimension of the buffer or -1 if a connection error occurred.
421
 */
422
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
423
{
424
        msgData_cb *p;
425
        int current;
426
        send_params params = {0,0,0,0};
427

    
428
        if (buffer_size <= 0) {
429
                fprintf(stderr,"Net-helper: message size problematic: %d\n", buffer_size);
430
                return buffer_size;
431
        }
432

    
433
        // if buffer is full, discard the message and return an error flag
434
        int index = next_S();
435
        if (index<0) {
436
                // free(buffer_ptr);
437
                fprintf(stderr,"Net-helper: send buffer full\n ");
438
                return -1;
439
        }
440
        sendingBuffer[index] = malloc(buffer_size);
441
        if (! sendingBuffer[index]){
442
                fprintf(stderr,"Net-helper: memory full, can't send!\n ");
443
                return -1;
444
        }
445
        memset(sendingBuffer[index],0,buffer_size);
446
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
447
        // free(buffer_ptr);
448
        p = malloc(sizeof(msgData_cb));
449
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
450
        current = p->bIdx;
451

    
452
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
453
        if (to->connID<0) {
454
                free_sending_buffer(current);
455
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
456
                free(p);
457
                return -1;
458
        }
459
        else {
460
                struct timeval timeout = NH_PACKET_TIMEOUT;
461
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
462
                return buffer_size; //p->mSize;
463
        }
464

    
465
}
466

    
467

    
468
/**
469
 * Called by an application to receive data from remote peers
470
 * @param local
471
 * @param remote
472
 * @param buffer_ptr
473
 * @param buffer_size
474
 * @return The number of received bytes or -1 if some error occurred.
475
 */
476
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
477
{
478
        int size;
479
        if (receivedBuffer[rIdxUp].data==NULL) {        //block till first message arrives
480
                wait4data(local, NULL, NULL);
481
        }
482

    
483
        assert(receivedBuffer[rIdxUp].data && receivedBuffer[rIdxUp].id);
484

    
485
        (*remote) = receivedBuffer[rIdxUp].id;
486
        // retrieve a msg from the buffer
487
        size = receivedBuffer[rIdxUp].len;
488
        if (size>buffer_size) {
489
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
490
                return -1;
491
        }
492
        memcpy(buffer_ptr, receivedBuffer[rIdxUp].data, size);
493
        free(receivedBuffer[rIdxUp].data);
494
        receivedBuffer[rIdxUp].data = NULL;
495
        receivedBuffer[rIdxUp].id = NULL;
496

    
497
        rIdxUp = (rIdxUp+1)%NH_BUFFER_SIZE;
498

    
499
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
500

    
501
        return size;
502
}
503

    
504

    
505
int wait4data(const struct nodeID *n, struct timeval *tout, fd_set *dummy) {
506

    
507
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
508
        if (tout) {        //if tout==NULL, loop wait infinitely
509
                event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
510
        }
511
        while(receivedBuffer[rIdxUp].data==NULL && timeoutFired==0) {
512
        //        event_base_dispatch(base);
513
                event_base_loop(base,EVLOOP_ONCE);
514
        }
515
        timeoutFired = 0;
516
//        fprintf(stderr,"Back from eventlib loop.\n");
517

    
518
        if (receivedBuffer[rIdxUp].data!=NULL)
519
                return 1;
520
        else
521
                return 0;
522
}
523

    
524
socketID_handle getRemoteSocketID(const char *ip, int port) {
525
        char str[SOCKETID_STRING_SIZE];
526
        socketID_handle h;
527

    
528
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
529
        h = malloc(SOCKETID_SIZE);
530
        mlStringToSocketID(str, h);
531

    
532
        return h;
533
}
534

    
535
struct nodeID *create_node(const char *rem_IP, int rem_port) {
536
        socketID_handle s;
537
        struct nodeID *remote;
538

    
539
        s = getRemoteSocketID(rem_IP, rem_port);
540
        remote = id_lookup_dup(s);
541
        free(s);
542

    
543
        return remote;
544
}
545

    
546
// TODO: check why closing the connection is annoying for the ML
547
void nodeid_free(struct nodeID *n) {
548
        if (n && (--(n->refcnt) == 1)) {
549
/*
550
                struct nodeID **npos;
551
        //        mlCloseConnection(n->connID);
552
                npos = id_lookup(n->addr);
553
                *npos = NULL;
554
                mlCloseSocket(n->addr);
555
                free(n);
556
*/
557
        }
558
}
559

    
560

    
561
const char *node_addr(const struct nodeID *s)
562
{
563
  static char addr[256];
564
  // TODO: mlSocketIDToString always return 0 !!!
565
  int r = mlSocketIDToString(s->addr,addr,256);
566
  if (!r)
567
          return addr;
568
  else
569
          return "";
570
}
571

    
572
struct nodeID *nodeid_dup(struct nodeID *s)
573
{
574
        s->refcnt++;
575
        return s;
576
}
577

    
578
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
579
{
580
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
581
}
582

    
583
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
584
{
585
  if (max_write_size < SOCKETID_STRING_SIZE) return -1;
586

    
587
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
588
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
589

    
590
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
591
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
592

    
593
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
594
}
595

    
596
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
597
{
598
  uint8_t sid[SOCKETID_SIZE];
599
  socketID_handle h = (socketID_handle) sid;
600
  mlStringToSocketID((char *)b,h);
601
  *len = strlen((char*)b) + 1;
602
  return id_lookup_dup(h);
603
}