Statistics
| Branch: | Revision:

grapes / src / net_helper-ml.c @ 8e0afb75

History | View | Annotate | Download (17.3 KB)

1 574aabfa MarcoBiazzini
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4 ca44152b CsabaKiraly
 *  This is free software; see lgpl-2.1.txt
5 574aabfa MarcoBiazzini
 */
6
7
#include <event2/event.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12 22afdba3 Csaba Kiraly
#include <stdbool.h>
13 574aabfa MarcoBiazzini
#include <string.h>
14 8e2d87c7 Csaba Kiraly
#include <assert.h>
15 7a6c9ad9 CsabaKiraly
#include <signal.h>
16 2694dce1 MarcoBiazzini
17 574aabfa MarcoBiazzini
#include "net_helper.h"
18
#include "ml.h"
19 c919f1bf Csaba Kiraly
#include "config.h"
20 574aabfa MarcoBiazzini
21 5941d7a1 Csaba Kiraly
#include "grapes_msg_types.h"
22 df8f8ee0 MarcoBiazzini
23 ece5c501 CsabaKiraly
24
#ifdef MONL
25
#include "mon.h"
26
#include "grapes_log.h"
27
#include "repoclient.h"
28
#include "grapes.h"
29
#endif
30
31
32 574aabfa MarcoBiazzini
/**
33
 * libevent pointer
34
 */
35
struct event_base *base;
36
37 432f93b8 CsabaKiraly
#define NH_BUFFER_SIZE 1000
38 0f35a7cd Csaba Kiraly
#define NH_LOOKUP_SIZE 1000
39 8551dbe5 CsabaKiraly
#define NH_PACKET_TIMEOUT {0, 500*1000}
40 560add79 CsabaKiraly
#define NH_ML_INIT_TIMEOUT {1, 0}
41 574aabfa MarcoBiazzini
42 22afdba3 Csaba Kiraly
#define FDSSIZE 16
43
44 574aabfa MarcoBiazzini
static int sIdx = 0;
45 8e2d87c7 Csaba Kiraly
static int rIdxML = 0;        //reveive from ML to this buffer position
46
static int rIdxUp = 0;        //hand up to layer above at this buffer position
47 574aabfa MarcoBiazzini
48
typedef struct nodeID {
49
        socketID_handle addr;
50
        int connID;        // connection associated to this node, -1 if myself
51 2694dce1 MarcoBiazzini
        int refcnt;
52 ece5c501 CsabaKiraly
#ifdef MONL
53
        //n quick and dirty static vector for measures TODO: make it dinamic
54 0bffcee4 CsabaKiraly
        MonHandler mhs[20];
55 ece5c501 CsabaKiraly
        int n_mhs;
56
#endif
57 2694dce1 MarcoBiazzini
//        int addrSize;
58
//        int addrStringSize;
59 574aabfa MarcoBiazzini
} nodeID;
60
61
typedef struct msgData_cb {
62
        int bIdx;        // index of the message in the proper buffer
63
        unsigned char msgType; // message type
64
        int mSize;        // message size
65 30453328 CsabaKiraly
        bool conn_cb_called;
66
        bool cancelled;
67 574aabfa MarcoBiazzini
} msgData_cb;
68
69 0f35a7cd Csaba Kiraly
static struct nodeID **lookup_array;
70 b8cbbdcc Csaba Kiraly
static int lookup_max = NH_LOOKUP_SIZE;
71 0f35a7cd Csaba Kiraly
static int lookup_curr = 0;
72
73 574aabfa MarcoBiazzini
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
74
static int timeoutFired = 0;
75 22afdba3 Csaba Kiraly
static bool fdTriggered = false;
76 574aabfa MarcoBiazzini
77
// pointers to the msgs to be send
78
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
79
// pointers to the received msgs + sender nodeID
80 ab61e3b6 CsabaKiraly
struct receivedB {
81
        struct nodeID *id;
82
        int len;
83
        uint8_t *data;
84
};
85
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
86 d741d4f3 MarcoBiazzini
/**/ static int recv_counter =0;
87 df8f8ee0 MarcoBiazzini
88 574aabfa MarcoBiazzini
89 bd360b95 Csaba Kiraly
static void connReady_cb (int connectionID, void *arg);
90
static struct nodeID *new_node(socketID_handle peer) {
91
        send_params params = {0,0,0,0};
92
        struct nodeID *res = malloc(sizeof(struct nodeID));
93
        if (!res) {
94
                 fprintf(stderr, "Net-helper : memory error\n");
95 0f35a7cd Csaba Kiraly
                 return NULL;
96 bd360b95 Csaba Kiraly
        }
97
        memset(res, 0, sizeof(struct nodeID));
98
99
        res->addr = malloc(SOCKETID_SIZE);
100
        if (! res->addr) {
101
                free (res);
102
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
103
                return NULL;
104
        }
105
        memset(res->addr, 0, SOCKETID_SIZE);
106
        memcpy(res->addr, peer ,SOCKETID_SIZE);
107 0f35a7cd Csaba Kiraly
108 bd360b95 Csaba Kiraly
        res->refcnt = 1;
109
110
        res->connID = mlOpenConnection(peer, &connReady_cb, NULL, params);
111
112
        return res;
113 0f35a7cd Csaba Kiraly
}
114
115
116 bd360b95 Csaba Kiraly
static struct nodeID **id_lookup(socketID_handle target) {
117 0f35a7cd Csaba Kiraly
118
        int i,here=-1;
119
        for (i=0;i<lookup_curr;i++) {
120
                if (lookup_array[i] == NULL) {
121 bd360b95 Csaba Kiraly
                        if (here < 0) {
122 0f35a7cd Csaba Kiraly
                                here = i;
123
                        }
124 bd360b95 Csaba Kiraly
                } else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
125
                        return &lookup_array[i];
126 0f35a7cd Csaba Kiraly
                }
127
        }
128
129 bd360b95 Csaba Kiraly
        if (here == -1) {
130
                here = lookup_curr++;
131 0f35a7cd Csaba Kiraly
        }
132
133 bd360b95 Csaba Kiraly
        if (lookup_curr > lookup_max) {
134 0f35a7cd Csaba Kiraly
                lookup_max *= 2;
135
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
136
        }
137
138 bd360b95 Csaba Kiraly
        lookup_array[here] = new_node(target);
139
        return &lookup_array[here];
140
}
141 0f35a7cd Csaba Kiraly
142 bd360b95 Csaba Kiraly
static struct nodeID *id_lookup_dup(socketID_handle target) {
143
        return nodeid_dup(*id_lookup(target));
144 0f35a7cd Csaba Kiraly
}
145
146
147 574aabfa MarcoBiazzini
/**
148
 * Look for a free slot in the received buffer and allocates it for immediate use
149
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
150
 */
151
static int next_R() {
152 8e2d87c7 Csaba Kiraly
        if (receivedBuffer[rIdxML].data==NULL) {
153
                int ret = rIdxML;
154
                rIdxML = (rIdxML+1)%NH_BUFFER_SIZE;
155
                return ret;
156
        } else {
157
                //TODO: handle receive overload situation!
158
                return -1;
159 574aabfa MarcoBiazzini
        }
160
}
161
162
/**
163
 * Look for a free slot in the sending buffer and allocates it for immediate use
164
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
165
 */
166
static int next_S() {
167 eca194c9 Csaba Kiraly
        if (sendingBuffer[sIdx]) {
168 574aabfa MarcoBiazzini
                int count;
169
                for (count=0;count<NH_BUFFER_SIZE;count++) {
170 eca194c9 Csaba Kiraly
                        sIdx = (sIdx+1)%NH_BUFFER_SIZE;
171 574aabfa MarcoBiazzini
                        if (sendingBuffer[sIdx]==NULL)
172
                                break;
173
                }
174 eca194c9 Csaba Kiraly
                if (count==NH_BUFFER_SIZE) {
175 574aabfa MarcoBiazzini
                        return -1;
176
                }
177
        }
178
        return sIdx;
179
}
180
181 cd2a2a41 MarcoBiazzini
182 574aabfa MarcoBiazzini
/**
183
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
184
 * @param local_socketID
185
 * @param errorstatus
186
 */
187
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
188
        switch (errorstatus) {
189
        case 0:
190 bd360b95 Csaba Kiraly
                me->addr = malloc(SOCKETID_SIZE);
191
                if (! me->addr) {
192
                        fprintf(stderr, "Net-helper : memory error while creating my new nodeID \n");
193
                        return;
194
                }
195
196 574aabfa MarcoBiazzini
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
197 2694dce1 MarcoBiazzini
        //        me->addrSize = SOCKETID_SIZE;
198
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
199 574aabfa MarcoBiazzini
                me->connID = -1;
200 2694dce1 MarcoBiazzini
                me->refcnt = 1;
201
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
202 574aabfa MarcoBiazzini
                break;
203
        case -1:
204
                //
205
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
206 2694dce1 MarcoBiazzini
                exit(1);
207 574aabfa MarcoBiazzini
                break;
208
        case 1:
209
                //
210
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
211 2694dce1 MarcoBiazzini
                exit(1);
212 574aabfa MarcoBiazzini
                break;
213
        case 2:
214
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
215 0c487aac CsabaKiraly
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
216
            mlSetStunServer(0,NULL);
217 574aabfa MarcoBiazzini
            break;
218
        default :        // should never happen
219
                //
220
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
221
        }
222
223
}
224
225
/**
226
 * Timeout callback to be set in the eventlib loop as needed
227
 * @param socket
228
 * @param flag
229
 * @param arg
230
 */
231
static void t_out_cb (int socket, short flag, void* arg) {
232
233
        timeoutFired = 1;
234 2694dce1 MarcoBiazzini
//        fprintf(stderr,"TIMEOUT!!!\n");
235 574aabfa MarcoBiazzini
//        event_base_loopbreak(base);
236
}
237
238
/**
239 22afdba3 Csaba Kiraly
 * File descriptor readable callback to be set in the eventlib loop as needed
240
 */
241
static void fd_cb (int fd, short flag, void* arg)
242
{
243
  //fprintf(stderr, "\twait4data: fd %d triggered\n", fd);
244
  fdTriggered = true;
245
  *((bool*)arg) = true;
246
}
247
248
/**
249 574aabfa MarcoBiazzini
 * Callback called by ml when a remote node ask for a connection
250
 * @param connectionID
251
 * @param arg
252
 */
253
static void receive_conn_cb(int connectionID, void *arg) {
254 2694dce1 MarcoBiazzini
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
255 574aabfa MarcoBiazzini
256
}
257
258 30453328 CsabaKiraly
void free_sending_buffer(int i)
259
{
260
        free(sendingBuffer[i]);
261
        sendingBuffer[i] = NULL;
262
}
263
264 574aabfa MarcoBiazzini
/**
265
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
266
 * @param connectionID
267
 * @param arg
268
 */
269
static void connReady_cb (int connectionID, void *arg) {
270 d741d4f3 MarcoBiazzini
271 574aabfa MarcoBiazzini
        msgData_cb *p;
272
        p = (msgData_cb *)arg;
273
        if (p == NULL) return;
274 30453328 CsabaKiraly
        if (p->cancelled) {
275
            free(p);
276
            return;
277
        }
278 14922983 TivadarSzemethy
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
279 30453328 CsabaKiraly
        free_sending_buffer(p->bIdx);
280 2694dce1 MarcoBiazzini
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
281 574aabfa MarcoBiazzini
        //        event_base_loopbreak(base);
282 30453328 CsabaKiraly
        p->conn_cb_called = true;
283 574aabfa MarcoBiazzini
}
284
285
/**
286
 * Callback called by ml when a connection error occurs
287
 * @param connectionID
288
 * @param arg
289
 */
290
static void connError_cb (int connectionID, void *arg) {
291
        // simply get rid of the msg in the buffer....
292
        msgData_cb *p;
293
        p = (msgData_cb *)arg;
294
        if (p != NULL) {
295
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
296 30453328 CsabaKiraly
                if (p->cancelled) {
297
                        free(p);//p->mSize = -1;
298
                } else {
299
                        p->conn_cb_called = true;
300
                }
301 574aabfa MarcoBiazzini
        }
302
        //        event_base_loopbreak(base);
303
}
304
305 cd2a2a41 MarcoBiazzini
306 574aabfa MarcoBiazzini
/**
307
 * Callback to receive data from ml
308
 * @param buffer
309
 * @param buflen
310
 * @param msgtype
311
 * @param arg
312
 */
313
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
314
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
315 3f1c81a4 Csaba Kiraly
        char str[SOCKETID_STRING_SIZE];
316 574aabfa MarcoBiazzini
//        fprintf(stderr, "Net-helper : called back with some news...\n");
317 7930ec45 MarcoBiazzini
/**/ ++recv_counter;
318 574aabfa MarcoBiazzini
        if (arg->remote_socketID != NULL)
319 a6911c72 MarcoBiazzini
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
320 574aabfa MarcoBiazzini
        else
321
                sprintf(str,"!Unknown!");
322
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
323
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
324 ca79d43a CsabaKiraly
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
325 16adbb7b CsabaKiraly
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
326 574aabfa MarcoBiazzini
        }
327
        else {
328 2694dce1 MarcoBiazzini
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
329 574aabfa MarcoBiazzini
                // buffering the received message only if possible, otherwise ignore it...
330
                int index = next_R();
331 8e2d87c7 Csaba Kiraly
                if (index<0) {
332
                        fprintf(stderr,"Net-helper: receive buffer full\n ");
333
                        return;
334
                } else {
335
                        receivedBuffer[index].data = malloc(buflen);
336 ab61e3b6 CsabaKiraly
                        if (receivedBuffer[index].data == NULL) {
337 8e2d87c7 Csaba Kiraly
                                fprintf(stderr,"Net-helper: memory full, can't receive!\n ");
338 574aabfa MarcoBiazzini
                                return;
339
                        }
340 8e2d87c7 Csaba Kiraly
                        receivedBuffer[index].len = buflen;
341
                        memcpy(receivedBuffer[index].data,buffer,buflen);
342
                          // save the socketID of the sender
343 bd360b95 Csaba Kiraly
                        receivedBuffer[index].id = id_lookup_dup(arg->remote_socketID);
344 574aabfa MarcoBiazzini
                }
345
  }
346
//        event_base_loopbreak(base);
347
}
348
349 cd2a2a41 MarcoBiazzini
350 c919f1bf Csaba Kiraly
struct nodeID *net_helper_init(const char *IPaddr, int port, const char *config) {
351 574aabfa MarcoBiazzini
352 560add79 CsabaKiraly
        struct timeval tout = NH_ML_INIT_TIMEOUT;
353 3f1c81a4 Csaba Kiraly
        int s, i;
354 55ee8e18 Csaba Kiraly
        struct tag *cfg_tags;
355
        const char *res;
356 ebc13190 Csaba Kiraly
        const char *stun_server = "stun.ekiga.net";
357
        int stun_port = 3478;
358 7360415c Csaba Kiraly
        const char *repo_address = "79.120.193.115:9832";
359
        int publish_interval = 60;
360 55ee8e18 Csaba Kiraly
361 7a6c9ad9 CsabaKiraly
        signal(SIGPIPE, SIG_IGN); // workaround for a known issue in libevent2 with SIGPIPE on TPC connections
362 574aabfa MarcoBiazzini
        base = event_base_new();
363 0f35a7cd Csaba Kiraly
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
364 574aabfa MarcoBiazzini
365 55ee8e18 Csaba Kiraly
        cfg_tags = config_parse(config);
366 c78a4ff0 Csaba Kiraly
        if (!cfg_tags) {
367
                return NULL;
368
        }
369 55ee8e18 Csaba Kiraly
370 ebc13190 Csaba Kiraly
        res = config_value_str(cfg_tags, "stun_server");
371
        if (res) {
372
                stun_server = res;
373
        }
374
        config_value_int(cfg_tags, "stun_port", &stun_port);
375
376 7360415c Csaba Kiraly
        res = config_value_str(cfg_tags, "repo_address");
377
        if (res) {
378
                repo_address = res;
379
        }
380
        
381
        config_value_int(cfg_tags, "publish_interval", &publish_interval);
382
383 574aabfa MarcoBiazzini
        me = malloc(sizeof(nodeID));
384
        if (me == NULL) {
385
                return NULL;
386
        }
387 2694dce1 MarcoBiazzini
        memset(me,0,sizeof(nodeID));
388
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
389
        me->refcnt = 1;
390 574aabfa MarcoBiazzini
391
        for (i=0;i<NH_BUFFER_SIZE;i++) {
392
                sendingBuffer[i] = NULL;
393 ab61e3b6 CsabaKiraly
                receivedBuffer[i].data = NULL;
394 574aabfa MarcoBiazzini
        }
395
396 a6911c72 MarcoBiazzini
        mlRegisterErrorConnectionCb(&connError_cb);
397
        mlRegisterRecvConnectionCb(&receive_conn_cb);
398 ebc13190 Csaba Kiraly
        s = mlInit(1, tout, port, IPaddr, stun_port, stun_server, &init_myNodeID_cb, base);
399 3b4eedf4 CsabaKiraly
        if (s < 0) {
400
                fprintf(stderr, "Net-helper : error initializing ML!\n");
401
                free(me);
402
                return NULL;
403
        }
404
405 ece5c501 CsabaKiraly
#ifdef MONL
406 ccdb7ae7 Csaba Kiraly
{
407 ece5c501 CsabaKiraly
        void *repoclient;
408 ccdb7ae7 Csaba Kiraly
        eventbase = base;
409 8a103ce2 Csaba Kiraly
410
        // Initialize logging
411
        grapesInitLog(DCLOG_WARNING, NULL, NULL);
412
413 ece5c501 CsabaKiraly
        repInit("");
414 7360415c Csaba Kiraly
        repoclient = repOpen(repo_address, publish_interval);        //repository.napa-wine.eu
415 ece5c501 CsabaKiraly
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
416
        monInit(base, repoclient);
417 ccdb7ae7 Csaba Kiraly
}
418 ece5c501 CsabaKiraly
#endif
419 55ee8e18 Csaba Kiraly
        free(cfg_tags);
420 ece5c501 CsabaKiraly
421 2694dce1 MarcoBiazzini
        while (me->connID<-1) {
422
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
423
                event_base_loop(base,EVLOOP_ONCE);
424
        }
425
        timeoutFired = 0;
426 574aabfa MarcoBiazzini
//        fprintf(stderr,"Net-helper init : back from init!\n");
427 2694dce1 MarcoBiazzini
428 574aabfa MarcoBiazzini
        return me;
429
}
430
431
432 c95c6af7 MarcoBiazzini
void bind_msg_type (unsigned char msgtype) {
433
434 a6911c72 MarcoBiazzini
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
435 8c435a0b AlessandroRusso
}
436
437
438 30453328 CsabaKiraly
void send_to_peer_cb(int fd, short event, void *arg)
439
{
440
        msgData_cb *p = (msgData_cb *) arg;
441
        if (p->conn_cb_called) {
442
                free(p);
443
        }
444
        else { //don't send it anymore
445
                free_sending_buffer(p->bIdx);
446
                p->cancelled = true;
447
                // don't free p, the other timeout will do it
448
        }
449
}
450
451 574aabfa MarcoBiazzini
/**
452
 * Called by the application to send data to a remote peer
453
 * @param from
454
 * @param to
455
 * @param buffer_ptr
456
 * @param buffer_size
457 2694dce1 MarcoBiazzini
 * @return The dimension of the buffer or -1 if a connection error occurred.
458 574aabfa MarcoBiazzini
 */
459
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
460
{
461 3f1c81a4 Csaba Kiraly
        msgData_cb *p;
462
        int current;
463
        send_params params = {0,0,0,0};
464
465 41620dda Csaba Kiraly
        if (buffer_size <= 0) {
466
                fprintf(stderr,"Net-helper: message size problematic: %d\n", buffer_size);
467
                return buffer_size;
468
        }
469
470 574aabfa MarcoBiazzini
        // if buffer is full, discard the message and return an error flag
471
        int index = next_S();
472
        if (index<0) {
473
                // free(buffer_ptr);
474 eca194c9 Csaba Kiraly
                fprintf(stderr,"Net-helper: send buffer full\n ");
475
                return -1;
476
        }
477
        sendingBuffer[index] = malloc(buffer_size);
478
        if (! sendingBuffer[index]){
479
                fprintf(stderr,"Net-helper: memory full, can't send!\n ");
480 574aabfa MarcoBiazzini
                return -1;
481
        }
482 546f9767 AlessandroRusso
        memset(sendingBuffer[index],0,buffer_size);
483 574aabfa MarcoBiazzini
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
484
        // free(buffer_ptr);
485 3f1c81a4 Csaba Kiraly
        p = malloc(sizeof(msgData_cb));
486 30453328 CsabaKiraly
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
487 3f1c81a4 Csaba Kiraly
        current = p->bIdx;
488
489 14922983 TivadarSzemethy
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
490 574aabfa MarcoBiazzini
        if (to->connID<0) {
491 30453328 CsabaKiraly
                free_sending_buffer(current);
492 574aabfa MarcoBiazzini
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
493 2694dce1 MarcoBiazzini
                free(p);
494
                return -1;
495 574aabfa MarcoBiazzini
        }
496
        else {
497 8551dbe5 CsabaKiraly
                struct timeval timeout = NH_PACKET_TIMEOUT;
498 30453328 CsabaKiraly
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
499 27061356 MarcoBiazzini
                return buffer_size; //p->mSize;
500 574aabfa MarcoBiazzini
        }
501
502
}
503
504
505
/**
506
 * Called by an application to receive data from remote peers
507
 * @param local
508
 * @param remote
509
 * @param buffer_ptr
510
 * @param buffer_size
511
 * @return The number of received bytes or -1 if some error occurred.
512
 */
513
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
514
{
515 871ae532 CsabaKiraly
        int size;
516 8e2d87c7 Csaba Kiraly
        if (receivedBuffer[rIdxUp].data==NULL) {        //block till first message arrives
517 7f2aa1be Csaba Kiraly
                wait4data(local, NULL, NULL);
518 574aabfa MarcoBiazzini
        }
519
520 8e2d87c7 Csaba Kiraly
        assert(receivedBuffer[rIdxUp].data && receivedBuffer[rIdxUp].id);
521
522
        (*remote) = receivedBuffer[rIdxUp].id;
523 574aabfa MarcoBiazzini
        // retrieve a msg from the buffer
524 8e2d87c7 Csaba Kiraly
        size = receivedBuffer[rIdxUp].len;
525 871ae532 CsabaKiraly
        if (size>buffer_size) {
526
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
527
                return -1;
528
        }
529 8e2d87c7 Csaba Kiraly
        memcpy(buffer_ptr, receivedBuffer[rIdxUp].data, size);
530
        free(receivedBuffer[rIdxUp].data);
531
        receivedBuffer[rIdxUp].data = NULL;
532
        receivedBuffer[rIdxUp].id = NULL;
533
534
        rIdxUp = (rIdxUp+1)%NH_BUFFER_SIZE;
535 574aabfa MarcoBiazzini
536
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
537
538 2694dce1 MarcoBiazzini
        return size;
539 574aabfa MarcoBiazzini
}
540
541
542 22afdba3 Csaba Kiraly
int wait4data(const struct nodeID *n, struct timeval *tout, int *fds) {
543
544 93932efc Csaba Kiraly
        struct event *timeout_ev = NULL;
545 22afdba3 Csaba Kiraly
        struct event *fd_ev[FDSSIZE];
546
        bool fd_triggered[FDSSIZE] = { false };
547
        int i;
548 574aabfa MarcoBiazzini
549
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
550 d329ad81 CsabaKiraly
        if (tout) {        //if tout==NULL, loop wait infinitely
551 22afdba3 Csaba Kiraly
          timeout_ev = event_new(base, -1, EV_TIMEOUT, &t_out_cb, NULL);
552
          event_add(timeout_ev, tout);
553
        }
554
        for (i = 0; fds && fds[i] != -1; i ++) {
555
          if (i >= FDSSIZE) {
556
            fprintf(stderr, "Can't listen on more than %d file descriptors!\n", FDSSIZE);
557
            break;
558
          }
559
          fd_ev[i] = event_new(base, fds[i], EV_READ, &fd_cb, &fd_triggered[i]);
560
          event_add(fd_ev[i], NULL);
561 d329ad81 CsabaKiraly
        }
562 22afdba3 Csaba Kiraly
563
        while(receivedBuffer[rIdxUp].data==NULL && timeoutFired==0 && fdTriggered==0) {
564 574aabfa MarcoBiazzini
        //        event_base_dispatch(base);
565
                event_base_loop(base,EVLOOP_ONCE);
566
        }
567
568 22afdba3 Csaba Kiraly
        //delete one-time events
569 93932efc Csaba Kiraly
        if (timeout_ev) {
570
          if (!timeoutFired) event_del(timeout_ev);
571
          event_free(timeout_ev);
572
        }
573 22afdba3 Csaba Kiraly
        for (i = 0; fds && fds[i] != -1; i ++) {
574
          if (! fd_triggered[i]) {
575
            fds[i] = -2;
576
            event_del(fd_ev[i]);
577
          //} else {
578
            //fprintf(stderr, "\twait4data: fd %d triggered\n", fds[i]);
579
          }
580
          event_free(fd_ev[i]);
581
        }
582
583
        if (fdTriggered) {
584
          fdTriggered = false;
585
          //fprintf(stderr, "\twait4data: fd event\n");
586
          return 2;
587
        } else if (timeoutFired) {
588
          timeoutFired = 0;
589
          //fprintf(stderr, "\twait4data: timed out\n");
590
          return 0;
591
        } else if (receivedBuffer[rIdxUp].data!=NULL) {
592
          //fprintf(stderr, "\twait4data: ML receive\n");
593
          return 1;
594
        } else {
595
          fprintf(stderr, "BUG in wait4data\n");
596
          exit(EXIT_FAILURE);
597
        }
598 574aabfa MarcoBiazzini
}
599
600 e4cbfa7e CsabaKiraly
socketID_handle getRemoteSocketID(const char *ip, int port) {
601
        char str[SOCKETID_STRING_SIZE];
602
        socketID_handle h;
603 574aabfa MarcoBiazzini
604 e4cbfa7e CsabaKiraly
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
605
        h = malloc(SOCKETID_SIZE);
606
        mlStringToSocketID(str, h);
607 574aabfa MarcoBiazzini
608 e4cbfa7e CsabaKiraly
        return h;
609
}
610 574aabfa MarcoBiazzini
611
struct nodeID *create_node(const char *rem_IP, int rem_port) {
612 bd360b95 Csaba Kiraly
        socketID_handle s;
613
        struct nodeID *remote;
614
615
        s = getRemoteSocketID(rem_IP, rem_port);
616
        remote = id_lookup_dup(s);
617
        free(s);
618
619 574aabfa MarcoBiazzini
        return remote;
620
}
621
622 e55fe1f8 Marco Biazzini
const char *node_ip(const struct nodeID *s) {
623
        static char ip[64];
624
        int len;
625
        char *start, *end;
626
        const char *tmp = node_addr(s);
627
        start = strstr(tmp, "-") + 1;
628
        end = strstr(start, ":");
629
        len = end - start;
630
        memcpy(ip, start, len);
631
        ip[len] = 0;
632
633
        return (const char *)ip;
634
}
635
636
637 574aabfa MarcoBiazzini
// TODO: check why closing the connection is annoying for the ML
638 dccd020d MarcoBiazzini
void nodeid_free(struct nodeID *n) {
639 bd360b95 Csaba Kiraly
        if (n && (--(n->refcnt) == 1)) {
640 f5cd5266 Csaba Kiraly
/*
641 ccdb7ae7 Csaba Kiraly
                struct nodeID **npos;
642 a6911c72 MarcoBiazzini
        //        mlCloseConnection(n->connID);
643 bd360b95 Csaba Kiraly
                npos = id_lookup(n->addr);
644
                *npos = NULL;
645 a6911c72 MarcoBiazzini
                mlCloseSocket(n->addr);
646 2694dce1 MarcoBiazzini
                free(n);
647 f5cd5266 Csaba Kiraly
*/
648 2694dce1 MarcoBiazzini
        }
649 574aabfa MarcoBiazzini
}
650
651
652
const char *node_addr(const struct nodeID *s)
653
{
654
  static char addr[256];
655 a6911c72 MarcoBiazzini
  // TODO: mlSocketIDToString always return 0 !!!
656
  int r = mlSocketIDToString(s->addr,addr,256);
657 574aabfa MarcoBiazzini
  if (!r)
658
          return addr;
659
  else
660
          return "";
661
}
662
663 2694dce1 MarcoBiazzini
struct nodeID *nodeid_dup(struct nodeID *s)
664 574aabfa MarcoBiazzini
{
665 2694dce1 MarcoBiazzini
        s->refcnt++;
666
        return s;
667 574aabfa MarcoBiazzini
}
668
669
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
670
{
671 a6911c72 MarcoBiazzini
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
672 574aabfa MarcoBiazzini
}
673
674 0772765b Csaba Kiraly
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
675 574aabfa MarcoBiazzini
{
676 0772765b Csaba Kiraly
  if (max_write_size < SOCKETID_STRING_SIZE) return -1;
677
678 a6911c72 MarcoBiazzini
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
679 2694dce1 MarcoBiazzini
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
680 574aabfa MarcoBiazzini
681 2694dce1 MarcoBiazzini
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
682
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
683 574aabfa MarcoBiazzini
684 4173f623 CsabaKiraly
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
685 574aabfa MarcoBiazzini
}
686
687
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
688
{
689 8088babe Csaba Kiraly
  uint8_t sid[SOCKETID_SIZE];
690
  socketID_handle h = (socketID_handle) sid;
691 0f35a7cd Csaba Kiraly
  mlStringToSocketID((char *)b,h);
692
  *len = strlen((char*)b) + 1;
693 bd360b95 Csaba Kiraly
  return id_lookup_dup(h);
694 574aabfa MarcoBiazzini
}