Statistics
| Branch: | Revision:

streamers / input-udp.c @ 4c9e6c8b

History | View | Annotate | Download (4.56 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#include <sys/socket.h>
8
#include <netinet/in.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdint.h>
12
#include <stdio.h>
13
#include <string.h>
14
#include <limits.h>
15

    
16
#include <chunk.h>
17

    
18
#include "input.h"
19
#include "input-stream.h"
20
#include "dbg.h"
21
#include "io_udp.h"
22

    
23
#define UDP_PORTS_NUM_MAX 10
24

    
25
struct input_desc {
26
  char ip[16];
27
  int base_port;
28
  int ports;
29
  int fds[UDP_PORTS_NUM_MAX + 1];
30
  int id;
31
  int interframe;
32
  uint64_t start_time;
33
  uint8_t *data;
34
  int size;
35
  int counter;
36
  int every;
37
};
38

    
39
static int listen_udp(int port)
40
{
41
  struct sockaddr_in servaddr;
42
  int r;
43
  int fd;
44

    
45
  fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
46
  if (fd < 0) {
47
    return -1;
48
  }
49

    
50
  bzero(&servaddr, sizeof(servaddr));
51
  servaddr.sin_family = AF_INET;
52
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
53
  servaddr.sin_port = htons(port);
54
  r = bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
55
  if (r < 0) {
56
    close(fd);
57
    return -1;
58
  }
59

    
60
  fprintf(stderr,"\topened fd:%d for port:%d\n", fd, port);
61
  return fd;
62
}
63

    
64
static int config_parse(struct input_desc *desc, const char *config)
65
{
66
  int res, i;
67

    
68
  if (!config) {
69
    return -1;
70
  }
71

    
72
  res = sscanf(config, "udp://%15[0-9.]:%d%n", desc->ip, &desc->base_port, &i);
73
  if (res < 2) {
74
    fprintf(stderr,"error parsing input config: %s\n", config);
75
    return -2;
76
  }
77
  if (*(config + i) == 0) {
78
    desc->ports = 1;
79
  } else {
80
    int max_port;
81
    res = sscanf(config + i, "-%d", &max_port);
82
    if (res == 1) {
83
      desc->ports = max_port - desc->base_port + 1;
84
      if (desc->ports <=0) {
85
        fprintf(stderr,"error parsing input config: negative port range %s\n", config + i);
86
        return -3;
87
      }
88
    } else {
89
      fprintf(stderr,"error parsing input config: bad port range %s\n", config + i);
90
      return -4;
91
    }
92
  }
93

    
94
  desc->every = 1;
95

    
96
  return 1;
97
}
98

    
99
struct input_desc *input_open(const char *config, uint16_t flags, int *fds, int fds_size)
100
{
101
  struct input_desc *res;
102
  struct timeval tv;
103
  int ret, i;
104

    
105
  res = malloc(sizeof(struct input_desc));
106
  if (res == NULL) {
107
    return NULL;
108
  }
109

    
110
  ret = config_parse(res, config);
111
  if (ret < 0) {
112
    free(res);
113
    return NULL;
114
  }
115

    
116
  if ( res->ports > UDP_PORTS_NUM_MAX) {
117
    fprintf(stderr, "UDP input supports only %d ports\n", UDP_PORTS_NUM_MAX);
118
    free(res);
119
    return NULL;
120
  }
121

    
122
  if (! fds || fds_size <= res->ports + 1) {
123
    fprintf(stderr, "UDP input module needs more then %d file descriptors\n", fds_size-1);
124
    free(res);
125
    return NULL;
126
  }
127

    
128
  for (i = 0; i < res->ports; i++) {
129
    res->fds[i] = fds[i] = listen_udp(res->base_port + i);
130
    if (fds[i] < 0) {
131
      for (; i>=0 ; i--) {
132
        close(fds[i]);
133
      }
134
      res->fds[0] = fds[0] = -1;
135
      free(res);
136
      return NULL;
137
    }
138
  }
139
  res->fds[i] = fds[i] = -1;
140

    
141
  gettimeofday(&tv, NULL);
142
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
143
  //res->id = (res->start_time / res->interframe) % INT_MAX; //FIXME
144
  res->id = 1;
145

    
146
  res->data = NULL;
147
  res->size = 0;
148
  res->counter = 0;
149

    
150
  return res;
151
}
152

    
153
void input_close(struct input_desc *s)
154
{
155
  int i;
156

    
157
  for (i = 0; i < s->ports; i++) {
158
    close(s->fds[i]);
159
  }
160
  free(s);
161
}
162

    
163
#define UDP_BUF_SIZE 65536
164
int input_get_udp(struct input_desc *s, struct chunk *c, int fd_index)
165
{
166
  uint8_t buf[UDP_BUF_SIZE];
167
  size_t buflen = UDP_BUF_SIZE;
168
  ssize_t msglen;
169
  struct timeval now;
170
  uint8_t *data;
171

    
172
  fprintf(stderr,"\treading on fd:%d (index:%d)\n", s->fds[fd_index], fd_index);
173
  msglen = recv(s->fds[fd_index], buf, buflen, MSG_DONTWAIT);
174
  if (msglen < 0) {
175
    c->data = NULL;
176
    return 0;
177
  }
178
  if (msglen == 0) {
179
    c->data = NULL;
180
    return 0;
181
  }
182
  fprintf(stderr,"\treceived %d bytes\n",msglen);
183

    
184
  s->data = realloc(s->data, s->size + sizeof(struct io_udp_header) + msglen);
185
  data = s->data + s->size;
186
  ((struct io_udp_header*)data)->size = msglen;
187
  ((struct io_udp_header*)data)->portdiff = fd_index;
188
  memcpy(data + sizeof(struct io_udp_header), buf, msglen);
189
  s->size += sizeof(struct io_udp_header) + msglen;
190

    
191
  if (++s->counter % s->every) {
192
    c->data = NULL;
193
    return 0;
194
  }
195

    
196
  c->data = s->data;
197
  s->data = NULL;
198
  c->size = s->size;
199
  s->size = 0;
200
  c->id = s->id++;
201
  c->attributes_size = 0;
202
  c->attributes = NULL;
203
  gettimeofday(&now, NULL);
204
  c->timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
205
  s->counter = 0;
206

    
207
  return 1;
208
}
209

    
210
int input_get(struct input_desc *s, struct chunk *c)
211
{
212
  int i;
213

    
214
  fprintf(stderr,"input_get called\n");
215

    
216
  for (i = 0; i < s->ports; i++) {
217
    if (input_get_udp(s, c, i)) {
218
      return 999999;
219
    }
220
  }
221
  return 999999;
222
}