Statistics
| Branch: | Revision:

grapes / src / Chunkiser / input-stream-udp.c @ e44ff2bb

History | View | Annotate | Download (3.93 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#ifndef _WIN32
8
#include <sys/socket.h>
9
#include <netinet/in.h>
10
#else
11
#include <winsock2.h>
12
#endif
13
#include <unistd.h>
14
#include <stdlib.h>
15
#include <stdint.h>
16
#include <stdio.h>
17
#include <string.h>
18
#include <limits.h>
19
#include <string.h>
20

    
21
#include "int_coding.h"
22
#include "payload.h"
23
#include "config.h"
24
#include "chunkiser_iface.h"
25

    
26
#define UDP_PORTS_NUM_MAX 10
27
#define UDP_BUF_SIZE 65536
28

    
29
struct chunkiser_ctx {
30
  int fds[UDP_PORTS_NUM_MAX + 1];
31
  int id;
32
  uint64_t start_time;
33
  uint8_t *buff;
34
  int size;
35
  int counter;
36
  int every;
37
};
38

    
39
static int input_get_udp(uint8_t *data, int fd)
40
{
41
  ssize_t msglen;
42

    
43
  msglen = recv(fd, data, UDP_BUF_SIZE, 0);
44
  if (msglen <= 0) {
45
    return 0;
46
  }
47

    
48
  return msglen;
49
}
50

    
51
static int listen_udp(int port)
52
{
53
  struct sockaddr_in servaddr;
54
  int r;
55
  int fd;
56

    
57
  fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
58
  if (fd < 0) {
59
    return -1;
60
  }
61

    
62
#ifndef _WIN32
63
  fcntl(fd, F_SETFL, O_NONBLOCK);
64
#else
65
  {
66
    unsigned long nonblocking = 1;
67
    ioctlsocket(fd, FIONBIO, (unsigned long*) &nonblocking);
68
  }
69
#endif
70

    
71
  memset(&servaddr, 0, sizeof(servaddr));
72
  servaddr.sin_family = AF_INET;
73
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
74
  servaddr.sin_port = htons(port);
75
  r = bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
76
  if (r < 0) {
77
    close(fd);
78

    
79
    return -1;
80
  }
81
  fprintf(stderr,"\topened fd:%d for port:%d\n", fd, port);
82

    
83
  return fd;
84
}
85

    
86
static const int *ports_parse(const char *config)
87
{
88
  static int res[UDP_PORTS_NUM_MAX + 1];
89
  int i = 0;
90
  struct tag *cfg_tags;
91

    
92
  cfg_tags = config_parse(config);
93
  if (cfg_tags) {
94
    int j;
95

    
96
    for (j = 0; j < UDP_PORTS_NUM_MAX; j++) {
97
      char tag[8];
98

    
99
      sprintf(tag, "port%d", j);
100
      if (config_value_int(cfg_tags, tag, &res[i])) {
101
        i++;
102
      }
103
    }
104
  }
105
  free(cfg_tags);
106
  res[i] = -1;
107

    
108
  return res;
109
}
110

    
111
static struct chunkiser_ctx *udp_open(const char *fname, int *period, const char *config)
112
{
113
  struct chunkiser_ctx *res;
114
  struct timeval tv;
115
  int i;
116
  const int *ports;
117

    
118
  res = malloc(sizeof(struct chunkiser_ctx));
119
  if (res == NULL) {
120
    return NULL;
121
  }
122

    
123
  ports = ports_parse(config);
124
  if (ports[0] == -1) {
125
    free(res);
126

    
127
    return NULL;
128
  }
129

    
130
  for (i = 0; ports[i] >= 0; i++) {
131
    res->fds[i] = listen_udp(ports[i]);
132
    if (res->fds[i] < 0) {
133
      fprintf(stderr, "Cannot open port %d\n", ports[i]);
134
      for (; i>=0 ; i--) {
135
        close(res->fds[i]);
136
      }
137
      free(res);
138

    
139
      return NULL;
140
    }
141
  }
142
  res->fds[i] = -1;
143

    
144
  gettimeofday(&tv, NULL);
145
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
146
  res->id = 1;
147

    
148
  res->buff = NULL;
149
  res->size = 0;
150
  res->counter = 0;
151
  res->every = 1;
152
  *period = 0;
153

    
154
  return res;
155
}
156

    
157
static void udp_close(struct chunkiser_ctx  *s)
158
{
159
  int i;
160

    
161
  for (i = 0; s->fds[i] >= 0; i++) {
162
    close(s->fds[i]);
163
  }
164
  free(s);
165
}
166

    
167
static uint8_t *udp_chunkise(struct chunkiser_ctx *s, int id, int *size, uint64_t *ts)
168
{
169
  int i;
170

    
171
  if (s->buff == NULL) {
172
    s->buff = malloc(UDP_BUF_SIZE + UDP_CHUNK_HEADER_SIZE);
173
    if (s->buff == NULL) {
174
      *size = -1;
175

    
176
      return NULL;
177
    }
178
  }
179
  for (i = 0; s->fds[i] >= 0; i++) {
180
    if ((*size = input_get_udp(s->buff + s->size + UDP_CHUNK_HEADER_SIZE, s->fds[i]))) {
181
      uint8_t *res = s->buff;
182
      struct timeval now;
183

    
184
      udp_chunk_header_write(s->buff + s->size, *size, i);
185
      *size += UDP_CHUNK_HEADER_SIZE;
186
      s->size += *size;
187

    
188
      if (++s->counter % s->every) {
189
        *size = 0;
190

    
191
        return NULL;
192
      }
193

    
194
      s->size = 0;
195
      s->counter = 0;
196
      s->buff = NULL;
197
      gettimeofday(&now, NULL);
198
      *ts = now.tv_sec * 1000000ULL + now.tv_usec;
199

    
200
      return res;
201
    }
202
  }
203
  *size = 0;        // FIXME: Unneeded?
204

    
205
  return NULL;
206
}
207

    
208
const int *udp_get_fds(const struct chunkiser_ctx *s)
209
{
210
  return s->fds;
211
}
212

    
213
struct chunkiser_iface in_udp = {
214
  .open = udp_open,
215
  .close = udp_close,
216
  .chunkise = udp_chunkise,
217
  .get_fds = udp_get_fds,
218
};
219