Statistics
| Branch: | Revision:

grapes / src / Chunkiser / input-stream-udp.c @ 997c2b68

History | View | Annotate | Download (3.72 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#include <sys/socket.h>
8
#include <netinet/in.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdint.h>
12
#include <stdio.h>
13
#include <string.h>
14
#include <limits.h>
15
#include <string.h>
16

    
17
#include "int_coding.h"
18
#include "payload.h"
19
#include "config.h"
20
#include "chunkiser_iface.h"
21

    
22
#define UDP_PORTS_NUM_MAX 10
23
#define UDP_BUF_SIZE 65536
24

    
25
struct chunkiser_ctx {
26
  int fds[UDP_PORTS_NUM_MAX + 1];
27
  int id;
28
  uint64_t start_time;
29
  uint8_t *buff;
30
  int size;
31
  int counter;
32
  int every;
33
};
34

    
35
static int input_get_udp(uint8_t *data, int fd)
36
{
37
  ssize_t msglen;
38

    
39
  msglen = recv(fd, data, UDP_BUF_SIZE, MSG_DONTWAIT);
40
  if (msglen <= 0) {
41
    return 0;
42
  }
43

    
44
  return msglen;
45
}
46

    
47
static int listen_udp(int port)
48
{
49
  struct sockaddr_in servaddr;
50
  int r;
51
  int fd;
52

    
53
  fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
54
  if (fd < 0) {
55
    return -1;
56
  }
57

    
58
  bzero(&servaddr, sizeof(servaddr));
59
  servaddr.sin_family = AF_INET;
60
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
61
  servaddr.sin_port = htons(port);
62
  r = bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
63
  if (r < 0) {
64
    close(fd);
65

    
66
    return -1;
67
  }
68
  fprintf(stderr,"\topened fd:%d for port:%d\n", fd, port);
69

    
70
  return fd;
71
}
72

    
73
static const int *ports_parse(const char *config)
74
{
75
  static int res[UDP_PORTS_NUM_MAX + 1];
76
  int i = 0;
77
  struct tag *cfg_tags;
78

    
79
  cfg_tags = config_parse(config);
80
  if (cfg_tags) {
81
    int j;
82

    
83
    for (j = 0; j < UDP_PORTS_NUM_MAX; j++) {
84
      char tag[8];
85

    
86
      sprintf(tag, "port%d", j);
87
      if (config_value_int(cfg_tags, tag, &res[i])) {
88
        i++;
89
      }
90
    }
91
  }
92
  free(cfg_tags);
93
  res[i] = -1;
94

    
95
  return res;
96
}
97

    
98
static struct chunkiser_ctx *udp_open(const char *fname, int *period, const char *config)
99
{
100
  struct chunkiser_ctx *res;
101
  struct timeval tv;
102
  int i;
103
  const int *ports;
104

    
105
  res = malloc(sizeof(struct chunkiser_ctx));
106
  if (res == NULL) {
107
    return NULL;
108
  }
109

    
110
  ports = ports_parse(config);
111
  if (ports[0] == -1) {
112
    free(res);
113

    
114
    return NULL;
115
  }
116

    
117
  for (i = 0; ports[i] >= 0; i++) {
118
    res->fds[i] = listen_udp(ports[i]);
119
    if (res->fds[i] < 0) {
120
      fprintf(stderr, "Cannot open port %d\n", ports[i]);
121
      for (; i>=0 ; i--) {
122
        close(res->fds[i]);
123
      }
124
      free(res);
125

    
126
      return NULL;
127
    }
128
  }
129
  res->fds[i] = -1;
130

    
131
  gettimeofday(&tv, NULL);
132
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
133
  res->id = 1;
134

    
135
  res->buff = NULL;
136
  res->size = 0;
137
  res->counter = 0;
138
  res->every = 1;
139
  *period = 0;
140

    
141
  return res;
142
}
143

    
144
static void udp_close(struct chunkiser_ctx  *s)
145
{
146
  int i;
147

    
148
  for (i = 0; s->fds[i] >= 0; i++) {
149
    close(s->fds[i]);
150
  }
151
  free(s);
152
}
153

    
154
static uint8_t *udp_chunkise(struct chunkiser_ctx *s, int id, int *size, uint64_t *ts)
155
{
156
  int i;
157

    
158
  if (s->buff == NULL) {
159
    s->buff = malloc(UDP_BUF_SIZE + UDP_CHUNK_HEADER_SIZE);
160
    if (s->buff == NULL) {
161
      *size = -1;
162

    
163
      return NULL;
164
    }
165
  }
166
  for (i = 0; s->fds[i] >= 0; i++) {
167
    if ((*size = input_get_udp(s->buff + s->size + UDP_CHUNK_HEADER_SIZE, s->fds[i]))) {
168
      uint8_t *res = s->buff;
169
      struct timeval now;
170

    
171
      udp_chunk_header_write(s->buff + s->size, *size, i);
172
      *size += UDP_CHUNK_HEADER_SIZE;
173
      s->size += *size;
174

    
175
      if (++s->counter % s->every) {
176
        *size = 0;
177

    
178
        return NULL;
179
      }
180

    
181
      s->size = 0;
182
      s->counter = 0;
183
      s->buff = NULL;
184
      gettimeofday(&now, NULL);
185
      *ts = now.tv_sec * 1000000ULL + now.tv_usec;
186

    
187
      return res;
188
    }
189
  }
190
  *size = 0;        // FIXME: Unneeded?
191

    
192
  return NULL;
193
}
194

    
195
const int *udp_get_fds(const struct chunkiser_ctx *s)
196
{
197
  return s->fds;
198
}
199

    
200
struct chunkiser_iface in_udp = {
201
  .open = udp_open,
202
  .close = udp_close,
203
  .chunkise = udp_chunkise,
204
  .get_fds = udp_get_fds,
205
};
206