Statistics
| Branch: | Revision:

grapes / som / net_helper-ml.c @ 7930ec45

History | View | Annotate | Download (14.1 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17
#include "ml_helpers.h"
18

    
19
/**
20
 * libevent pointer
21
 */
22
struct event_base *base;
23

    
24
#define NH_BUFFER_SIZE 100
25

    
26
static int sIdx = 0;
27
static int rIdx = 0;
28

    
29
typedef struct nodeID {
30
        socketID_handle addr;
31
        int connID;        // connection associated to this node, -1 if myself
32
        int refcnt;
33
//        int addrSize;
34
//        int addrStringSize;
35
} nodeID;
36

    
37
typedef struct msgData_cb {
38
        int bIdx;        // index of the message in the proper buffer
39
        unsigned char msgType; // message type
40
        int mSize;        // message size
41
} msgData_cb;
42

    
43
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
44
static int timeoutFired = 0;
45

    
46
// pointers to the msgs to be send
47
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
48
// pointers to the received msgs + sender nodeID
49
static uint8_t *receivedBuffer[NH_BUFFER_SIZE][2];
50
/**/ static int recv_counter =0;
51

    
52
/**
53
 * Look for a free slot in the received buffer and allocates it for immediate use
54
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
55
 */
56
static int next_R() {
57
        const int size = 1024;
58
        if (receivedBuffer[rIdx][0]==NULL) {
59
                receivedBuffer[rIdx][0] = malloc(size);
60
        }
61
        else {
62
                int count;
63
                for (count=0;count<NH_BUFFER_SIZE;count++) {
64
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
65
                        if (receivedBuffer[rIdx][0]==NULL)
66
                                break;
67
                }
68
                if (count==NH_BUFFER_SIZE)
69
                        return -1;
70
                else {
71
                        receivedBuffer[rIdx][0] = malloc(size);
72
                }
73
        }
74
        memset(receivedBuffer[rIdx][0],0,size);
75
        return rIdx;
76
}
77

    
78
/**
79
 * Look for a free slot in the sending buffer and allocates it for immediate use
80
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
81
 */
82
static int next_S() {
83
        const int size = 1024;
84
        if (sendingBuffer[sIdx]==NULL) {
85
                sendingBuffer[sIdx] = malloc(size);
86
        }
87
        else {
88
                int count;
89
                for (count=0;count<NH_BUFFER_SIZE;count++) {
90
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
91
                        if (sendingBuffer[sIdx]==NULL)
92
                                break;
93
                }
94
                if (count==NH_BUFFER_SIZE)
95
                        return -1;
96
                else {
97
                        sendingBuffer[sIdx] = malloc(size);
98
                }
99
        }
100
        return sIdx;
101
}
102

    
103

    
104
/**
105
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
106
 * @param local_socketID
107
 * @param errorstatus
108
 */
109
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
110
        switch (errorstatus) {
111
        case 0:
112
                //
113
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
114
        //        me->addrSize = SOCKETID_SIZE;
115
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
116
                me->connID = -1;
117
                me->refcnt = 1;
118
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
119
                break;
120
        case -1:
121
                //
122
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
123
                exit(1);
124
                break;
125
        case 1:
126
                //
127
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
128
                exit(1);
129
                break;
130
        case 2:
131
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
132
            exit(2);
133
            break;
134
        default :        // should never happen
135
                //
136
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
137
        }
138

    
139
}
140

    
141
/**
142
 * Timeout callback to be set in the eventlib loop as needed
143
 * @param socket
144
 * @param flag
145
 * @param arg
146
 */
147
static void t_out_cb (int socket, short flag, void* arg) {
148

    
149
        timeoutFired = 1;
150
//        fprintf(stderr,"TIMEOUT!!!\n");
151
//        event_base_loopbreak(base);
152
}
153

    
154
/**
155
 * Callback called by ml when a remote node ask for a connection
156
 * @param connectionID
157
 * @param arg
158
 */
159
static void receive_conn_cb(int connectionID, void *arg) {
160
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
161

    
162
}
163

    
164
/**
165
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
166
 * @param connectionID
167
 * @param arg
168
 */
169
static void connReady_cb (int connectionID, void *arg) {
170

    
171
        msgData_cb *p;
172
        p = (msgData_cb *)arg;
173
        if (p == NULL) return;
174
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
175
///**/fprintf(stderr,"Sent message of type # %c and size %d\n",
176
//                ((char*)sendingBuffer[p->bIdx])[0]+'0', p->mSize);
177
        free(sendingBuffer[p->bIdx]);
178
        sendingBuffer[p->bIdx] = NULL;
179
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
180
        //        event_base_loopbreak(base);
181
        free(p);
182
}
183

    
184
/**
185
 * Callback called by ml when a connection error occurs
186
 * @param connectionID
187
 * @param arg
188
 */
189
static void connError_cb (int connectionID, void *arg) {
190
        // simply get rid of the msg in the buffer....
191
        msgData_cb *p;
192
        p = (msgData_cb *)arg;
193
        if (p != NULL) {
194
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
195
                free(sendingBuffer[p->bIdx]);
196
                sendingBuffer[p->bIdx] = NULL;
197
                p->mSize = -1;
198
        }
199
        //        event_base_loopbreak(base);
200
}
201

    
202

    
203
/**
204
 * Callback to receive data from ml
205
 * @param buffer
206
 * @param buflen
207
 * @param msgtype
208
 * @param arg
209
 */
210
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
211
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
212
//        fprintf(stderr, "Net-helper : called back with some news...\n");
213
/**/ ++recv_counter;
214
        char str[SOCKETID_STRING_SIZE];
215
        if (arg->remote_socketID != NULL)
216
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
217
        else
218
                sprintf(str,"!Unknown!");
219
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
220
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
221
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %c -- Missing # %d frags\n",
222
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?" => Missing first!":"");
223
        }
224
        else {
225
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
226
                // buffering the received message only if possible, otherwise ignore it...
227
                int index = next_R();
228
                if (index >=0) {
229
                //        receivedBuffer[index][0] = malloc(buflen);
230
                        if (receivedBuffer[index][0] == NULL) {
231
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
232
                                return;
233
                        }
234
                        // creating a new sender nodedID
235
                        receivedBuffer[index][1] = malloc(sizeof(nodeID));
236
                        if (receivedBuffer[index][1]==NULL) {
237
                                free (receivedBuffer[index][0]);
238
                                receivedBuffer[index][0] = NULL;
239
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
240
                                return;
241
                        }
242
                        else {
243
                                memset(receivedBuffer[index][1], 0, sizeof(struct nodeID));
244
                                nodeID *remote; remote = (nodeID*)(receivedBuffer[index][1]);
245
                                receivedBuffer[index][0] = realloc(receivedBuffer[index][0],buflen+sizeof(int));
246
                                memset(receivedBuffer[index][0],0,buflen+sizeof(int));
247
                                memcpy(receivedBuffer[index][0],&buflen,sizeof(int));
248
                                //*(receivedBuffer[index][0]) = buflen;
249
                                memcpy((receivedBuffer[index][0])+sizeof(int),buffer,buflen);
250
                                  // get the socketID of the sender
251
                                remote->addr = malloc(SOCKETID_SIZE);
252
                                if (remote->addr == NULL) {
253
                                          free (remote);
254
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
255
                                          return;
256
                                }
257
                                else {
258
                                        memset(remote->addr, 0, SOCKETID_SIZE);
259
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
260
                                //        remote->addrSize = SOCKETID_SIZE;
261
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
262
                                        remote->connID = arg->connectionID;
263
                                        remote->refcnt = 1;
264
                                }
265
                        }
266
                }
267
  }
268
//        event_base_loopbreak(base);
269
}
270

    
271

    
272
struct nodeID *net_helper_init(const char *IPaddr, int port) {
273

    
274
        struct timeval tout = {1, 0};
275
        base = event_base_new();
276

    
277
        me = malloc(sizeof(nodeID));
278
        if (me == NULL) {
279
                return NULL;
280
        }
281
        memset(me,0,sizeof(nodeID));
282
        me->addr = malloc(SOCKETID_SIZE);
283
        if (me->addr == NULL) {
284
                free(me);
285
                return NULL;
286
        }
287
        memset(me->addr,0,SOCKETID_SIZE);
288
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
289
        me->refcnt = 1;
290

    
291
        int i;
292
        for (i=0;i<NH_BUFFER_SIZE;i++) {
293
                sendingBuffer[i] = NULL;
294
                receivedBuffer[i][0] = NULL;
295
        }
296

    
297
        mlRegisterErrorConnectionCb(&connError_cb);
298
        mlRegisterRecvConnectionCb(&receive_conn_cb);
299
        mlInit(1,tout,port,IPaddr,0,NULL,&init_myNodeID_cb,base);
300
        while (me->connID<-1) {
301
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
302
                event_base_loop(base,EVLOOP_ONCE);
303
        }
304
        timeoutFired = 0;
305
//        fprintf(stderr,"Net-helper init : back from init!\n");
306

    
307
        return me;
308
}
309

    
310

    
311
void bind_msg_type (unsigned char msgtype) {
312

    
313
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
314
}
315

    
316

    
317
/**
318
 * Called by the application to send data to a remote peer
319
 * @param from
320
 * @param to
321
 * @param buffer_ptr
322
 * @param buffer_size
323
 * @return The dimension of the buffer or -1 if a connection error occurred.
324
 */
325
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
326
{
327
        // if buffer is full, discard the message and return an error flag
328
        int index = next_S();
329
        if (index<0) {
330
                // free(buffer_ptr);
331
                return -1;
332
        }
333
        sendingBuffer[index] = realloc(sendingBuffer[index],buffer_size);
334
        memset(sendingBuffer[index],0,buffer_size);
335
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
336
        // free(buffer_ptr);
337
        msgData_cb *p = malloc(sizeof(msgData_cb));
338
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0];
339
        int current = p->bIdx;
340
        send_params params = {0,0,0,0};
341
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
342
        if (to->connID<0) {
343
                free(sendingBuffer[current]);
344
                sendingBuffer[current] = NULL;
345
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
346
                free(p);
347
                return -1;
348
        }
349
        else {
350
                return buffer_size; //p->mSize;
351
        }
352

    
353
}
354

    
355

    
356
/**
357
 * Called by an application to receive data from remote peers
358
 * @param local
359
 * @param remote
360
 * @param buffer_ptr
361
 * @param buffer_size
362
 * @return The number of received bytes or -1 if some error occurred.
363
 */
364
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
365
{
366
        int size;
367
        // this should never happen... if it does, index handling is faulty...
368
        if (receivedBuffer[rIdx][1]==NULL) {
369
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
370
                return -1;
371
        }
372

    
373
        (*remote) = (nodeID*)(receivedBuffer[rIdx][1]);
374
        // retrieve a msg from the buffer
375
        size = *((int*)(receivedBuffer[rIdx][0]));
376
        if (size>buffer_size) {
377
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
378
                return -1;
379
        }
380
        memcpy(buffer_ptr, (receivedBuffer[rIdx][0])+sizeof(int), size);
381
        free(receivedBuffer[rIdx][0]);
382
        receivedBuffer[rIdx][0] = NULL;
383
        receivedBuffer[rIdx][1] = NULL;
384

    
385
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
386

    
387
        return size;
388
}
389

    
390

    
391
int wait4data(const struct nodeID *n, struct timeval *tout) {
392

    
393
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
394
        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
395
        while(receivedBuffer[rIdx][0]==NULL && timeoutFired==0) {
396
        //        event_base_dispatch(base);
397
                event_base_loop(base,EVLOOP_ONCE);
398
        }
399
        timeoutFired = 0;
400
//        fprintf(stderr,"Back from eventlib loop.\n");
401

    
402
        if (receivedBuffer[rIdx][0]!=NULL)
403
                return 1;
404
        else
405
                return 0;
406
}
407

    
408

    
409

    
410

    
411
struct nodeID *create_node(const char *rem_IP, int rem_port) {
412
        struct nodeID *remote = malloc(sizeof(nodeID));
413
        if (remote == NULL) {
414
                return NULL;
415
        }
416
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
417
//        if (remote->addr == NULL) {
418
//                free(remote);
419
//                return NULL;
420
//        }
421
//        remote->addrSize = SOCKETID_SIZE;
422
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
423
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
424
        send_params params = {0,0,0,0};
425
        remote->connID = mlOpenConnection(remote->addr,&connReady_cb,NULL, params);
426
        remote->refcnt = 1;
427
        return remote;
428
}
429

    
430
// TODO: check why closing the connection is annoying for the ML
431
void nodeid_free(struct nodeID *n) {
432

    
433
//        mlCloseConnection(n->connID);
434
//        mlCloseSocket(n->addr);
435
//        free(n);
436
        if (n && (--(n->refcnt) == 0)) {
437
        //        mlCloseConnection(n->connID);
438
                mlCloseSocket(n->addr);
439
                free(n);
440
        }
441
}
442

    
443

    
444
const char *node_addr(const struct nodeID *s)
445
{
446
  static char addr[256];
447
  // TODO: mlSocketIDToString always return 0 !!!
448
  int r = mlSocketIDToString(s->addr,addr,256);
449
  if (!r)
450
          return addr;
451
  else
452
          return "";
453
}
454

    
455
struct nodeID *nodeid_dup(struct nodeID *s)
456
{
457
//  struct nodeID *res;
458
//
459
//  res = malloc(sizeof(struct nodeID));
460
//  if (res != NULL) {
461
//          res->addr = malloc(SOCKETID_SIZE);
462
//          if (res->addr != NULL) {
463
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
464
//        //         res->addrSize = SOCKETID_SIZE;
465
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
466
//                 res->connID = s->connID;
467
//          }
468
//          else {
469
//                free(res);
470
//                res = NULL;
471
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
472
//          }
473
//  }
474
//        return res;
475
        s->refcnt++;
476
        return s;
477
}
478

    
479
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
480
{
481
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
482
}
483

    
484
int nodeid_dump(uint8_t *b, const struct nodeID *s)
485
{
486
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
487
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
488

    
489
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
490
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
491

    
492
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
493
}
494

    
495
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
496
{
497
  struct nodeID *res;
498
  res = malloc(sizeof(struct nodeID));
499
  if (res != NULL) {
500
          memset(res,0,sizeof(struct nodeID));
501
          res->addr = malloc(SOCKETID_SIZE);
502
          if (res->addr != NULL) {
503
                  memset(res->addr,0,SOCKETID_SIZE);
504
                  //memcpy(res->addr, b, SOCKETID_SIZE);
505
                  //*len = SOCKETID_SIZE;
506
                  *len = strlen((char*)b) + 1;
507
                  mlStringToSocketID((char *)b,res->addr);
508
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
509
        //          res->addrSize = SOCKETID_SIZE;
510
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
511
                  res->connID = -1;
512
                  res->refcnt = 1;
513
          }
514
          else {
515
                  free(res);
516
                  res = NULL;
517
                  // TODO: what about *len in this case???
518
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
519
          }
520
  }
521

    
522

    
523
  return res;
524
}