Revision 22354b6f

View differences:

net_helper-ml.c
1
/*
2
 *  Copyright (c) 2009 Marco Biazzini
3
 *
4
 *  This is free software; see GPL.txt
5
 *
6
 */
7

  
8
#include <event2/event.h>
9
#include <arpa/inet.h>
10
#include <unistd.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <string.h>
14

  
15
#include "net_helper.h"
16
#include "ml.h"
17
#include "ml_helpers.h"
18

  
19
/**
20
 * libevent pointer
21
 */
22
struct event_base *base;
23

  
24
#define NH_BUFFER_SIZE 100
25
#define NH_MSG_MAXSIZE 1024
26

  
27
static int sIdx = 0;
28
static int rIdx = 0;
29

  
30
typedef struct nodeID {
31
	socketID_handle addr;
32
	int connID;	// connection associated to this node, -1 if myself
33
	int addrSize;
34
	int addrStringSize;
35
} nodeID;
36

  
37
typedef struct msgData_cb {
38
	int bIdx;	// index of the message in the proper buffer
39
	unsigned char msgType; // message type
40
	int mSize;	// message size
41
} msgData_cb;
42

  
43
static nodeID *me; //TODO: is it possible to get rid of this (notwithstanding ml callback)??
44
static int timeoutFired = 0;
45

  
46
// pointers to the msgs to be send
47
static uint8_t *sendingBuffer[NH_BUFFER_SIZE];
48
// pointers to the received msgs + sender nodeID
49
static uint8_t *receivedBuffer[NH_BUFFER_SIZE][2];
50

  
51

  
52
/**
53
 * Look for a free slot in the received buffer and allocates it for immediate use
54
 * @return the index of a free slot in the received msgs buffer, -1 if no free slot available.
55
 */
56
static int next_R() {
57
	if (receivedBuffer[rIdx][0]==NULL) {
58
		receivedBuffer[rIdx][0] = malloc(NH_MSG_MAXSIZE);
59
	}
60
	else {
61
		int count;
62
		for (count=0;count<NH_BUFFER_SIZE;count++) {
63
			rIdx = (++rIdx)%NH_BUFFER_SIZE;
64
			if (receivedBuffer[rIdx][0]==NULL)
65
				break;
66
		}
67
		if (count==NH_BUFFER_SIZE)
68
			return -1;
69
		else {
70
			receivedBuffer[rIdx][0] = malloc(NH_MSG_MAXSIZE);
71
		}
72
	}
73
	memset(receivedBuffer[rIdx][0],0,NH_MSG_MAXSIZE);
74
	return rIdx;
75
}
76

  
77
/**
78
 * Look for a free slot in the sending buffer and allocates it for immediate use
79
 * @return the index of a free slot in the sending msgs buffer, -1 if no free slot available.
80
 */
81
static int next_S() {
82
	if (sendingBuffer[sIdx]==NULL) {
83
		sendingBuffer[sIdx] = malloc(NH_MSG_MAXSIZE);
84
	}
85
	else {
86
		int count;
87
		for (count=0;count<NH_BUFFER_SIZE;count++) {
88
			sIdx = (++sIdx)%NH_BUFFER_SIZE;
89
			if (sendingBuffer[sIdx]==NULL)
90
				break;
91
		}
92
		if (count==NH_BUFFER_SIZE)
93
			return -1;
94
		else {
95
			sendingBuffer[sIdx] = malloc(NH_MSG_MAXSIZE);
96
		}
97
	}
98
	memset(sendingBuffer[sIdx],0,NH_MSG_MAXSIZE);
99
	return sIdx;
100
}
101

  
102
/**
103
 * Callback used by ml to confirm its initialization. Create a valid self nodeID and register to receive data from remote peers.
104
 * @param local_socketID
105
 * @param errorstatus
106
 */
107
static void init_myNodeID_cb (socketID_handle local_socketID,int errorstatus) {
108
	switch (errorstatus) {
109
	case 0:
110
		//
111
		memcpy(me->addr,local_socketID,SOCKETID_SIZE);
112
		me->addrSize = SOCKETID_SIZE;
113
		me->addrStringSize = SOCKETID_STRING_SIZE;
114
		me->connID = -1;
115
		fprintf(stderr,"Net-helper init : received my own socket: %s.\n",node_addr(me));
116
		break;
117
	case -1:
118
		//
119
		fprintf(stderr,"Net-helper init : socket error occurred in ml while creating socket\n");
120
		break;
121
	case 1:
122
		//
123
		fprintf(stderr,"Net-helper init : NAT traversal failed while creating socket\n");
124
		break;
125
	case 2:
126
	    fprintf(stderr,"Net-helper init : NAT traversal timeout while creating socket\n");
127
	    break;
128
	default :	// should never happen
129
		//
130
		fprintf(stderr,"Net-helper init : Unknown error in ml while creating socket\n");
131
	}
132

  
133
}
134

  
135
/**
136
 * Timeout callback to be set in the eventlib loop as needed
137
 * @param socket
138
 * @param flag
139
 * @param arg
140
 */
141
static void t_out_cb (int socket, short flag, void* arg) {
142

  
143
	timeoutFired = 1;
144
	fprintf(stderr,"TIMEOUT!!!\n");
145
//	event_base_loopbreak(base);
146
}
147

  
148
/**
149
 * Callback called by ml when a remote node ask for a connection
150
 * @param connectionID
151
 * @param arg
152
 */
153
static void receive_conn_cb(int connectionID, void *arg) {
154
    fprintf(stderr, "Net-helper : remote peer opened the connection %d with arg = %d\n", connectionID,(int)arg);
155

  
156
}
157

  
158
/**
159
 * Callback called by the ml when a connection is ready to be used to send data to a remote peer
160
 * @param connectionID
161
 * @param arg
162
 */
163
static void connReady_cb (int connectionID, void *arg) {
164

  
165
	msgData_cb *p;
166
	p = (msgData_cb *)arg;
167
	if (p == NULL) return;
168
	send_params params = {0,0,0,0};
169
	send_Data(connectionID,(char *)(sendingBuffer[p->bIdx]),p->mSize,p->msgType,&params);
170
	free(sendingBuffer[p->bIdx]);
171
	sendingBuffer[p->bIdx] = NULL;
172
	fprintf(stderr,"Net-helper: Message # %d for connection %d sent!\n ", p->bIdx,connectionID);
173
	//	event_base_loopbreak(base);
174
}
175

  
176
/**
177
 * Callback called by ml when a connection error occurs
178
 * @param connectionID
179
 * @param arg
180
 */
181
static void connError_cb (int connectionID, void *arg) {
182
	// simply get rid of the msg in the buffer....
183
	msgData_cb *p;
184
	p = (msgData_cb *)arg;
185
	if (p != NULL) {
186
		fprintf(stderr,"Net-helper: Connection %d could not be established to send msg %d.\n ", connectionID,p->bIdx);
187
		free(sendingBuffer[p->bIdx]);
188
		sendingBuffer[p->bIdx] = NULL;
189
		free(p);
190
	}
191
	//	event_base_loopbreak(base);
192
}
193

  
194
/**
195
 * Callback to receive data from ml
196
 * @param buffer
197
 * @param buflen
198
 * @param msgtype
199
 * @param arg
200
 */
201
static void recv_data_cb(char *buffer, int buflen, unsigned char msgtype, recv_params *arg) {
202
// TODO: lacks a void* arg... moreover: recv_params has a msgtype, but there is also a msgtype explicit argument...
203
//	fprintf(stderr, "Net-helper : called back with some news...\n");
204
	char str[SOCKETID_STRING_SIZE]; //str[SOCKETID_STRING_SIZE]=0;
205
	if (arg->remote_socketID != NULL)
206
		socketID_To_String(arg->remote_socketID,str,SOCKETID_STRING_SIZE);
207
	else
208
		sprintf(str,"!Unknown!");
209
	if (arg->nrMissingBytes || !arg->firstPacketArrived) {
210
	    fprintf(stderr, "Net-helper : corrupted message arrived from %s\n",str);
211
	}
212
	else {
213
		fprintf(stderr, "Net-helper : message arrived from %s\n",str);
214
		// buffering the received message only if possible, otherwise ignore it...
215
		int index = next_R();
216
		if (index >=0) {
217
		//	receivedBuffer[index][0] = malloc(buflen);
218
			if (receivedBuffer[index][0] == NULL) {
219
				fprintf(stderr, "Net-helper : memory error while creating a new message buffer \n");
220
				return;
221
			}
222
			// creating a new sender nodedID
223
			receivedBuffer[index][1] = malloc(sizeof(nodeID));
224
			if (receivedBuffer[index][1]==NULL) {
225
				free (receivedBuffer[index][0]);
226
				receivedBuffer[index][0] = NULL;
227
				fprintf(stderr, "Net-helper : memory error while creating a new nodeID. Message from %s is lost.\n", str);
228
				return;
229
			}
230
			else {
231
				nodeID *remote; remote = (nodeID*)(receivedBuffer[index][1]);
232
				memcpy(receivedBuffer[index][0],buffer,buflen);
233
				  // get the socketID of the sender
234
				remote->addr = malloc(SOCKETID_SIZE);
235
				if (remote->addr == NULL) {
236
					  free (remote);
237
					  fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
238
					  return;
239
				}
240
				else {
241
					memcpy(remote->addr, arg->remote_socketID ,SOCKETID_SIZE);
242
					remote->addrSize = SOCKETID_SIZE;
243
					remote->addrStringSize = SOCKETID_STRING_SIZE;
244
					remote->connID = arg->connectionID;
245
				}
246
			}
247
		}
248
  }
249
//	event_base_loopbreak(base);
250
}
251

  
252
/**
253
 * Initialize the self nodeID structure, the ml and create a socket in ml to get the knownHost
254
 * @param IPaddr
255
 * @param port
256
 * @param srv_IP
257
 * @param srv_port
258
 * @return NULL, because to have a valid nodeID you have to wait for ml to call back...
259
 */
260
struct nodeID *net_helper_init(const char *IPaddr, int port,unsigned char msgtypes[], int msgtypes_len) {
261

  
262
	struct timeval tout = {1, 0};
263
	base = event_base_new();
264

  
265
	me = malloc(sizeof(nodeID));
266
	if (me == NULL) {
267
		return NULL;
268
	}
269
	me->addr = malloc(SOCKETID_SIZE);
270
	if (me->addr == NULL) {
271
		free(me);
272
		return NULL;
273
	}
274
	me->addrSize = 0;	// dirty trick to spot later if the ml has called back ...
275

  
276
	int i;
277
	for (i=0;i<NH_BUFFER_SIZE;i++) {
278
		sendingBuffer[i] = NULL;
279
		receivedBuffer[i][0] = NULL;
280
	}
281

  
282
	register_Error_connection_cb(&connError_cb);
283
	register_Recv_connection_cb(&receive_conn_cb);
284
	for (i=0;i<msgtypes_len;i++) {
285
		register_Recv_data_cb(&recv_data_cb,msgtypes[i]);
286
	}
287
	init_messaging_layer(1,tout,port,IPaddr,0,NULL,&init_myNodeID_cb,base);
288
//	fprintf(stderr,"Net-helper init : back from init!\n");
289
//	register_recv_localsocketID_cb(init_myNodeID_cb);
290
	return me;
291
}
292

  
293

  
294
/**
295
 * Called by the application to send data to a remote peer
296
 * @param from
297
 * @param to
298
 * @param buffer_ptr
299
 * @param buffer_size
300
 * @return The number of sent bytes or -1 if a connection error occurred.
301
 */
302
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
303
{
304
	// if buffer is full, discard the message and return an error flag
305
	int index = next_S();
306
	if (index<0) {
307
		// free(buffer_ptr);
308
		return -1;
309
	}
310
	memcpy(sendingBuffer[index],buffer_ptr,buffer_size);
311
	// free(buffer_ptr);
312
	msgData_cb *p = malloc(sizeof(msgData_cb));
313
	p->bIdx = index; p->mSize = buffer_size; p->msgType = (unsigned char)buffer_ptr[0];
314
	int current = p->bIdx;
315
	to->connID = open_Connection(to->addr,&connReady_cb,p);
316
	if (to->connID<0) {
317
		free(sendingBuffer[current]);
318
		sendingBuffer[current] = NULL;
319
		fprintf(stderr,"Net-helper: Couldn't get a connection ID to send msg %d.\n ", p->bIdx);
320
	}
321
	else {
322
		fprintf(stderr,"Net-helper: Got a connection ID to send msg %d to %s.\n ",
323
			current,node_addr(to));
324
	//		struct timeval tout = {0,500};
325
	//		event_base_once(base,0, EV_TIMEOUT, &t_out_cb, NULL, &tout);
326
		while (sendingBuffer[current] != NULL)
327
			event_base_loop(base,EVLOOP_ONCE);//  EVLOOP_NONBLOCK
328
//		fprintf(stderr,"Net-helper: Back from eventlib loop with status %d.\n", ok);
329
	}
330
	free(p);
331
	return to->connID;
332

  
333
}
334

  
335

  
336
/**
337
 * Called by an application to receive data from remote peers
338
 * @param local
339
 * @param remote
340
 * @param buffer_ptr
341
 * @param buffer_size
342
 * @return The number of received bytes or -1 if some error occurred.
343
 */
344
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
345
{
346
	// this should never happen... if it does, index handling is faulty...
347
	if (receivedBuffer[rIdx][1]==NULL) {
348
		fprintf(stderr, "Net-helper : memory error while creating a new nodeID \n");
349
		return -1;
350
	}
351

  
352
	(*remote) = (nodeID*)(receivedBuffer[rIdx][1]);
353
	// retrieve a msg from the buffer
354
	memcpy(buffer_ptr, receivedBuffer[rIdx][0], buffer_size);
355

  
356
	free(receivedBuffer[rIdx][0]);
357
	receivedBuffer[rIdx][0] = NULL;
358
	receivedBuffer[rIdx][1] = NULL;
359

  
360
//	fprintf(stderr, "Net-helper : I've got mail!!!\n");
361

  
362
	return buffer_size;
363
}
364

  
365

  
366
int wait4data(const struct nodeID *n, struct timeval *tout) {
367

  
368
//	fprintf(stderr,"Net-helper : Waiting for data to come...\n");
369
	event_base_once(base,-1, EV_TIMEOUT, &t_out_cb, NULL, tout);
370
	while(receivedBuffer[rIdx][0]==NULL && timeoutFired==0) {
371
	//	event_base_dispatch(base);
372
		event_base_loop(base,EVLOOP_ONCE);
373
	}
374
	timeoutFired = 0;
375
//	fprintf(stderr,"Back from eventlib loop.\n");
376

  
377
	if (receivedBuffer[rIdx][0]!=NULL)
378
		return 1;
379
	else
380
		return 0;
381
}
382

  
383

  
384

  
385

  
386
struct nodeID *create_node(const char *rem_IP, int rem_port) {
387
	struct nodeID *remote = malloc(sizeof(nodeID));
388
	if (remote == NULL) {
389
		return NULL;
390
	}
391
//	remote->addr = malloc(sizeof(SOCKETID_SIZE));
392
//	if (remote->addr == NULL) {
393
//		free(remote);
394
//		return NULL;
395
//	}
396
	remote->addrSize = SOCKETID_SIZE;
397
	remote->addrStringSize = SOCKETID_STRING_SIZE;
398
	remote->addr = getRemoteSocketID(rem_IP, rem_port);
399
	remote->connID = open_Connection(remote->addr,&connReady_cb,NULL);
400
	return remote;
401
}
402

  
403
void delete_node(struct nodeID *n) {
404
// TODO: check why closing the connection is annoying for the ML
405
//	close_Connection(n->connID);
406
	close_Socket(n->addr);
407
	free(n);
408
}
409

  
410

  
411
const char *node_addr(const struct nodeID *s)
412
{
413
  static char addr[256];
414
  // TODO: socketID_To_String always return 0 !!!
415
  int r = socketID_To_String(s->addr,addr,256);
416
  if (!r)
417
	  return addr;
418
  else
419
	  return "";
420
}
421

  
422
struct nodeID *nodeid_dup(const struct nodeID *s)
423
{
424
  struct nodeID *res;
425

  
426
  res = malloc(sizeof(struct nodeID));
427
  if (res != NULL) {
428
	  res->addr = malloc(SOCKETID_SIZE);
429
	  if (res->addr != NULL) {
430
		 memcpy(res->addr, s->addr, SOCKETID_SIZE);
431
		 res->addrSize = SOCKETID_SIZE;
432
		 res->addrStringSize = SOCKETID_STRING_SIZE;
433
		 res->connID = s->connID;
434
	  }
435
	  else {
436
		free(res);
437
		res = NULL;
438
		fprintf(stderr,"Net-helper : Error while duplicating nodeID...\n");
439
	  }
440
  }
441

  
442
  return res;
443
}
444

  
445
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
446
{
447
  return (memcmp(&s1->addr, &s2->addr, SOCKETID_SIZE) == 0);
448
}
449

  
450
int nodeid_dump(uint8_t *b, const struct nodeID *s)
451
{
452
  socketID_To_String(s->addr,(char *)b,SOCKETID_STRING_SIZE);
453
  fprintf(stderr,"Dumping nodeID : ho scritto %s (%d bytes)\n",b, strlen((char *)b));
454
//  return SOCKETID_STRING_SIZE;
455
  return strlen((char *)b);
456

  
457
//	memcpy(b, s->addr, SOCKETID_SIZE);
458
//	return SOCKETID_SIZE;
459

  
460
}
461

  
462
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
463
{
464
  struct nodeID *res;
465
  res = malloc(sizeof(struct nodeID));
466
  if (res != NULL) {
467
	  res->addr = malloc(SOCKETID_SIZE);
468
	  if (res->addr != NULL) {
469
		  // memcpy(res->addr, b, SOCKETID_SIZE);
470
		  *len = strlen((char*)b);
471
//		  char str[(*len)+1]; str[(*len)]= '\0';
472
//		  snprintf(str,(*len),"%s",(char *)b);
473
		  string_To_SocketID((char *)b/*str*/,res->addr);
474
		  res->addrSize = SOCKETID_SIZE;
475
		  res->addrStringSize = SOCKETID_STRING_SIZE;
476
		  res->connID = -1;
477
	  }
478
	  else {
479
		  free(res);
480
		  res = NULL;
481
		  fprintf(stderr,"Net-helper : Error while 'undumping' nodeID...\n");
482
	  }
483
  }
484
//  *len = SOCKETID_STRING_SIZE; // SOCKETID_SIZE; //sizeof(struct nodeID);
485

  
486
  return res;
487
}

Also available in: Unified diff