Statistics
| Branch: | Revision:

grapes / src / net_helper.c @ 0bfe6568

History | View | Annotate | Download (6.1 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <sys/types.h>
9
#include <sys/socket.h>
10
#include <netinet/in.h>
11
#include <arpa/inet.h>
12
#include <unistd.h>
13
#include <errno.h>
14
#include <stdlib.h>
15
#include <stdio.h>
16
#include <string.h>
17

    
18
#include "net_helper.h"
19

    
20
#define MAX_MSG_SIZE 1024 * 60
21

    
22
struct nodeID {
23
  struct sockaddr_in addr;
24
  int fd;
25
};
26

    
27
int wait4data(const struct nodeID *s, struct timeval *tout, int *user_fds)
28
{
29
  fd_set fds;
30
  int i, res, max_fd;
31

    
32
  FD_ZERO(&fds);
33
  if (s) {
34
    max_fd = s->fd;
35
    FD_SET(s->fd, &fds);
36
  } else {
37
    max_fd = -1;
38
  }
39
  if (user_fds) {
40
    for (i = 0; user_fds[i] != -1; i++) {
41
      FD_SET(user_fds[i], &fds);
42
      if (user_fds[i] > max_fd) {
43
        max_fd = user_fds[i];
44
      }
45
    }
46
  }
47
  res = select(max_fd + 1, &fds, NULL, NULL, tout);
48
  if (res <= 0) {
49
    return res;
50
  }
51
  if (s && FD_ISSET(s->fd, &fds)) {
52
    return 1;
53
  }
54

    
55
  /* If execution arrives here, user_fds cannot be 0
56
     (an FD is ready, and it's not s->fd) */
57
  for (i = 0; user_fds[i] != -1; i++) {
58
    if (!FD_ISSET(user_fds[i], &fds)) {
59
      user_fds[i] = -2;
60
    }
61
  }
62

    
63
  return 2;
64
}
65

    
66
struct nodeID *create_node(const char *IPaddr, int port)
67
{
68
  struct nodeID *s;
69
  int res;
70

    
71
  s = malloc(sizeof(struct nodeID));
72
  memset(s, 0, sizeof(struct nodeID));
73
  s->addr.sin_family = AF_INET;
74
  s->addr.sin_port = htons(port);
75
  res = inet_aton(IPaddr, &s->addr.sin_addr);
76
  if (res == 0) {
77
    free(s);
78

    
79
    return NULL;
80
  }
81

    
82
  s->fd = -1;
83

    
84
  return s;
85
}
86

    
87
struct nodeID *net_helper_init(const char *my_addr, int port, const char *config)
88
{
89
  int res;
90
  struct nodeID *myself;
91

    
92
  myself = create_node(my_addr, port);
93
  if (myself == NULL) {
94
    fprintf(stderr, "Error creating my socket (%s:%d)!\n", my_addr, port);
95
  }
96
  myself->fd =  socket(AF_INET, SOCK_DGRAM, 0);
97
  if (myself->fd < 0) {
98
    free(myself);
99
    
100
    return NULL;
101
  }
102
  fprintf(stderr, "My sock: %d\n", myself->fd);
103

    
104
  res = bind(myself->fd, (struct sockaddr *)&myself->addr, sizeof(struct sockaddr_in));
105
  if (res < 0) {
106
    /* bind failed: not a local address... Just close the socket! */
107
    close(myself->fd);
108
    free(myself);
109

    
110
    return NULL;
111
  }
112

    
113
  return myself;
114
}
115

    
116
void bind_msg_type (uint8_t msgtype)
117
{
118
}
119

    
120
void reg_message_send(int size, uint8_t type);
121

    
122
struct my_hdr_t {
123
  uint8_t m_seq;
124
  uint8_t frag_seq;
125
  uint8_t frags;
126
} __attribute__((packed));
127

    
128
int send_to_peer(const struct nodeID *from, struct nodeID *to, const uint8_t *buffer_ptr, int buffer_size)
129
{
130
  static struct msghdr msg;
131
  static struct my_hdr_t my_hdr;
132
  struct iovec iov[2];
133
  int res;
134

    
135
  if (buffer_size <= 0) return;
136
  reg_message_send(buffer_size, buffer_ptr[0]);
137

    
138
  iov[0].iov_base = &my_hdr;
139
  iov[0].iov_len = sizeof(struct my_hdr_t);
140
  msg.msg_name = &to->addr;
141
  msg.msg_namelen = sizeof(struct sockaddr_in);
142
  msg.msg_iovlen = 2;
143
  msg.msg_iov = iov;
144

    
145
  my_hdr.m_seq++;
146
  my_hdr.frags = (buffer_size / (MAX_MSG_SIZE)) + 1;
147
  my_hdr.frag_seq = 0;
148

    
149
  do {
150
    iov[1].iov_base = buffer_ptr;
151
    if (buffer_size > MAX_MSG_SIZE) {
152
      iov[1].iov_len = MAX_MSG_SIZE;
153
    } else {
154
      iov[1].iov_len = buffer_size;
155
    }
156
    my_hdr.frag_seq++;
157

    
158
    buffer_size -= iov[1].iov_len;
159
    buffer_ptr += iov[1].iov_len;
160
    res = sendmsg(from->fd, &msg, 0);
161

    
162
    if (res  < 0){
163
      int error = errno;
164
      fprintf(stderr,"net-helper: sendmsg failed errno %d: %s\n", error, strerror(error));
165
    }
166
  } while (buffer_size > 0);
167

    
168
  return res;
169
}
170

    
171
void reg_message_recv(int size, uint8_t type);
172

    
173
int recv_from_peer(const struct nodeID *local, struct nodeID **remote, uint8_t *buffer_ptr, int buffer_size)
174
{
175
  int res, recv, m_seq, frag_seq;
176
  struct sockaddr_in raddr;
177
  static struct msghdr msg;
178
  static struct my_hdr_t my_hdr;
179
  struct iovec iov[2];
180
  uint8_t *buffer_ptr_orig = buffer_ptr;
181

    
182
  iov[0].iov_base = &my_hdr;
183
  iov[0].iov_len = sizeof(struct my_hdr_t);
184
  msg.msg_name = &raddr;
185
  msg.msg_namelen = sizeof(struct sockaddr_in);
186
  msg.msg_iovlen = 2;
187
  msg.msg_iov = iov;
188

    
189
  *remote = malloc(sizeof(struct nodeID));
190
  if (*remote == NULL) {
191
    return -1;
192
  }
193

    
194
  recv = 0;
195
  m_seq = -1;
196
  frag_seq = 0;
197
  do {
198
    iov[1].iov_base = buffer_ptr;
199
    if (buffer_size > MAX_MSG_SIZE) {
200
      iov[1].iov_len = MAX_MSG_SIZE;
201
    } else {
202
      iov[1].iov_len = buffer_size;
203
    }
204
    buffer_size -= iov[1].iov_len;
205
    buffer_ptr += iov[1].iov_len;
206
    res = recvmsg(local->fd, &msg, 0);
207
    recv += (res - sizeof(struct my_hdr_t));
208
    if (m_seq != -1 && my_hdr.m_seq != m_seq) {
209
      return -1;
210
    } else {
211
      m_seq = my_hdr.m_seq;
212
    }
213
    if (my_hdr.frag_seq != frag_seq + 1) {
214
      return -1;
215
    } else {
216
     frag_seq++;
217
    }
218
  } while ((my_hdr.frag_seq < my_hdr.frags) && (buffer_size > 0));
219
  memcpy(&(*remote)->addr, &raddr, msg.msg_namelen);
220
  (*remote)->fd = -1;
221

    
222
  reg_message_recv(recv, buffer_ptr_orig[0]);
223

    
224
  return recv;
225
}
226

    
227
const char *node_addr(const struct nodeID *s)
228
{
229
  static char addr[256];
230

    
231
  sprintf(addr, "%s:%d", inet_ntoa(s->addr.sin_addr), ntohs(s->addr.sin_port));
232

    
233
  return addr;
234
}
235

    
236
struct nodeID *nodeid_dup(struct nodeID *s)
237
{
238
  struct nodeID *res;
239

    
240
  res = malloc(sizeof(struct nodeID));
241
  if (res != NULL) {
242
    memcpy(res, s, sizeof(struct nodeID));
243
  }
244

    
245
  return res;
246
}
247

    
248
int nodeid_equal(const struct nodeID *s1, const struct nodeID *s2)
249
{
250
  return (memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_in)) == 0);
251
}
252

    
253
int nodeid_cmp(const struct nodeID *s1, const struct nodeID *s2)
254
{
255
  return memcmp(&s1->addr, &s2->addr, sizeof(struct sockaddr_in));
256
}
257

    
258
int nodeid_dump(uint8_t *b, const struct nodeID *s, size_t max_write_size)
259
{
260
  if (max_write_size < sizeof(struct sockaddr_in)) return -1;
261

    
262
  memcpy(b, &s->addr, sizeof(struct sockaddr_in));
263

    
264
  return sizeof(struct sockaddr_in);
265
}
266

    
267
struct nodeID *nodeid_undump(const uint8_t *b, int *len)
268
{
269
  struct nodeID *res;
270
  res = malloc(sizeof(struct nodeID));
271
  if (res != NULL) {
272
    memcpy(&res->addr, b, sizeof(struct sockaddr_in));
273
    res->fd = -1;
274
  }
275
  *len = sizeof(struct sockaddr_in);
276

    
277
  return res;
278
}
279

    
280
void nodeid_free(struct nodeID *s)
281
{
282
  free(s);
283
}
284

    
285
const char *node_ip(const struct nodeID *s)
286
{
287
  static char ip[64];
288

    
289
  sprintf(ip, "%s", inet_ntoa(s->addr.sin_addr));
290

    
291
  return ip;
292
}