Statistics
| Branch: | Revision:

napa-baselibs / tests / Broadcaster / source.c @ 507372bb

History | View | Annotate | Download (5.9 KB)

1
/*
2
 * % vim: syntax=c ts=4 tw=76 cindent shiftwidth=4
3
 *
4
 * User layer services the Peer implementation for the Broadcaster demo application
5
 *
6
 */
7

    
8
#include <netinet/in.h>
9
#include <sys/socket.h>
10
#include <time.h>
11

    
12
#include "peer.h"
13

    
14
#include <chunk.h>
15
#include <mon.h>
16
#include <ul.h>
17

    
18
#define MAX_PACKET_SIZE                        2048
19
#define INITIAL_PKTBUFFER_NUM        10
20

    
21
/** Configuration options for the Source */
22
cfg_opt_t cfg_source[] = {
23
        CFG_STR("stream", NULL, CFGF_NONE),
24
        CFG_FLOAT("chunk_duration", 1.0, CFGF_NONE),
25
        CFG_END()
26
};
27

    
28
/** This function gets called periodically and arranges the publishing of a
29
 * MeasurementRecord with SrcLag = 0.0 
30
 */
31
void periodicPublishSrcLag(HANDLE h, void *arg) {
32
        MeasurementRecord mr;
33
        memset(&mr, 0, sizeof(mr));
34
        mr.published_name = "SrcLag";
35
        mr.value = 0.0;
36
        mr.originator = mr.targetA = mr.targetB = LocalPeerID;
37
        mr.channel = channel;
38
        publishMeasurementRecord(&mr, "SrcLag");
39
}
40

    
41
void src_init(cfg_t *source_cfg) {
42
        char *stream = cfg_getstr(source_cfg, "stream");
43
        if (!stream) return;
44
        crcInit();
45

    
46
        if (!strncmp("udp://", stream, 6)) {
47
                double chunk_duration = cfg_getfloat(source_cfg,"chunk_duration");
48
                info("Initializing SOURCE mode: tuning to stream %s, sampling rate is "
49
                        "%.2lf Hz", stream, (double)1.0/chunk_duration);
50
                init_source_ul(stream, chunk_duration);
51
        } else if (!strncmp("http://", stream, 7)) {
52
                char addr[128];
53
                int port,i;
54

    
55
                if (sscanf(stream, "http://%127[^:]:%i", addr, &port) != 2) 
56
                        fatal("Unable to parse source specification %s", stream);
57

    
58
                if (ulChunkReceiverSetup(addr, port)) 
59
                        fatal("Unable to start the HTTP receiver to http://%s:%d", 
60
                                        addr, port);
61
        } else fatal("Unable to parse stream specification %s", stream);
62
        napaSchedulePeriodic(NULL, 1/60.0, periodicPublishSrcLag, NULL);
63
}
64

    
65
struct source_stream {
66
        int udpSocket;
67
        double chunk_duration;
68
        int chunkid;
69
        void **buffers;
70
        int *bufferlengths;
71
        int num_buffers;
72
        int atts_size;
73
        char *atts;
74
};
75

    
76
/** This is our very primitive "Chunkizer" */
77
void read_stream(HANDLE h, void *arg) {
78
        struct source_stream *src = (struct source_stream *)arg;
79

    
80
        int num = 1;
81
        struct timeval tv_start, tv_end;
82
        double start, end, diff;
83

    
84
        int rd = recv(src->udpSocket, src->buffers[0], MAX_PACKET_SIZE, MSG_DONTWAIT);
85
        gettimeofday(&tv_start, NULL);
86
        start = tv_start.tv_sec + ((float)tv_start.tv_usec / 1000000);
87

    
88
        src->bufferlengths[0] = rd;
89
        size_t total = (rd <= 0) ? 0 : rd;
90
        while (rd > 0)  {
91

    
92
                //fill packet size info for attributes header
93
                if (num == 1) sprintf(src->atts, "%6d#0.000000$", rd);
94
                else {
95
                        char buff1[8];
96
                        sprintf(buff1, "%6d#",rd);
97
                        strcat(src->atts, buff1);
98
                        //fill packet time length info for attributes header
99
                        char buff2[10];
100
                        sprintf(buff2, "%8.6f$", diff);
101
                        strcat(src->atts, buff2);
102
                }
103

    
104
                rd = recv(src->udpSocket, src->buffers[num], MAX_PACKET_SIZE, MSG_DONTWAIT);
105
                gettimeofday(&tv_end, NULL);
106
                end = tv_end.tv_sec + ((float)tv_end.tv_usec / 1000000);
107
                diff = end - start;
108
                gettimeofday(&tv_start, NULL);
109
                start = tv_start.tv_sec + ((float)tv_start.tv_usec / 1000000);
110

    
111
                src->bufferlengths[num] = rd;
112
                if (rd <= 0) break;
113
                total += rd;
114
                if (++num == src->num_buffers) {
115
                        src->buffers = realloc(src->buffers, sizeof(void *) *
116
                                        (1 + src->num_buffers));
117
                        src->atts = realloc(src->atts, sizeof(char) * ((ATTRIBUTES_HEADER_SIZE *
118
                                        (1 + src->num_buffers)) + 1));
119
                        src->bufferlengths = realloc(src->bufferlengths, sizeof(int) *
120
                                        (1 + src->num_buffers));
121
                        src->bufferlengths[src->num_buffers] = 0;
122
                        src->buffers[src->num_buffers] = malloc(MAX_PACKET_SIZE);
123
                        src->num_buffers++;
124
                }
125
        } 
126

    
127
        if (total <= 0) return;
128

    
129
        /* and now we create a new chunk */
130
        void *data = malloc(total);
131
        int ptr = 0;
132
        int i;
133
        for (i = 0; i != num; i++) {
134
                memcpy(data + ptr, src->buffers[i], src->bufferlengths[i]);
135
                ptr += src->bufferlengths[i];
136
        }
137

    
138
        //fill packet checksum info for attributes header
139
        crc datacrc;
140
        datacrc = crcFast(data, total);
141
        char buff3[12];
142
        sprintf(buff3, "%10X$", (unsigned int)datacrc);
143
        src->atts = realloc(src->atts, (ATTRIBUTES_HEADER_SIZE * src->num_buffers) + 12);
144
        strcat(src->atts,buff3);
145

    
146
        src->atts_size = (num * sizeof(char) * ATTRIBUTES_HEADER_SIZE) + 12;
147
        struct chunk *c;
148
        if (src->atts_size > 0) {//chunk is encoded (timestamp is the chunk duration in milliseconds)
149
                c = chbCreateChunkWithAtts(false, data, total, src->chunkid,
150
                                (int)(1000000*src->chunk_duration), src->atts, src->atts_size);
151
        }
152
        else {//raw data chunk (if packets were received this never happens)
153
                c = chbCreateChunk(false, data, total, src->chunkid,
154
                                40*src->chunkid);
155
        }
156
        info("Adding chunk(%d): %d bytes, %d packets", src->chunkid++, total,
157
                        num);
158

    
159
        if (c) chbAddChunk(chunkbuffer, c);
160

    
161
        //...
162
        //char *strs = c->attributes;
163
        //info("c->attributes:%s",strs);
164
}
165

    
166
int init_source_ul(const char *stream, double chunk_duration) {
167
        char addr[128];
168
        int port,i;
169

    
170
        if (sscanf(stream, "udp://%127[^:]:%i", addr, &port) != 2) {
171
                fatal("Unable to parse source specification %s", stream);
172
        }
173

    
174
        struct source_stream *src = malloc(sizeof(struct source_stream));
175
        struct sockaddr_in udpsrc;
176
        int returnStatus = 0;
177

    
178
        src->udpSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
179
        udpsrc.sin_family = AF_INET;
180
        udpsrc.sin_addr.s_addr = inet_addr(addr);
181
        udpsrc.sin_port = htons(port);
182

    
183
        int status = bind(src->udpSocket,(struct sockaddr *)&udpsrc,sizeof(udpsrc));
184
        if (status) {
185
                fatal("Unable to bind socket to udp://%s:%d", addr, port);
186
        }
187

    
188
        src->chunk_duration = chunk_duration;
189
        src->chunkid = 0;
190
        src->buffers = calloc(sizeof(void *), INITIAL_PKTBUFFER_NUM);
191
        src->atts = calloc(sizeof(char), ATTRIBUTES_HEADER_SIZE * INITIAL_PKTBUFFER_NUM);
192
        src->atts = realloc(src->atts, sizeof(char) * ((ATTRIBUTES_HEADER_SIZE *
193
                        INITIAL_PKTBUFFER_NUM) + 1));
194
        src->bufferlengths = calloc(sizeof(int), INITIAL_PKTBUFFER_NUM);
195
        for (src->num_buffers = 0; src->num_buffers != INITIAL_PKTBUFFER_NUM; src->num_buffers++)
196
                src->buffers[src->num_buffers] = malloc(MAX_PACKET_SIZE);
197
        src->atts_size = 0;
198

    
199
        napaSchedulePeriodic(NULL, 1.0/chunk_duration, read_stream, src);
200
}
201

    
202

    
203