Statistics
| Branch: | Revision:

streamers / net_helper-ml.c @ 22354b6f

History | View | Annotate | Download (13.1 KB)

1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see GPL.txt
5
 *
6
 */
7

    
8
#include <event2/event.h>
9
#include <arpa/inet.h>
10
#include <unistd.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <string.h>
14

    
15
#include "net_helper.h"
16
#include "ml.h"
17
#include "ml_helpers.h"
18

    
19
/**
20
 * libevent pointer
21
 */
22
struct event_base *base;
23

    
24
#define NH_BUFFER_SIZE 100
25
#define NH_MSG_MAXSIZE 1024
26

    
27
static int sIdx = 0;
28
static int rIdx = 0;
29

    
30
typedef struct nodeID {
31
        socketID_handle addr;
32
        int connID;        // connection associated to this node, -1 if myself
33
        int addrSize;
34
        int addrStringSize;
35
} nodeID;
36

    
37
typedef struct msgData_cb {
38
        int bIdx;        // index of the message in the proper buffer
39
        unsigned char msgType; // message type
40
        int mSize;        // message size
41
} msgData_cb;
42

    
43
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
44
static int timeoutFired = 0;
45

    
46
// pointers to the msgs to be send
47
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
48
// pointers to the received msgs + sender nodeID
49
static uint8_t *receivedBuffer[NH_BUFFER_SIZE][2];
50

    
51

    
52
/**
53
 * Look for a free slot in the received buffer and allocates it for immediate use
54
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
55
 */
56
static int next_R() {
57
        if (receivedBuffer[rIdx][0]==NULL) {
58
                receivedBuffer[rIdx][0] = malloc(NH_MSG_MAXSIZE);
59
        }
60
        else {
61
                int count;
62
                for (count=0;count<NH_BUFFER_SIZE;count++) {
63
                        rIdx = (++rIdx)%NH_BUFFER_SIZE;
64
                        if (receivedBuffer[rIdx][0]==NULL)
65
                                break;
66
                }
67
                if (count==NH_BUFFER_SIZE)
68
                        return -1;
69
                else {
70
                        receivedBuffer[rIdx][0] = malloc(NH_MSG_MAXSIZE);
71
                }
72
        }
73
        memset(receivedBuffer[rIdx][0],0,NH_MSG_MAXSIZE);
74
        return rIdx;
75
}
76

    
77
/**
78
 * Look for a free slot in the sending buffer and allocates it for immediate use
79
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
80
 */
81
static int next_S() {
82
        if (sendingBuffer[sIdx]==NULL) {
83
                sendingBuffer[sIdx] = malloc(NH_MSG_MAXSIZE);
84
        }
85
        else {
86
                int count;
87
                for (count=0;count<NH_BUFFER_SIZE;count++) {
88
                        sIdx = (++sIdx)%NH_BUFFER_SIZE;
89
                        if (sendingBuffer[sIdx]==NULL)
90
                                break;
91
                }
92
                if (count==NH_BUFFER_SIZE)
93
                        return -1;
94
                else {
95
                        sendingBuffer[sIdx] = malloc(NH_MSG_MAXSIZE);
96
                }
97
        }
98
        memset(sendingBuffer[sIdx],0,NH_MSG_MAXSIZE);
99
        return sIdx;
100
}
101

    
102
/**
103
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
104
 * @param local_socketID
105
 * @param errorstatus
106
 */
107
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
108
        switch (errorstatus) {
109
        case 0:
110
                //
111
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
112
                me->addrSize = SOCKETID_SIZE;
113
                me->addrStringSize = SOCKETID_STRING_SIZE;
114
                me->connID = -1;
115
                fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
116
                break;
117
        case -1:
118
                //
119
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
120
                break;
121
        case 1:
122
                //
123
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
124
                break;
125
        case 2:
126
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
127
            break;
128
        default :        // should never happen
129
                //
130
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
131
        }
132

    
133
}
134

    
135
/**
136
 * Timeout callback to be set in the eventlib loop as needed
137
 * @param socket
138
 * @param flag
139
 * @param arg
140
 */
141
static void t_out_cb (int socket, short flag, void* arg) {
142

    
143
        timeoutFired = 1;
144
        fprintf(stderr,"TIMEOUT!!!\n");
145
//        event_base_loopbreak(base);
146
}
147

    
148
/**
149
 * Callback called by ml when a remote node ask for a connection
150
 * @param connectionID
151
 * @param arg
152
 */
153
static void receive_conn_cb(int connectionID, void *arg) {
154
    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
155

    
156
}
157

    
158
/**
159
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
160
 * @param connectionID
161
 * @param arg
162
 */
163
static void connReady_cb (int connectionID, void *arg) {
164

    
165
        msgData_cb *p;
166
        p = (msgData_cb *)arg;
167
        if (p == NULL) return;
168
        send_params params = {0,0,0,0};
169
        send_Data(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,&params);
170
        free(sendingBuffer[p->bIdx]);
171
        sendingBuffer[p->bIdx] = NULL;
172
        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
173
        //        event_base_loopbreak(base);
174
}
175

    
176
/**
177
 * Callback called by ml when a connection error occurs
178
 * @param connectionID
179
 * @param arg
180
 */
181
static void connError_cb (int connectionID, void *arg) {
182
        // simply get rid of the msg in the buffer....
183
        msgData_cb *p;
184
        p = (msgData_cb *)arg;
185
        if (p != NULL) {
186
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
187
                free(sendingBuffer[p->bIdx]);
188
                sendingBuffer[p->bIdx] = NULL;
189
                free(p);
190
        }
191
        //        event_base_loopbreak(base);
192
}
193

    
194
/**
195
 * Callback to receive data from ml
196
 * @param buffer
197
 * @param buflen
198
 * @param msgtype
199
 * @param arg
200
 */
201
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
202
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
203
//        fprintf(stderr, "Net-helper : called back with some news...\n");
204
        char str[SOCKETID_STRING_SIZE]; //str[SOCKETID_STRING_SIZE]=0;
205
        if (arg->remote_socketID != NULL)
206
                socketID_To_String(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
207
        else
208
                sprintf(str,"!Unknown!");
209
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
210
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
211
        }
212
        else {
213
                fprintf(stderr, "Net-helper : message arrived from %s\n",str);
214
                // buffering the received message only if possible, otherwise ignore it...
215
                int index = next_R();
216
                if (index >=0) {
217
                //        receivedBuffer[index][0] = malloc(buflen);
218
                        if (receivedBuffer[index][0] == NULL) {
219
                                fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
220
                                return;
221
                        }
222
                        // creating a new sender nodedID
223
                        receivedBuffer[index][1] = malloc(sizeof(nodeID));
224
                        if (receivedBuffer[index][1]==NULL) {
225
                                free (receivedBuffer[index][0]);
226
                                receivedBuffer[index][0] = NULL;
227
                                fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
228
                                return;
229
                        }
230
                        else {
231
                                nodeID *remote; remote = (nodeID*)(receivedBuffer[index][1]);
232
                                memcpy(receivedBuffer[index][0],buffer,buflen);
233
                                  // get the socketID of the sender
234
                                remote->addr = malloc(SOCKETID_SIZE);
235
                                if (remote->addr == NULL) {
236
                                          free (remote);
237
                                          fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
238
                                          return;
239
                                }
240
                                else {
241
                                        memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
242
                                        remote->addrSize = SOCKETID_SIZE;
243
                                        remote->addrStringSize = SOCKETID_STRING_SIZE;
244
                                        remote->connID = arg->connectionID;
245
                                }
246
                        }
247
                }
248
  }
249
//        event_base_loopbreak(base);
250
}
251

    
252
/**
253
 * Initialize the self nodeID structure, the ml and create a socket in ml to get the knownHost
254
 * @param IPaddr
255
 * @param port
256
 * @param srv_IP
257
 * @param srv_port
258
 * @return NULL, because to have a valid nodeID you have to wait for ml to call back...
259
 */
260
struct nodeID *net_helper_init(const char *IPaddr, int port,unsigned char msgtypes[], int msgtypes_len) {
261

    
262
        struct timeval tout = {1, 0};
263
        base = event_base_new();
264

    
265
        me = malloc(sizeof(nodeID));
266
        if (me == NULL) {
267
                return NULL;
268
        }
269
        me->addr = malloc(SOCKETID_SIZE);
270
        if (me->addr == NULL) {
271
                free(me);
272
                return NULL;
273
        }
274
        me->addrSize = 0;        // dirty trick to spot later if the ml has called back ...
275

    
276
        int i;
277
        for (i=0;i<NH_BUFFER_SIZE;i++) {
278
                sendingBuffer[i] = NULL;
279
                receivedBuffer[i][0] = NULL;
280
        }
281

    
282
        register_Error_connection_cb(&connError_cb);
283
        register_Recv_connection_cb(&receive_conn_cb);
284
        for (i=0;i<msgtypes_len;i++) {
285
                register_Recv_data_cb(&recv_data_cb,msgtypes[i]);
286
        }
287
        init_messaging_layer(1,tout,port,IPaddr,0,NULL,&init_myNodeID_cb,base);
288
//        fprintf(stderr,"Net-helper init : back from init!\n");
289
//        register_recv_localsocketID_cb(init_myNodeID_cb);
290
        return me;
291
}
292

    
293

    
294
/**
295
 * Called by the application to send data to a remote peer
296
 * @param from
297
 * @param to
298
 * @param buffer_ptr
299
 * @param buffer_size
300
 * @return The number of sent bytes or -1 if a connection error occurred.
301
 */
302
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
303
{
304
        // if buffer is full, discard the message and return an error flag
305
        int index = next_S();
306
        if (index<0) {
307
                // free(buffer_ptr);
308
                return -1;
309
        }
310
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
311
        // free(buffer_ptr);
312
        msgData_cb *p = malloc(sizeof(msgData_cb));
313
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0];
314
        int current = p->bIdx;
315
        to->connID = open_Connection(to->addr,&connReady_cb,p);
316
        if (to->connID<0) {
317
                free(sendingBuffer[current]);
318
                sendingBuffer[current] = NULL;
319
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
320
        }
321
        else {
322
                fprintf(stderr,"Net-helper: Got a connection ID to send msg %d to %s.\n ",
323
                        current,node_addr(to));
324
        //                struct timeval tout = {0,500};
325
        //                event_base_once(base,0, EV_TIMEOUT, &t_out_cb, NULL, &tout);
326
                while (sendingBuffer[current] != NULL)
327
                        event_base_loop(base,EVLOOP_ONCE);//  EVLOOP_NONBLOCK
328
//                fprintf(stderr,"Net-helper: Back from eventlib loop with status %d.\n", ok);
329
        }
330
        free(p);
331
        return to->connID;
332

    
333
}
334

    
335

    
336
/**
337
 * Called by an application to receive data from remote peers
338
 * @param local
339
 * @param remote
340
 * @param buffer_ptr
341
 * @param buffer_size
342
 * @return The number of received bytes or -1 if some error occurred.
343
 */
344
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
345
{
346
        // this should never happen... if it does, index handling is faulty...
347
        if (receivedBuffer[rIdx][1]==NULL) {
348
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
349
                return -1;
350
        }
351

    
352
        (*remote) = (nodeID*)(receivedBuffer[rIdx][1]);
353
        // retrieve a msg from the buffer
354
        memcpy(buffer_ptr, receivedBuffer[rIdx][0], buffer_size);
355

    
356
        free(receivedBuffer[rIdx][0]);
357
        receivedBuffer[rIdx][0] = NULL;
358
        receivedBuffer[rIdx][1] = NULL;
359

    
360
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
361

    
362
        return buffer_size;
363
}
364

    
365

    
366
int wait4data(const struct nodeID *n, struct timeval *tout) {
367

    
368
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
369
        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
370
        while(receivedBuffer[rIdx][0]==NULL && timeoutFired==0) {
371
        //        event_base_dispatch(base);
372
                event_base_loop(base,EVLOOP_ONCE);
373
        }
374
        timeoutFired = 0;
375
//        fprintf(stderr,"Back from eventlib loop.\n");
376

    
377
        if (receivedBuffer[rIdx][0]!=NULL)
378
                return 1;
379
        else
380
                return 0;
381
}
382

    
383

    
384

    
385

    
386
struct nodeID *create_node(const char *rem_IP, int rem_port) {
387
        struct nodeID *remote = malloc(sizeof(nodeID));
388
        if (remote == NULL) {
389
                return NULL;
390
        }
391
//        remote->addr = malloc(sizeof(SOCKETID_SIZE));
392
//        if (remote->addr == NULL) {
393
//                free(remote);
394
//                return NULL;
395
//        }
396
        remote->addrSize = SOCKETID_SIZE;
397
        remote->addrStringSize = SOCKETID_STRING_SIZE;
398
        remote->addr = getRemoteSocketID(rem_IP, rem_port);
399
        remote->connID = open_Connection(remote->addr,&connReady_cb,NULL);
400
        return remote;
401
}
402

    
403
void delete_node(struct nodeID *n) {
404
// TODO: check why closing the connection is annoying for the ML
405
//        close_Connection(n->connID);
406
        close_Socket(n->addr);
407
        free(n);
408
}
409

    
410

    
411
const char *node_addr(const struct nodeID *s)
412
{
413
  static char addr[256];
414
  // TODO: socketID_To_String always return 0 !!!
415
  int r = socketID_To_String(s->addr,addr,256);
416
  if (!r)
417
          return addr;
418
  else
419
          return "";
420
}
421

    
422
struct nodeID *nodeid_dup(const struct nodeID *s)
423
{
424
  struct nodeID *res;
425

    
426
  res = malloc(sizeof(struct nodeID));
427
  if (res != NULL) {
428
          res->addr = malloc(SOCKETID_SIZE);
429
          if (res->addr != NULL) {
430
                 memcpy(res->addr, s->addr, SOCKETID_SIZE);
431
                 res->addrSize = SOCKETID_SIZE;
432
                 res->addrStringSize = SOCKETID_STRING_SIZE;
433
                 res->connID = s->connID;
434
          }
435
          else {
436
                free(res);
437
                res = NULL;
438
                fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
439
          }
440
  }
441

    
442
  return res;
443
}
444

    
445
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
446
{
447
  return (memcmp(&s1->addr, &s2->addr, SOCKETID_SIZE) == 0);
448
}
449

    
450
int nodeid_dump(uint8_t *b, const struct nodeID *s)
451
{
452
  socketID_To_String(s->addr,(char *)b,SOCKETID_STRING_SIZE);
453
  fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
454
//  return SOCKETID_STRING_SIZE;
455
  return strlen((char *)b);
456

    
457
//        memcpy(b, s->addr, SOCKETID_SIZE);
458
//        return SOCKETID_SIZE;
459

    
460
}
461

    
462
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
463
{
464
  struct nodeID *res;
465
  res = malloc(sizeof(struct nodeID));
466
  if (res != NULL) {
467
          res->addr = malloc(SOCKETID_SIZE);
468
          if (res->addr != NULL) {
469
                  // memcpy(res->addr, b, SOCKETID_SIZE);
470
                  *len = strlen((char*)b);
471
//                  char str[(*len)+1]; str[(*len)]= '\0';
472
//                  snprintf(str,(*len),"%s",(char *)b);
473
                  string_To_SocketID((char *)b/*str*/,res->addr);
474
                  res->addrSize = SOCKETID_SIZE;
475
                  res->addrStringSize = SOCKETID_STRING_SIZE;
476
                  res->connID = -1;
477
          }
478
          else {
479
                  free(res);
480
                  res = NULL;
481
                  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
482
          }
483
  }
484
//  *len = SOCKETID_STRING_SIZE; // SOCKETID_SIZE; //sizeof(struct nodeID);
485

    
486
  return res;
487
}