Statistics
| Branch: | Revision:

pstreamer / src / net_helper-udp.c @ 37b37976

History | View | Annotate | Download (12.4 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *  Copyright (c) 2010 Alessandro Russo
5
 *
6
 *  This is free software; see lgpl-2.1.txt
7
 */
8

    
9
#include <sys/types.h>
10
#include <unistd.h>
11
#include <errno.h>
12
#include <stdlib.h>
13
#include <stdint.h>
14
#include <stdio.h>
15
#include <string.h>
16
#include <pstreamer_event.h>
17

    
18
#ifndef _WIN32
19
#include <sys/socket.h>
20
#include <netinet/in.h>
21
#include <netdb.h>
22
#include <arpa/inet.h>
23
#else
24
#define _WIN32_WINNT 0x0501 /* WINNT>=0x501 (WindowsXP) for supporting getaddrinfo/freeaddrinfo.*/
25
#include "win32-net.h"
26
#endif
27

    
28
#include "net_helper.h"
29

    
30
#define MAX_MSG_SIZE 1024 * 60
31
enum L3PROTOCOL {IPv4, IPv6} l3 = IPv4;
32

    
33
struct nodeID {
34
  struct sockaddr_storage addr;
35
  int fd;
36
};
37

    
38
#ifdef _WIN32
39
static int inet_aton(const char *cp, struct in_addr *addr)
40
{
41
    if( cp==NULL || addr==NULL )
42
    {
43
        return(0);
44
    }
45

    
46
    addr->s_addr = inet_addr(cp);
47
    return (addr->s_addr == INADDR_NONE) ? 0 : 1;
48
}
49

    
50
struct iovec {                    /* Scatter/gather array items */
51
  void  *iov_base;              /* Starting address */
52
  size_t iov_len;               /* Number of bytes to transfer */
53
};
54

    
55
struct msghdr {
56
  void         *msg_name;       /* optional address */
57
  socklen_t     msg_namelen;    /* size of address */
58
  struct iovec *msg_iov;        /* scatter/gather array */
59
  size_t        msg_iovlen;     /* # elements in msg_iov */
60
  void         *msg_control;    /* ancillary data, see below */
61
  socklen_t     msg_controllen; /* ancillary data buffer len */
62
  int           msg_flags;      /* flags on received message */
63
};
64

    
65
#define MIN(A,B)    ((A)<(B) ? (A) : (B))
66
ssize_t recvmsg (int sd, struct msghdr *msg, int flags)
67
{
68
  ssize_t bytes_read;
69
  size_t expected_recv_size;
70
  ssize_t left2move;
71
  char *tmp_buf;
72
  char *tmp;
73
  int i;
74

    
75
  assert (msg->msg_iov);
76

    
77
  expected_recv_size = 0;
78
  for (i = 0; i < msg->msg_iovlen; i++)
79
    expected_recv_size += msg->msg_iov[i].iov_len;
80
  tmp_buf = malloc (expected_recv_size);
81
  if (!tmp_buf)
82
    return -1;
83

    
84
  left2move = bytes_read = recvfrom (sd,
85
                                     tmp_buf,
86
                                     expected_recv_size,
87
                                     flags,
88
                                     (struct sockaddr *) msg->msg_name,
89
                                     &msg->msg_namelen);
90

    
91
  for (tmp = tmp_buf, i = 0; i < msg->msg_iovlen; i++)
92
    {
93
      if (left2move <= 0)
94
        break;
95
      assert (msg->msg_iov[i].iov_base);
96
      memcpy (msg->msg_iov[i].iov_base,
97
              tmp, MIN (msg->msg_iov[i].iov_len, left2move));
98
      left2move -= msg->msg_iov[i].iov_len;
99
      tmp += msg->msg_iov[i].iov_len;
100
    }
101

    
102
  free (tmp_buf);
103

    
104
  return bytes_read;
105
}
106

    
107
ssize_t sendmsg (int sd, struct msghdr * msg, int flags)
108
{
109
  ssize_t bytes_send;
110
  size_t expected_send_size;
111
  size_t left2move;
112
  char *tmp_buf;
113
  char *tmp;
114
  int i;
115

    
116
  assert (msg->msg_iov);
117

    
118
  expected_send_size = 0;
119
  for (i = 0; i < msg->msg_iovlen; i++)
120
    expected_send_size += msg->msg_iov[i].iov_len;
121
  tmp_buf = malloc (expected_send_size);
122
  if (!tmp_buf)
123
    return -1;
124

    
125
  for (tmp = tmp_buf, left2move = expected_send_size, i = 0; i <
126
       msg->msg_iovlen; i++)
127
    {
128
      if (left2move <= 0)
129
        break;
130
      assert (msg->msg_iov[i].iov_base);
131
      memcpy (tmp,
132
              msg->msg_iov[i].iov_base,
133
              MIN (msg->msg_iov[i].iov_len, left2move));
134
      left2move -= msg->msg_iov[i].iov_len;
135
      tmp += msg->msg_iov[i].iov_len;
136
    }
137

    
138
  bytes_send = sendto (sd,
139
                       tmp_buf,
140
                       expected_send_size,
141
                       flags,
142
                       (struct sockaddr *) msg->msg_name, msg->msg_namelen);
143

    
144
  free (tmp_buf);
145

    
146
  return bytes_send;
147
}
148

    
149
#endif
150

    
151
int wait4data(const struct nodeID *s, struct timeval *tout, int *user_fds)
152
/* returns 0 if timeout expires 
153
 * returns -1 in case of error of the select function
154
 * retruns 1 if the nodeID file descriptor is ready to be read
155
 *                                         (i.e., some data is ready from the network socket)
156
 * returns 2 if some of the user_fds file descriptors is ready
157
 */
158
{
159
  fd_set fds;
160
  int i, res, max_fd;
161

    
162
  FD_ZERO(&fds);
163
  if (s) {
164
    max_fd = s->fd;
165
    FD_SET(s->fd, &fds);
166
  } else {
167
    max_fd = -1;
168
  }
169
  if (user_fds) {
170
    for (i = 0; user_fds[i] != -1; i++) {
171
      FD_SET(user_fds[i], &fds);
172
      if (user_fds[i] > max_fd) {
173
        max_fd = user_fds[i];
174
      }
175
    }
176
  }
177
  res = select(max_fd + 1, &fds, NULL, NULL, tout);
178
  if (res <= 0) {
179
    return res;
180
  }
181
  if (s && FD_ISSET(s->fd, &fds)) {
182
    return 1;
183
  }
184

    
185
  /* If execution arrives here, user_fds cannot be 0
186
     (an FD is ready, and it's not s->fd) */
187
  for (i = 0; user_fds[i] != -1; i++) {
188
    if (!FD_ISSET(user_fds[i], &fds)) {
189
      user_fds[i] = -2;
190
    }
191
  }
192

    
193
  return 2;
194
}
195

    
196
int register_network_fds(const struct nodeID *s, fd_register_f func, void *handler)
197
{
198
        if (s) 
199
                func(handler, s->fd, 'r');
200
        return 0;
201
}
202

    
203
struct nodeID *create_node(const char *IPaddr, int port)
204
{
205
  struct nodeID *s;
206
  int res;
207
  struct addrinfo hints, *result;
208

    
209
  memset(&hints, 0, sizeof(hints));
210
  hints.ai_family = AF_UNSPEC;
211
  hints.ai_flags = AI_NUMERICHOST;
212

    
213
  s = malloc(sizeof(struct nodeID));
214
  memset(s, 0, sizeof(struct nodeID));
215

    
216
  if ((res = getaddrinfo(IPaddr, NULL, &hints, &result)))
217
  {
218
    fprintf(stderr, "Cannot resolve hostname '%s'\n", IPaddr);
219
    free(s);
220
    return NULL;
221
  }
222
  s->addr.ss_family = result->ai_family;
223
  switch (result->ai_family)
224
  {
225
    case (AF_INET):
226
      ((struct sockaddr_in *)&s->addr)->sin_port = htons(port);
227
      res = inet_pton (result->ai_family, IPaddr, &((struct sockaddr_in *)&s->addr)->sin_addr);
228
    break;
229
    case (AF_INET6):
230
      ((struct sockaddr_in6 *)&s->addr)->sin6_port = htons(port);
231
      res = inet_pton (result->ai_family, IPaddr, &(((struct sockaddr_in6 *) &s->addr)->sin6_addr));
232
    break;
233
    default:
234
      fprintf(stderr, "Cannot resolve address family %d for '%s'\n", result->ai_family, IPaddr);
235
      res = 0;
236
      break;
237
  }
238
  freeaddrinfo(result);
239
  if (res != 1)
240
  {
241
    fprintf(stderr, "Could not convert address '%s'\n", IPaddr);
242
    free(s);
243

    
244
    return NULL;
245
  }
246

    
247
  s->fd = -1;
248

    
249
  return s;
250
}
251

    
252
struct nodeID *net_helper_init(const char *my_addr, int port, const char *config)
253
{
254
  int res;
255
  struct nodeID *myself;
256

    
257
  myself = create_node(my_addr, port);
258
  if (myself == NULL) {
259
    fprintf(stderr, "Error creating my socket (%s:%d)!\n", my_addr, port);
260

    
261
    return NULL;
262
  }
263
  myself->fd =  socket(myself->addr.ss_family, SOCK_DGRAM, 0);
264
  if (myself->fd < 0) {
265
    free(myself);
266

    
267
    return NULL;
268
  }
269
//  TODO:
270
//  if (addr->sa_family == AF_INET6) {
271
//      r = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
272
//  }
273

    
274
  fprintf(stderr, "My sock: %d\n", myself->fd);
275

    
276
  switch (myself->addr.ss_family)
277
  {
278
    case (AF_INET):
279
        res = bind(myself->fd, (struct sockaddr *)&myself->addr, sizeof(struct sockaddr_in));
280
    break;
281
    case (AF_INET6):
282
        res = bind(myself->fd, (struct sockaddr *)&myself->addr, sizeof(struct sockaddr_in6));
283
    break;
284
    default:
285
      fprintf(stderr, "Cannot resolve address family %d in bind\n", myself->addr.ss_family);
286
      res = 0;
287
    break;
288
  }
289

    
290
  if (res < 0) {
291
    /* bind failed: not a local address... Just close the socket! */
292
    close(myself->fd);
293
    free(myself);
294

    
295
    return NULL;
296
  }
297

    
298
  return myself;
299
}
300

    
301
void bind_msg_type (uint8_t msgtype)
302
{
303
}
304

    
305
struct my_hdr_t {
306
  uint8_t m_seq;
307
  uint8_t frag_seq;
308
  uint8_t frags;
309
} __attribute__((packed));
310

    
311
int send_to_peer(const struct nodeID *from, const struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
312
{
313
  struct msghdr msg = {0};
314
  static struct my_hdr_t my_hdr;
315
  struct iovec iov[2];
316
  int res;
317

    
318
  if (buffer_size <= 0) return -1;
319

    
320
  iov[0].iov_base = &my_hdr;
321
  iov[0].iov_len = sizeof(struct my_hdr_t);
322
  msg.msg_name = (void *)&to->addr;
323
  msg.msg_namelen = sizeof(struct sockaddr_storage);
324
  msg.msg_iovlen = 2;
325
  msg.msg_iov = iov;
326

    
327
  my_hdr.m_seq++;
328
  my_hdr.frags = (buffer_size / (MAX_MSG_SIZE)) + 1;
329
  my_hdr.frag_seq = 0;
330

    
331
  do {
332
    iov[1].iov_base = (void *)buffer_ptr;
333
    if (buffer_size > MAX_MSG_SIZE) {
334
      iov[1].iov_len = MAX_MSG_SIZE;
335
    } else {
336
      iov[1].iov_len = buffer_size;
337
    }
338
    my_hdr.frag_seq++;
339

    
340
    buffer_size -= iov[1].iov_len;
341
    buffer_ptr += iov[1].iov_len;
342
    res = sendmsg(from->fd, &msg, 0);
343

    
344
    if (res  < 0){
345
      int error = errno;
346
      fprintf(stderr,"net-helper: sendmsg failed errno %d: %s\n", error, strerror(error));
347
    }
348
  } while (buffer_size > 0);
349

    
350
  return res;
351
}
352

    
353
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
354
{
355
  int res, recv, m_seq, frag_seq;
356
  struct sockaddr_storage raddr;
357
  struct msghdr msg = {0};
358
  static struct my_hdr_t my_hdr;
359
  struct iovec iov[2];
360

    
361
  iov[0].iov_base = &my_hdr;
362
  iov[0].iov_len = sizeof(struct my_hdr_t);
363
  msg.msg_name = &raddr;
364
  msg.msg_namelen = sizeof(struct sockaddr_storage);
365
  msg.msg_iovlen = 2;
366
  msg.msg_iov = iov;
367

    
368
  *remote = malloc(sizeof(struct nodeID));
369
  if (*remote == NULL) {
370
    return -1;
371
  }
372
  memset(*remote, 0, sizeof(struct nodeID));
373

    
374
  recv = 0;
375
  m_seq = -1;
376
  frag_seq = 0;
377
  do {
378
    iov[1].iov_base = buffer_ptr;
379
    if (buffer_size > MAX_MSG_SIZE) {
380
      iov[1].iov_len = MAX_MSG_SIZE;
381
    } else {
382
      iov[1].iov_len = buffer_size;
383
    }
384
    buffer_size -= iov[1].iov_len;
385
    buffer_ptr += iov[1].iov_len;
386
    res = recvmsg(local->fd, &msg, 0);
387
    recv += (res - sizeof(struct my_hdr_t));
388
    if (m_seq != -1 && my_hdr.m_seq != m_seq) {
389
      return -1;
390
    } else {
391
      m_seq = my_hdr.m_seq;
392
    }
393
    if (my_hdr.frag_seq != frag_seq + 1) {
394
      return -1;
395
    } else {
396
     frag_seq++;
397
    }
398
  } while ((my_hdr.frag_seq < my_hdr.frags) && (buffer_size > 0));
399
  memcpy(&(*remote)->addr, &raddr, msg.msg_namelen);
400
  (*remote)->fd = -1;
401

    
402
  return recv;
403
}
404

    
405
int node_addr(const struct nodeID *s, char *addr, int len)
406
{
407
  int n;
408

    
409
        if (s && node_ip(s, addr, len)>=0)
410
        {
411
                n = snprintf(addr + strlen(addr), len - strlen(addr) - 1, ":%d", node_port(s));
412
        } else
413
                n = snprintf(addr, len , "None");
414

    
415
  return n;
416
}
417

    
418
struct nodeID *nodeid_dup(const struct nodeID *s)
419
{
420
  struct nodeID *res;
421

    
422
  res = malloc(sizeof(struct nodeID));
423
  if (res != NULL) {
424
    memcpy(res, s, sizeof(struct nodeID));
425
  }
426

    
427
  return res;
428
}
429

    
430
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
431
{
432
        return (nodeid_cmp(s1,s2) == 0);
433
//  return (memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_storage)) == 0);
434
}
435

    
436
int nodeid_cmp(const struct nodeID *s1, const struct nodeID *s2)
437
{
438
        char ip1[80], ip2[80];
439
        int port1,port2,res;
440

    
441
  if(s1 && s2)
442
  {
443
    port1=node_port(s1);
444
    port2=node_port(s2);
445
    node_ip(s1,ip1,80);
446
    node_ip(s2,ip2,80);
447

    
448
  //        int res = (port1^port2)|strcmp(ip1,ip2);
449
  //        fprintf(stderr,"Comparing %s:%d and %s:%d\n",ip1,port1,ip2,port2);
450
  //        fprintf(stderr,"Result: %d\n",res);
451
    res  = strcmp(ip1,ip2);
452
    if (res!=0)
453
      return res;
454
    else
455
      return port1-port2;
456
  //                return port1 == port2 ? 0 : 1;
457

    
458
  //  return memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_storage));
459
  }
460
  else
461
    return 0;
462
}
463

    
464
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
465
{
466
  if (max_write_size < sizeof(struct sockaddr_storage)) return -1;
467

    
468
  memcpy(b, &s->addr, sizeof(struct sockaddr_storage));
469

    
470
  return sizeof(struct sockaddr_storage);
471
}
472

    
473
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
474
{
475
  struct nodeID *res;
476
  res = malloc(sizeof(struct nodeID));
477
  if (res != NULL) {
478
    memcpy(&res->addr, b, sizeof(struct sockaddr_storage));
479
    res->fd = -1;
480
  }
481
  *len = sizeof(struct sockaddr_storage);
482

    
483
  return res;
484
}
485

    
486
void nodeid_free(struct nodeID *s)
487
{
488
  if (s)
489
  {
490
    if (s->fd >= 0)
491
      close(s->fd);
492
    free(s);
493
  }
494
}
495

    
496
int node_ip(const struct nodeID *s, char *ip, int len)
497
{
498
        int res = 0;
499
  switch (s->addr.ss_family)
500
  {
501
    case AF_INET:
502
      inet_ntop(s->addr.ss_family, &((const struct sockaddr_in *)&s->addr)->sin_addr, ip, len);
503
      break;
504
    case AF_INET6:
505
      inet_ntop(s->addr.ss_family, &((const struct sockaddr_in6 *)&s->addr)->sin6_addr, ip, len);
506
      break;
507
    default:
508
                        res = -1;
509
      break;
510
  }
511
  if (!ip) {
512
          perror("inet_ntop");
513
                res = -1;
514
  }
515
  if (ip && res <0 && len)
516
    ip[0] = '\0';
517
  return res;
518
}
519

    
520
int node_port(const struct nodeID *s)
521
{
522
  int res;
523
  switch (s->addr.ss_family)
524
  {
525
    case AF_INET:
526
      res = ntohs(((const struct sockaddr_in *) &s->addr)->sin_port);
527
      break;
528
    case AF_INET6:
529
      res = ntohs(((const struct sockaddr_in6 *)&s->addr)->sin6_port);
530
      break;
531
    default:
532
      res = -1;
533
      break;
534
  }
535
  return res;
536
}