Statistics
| Branch: | Revision:

napa-baselibs / tests / peer / ul / source.c @ 507372bb

History | View | Annotate | Download (1.85 KB)

1
/*
2
 * Minimalistic UserLayer for a source node
3
 * Will read the stream from an UDP socket and appends them to the ChunkBuffer
4
 */
5

    
6
#include        "peer.h"
7

    
8
#include <netinet/in.h>
9
#include <sys/socket.h>
10

    
11
#define READ_SIZE        65536
12

    
13
struct source_stream {
14
        struct bufferevent *bev;
15
        double chunk_duration;
16
        void *read_buffer;
17
        int read_size;
18
        int chunkid;
19
};
20

    
21
void read_stream(HANDLE h, void *arg) {
22
        struct source_stream *src = (struct source_stream *)arg;
23
        int ptr = 0;
24

    
25
        size_t rd = READ_SIZE;
26
        while (rd == READ_SIZE) {
27
                rd = bufferevent_read(src->bev, src->read_buffer + ptr, src->read_size - ptr);
28
                if (rd == (src->read_size - ptr)) { /* Need to grow buffer */
29
                        src->read_size += READ_SIZE;
30
                        src->read_buffer = realloc(src->read_buffer, src->read_size);
31
                        ptr += rd;
32
                }
33
        }
34
        info("New chunk(%d): %d bytes read", src->chunkid++, ptr+rd);
35
}
36

    
37
int init_source_ul(const char *stream, double chunk_duration) {
38
        char addr[128];
39
        int port;
40

    
41
        if (sscanf(stream, "udp://%127[^:]:%i", addr, &port) != 2) {
42
                fatal("Unable to parse source specification %s", stream);
43
        }
44

    
45
        struct sockaddr_in udpsrc;
46
        int returnStatus = 0;
47
        evutil_socket_t udpSocket;
48

    
49
        udpSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
50
        evutil_make_socket_nonblocking(udpSocket);
51

    
52
        udpsrc.sin_family = AF_INET;
53
        udpsrc.sin_addr.s_addr = inet_addr(addr);
54
        udpsrc.sin_port = htons(port);
55

    
56
        int status = bind(udpSocket,(struct sockaddr *)&udpsrc,sizeof(udpsrc));
57
        if (status) {
58
                fatal("Unable to bind socket to udp://%s:%d", addr, port);
59
        }
60

    
61
        struct source_stream *src = malloc(sizeof(struct source_stream));
62
        src->read_size = READ_SIZE;
63
        src->read_buffer = malloc(src->read_size);
64

    
65
        src->bev = (struct bufferevent *)bufferevent_socket_new((struct event_base *)eventbase, udpSocket, 0);
66
        bufferevent_enable(src->bev, EV_READ);
67
        src->chunk_duration = chunk_duration;
68
        napaSchedulePeriodic(NULL, 1.0/chunk_duration, read_stream, src);
69
}
70

    
71