streamers / loop-mt.c @ 03de31e0
History | View | Annotate | Download (5.74 KB)
1 | 8fed7779 | CsabaKiraly | /*
|
---|---|---|---|
2 | * Copyright (c) 2010 Luca Abeni
|
||
3 | * Copyright (c) 2010 Csaba Kiraly
|
||
4 | *
|
||
5 | * This is free software; see gpl-3.0.txt
|
||
6 | */
|
||
7 | ee908119 | Luca Abeni | #include <sys/time.h> |
8 | d92a24a6 | Luca Abeni | #include <unistd.h> |
9 | #include <stdint.h> |
||
10 | #include <stdlib.h> |
||
11 | #include <stdio.h> |
||
12 | #include <pthread.h> |
||
13 | #include <net_helper.h> |
||
14 | e28a1487 | CsabaKiraly | #include <grapes_msg_types.h> |
15 | 0e858c17 | Csaba Kiraly | #include <peerset.h> |
16 | #include <peer.h> |
||
17 | d92a24a6 | Luca Abeni | |
18 | cab9a9fb | Csaba Kiraly | #include "dbg.h" |
19 | 0e858c17 | Csaba Kiraly | #include "chunk_signaling.h" |
20 | d92a24a6 | Luca Abeni | #include "streaming.h" |
21 | 0e858c17 | Csaba Kiraly | #include "topology.h" |
22 | d92a24a6 | Luca Abeni | #include "loop.h" |
23 | |||
24 | c386efc6 | Csaba Kiraly | #define BUFFSIZE 512 * 1024 |
25 | 81b601c1 | GiuseppeTropea | #define FDSSIZE 16 |
26 | d92a24a6 | Luca Abeni | static int chunks_per_period = 1; |
27 | static int period = 500000; |
||
28 | static int done; |
||
29 | 81b601c1 | GiuseppeTropea | pthread_mutex_t cb_mutex; |
30 | pthread_mutex_t topology_mutex; |
||
31 | d92a24a6 | Luca Abeni | static struct nodeID *s; |
32 | |||
33 | static void *chunk_forging(void *dummy) |
||
34 | { |
||
35 | ee908119 | Luca Abeni | suseconds_t d; |
36 | 7eee10e4 | CsabaKiraly | struct chunk *c;
|
37 | d92a24a6 | Luca Abeni | |
38 | while(!done) {
|
||
39 | 7eee10e4 | CsabaKiraly | c = generated_chunk(&d); |
40 | if (c) {
|
||
41 | pthread_mutex_lock(&cb_mutex); |
||
42 | add_chunk(c); |
||
43 | pthread_mutex_unlock(&cb_mutex); |
||
44 | } |
||
45 | ce80b058 | Luca Abeni | usleep(d); |
46 | d92a24a6 | Luca Abeni | } |
47 | |||
48 | return NULL; |
||
49 | } |
||
50 | |||
51 | static void *source_receive(void *dummy) |
||
52 | { |
||
53 | while (!done) {
|
||
54 | int len;
|
||
55 | struct nodeID *remote;
|
||
56 | static uint8_t buff[BUFFSIZE];
|
||
57 | |||
58 | 4136911a | Luca Abeni | len = recv_from_peer(s, &remote, buff, BUFFSIZE); |
59 | 9d6a53c3 | Csaba Kiraly | if (len < 0) { |
60 | fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
|
||
61 | nodeid_free(remote); |
||
62 | continue;
|
||
63 | } |
||
64 | d92a24a6 | Luca Abeni | switch (buff[0] /* Message Type */) { |
65 | f0b37018 | MarcoBiazzini | case MSG_TYPE_TMAN:
|
66 | 9176d3d1 | Csaba Kiraly | case MSG_TYPE_STREAMER_TOPOLOGY:
|
67 | 615e8354 | Luca Abeni | case MSG_TYPE_TOPOLOGY:
|
68 | d92a24a6 | Luca Abeni | pthread_mutex_lock(&topology_mutex); |
69 | fcb5c29b | Csaba Kiraly | update_peers(remote, buff, len); |
70 | d92a24a6 | Luca Abeni | pthread_mutex_unlock(&topology_mutex); |
71 | break;
|
||
72 | 615e8354 | Luca Abeni | case MSG_TYPE_CHUNK:
|
73 | b07667ee | Csaba Kiraly | fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
|
74 | 10796872 | Luca Abeni | break;
|
75 | 7e3653fa | Csaba Kiraly | case MSG_TYPE_SIGNALLING:
|
76 | pthread_mutex_lock(&topology_mutex); |
||
77 | sigParseData(remote, buff, len); |
||
78 | pthread_mutex_unlock(&topology_mutex); |
||
79 | break;
|
||
80 | d92a24a6 | Luca Abeni | default:
|
81 | fprintf(stderr, "Unknown Message Type %x\n", buff[0]); |
||
82 | } |
||
83 | a9d7dd3b | Luca Abeni | nodeid_free(remote); |
84 | d92a24a6 | Luca Abeni | } |
85 | |||
86 | return NULL; |
||
87 | } |
||
88 | |||
89 | static void *receive(void *dummy) |
||
90 | { |
||
91 | while (!done) {
|
||
92 | int len;
|
||
93 | struct nodeID *remote;
|
||
94 | static uint8_t buff[BUFFSIZE];
|
||
95 | |||
96 | 4136911a | Luca Abeni | len = recv_from_peer(s, &remote, buff, BUFFSIZE); |
97 | 9d6a53c3 | Csaba Kiraly | if (len < 0) { |
98 | fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
|
||
99 | nodeid_free(remote); |
||
100 | continue;
|
||
101 | } |
||
102 | cab9a9fb | Csaba Kiraly | dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote)); |
103 | d92a24a6 | Luca Abeni | switch (buff[0] /* Message Type */) { |
104 | f0b37018 | MarcoBiazzini | case MSG_TYPE_TMAN:
|
105 | 9176d3d1 | Csaba Kiraly | case MSG_TYPE_STREAMER_TOPOLOGY:
|
106 | 615e8354 | Luca Abeni | case MSG_TYPE_TOPOLOGY:
|
107 | d92a24a6 | Luca Abeni | pthread_mutex_lock(&topology_mutex); |
108 | fcb5c29b | Csaba Kiraly | update_peers(remote, buff, len); |
109 | d92a24a6 | Luca Abeni | pthread_mutex_unlock(&topology_mutex); |
110 | break;
|
||
111 | 615e8354 | Luca Abeni | case MSG_TYPE_CHUNK:
|
112 | cab9a9fb | Csaba Kiraly | dprintf("Chunk message received:\n");
|
113 | d92a24a6 | Luca Abeni | pthread_mutex_lock(&cb_mutex); |
114 | fcb5c29b | Csaba Kiraly | received_chunk(remote, buff, len); |
115 | d92a24a6 | Luca Abeni | pthread_mutex_unlock(&cb_mutex); |
116 | d2a239c6 | Luca | break;
|
117 | 0e858c17 | Csaba Kiraly | case MSG_TYPE_SIGNALLING:
|
118 | pthread_mutex_lock(&topology_mutex); |
||
119 | d310076d | Csaba Kiraly | sigParseData(remote, buff, len); |
120 | 0e858c17 | Csaba Kiraly | pthread_mutex_unlock(&topology_mutex); |
121 | break;
|
||
122 | d92a24a6 | Luca Abeni | default:
|
123 | fprintf(stderr, "Unknown Message Type %x\n", buff[0]); |
||
124 | } |
||
125 | a9d7dd3b | Luca Abeni | nodeid_free(remote); |
126 | d92a24a6 | Luca Abeni | } |
127 | |||
128 | return NULL; |
||
129 | } |
||
130 | |||
131 | static void *topology_sending(void *dummy) |
||
132 | { |
||
133 | int gossiping_period = period * 10; |
||
134 | |||
135 | pthread_mutex_lock(&topology_mutex); |
||
136 | fcb5c29b | Csaba Kiraly | update_peers(NULL, NULL, 0); |
137 | d92a24a6 | Luca Abeni | pthread_mutex_unlock(&topology_mutex); |
138 | while(!done) {
|
||
139 | pthread_mutex_lock(&topology_mutex); |
||
140 | fcb5c29b | Csaba Kiraly | update_peers(NULL, NULL, 0); |
141 | d92a24a6 | Luca Abeni | pthread_mutex_unlock(&topology_mutex); |
142 | 83c5c285 | Luca Abeni | usleep(gossiping_period); |
143 | d92a24a6 | Luca Abeni | } |
144 | |||
145 | return NULL; |
||
146 | } |
||
147 | |||
148 | static void *chunk_sending(void *dummy) |
||
149 | { |
||
150 | 83c5c285 | Luca Abeni | int chunk_period = period / chunks_per_period;
|
151 | d92a24a6 | Luca Abeni | |
152 | while(!done) {
|
||
153 | pthread_mutex_lock(&topology_mutex); |
||
154 | pthread_mutex_lock(&cb_mutex); |
||
155 | fcb5c29b | Csaba Kiraly | send_chunk(); |
156 | d92a24a6 | Luca Abeni | pthread_mutex_unlock(&cb_mutex); |
157 | pthread_mutex_unlock(&topology_mutex); |
||
158 | 83c5c285 | Luca Abeni | usleep(chunk_period); |
159 | d92a24a6 | Luca Abeni | } |
160 | |||
161 | return NULL; |
||
162 | } |
||
163 | |||
164 | 12a14c43 | CsabaKiraly | static void *chunk_trading(void *dummy) |
165 | { |
||
166 | int chunk_period = period / chunks_per_period;
|
||
167 | |||
168 | while(!done) {
|
||
169 | pthread_mutex_lock(&topology_mutex); |
||
170 | pthread_mutex_lock(&cb_mutex); |
||
171 | send_offer(); |
||
172 | pthread_mutex_unlock(&cb_mutex); |
||
173 | pthread_mutex_unlock(&topology_mutex); |
||
174 | usleep(chunk_period); |
||
175 | } |
||
176 | |||
177 | return NULL; |
||
178 | } |
||
179 | |||
180 | 0a40460a | Luca | void loop(struct nodeID *s1, int csize, int buff_size) |
181 | d92a24a6 | Luca Abeni | { |
182 | pthread_t receive_thread, gossiping_thread, distributing_thread; |
||
183 | |||
184 | period = csize; |
||
185 | s = s1; |
||
186 | |||
187 | 9e72961f | CsabaKiraly | peers_init(); |
188 | 0a40460a | Luca | stream_init(buff_size, s); |
189 | d92a24a6 | Luca Abeni | pthread_mutex_init(&cb_mutex, NULL);
|
190 | pthread_mutex_init(&topology_mutex, NULL);
|
||
191 | pthread_create(&receive_thread, NULL, receive, NULL); |
||
192 | pthread_create(&gossiping_thread, NULL, topology_sending, NULL); |
||
193 | 12a14c43 | CsabaKiraly | pthread_create(&distributing_thread, NULL, chunk_trading, NULL); |
194 | d92a24a6 | Luca Abeni | |
195 | pthread_join(receive_thread, NULL);
|
||
196 | pthread_join(gossiping_thread, NULL);
|
||
197 | pthread_join(distributing_thread, NULL);
|
||
198 | } |
||
199 | |||
200 | 03de31e0 | Csaba Kiraly | void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks, int buff_size) |
201 | d92a24a6 | Luca Abeni | { |
202 | pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread; |
||
203 | |||
204 | period = csize; |
||
205 | chunks_per_period = chunks; |
||
206 | s = s1; |
||
207 | |||
208 | 81b601c1 | GiuseppeTropea | int fds[FDSSIZE];
|
209 | fds[0] = -1; |
||
210 | |||
211 | // sigInit(s);
|
||
212 | 9e72961f | CsabaKiraly | peers_init(); |
213 | 03de31e0 | Csaba Kiraly | if (source_init(fname, s, fds, FDSSIZE, buff_size) < 0) { |
214 | 54f4d42f | Csaba Kiraly | fprintf(stderr,"Cannot initialize source, exiting");
|
215 | return;
|
||
216 | } |
||
217 | d92a24a6 | Luca Abeni | pthread_mutex_init(&cb_mutex, NULL);
|
218 | pthread_mutex_init(&topology_mutex, NULL);
|
||
219 | pthread_create(&receive_thread, NULL, source_receive, NULL); |
||
220 | pthread_create(&gossiping_thread, NULL, topology_sending, NULL); |
||
221 | 81b601c1 | GiuseppeTropea | #ifndef HTTPIO
|
222 | d92a24a6 | Luca Abeni | pthread_create(&distributing_thread, NULL, chunk_sending, NULL); |
223 | 81b601c1 | GiuseppeTropea | pthread_create(&generate_thread, NULL, chunk_forging, NULL); |
224 | #endif
|
||
225 | d92a24a6 | Luca Abeni | |
226 | 81b601c1 | GiuseppeTropea | #ifndef HTTPIO
|
227 | d92a24a6 | Luca Abeni | pthread_join(generate_thread, NULL);
|
228 | 81b601c1 | GiuseppeTropea | pthread_join(distributing_thread, NULL);
|
229 | #endif
|
||
230 | d92a24a6 | Luca Abeni | pthread_join(receive_thread, NULL);
|
231 | pthread_join(gossiping_thread, NULL);
|
||
232 | } |