Statistics
| Branch: | Revision:

pstreamer / src / net_helper-udp.c @ 1f5baf20

History | View | Annotate | Download (12.3 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *  Copyright (c) 2010 Alessandro Russo
5
 *
6
 *  This is free software; see lgpl-2.1.txt
7
 */
8

    
9
#include <sys/types.h>
10
#include <unistd.h>
11
#include <errno.h>
12
#include <stdlib.h>
13
#include <stdint.h>
14
#include <stdio.h>
15
#include <string.h>
16
#include <pstreamer_event.h>
17

    
18
#ifndef _WIN32
19
#include <sys/socket.h>
20
#include <netinet/in.h>
21
#include <netdb.h>
22
#include <arpa/inet.h>
23
#else
24
#define _WIN32_WINNT 0x0501 /* WINNT>=0x501 (WindowsXP) for supporting getaddrinfo/freeaddrinfo.*/
25
#include "win32-net.h"
26
#endif
27

    
28
#include "net_helper.h"
29

    
30
#define MAX_MSG_SIZE 1024 * 60
31
enum L3PROTOCOL {IPv4, IPv6} l3 = IPv4;
32

    
33
struct nodeID {
34
  struct sockaddr_storage addr;
35
  int fd;
36
};
37

    
38
#ifdef _WIN32
39
static int inet_aton(const char *cp, struct in_addr *addr)
40
{
41
    if( cp==NULL || addr==NULL )
42
    {
43
        return(0);
44
    }
45

    
46
    addr->s_addr = inet_addr(cp);
47
    return (addr->s_addr == INADDR_NONE) ? 0 : 1;
48
}
49

    
50
struct iovec {                    /* Scatter/gather array items */
51
  void  *iov_base;              /* Starting address */
52
  size_t iov_len;               /* Number of bytes to transfer */
53
};
54

    
55
struct msghdr {
56
  void         *msg_name;       /* optional address */
57
  socklen_t     msg_namelen;    /* size of address */
58
  struct iovec *msg_iov;        /* scatter/gather array */
59
  size_t        msg_iovlen;     /* # elements in msg_iov */
60
  void         *msg_control;    /* ancillary data, see below */
61
  socklen_t     msg_controllen; /* ancillary data buffer len */
62
  int           msg_flags;      /* flags on received message */
63
};
64

    
65
#define MIN(A,B)    ((A)<(B) ? (A) : (B))
66
ssize_t recvmsg (int sd, struct msghdr *msg, int flags)
67
{
68
  ssize_t bytes_read;
69
  size_t expected_recv_size;
70
  ssize_t left2move;
71
  char *tmp_buf;
72
  char *tmp;
73
  int i;
74

    
75
  assert (msg->msg_iov);
76

    
77
  expected_recv_size = 0;
78
  for (i = 0; i < msg->msg_iovlen; i++)
79
    expected_recv_size += msg->msg_iov[i].iov_len;
80
  tmp_buf = malloc (expected_recv_size);
81
  if (!tmp_buf)
82
    return -1;
83

    
84
  left2move = bytes_read = recvfrom (sd,
85
                                     tmp_buf,
86
                                     expected_recv_size,
87
                                     flags,
88
                                     (struct sockaddr *) msg->msg_name,
89
                                     &msg->msg_namelen);
90

    
91
  for (tmp = tmp_buf, i = 0; i < msg->msg_iovlen; i++)
92
    {
93
      if (left2move <= 0)
94
        break;
95
      assert (msg->msg_iov[i].iov_base);
96
      memcpy (msg->msg_iov[i].iov_base,
97
              tmp, MIN (msg->msg_iov[i].iov_len, left2move));
98
      left2move -= msg->msg_iov[i].iov_len;
99
      tmp += msg->msg_iov[i].iov_len;
100
    }
101

    
102
  free (tmp_buf);
103

    
104
  return bytes_read;
105
}
106

    
107
ssize_t sendmsg (int sd, struct msghdr * msg, int flags)
108
{
109
  ssize_t bytes_send;
110
  size_t expected_send_size;
111
  size_t left2move;
112
  char *tmp_buf;
113
  char *tmp;
114
  int i;
115

    
116
  assert (msg->msg_iov);
117

    
118
  expected_send_size = 0;
119
  for (i = 0; i < msg->msg_iovlen; i++)
120
    expected_send_size += msg->msg_iov[i].iov_len;
121
  tmp_buf = malloc (expected_send_size);
122
  if (!tmp_buf)
123
    return -1;
124

    
125
  for (tmp = tmp_buf, left2move = expected_send_size, i = 0; i <
126
       msg->msg_iovlen; i++)
127
    {
128
      if (left2move <= 0)
129
        break;
130
      assert (msg->msg_iov[i].iov_base);
131
      memcpy (tmp,
132
              msg->msg_iov[i].iov_base,
133
              MIN (msg->msg_iov[i].iov_len, left2move));
134
      left2move -= msg->msg_iov[i].iov_len;
135
      tmp += msg->msg_iov[i].iov_len;
136
    }
137

    
138
  bytes_send = sendto (sd,
139
                       tmp_buf,
140
                       expected_send_size,
141
                       flags,
142
                       (struct sockaddr *) msg->msg_name, msg->msg_namelen);
143

    
144
  free (tmp_buf);
145

    
146
  return bytes_send;
147
}
148

    
149
#endif
150

    
151
int wait4data(const struct nodeID *s, struct timeval *tout, int *user_fds)
152
/* returns 0 if timeout expires 
153
 * returns -1 in case of error of the select function
154
 * retruns 1 if the nodeID file descriptor is ready to be read
155
 *                                         (i.e., some data is ready from the network socket)
156
 * returns 2 if some of the user_fds file descriptors is ready
157
 */
158
{
159
  fd_set fds;
160
  int i, res, max_fd;
161

    
162
  FD_ZERO(&fds);
163
  if (s) {
164
    max_fd = s->fd;
165
    FD_SET(s->fd, &fds);
166
  } else {
167
    max_fd = -1;
168
  }
169
  if (user_fds) {
170
    for (i = 0; user_fds[i] != -1; i++) {
171
      FD_SET(user_fds[i], &fds);
172
      if (user_fds[i] > max_fd) {
173
        max_fd = user_fds[i];
174
      }
175
    }
176
  }
177
  res = select(max_fd + 1, &fds, NULL, NULL, tout);
178
  if (res <= 0) {
179
    return res;
180
  }
181
  if (s && FD_ISSET(s->fd, &fds)) {
182
    return 1;
183
  }
184

    
185
  /* If execution arrives here, user_fds cannot be 0
186
     (an FD is ready, and it's not s->fd) */
187
  for (i = 0; user_fds[i] != -1; i++) {
188
    if (!FD_ISSET(user_fds[i], &fds)) {
189
      user_fds[i] = -2;
190
    }
191
  }
192

    
193
  return 2;
194
}
195

    
196
int register_network_fds(const struct nodeID *s, fd_register_f func, void *handler)
197
{
198
        if (s) 
199
                func(handler, s->fd, 'r');
200
        return 0;
201
}
202

    
203
struct nodeID *create_node(const char *IPaddr, int port)
204
{
205
  struct nodeID *s;
206
  int res;
207
  struct addrinfo hints, *result;
208

    
209
  memset(&hints, 0, sizeof(hints));
210
  hints.ai_family = AF_UNSPEC;
211
  hints.ai_flags = AI_NUMERICHOST;
212

    
213
  s = malloc(sizeof(struct nodeID));
214
  memset(s, 0, sizeof(struct nodeID));
215

    
216
  if ((res = getaddrinfo(IPaddr, NULL, &hints, &result)))
217
  {
218
    fprintf(stderr, "Cannot resolve hostname '%s'\n", IPaddr);
219
    return NULL;
220
  }
221
  s->addr.ss_family = result->ai_family;
222
  switch (result->ai_family)
223
  {
224
    case (AF_INET):
225
      ((struct sockaddr_in *)&s->addr)->sin_port = htons(port);
226
      res = inet_pton (result->ai_family, IPaddr, &((struct sockaddr_in *)&s->addr)->sin_addr);
227
    break;
228
    case (AF_INET6):
229
      ((struct sockaddr_in6 *)&s->addr)->sin6_port = htons(port);
230
      res = inet_pton (result->ai_family, IPaddr, &(((struct sockaddr_in6 *) &s->addr)->sin6_addr));
231
    break;
232
    default:
233
      fprintf(stderr, "Cannot resolve address family %d for '%s'\n", result->ai_family, IPaddr);
234
      res = 0;
235
      break;
236
  }
237
  freeaddrinfo(result);
238
  if (res != 1)
239
  {
240
    fprintf(stderr, "Could not convert address '%s'\n", IPaddr);
241
    free(s);
242

    
243
    return NULL;
244
  }
245

    
246
  s->fd = -1;
247

    
248
  return s;
249
}
250

    
251
struct nodeID *net_helper_init(const char *my_addr, int port, const char *config)
252
{
253
  int res;
254
  struct nodeID *myself;
255

    
256
  myself = create_node(my_addr, port);
257
  if (myself == NULL) {
258
    fprintf(stderr, "Error creating my socket (%s:%d)!\n", my_addr, port);
259

    
260
    return NULL;
261
  }
262
  myself->fd =  socket(myself->addr.ss_family, SOCK_DGRAM, 0);
263
  if (myself->fd < 0) {
264
    free(myself);
265

    
266
    return NULL;
267
  }
268
//  TODO:
269
//  if (addr->sa_family == AF_INET6) {
270
//      r = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
271
//  }
272

    
273
  fprintf(stderr, "My sock: %d\n", myself->fd);
274

    
275
  switch (myself->addr.ss_family)
276
  {
277
    case (AF_INET):
278
        res = bind(myself->fd, (struct sockaddr *)&myself->addr, sizeof(struct sockaddr_in));
279
    break;
280
    case (AF_INET6):
281
        res = bind(myself->fd, (struct sockaddr *)&myself->addr, sizeof(struct sockaddr_in6));
282
    break;
283
    default:
284
      fprintf(stderr, "Cannot resolve address family %d in bind\n", myself->addr.ss_family);
285
      res = 0;
286
    break;
287
  }
288

    
289
  if (res < 0) {
290
    /* bind failed: not a local address... Just close the socket! */
291
    close(myself->fd);
292
    free(myself);
293

    
294
    return NULL;
295
  }
296

    
297
  return myself;
298
}
299

    
300
void bind_msg_type (uint8_t msgtype)
301
{
302
}
303

    
304
struct my_hdr_t {
305
  uint8_t m_seq;
306
  uint8_t frag_seq;
307
  uint8_t frags;
308
} __attribute__((packed));
309

    
310
int send_to_peer(const struct nodeID *from, const struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
311
{
312
  struct msghdr msg = {0};
313
  static struct my_hdr_t my_hdr;
314
  struct iovec iov[2];
315
  int res;
316

    
317
  if (buffer_size <= 0) return -1;
318

    
319
  iov[0].iov_base = &my_hdr;
320
  iov[0].iov_len = sizeof(struct my_hdr_t);
321
  msg.msg_name = (void *)&to->addr;
322
  msg.msg_namelen = sizeof(struct sockaddr_storage);
323
  msg.msg_iovlen = 2;
324
  msg.msg_iov = iov;
325

    
326
  my_hdr.m_seq++;
327
  my_hdr.frags = (buffer_size / (MAX_MSG_SIZE)) + 1;
328
  my_hdr.frag_seq = 0;
329

    
330
  do {
331
    iov[1].iov_base = (void *)buffer_ptr;
332
    if (buffer_size > MAX_MSG_SIZE) {
333
      iov[1].iov_len = MAX_MSG_SIZE;
334
    } else {
335
      iov[1].iov_len = buffer_size;
336
    }
337
    my_hdr.frag_seq++;
338

    
339
    buffer_size -= iov[1].iov_len;
340
    buffer_ptr += iov[1].iov_len;
341
    res = sendmsg(from->fd, &msg, 0);
342

    
343
    if (res  < 0){
344
      int error = errno;
345
      fprintf(stderr,"net-helper: sendmsg failed errno %d: %s\n", error, strerror(error));
346
    }
347
  } while (buffer_size > 0);
348

    
349
  return res;
350
}
351

    
352
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
353
{
354
  int res, recv, m_seq, frag_seq;
355
  struct sockaddr_storage raddr;
356
  struct msghdr msg = {0};
357
  static struct my_hdr_t my_hdr;
358
  struct iovec iov[2];
359

    
360
  iov[0].iov_base = &my_hdr;
361
  iov[0].iov_len = sizeof(struct my_hdr_t);
362
  msg.msg_name = &raddr;
363
  msg.msg_namelen = sizeof(struct sockaddr_storage);
364
  msg.msg_iovlen = 2;
365
  msg.msg_iov = iov;
366

    
367
  *remote = malloc(sizeof(struct nodeID));
368
  if (*remote == NULL) {
369
    return -1;
370
  }
371
  memset(*remote, 0, sizeof(struct nodeID));
372

    
373
  recv = 0;
374
  m_seq = -1;
375
  frag_seq = 0;
376
  do {
377
    iov[1].iov_base = buffer_ptr;
378
    if (buffer_size > MAX_MSG_SIZE) {
379
      iov[1].iov_len = MAX_MSG_SIZE;
380
    } else {
381
      iov[1].iov_len = buffer_size;
382
    }
383
    buffer_size -= iov[1].iov_len;
384
    buffer_ptr += iov[1].iov_len;
385
    res = recvmsg(local->fd, &msg, 0);
386
    recv += (res - sizeof(struct my_hdr_t));
387
    if (m_seq != -1 && my_hdr.m_seq != m_seq) {
388
      return -1;
389
    } else {
390
      m_seq = my_hdr.m_seq;
391
    }
392
    if (my_hdr.frag_seq != frag_seq + 1) {
393
      return -1;
394
    } else {
395
     frag_seq++;
396
    }
397
  } while ((my_hdr.frag_seq < my_hdr.frags) && (buffer_size > 0));
398
  memcpy(&(*remote)->addr, &raddr, msg.msg_namelen);
399
  (*remote)->fd = -1;
400

    
401
  return recv;
402
}
403

    
404
int node_addr(const struct nodeID *s, char *addr, int len)
405
{
406
  int n;
407

    
408
        if (s && node_ip(s, addr, len)>=0)
409
        {
410
                n = snprintf(addr + strlen(addr), len - strlen(addr) - 1, ":%d", node_port(s));
411
        } else
412
                n = snprintf(addr, len , "None");
413

    
414
  return n;
415
}
416

    
417
struct nodeID *nodeid_dup(const struct nodeID *s)
418
{
419
  struct nodeID *res;
420

    
421
  res = malloc(sizeof(struct nodeID));
422
  if (res != NULL) {
423
    memcpy(res, s, sizeof(struct nodeID));
424
  }
425

    
426
  return res;
427
}
428

    
429
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
430
{
431
        return (nodeid_cmp(s1,s2) == 0);
432
//  return (memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_storage)) == 0);
433
}
434

    
435
int nodeid_cmp(const struct nodeID *s1, const struct nodeID *s2)
436
{
437
        char ip1[80], ip2[80];
438
        int port1,port2,res;
439

    
440
  if(s1 && s2)
441
  {
442
    port1=node_port(s1);
443
    port2=node_port(s2);
444
    node_ip(s1,ip1,80);
445
    node_ip(s2,ip2,80);
446

    
447
  //        int res = (port1^port2)|strcmp(ip1,ip2);
448
  //        fprintf(stderr,"Comparing %s:%d and %s:%d\n",ip1,port1,ip2,port2);
449
  //        fprintf(stderr,"Result: %d\n",res);
450
    res  = strcmp(ip1,ip2);
451
    if (res!=0)
452
      return res;
453
    else
454
      return port1-port2;
455
  //                return port1 == port2 ? 0 : 1;
456

    
457
  //  return memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_storage));
458
  }
459
  else
460
    return 0;
461
}
462

    
463
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
464
{
465
  if (max_write_size < sizeof(struct sockaddr_storage)) return -1;
466

    
467
  memcpy(b, &s->addr, sizeof(struct sockaddr_storage));
468

    
469
  return sizeof(struct sockaddr_storage);
470
}
471

    
472
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
473
{
474
  struct nodeID *res;
475
  res = malloc(sizeof(struct nodeID));
476
  if (res != NULL) {
477
    memcpy(&res->addr, b, sizeof(struct sockaddr_storage));
478
    res->fd = -1;
479
  }
480
  *len = sizeof(struct sockaddr_storage);
481

    
482
  return res;
483
}
484

    
485
void nodeid_free(struct nodeID *s)
486
{
487
  if (s)
488
  {
489
    if (s->fd >= 0)
490
      close(s->fd);
491
    free(s);
492
  }
493
}
494

    
495
int node_ip(const struct nodeID *s, char *ip, int len)
496
{
497
        int res = 0;
498
  switch (s->addr.ss_family)
499
  {
500
    case AF_INET:
501
      inet_ntop(s->addr.ss_family, &((const struct sockaddr_in *)&s->addr)->sin_addr, ip, len);
502
      break;
503
    case AF_INET6:
504
      inet_ntop(s->addr.ss_family, &((const struct sockaddr_in6 *)&s->addr)->sin6_addr, ip, len);
505
      break;
506
    default:
507
                        res = -1;
508
      break;
509
  }
510
  if (!ip) {
511
          perror("inet_ntop");
512
                res = -1;
513
  }
514
  if (ip && res <0 && len)
515
    ip[0] = '\0';
516
  return res;
517
}
518

    
519
int node_port(const struct nodeID *s)
520
{
521
  int res;
522
  switch (s->addr.ss_family)
523
  {
524
    case AF_INET:
525
      res = ntohs(((const struct sockaddr_in *) &s->addr)->sin_port);
526
      break;
527
    case AF_INET6:
528
      res = ntohs(((const struct sockaddr_in6 *)&s->addr)->sin6_port);
529
      break;
530
    default:
531
      res = -1;
532
      break;
533
  }
534
  return res;
535
}