Statistics
| Branch: | Revision:

grapes / src / net_helper-udp.c @ c5922d21

History | View | Annotate | Download (12.1 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *  Copyright (c) 2010 Alessandro Russo
5
 *
6
 *  This is free software; see lgpl-2.1.txt
7
 */
8

    
9
#include <sys/types.h>
10
#include <unistd.h>
11
#include <errno.h>
12
#include <stdlib.h>
13
#include <stdint.h>
14
#include <stdio.h>
15
#include <string.h>
16

    
17
#ifndef _WIN32
18
#include <sys/socket.h>
19
#include <netinet/in.h>
20
#include <netdb.h>
21
#include <arpa/inet.h>
22
#else
23
#define _WIN32_WINNT 0x0501 /* WINNT>=0x501 (WindowsXP) for supporting getaddrinfo/freeaddrinfo.*/
24
#include "win32-net.h"
25
#endif
26

    
27
#include "net_helper.h"
28

    
29
#define MAX_MSG_SIZE 1024 * 60
30
enum L3PROTOCOL {IPv4, IPv6} l3 = IPv4;
31

    
32
struct nodeID {
33
  struct sockaddr_storage addr;
34
  int fd;
35
};
36

    
37
#ifdef _WIN32
38
static int inet_aton(const char *cp, struct in_addr *addr)
39
{
40
    if( cp==NULL || addr==NULL )
41
    {
42
        return(0);
43
    }
44

    
45
    addr->s_addr = inet_addr(cp);
46
    return (addr->s_addr == INADDR_NONE) ? 0 : 1;
47
}
48

    
49
struct iovec {                    /* Scatter/gather array items */
50
  void  *iov_base;              /* Starting address */
51
  size_t iov_len;               /* Number of bytes to transfer */
52
};
53

    
54
struct msghdr {
55
  void         *msg_name;       /* optional address */
56
  socklen_t     msg_namelen;    /* size of address */
57
  struct iovec *msg_iov;        /* scatter/gather array */
58
  size_t        msg_iovlen;     /* # elements in msg_iov */
59
  void         *msg_control;    /* ancillary data, see below */
60
  socklen_t     msg_controllen; /* ancillary data buffer len */
61
  int           msg_flags;      /* flags on received message */
62
};
63

    
64
#define MIN(A,B)    ((A)<(B) ? (A) : (B))
65
ssize_t recvmsg (int sd, struct msghdr *msg, int flags)
66
{
67
  ssize_t bytes_read;
68
  size_t expected_recv_size;
69
  ssize_t left2move;
70
  char *tmp_buf;
71
  char *tmp;
72
  int i;
73

    
74
  assert (msg->msg_iov);
75

    
76
  expected_recv_size = 0;
77
  for (i = 0; i < msg->msg_iovlen; i++)
78
    expected_recv_size += msg->msg_iov[i].iov_len;
79
  tmp_buf = malloc (expected_recv_size);
80
  if (!tmp_buf)
81
    return -1;
82

    
83
  left2move = bytes_read = recvfrom (sd,
84
                                     tmp_buf,
85
                                     expected_recv_size,
86
                                     flags,
87
                                     (struct sockaddr *) msg->msg_name,
88
                                     &msg->msg_namelen);
89

    
90
  for (tmp = tmp_buf, i = 0; i < msg->msg_iovlen; i++)
91
    {
92
      if (left2move <= 0)
93
        break;
94
      assert (msg->msg_iov[i].iov_base);
95
      memcpy (msg->msg_iov[i].iov_base,
96
              tmp, MIN (msg->msg_iov[i].iov_len, left2move));
97
      left2move -= msg->msg_iov[i].iov_len;
98
      tmp += msg->msg_iov[i].iov_len;
99
    }
100

    
101
  free (tmp_buf);
102

    
103
  return bytes_read;
104
}
105

    
106
ssize_t sendmsg (int sd, struct msghdr * msg, int flags)
107
{
108
  ssize_t bytes_send;
109
  size_t expected_send_size;
110
  size_t left2move;
111
  char *tmp_buf;
112
  char *tmp;
113
  int i;
114

    
115
  assert (msg->msg_iov);
116

    
117
  expected_send_size = 0;
118
  for (i = 0; i < msg->msg_iovlen; i++)
119
    expected_send_size += msg->msg_iov[i].iov_len;
120
  tmp_buf = malloc (expected_send_size);
121
  if (!tmp_buf)
122
    return -1;
123

    
124
  for (tmp = tmp_buf, left2move = expected_send_size, i = 0; i <
125
       msg->msg_iovlen; i++)
126
    {
127
      if (left2move <= 0)
128
        break;
129
      assert (msg->msg_iov[i].iov_base);
130
      memcpy (tmp,
131
              msg->msg_iov[i].iov_base,
132
              MIN (msg->msg_iov[i].iov_len, left2move));
133
      left2move -= msg->msg_iov[i].iov_len;
134
      tmp += msg->msg_iov[i].iov_len;
135
    }
136

    
137
  bytes_send = sendto (sd,
138
                       tmp_buf,
139
                       expected_send_size,
140
                       flags,
141
                       (struct sockaddr *) msg->msg_name, msg->msg_namelen);
142

    
143
  free (tmp_buf);
144

    
145
  return bytes_send;
146
}
147

    
148
#endif
149

    
150
int wait4data(const struct nodeID *s, struct timeval *tout, int *user_fds)
151
/* returns 0 if timeout expires 
152
 * returns -1 in case of error of the select function
153
 * retruns 1 if the nodeID file descriptor is ready to be read
154
 *                                         (i.e., some data is ready from the network socket)
155
 * returns 2 if some of the user_fds file descriptors is ready
156
 */
157
{
158
  fd_set fds;
159
  int i, res, max_fd;
160

    
161
  FD_ZERO(&fds);
162
  if (s) {
163
    max_fd = s->fd;
164
    FD_SET(s->fd, &fds);
165
  } else {
166
    max_fd = -1;
167
  }
168
  if (user_fds) {
169
    for (i = 0; user_fds[i] != -1; i++) {
170
      FD_SET(user_fds[i], &fds);
171
      if (user_fds[i] > max_fd) {
172
        max_fd = user_fds[i];
173
      }
174
    }
175
  }
176
  res = select(max_fd + 1, &fds, NULL, NULL, tout);
177
  if (res <= 0) {
178
    return res;
179
  }
180
  if (s && FD_ISSET(s->fd, &fds)) {
181
    return 1;
182
  }
183

    
184
  /* If execution arrives here, user_fds cannot be 0
185
     (an FD is ready, and it's not s->fd) */
186
  for (i = 0; user_fds[i] != -1; i++) {
187
    if (!FD_ISSET(user_fds[i], &fds)) {
188
      user_fds[i] = -2;
189
    }
190
  }
191

    
192
  return 2;
193
}
194

    
195
struct nodeID *create_node(const char *IPaddr, int port)
196
{
197
  struct nodeID *s;
198
  int res;
199
  struct addrinfo hints, *result;
200

    
201
  memset(&hints, 0, sizeof(hints));
202
  hints.ai_family = AF_UNSPEC;
203
  hints.ai_flags = AI_NUMERICHOST;
204

    
205
  s = malloc(sizeof(struct nodeID));
206
  memset(s, 0, sizeof(struct nodeID));
207

    
208
  if ((res = getaddrinfo(IPaddr, NULL, &hints, &result)))
209
  {
210
    fprintf(stderr, "Cannot resolve hostname '%s'\n", IPaddr);
211
    return NULL;
212
  }
213
  s->addr.ss_family = result->ai_family;
214
  switch (result->ai_family)
215
  {
216
    case (AF_INET):
217
      ((struct sockaddr_in *)&s->addr)->sin_port = htons(port);
218
      res = inet_pton (result->ai_family, IPaddr, &((struct sockaddr_in *)&s->addr)->sin_addr);
219
    break;
220
    case (AF_INET6):
221
      ((struct sockaddr_in6 *)&s->addr)->sin6_port = htons(port);
222
      res = inet_pton (result->ai_family, IPaddr, &(((struct sockaddr_in6 *) &s->addr)->sin6_addr));
223
    break;
224
    default:
225
      fprintf(stderr, "Cannot resolve address family %d for '%s'\n", result->ai_family, IPaddr);
226
      res = 0;
227
      break;
228
  }
229
  freeaddrinfo(result);
230
  if (res != 1)
231
  {
232
    fprintf(stderr, "Could not convert address '%s'\n", IPaddr);
233
    free(s);
234

    
235
    return NULL;
236
  }
237

    
238
  s->fd = -1;
239

    
240
  return s;
241
}
242

    
243
struct nodeID *net_helper_init(const char *my_addr, int port, const char *config)
244
{
245
  int res;
246
  struct nodeID *myself;
247

    
248
  myself = create_node(my_addr, port);
249
  if (myself == NULL) {
250
    fprintf(stderr, "Error creating my socket (%s:%d)!\n", my_addr, port);
251

    
252
    return NULL;
253
  }
254
  myself->fd =  socket(myself->addr.ss_family, SOCK_DGRAM, 0);
255
  if (myself->fd < 0) {
256
    free(myself);
257

    
258
    return NULL;
259
  }
260
//  TODO:
261
//  if (addr->sa_family == AF_INET6) {
262
//      r = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
263
//  }
264

    
265
  fprintf(stderr, "My sock: %d\n", myself->fd);
266

    
267
  switch (myself->addr.ss_family)
268
  {
269
    case (AF_INET):
270
        res = bind(myself->fd, (struct sockaddr *)&myself->addr, sizeof(struct sockaddr_in));
271
    break;
272
    case (AF_INET6):
273
        res = bind(myself->fd, (struct sockaddr *)&myself->addr, sizeof(struct sockaddr_in6));
274
    break;
275
    default:
276
      fprintf(stderr, "Cannot resolve address family %d in bind\n", myself->addr.ss_family);
277
      res = 0;
278
    break;
279
  }
280

    
281
  if (res < 0) {
282
    /* bind failed: not a local address... Just close the socket! */
283
    close(myself->fd);
284
    free(myself);
285

    
286
    return NULL;
287
  }
288

    
289
  return myself;
290
}
291

    
292
void bind_msg_type (uint8_t msgtype)
293
{
294
}
295

    
296
struct my_hdr_t {
297
  uint8_t m_seq;
298
  uint8_t frag_seq;
299
  uint8_t frags;
300
} __attribute__((packed));
301

    
302
int send_to_peer(const struct nodeID *from, const struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
303
{
304
  struct msghdr msg = {0};
305
  static struct my_hdr_t my_hdr;
306
  struct iovec iov[2];
307
  int res;
308

    
309
  if (buffer_size <= 0) return -1;
310

    
311
  iov[0].iov_base = &my_hdr;
312
  iov[0].iov_len = sizeof(struct my_hdr_t);
313
  msg.msg_name = &to->addr;
314
  msg.msg_namelen = sizeof(struct sockaddr_storage);
315
  msg.msg_iovlen = 2;
316
  msg.msg_iov = iov;
317

    
318
  my_hdr.m_seq++;
319
  my_hdr.frags = (buffer_size / (MAX_MSG_SIZE)) + 1;
320
  my_hdr.frag_seq = 0;
321

    
322
  do {
323
    iov[1].iov_base = buffer_ptr;
324
    if (buffer_size > MAX_MSG_SIZE) {
325
      iov[1].iov_len = MAX_MSG_SIZE;
326
    } else {
327
      iov[1].iov_len = buffer_size;
328
    }
329
    my_hdr.frag_seq++;
330

    
331
    buffer_size -= iov[1].iov_len;
332
    buffer_ptr += iov[1].iov_len;
333
    res = sendmsg(from->fd, &msg, 0);
334

    
335
    if (res  < 0){
336
      int error = errno;
337
      fprintf(stderr,"net-helper: sendmsg failed errno %d: %s\n", error, strerror(error));
338
    }
339
  } while (buffer_size > 0);
340

    
341
  return res;
342
}
343

    
344
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
345
{
346
  int res, recv, m_seq, frag_seq;
347
  struct sockaddr_storage raddr;
348
  struct msghdr msg = {0};
349
  static struct my_hdr_t my_hdr;
350
  struct iovec iov[2];
351

    
352
  iov[0].iov_base = &my_hdr;
353
  iov[0].iov_len = sizeof(struct my_hdr_t);
354
  msg.msg_name = &raddr;
355
  msg.msg_namelen = sizeof(struct sockaddr_storage);
356
  msg.msg_iovlen = 2;
357
  msg.msg_iov = iov;
358

    
359
  *remote = malloc(sizeof(struct nodeID));
360
  if (*remote == NULL) {
361
    return -1;
362
  }
363

    
364
  recv = 0;
365
  m_seq = -1;
366
  frag_seq = 0;
367
  do {
368
    iov[1].iov_base = buffer_ptr;
369
    if (buffer_size > MAX_MSG_SIZE) {
370
      iov[1].iov_len = MAX_MSG_SIZE;
371
    } else {
372
      iov[1].iov_len = buffer_size;
373
    }
374
    buffer_size -= iov[1].iov_len;
375
    buffer_ptr += iov[1].iov_len;
376
    res = recvmsg(local->fd, &msg, 0);
377
    recv += (res - sizeof(struct my_hdr_t));
378
    if (m_seq != -1 && my_hdr.m_seq != m_seq) {
379
      return -1;
380
    } else {
381
      m_seq = my_hdr.m_seq;
382
    }
383
    if (my_hdr.frag_seq != frag_seq + 1) {
384
      return -1;
385
    } else {
386
     frag_seq++;
387
    }
388
  } while ((my_hdr.frag_seq < my_hdr.frags) && (buffer_size > 0));
389
  memcpy(&(*remote)->addr, &raddr, msg.msg_namelen);
390
  (*remote)->fd = -1;
391

    
392
  return recv;
393
}
394

    
395
int node_addr(const struct nodeID *s, char *addr, int len)
396
{
397
  int n;
398

    
399
        if (s && node_ip(s, addr, len)>=0)
400
        {
401
                n = snprintf(addr + strlen(addr), len - strlen(addr) - 1, ":%d", node_port(s));
402
        } else
403
                n = snprintf(addr, len , "None");
404

    
405
  return n;
406
}
407

    
408
struct nodeID *nodeid_dup(const struct nodeID *s)
409
{
410
  struct nodeID *res;
411

    
412
  res = malloc(sizeof(struct nodeID));
413
  if (res != NULL) {
414
    memcpy(res, s, sizeof(struct nodeID));
415
  }
416

    
417
  return res;
418
}
419

    
420
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
421
{
422
        return (nodeid_cmp(s1,s2) == 0);
423
//  return (memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_storage)) == 0);
424
}
425

    
426
int nodeid_cmp(const struct nodeID *s1, const struct nodeID *s2)
427
{
428
        char ip1[80], ip2[80];
429
        int port1,port2,res;
430

    
431
  if(s1 && s2)
432
  {
433
    port1=node_port(s1);
434
    port2=node_port(s2);
435
    node_ip(s1,ip1,80);
436
    node_ip(s2,ip2,80);
437

    
438
  //        int res = (port1^port2)|strcmp(ip1,ip2);
439
  //        fprintf(stderr,"Comparing %s:%d and %s:%d\n",ip1,port1,ip2,port2);
440
  //        fprintf(stderr,"Result: %d\n",res);
441
    res  = strcmp(ip1,ip2);
442
    if (res!=0)
443
      return res;
444
    else
445
      return port1-port2;
446
  //                return port1 == port2 ? 0 : 1;
447

    
448
  //  return memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_storage));
449
  }
450
  else
451
    return 0;
452
}
453

    
454
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
455
{
456
  if (max_write_size < sizeof(struct sockaddr_storage)) return -1;
457

    
458
  memcpy(b, &s->addr, sizeof(struct sockaddr_storage));
459

    
460
  return sizeof(struct sockaddr_storage);
461
}
462

    
463
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
464
{
465
  struct nodeID *res;
466
  res = malloc(sizeof(struct nodeID));
467
  if (res != NULL) {
468
    memcpy(&res->addr, b, sizeof(struct sockaddr_storage));
469
    res->fd = -1;
470
  }
471
  *len = sizeof(struct sockaddr_storage);
472

    
473
  return res;
474
}
475

    
476
void nodeid_free(struct nodeID *s)
477
{
478
  free(s);
479
}
480

    
481
int node_ip(const struct nodeID *s, char *ip, int len)
482
{
483
        int res = 0;
484
  switch (s->addr.ss_family)
485
  {
486
    case AF_INET:
487
      inet_ntop(s->addr.ss_family, &((const struct sockaddr_in *)&s->addr)->sin_addr, ip, len);
488
      break;
489
    case AF_INET6:
490
      inet_ntop(s->addr.ss_family, &((const struct sockaddr_in6 *)&s->addr)->sin6_addr, ip, len);
491
      break;
492
    default:
493
                        res = -1;
494
      break;
495
  }
496
  if (!ip) {
497
          perror("inet_ntop");
498
                res = -1;
499
  }
500
  if (ip && res <0 && len)
501
    ip[0] = '\0';
502
  return res;
503
}
504

    
505
int node_port(const struct nodeID *s)
506
{
507
  int res;
508
  switch (s->addr.ss_family)
509
  {
510
    case AF_INET:
511
      res = ntohs(((const struct sockaddr_in *) &s->addr)->sin_port);
512
      break;
513
    case AF_INET6:
514
      res = ntohs(((const struct sockaddr_in6 *)&s->addr)->sin6_port);
515
      break;
516
    default:
517
      res = -1;
518
      break;
519
  }
520
  return res;
521
}