napa-baselibs / tests / peer / ul / source.c @ 507372bb
History | View | Annotate | Download (1.85 KB)
1 |
/*
|
---|---|
2 |
* Minimalistic UserLayer for a source node
|
3 |
* Will read the stream from an UDP socket and appends them to the ChunkBuffer
|
4 |
*/
|
5 |
|
6 |
#include "peer.h" |
7 |
|
8 |
#include <netinet/in.h> |
9 |
#include <sys/socket.h> |
10 |
|
11 |
#define READ_SIZE 65536 |
12 |
|
13 |
struct source_stream {
|
14 |
struct bufferevent *bev;
|
15 |
double chunk_duration;
|
16 |
void *read_buffer;
|
17 |
int read_size;
|
18 |
int chunkid;
|
19 |
}; |
20 |
|
21 |
void read_stream(HANDLE h, void *arg) { |
22 |
struct source_stream *src = (struct source_stream *)arg; |
23 |
int ptr = 0; |
24 |
|
25 |
size_t rd = READ_SIZE; |
26 |
while (rd == READ_SIZE) {
|
27 |
rd = bufferevent_read(src->bev, src->read_buffer + ptr, src->read_size - ptr); |
28 |
if (rd == (src->read_size - ptr)) { /* Need to grow buffer */ |
29 |
src->read_size += READ_SIZE; |
30 |
src->read_buffer = realloc(src->read_buffer, src->read_size); |
31 |
ptr += rd; |
32 |
} |
33 |
} |
34 |
info("New chunk(%d): %d bytes read", src->chunkid++, ptr+rd);
|
35 |
} |
36 |
|
37 |
int init_source_ul(const char *stream, double chunk_duration) { |
38 |
char addr[128]; |
39 |
int port;
|
40 |
|
41 |
if (sscanf(stream, "udp://%127[^:]:%i", addr, &port) != 2) { |
42 |
fatal("Unable to parse source specification %s", stream);
|
43 |
} |
44 |
|
45 |
struct sockaddr_in udpsrc;
|
46 |
int returnStatus = 0; |
47 |
evutil_socket_t udpSocket; |
48 |
|
49 |
udpSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); |
50 |
evutil_make_socket_nonblocking(udpSocket); |
51 |
|
52 |
udpsrc.sin_family = AF_INET; |
53 |
udpsrc.sin_addr.s_addr = inet_addr(addr); |
54 |
udpsrc.sin_port = htons(port); |
55 |
|
56 |
int status = bind(udpSocket,(struct sockaddr *)&udpsrc,sizeof(udpsrc)); |
57 |
if (status) {
|
58 |
fatal("Unable to bind socket to udp://%s:%d", addr, port);
|
59 |
} |
60 |
|
61 |
struct source_stream *src = malloc(sizeof(struct source_stream)); |
62 |
src->read_size = READ_SIZE; |
63 |
src->read_buffer = malloc(src->read_size); |
64 |
|
65 |
src->bev = (struct bufferevent *)bufferevent_socket_new((struct event_base *)eventbase, udpSocket, 0); |
66 |
bufferevent_enable(src->bev, EV_READ); |
67 |
src->chunk_duration = chunk_duration; |
68 |
napaSchedulePeriodic(NULL, 1.0/chunk_duration, read_stream, src); |
69 |
} |
70 |
|
71 |
|