Statistics
| Branch: | Revision:

grapes / src / Chunkiser / input-stream-udp.c @ 176b8de8

History | View | Annotate | Download (3.97 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#ifndef _WIN32
8
#include <sys/socket.h>
9
#include <netinet/in.h>
10
#include <fcntl.h>
11
#else
12
#include <winsock2.h>
13
#endif
14
#include <unistd.h>
15
#include <stdlib.h>
16
#include <stdint.h>
17
#include <stdio.h>
18
#include <string.h>
19
#include <limits.h>
20
#include <string.h>
21

    
22
#include "int_coding.h"
23
#include "payload.h"
24
#include "grapes_config.h"
25
#include "chunkiser_iface.h"
26

    
27
#define UDP_PORTS_NUM_MAX 10
28
#define UDP_BUF_SIZE 65536
29

    
30
struct chunkiser_ctx {
31
  int fds[UDP_PORTS_NUM_MAX + 1];
32
  int id;
33
  uint64_t start_time;
34
  uint8_t *buff;
35
  int size;
36
  int counter;
37
  int every;
38
};
39

    
40
static int input_get_udp(uint8_t *data, int fd)
41
{
42
  ssize_t msglen;
43

    
44
  msglen = recv(fd, data, UDP_BUF_SIZE, 0);
45
  if (msglen <= 0) {
46
    return 0;
47
  }
48

    
49
  return msglen;
50
}
51

    
52
static int listen_udp(int port)
53
{
54
  struct sockaddr_in servaddr;
55
  int r;
56
  int fd;
57

    
58
  fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
59
  if (fd < 0) {
60
    return -1;
61
  }
62

    
63
#ifndef _WIN32
64
  fcntl(fd, F_SETFL, O_NONBLOCK);
65
#else
66
  {
67
    unsigned long nonblocking = 1;
68
    ioctlsocket(fd, FIONBIO, (unsigned long*) &nonblocking);
69
  }
70
#endif
71

    
72
  memset(&servaddr, 0, sizeof(servaddr));
73
  servaddr.sin_family = AF_INET;
74
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
75
  servaddr.sin_port = htons(port);
76
  r = bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
77
  if (r < 0) {
78
    close(fd);
79

    
80
    return -1;
81
  }
82
  fprintf(stderr,"\topened fd:%d for port:%d\n", fd, port);
83

    
84
  return fd;
85
}
86

    
87
static const int *ports_parse(const char *config)
88
{
89
  static int res[UDP_PORTS_NUM_MAX + 1];
90
  int i = 0;
91
  struct tag *cfg_tags;
92

    
93
  cfg_tags = grapes_config_parse(config);
94
  if (cfg_tags) {
95
    int j;
96

    
97
    for (j = 0; j < UDP_PORTS_NUM_MAX; j++) {
98
      char tag[8];
99

    
100
      sprintf(tag, "port%d", j);
101
      if (grapes_config_value_int(cfg_tags, tag, &res[i])) {
102
        i++;
103
      }
104
    }
105
  }
106
  free(cfg_tags);
107
  res[i] = -1;
108

    
109
  return res;
110
}
111

    
112
static struct chunkiser_ctx *udp_open(const char *fname, int *period, const char *config)
113
{
114
  struct chunkiser_ctx *res;
115
  struct timeval tv;
116
  int i;
117
  const int *ports;
118

    
119
  res = malloc(sizeof(struct chunkiser_ctx));
120
  if (res == NULL) {
121
    return NULL;
122
  }
123

    
124
  ports = ports_parse(config);
125
  if (ports[0] == -1) {
126
    free(res);
127

    
128
    return NULL;
129
  }
130

    
131
  for (i = 0; ports[i] >= 0; i++) {
132
    res->fds[i] = listen_udp(ports[i]);
133
    if (res->fds[i] < 0) {
134
      fprintf(stderr, "Cannot open port %d\n", ports[i]);
135
      for (; i>=0 ; i--) {
136
        close(res->fds[i]);
137
      }
138
      free(res);
139

    
140
      return NULL;
141
    }
142
  }
143
  res->fds[i] = -1;
144

    
145
  gettimeofday(&tv, NULL);
146
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
147
  res->id = 1;
148

    
149
  res->buff = NULL;
150
  res->size = 0;
151
  res->counter = 0;
152
  res->every = 1;
153
  *period = 0;
154

    
155
  return res;
156
}
157

    
158
static void udp_close(struct chunkiser_ctx  *s)
159
{
160
  int i;
161

    
162
  for (i = 0; s->fds[i] >= 0; i++) {
163
    close(s->fds[i]);
164
  }
165
  free(s);
166
}
167

    
168
static uint8_t *udp_chunkise(struct chunkiser_ctx *s, int id, int *size, uint64_t *ts)
169
{
170
  int i;
171

    
172
  if (s->buff == NULL) {
173
    s->buff = malloc(UDP_BUF_SIZE + UDP_PAYLOAD_HEADER_SIZE);
174
    if (s->buff == NULL) {
175
      *size = -1;
176

    
177
      return NULL;
178
    }
179
  }
180
  for (i = 0; s->fds[i] >= 0; i++) {
181
    if ((*size = input_get_udp(s->buff + s->size + UDP_PAYLOAD_HEADER_SIZE, s->fds[i]))) {
182
      uint8_t *res = s->buff;
183
      struct timeval now;
184

    
185
      udp_payload_header_write(s->buff + s->size, *size, i);
186
      *size += UDP_PAYLOAD_HEADER_SIZE;
187
      s->size += *size;
188

    
189
      if (++s->counter % s->every) {
190
        *size = 0;
191

    
192
        return NULL;
193
      }
194

    
195
      s->size = 0;
196
      s->counter = 0;
197
      s->buff = NULL;
198
      gettimeofday(&now, NULL);
199
      *ts = now.tv_sec * 1000000ULL + now.tv_usec;
200

    
201
      return res;
202
    }
203
  }
204
  *size = 0;        // FIXME: Unneeded?
205

    
206
  return NULL;
207
}
208

    
209
const int *udp_get_fds(const struct chunkiser_ctx *s)
210
{
211
  return s->fds;
212
}
213

    
214
struct chunkiser_iface in_udp = {
215
  .open = udp_open,
216
  .close = udp_close,
217
  .chunkise = udp_chunkise,
218
  .get_fds = udp_get_fds,
219
};
220