Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ ad6c7e81

History | View | Annotate | Download (13.9 KB)

1 22354b6f MarcoBiazzini
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see GPL.txt
5
 *
6
 */
7
8
#include <event2/event.h>
9
#include <arpa/inet.h>
10
#include <unistd.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <string.h>
14
15 ff72192d MarcoBiazzini
16 22354b6f MarcoBiazzini
#include "net_helper.h"
17
#include "ml.h"
18
#include "ml_helpers.h"
19
20
/**
21
 * libevent pointer
22
 */
23
struct event_base *base;
24
25
#define NH_BUFFER_SIZE 100
26
27
static int sIdx = 0;
28
static int rIdx = 0;
29
30
typedef struct nodeID {
31
        socketID_handle addr;
32
        int connID;        // connection associated to this node, -1 if myself
33 ff72192d MarcoBiazzini
        int refcnt;
34
//        int addrSize;
35
//        int addrStringSize;
36 22354b6f MarcoBiazzini
} nodeID;
37
38
typedef struct msgData_cb {
39
        int bIdx;        // index of the message in the proper buffer
40
        unsigned char msgType; // message type
41
        int mSize;        // message size
42
} msgData_cb;
43
44
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
45
static int timeoutFired = 0;
46
47
// pointers to the msgs to be send
48
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
49
// pointers to the received msgs + sender nodeID
50
static uint8_t *receivedBuffer[NH_BUFFER_SIZE][2];
51
52
53
/**
54
 * Look for a free slot in the received buffer and allocates it for immediate use
55
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
56
 */
57
static int next_R() {
58 ad6c7e81 AlessandroRusso
        const int size = 1024;
59 22354b6f MarcoBiazzini
        if (receivedBuffer[rIdx][0]==NULL) {
60 ad6c7e81 AlessandroRusso
                receivedBuffer[rIdx][0] = malloc(size);
61 22354b6f MarcoBiazzini
        }
62
        else {
63
                int count;
64
                for (count=0;count<NH_BUFFER_SIZE;count++) {
65
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
66
                        if (receivedBuffer[rIdx][0]==NULL)
67
                                break;
68
                }
69
                if (count==NH_BUFFER_SIZE)
70
                        return -1;
71
                else {
72 ad6c7e81 AlessandroRusso
                        receivedBuffer[rIdx][0] = malloc(size);
73 22354b6f MarcoBiazzini
                }
74
        }
75 ad6c7e81 AlessandroRusso
        memset(receivedBuffer[rIdx][0],0,size);
76 22354b6f MarcoBiazzini
        return rIdx;
77
}
78
79
/**
80
 * Look for a free slot in the sending buffer and allocates it for immediate use
81
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
82
 */
83
static int next_S() {
84 ad6c7e81 AlessandroRusso
        const int size = 1024;
85 22354b6f MarcoBiazzini
        if (sendingBuffer[sIdx]==NULL) {
86 ad6c7e81 AlessandroRusso
                sendingBuffer[sIdx] = malloc(size);
87 22354b6f MarcoBiazzini
        }
88
        else {
89
                int count;
90
                for (count=0;count<NH_BUFFER_SIZE;count++) {
91
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
92
                        if (sendingBuffer[sIdx]==NULL)
93
                                break;
94
                }
95
                if (count==NH_BUFFER_SIZE)
96
                        return -1;
97
                else {
98 ad6c7e81 AlessandroRusso
                        sendingBuffer[sIdx] = malloc(size);
99 22354b6f MarcoBiazzini
                }
100
        }
101
        return sIdx;
102
}
103
104 79768707 MarcoBiazzini
105 22354b6f MarcoBiazzini
/**
106
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
107
 * @param local_socketID
108
 * @param errorstatus
109
 */
110
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
111
        switch (errorstatus) {
112
        case 0:
113
                //
114
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
115 ff72192d MarcoBiazzini
        //        me->addrSize = SOCKETID_SIZE;
116
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
117 22354b6f MarcoBiazzini
                me->connID = -1;
118 ff72192d MarcoBiazzini
                me->refcnt = 1;
119
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
120 22354b6f MarcoBiazzini
                break;
121
        case -1:
122
                //
123
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
124 ff72192d MarcoBiazzini
                exit(1);
125 22354b6f MarcoBiazzini
                break;
126
        case 1:
127
                //
128
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
129 ff72192d MarcoBiazzini
                exit(1);
130 22354b6f MarcoBiazzini
                break;
131
        case 2:
132
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
133 ff72192d MarcoBiazzini
            exit(2);
134 22354b6f MarcoBiazzini
            break;
135
        default :        // should never happen
136
                //
137
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
138
        }
139
140
}
141
142
/**
143
 * Timeout callback to be set in the eventlib loop as needed
144
 * @param socket
145
 * @param flag
146
 * @param arg
147
 */
148
static void t_out_cb (int socket, short flag, void* arg) {
149
150
        timeoutFired = 1;
151 ff72192d MarcoBiazzini
//        fprintf(stderr,"TIMEOUT!!!\n");
152 22354b6f MarcoBiazzini
//        event_base_loopbreak(base);
153
}
154
155
/**
156
 * Callback called by ml when a remote node ask for a connection
157
 * @param connectionID
158
 * @param arg
159
 */
160
static void receive_conn_cb(int connectionID, void *arg) {
161 ff72192d MarcoBiazzini
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
162 22354b6f MarcoBiazzini
163
}
164
165
/**
166
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
167
 * @param connectionID
168
 * @param arg
169
 */
170
static void connReady_cb (int connectionID, void *arg) {
171
172
        msgData_cb *p;
173
        p = (msgData_cb *)arg;
174
        if (p == NULL) return;
175
        send_params params = {0,0,0,0};
176
        send_Data(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,&params);
177
        free(sendingBuffer[p->bIdx]);
178
        sendingBuffer[p->bIdx] = NULL;
179 ff72192d MarcoBiazzini
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
180 22354b6f MarcoBiazzini
        //        event_base_loopbreak(base);
181
}
182
183
/**
184
 * Callback called by ml when a connection error occurs
185
 * @param connectionID
186
 * @param arg
187
 */
188
static void connError_cb (int connectionID, void *arg) {
189
        // simply get rid of the msg in the buffer....
190
        msgData_cb *p;
191
        p = (msgData_cb *)arg;
192
        if (p != NULL) {
193
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
194
                free(sendingBuffer[p->bIdx]);
195
                sendingBuffer[p->bIdx] = NULL;
196 ad6c7e81 AlessandroRusso
                p->mSize = -1;
197 22354b6f MarcoBiazzini
        }
198
        //        event_base_loopbreak(base);
199
}
200
201 79768707 MarcoBiazzini
202 22354b6f MarcoBiazzini
/**
203
 * Callback to receive data from ml
204
 * @param buffer
205
 * @param buflen
206
 * @param msgtype
207
 * @param arg
208
 */
209
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
210
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
211
//        fprintf(stderr, "Net-helper : called back with some news...\n");
212 ff72192d MarcoBiazzini
        char str[SOCKETID_STRING_SIZE];
213 22354b6f MarcoBiazzini
        if (arg->remote_socketID != NULL)
214
                socketID_To_String(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
215
        else
216
                sprintf(str,"!Unknown!");
217
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
218
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
219
        }
220
        else {
221 ff72192d MarcoBiazzini
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
222 22354b6f MarcoBiazzini
                // buffering the received message only if possible, otherwise ignore it...
223
                int index = next_R();
224
                if (index >=0) {
225
                //        receivedBuffer[index][0] = malloc(buflen);
226
                        if (receivedBuffer[index][0] == NULL) {
227
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
228
                                return;
229
                        }
230
                        // creating a new sender nodedID
231
                        receivedBuffer[index][1] = malloc(sizeof(nodeID));
232
                        if (receivedBuffer[index][1]==NULL) {
233
                                free (receivedBuffer[index][0]);
234
                                receivedBuffer[index][0] = NULL;
235
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
236
                                return;
237
                        }
238
                        else {
239 ff72192d MarcoBiazzini
                                memset(receivedBuffer[index][1], 0, sizeof(struct nodeID));
240 22354b6f MarcoBiazzini
                                nodeID *remote; remote = (nodeID*)(receivedBuffer[index][1]);
241 ff72192d MarcoBiazzini
                                receivedBuffer[index][0] = realloc(receivedBuffer[index][0],buflen+sizeof(int));
242 ad6c7e81 AlessandroRusso
                                memset(receivedBuffer[index][0],0,buflen+sizeof(int));
243
                                memcpy(receivedBuffer[index][0],&buflen,sizeof(int));
244
                                //*(receivedBuffer[index][0]) = buflen;
245 ff72192d MarcoBiazzini
                                memcpy((receivedBuffer[index][0])+sizeof(int),buffer,buflen);
246 22354b6f MarcoBiazzini
                                  // get the socketID of the sender
247
                                remote->addr = malloc(SOCKETID_SIZE);
248
                                if (remote->addr == NULL) {
249
                                          free (remote);
250
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
251
                                          return;
252
                                }
253
                                else {
254 ff72192d MarcoBiazzini
                                        memset(remote->addr, 0, SOCKETID_SIZE);
255 22354b6f MarcoBiazzini
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
256 ff72192d MarcoBiazzini
                                //        remote->addrSize = SOCKETID_SIZE;
257
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
258 22354b6f MarcoBiazzini
                                        remote->connID = arg->connectionID;
259 ff72192d MarcoBiazzini
                                        remote->refcnt = 1;
260 22354b6f MarcoBiazzini
                                }
261
                        }
262
                }
263
  }
264
//        event_base_loopbreak(base);
265
}
266
267 79768707 MarcoBiazzini
268 7b017ba0 AlessandroRusso
struct nodeID *net_helper_init(const char *IPaddr, int port) {
269 22354b6f MarcoBiazzini
270
        struct timeval tout = {1, 0};
271
        base = event_base_new();
272
273
        me = malloc(sizeof(nodeID));
274
        if (me == NULL) {
275
                return NULL;
276
        }
277 ff72192d MarcoBiazzini
        memset(me,0,sizeof(nodeID));
278 22354b6f MarcoBiazzini
        me->addr = malloc(SOCKETID_SIZE);
279
        if (me->addr == NULL) {
280
                free(me);
281
                return NULL;
282
        }
283 ff72192d MarcoBiazzini
        memset(me->addr,0,SOCKETID_SIZE);
284
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
285
        me->refcnt = 1;
286 22354b6f MarcoBiazzini
287
        int i;
288
        for (i=0;i<NH_BUFFER_SIZE;i++) {
289
                sendingBuffer[i] = NULL;
290
                receivedBuffer[i][0] = NULL;
291
        }
292
293
        register_Error_connection_cb(&connError_cb);
294
        register_Recv_connection_cb(&receive_conn_cb);
295
        init_messaging_layer(1,tout,port,IPaddr,0,NULL,&init_myNodeID_cb,base);
296 ff72192d MarcoBiazzini
        while (me->connID<-1) {
297
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
298
                event_base_loop(base,EVLOOP_ONCE);
299
        }
300
        timeoutFired = 0;
301 22354b6f MarcoBiazzini
//        fprintf(stderr,"Net-helper init : back from init!\n");
302 ff72192d MarcoBiazzini
303 22354b6f MarcoBiazzini
        return me;
304
}
305
306
307 7b017ba0 AlessandroRusso
void bind_msg_types (unsigned char msgtypes[], int msgtypes_len) {
308
        int i;
309
        for (i=0;i<msgtypes_len;i++) {
310
                        register_Recv_data_cb(&recv_data_cb,msgtypes[i]);
311
        }
312
}
313
314
315 22354b6f MarcoBiazzini
/**
316
 * Called by the application to send data to a remote peer
317
 * @param from
318
 * @param to
319
 * @param buffer_ptr
320
 * @param buffer_size
321 ff72192d MarcoBiazzini
 * @return The dimension of the buffer or -1 if a connection error occurred.
322 22354b6f MarcoBiazzini
 */
323
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
324
{
325
        // if buffer is full, discard the message and return an error flag
326
        int index = next_S();
327
        if (index<0) {
328
                // free(buffer_ptr);
329
                return -1;
330
        }
331 ad6c7e81 AlessandroRusso
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
332
        memset(sendingBuffer[index],0,buffer_size);
333 22354b6f MarcoBiazzini
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
334
        // free(buffer_ptr);
335
        msgData_cb *p = malloc(sizeof(msgData_cb));
336
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0];
337
        int current = p->bIdx;
338
        to->connID = open_Connection(to->addr,&connReady_cb,p);
339
        if (to->connID<0) {
340
                free(sendingBuffer[current]);
341
                sendingBuffer[current] = NULL;
342
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
343 ff72192d MarcoBiazzini
                free(p);
344
                return -1;
345 22354b6f MarcoBiazzini
        }
346
        else {
347 ff72192d MarcoBiazzini
        //        fprintf(stderr,"Net-helper: Got a connection ID to send msg %d to %s.\n ",current,node_addr(to));
348 22354b6f MarcoBiazzini
        //                struct timeval tout = {0,500};
349
        //                event_base_once(base,0, EV_TIMEOUT, &t_out_cb, NULL, &tout);
350
                while (sendingBuffer[current] != NULL)
351
                        event_base_loop(base,EVLOOP_ONCE);//  EVLOOP_NONBLOCK
352
//                fprintf(stderr,"Net-helper: Back from eventlib loop with status %d.\n", ok);
353 ad6c7e81 AlessandroRusso
                int size = p->mSize;
354 ff72192d MarcoBiazzini
                free(p);
355 ad6c7e81 AlessandroRusso
                return size;
356 22354b6f MarcoBiazzini
        }
357
358
}
359
360
361
/**
362
 * Called by an application to receive data from remote peers
363
 * @param local
364
 * @param remote
365
 * @param buffer_ptr
366
 * @param buffer_size
367
 * @return The number of received bytes or -1 if some error occurred.
368
 */
369
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
370
{
371
        // this should never happen... if it does, index handling is faulty...
372
        if (receivedBuffer[rIdx][1]==NULL) {
373
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
374
                return -1;
375
        }
376
377
        (*remote) = (nodeID*)(receivedBuffer[rIdx][1]);
378
        // retrieve a msg from the buffer
379 ff72192d MarcoBiazzini
        memcpy(buffer_ptr, (receivedBuffer[rIdx][0])+sizeof(int), buffer_size);
380
        int size = (int)(*(receivedBuffer[rIdx][0]));
381 22354b6f MarcoBiazzini
        free(receivedBuffer[rIdx][0]);
382
        receivedBuffer[rIdx][0] = NULL;
383
        receivedBuffer[rIdx][1] = NULL;
384
385
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
386
387 ff72192d MarcoBiazzini
        return size;
388 22354b6f MarcoBiazzini
}
389
390
391
int wait4data(const struct nodeID *n, struct timeval *tout) {
392
393
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
394
        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
395
        while(receivedBuffer[rIdx][0]==NULL && timeoutFired==0) {
396
        //        event_base_dispatch(base);
397
                event_base_loop(base,EVLOOP_ONCE);
398
        }
399
        timeoutFired = 0;
400
//        fprintf(stderr,"Back from eventlib loop.\n");
401
402
        if (receivedBuffer[rIdx][0]!=NULL)
403
                return 1;
404
        else
405
                return 0;
406
}
407
408
409
410
411
struct nodeID *create_node(const char *rem_IP, int rem_port) {
412
        struct nodeID *remote = malloc(sizeof(nodeID));
413
        if (remote == NULL) {
414
                return NULL;
415
        }
416
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
417
//        if (remote->addr == NULL) {
418
//                free(remote);
419
//                return NULL;
420
//        }
421 ff72192d MarcoBiazzini
//        remote->addrSize = SOCKETID_SIZE;
422
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
423 22354b6f MarcoBiazzini
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
424
        remote->connID = open_Connection(remote->addr,&connReady_cb,NULL);
425 ff72192d MarcoBiazzini
        remote->refcnt = 1;
426 22354b6f MarcoBiazzini
        return remote;
427
}
428
429
// TODO: check why closing the connection is annoying for the ML
430 ff72192d MarcoBiazzini
void delete_node(struct nodeID *n) {
431
432 22354b6f MarcoBiazzini
//        close_Connection(n->connID);
433 ff72192d MarcoBiazzini
//        close_Socket(n->addr);
434
//        free(n);
435
        if (n && (--(n->refcnt) == 0)) {
436
        //        close_Connection(n->connID);
437
                close_Socket(n->addr);
438
                free(n);
439
        }
440 22354b6f MarcoBiazzini
}
441
442
443
const char *node_addr(const struct nodeID *s)
444
{
445
  static char addr[256];
446
  // TODO: socketID_To_String always return 0 !!!
447
  int r = socketID_To_String(s->addr,addr,256);
448
  if (!r)
449
          return addr;
450
  else
451
          return "";
452
}
453
454 ff72192d MarcoBiazzini
struct nodeID *nodeid_dup(struct nodeID *s)
455 22354b6f MarcoBiazzini
{
456 ff72192d MarcoBiazzini
//  struct nodeID *res;
457
//
458
//  res = malloc(sizeof(struct nodeID));
459
//  if (res != NULL) {
460
//          res->addr = malloc(SOCKETID_SIZE);
461
//          if (res->addr != NULL) {
462
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
463
//        //         res->addrSize = SOCKETID_SIZE;
464
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
465
//                 res->connID = s->connID;
466
//          }
467
//          else {
468
//                free(res);
469
//                res = NULL;
470
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
471
//          }
472
//  }
473
//        return res;
474
        s->refcnt++;
475
        return s;
476 22354b6f MarcoBiazzini
}
477
478
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
479
{
480 ff72192d MarcoBiazzini
        return (compare_socketIDs(s1->addr,s2->addr) == 0);
481 22354b6f MarcoBiazzini
}
482
483
int nodeid_dump(uint8_t *b, const struct nodeID *s)
484
{
485
  socketID_To_String(s->addr,(char *)b,SOCKETID_STRING_SIZE);
486 ff72192d MarcoBiazzini
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
487 22354b6f MarcoBiazzini
  return strlen((char *)b);
488
489 ff72192d MarcoBiazzini
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
490
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
491 22354b6f MarcoBiazzini
492
}
493
494
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
495
{
496
  struct nodeID *res;
497
  res = malloc(sizeof(struct nodeID));
498
  if (res != NULL) {
499 ff72192d MarcoBiazzini
          memset(res,0,sizeof(struct nodeID));
500 22354b6f MarcoBiazzini
          res->addr = malloc(SOCKETID_SIZE);
501
          if (res->addr != NULL) {
502 ff72192d MarcoBiazzini
                  memset(res->addr,0,SOCKETID_SIZE);
503
                  //memcpy(res->addr, b, SOCKETID_SIZE);
504
                  //*len = SOCKETID_SIZE;
505 22354b6f MarcoBiazzini
                  *len = strlen((char*)b);
506 ff72192d MarcoBiazzini
                  string_To_SocketID((char *)b,res->addr);
507
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
508
        //          res->addrSize = SOCKETID_SIZE;
509
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
510 22354b6f MarcoBiazzini
                  res->connID = -1;
511 ff72192d MarcoBiazzini
                  res->refcnt = 1;
512 22354b6f MarcoBiazzini
          }
513
          else {
514
                  free(res);
515
                  res = NULL;
516 ff72192d MarcoBiazzini
                  // TODO: what about *len in this case???
517 22354b6f MarcoBiazzini
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
518
          }
519
  }
520 ff72192d MarcoBiazzini
521 22354b6f MarcoBiazzini
522
  return res;
523
}