streamers / loop.c @ 03de31e0
History | View | Annotate | Download (4.97 KB)
1 | 8fed7779 | CsabaKiraly | /*
|
---|---|---|---|
2 | * Copyright (c) 2010 Luca Abeni
|
||
3 | * Copyright (c) 2010 Csaba Kiraly
|
||
4 | *
|
||
5 | * This is free software; see gpl-3.0.txt
|
||
6 | */
|
||
7 | 89e893e2 | Luca | #include <sys/select.h> |
8 | #include <sys/time.h> |
||
9 | #include <time.h> |
||
10 | #include <stdint.h> |
||
11 | #include <stdlib.h> |
||
12 | #include <stdio.h> |
||
13 | 30a6e902 | Csaba Kiraly | #include <stdbool.h> |
14 | c86d8685 | CsabaKiraly | #include <string.h> |
15 | 89e893e2 | Luca | |
16 | #include <net_helper.h> |
||
17 | e28a1487 | CsabaKiraly | #include <grapes_msg_types.h> |
18 | 3df216c8 | Csaba Kiraly | #include <peerset.h> |
19 | 018c744a | Csaba Kiraly | #include <peer.h> |
20 | 89e893e2 | Luca | |
21 | 63ebb93d | CsabaKiraly | #include "compatibility/timer.h" |
22 | |||
23 | e223dc99 | Csaba Kiraly | #include "chunk_signaling.h" |
24 | ada339a0 | ArpadBakay | #include "streaming.h" |
25 | 74a5d4ae | CsabaKiraly | #include "topology.h" |
26 | 89e893e2 | Luca | #include "loop.h" |
27 | ce80b058 | Luca Abeni | #include "dbg.h" |
28 | 89e893e2 | Luca | |
29 | 625dbc20 | Csaba Kiraly | #define BUFFSIZE 512 * 1024 |
30 | c9370421 | CsabaKiraly | #define FDSSIZE 16 |
31 | 89e893e2 | Luca | static struct timeval period = {0, 500000}; |
32 | static struct timeval tnext; |
||
33 | e223dc99 | Csaba Kiraly | |
34 | 5805c339 | GiuseppeTropea | #ifdef HTTPIO_MHD
|
35 | 03dca3bf | ArpadBakay | extern pthread_mutex_t cb_mutex;
|
36 | #endif
|
||
37 | |||
38 | 89e893e2 | Luca | void tout_init(struct timeval *tv) |
39 | { |
||
40 | struct timeval tnow;
|
||
41 | |||
42 | if (tnext.tv_sec == 0) { |
||
43 | gettimeofday(&tnext, NULL);
|
||
44 | } |
||
45 | gettimeofday(&tnow, NULL);
|
||
46 | if(timercmp(&tnow, &tnext, <)) {
|
||
47 | timersub(&tnext, &tnow, tv); |
||
48 | } else {
|
||
49 | *tv = (struct timeval){0, 0}; |
||
50 | } |
||
51 | } |
||
52 | |||
53 | 0a40460a | Luca | void loop(struct nodeID *s, int csize, int buff_size) |
54 | 89e893e2 | Luca | { |
55 | int done = 0; |
||
56 | static uint8_t buff[BUFFSIZE];
|
||
57 | int cnt = 0; |
||
58 | |||
59 | period.tv_sec = csize / 1000000;
|
||
60 | period.tv_usec = csize % 1000000;
|
||
61 | |||
62 | fcb5c29b | Csaba Kiraly | peers_init(); |
63 | 0a40460a | Luca | stream_init(buff_size, s); |
64 | cf820c66 | Csaba Kiraly | update_peers(NULL, NULL, 0); |
65 | 89e893e2 | Luca | while (!done) {
|
66 | 4136911a | Luca Abeni | int len, res;
|
67 | struct timeval tv;
|
||
68 | 89e893e2 | Luca | |
69 | 4136911a | Luca Abeni | tout_init(&tv); |
70 | 08495d05 | Csaba Kiraly | res = wait4data(s, &tv, NULL);
|
71 | 4136911a | Luca Abeni | if (res > 0) { |
72 | 74a5d4ae | CsabaKiraly | struct nodeID *remote;
|
73 | 89e893e2 | Luca | |
74 | 4136911a | Luca Abeni | len = recv_from_peer(s, &remote, buff, BUFFSIZE); |
75 | 525bb486 | Csaba Kiraly | if (len < 0) { |
76 | fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
|
||
77 | nodeid_free(remote); |
||
78 | continue;
|
||
79 | } |
||
80 | 89e893e2 | Luca | switch (buff[0] /* Message Type */) { |
81 | 9ea73763 | Csaba Kiraly | case MSG_TYPE_TMAN:
|
82 | 9176d3d1 | Csaba Kiraly | case MSG_TYPE_STREAMER_TOPOLOGY:
|
83 | 615e8354 | Luca Abeni | case MSG_TYPE_TOPOLOGY:
|
84 | 7e65d1f7 | Csaba Kiraly | dtprintf("Topo message received:\n");
|
85 | fcb5c29b | Csaba Kiraly | update_peers(remote, buff, len); |
86 | 89e893e2 | Luca | break;
|
87 | 615e8354 | Luca Abeni | case MSG_TYPE_CHUNK:
|
88 | 7e65d1f7 | Csaba Kiraly | dtprintf("Chunk message received:\n");
|
89 | fcb5c29b | Csaba Kiraly | received_chunk(remote, buff, len); |
90 | d2a239c6 | Luca | break;
|
91 | e223dc99 | Csaba Kiraly | case MSG_TYPE_SIGNALLING:
|
92 | 7e65d1f7 | Csaba Kiraly | dtprintf("Sign message received:\n");
|
93 | d310076d | Csaba Kiraly | sigParseData(remote, buff, len); |
94 | e223dc99 | Csaba Kiraly | break;
|
95 | 89e893e2 | Luca | default:
|
96 | fprintf(stderr, "Unknown Message Type %x\n", buff[0]); |
||
97 | } |
||
98 | a9d7dd3b | Luca Abeni | nodeid_free(remote); |
99 | 89e893e2 | Luca | } else {
|
100 | struct timeval tmp;
|
||
101 | fcb5c29b | Csaba Kiraly | //send_chunk();
|
102 | send_offer(); |
||
103 | 89e893e2 | Luca | if (cnt++ % 10 == 0) { |
104 | fcb5c29b | Csaba Kiraly | update_peers(NULL, NULL, 0); |
105 | 89e893e2 | Luca | } |
106 | timeradd(&tnext, &period, &tmp); |
||
107 | tnext = tmp; |
||
108 | } |
||
109 | } |
||
110 | } |
||
111 | |||
112 | 03de31e0 | Csaba Kiraly | void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, int buff_size) |
113 | 89e893e2 | Luca | { |
114 | int done = 0; |
||
115 | static uint8_t buff[BUFFSIZE];
|
||
116 | int cnt = 0; |
||
117 | c9370421 | CsabaKiraly | int fds[FDSSIZE];
|
118 | fds[0] = -1; |
||
119 | 89e893e2 | Luca | |
120 | period.tv_sec = csize / 1000000;
|
||
121 | period.tv_usec = csize % 1000000;
|
||
122 | |||
123 | fcb5c29b | Csaba Kiraly | peers_init(); |
124 | c9370421 | CsabaKiraly | |
125 | 03de31e0 | Csaba Kiraly | if (source_init(fname, s, fds, FDSSIZE, buff_size) < 0) { |
126 | 54f4d42f | Csaba Kiraly | fprintf(stderr,"Cannot initialize source, exiting");
|
127 | return;
|
||
128 | } |
||
129 | 89e893e2 | Luca | while (!done) {
|
130 | 4136911a | Luca Abeni | int len, res;
|
131 | a34ed273 | Luca Abeni | struct timeval tv, *ptv;
|
132 | int wait4fds[FDSSIZE], *pfds;
|
||
133 | 89e893e2 | Luca | |
134 | 03dca3bf | ArpadBakay | #ifdef HTTPIO
|
135 | c86d8685 | CsabaKiraly | memcpy(wait4fds, fds, sizeof(fds));
|
136 | res = wait4data(s, NULL, wait4fds);
|
||
137 | 03dca3bf | ArpadBakay | #else
|
138 | a34ed273 | Luca Abeni | if (fds[0] == -1) { |
139 | tout_init(&tv); |
||
140 | ptv = &tv; |
||
141 | pfds = NULL;
|
||
142 | } else {
|
||
143 | memcpy(wait4fds, fds, sizeof(fds));
|
||
144 | pfds = wait4fds; |
||
145 | ptv = NULL;
|
||
146 | } |
||
147 | res = wait4data(s, ptv, pfds); |
||
148 | 03dca3bf | ArpadBakay | #endif
|
149 | 653129d8 | CsabaKiraly | if (res == 1) { |
150 | 74a5d4ae | CsabaKiraly | struct nodeID *remote;
|
151 | 89e893e2 | Luca | |
152 | 4136911a | Luca Abeni | len = recv_from_peer(s, &remote, buff, BUFFSIZE); |
153 | 9d6a53c3 | Csaba Kiraly | if (len < 0) { |
154 | fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
|
||
155 | nodeid_free(remote); |
||
156 | continue;
|
||
157 | } |
||
158 | fbeb4f32 | Csaba Kiraly | dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote)); |
159 | 89e893e2 | Luca | switch (buff[0] /* Message Type */) { |
160 | 2323102b | MarcoBiazzini | case MSG_TYPE_TMAN:
|
161 | 9176d3d1 | Csaba Kiraly | case MSG_TYPE_STREAMER_TOPOLOGY:
|
162 | 615e8354 | Luca Abeni | case MSG_TYPE_TOPOLOGY:
|
163 | 89e893e2 | Luca | fprintf(stderr, "Top Parse\n");
|
164 | 5805c339 | GiuseppeTropea | #ifdef HTTPIO_MHD
|
165 | b8d7817f | GiuseppeTropea | pthread_mutex_lock(&cb_mutex); |
166 | #endif
|
||
167 | fcb5c29b | Csaba Kiraly | update_peers(remote, buff, len); |
168 | 5805c339 | GiuseppeTropea | #ifdef HTTPIO_MHD
|
169 | b8d7817f | GiuseppeTropea | pthread_mutex_unlock(&cb_mutex); |
170 | #endif
|
||
171 | 89e893e2 | Luca | break;
|
172 | 615e8354 | Luca Abeni | case MSG_TYPE_CHUNK:
|
173 | b07667ee | Csaba Kiraly | fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
|
174 | 10796872 | Luca Abeni | break;
|
175 | e223dc99 | Csaba Kiraly | case MSG_TYPE_SIGNALLING:
|
176 | 5805c339 | GiuseppeTropea | #ifdef HTTPIO_MHD
|
177 | b8d7817f | GiuseppeTropea | pthread_mutex_lock(&cb_mutex); |
178 | #endif
|
||
179 | d310076d | Csaba Kiraly | sigParseData(remote, buff, len); |
180 | 5805c339 | GiuseppeTropea | #ifdef HTTPIO_MHD
|
181 | b8d7817f | GiuseppeTropea | pthread_mutex_unlock(&cb_mutex); |
182 | #endif
|
||
183 | e223dc99 | Csaba Kiraly | break;
|
184 | 89e893e2 | Luca | default:
|
185 | fprintf(stderr, "Bad Message Type %x\n", buff[0]); |
||
186 | } |
||
187 | a9d7dd3b | Luca Abeni | nodeid_free(remote); |
188 | 653129d8 | CsabaKiraly | } else if (res == 0 || res == 2) { //timeout or data arrived from source |
189 | 7eee10e4 | CsabaKiraly | int i;
|
190 | ce80b058 | Luca Abeni | struct timeval tmp, d;
|
191 | 7eee10e4 | CsabaKiraly | struct chunk *c;
|
192 | 89e893e2 | Luca | |
193 | 7abbc9e7 | Csaba Kiraly | d.tv_sec = 0;
|
194 | 7eee10e4 | CsabaKiraly | c = generated_chunk(&d.tv_usec); |
195 | if (c) {
|
||
196 | add_chunk(c); |
||
197 | 6fe7eade | Luca Abeni | for (i = 0; i < chunks; i++) { // @TODO: why this cycle? |
198 | fcb5c29b | Csaba Kiraly | send_chunk(); |
199 | afdc8db4 | Luca Abeni | } |
200 | feaec543 | CsabaKiraly | send_offer(); |
201 | afdc8db4 | Luca Abeni | if (cnt++ % 10 == 0) { |
202 | fcb5c29b | Csaba Kiraly | update_peers(NULL, NULL, 0); |
203 | afdc8db4 | Luca Abeni | } |
204 | 89e893e2 | Luca | } |
205 | ce80b058 | Luca Abeni | timeradd(&tnext, &d, &tmp); |
206 | 89e893e2 | Luca | tnext = tmp; |
207 | } |
||
208 | } |
||
209 | } |