Statistics
| Branch: | Revision:

grapes / som / net_helper-ml.c @ 2694dce1

History | View | Annotate | Download (14.1 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see GPL.txt
5
 *
6
 */
7
#include <netinet/in.h>
8
#include <sys/uio.h>
9
#include "util/udpSocket.h"
10
#include <sys/socket.h>
11

    
12
#include <event2/event.h>
13
#include <arpa/inet.h>
14
#include <unistd.h>
15
#include <stdlib.h>
16
#include <stdio.h>
17
#include <string.h>
18

    
19

    
20
#include "net_helper.h"
21
#include "ml.h"
22
#include "ml_helpers.h"
23

    
24
/**
25
 * libevent pointer
26
 */
27
struct event_base *base;
28

    
29
#define NH_BUFFER_SIZE 100
30
#define NH_MSG_MAXSIZE 1024
31

    
32
static int sIdx = 0;
33
static int rIdx = 0;
34

    
35
typedef struct nodeID {
36
        socketID_handle addr;
37
        int connID;        // connection associated to this node, -1 if myself
38
        int refcnt;
39
//        int addrSize;
40
//        int addrStringSize;
41
} nodeID;
42

    
43
typedef struct msgData_cb {
44
        int bIdx;        // index of the message in the proper buffer
45
        unsigned char msgType; // message type
46
        int mSize;        // message size
47
} msgData_cb;
48

    
49
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
50
static int timeoutFired = 0;
51

    
52
// pointers to the msgs to be send
53
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
54
// pointers to the received msgs + sender nodeID
55
static uint8_t *receivedBuffer[NH_BUFFER_SIZE][2];
56

    
57

    
58
/**
59
 * Look for a free slot in the received buffer and allocates it for immediate use
60
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
61
 */
62
static int next_R() {
63
        if (receivedBuffer[rIdx][0]==NULL) {
64
                receivedBuffer[rIdx][0] = malloc(NH_MSG_MAXSIZE);
65
        }
66
        else {
67
                int count;
68
                for (count=0;count<NH_BUFFER_SIZE;count++) {
69
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
70
                        if (receivedBuffer[rIdx][0]==NULL)
71
                                break;
72
                }
73
                if (count==NH_BUFFER_SIZE)
74
                        return -1;
75
                else {
76
                        receivedBuffer[rIdx][0] = malloc(NH_MSG_MAXSIZE);
77
                }
78
        }
79
        memset(receivedBuffer[rIdx][0],0,NH_MSG_MAXSIZE);
80
        return rIdx;
81
}
82

    
83
/**
84
 * Look for a free slot in the sending buffer and allocates it for immediate use
85
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
86
 */
87
static int next_S() {
88
        if (sendingBuffer[sIdx]==NULL) {
89
                sendingBuffer[sIdx] = malloc(NH_MSG_MAXSIZE);
90
        }
91
        else {
92
                int count;
93
                for (count=0;count<NH_BUFFER_SIZE;count++) {
94
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
95
                        if (sendingBuffer[sIdx]==NULL)
96
                                break;
97
                }
98
                if (count==NH_BUFFER_SIZE)
99
                        return -1;
100
                else {
101
                        sendingBuffer[sIdx] = malloc(NH_MSG_MAXSIZE);
102
                }
103
        }
104
        memset(sendingBuffer[sIdx],0,NH_MSG_MAXSIZE);
105
        return sIdx;
106
}
107

    
108
/**
109
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
110
 * @param local_socketID
111
 * @param errorstatus
112
 */
113
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
114
        switch (errorstatus) {
115
        case 0:
116
                //
117
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
118
        //        me->addrSize = SOCKETID_SIZE;
119
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
120
                me->connID = -1;
121
                me->refcnt = 1;
122
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
123
                break;
124
        case -1:
125
                //
126
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
127
                exit(1);
128
                break;
129
        case 1:
130
                //
131
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
132
                exit(1);
133
                break;
134
        case 2:
135
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
136
            exit(2);
137
            break;
138
        default :        // should never happen
139
                //
140
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
141
        }
142

    
143
}
144

    
145
/**
146
 * Timeout callback to be set in the eventlib loop as needed
147
 * @param socket
148
 * @param flag
149
 * @param arg
150
 */
151
static void t_out_cb (int socket, short flag, void* arg) {
152

    
153
        timeoutFired = 1;
154
//        fprintf(stderr,"TIMEOUT!!!\n");
155
//        event_base_loopbreak(base);
156
}
157

    
158
/**
159
 * Callback called by ml when a remote node ask for a connection
160
 * @param connectionID
161
 * @param arg
162
 */
163
static void receive_conn_cb(int connectionID, void *arg) {
164
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
165

    
166
}
167

    
168
/**
169
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
170
 * @param connectionID
171
 * @param arg
172
 */
173
static void connReady_cb (int connectionID, void *arg) {
174

    
175
        msgData_cb *p;
176
        p = (msgData_cb *)arg;
177
        if (p == NULL) return;
178
        send_params params = {0,0,0,0};
179
        send_Data(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,&params);
180
        free(sendingBuffer[p->bIdx]);
181
        sendingBuffer[p->bIdx] = NULL;
182
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
183
        //        event_base_loopbreak(base);
184
}
185

    
186
/**
187
 * Callback called by ml when a connection error occurs
188
 * @param connectionID
189
 * @param arg
190
 */
191
static void connError_cb (int connectionID, void *arg) {
192
        // simply get rid of the msg in the buffer....
193
        msgData_cb *p;
194
        p = (msgData_cb *)arg;
195
        if (p != NULL) {
196
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
197
                free(sendingBuffer[p->bIdx]);
198
                sendingBuffer[p->bIdx] = NULL;
199
                free(p);
200
        }
201
        //        event_base_loopbreak(base);
202
}
203

    
204
/**
205
 * Callback to receive data from ml
206
 * @param buffer
207
 * @param buflen
208
 * @param msgtype
209
 * @param arg
210
 */
211
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
212
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
213
//        fprintf(stderr, "Net-helper : called back with some news...\n");
214
        char str[SOCKETID_STRING_SIZE];
215
        if (arg->remote_socketID != NULL)
216
                socketID_To_String(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
217
        else
218
                sprintf(str,"!Unknown!");
219
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
220
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
221
        }
222
        else {
223
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
224
                // buffering the received message only if possible, otherwise ignore it...
225
                int index = next_R();
226
                if (index >=0) {
227
                //        receivedBuffer[index][0] = malloc(buflen);
228
                        if (receivedBuffer[index][0] == NULL) {
229
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
230
                                return;
231
                        }
232
                        // creating a new sender nodedID
233
                        receivedBuffer[index][1] = malloc(sizeof(nodeID));
234
                        if (receivedBuffer[index][1]==NULL) {
235
                                free (receivedBuffer[index][0]);
236
                                receivedBuffer[index][0] = NULL;
237
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
238
                                return;
239
                        }
240
                        else {
241
                                memset(receivedBuffer[index][1], 0, sizeof(struct nodeID));
242
                                nodeID *remote; remote = (nodeID*)(receivedBuffer[index][1]);
243
                                receivedBuffer[index][0] = realloc(receivedBuffer[index][0],buflen+sizeof(int));
244
                                *(receivedBuffer[index][0]) = buflen;
245
                                memcpy((receivedBuffer[index][0])+sizeof(int),buffer,buflen);
246
                                  // get the socketID of the sender
247
                                remote->addr = malloc(SOCKETID_SIZE);
248
                                if (remote->addr == NULL) {
249
                                          free (remote);
250
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
251
                                          return;
252
                                }
253
                                else {
254
                                        memset(remote->addr, 0, SOCKETID_SIZE);
255
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
256
                                //        remote->addrSize = SOCKETID_SIZE;
257
                                //        remote->addrStringSize = SOCKETID_STRING_SIZE;
258
                                        remote->connID = arg->connectionID;
259
                                        remote->refcnt = 1;
260
                                }
261
                        }
262
                }
263
  }
264
//        event_base_loopbreak(base);
265
}
266

    
267
/**
268
 * Initialize the self nodeID structure, the ml and create a socket in ml to get the knownHost
269
 * @param IPaddr
270
 * @param port
271
 * @param srv_IP
272
 * @param srv_port
273
 * @return NULL, because to have a valid nodeID you have to wait for ml to call back...
274
 */
275
struct nodeID *net_helper_init(const char *IPaddr, int port,unsigned char msgtypes[], int msgtypes_len) {
276

    
277
        struct timeval tout = {1, 0};
278
        base = event_base_new();
279

    
280
        me = malloc(sizeof(nodeID));
281
        if (me == NULL) {
282
                return NULL;
283
        }
284
        memset(me,0,sizeof(nodeID));
285
        me->addr = malloc(SOCKETID_SIZE);
286
        if (me->addr == NULL) {
287
                free(me);
288
                return NULL;
289
        }
290
        memset(me->addr,0,SOCKETID_SIZE);
291
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
292
        me->refcnt = 1;
293

    
294
        int i;
295
        for (i=0;i<NH_BUFFER_SIZE;i++) {
296
                sendingBuffer[i] = NULL;
297
                receivedBuffer[i][0] = NULL;
298
        }
299

    
300
        register_Error_connection_cb(&connError_cb);
301
        register_Recv_connection_cb(&receive_conn_cb);
302
        for (i=0;i<msgtypes_len;i++) {
303
                register_Recv_data_cb(&recv_data_cb,msgtypes[i]);
304
        }
305
        init_messaging_layer(1,tout,port,IPaddr,0,NULL,&init_myNodeID_cb,base);
306
        while (me->connID<-1) {
307
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
308
                event_base_loop(base,EVLOOP_ONCE);
309
        }
310
        timeoutFired = 0;
311
//        fprintf(stderr,"Net-helper init : back from init!\n");
312

    
313
        return me;
314
}
315

    
316

    
317
/**
318
 * Called by the application to send data to a remote peer
319
 * @param from
320
 * @param to
321
 * @param buffer_ptr
322
 * @param buffer_size
323
 * @return The dimension of the buffer or -1 if a connection error occurred.
324
 */
325
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
326
{
327
        // if buffer is full, discard the message and return an error flag
328
        int index = next_S();
329
        if (index<0) {
330
                // free(buffer_ptr);
331
                return -1;
332
        }
333
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
334
        // free(buffer_ptr);
335
        msgData_cb *p = malloc(sizeof(msgData_cb));
336
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0];
337
        int current = p->bIdx;
338
        to->connID = open_Connection(to->addr,&connReady_cb,p);
339
        if (to->connID<0) {
340
                free(sendingBuffer[current]);
341
                sendingBuffer[current] = NULL;
342
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
343
                free(p);
344
                return -1;
345
        }
346
        else {
347
        //        fprintf(stderr,"Net-helper: Got a connection ID to send msg %d to %s.\n ",current,node_addr(to));
348
        //                struct timeval tout = {0,500};
349
        //                event_base_once(base,0, EV_TIMEOUT, &t_out_cb, NULL, &tout);
350
                while (sendingBuffer[current] != NULL)
351
                        event_base_loop(base,EVLOOP_ONCE);//  EVLOOP_NONBLOCK
352
//                fprintf(stderr,"Net-helper: Back from eventlib loop with status %d.\n", ok);
353
                free(p);
354
                return buffer_size;
355
        }
356

    
357
}
358

    
359

    
360
/**
361
 * Called by an application to receive data from remote peers
362
 * @param local
363
 * @param remote
364
 * @param buffer_ptr
365
 * @param buffer_size
366
 * @return The number of received bytes or -1 if some error occurred.
367
 */
368
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
369
{
370
        // this should never happen... if it does, index handling is faulty...
371
        if (receivedBuffer[rIdx][1]==NULL) {
372
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
373
                return -1;
374
        }
375

    
376
        (*remote) = (nodeID*)(receivedBuffer[rIdx][1]);
377
        // retrieve a msg from the buffer
378
        memcpy(buffer_ptr, (receivedBuffer[rIdx][0])+sizeof(int), buffer_size);
379
        int size = (int)(*(receivedBuffer[rIdx][0]));
380
        free(receivedBuffer[rIdx][0]);
381
        receivedBuffer[rIdx][0] = NULL;
382
        receivedBuffer[rIdx][1] = NULL;
383

    
384
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
385

    
386
        return size;
387
}
388

    
389

    
390
int wait4data(const struct nodeID *n, struct timeval *tout) {
391

    
392
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
393
        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
394
        while(receivedBuffer[rIdx][0]==NULL && timeoutFired==0) {
395
        //        event_base_dispatch(base);
396
                event_base_loop(base,EVLOOP_ONCE);
397
        }
398
        timeoutFired = 0;
399
//        fprintf(stderr,"Back from eventlib loop.\n");
400

    
401
        if (receivedBuffer[rIdx][0]!=NULL)
402
                return 1;
403
        else
404
                return 0;
405
}
406

    
407

    
408

    
409

    
410
struct nodeID *create_node(const char *rem_IP, int rem_port) {
411
        struct nodeID *remote = malloc(sizeof(nodeID));
412
        if (remote == NULL) {
413
                return NULL;
414
        }
415
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
416
//        if (remote->addr == NULL) {
417
//                free(remote);
418
//                return NULL;
419
//        }
420
//        remote->addrSize = SOCKETID_SIZE;
421
//        remote->addrStringSize = SOCKETID_STRING_SIZE;
422
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
423
        remote->connID = open_Connection(remote->addr,&connReady_cb,NULL);
424
        remote->refcnt = 1;
425
        return remote;
426
}
427

    
428
// TODO: check why closing the connection is annoying for the ML
429
void delete_node(struct nodeID *n) {
430

    
431
//        close_Connection(n->connID);
432
//        close_Socket(n->addr);
433
//        free(n);
434
        if (n && (--(n->refcnt) == 0)) {
435
        //        close_Connection(n->connID);
436
                close_Socket(n->addr);
437
                free(n);
438
        }
439
}
440

    
441

    
442
const char *node_addr(const struct nodeID *s)
443
{
444
  static char addr[256];
445
  // TODO: socketID_To_String always return 0 !!!
446
  int r = socketID_To_String(s->addr,addr,256);
447
  if (!r)
448
          return addr;
449
  else
450
          return "";
451
}
452

    
453
struct nodeID *nodeid_dup(struct nodeID *s)
454
{
455
//  struct nodeID *res;
456
//
457
//  res = malloc(sizeof(struct nodeID));
458
//  if (res != NULL) {
459
//          res->addr = malloc(SOCKETID_SIZE);
460
//          if (res->addr != NULL) {
461
//                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
462
//        //         res->addrSize = SOCKETID_SIZE;
463
//        //         res->addrStringSize = SOCKETID_STRING_SIZE;
464
//                 res->connID = s->connID;
465
//          }
466
//          else {
467
//                free(res);
468
//                res = NULL;
469
//                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
470
//          }
471
//  }
472
//        return res;
473
        s->refcnt++;
474
        return s;
475
}
476

    
477
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
478
{
479
        return (compare_socketIDs(s1->addr,s2->addr) == 0);
480
}
481

    
482
int nodeid_dump(uint8_t *b, const struct nodeID *s)
483
{
484
  socketID_To_String(s->addr,(char *)b,SOCKETID_STRING_SIZE);
485
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
486
  return strlen((char *)b);
487

    
488
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
489
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
490

    
491
}
492

    
493
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
494
{
495
  struct nodeID *res;
496
  res = malloc(sizeof(struct nodeID));
497
  if (res != NULL) {
498
          memset(res,0,sizeof(struct nodeID));
499
          res->addr = malloc(SOCKETID_SIZE);
500
          if (res->addr != NULL) {
501
                  memset(res->addr,0,SOCKETID_SIZE);
502
                  //memcpy(res->addr, b, SOCKETID_SIZE);
503
                  //*len = SOCKETID_SIZE;
504
                  *len = strlen((char*)b);
505
                  string_To_SocketID((char *)b,res->addr);
506
        //          fprintf(stderr,"Node undumped : %s\n",node_addr(res));
507
        //          res->addrSize = SOCKETID_SIZE;
508
        //          res->addrStringSize = SOCKETID_STRING_SIZE;
509
                  res->connID = -1;
510
                  res->refcnt = 1;
511
          }
512
          else {
513
                  free(res);
514
                  res = NULL;
515
                  // TODO: what about *len in this case???
516
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
517
          }
518
  }
519

    
520

    
521
  return res;
522
}