Statistics
| Branch: | Revision:

grapes / src / net_helper-ml.c @ cb71ca5e

History | View | Annotate | Download (18.4 KB)

1 574aabfa MarcoBiazzini
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4 ca44152b CsabaKiraly
 *  This is free software; see lgpl-2.1.txt
5 574aabfa MarcoBiazzini
 */
6
7
#include <event2/event.h>
8 23cf6ee5 Csaba Kiraly
#ifndef _WIN32
9 574aabfa MarcoBiazzini
#include <arpa/inet.h>
10 ebe734eb Csaba Kiraly
#endif
11 574aabfa MarcoBiazzini
#include <unistd.h>
12
#include <stdlib.h>
13
#include <stdio.h>
14 22afdba3 Csaba Kiraly
#include <stdbool.h>
15 574aabfa MarcoBiazzini
#include <string.h>
16 8e2d87c7 Csaba Kiraly
#include <assert.h>
17 7a6c9ad9 CsabaKiraly
#include <signal.h>
18 2694dce1 MarcoBiazzini
19 574aabfa MarcoBiazzini
#include "net_helper.h"
20
#include "ml.h"
21 c919f1bf Csaba Kiraly
#include "config.h"
22 574aabfa MarcoBiazzini
23 5941d7a1 Csaba Kiraly
#include "grapes_msg_types.h"
24 df8f8ee0 MarcoBiazzini
25 ece5c501 CsabaKiraly
26
#ifdef MONL
27
#include "mon.h"
28
#include "repoclient.h"
29
#endif
30
31 55aecb63 Csaba Kiraly
#include "napa.h"
32
#include "napa_log.h"
33 ece5c501 CsabaKiraly
34 574aabfa MarcoBiazzini
/**
35
 * libevent pointer
36
 */
37
struct event_base *base;
38
39 432f93b8 CsabaKiraly
#define NH_BUFFER_SIZE 1000
40 0f35a7cd Csaba Kiraly
#define NH_LOOKUP_SIZE 1000
41 8551dbe5 CsabaKiraly
#define NH_PACKET_TIMEOUT {0, 500*1000}
42 560add79 CsabaKiraly
#define NH_ML_INIT_TIMEOUT {1, 0}
43 574aabfa MarcoBiazzini
44 22afdba3 Csaba Kiraly
#define FDSSIZE 16
45
46 cb71ca5e Csaba Kiraly
static bool connect_on_know = false;        //whether to try to connect as soon as we get to know a nodeID
47
48 574aabfa MarcoBiazzini
static int sIdx = 0;
49 8e2d87c7 Csaba Kiraly
static int rIdxML = 0;        //reveive from ML to this buffer position
50
static int rIdxUp = 0;        //hand up to layer above at this buffer position
51 574aabfa MarcoBiazzini
52
typedef struct nodeID {
53
        socketID_handle addr;
54
        int connID;        // connection associated to this node, -1 if myself
55 2694dce1 MarcoBiazzini
        int refcnt;
56 ece5c501 CsabaKiraly
#ifdef MONL
57
        //n quick and dirty static vector for measures TODO: make it dinamic
58 0bffcee4 CsabaKiraly
        MonHandler mhs[20];
59 ece5c501 CsabaKiraly
        int n_mhs;
60
#endif
61 2694dce1 MarcoBiazzini
//        int addrSize;
62
//        int addrStringSize;
63 574aabfa MarcoBiazzini
} nodeID;
64
65
typedef struct msgData_cb {
66
        int bIdx;        // index of the message in the proper buffer
67
        unsigned char msgType; // message type
68
        int mSize;        // message size
69 30453328 CsabaKiraly
        bool conn_cb_called;
70
        bool cancelled;
71 574aabfa MarcoBiazzini
} msgData_cb;
72
73 0f35a7cd Csaba Kiraly
static struct nodeID **lookup_array;
74 b8cbbdcc Csaba Kiraly
static int lookup_max = NH_LOOKUP_SIZE;
75 0f35a7cd Csaba Kiraly
static int lookup_curr = 0;
76
77 574aabfa MarcoBiazzini
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
78
static int timeoutFired = 0;
79 22afdba3 Csaba Kiraly
static bool fdTriggered = false;
80 574aabfa MarcoBiazzini
81
// pointers to the msgs to be send
82
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
83
// pointers to the received msgs + sender nodeID
84 ab61e3b6 CsabaKiraly
struct receivedB {
85
        struct nodeID *id;
86
        int len;
87
        uint8_t *data;
88
};
89
static struct receivedB receivedBuffer[NH_BUFFER_SIZE];
90 d741d4f3 MarcoBiazzini
/**/ static int recv_counter =0;
91 df8f8ee0 MarcoBiazzini
92 574aabfa MarcoBiazzini
93 bd360b95 Csaba Kiraly
static void connReady_cb (int connectionID, void *arg);
94
static struct nodeID *new_node(socketID_handle peer) {
95
        send_params params = {0,0,0,0};
96
        struct nodeID *res = malloc(sizeof(struct nodeID));
97
        if (!res) {
98
                 fprintf(stderr, "Net-helper : memory error\n");
99 0f35a7cd Csaba Kiraly
                 return NULL;
100 bd360b95 Csaba Kiraly
        }
101
        memset(res, 0, sizeof(struct nodeID));
102
103
        res->addr = malloc(SOCKETID_SIZE);
104
        if (! res->addr) {
105
                free (res);
106
                fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
107
                return NULL;
108
        }
109
        memset(res->addr, 0, SOCKETID_SIZE);
110
        memcpy(res->addr, peer ,SOCKETID_SIZE);
111 0f35a7cd Csaba Kiraly
112 bd360b95 Csaba Kiraly
        res->refcnt = 1;
113
114 cb71ca5e Csaba Kiraly
        if (connect_on_know) {
115
                res->connID = mlOpenConnection(peer, &connReady_cb, NULL, params);
116
        }
117 bd360b95 Csaba Kiraly
118
        return res;
119 0f35a7cd Csaba Kiraly
}
120
121
122 bd360b95 Csaba Kiraly
static struct nodeID **id_lookup(socketID_handle target) {
123 0f35a7cd Csaba Kiraly
124
        int i,here=-1;
125
        for (i=0;i<lookup_curr;i++) {
126
                if (lookup_array[i] == NULL) {
127 bd360b95 Csaba Kiraly
                        if (here < 0) {
128 0f35a7cd Csaba Kiraly
                                here = i;
129
                        }
130 bd360b95 Csaba Kiraly
                } else if (!mlCompareSocketIDs(lookup_array[i]->addr,target)) {
131
                        return &lookup_array[i];
132 0f35a7cd Csaba Kiraly
                }
133
        }
134
135 bd360b95 Csaba Kiraly
        if (here == -1) {
136
                here = lookup_curr++;
137 0f35a7cd Csaba Kiraly
        }
138
139 bd360b95 Csaba Kiraly
        if (lookup_curr > lookup_max) {
140 0f35a7cd Csaba Kiraly
                lookup_max *= 2;
141
                lookup_array = realloc(lookup_array,lookup_max*sizeof(struct nodeID*));
142
        }
143
144 bd360b95 Csaba Kiraly
        lookup_array[here] = new_node(target);
145
        return &lookup_array[here];
146
}
147 0f35a7cd Csaba Kiraly
148 bd360b95 Csaba Kiraly
static struct nodeID *id_lookup_dup(socketID_handle target) {
149
        return nodeid_dup(*id_lookup(target));
150 0f35a7cd Csaba Kiraly
}
151
152
153 574aabfa MarcoBiazzini
/**
154
 * Look for a free slot in the received buffer and allocates it for immediate use
155
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
156
 */
157
static int next_R() {
158 8e2d87c7 Csaba Kiraly
        if (receivedBuffer[rIdxML].data==NULL) {
159
                int ret = rIdxML;
160
                rIdxML = (rIdxML+1)%NH_BUFFER_SIZE;
161
                return ret;
162
        } else {
163
                //TODO: handle receive overload situation!
164
                return -1;
165 574aabfa MarcoBiazzini
        }
166
}
167
168
/**
169
 * Look for a free slot in the sending buffer and allocates it for immediate use
170
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
171
 */
172
static int next_S() {
173 eca194c9 Csaba Kiraly
        if (sendingBuffer[sIdx]) {
174 574aabfa MarcoBiazzini
                int count;
175
                for (count=0;count<NH_BUFFER_SIZE;count++) {
176 eca194c9 Csaba Kiraly
                        sIdx = (sIdx+1)%NH_BUFFER_SIZE;
177 574aabfa MarcoBiazzini
                        if (sendingBuffer[sIdx]==NULL)
178
                                break;
179
                }
180 eca194c9 Csaba Kiraly
                if (count==NH_BUFFER_SIZE) {
181 574aabfa MarcoBiazzini
                        return -1;
182
                }
183
        }
184
        return sIdx;
185
}
186
187 cd2a2a41 MarcoBiazzini
188 574aabfa MarcoBiazzini
/**
189
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
190
 * @param local_socketID
191
 * @param errorstatus
192
 */
193
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
194
        switch (errorstatus) {
195
        case 0:
196 bd360b95 Csaba Kiraly
                me->addr = malloc(SOCKETID_SIZE);
197
                if (! me->addr) {
198
                        fprintf(stderr, "Net-helper : memory error while creating my new nodeID \n");
199
                        return;
200
                }
201
202 574aabfa MarcoBiazzini
                memcpy(me->addr,local_socketID,SOCKETID_SIZE);
203 2694dce1 MarcoBiazzini
        //        me->addrSize = SOCKETID_SIZE;
204
        //        me->addrStringSize = SOCKETID_STRING_SIZE;
205 574aabfa MarcoBiazzini
                me->connID = -1;
206 2694dce1 MarcoBiazzini
                me->refcnt = 1;
207
        //        fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
208 574aabfa MarcoBiazzini
                break;
209
        case -1:
210
                //
211
                fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
212 2694dce1 MarcoBiazzini
                exit(1);
213 574aabfa MarcoBiazzini
                break;
214
        case 1:
215
                //
216
                fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
217 2694dce1 MarcoBiazzini
                exit(1);
218 574aabfa MarcoBiazzini
                break;
219
        case 2:
220
            fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
221 0c487aac CsabaKiraly
            fprintf(stderr,"Net-helper init : Retrying without STUN\n");
222
            mlSetStunServer(0,NULL);
223 574aabfa MarcoBiazzini
            break;
224
        default :        // should never happen
225
                //
226
                fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
227
        }
228
229
}
230
231
/**
232
 * Timeout callback to be set in the eventlib loop as needed
233
 * @param socket
234
 * @param flag
235
 * @param arg
236
 */
237
static void t_out_cb (int socket, short flag, void* arg) {
238
239
        timeoutFired = 1;
240 2694dce1 MarcoBiazzini
//        fprintf(stderr,"TIMEOUT!!!\n");
241 574aabfa MarcoBiazzini
//        event_base_loopbreak(base);
242
}
243
244
/**
245 22afdba3 Csaba Kiraly
 * File descriptor readable callback to be set in the eventlib loop as needed
246
 */
247
static void fd_cb (int fd, short flag, void* arg)
248
{
249
  //fprintf(stderr, "\twait4data: fd %d triggered\n", fd);
250
  fdTriggered = true;
251
  *((bool*)arg) = true;
252
}
253
254
/**
255 574aabfa MarcoBiazzini
 * Callback called by ml when a remote node ask for a connection
256
 * @param connectionID
257
 * @param arg
258
 */
259
static void receive_conn_cb(int connectionID, void *arg) {
260 2694dce1 MarcoBiazzini
//    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
261 574aabfa MarcoBiazzini
262
}
263
264 30453328 CsabaKiraly
void free_sending_buffer(int i)
265
{
266
        free(sendingBuffer[i]);
267
        sendingBuffer[i] = NULL;
268
}
269
270 574aabfa MarcoBiazzini
/**
271
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
272
 * @param connectionID
273
 * @param arg
274
 */
275
static void connReady_cb (int connectionID, void *arg) {
276 d741d4f3 MarcoBiazzini
277 574aabfa MarcoBiazzini
        msgData_cb *p;
278
        p = (msgData_cb *)arg;
279
        if (p == NULL) return;
280 30453328 CsabaKiraly
        if (p->cancelled) {
281
            free(p);
282
            return;
283
        }
284 14922983 TivadarSzemethy
        mlSendData(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,NULL);
285 30453328 CsabaKiraly
        free_sending_buffer(p->bIdx);
286 2694dce1 MarcoBiazzini
//        fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
287 574aabfa MarcoBiazzini
        //        event_base_loopbreak(base);
288 30453328 CsabaKiraly
        p->conn_cb_called = true;
289 574aabfa MarcoBiazzini
}
290
291
/**
292
 * Callback called by ml when a connection error occurs
293
 * @param connectionID
294
 * @param arg
295
 */
296
static void connError_cb (int connectionID, void *arg) {
297
        // simply get rid of the msg in the buffer....
298
        msgData_cb *p;
299
        p = (msgData_cb *)arg;
300
        if (p != NULL) {
301
                fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
302 30453328 CsabaKiraly
                if (p->cancelled) {
303
                        free(p);//p->mSize = -1;
304
                } else {
305
                        p->conn_cb_called = true;
306
                }
307 574aabfa MarcoBiazzini
        }
308
        //        event_base_loopbreak(base);
309
}
310
311 cd2a2a41 MarcoBiazzini
312 574aabfa MarcoBiazzini
/**
313
 * Callback to receive data from ml
314
 * @param buffer
315
 * @param buflen
316
 * @param msgtype
317
 * @param arg
318
 */
319
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
320
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
321 3f1c81a4 Csaba Kiraly
        char str[SOCKETID_STRING_SIZE];
322 574aabfa MarcoBiazzini
//        fprintf(stderr, "Net-helper : called back with some news...\n");
323 7930ec45 MarcoBiazzini
/**/ ++recv_counter;
324 574aabfa MarcoBiazzini
        if (arg->remote_socketID != NULL)
325 a6911c72 MarcoBiazzini
                mlSocketIDToString(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
326 574aabfa MarcoBiazzini
        else
327
                sprintf(str,"!Unknown!");
328
        if (arg->nrMissingBytes || !arg->firstPacketArrived) {
329
            fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
330 ca79d43a CsabaKiraly
/**/    fprintf(stderr, "\tMessage # %d -- Message type: %hhd -- Missing # %d bytes%s\n",
331 16adbb7b CsabaKiraly
                        recv_counter, buffer[0],arg->nrMissingBytes, arg->firstPacketArrived?"":", Missing first!");
332 574aabfa MarcoBiazzini
        }
333
        else {
334 2694dce1 MarcoBiazzini
        //        fprintf(stderr, "Net-helper : message arrived from %s\n",str);
335 574aabfa MarcoBiazzini
                // buffering the received message only if possible, otherwise ignore it...
336
                int index = next_R();
337 8e2d87c7 Csaba Kiraly
                if (index<0) {
338
                        fprintf(stderr,"Net-helper: receive buffer full\n ");
339
                        return;
340
                } else {
341
                        receivedBuffer[index].data = malloc(buflen);
342 ab61e3b6 CsabaKiraly
                        if (receivedBuffer[index].data == NULL) {
343 8e2d87c7 Csaba Kiraly
                                fprintf(stderr,"Net-helper: memory full, can't receive!\n ");
344 574aabfa MarcoBiazzini
                                return;
345
                        }
346 8e2d87c7 Csaba Kiraly
                        receivedBuffer[index].len = buflen;
347
                        memcpy(receivedBuffer[index].data,buffer,buflen);
348
                          // save the socketID of the sender
349 bd360b95 Csaba Kiraly
                        receivedBuffer[index].id = id_lookup_dup(arg->remote_socketID);
350 574aabfa MarcoBiazzini
                }
351
  }
352
//        event_base_loopbreak(base);
353
}
354
355 cd2a2a41 MarcoBiazzini
356 c919f1bf Csaba Kiraly
struct nodeID *net_helper_init(const char *IPaddr, int port, const char *config) {
357 574aabfa MarcoBiazzini
358 560add79 CsabaKiraly
        struct timeval tout = NH_ML_INIT_TIMEOUT;
359 3f1c81a4 Csaba Kiraly
        int s, i;
360 55ee8e18 Csaba Kiraly
        struct tag *cfg_tags;
361
        const char *res;
362 ebc13190 Csaba Kiraly
        const char *stun_server = "stun.ekiga.net";
363
        int stun_port = 3478;
364 f8232cf0 CsabaKiraly
        const char *repo_address = NULL;
365 7360415c Csaba Kiraly
        int publish_interval = 60;
366 e71c2944 Csaba Kiraly
367 02f0b9c3 Csaba Kiraly
        int verbosity = DCLOG_ERROR;
368 55ee8e18 Csaba Kiraly
369 bc246ff1 Csaba Kiraly
        int bucketsize = 80000; /* this allows a burst of 80000 Bytes [Bytes] */
370 d34221fa Csaba Kiraly
        int rate = 10000000; /* 10Mbit/s [bits/s]*/
371 bc246ff1 Csaba Kiraly
        int queuesize = 1000000; /* up to 1MB of data will be stored in the shaper transmission queue [Bytes]*/
372
        int RTXqueuesize = 1000000; /* up to 1 MB of data will be stored in the shaper retransmission queue [Bytes] */
373
        double RTXholtdingtime = 1.0; /* [seconds] */
374 61c45d19 Csaba Kiraly
375 23cf6ee5 Csaba Kiraly
#ifndef _WIN32
376 7a6c9ad9 CsabaKiraly
        signal(SIGPIPE, SIG_IGN); // workaround for a known issue in libevent2 with SIGPIPE on TPC connections
377 ebe734eb Csaba Kiraly
#endif
378 574aabfa MarcoBiazzini
        base = event_base_new();
379 0f35a7cd Csaba Kiraly
        lookup_array = calloc(lookup_max,sizeof(struct nodeID *));
380 574aabfa MarcoBiazzini
381 55ee8e18 Csaba Kiraly
        cfg_tags = config_parse(config);
382 c78a4ff0 Csaba Kiraly
        if (!cfg_tags) {
383
                return NULL;
384
        }
385 55ee8e18 Csaba Kiraly
386 ebc13190 Csaba Kiraly
        res = config_value_str(cfg_tags, "stun_server");
387
        if (res) {
388
                stun_server = res;
389
        }
390
        config_value_int(cfg_tags, "stun_port", &stun_port);
391
392 7360415c Csaba Kiraly
        res = config_value_str(cfg_tags, "repo_address");
393
        if (res) {
394
                repo_address = res;
395
        }
396
        
397
        config_value_int(cfg_tags, "publish_interval", &publish_interval);
398
399 02f0b9c3 Csaba Kiraly
        config_value_int(cfg_tags, "verbosity", &verbosity);
400
401 61c45d19 Csaba Kiraly
        config_value_int(cfg_tags, "bucketsize", &bucketsize);
402
        config_value_int(cfg_tags, "rate", &rate);
403
        config_value_int(cfg_tags, "queuesize", &queuesize);
404
        config_value_int(cfg_tags, "RTXqueuesize", &RTXqueuesize);
405
        config_value_double(cfg_tags, "RTXholtdingtime", &RTXholtdingtime);
406
407 574aabfa MarcoBiazzini
        me = malloc(sizeof(nodeID));
408
        if (me == NULL) {
409
                return NULL;
410
        }
411 2694dce1 MarcoBiazzini
        memset(me,0,sizeof(nodeID));
412
        me->connID = -10;        // dirty trick to spot later if the ml has called back ...
413
        me->refcnt = 1;
414 574aabfa MarcoBiazzini
415
        for (i=0;i<NH_BUFFER_SIZE;i++) {
416
                sendingBuffer[i] = NULL;
417 ab61e3b6 CsabaKiraly
                receivedBuffer[i].data = NULL;
418 574aabfa MarcoBiazzini
        }
419
420 a6911c72 MarcoBiazzini
        mlRegisterErrorConnectionCb(&connError_cb);
421
        mlRegisterRecvConnectionCb(&receive_conn_cb);
422 ebc13190 Csaba Kiraly
        s = mlInit(1, tout, port, IPaddr, stun_port, stun_server, &init_myNodeID_cb, base);
423 3b4eedf4 CsabaKiraly
        if (s < 0) {
424
                fprintf(stderr, "Net-helper : error initializing ML!\n");
425
                free(me);
426
                return NULL;
427
        }
428
429 ec4c5a8d RobertBirke
        mlSetVerbosity(verbosity);
430
431 61c45d19 Csaba Kiraly
        mlSetRateLimiterParams(bucketsize, rate, queuesize, RTXqueuesize, RTXholtdingtime);
432
433 ece5c501 CsabaKiraly
#ifdef MONL
434 ccdb7ae7 Csaba Kiraly
{
435 ece5c501 CsabaKiraly
        void *repoclient;
436 ccdb7ae7 Csaba Kiraly
        eventbase = base;
437 8a103ce2 Csaba Kiraly
438
        // Initialize logging
439 55aecb63 Csaba Kiraly
        napaInitLog(verbosity, NULL, NULL);
440 8a103ce2 Csaba Kiraly
441 ece5c501 CsabaKiraly
        repInit("");
442 7360415c Csaba Kiraly
        repoclient = repOpen(repo_address, publish_interval);        //repository.napa-wine.eu
443 c67cb9c1 ArpadBakay
        // NULL is inow valid for disabled repo 
444
        // if (repoclient == NULL) fatal("Unable to initialize repoclient");
445 ece5c501 CsabaKiraly
        monInit(base, repoclient);
446 ccdb7ae7 Csaba Kiraly
}
447 ece5c501 CsabaKiraly
#endif
448 55ee8e18 Csaba Kiraly
        free(cfg_tags);
449 ece5c501 CsabaKiraly
450 2694dce1 MarcoBiazzini
        while (me->connID<-1) {
451
        //        event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, &tout);
452
                event_base_loop(base,EVLOOP_ONCE);
453
        }
454
        timeoutFired = 0;
455 574aabfa MarcoBiazzini
//        fprintf(stderr,"Net-helper init : back from init!\n");
456 2694dce1 MarcoBiazzini
457 574aabfa MarcoBiazzini
        return me;
458
}
459
460
461 c95c6af7 MarcoBiazzini
void bind_msg_type (unsigned char msgtype) {
462
463 a6911c72 MarcoBiazzini
                        mlRegisterRecvDataCb(&recv_data_cb,msgtype);
464 8c435a0b AlessandroRusso
}
465
466
467 30453328 CsabaKiraly
void send_to_peer_cb(int fd, short event, void *arg)
468
{
469
        msgData_cb *p = (msgData_cb *) arg;
470
        if (p->conn_cb_called) {
471
                free(p);
472
        }
473
        else { //don't send it anymore
474
                free_sending_buffer(p->bIdx);
475
                p->cancelled = true;
476
                // don't free p, the other timeout will do it
477
        }
478
}
479
480 574aabfa MarcoBiazzini
/**
481
 * Called by the application to send data to a remote peer
482
 * @param from
483
 * @param to
484
 * @param buffer_ptr
485
 * @param buffer_size
486 2694dce1 MarcoBiazzini
 * @return The dimension of the buffer or -1 if a connection error occurred.
487 574aabfa MarcoBiazzini
 */
488
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
489
{
490 3f1c81a4 Csaba Kiraly
        msgData_cb *p;
491
        int current;
492
        send_params params = {0,0,0,0};
493
494 41620dda Csaba Kiraly
        if (buffer_size <= 0) {
495
                fprintf(stderr,"Net-helper: message size problematic: %d\n", buffer_size);
496
                return buffer_size;
497
        }
498
499 574aabfa MarcoBiazzini
        // if buffer is full, discard the message and return an error flag
500
        int index = next_S();
501
        if (index<0) {
502
                // free(buffer_ptr);
503 eca194c9 Csaba Kiraly
                fprintf(stderr,"Net-helper: send buffer full\n ");
504
                return -1;
505
        }
506
        sendingBuffer[index] = malloc(buffer_size);
507
        if (! sendingBuffer[index]){
508
                fprintf(stderr,"Net-helper: memory full, can't send!\n ");
509 574aabfa MarcoBiazzini
                return -1;
510
        }
511 546f9767 AlessandroRusso
        memset(sendingBuffer[index],0,buffer_size);
512 574aabfa MarcoBiazzini
        memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
513
        // free(buffer_ptr);
514 3f1c81a4 Csaba Kiraly
        p = malloc(sizeof(msgData_cb));
515 30453328 CsabaKiraly
        p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0]; p->conn_cb_called = false; p->cancelled = false;
516 3f1c81a4 Csaba Kiraly
        current = p->bIdx;
517
518 14922983 TivadarSzemethy
        to->connID = mlOpenConnection(to->addr,&connReady_cb,p, params);
519 574aabfa MarcoBiazzini
        if (to->connID<0) {
520 30453328 CsabaKiraly
                free_sending_buffer(current);
521 574aabfa MarcoBiazzini
                fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
522 2694dce1 MarcoBiazzini
                free(p);
523
                return -1;
524 574aabfa MarcoBiazzini
        }
525
        else {
526 8551dbe5 CsabaKiraly
                struct timeval timeout = NH_PACKET_TIMEOUT;
527 30453328 CsabaKiraly
                event_base_once(base, -1, EV_TIMEOUT, send_to_peer_cb, (void *) p, &timeout);
528 27061356 MarcoBiazzini
                return buffer_size; //p->mSize;
529 574aabfa MarcoBiazzini
        }
530
531
}
532
533
534
/**
535
 * Called by an application to receive data from remote peers
536
 * @param local
537
 * @param remote
538
 * @param buffer_ptr
539
 * @param buffer_size
540
 * @return The number of received bytes or -1 if some error occurred.
541
 */
542
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
543
{
544 871ae532 CsabaKiraly
        int size;
545 8e2d87c7 Csaba Kiraly
        if (receivedBuffer[rIdxUp].data==NULL) {        //block till first message arrives
546 7f2aa1be Csaba Kiraly
                wait4data(local, NULL, NULL);
547 574aabfa MarcoBiazzini
        }
548
549 8e2d87c7 Csaba Kiraly
        assert(receivedBuffer[rIdxUp].data && receivedBuffer[rIdxUp].id);
550
551
        (*remote) = receivedBuffer[rIdxUp].id;
552 574aabfa MarcoBiazzini
        // retrieve a msg from the buffer
553 8e2d87c7 Csaba Kiraly
        size = receivedBuffer[rIdxUp].len;
554 871ae532 CsabaKiraly
        if (size>buffer_size) {
555
                fprintf(stderr, "Net-helper : recv_from_peer: buffer too small (size:%d > buffer_size: %d)!\n",size,buffer_size);
556
                return -1;
557
        }
558 8e2d87c7 Csaba Kiraly
        memcpy(buffer_ptr, receivedBuffer[rIdxUp].data, size);
559
        free(receivedBuffer[rIdxUp].data);
560
        receivedBuffer[rIdxUp].data = NULL;
561
        receivedBuffer[rIdxUp].id = NULL;
562
563
        rIdxUp = (rIdxUp+1)%NH_BUFFER_SIZE;
564 574aabfa MarcoBiazzini
565
//        fprintf(stderr, "Net-helper : I've got mail!!!\n");
566
567 2694dce1 MarcoBiazzini
        return size;
568 574aabfa MarcoBiazzini
}
569
570
571 22afdba3 Csaba Kiraly
int wait4data(const struct nodeID *n, struct timeval *tout, int *fds) {
572
573 93932efc Csaba Kiraly
        struct event *timeout_ev = NULL;
574 22afdba3 Csaba Kiraly
        struct event *fd_ev[FDSSIZE];
575
        bool fd_triggered[FDSSIZE] = { false };
576
        int i;
577 574aabfa MarcoBiazzini
578
//        fprintf(stderr,"Net-helper : Waiting for data to come...\n");
579 d329ad81 CsabaKiraly
        if (tout) {        //if tout==NULL, loop wait infinitely
580 22afdba3 Csaba Kiraly
          timeout_ev = event_new(base, -1, EV_TIMEOUT, &t_out_cb, NULL);
581
          event_add(timeout_ev, tout);
582
        }
583
        for (i = 0; fds && fds[i] != -1; i ++) {
584
          if (i >= FDSSIZE) {
585
            fprintf(stderr, "Can't listen on more than %d file descriptors!\n", FDSSIZE);
586
            break;
587
          }
588
          fd_ev[i] = event_new(base, fds[i], EV_READ, &fd_cb, &fd_triggered[i]);
589
          event_add(fd_ev[i], NULL);
590 d329ad81 CsabaKiraly
        }
591 22afdba3 Csaba Kiraly
592
        while(receivedBuffer[rIdxUp].data==NULL && timeoutFired==0 && fdTriggered==0) {
593 574aabfa MarcoBiazzini
        //        event_base_dispatch(base);
594
                event_base_loop(base,EVLOOP_ONCE);
595
        }
596
597 22afdba3 Csaba Kiraly
        //delete one-time events
598 93932efc Csaba Kiraly
        if (timeout_ev) {
599
          if (!timeoutFired) event_del(timeout_ev);
600
          event_free(timeout_ev);
601
        }
602 22afdba3 Csaba Kiraly
        for (i = 0; fds && fds[i] != -1; i ++) {
603
          if (! fd_triggered[i]) {
604
            fds[i] = -2;
605
            event_del(fd_ev[i]);
606
          //} else {
607
            //fprintf(stderr, "\twait4data: fd %d triggered\n", fds[i]);
608
          }
609
          event_free(fd_ev[i]);
610
        }
611
612
        if (fdTriggered) {
613
          fdTriggered = false;
614
          //fprintf(stderr, "\twait4data: fd event\n");
615
          return 2;
616
        } else if (timeoutFired) {
617
          timeoutFired = 0;
618
          //fprintf(stderr, "\twait4data: timed out\n");
619
          return 0;
620
        } else if (receivedBuffer[rIdxUp].data!=NULL) {
621
          //fprintf(stderr, "\twait4data: ML receive\n");
622
          return 1;
623
        } else {
624
          fprintf(stderr, "BUG in wait4data\n");
625
          exit(EXIT_FAILURE);
626
        }
627 574aabfa MarcoBiazzini
}
628
629 e4cbfa7e CsabaKiraly
socketID_handle getRemoteSocketID(const char *ip, int port) {
630
        char str[SOCKETID_STRING_SIZE];
631
        socketID_handle h;
632 574aabfa MarcoBiazzini
633 e4cbfa7e CsabaKiraly
        snprintf(str, SOCKETID_STRING_SIZE, "%s:%d-%s:%d", ip, port, ip, port);
634
        h = malloc(SOCKETID_SIZE);
635
        mlStringToSocketID(str, h);
636 574aabfa MarcoBiazzini
637 e4cbfa7e CsabaKiraly
        return h;
638
}
639 574aabfa MarcoBiazzini
640
struct nodeID *create_node(const char *rem_IP, int rem_port) {
641 bd360b95 Csaba Kiraly
        socketID_handle s;
642
        struct nodeID *remote;
643
644
        s = getRemoteSocketID(rem_IP, rem_port);
645
        remote = id_lookup_dup(s);
646
        free(s);
647
648 574aabfa MarcoBiazzini
        return remote;
649
}
650
651 e55fe1f8 Marco Biazzini
const char *node_ip(const struct nodeID *s) {
652
        static char ip[64];
653
        int len;
654 ebe734eb Csaba Kiraly
        const char *start, *end;
655 e55fe1f8 Marco Biazzini
        const char *tmp = node_addr(s);
656
        start = strstr(tmp, "-") + 1;
657
        end = strstr(start, ":");
658
        len = end - start;
659
        memcpy(ip, start, len);
660
        ip[len] = 0;
661
662
        return (const char *)ip;
663
}
664
665 574aabfa MarcoBiazzini
// TODO: check why closing the connection is annoying for the ML
666 dccd020d MarcoBiazzini
void nodeid_free(struct nodeID *n) {
667 bd360b95 Csaba Kiraly
        if (n && (--(n->refcnt) == 1)) {
668 f5cd5266 Csaba Kiraly
/*
669 ccdb7ae7 Csaba Kiraly
                struct nodeID **npos;
670 a6911c72 MarcoBiazzini
        //        mlCloseConnection(n->connID);
671 bd360b95 Csaba Kiraly
                npos = id_lookup(n->addr);
672
                *npos = NULL;
673 a6911c72 MarcoBiazzini
                mlCloseSocket(n->addr);
674 2694dce1 MarcoBiazzini
                free(n);
675 f5cd5266 Csaba Kiraly
*/
676 2694dce1 MarcoBiazzini
        }
677 574aabfa MarcoBiazzini
}
678
679
680
const char *node_addr(const struct nodeID *s)
681
{
682
  static char addr[256];
683 a6911c72 MarcoBiazzini
  // TODO: mlSocketIDToString always return 0 !!!
684
  int r = mlSocketIDToString(s->addr,addr,256);
685 574aabfa MarcoBiazzini
  if (!r)
686
          return addr;
687
  else
688
          return "";
689
}
690
691 2694dce1 MarcoBiazzini
struct nodeID *nodeid_dup(struct nodeID *s)
692 574aabfa MarcoBiazzini
{
693 2694dce1 MarcoBiazzini
        s->refcnt++;
694
        return s;
695 574aabfa MarcoBiazzini
}
696
697
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
698
{
699 a6911c72 MarcoBiazzini
        return (mlCompareSocketIDs(s1->addr,s2->addr) == 0);
700 574aabfa MarcoBiazzini
}
701
702 0772765b Csaba Kiraly
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
703 574aabfa MarcoBiazzini
{
704 0772765b Csaba Kiraly
  if (max_write_size < SOCKETID_STRING_SIZE) return -1;
705
706 a6911c72 MarcoBiazzini
  mlSocketIDToString(s->addr,(char *)b,SOCKETID_STRING_SIZE);
707 2694dce1 MarcoBiazzini
  //fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
708 574aabfa MarcoBiazzini
709 2694dce1 MarcoBiazzini
//        memcpy(b, s->addr,SOCKETID_SIZE);//sizeof(struct sockaddr_in6)*2
710
//        return SOCKETID_SIZE;//sizeof(struct sockaddr_in6)*2;
711 574aabfa MarcoBiazzini
712 4173f623 CsabaKiraly
  return 1 + strlen((char *)b);        //terminating \0 IS included in the size
713 574aabfa MarcoBiazzini
}
714
715
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
716
{
717 8088babe Csaba Kiraly
  uint8_t sid[SOCKETID_SIZE];
718
  socketID_handle h = (socketID_handle) sid;
719 0f35a7cd Csaba Kiraly
  mlStringToSocketID((char *)b,h);
720
  *len = strlen((char*)b) + 1;
721 bd360b95 Csaba Kiraly
  return id_lookup_dup(h);
722 574aabfa MarcoBiazzini
}