streamers / loop.c @ a34ed273
History | View | Annotate | Download (4.72 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Luca Abeni
|
3 |
* Copyright (c) 2010 Csaba Kiraly
|
4 |
*
|
5 |
* This is free software; see gpl-3.0.txt
|
6 |
*/
|
7 |
#include <sys/select.h> |
8 |
#include <sys/time.h> |
9 |
#include <time.h> |
10 |
#include <stdint.h> |
11 |
#include <stdlib.h> |
12 |
#include <stdio.h> |
13 |
#include <stdbool.h> |
14 |
#include <string.h> |
15 |
|
16 |
#include <net_helper.h> |
17 |
#include <grapes_msg_types.h> |
18 |
#include <peerset.h> |
19 |
#include <peer.h> |
20 |
|
21 |
#include "chunk_signaling.h" |
22 |
#include "streaming.h" |
23 |
#include "topology.h" |
24 |
#include "loop.h" |
25 |
#include "dbg.h" |
26 |
|
27 |
#define BUFFSIZE 512 * 1024 |
28 |
#define FDSSIZE 16 |
29 |
static struct timeval period = {0, 500000}; |
30 |
static struct timeval tnext; |
31 |
|
32 |
#ifdef HTTPIO_MHD
|
33 |
extern pthread_mutex_t cb_mutex;
|
34 |
#endif
|
35 |
|
36 |
void tout_init(struct timeval *tv) |
37 |
{ |
38 |
struct timeval tnow;
|
39 |
|
40 |
if (tnext.tv_sec == 0) { |
41 |
gettimeofday(&tnext, NULL);
|
42 |
} |
43 |
gettimeofday(&tnow, NULL);
|
44 |
if(timercmp(&tnow, &tnext, <)) {
|
45 |
timersub(&tnext, &tnow, tv); |
46 |
} else {
|
47 |
*tv = (struct timeval){0, 0}; |
48 |
} |
49 |
} |
50 |
|
51 |
void loop(struct nodeID *s, int csize, int buff_size) |
52 |
{ |
53 |
int done = 0; |
54 |
static uint8_t buff[BUFFSIZE];
|
55 |
int cnt = 0; |
56 |
|
57 |
period.tv_sec = csize / 1000000;
|
58 |
period.tv_usec = csize % 1000000;
|
59 |
|
60 |
peers_init(); |
61 |
stream_init(buff_size, s); |
62 |
update_peers(NULL, NULL, 0); |
63 |
while (!done) {
|
64 |
int len, res;
|
65 |
struct timeval tv;
|
66 |
|
67 |
tout_init(&tv); |
68 |
res = wait4data(s, &tv, NULL);
|
69 |
if (res > 0) { |
70 |
struct nodeID *remote;
|
71 |
|
72 |
len = recv_from_peer(s, &remote, buff, BUFFSIZE); |
73 |
if (len < 0) { |
74 |
fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
|
75 |
nodeid_free(remote); |
76 |
continue;
|
77 |
} |
78 |
switch (buff[0] /* Message Type */) { |
79 |
case MSG_TYPE_TMAN:
|
80 |
case MSG_TYPE_TOPOLOGY:
|
81 |
update_peers(remote, buff, len); |
82 |
break;
|
83 |
case MSG_TYPE_CHUNK:
|
84 |
dprintf("Chunk message received:\n");
|
85 |
received_chunk(remote, buff, len); |
86 |
break;
|
87 |
case MSG_TYPE_SIGNALLING:
|
88 |
sigParseData(remote, buff, len); |
89 |
break;
|
90 |
default:
|
91 |
fprintf(stderr, "Unknown Message Type %x\n", buff[0]); |
92 |
} |
93 |
nodeid_free(remote); |
94 |
} else {
|
95 |
struct timeval tmp;
|
96 |
//send_chunk();
|
97 |
send_offer(); |
98 |
if (cnt++ % 10 == 0) { |
99 |
update_peers(NULL, NULL, 0); |
100 |
} |
101 |
timeradd(&tnext, &period, &tmp); |
102 |
tnext = tmp; |
103 |
} |
104 |
} |
105 |
} |
106 |
|
107 |
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop) |
108 |
{ |
109 |
int done = 0; |
110 |
static uint8_t buff[BUFFSIZE];
|
111 |
int cnt = 0; |
112 |
int fds[FDSSIZE];
|
113 |
fds[0] = -1; |
114 |
|
115 |
period.tv_sec = csize / 1000000;
|
116 |
period.tv_usec = csize % 1000000;
|
117 |
|
118 |
peers_init(); |
119 |
|
120 |
if (source_init(fname, s, loop, fds, FDSSIZE) < 0) { |
121 |
fprintf(stderr,"Cannot initialize source, exiting");
|
122 |
return;
|
123 |
} |
124 |
while (!done) {
|
125 |
int len, res;
|
126 |
struct timeval tv, *ptv;
|
127 |
int wait4fds[FDSSIZE], *pfds;
|
128 |
|
129 |
#ifdef HTTPIO
|
130 |
memcpy(wait4fds, fds, sizeof(fds));
|
131 |
res = wait4data(s, NULL, wait4fds);
|
132 |
#else
|
133 |
if (fds[0] == -1) { |
134 |
tout_init(&tv); |
135 |
ptv = &tv; |
136 |
pfds = NULL;
|
137 |
} else {
|
138 |
memcpy(wait4fds, fds, sizeof(fds));
|
139 |
pfds = wait4fds; |
140 |
ptv = NULL;
|
141 |
} |
142 |
res = wait4data(s, ptv, pfds); |
143 |
#endif
|
144 |
if (res == 1) { |
145 |
struct nodeID *remote;
|
146 |
|
147 |
len = recv_from_peer(s, &remote, buff, BUFFSIZE); |
148 |
if (len < 0) { |
149 |
fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
|
150 |
nodeid_free(remote); |
151 |
continue;
|
152 |
} |
153 |
dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote)); |
154 |
switch (buff[0] /* Message Type */) { |
155 |
case MSG_TYPE_TMAN:
|
156 |
case MSG_TYPE_TOPOLOGY:
|
157 |
fprintf(stderr, "Top Parse\n");
|
158 |
#ifdef HTTPIO_MHD
|
159 |
pthread_mutex_lock(&cb_mutex); |
160 |
#endif
|
161 |
update_peers(remote, buff, len); |
162 |
#ifdef HTTPIO_MHD
|
163 |
pthread_mutex_unlock(&cb_mutex); |
164 |
#endif
|
165 |
break;
|
166 |
case MSG_TYPE_CHUNK:
|
167 |
fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
|
168 |
break;
|
169 |
case MSG_TYPE_SIGNALLING:
|
170 |
#ifdef HTTPIO_MHD
|
171 |
pthread_mutex_lock(&cb_mutex); |
172 |
#endif
|
173 |
sigParseData(remote, buff, len); |
174 |
#ifdef HTTPIO_MHD
|
175 |
pthread_mutex_unlock(&cb_mutex); |
176 |
#endif
|
177 |
break;
|
178 |
default:
|
179 |
fprintf(stderr, "Bad Message Type %x\n", buff[0]); |
180 |
} |
181 |
nodeid_free(remote); |
182 |
} else if (res == 0 || res == 2) { //timeout or data arrived from source |
183 |
int i;
|
184 |
struct timeval tmp, d;
|
185 |
struct chunk *c;
|
186 |
|
187 |
d.tv_sec = 0;
|
188 |
c = generated_chunk(&d.tv_usec); |
189 |
if (c) {
|
190 |
add_chunk(c); |
191 |
for (i = 0; i < chunks; i++) { // @TODO: why this cycle? |
192 |
send_chunk(); |
193 |
} |
194 |
if (cnt++ % 10 == 0) { |
195 |
update_peers(NULL, NULL, 0); |
196 |
} |
197 |
} |
198 |
timeradd(&tnext, &d, &tmp); |
199 |
tnext = tmp; |
200 |
} |
201 |
} |
202 |
} |