grapes / src / Chunkiser / input-stream-udp.c @ 997c2b68
History | View | Annotate | Download (3.72 KB)
1 | 05c9dabb | Luca Abeni | /*
|
---|---|---|---|
2 | * Copyright (c) 2010 Csaba Kiraly
|
||
3 | *
|
||
4 | * This is free software; see gpl-3.0.txt
|
||
5 | */
|
||
6 | #include <sys/time.h> |
||
7 | #include <sys/socket.h> |
||
8 | #include <netinet/in.h> |
||
9 | #include <unistd.h> |
||
10 | #include <stdlib.h> |
||
11 | #include <stdint.h> |
||
12 | #include <stdio.h> |
||
13 | #include <string.h> |
||
14 | #include <limits.h> |
||
15 | 0f79759c | Luca Abeni | #include <string.h> |
16 | 05c9dabb | Luca Abeni | |
17 | 0f79759c | Luca Abeni | #include "int_coding.h" |
18 | 05c9dabb | Luca Abeni | #include "payload.h" |
19 | #include "config.h" |
||
20 | #include "chunkiser_iface.h" |
||
21 | |||
22 | #define UDP_PORTS_NUM_MAX 10 |
||
23 | #define UDP_BUF_SIZE 65536 |
||
24 | |||
25 | struct chunkiser_ctx {
|
||
26 | int fds[UDP_PORTS_NUM_MAX + 1]; |
||
27 | int id;
|
||
28 | uint64_t start_time; |
||
29 | uint8_t *buff; |
||
30 | int size;
|
||
31 | int counter;
|
||
32 | int every;
|
||
33 | }; |
||
34 | |||
35 | static int input_get_udp(uint8_t *data, int fd) |
||
36 | { |
||
37 | ssize_t msglen; |
||
38 | |||
39 | msglen = recv(fd, data, UDP_BUF_SIZE, MSG_DONTWAIT); |
||
40 | if (msglen <= 0) { |
||
41 | return 0; |
||
42 | } |
||
43 | |||
44 | 5da67049 | Luca Abeni | return msglen;
|
45 | 05c9dabb | Luca Abeni | } |
46 | |||
47 | static int listen_udp(int port) |
||
48 | { |
||
49 | struct sockaddr_in servaddr;
|
||
50 | int r;
|
||
51 | int fd;
|
||
52 | |||
53 | fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); |
||
54 | if (fd < 0) { |
||
55 | return -1; |
||
56 | } |
||
57 | |||
58 | bzero(&servaddr, sizeof(servaddr));
|
||
59 | servaddr.sin_family = AF_INET; |
||
60 | servaddr.sin_addr.s_addr = htonl(INADDR_ANY); |
||
61 | servaddr.sin_port = htons(port); |
||
62 | r = bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr)); |
||
63 | if (r < 0) { |
||
64 | close(fd); |
||
65 | |||
66 | return -1; |
||
67 | } |
||
68 | fprintf(stderr,"\topened fd:%d for port:%d\n", fd, port);
|
||
69 | |||
70 | return fd;
|
||
71 | } |
||
72 | |||
73 | static const int *ports_parse(const char *config) |
||
74 | { |
||
75 | static int res[UDP_PORTS_NUM_MAX + 1]; |
||
76 | int i = 0; |
||
77 | struct tag *cfg_tags;
|
||
78 | |||
79 | cfg_tags = config_parse(config); |
||
80 | if (cfg_tags) {
|
||
81 | int j;
|
||
82 | |||
83 | for (j = 0; j < UDP_PORTS_NUM_MAX; j++) { |
||
84 | char tag[8]; |
||
85 | |||
86 | sprintf(tag, "port%d", j);
|
||
87 | if (config_value_int(cfg_tags, tag, &res[i])) {
|
||
88 | i++; |
||
89 | } |
||
90 | } |
||
91 | } |
||
92 | 47e5028e | Luca Abeni | free(cfg_tags); |
93 | 05c9dabb | Luca Abeni | res[i] = -1;
|
94 | |||
95 | return res;
|
||
96 | } |
||
97 | |||
98 | static struct chunkiser_ctx *udp_open(const char *fname, int *period, const char *config) |
||
99 | { |
||
100 | struct chunkiser_ctx *res;
|
||
101 | struct timeval tv;
|
||
102 | int i;
|
||
103 | const int *ports; |
||
104 | |||
105 | res = malloc(sizeof(struct chunkiser_ctx)); |
||
106 | if (res == NULL) { |
||
107 | return NULL; |
||
108 | } |
||
109 | |||
110 | ports = ports_parse(config); |
||
111 | if (ports[0] == -1) { |
||
112 | free(res); |
||
113 | |||
114 | return NULL; |
||
115 | } |
||
116 | |||
117 | for (i = 0; ports[i] >= 0; i++) { |
||
118 | res->fds[i] = listen_udp(ports[i]); |
||
119 | if (res->fds[i] < 0) { |
||
120 | fprintf(stderr, "Cannot open port %d\n", ports[i]);
|
||
121 | for (; i>=0 ; i--) { |
||
122 | close(res->fds[i]); |
||
123 | } |
||
124 | free(res); |
||
125 | |||
126 | return NULL; |
||
127 | } |
||
128 | } |
||
129 | res->fds[i] = -1;
|
||
130 | |||
131 | gettimeofday(&tv, NULL);
|
||
132 | res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
|
||
133 | res->id = 1;
|
||
134 | |||
135 | res->buff = NULL;
|
||
136 | res->size = 0;
|
||
137 | res->counter = 0;
|
||
138 | res->every = 1;
|
||
139 | 866d93ee | Luca Abeni | *period = 0;
|
140 | 05c9dabb | Luca Abeni | |
141 | return res;
|
||
142 | } |
||
143 | |||
144 | static void udp_close(struct chunkiser_ctx *s) |
||
145 | { |
||
146 | int i;
|
||
147 | |||
148 | for (i = 0; s->fds[i] >= 0; i++) { |
||
149 | close(s->fds[i]); |
||
150 | } |
||
151 | free(s); |
||
152 | } |
||
153 | |||
154 | static uint8_t *udp_chunkise(struct chunkiser_ctx *s, int id, int *size, uint64_t *ts) |
||
155 | { |
||
156 | int i;
|
||
157 | |||
158 | if (s->buff == NULL) { |
||
159 | s->buff = malloc(UDP_BUF_SIZE + UDP_CHUNK_HEADER_SIZE); |
||
160 | if (s->buff == NULL) { |
||
161 | *size = -1;
|
||
162 | |||
163 | return NULL; |
||
164 | } |
||
165 | } |
||
166 | for (i = 0; s->fds[i] >= 0; i++) { |
||
167 | if ((*size = input_get_udp(s->buff + s->size + UDP_CHUNK_HEADER_SIZE, s->fds[i]))) {
|
||
168 | uint8_t *res = s->buff; |
||
169 | struct timeval now;
|
||
170 | |||
171 | udp_chunk_header_write(s->buff + s->size, *size, i); |
||
172 | 5da67049 | Luca Abeni | *size += UDP_CHUNK_HEADER_SIZE; |
173 | s->size += *size; |
||
174 | 05c9dabb | Luca Abeni | |
175 | if (++s->counter % s->every) {
|
||
176 | *size = 0;
|
||
177 | |||
178 | return NULL; |
||
179 | } |
||
180 | |||
181 | s->size = 0;
|
||
182 | s->counter = 0;
|
||
183 | s->buff = NULL;
|
||
184 | gettimeofday(&now, NULL);
|
||
185 | *ts = now.tv_sec * 1000000ULL + now.tv_usec;
|
||
186 | |||
187 | return res;
|
||
188 | } |
||
189 | } |
||
190 | *size = 0; // FIXME: Unneeded? |
||
191 | |||
192 | return NULL; |
||
193 | } |
||
194 | |||
195 | const int *udp_get_fds(const struct chunkiser_ctx *s) |
||
196 | { |
||
197 | return s->fds;
|
||
198 | } |
||
199 | |||
200 | struct chunkiser_iface in_udp = {
|
||
201 | .open = udp_open, |
||
202 | .close = udp_close, |
||
203 | .chunkise = udp_chunkise, |
||
204 | .get_fds = udp_get_fds, |
||
205 | }; |