pstreamer / src / streaming.c @ dc99f5c4
History | View | Annotate | Download (31.3 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010-2011 Luca Abeni
|
3 |
* Copyright (c) 2010-2011 Csaba Kiraly
|
4 |
* Copyright (c) 2017 Luca Baldesi
|
5 |
*
|
6 |
* This file is part of PeerStreamer.
|
7 |
*
|
8 |
* PeerStreamer is free software: you can redistribute it and/or
|
9 |
* modify it under the terms of the GNU Affero General Public License as
|
10 |
* published by the Free Software Foundation, either version 3 of the
|
11 |
* License, or (at your option) any later version.
|
12 |
*
|
13 |
* PeerStreamer is distributed in the hope that it will be useful,
|
14 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
15 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
|
16 |
* General Public License for more details.
|
17 |
*
|
18 |
* You should have received a copy of the GNU Affero General Public License
|
19 |
* along with PeerStreamer. If not, see <http://www.gnu.org/licenses/>.
|
20 |
*
|
21 |
*/
|
22 |
#include <sys/time.h> |
23 |
#include <stdlib.h> |
24 |
#include <stdio.h> |
25 |
#include <stdint.h> |
26 |
#include <stdbool.h> |
27 |
#include <math.h> |
28 |
#include <assert.h> |
29 |
#include <string.h> |
30 |
#include <inttypes.h> |
31 |
|
32 |
#include <net_helper.h> |
33 |
#include <chunk.h> |
34 |
#include <chunkbuffer.h> |
35 |
#include <trade_msg_la.h> |
36 |
#include <trade_msg_ha.h> |
37 |
#include <peerset.h> |
38 |
#include <peer.h> |
39 |
#include <chunkidset.h> |
40 |
#include <limits.h> |
41 |
#include <trade_sig_ha.h> |
42 |
#ifdef CHUNK_ATTRIB_CHUNKER
|
43 |
#include <chunkiser_attrib.h> |
44 |
#endif
|
45 |
|
46 |
#include "streaming.h" |
47 |
#include "output.h" |
48 |
#include "input.h" |
49 |
#include "dbg.h" |
50 |
#include "chunk_signaling.h" |
51 |
#include "chunklock.h" |
52 |
#include "topology.h" |
53 |
#include "measures.h" |
54 |
#include "net_helpers.h" |
55 |
#include "scheduling.h" |
56 |
#include "transaction.h" |
57 |
#include "peer_metadata.h" |
58 |
|
59 |
#include "scheduler_la.h" |
60 |
#include "grapes_config.h" |
61 |
|
62 |
# define CB_SIZE_TIME_UNLIMITED 1e12 |
63 |
|
64 |
void selectPeersForChunks(SchedOrdering ordering, schedPeerID *peers, size_t peers_len, schedChunkID *chunks, size_t chunks_len, schedPeerID *selected, size_t *selected_len, filterFunction filter, peerEvaluateFunction evaluate);
|
65 |
|
66 |
struct chunk_attributes {
|
67 |
uint64_t deadline; |
68 |
uint16_t deadline_increment; |
69 |
uint16_t hopcount; |
70 |
} __attribute__((packed)); |
71 |
|
72 |
enum distribution_type {DIST_UNIFORM, DIST_TURBO};
|
73 |
|
74 |
struct streaming_context {
|
75 |
struct chunk_buffer *cb;
|
76 |
struct input_desc *input;
|
77 |
struct chunk_locks * ch_locks;
|
78 |
struct service_times_element * transactions;
|
79 |
const struct psinstance * ps; |
80 |
int cb_size;
|
81 |
int bcast_after_receive_every;
|
82 |
bool neigh_on_chunk_recv;
|
83 |
bool send_bmap_before_push;
|
84 |
uint64_t CB_SIZE_TIME; |
85 |
uint32_t chunk_loss_interval; |
86 |
enum distribution_type dist_type;
|
87 |
int offer_per_period;
|
88 |
}; |
89 |
|
90 |
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config) |
91 |
{ |
92 |
struct streaming_context * stc;
|
93 |
struct tag * tags;
|
94 |
static char conf[80]; |
95 |
|
96 |
stc = malloc(sizeof(struct streaming_context)); |
97 |
stc->ps = ps; |
98 |
stc->bcast_after_receive_every = 0;
|
99 |
stc->neigh_on_chunk_recv = false;
|
100 |
stc->send_bmap_before_push = false;
|
101 |
stc->transactions = NULL;
|
102 |
stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED; //in millisec, defaults to unlimited
|
103 |
stc->chunk_loss_interval = 0; // disable self-lossy feature (for experiments) |
104 |
stc->dist_type = DIST_UNIFORM; |
105 |
|
106 |
tags = grapes_config_parse(config); |
107 |
if (strcmp(grapes_config_value_str_default(tags, "dist_type", ""), "turbo") == 0) |
108 |
stc->dist_type = DIST_TURBO; |
109 |
grapes_config_value_int_default(tags, "offer_per_period", &(stc->offer_per_period), 1); |
110 |
free(tags); |
111 |
|
112 |
stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
|
113 |
stc->cb_size = psinstance_chunkbuffer_size(ps); |
114 |
sprintf(conf, "size=%d", stc->cb_size);
|
115 |
stc->cb = cb_init(conf); |
116 |
stc->ch_locks = chunk_locks_create(); |
117 |
|
118 |
chunkDeliveryInit(psinstance_nodeid(ps)); |
119 |
chunkSignalingInit(psinstance_nodeid(ps)); |
120 |
return stc;
|
121 |
} |
122 |
|
123 |
void streaming_destroy(struct streaming_context ** stc) |
124 |
{ |
125 |
if (stc && *stc)
|
126 |
{ |
127 |
if((*stc)->input)
|
128 |
input_close((*stc)->input); |
129 |
if(((*stc)->ch_locks))
|
130 |
chunk_locks_destroy(&((*stc)->ch_locks)); |
131 |
if(((*stc)->transactions))
|
132 |
transactions_destroy((*stc)->transactions); |
133 |
if(((*stc)->cb))
|
134 |
cb_destroy((*stc)->cb); |
135 |
free((*stc)); |
136 |
} |
137 |
} |
138 |
|
139 |
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid); |
140 |
|
141 |
uint64_t gettimeofday_in_us(void)
|
142 |
{ |
143 |
struct timeval what_time; //to store the epoch time |
144 |
|
145 |
gettimeofday(&what_time, NULL);
|
146 |
return what_time.tv_sec * 1000000ULL + what_time.tv_usec; |
147 |
} |
148 |
|
149 |
void cb_print(const struct streaming_context * stc) |
150 |
{ |
151 |
#ifdef DEBUG
|
152 |
struct chunk *chunks;
|
153 |
int num_chunks, i, id;
|
154 |
chunks = cb_get_chunks(stc->cb, &num_chunks); |
155 |
|
156 |
dprintf("\tchbuf :");
|
157 |
i = 0;
|
158 |
if(num_chunks) {
|
159 |
id = chunks[0].id;
|
160 |
dprintf(" %d-> ",id);
|
161 |
while (i < num_chunks) {
|
162 |
if (id == chunks[i].id) {
|
163 |
dprintf("%d",id % 10); |
164 |
i++; |
165 |
} else if (chunk_islocked(stc->ch_locks, id)) { |
166 |
dprintf("*");
|
167 |
} else {
|
168 |
dprintf(".");
|
169 |
} |
170 |
id++; |
171 |
} |
172 |
} |
173 |
dprintf("\n");
|
174 |
#endif
|
175 |
} |
176 |
|
177 |
void chunk_attributes_fill(struct chunk* c) |
178 |
{ |
179 |
struct chunk_attributes * ca;
|
180 |
int priority = 1; |
181 |
|
182 |
assert((!c->attributes && c->attributes_size == 0)
|
183 |
#ifdef CHUNK_ATTRIB_CHUNKER
|
184 |
|| chunk_attributes_chunker_verify(c->attributes, c->attributes_size) |
185 |
#endif
|
186 |
); |
187 |
|
188 |
#ifdef CHUNK_ATTRIB_CHUNKER
|
189 |
if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
|
190 |
priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
|
191 |
free(c->attributes); |
192 |
c->attributes = NULL;
|
193 |
c->attributes_size = 0;
|
194 |
} |
195 |
#endif
|
196 |
|
197 |
c->attributes_size = sizeof(struct chunk_attributes); |
198 |
c->attributes = ca = malloc(c->attributes_size); |
199 |
|
200 |
ca->deadline = c->id; |
201 |
ca->deadline_increment = priority * 2;
|
202 |
ca->hopcount = 0;
|
203 |
} |
204 |
|
205 |
int chunk_get_hopcount(const struct chunk* c) { |
206 |
struct chunk_attributes * ca;
|
207 |
|
208 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
209 |
fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
210 |
return -1; |
211 |
} |
212 |
|
213 |
ca = (struct chunk_attributes *) c->attributes;
|
214 |
return ca->hopcount;
|
215 |
} |
216 |
|
217 |
void chunk_attributes_update_received(struct chunk* c) |
218 |
{ |
219 |
struct chunk_attributes * ca;
|
220 |
|
221 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
222 |
fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
223 |
return;
|
224 |
} |
225 |
|
226 |
ca = (struct chunk_attributes *) c->attributes;
|
227 |
ca->hopcount++; |
228 |
dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
|
229 |
} |
230 |
|
231 |
void chunk_attributes_update_sending(const struct chunk* c) |
232 |
{ |
233 |
struct chunk_attributes * ca;
|
234 |
|
235 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
236 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
237 |
return;
|
238 |
} |
239 |
|
240 |
ca = (struct chunk_attributes *) c->attributes;
|
241 |
ca->deadline += ca->deadline_increment; |
242 |
dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
|
243 |
} |
244 |
|
245 |
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf) |
246 |
{ |
247 |
struct chunk *chunks;
|
248 |
int num_chunks, i;
|
249 |
struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap"); |
250 |
chunks = cb_get_chunks(chbuf, &num_chunks); |
251 |
|
252 |
for(i=num_chunks-1; i>=0; i--) { |
253 |
chunkID_set_add_chunk(my_bmap, chunks[i].id); |
254 |
} |
255 |
return my_bmap;
|
256 |
} |
257 |
|
258 |
// a simple implementation that request everything that we miss ... up to max deliver
|
259 |
struct chunkID_set *get_chunks_to_accept(const struct streaming_context * stc, struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){ |
260 |
struct chunkID_set *cset_acc, *my_bmap;
|
261 |
int i, d, cset_off_size;
|
262 |
//double lossrate;
|
263 |
struct peer *from = nodeid_to_peer(psinstance_topology(stc->ps), fromid, 0); |
264 |
|
265 |
cset_acc = chunkID_set_init("size=0");
|
266 |
|
267 |
//reduce load a little bit if there are losses on the path from this guy
|
268 |
//lossrate = get_lossrate_receive(from->id);
|
269 |
//lossrate = finite(lossrate) ? lossrate : 0; //start agressively, assuming 0 loss
|
270 |
//if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
|
271 |
my_bmap = cb_to_bmap(stc->cb); |
272 |
cset_off_size = chunkID_set_size(cset_off); |
273 |
for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) { |
274 |
int chunkid = chunkID_set_get_chunk(cset_off, i);
|
275 |
//dprintf("\tdo I need c%d ? :",chunkid);
|
276 |
if (!chunk_islocked(stc->ch_locks, chunkid) && _needs(stc, my_bmap, stc->cb_size, chunkid)) {
|
277 |
chunkID_set_add_chunk(cset_acc, chunkid); |
278 |
chunk_lock(stc->ch_locks, chunkid,from); |
279 |
dtprintf("accepting %d from %s", chunkid, nodeid_static_str(fromid));
|
280 |
#ifdef MONL
|
281 |
dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
|
282 |
#endif
|
283 |
dprintf("\n");
|
284 |
d++; |
285 |
} |
286 |
} |
287 |
chunkID_set_free(my_bmap); |
288 |
//} else {
|
289 |
// dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr_tr(fromid), lossrate, get_rtt(fromid));
|
290 |
//}
|
291 |
|
292 |
reg_offer_accept_in(psinstance_measures(stc->ps), chunkID_set_size(cset_acc) > 0 ? 1 : 0); |
293 |
|
294 |
return cset_acc;
|
295 |
} |
296 |
|
297 |
void send_bmap(const struct streaming_context *stc, const struct nodeID *toid) |
298 |
{ |
299 |
struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
|
300 |
sendBufferMap(toid,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0); |
301 |
#ifdef LOG_SIGNAL
|
302 |
log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT"); |
303 |
#endif
|
304 |
chunkID_set_free(my_bmap); |
305 |
} |
306 |
|
307 |
void bcast_bmap(const struct streaming_context * stc) |
308 |
{ |
309 |
int i, n;
|
310 |
struct peer **neighbours;
|
311 |
struct peerset *pset;
|
312 |
struct chunkID_set *my_bmap;
|
313 |
|
314 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
315 |
n = peerset_size(pset); |
316 |
neighbours = peerset_get_peers(pset); |
317 |
|
318 |
my_bmap = cb_to_bmap(stc->cb); //cache our bmap for faster processing
|
319 |
for (i = 0; i<n; i++) { |
320 |
sendBufferMap(neighbours[i]->id,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0); |
321 |
#ifdef LOG_SIGNAL
|
322 |
log_signal(psinstance_nodeid(stc->ps),neighbours[i]->id,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT"); |
323 |
#endif
|
324 |
} |
325 |
chunkID_set_free(my_bmap); |
326 |
} |
327 |
|
328 |
void send_ack(const struct streaming_context * stc, struct nodeID *toid, uint16_t trans_id) |
329 |
{ |
330 |
struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
|
331 |
sendAck(toid, my_bmap,trans_id); |
332 |
#ifdef LOG_SIGNAL
|
333 |
log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),trans_id,sig_ack,"SENT");
|
334 |
#endif
|
335 |
chunkID_set_free(my_bmap); |
336 |
} |
337 |
|
338 |
double get_average_lossrate_pset(struct peerset *pset) |
339 |
{ |
340 |
return 0; |
341 |
} |
342 |
|
343 |
void ack_chunk(const struct streaming_context * stc, struct chunk *c, struct nodeID *from, uint16_t trans_id) |
344 |
{ |
345 |
//reduce load a little bit if there are losses on the path from this guy
|
346 |
double average_lossrate = get_average_lossrate_pset(topology_get_neighbours(psinstance_topology(stc->ps)));
|
347 |
average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
348 |
if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) { |
349 |
return;
|
350 |
} |
351 |
send_ack(stc, from, trans_id); //send explicit ack
|
352 |
} |
353 |
|
354 |
void received_chunk(struct streaming_context * stc, struct nodeID *from, const uint8_t *buff, int len) |
355 |
{ |
356 |
int res;
|
357 |
static struct chunk c; |
358 |
struct peer *p;
|
359 |
static int bcast_cnt; |
360 |
uint16_t transid; |
361 |
|
362 |
res = parseChunkMsg(buff + 1, len - 1, &c, &transid); |
363 |
if (res > 0) { |
364 |
if (stc->chunk_loss_interval && c.id % stc->chunk_loss_interval == 0) { |
365 |
fprintf(stderr,"[NOISE] Chunk %d discarded >:)\n",c.id);
|
366 |
free(c.data); |
367 |
free(c.attributes); |
368 |
return;
|
369 |
} |
370 |
chunk_attributes_update_received(&c); |
371 |
chunk_unlock(stc->ch_locks, c.id); |
372 |
dprintf("Received chunk %d from peer: %s\n", c.id, nodeid_static_str(from));
|
373 |
#ifdef LOG_CHUNK
|
374 |
log_chunk(from,psinstance_nodeid(stc->ps),&c,"RECEIVED");
|
375 |
#endif
|
376 |
//{fprintf(stderr, "TEO: Peer %s received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", node_addr_tr(get_my_addr()),c.id, node_addr_tr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
|
377 |
output_deliver(psinstance_output(stc->ps), &c); |
378 |
res = cb_add_chunk(stc->cb, &c); |
379 |
reg_chunk_receive(psinstance_measures(stc->ps),c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE); |
380 |
cb_print(stc); |
381 |
if (res < 0) { |
382 |
dprintf("\tchunk too old, buffer full with newer chunks\n");
|
383 |
#ifdef LOG_CHUNK
|
384 |
log_chunk_error(from,psinstance_nodeid(stc->ps),&c,res); //{fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr_tr(from), gettimeofday_in_us());}
|
385 |
#endif
|
386 |
free(c.data); |
387 |
free(c.attributes); |
388 |
} |
389 |
p = nodeid_to_peer(psinstance_topology(stc->ps), from, stc->neigh_on_chunk_recv); |
390 |
if (p) { //now we have it almost sure |
391 |
chunkID_set_add_chunk(p->bmap,c.id); //don't send it back
|
392 |
gettimeofday(&p->bmap_timestamp, NULL);
|
393 |
} |
394 |
ack_chunk(stc, &c, from, transid); //send explicit ack
|
395 |
if (stc->bcast_after_receive_every && bcast_cnt % stc->bcast_after_receive_every == 0) { |
396 |
bcast_bmap(stc); |
397 |
} |
398 |
} else {
|
399 |
fprintf(stderr,"\tError: can't decode chunk!\n");
|
400 |
} |
401 |
} |
402 |
|
403 |
struct chunk *generated_chunk(struct streaming_context * stc, suseconds_t *delta) |
404 |
{ |
405 |
struct chunk *c;
|
406 |
|
407 |
c = malloc(sizeof(struct chunk)); |
408 |
if (!c) {
|
409 |
fprintf(stderr, "Memory allocation error!\n");
|
410 |
return NULL; |
411 |
} |
412 |
memset(c, 0, sizeof(struct chunk)); |
413 |
|
414 |
*delta = (suseconds_t)input_get(stc->input, c); |
415 |
if (*delta < 0) { |
416 |
fprintf(stderr, "Error in input!\n");
|
417 |
exit(-1);
|
418 |
} |
419 |
if (c->data == NULL) { |
420 |
free(c); |
421 |
return NULL; |
422 |
} |
423 |
dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
|
424 |
chunk_attributes_fill(c); |
425 |
return c;
|
426 |
} |
427 |
|
428 |
int add_chunk(struct streaming_context * stc, struct chunk *c) |
429 |
{ |
430 |
int res;
|
431 |
|
432 |
if (stc && c && stc->cb)
|
433 |
{ |
434 |
res = cb_add_chunk(stc->cb, c); |
435 |
if (res < 0) { |
436 |
free(c->data); |
437 |
free(c->attributes); |
438 |
free(c); |
439 |
return 0; |
440 |
} |
441 |
// free(c);
|
442 |
return 1; |
443 |
} |
444 |
return 0; |
445 |
} |
446 |
|
447 |
uint64_t get_chunk_timestamp(const struct streaming_context * stc, int cid){ |
448 |
const struct chunk *c = cb_get_chunk(stc->cb, cid); |
449 |
if (!c) return 0; |
450 |
|
451 |
return c->timestamp;
|
452 |
} |
453 |
|
454 |
void print_chunkID_set(struct chunkID_set *cset) |
455 |
{ |
456 |
uint32_t * ptr = (uint32_t *) cset; |
457 |
uint32_t n_elements,i; |
458 |
int * data = (int *) &(ptr[3]); |
459 |
fprintf(stderr,"[DEBUG] Chunk ID set type: %d\n",ptr[0]); |
460 |
fprintf(stderr,"[DEBUG] Chunk ID set size: %d\n",ptr[1]); |
461 |
n_elements = ptr[2];
|
462 |
fprintf(stderr,"[DEBUG] Chunk ID n_elements: %d\n",n_elements);
|
463 |
fprintf(stderr,"[DEBUG] Chunk ID elements: [");
|
464 |
for (i=0;i<n_elements;i++) |
465 |
fprintf(stderr,".%d.",data[i]);
|
466 |
|
467 |
fprintf(stderr,"]\n");
|
468 |
} |
469 |
|
470 |
/**
|
471 |
*example function to filter chunks based on whether a given peer needs them.
|
472 |
*
|
473 |
* Looks at buffermap information received about the given peer.
|
474 |
*/
|
475 |
int needs_old(const struct streaming_context * stc, struct peer *n, int cid){ |
476 |
struct peer * p = n;
|
477 |
|
478 |
if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
|
479 |
uint64_t ts; |
480 |
ts = get_chunk_timestamp(stc, cid); |
481 |
if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) { //if we don't know the timestamp, we accept |
482 |
return 0; |
483 |
} |
484 |
} |
485 |
|
486 |
//dprintf("\t%s needs c%d ? :",node_addr_tr(p->id),c->id);
|
487 |
if (! p->bmap) { // this will never happen since the pset module initializes bmap |
488 |
//dprintf("no bmap\n");
|
489 |
return 1; // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!) |
490 |
} |
491 |
|
492 |
// fprintf(stderr,"[DEBUG] Evaluating Peer %s, CB_SIZE: %d\n",node_addr_tr(n->id),p->cb_size); // DEBUG
|
493 |
// print_chunkID_set(p->bmap); // DEBUG
|
494 |
|
495 |
return _needs(stc, p->bmap, peer_cb_size(p), cid);
|
496 |
} |
497 |
|
498 |
/**
|
499 |
* Function checking if chunkID_set cset may need chunk with id cid
|
500 |
* @cset: target cset
|
501 |
* @cb_size: maximum allowed numer of chunks. In the case of offer it indicates
|
502 |
* the maximum capacity in of the receiving peer (so it's 0 for the source)
|
503 |
* @cid: target chunk identifier
|
504 |
*/
|
505 |
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid){ |
506 |
|
507 |
if (cb_size == 0) { //if it declared it does not needs chunks |
508 |
return 0; |
509 |
} |
510 |
|
511 |
if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
|
512 |
uint64_t ts; |
513 |
ts = get_chunk_timestamp(stc, cid); |
514 |
if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) { //if we don't know the timestamp, we accept |
515 |
return 0; |
516 |
} |
517 |
} |
518 |
|
519 |
if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk |
520 |
int missing, min;
|
521 |
//@TODO: add some bmap_timestamp based logic
|
522 |
|
523 |
if (chunkID_set_size(cset) == 0) { |
524 |
//dprintf("bmap empty\n");
|
525 |
return 1; // if the bmap seems empty, it needs the chunk |
526 |
} |
527 |
missing = stc->cb_size - chunkID_set_size(cset); |
528 |
missing = missing < 0 ? 0 : missing; |
529 |
min = chunkID_set_get_earliest(cset); |
530 |
//dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
|
531 |
return (cid >= min - missing);
|
532 |
} |
533 |
|
534 |
//dprintf("has it\n");
|
535 |
return 0; |
536 |
} |
537 |
|
538 |
int needs(struct peer *p, int cid) |
539 |
{ |
540 |
int min;
|
541 |
|
542 |
if (peer_cb_size(p) == 0) // it does not have capacity |
543 |
return 0; |
544 |
if (chunkID_set_check(p->bmap,cid) < 0) // not in bmap |
545 |
{ |
546 |
if(peer_cb_size(p) > chunkID_set_size(p->bmap)) // it has room for chunks anyway |
547 |
{ |
548 |
min = chunkID_set_get_earliest(p->bmap) - peer_cb_size(p) + chunkID_set_size(p->bmap); |
549 |
min = min < 0 ? 0 : min; |
550 |
if (cid >= min)
|
551 |
return 1; |
552 |
} |
553 |
if((int)chunkID_set_get_earliest(p->bmap) < cid) // our is reasonably new |
554 |
return 1; |
555 |
} |
556 |
return 0; |
557 |
} |
558 |
|
559 |
double peerWeightReceivedfrom(struct peer **n){ |
560 |
struct peer * p = *n;
|
561 |
return timerisset(&p->bmap_timestamp) ? 1 : 0.1; |
562 |
} |
563 |
|
564 |
double peerWeightUniform(struct peer **n){ |
565 |
return 1; |
566 |
} |
567 |
|
568 |
double peerWeightNeigh(struct peer **n){ |
569 |
double res;
|
570 |
res = (double)(((struct metadata *)(*n)->metadata)->neigh_size); |
571 |
if (res)
|
572 |
return res;
|
573 |
return 1; |
574 |
} |
575 |
|
576 |
double peerWeightInvNeigh(struct peer **n){ |
577 |
double res;
|
578 |
res = peerWeightNeigh(n); |
579 |
return 1/res; |
580 |
} |
581 |
|
582 |
double peerWeightLoss(struct peer **n){ |
583 |
return 1; |
584 |
} |
585 |
|
586 |
//double peerWeightRtt(struct peer **n){
|
587 |
//#ifdef MONL
|
588 |
// double rtt = get_rtt((*n)->id);
|
589 |
// //dprintf("RTT to %s: %f\n", node_addr_tr(p->id), rtt);
|
590 |
// return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
|
591 |
//#else
|
592 |
// return 1;
|
593 |
//#endif
|
594 |
//}
|
595 |
|
596 |
//ordering function for ELp peer selection, chunk ID based
|
597 |
//can't be used as weight
|
598 |
//double peerScoreELpID(struct nodeID **n){
|
599 |
// struct chunkID_set *bmap;
|
600 |
// int latest;
|
601 |
// struct peer * p = nodeid_to_peer(*n, 0);
|
602 |
// if (!p) return 0;
|
603 |
//
|
604 |
// bmap = p->bmap;
|
605 |
// if (!bmap) return 0;
|
606 |
// latest = chunkID_set_get_latest(bmap);
|
607 |
// if (latest == INT_MIN) return 0;
|
608 |
//
|
609 |
// return -latest;
|
610 |
//}
|
611 |
|
612 |
double chunkScoreChunkID(int *cid){ |
613 |
return (double) *cid; |
614 |
} |
615 |
|
616 |
uint64_t get_chunk_deadline(const struct streaming_context * stc, int cid){ |
617 |
const struct chunk_attributes * ca; |
618 |
const struct chunk *c; |
619 |
|
620 |
c = cb_get_chunk(stc->cb, cid); |
621 |
if (!c) return 0; |
622 |
|
623 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
624 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
625 |
return 0; |
626 |
} |
627 |
|
628 |
ca = (struct chunk_attributes *) c->attributes;
|
629 |
return ca->deadline;
|
630 |
} |
631 |
|
632 |
//double chunkScoreDL(int *cid){
|
633 |
// return - (double)get_chunk_deadline(*cid);
|
634 |
//}
|
635 |
|
636 |
//double chunkScoreTimestamp(int *cid){
|
637 |
// return (double) get_chunk_timestamp(*cid);
|
638 |
//}
|
639 |
|
640 |
void send_accepted_chunks(const struct streaming_context * stc, struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){ |
641 |
int i, d, cset_acc_size, res;
|
642 |
struct peer *to = nodeid_to_peer(psinstance_topology(stc->ps), toid, 0); |
643 |
|
644 |
transaction_reg_accept(stc->transactions, trans_id, toid); |
645 |
|
646 |
cset_acc_size = chunkID_set_size(cset_acc); |
647 |
reg_offer_accept_out(psinstance_measures(stc->ps),cset_acc_size > 0 ? 1 : 0); //this only works if accepts are sent back even if 0 is accepted |
648 |
for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) { |
649 |
const struct chunk *c; |
650 |
int chunkid = chunkID_set_get_chunk(cset_acc, i);
|
651 |
c = cb_get_chunk(stc->cb, chunkid); |
652 |
if (!c) { // we should have the chunk |
653 |
dprintf("%s asked for chunk %d we do not own anymore\n", nodeid_static_str(toid), chunkid);
|
654 |
continue;
|
655 |
} |
656 |
if (!to || needs(to, chunkid)) { //he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification |
657 |
chunk_attributes_update_sending(c); |
658 |
res = sendChunk(toid, c, trans_id); |
659 |
if (res >= 0) { |
660 |
if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive |
661 |
d++; |
662 |
reg_chunk_send(psinstance_measures(stc->ps),c->id); |
663 |
#ifdef LOG_CHUNK
|
664 |
log_chunk(psinstance_nodeid(stc->ps), toid,c, "SENT_ACCEPTED");
|
665 |
#endif
|
666 |
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(toid), gettimeofday_in_us(), res, c->size);}
|
667 |
} else {
|
668 |
fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
669 |
} |
670 |
} |
671 |
} |
672 |
} |
673 |
|
674 |
int offer_peer_count(const struct streaming_context * stc) |
675 |
{ |
676 |
return psinstance_num_offers(stc->ps);
|
677 |
} |
678 |
|
679 |
int offer_max_deliver(const struct streaming_context * stc, const struct nodeID *n) |
680 |
{ |
681 |
return psinstance_chunks_per_offer(stc->ps);
|
682 |
} |
683 |
|
684 |
////get the rtt. Currenly only MONL version is supported
|
685 |
//static double get_rtt_of(struct nodeID* n){
|
686 |
//#ifdef MONL
|
687 |
// return get_rtt(n);
|
688 |
//#else
|
689 |
// return NAN;
|
690 |
//#endif
|
691 |
//}
|
692 |
|
693 |
#define DEFAULT_RTT_ESTIMATE 0.5 |
694 |
|
695 |
struct chunkID_set *compose_offer_cset(const struct streaming_context * stc, const struct peer *p) |
696 |
{ |
697 |
int num_chunks, j;
|
698 |
uint64_t smallest_ts; //, largest_ts;
|
699 |
double dt;
|
700 |
struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap"); |
701 |
struct chunk *chunks = cb_get_chunks(stc->cb, &num_chunks);
|
702 |
|
703 |
dt = DEFAULT_RTT_ESTIMATE; |
704 |
dt *= 1e6; //convert to usec |
705 |
|
706 |
smallest_ts = chunks[0].timestamp;
|
707 |
// largest_ts = chunks[num_chunks-1].timestamp;
|
708 |
|
709 |
//add chunks in latest...earliest order
|
710 |
if (psinstance_is_source(stc->ps)) {
|
711 |
j = (num_chunks-1) * 3/4; //do not send offers for the latest chunks from the source |
712 |
} else {
|
713 |
j = num_chunks-1;
|
714 |
} |
715 |
for(; j>=0; j--) { |
716 |
if (chunks[j].timestamp > smallest_ts + dt)
|
717 |
chunkID_set_add_chunk(my_bmap, chunks[j].id); |
718 |
} |
719 |
|
720 |
return my_bmap;
|
721 |
} |
722 |
|
723 |
|
724 |
void send_offer(const struct streaming_context * stc) |
725 |
{ |
726 |
struct chunk *buff;
|
727 |
size_t size=0, i, n;
|
728 |
struct peer **neighbours;
|
729 |
struct peerset *pset;
|
730 |
|
731 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
732 |
n = peerset_size(pset); |
733 |
neighbours = peerset_get_peers(pset); |
734 |
// dprintf("Send Offer: %lu neighbours\n", n);
|
735 |
if (n == 0) return; |
736 |
buff = cb_get_chunks(stc->cb, (int *)&size);
|
737 |
if (size == 0) return; |
738 |
|
739 |
size_t selectedpeers_len = offer_peer_count(stc); |
740 |
int chunkids[size];
|
741 |
struct peer *nodeids[n];
|
742 |
struct peer *selectedpeers[selectedpeers_len];
|
743 |
|
744 |
//reduce load a little bit if there are losses on the path from this guy
|
745 |
double average_lossrate = get_average_lossrate_pset(pset);
|
746 |
average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
747 |
if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) { |
748 |
return;
|
749 |
} |
750 |
|
751 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
752 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i]; |
753 |
if (stc->dist_type == DIST_TURBO)
|
754 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightInvNeigh); |
755 |
else
|
756 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightUniform); |
757 |
|
758 |
for (i=0; i<selectedpeers_len ; i++){ |
759 |
int transid = transaction_create(stc->transactions, selectedpeers[i]->id);
|
760 |
int max_deliver = offer_max_deliver(stc, selectedpeers[i]->id);
|
761 |
struct chunkID_set *offer_cset = compose_offer_cset(stc, selectedpeers[i]);
|
762 |
dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), peer_cb_size(selectedpeers[i]));
|
763 |
offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++); |
764 |
#ifdef LOG_SIGNAL
|
765 |
log_signal(psinstance_nodeid(stc->ps),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
|
766 |
#endif
|
767 |
chunkID_set_free(offer_cset); |
768 |
} |
769 |
} |
770 |
|
771 |
void log_chunk_error(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,int error) |
772 |
{ |
773 |
switch (error) {
|
774 |
case E_CB_OLD:
|
775 |
log_chunk(from,to,c,"TOO_OLD");
|
776 |
break;
|
777 |
case E_CB_DUPLICATE:
|
778 |
log_chunk(from,to,c,"DUPLICATED");
|
779 |
break;
|
780 |
default:
|
781 |
log_chunk(from,to,c,"ERROR");
|
782 |
} |
783 |
} |
784 |
|
785 |
void log_neighbourhood(const struct streaming_context * stc) |
786 |
{ |
787 |
struct peerset * pset;
|
788 |
const struct peer * p; |
789 |
int psetsize,i;
|
790 |
uint64_t now; |
791 |
char me[NODE_STR_LENGTH];
|
792 |
|
793 |
node_addr(psinstance_nodeid(stc->ps), me, NODE_STR_LENGTH); |
794 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
795 |
psetsize = peerset_size(pset); |
796 |
now = gettimeofday_in_us(); |
797 |
peerset_for_each(pset,p,i) |
798 |
fprintf(stderr,"[NEIGHBOURHOOD],%"PRIu64",%s,%s,%d\n",now,me,nodeid_static_str(p->id),psetsize); |
799 |
|
800 |
} |
801 |
|
802 |
void log_chunk(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,const char * note) |
803 |
{ |
804 |
// semantic: [CHUNK_LOG],log_date,sender,receiver,id,size(bytes),chunk_timestamp,hopcount,notes
|
805 |
char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
|
806 |
node_addr(from,sndr,NODE_STR_LENGTH); |
807 |
node_addr(to,rcvr,NODE_STR_LENGTH); |
808 |
|
809 |
fprintf(stderr,"[CHUNK_LOG],%"PRIu64",%s,%s,%d,%d,%"PRIu64",%i,%s\n",gettimeofday_in_us(),sndr,rcvr,c->id,c->size,c->timestamp,chunk_get_hopcount(c),note); |
810 |
} |
811 |
|
812 |
void log_signal(const struct nodeID *fromid,const struct nodeID *toid,const int cidset_size,uint16_t trans_id,enum signaling_type type,const char *flag) |
813 |
{ |
814 |
char typestr[24]; |
815 |
char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
|
816 |
node_addr(fromid,sndr,NODE_STR_LENGTH); |
817 |
node_addr(toid,rcvr,NODE_STR_LENGTH); |
818 |
|
819 |
switch (type)
|
820 |
{ |
821 |
case sig_offer:
|
822 |
sprintf(typestr,"%s","OFFER_SIG"); |
823 |
break;
|
824 |
case sig_accept:
|
825 |
sprintf(typestr,"%s","ACCEPT_SIG"); |
826 |
break;
|
827 |
case sig_request:
|
828 |
sprintf(typestr,"%s","REQUEST_SIG"); |
829 |
break;
|
830 |
case sig_deliver:
|
831 |
sprintf(typestr,"%s","DELIVER_SIG"); |
832 |
break;
|
833 |
case sig_send_buffermap:
|
834 |
sprintf(typestr,"%s","SEND_BMAP_SIG"); |
835 |
break;
|
836 |
case sig_request_buffermap:
|
837 |
sprintf(typestr,"%s","REQUEST_BMAP_SIG"); |
838 |
break;
|
839 |
case sig_ack:
|
840 |
sprintf(typestr,"%s","CHUNK_ACK_SIG"); |
841 |
break;
|
842 |
default:
|
843 |
sprintf(typestr,"%s","UNKNOWN_SIG"); |
844 |
|
845 |
} |
846 |
fprintf(stderr,"[OFFER_LOG],%s,%s,%d,%s,%s\n",sndr,rcvr,trans_id,typestr,flag);
|
847 |
} |
848 |
|
849 |
int peer_chunk_dispatch(const struct streaming_context * stc, const struct PeerChunk *pairs,const size_t num_pairs) |
850 |
{ |
851 |
int transid, res,success = 0; |
852 |
size_t i; |
853 |
const struct peer * target_peer; |
854 |
const struct chunk * target_chunk; |
855 |
|
856 |
for (i=0; i<num_pairs ; i++){ |
857 |
target_peer = pairs[i].peer; |
858 |
target_chunk = cb_get_chunk(stc->cb, pairs[i].chunk); |
859 |
|
860 |
if (stc->send_bmap_before_push) {
|
861 |
send_bmap(stc, target_peer->id); |
862 |
} |
863 |
chunk_attributes_update_sending(target_chunk); |
864 |
transid = transaction_create(stc->transactions, target_peer->id); |
865 |
res = sendChunk(target_peer->id, target_chunk, transid); //we use transactions in order to register acks for push
|
866 |
if (res>=0) { |
867 |
#ifdef LOG_CHUNK
|
868 |
log_chunk(psinstance_nodeid(stc->ps), target_peer->id, target_chunk,"SENT");
|
869 |
#endif
|
870 |
// chunkID_set_add_chunk((target_peer)->bmap,target_chunk->id); //don't send twice ... assuming that it will actually arrive
|
871 |
reg_chunk_send(psinstance_measures(stc->ps),target_chunk->id); |
872 |
success++; |
873 |
} else {
|
874 |
fprintf(stderr,"ERROR sending chunk %d\n",target_chunk->id);
|
875 |
} |
876 |
} |
877 |
return success;
|
878 |
|
879 |
} |
880 |
|
881 |
int inject_chunk(const struct streaming_context * stc, const struct chunk * target_chunk,const int multiplicity) |
882 |
/*injects a specific chunk in the overlay and return the number of injected copies*/
|
883 |
{ |
884 |
struct peerset *pset;
|
885 |
struct peer ** peers, ** dst_peers;
|
886 |
int peers_num;
|
887 |
double (* peer_evaluation) (struct peer **n); |
888 |
size_t i, selectedpairs_len = multiplicity; |
889 |
struct PeerChunk * selectedpairs;
|
890 |
|
891 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
892 |
peers_num = peerset_size(pset); |
893 |
peers = peerset_get_peers(pset); |
894 |
peer_evaluation = SCHED_PEER; |
895 |
|
896 |
//SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
|
897 |
dst_peers = (struct peer **) malloc(sizeof(struct peer* ) * multiplicity); |
898 |
if (stc->dist_type == DIST_TURBO)
|
899 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peerWeightNeigh); |
900 |
else
|
901 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation); |
902 |
|
903 |
selectedpairs = (struct PeerChunk *) malloc(sizeof(struct PeerChunk) * selectedpairs_len); |
904 |
for ( i=0; i<selectedpairs_len; i++) |
905 |
{ |
906 |
selectedpairs[i].peer = dst_peers[i]; |
907 |
selectedpairs[i].chunk = target_chunk->id; |
908 |
} |
909 |
|
910 |
peer_chunk_dispatch(stc, selectedpairs,selectedpairs_len); |
911 |
|
912 |
free(selectedpairs); |
913 |
free(dst_peers); |
914 |
return selectedpairs_len;
|
915 |
} |
916 |
|
917 |
void send_chunk(const struct streaming_context * stc) |
918 |
{ |
919 |
struct chunk *buff;
|
920 |
int size, res, i, n;
|
921 |
struct peer **neighbours;
|
922 |
struct peerset *pset;
|
923 |
|
924 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
925 |
n = peerset_size(pset); |
926 |
neighbours = peerset_get_peers(pset); |
927 |
dprintf("Send Chunk: %d neighbours\n", n);
|
928 |
if (n == 0) return; |
929 |
buff = cb_get_chunks(stc->cb, &size); |
930 |
dprintf("\t %d chunks in buffer...\n", size);
|
931 |
if (size == 0) return; |
932 |
|
933 |
/************ STUPID DUMB SCHEDULING ****************/
|
934 |
//target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
|
935 |
//c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
|
936 |
/************ /STUPID DUMB SCHEDULING ****************/
|
937 |
|
938 |
/************ USE SCHEDULER ****************/
|
939 |
{ |
940 |
size_t selectedpairs_len = 1;
|
941 |
int chunkids[size];
|
942 |
int transid;
|
943 |
struct peer *nodeids[n];
|
944 |
struct PeerChunk selectedpairs[1]; |
945 |
|
946 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
947 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i]; |
948 |
SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
|
949 |
/************ /USE SCHEDULER ****************/
|
950 |
|
951 |
for (i=0; i<(int)selectedpairs_len ; i++){ |
952 |
struct peer *p = selectedpairs[i].peer;
|
953 |
const struct chunk *c = cb_get_chunk(stc->cb, selectedpairs[i].chunk); |
954 |
dprintf("\t sending chunk[%d] to ", c->id);
|
955 |
dprintf("%s\n", nodeid_static_str(p->id));
|
956 |
|
957 |
if (stc->send_bmap_before_push) {
|
958 |
send_bmap(stc, p->id); |
959 |
} |
960 |
|
961 |
chunk_attributes_update_sending(c); |
962 |
transid = transaction_create(stc->transactions, p->id); |
963 |
res = sendChunk(p->id, c, transid); //we use transactions in order to register acks for push
|
964 |
// res = sendChunk(p->id, c, 0); //we do not use transactions in pure push
|
965 |
dprintf("\tResult: %d\n", res);
|
966 |
if (res>=0) { |
967 |
#ifdef LOG_CHUNK
|
968 |
log_chunk(psinstance_nodeid(stc->ps),p->id,c,"SENT");
|
969 |
#endif
|
970 |
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(p->id), gettimeofday_in_us(), res, c->size);}
|
971 |
chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
|
972 |
reg_chunk_send(psinstance_measures(stc->ps),c->id); |
973 |
} else {
|
974 |
fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
975 |
} |
976 |
} |
977 |
} |
978 |
} |
979 |
|
980 |
suseconds_t streaming_offer_interval(const struct streaming_context *stc) |
981 |
{ |
982 |
struct peerset *pset;
|
983 |
const struct peer * p; |
984 |
int i;
|
985 |
double load = 0; |
986 |
suseconds_t offer_int; |
987 |
|
988 |
offer_int = chunk_interval_measure(psinstance_measures(stc->ps)); |
989 |
|
990 |
if (stc->dist_type == DIST_TURBO) {
|
991 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
992 |
peerset_for_each(pset,p,i) |
993 |
load += peerWeightInvNeigh((struct peer **)&p);
|
994 |
offer_int /= load; |
995 |
} |
996 |
|
997 |
return offer_int / stc->offer_per_period;
|
998 |
} |