streamers / streaming.c @ d6d4b2c2
History | View | Annotate | Download (17.9 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Luca Abeni
|
3 |
* Copyright (c) 2010 Csaba Kiraly
|
4 |
*
|
5 |
* This is free software; see gpl-3.0.txt
|
6 |
*/
|
7 |
#include <sys/time.h> |
8 |
#include <stdlib.h> |
9 |
#include <stdio.h> |
10 |
#include <stdint.h> |
11 |
#include <stdbool.h> |
12 |
#include <math.h> |
13 |
#include <assert.h> |
14 |
#include <string.h> |
15 |
#include <inttypes.h> |
16 |
|
17 |
#include <net_helper.h> |
18 |
#include <chunk.h> |
19 |
#include <chunkbuffer.h> |
20 |
#include <trade_msg_la.h> |
21 |
#include <trade_msg_ha.h> |
22 |
#include <peerset.h> |
23 |
#include <peer.h> |
24 |
#include <chunkidset.h> |
25 |
#include <limits.h> |
26 |
#include <trade_sig_ha.h> |
27 |
#include <chunkiser_attrib.h> |
28 |
|
29 |
#include "streaming.h" |
30 |
#include "output.h" |
31 |
#include "input.h" |
32 |
#include "dbg.h" |
33 |
#include "chunk_signaling.h" |
34 |
#include "chunklock.h" |
35 |
#include "topology.h" |
36 |
#include "measures.h" |
37 |
#include "scheduling.h" |
38 |
|
39 |
#include "scheduler_la.h" |
40 |
|
41 |
# define CB_SIZE_TIME_UNLIMITED 1e12 |
42 |
uint64_t CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED; //in millisec, defaults to unlimited
|
43 |
|
44 |
static bool heuristics_distance_maxdeliver = false; |
45 |
static int bcast_after_receive_every = 0; |
46 |
static bool neigh_on_chunk_recv = false; |
47 |
|
48 |
/*
 * Transport-level attributes attached to every chunk.
 * Packed so the in-memory layout matches the on-the-wire layout byte
 * for byte (the block travels inside the chunk message).
 */
struct chunk_attributes {
  uint64_t deadline;           /* scheduling deadline; initialised to the chunk id in chunk_attributes_fill() */
  uint16_t deadline_increment; /* added to deadline on every send (priority * 2) */
  uint16_t hopcount;           /* overlay hops traversed so far; bumped on every receive */
} __attribute__((packed));
53 |
|
54 |
extern bool chunk_log; |
55 |
|
56 |
struct chunk_buffer *cb;
|
57 |
static struct input_desc *input; |
58 |
static int cb_size; |
59 |
static int transid=0; |
60 |
|
61 |
static int offer_per_tick = 1; //N_p parameter of POLITO |
62 |
|
63 |
int _needs(struct chunkID_set *cset, int cb_size, int cid); |
64 |
|
65 |
uint64_t gettimeofday_in_us(void)
|
66 |
{ |
67 |
struct timeval what_time; //to store the epoch time |
68 |
|
69 |
gettimeofday(&what_time, NULL);
|
70 |
return what_time.tv_sec * 1000000ULL + what_time.tv_usec; |
71 |
} |
72 |
|
73 |
/*
 * Debug helper: dump a compact one-line picture of the chunk buffer.
 * Each buffered chunk is printed as its id modulo 10; a gap in the id
 * sequence prints '*' if that id is currently locked (download in
 * progress) or '.' otherwise.  Compiled out unless DEBUG is defined.
 */
void cb_print()
{
#ifdef DEBUG
  struct chunk *chunks;
  int num_chunks, i, id;
  chunks = cb_get_chunks(cb, &num_chunks);

  dprintf("\tchbuf :");
  i = 0;
  if(num_chunks) {
    /* walk ids from the first buffered chunk upward; i only advances
     * when the current id is actually present in the buffer */
    id = chunks[0].id;
    dprintf(" %d-> ",id);
    while (i < num_chunks) {
      if (id == chunks[i].id) {
        dprintf("%d",id % 10);
        i++;
      } else if (chunk_islocked(id)) {
        dprintf("*");
      } else {
        dprintf(".");
      }
      id++;
    }
  }
  dprintf("\n");
#endif
}
100 |
|
101 |
void stream_init(int size, struct nodeID *myID) |
102 |
{ |
103 |
static char conf[32]; |
104 |
|
105 |
cb_size = size; |
106 |
|
107 |
sprintf(conf, "size=%d", cb_size);
|
108 |
cb = cb_init(conf); |
109 |
chunkDeliveryInit(myID); |
110 |
chunkSignalingInit(myID); |
111 |
init_measures(); |
112 |
} |
113 |
|
114 |
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size) |
115 |
{ |
116 |
int flags = 0; |
117 |
|
118 |
if (memcmp(fname, "udp:", 4) == 0) { |
119 |
fname += 4;
|
120 |
flags = INPUT_UDP; |
121 |
} |
122 |
if (memcmp(fname, "ipb:", 4) == 0) { |
123 |
fname += 4;
|
124 |
flags = INPUT_IPB; |
125 |
} |
126 |
if (loop) {
|
127 |
flags |= INPUT_LOOP; |
128 |
} |
129 |
|
130 |
input = input_open(fname, flags, fds, fds_size); |
131 |
if (input == NULL) { |
132 |
return -1; |
133 |
} |
134 |
|
135 |
stream_init(1, myID);
|
136 |
return 0; |
137 |
} |
138 |
|
139 |
void chunk_attributes_fill(struct chunk* c) |
140 |
{ |
141 |
struct chunk_attributes * ca;
|
142 |
int priority = 1; |
143 |
|
144 |
assert((!c->attributes && c->attributes_size == 0) ||
|
145 |
chunk_attributes_chunker_verify(c->attributes, c->attributes_size)); |
146 |
|
147 |
if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
|
148 |
priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
|
149 |
free(c->attributes); |
150 |
c->attributes = NULL;
|
151 |
c->attributes_size = 0;
|
152 |
} |
153 |
|
154 |
c->attributes_size = sizeof(struct chunk_attributes); |
155 |
c->attributes = ca = malloc(c->attributes_size); |
156 |
|
157 |
ca->deadline = c->id; |
158 |
ca->deadline_increment = priority * 2;
|
159 |
ca->hopcount = 0;
|
160 |
} |
161 |
|
162 |
int chunk_get_hopcount(struct chunk* c) { |
163 |
struct chunk_attributes * ca;
|
164 |
|
165 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
166 |
fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
167 |
return -1; |
168 |
} |
169 |
|
170 |
ca = (struct chunk_attributes *) c->attributes;
|
171 |
return ca->hopcount;
|
172 |
} |
173 |
|
174 |
void chunk_attributes_update_received(struct chunk* c) |
175 |
{ |
176 |
struct chunk_attributes * ca;
|
177 |
|
178 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
179 |
fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
180 |
return;
|
181 |
} |
182 |
|
183 |
ca = (struct chunk_attributes *) c->attributes;
|
184 |
ca->hopcount++; |
185 |
dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
|
186 |
} |
187 |
|
188 |
void chunk_attributes_update_sending(const struct chunk* c) |
189 |
{ |
190 |
struct chunk_attributes * ca;
|
191 |
|
192 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
193 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
194 |
return;
|
195 |
} |
196 |
|
197 |
ca = (struct chunk_attributes *) c->attributes;
|
198 |
ca->deadline += ca->deadline_increment; |
199 |
dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
|
200 |
} |
201 |
|
202 |
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf) |
203 |
{ |
204 |
struct chunk *chunks;
|
205 |
int num_chunks, i;
|
206 |
struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap"); |
207 |
chunks = cb_get_chunks(chbuf, &num_chunks); |
208 |
|
209 |
for(i=num_chunks-1; i>=0; i--) { |
210 |
chunkID_set_add_chunk(my_bmap, chunks[i].id); |
211 |
} |
212 |
return my_bmap;
|
213 |
} |
214 |
|
215 |
// a simple implementation that request everything that we miss ... up to max deliver
|
216 |
/*
 * Decide which chunks to accept from an incoming offer.
 *
 * fromid      - the offering peer
 * cset_off    - the set of chunk ids it offered
 * max_deliver - maximum number of chunks to accept
 * trans_id    - transaction id of the offer (not used in the decision)
 *
 * Returns a newly allocated set with the accepted ids; each accepted
 * chunk is locked so concurrent offers do not fetch it twice.  The
 * caller owns (and must free) the returned set.
 */
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
  struct chunkID_set *cset_acc, *my_bmap;
  int i, d, cset_off_size;
  //double lossrate;
  struct peer *from = nodeid_to_peer(fromid, 0);

  cset_acc = chunkID_set_init("size=0");

  //reduce load a little bit if there are losses on the path from this guy
  //lossrate = get_lossrate_receive(from->id);
  //lossrate = finite(lossrate) ? lossrate : 0; //start agressively, assuming 0 loss
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
    /* accept anything we do not already have, is not being fetched
     * elsewhere, and still fits our buffer (see _needs) */
    my_bmap = cb_to_bmap(cb);
    cset_off_size = chunkID_set_size(cset_off);
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
      int chunkid = chunkID_set_get_chunk(cset_off, i);
      //dprintf("\tdo I need c%d ? :",chunkid);
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
        chunkID_set_add_chunk(cset_acc, chunkid);
        chunk_lock(chunkid,from);
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
#ifdef MONL
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
#endif
        dprintf("\n");
        d++;
      }
    }
    chunkID_set_free(my_bmap);
  //} else {
  //  dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
  //}

  return cset_acc;
}
251 |
|
252 |
void send_bmap(struct nodeID *toid) |
253 |
{ |
254 |
struct chunkID_set *my_bmap = cb_to_bmap(cb);
|
255 |
sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0); |
256 |
chunkID_set_free(my_bmap); |
257 |
} |
258 |
|
259 |
void bcast_bmap()
|
260 |
{ |
261 |
int i, n;
|
262 |
struct peer *neighbours;
|
263 |
struct peerset *pset;
|
264 |
struct chunkID_set *my_bmap;
|
265 |
|
266 |
pset = get_peers(); |
267 |
n = peerset_size(pset); |
268 |
neighbours = peerset_get_peers(pset); |
269 |
|
270 |
my_bmap = cb_to_bmap(cb); //cache our bmap for faster processing
|
271 |
for (i = 0; i<n; i++) { |
272 |
sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0); |
273 |
} |
274 |
chunkID_set_free(my_bmap); |
275 |
} |
276 |
|
277 |
double get_average_lossrate_pset(struct peerset *pset) |
278 |
{ |
279 |
int i, n;
|
280 |
struct peer *neighbours;
|
281 |
|
282 |
n = peerset_size(pset); |
283 |
neighbours = peerset_get_peers(pset); |
284 |
{ |
285 |
struct nodeID *nodeids[n];
|
286 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id; |
287 |
#ifdef MONL
|
288 |
return get_average_lossrate(nodeids, n);
|
289 |
#else
|
290 |
return 0; |
291 |
#endif
|
292 |
} |
293 |
} |
294 |
|
295 |
/*
 * Acknowledge a received chunk by sending our buffer map back to the
 * sender.  When the average loss rate towards our peers is high, the
 * ack is probabilistically suppressed to shed some load.
 */
void ack_chunk(struct chunk *c, struct nodeID *from)
{
  //reduce load a little bit if there are losses on the path from this guy
  double loss = get_average_lossrate_pset(get_peers());

  if (!finite(loss)) {
    loss = 0; //start agressively, assuming 0 loss
  }
  if (rand() / ((double)RAND_MAX + 1) < 1 * loss) {
    return;
  }
  send_bmap(from); //send explicit ack
}
305 |
|
306 |
void received_chunk(struct nodeID *from, const uint8_t *buff, int len) |
307 |
{ |
308 |
int res;
|
309 |
static struct chunk c; |
310 |
struct peer *p;
|
311 |
static int bcast_cnt; |
312 |
uint16_t transid; |
313 |
|
314 |
res = parseChunkMsg(buff + 1, len - 1, &c, &transid); |
315 |
if (res > 0) { |
316 |
chunk_attributes_update_received(&c); |
317 |
chunk_unlock(c.id); |
318 |
dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
|
319 |
if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));} |
320 |
output_deliver(&c); |
321 |
res = cb_add_chunk(cb, &c); |
322 |
reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE); |
323 |
cb_print(); |
324 |
if (res < 0) { |
325 |
dprintf("\tchunk too old, buffer full with newer chunks\n");
|
326 |
if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());} |
327 |
free(c.data); |
328 |
free(c.attributes); |
329 |
} |
330 |
p = nodeid_to_peer(from, neigh_on_chunk_recv); |
331 |
if (p) { //now we have it almost sure |
332 |
chunkID_set_add_chunk(p->bmap,c.id); //don't send it back
|
333 |
} |
334 |
ack_chunk(&c,from); //send explicit ack
|
335 |
if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) { |
336 |
bcast_bmap(); |
337 |
} |
338 |
} else {
|
339 |
fprintf(stderr,"\tError: can't decode chunk!\n");
|
340 |
} |
341 |
} |
342 |
|
343 |
struct chunk *generated_chunk(suseconds_t *delta)
|
344 |
{ |
345 |
struct chunk *c;
|
346 |
|
347 |
c = malloc(sizeof(struct chunk)); |
348 |
if (!c) {
|
349 |
fprintf(stderr, "Memory allocation error!\n");
|
350 |
return NULL; |
351 |
} |
352 |
|
353 |
*delta = input_get(input, c); |
354 |
if (*delta < 0) { |
355 |
fprintf(stderr, "Error in input!\n");
|
356 |
exit(-1);
|
357 |
} |
358 |
if (c->data == NULL) { |
359 |
free(c); |
360 |
return NULL; |
361 |
} |
362 |
dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
|
363 |
chunk_attributes_fill(c); |
364 |
return c;
|
365 |
} |
366 |
|
367 |
int add_chunk(struct chunk *c) |
368 |
{ |
369 |
int res;
|
370 |
|
371 |
res = cb_add_chunk(cb, c); |
372 |
if (res < 0) { |
373 |
free(c->data); |
374 |
free(c->attributes); |
375 |
free(c); |
376 |
return 0; |
377 |
} |
378 |
free(c); |
379 |
return 1; |
380 |
} |
381 |
|
382 |
uint64_t get_chunk_timestamp(int cid){
|
383 |
const struct chunk *c = cb_get_chunk(cb, cid); |
384 |
if (!c) return 0; |
385 |
|
386 |
return c->timestamp;
|
387 |
} |
388 |
|
389 |
/**
|
390 |
*example function to filter chunks based on whether a given peer needs them.
|
391 |
*
|
392 |
* Looks at buffermap information received about the given peer.
|
393 |
*/
|
394 |
int needs(struct peer *n, int cid){ |
395 |
struct peer * p = n;
|
396 |
|
397 |
if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
|
398 |
uint64_t ts; |
399 |
ts = get_chunk_timestamp(cid); |
400 |
if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) { //if we don't know the timestamp, we accept |
401 |
return 0; |
402 |
} |
403 |
} |
404 |
|
405 |
//dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
|
406 |
if (! p->bmap) {
|
407 |
//dprintf("no bmap\n");
|
408 |
return 1; // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!) |
409 |
} |
410 |
return _needs(p->bmap, p->cb_size, cid);
|
411 |
} |
412 |
|
413 |
/*
 * Core predicate: given a peer's advertised buffer map (cset) and its
 * declared buffer size (cb_size), decide whether it still needs chunk
 * cid.  Returns non-zero when the chunk should be offered/sent.
 */
int _needs(struct chunkID_set *cset, int cb_size, int cid){

  if (cb_size == 0) { //if it declared it does not needs chunks
    return 0;
  }

  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
    uint64_t ts;
    ts = get_chunk_timestamp(cid);
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) { //if we don't know the timestamp, we accept
      return 0;
    }
  }

  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
    int missing, min;
    //@TODO: add some bmap_timestamp based logic

    if (chunkID_set_size(cset) == 0) {
      //dprintf("bmap empty\n");
      return 1; // if the bmap seems empty, it needs the chunk
    }
    /* Estimate how much free room the peer's buffer still has; a chunk
     * up to `missing` ids older than its earliest advertised chunk can
     * still fit and is therefore considered needed. */
    missing = cb_size - chunkID_set_size(cset);
    missing = missing < 0 ? 0 : missing;
    min = chunkID_set_get_earliest(cset);
    //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
    return (cid >= min - missing);
  }

  //dprintf("has it\n");
  return 0;
}
445 |
|
446 |
double peerWeightReceivedfrom(struct peer **n){ |
447 |
struct peer * p = *n;
|
448 |
return timerisset(&p->bmap_timestamp) ? 1 : 0.1; |
449 |
} |
450 |
|
451 |
/*
 * Scheduler weight: every peer is equally likely to be selected.
 */
double peerWeightUniform(struct peer **n){
  (void)n; /* unused */
  return 1.0;
}
454 |
|
455 |
/*
 * Scheduler weight based on measured round-trip time: lower-RTT peers
 * get higher weight.  Without MONL there is no RTT estimate and all
 * peers weigh the same.
 */
double peerWeightRtt(struct peer **n){
#ifdef MONL
  double rtt = get_rtt((*n)->id);
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
  /* +0.005 avoids division by zero; a non-finite RTT (no measurement
   * yet) falls back to weight 1 */
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
#else
  return 1;
#endif
}
464 |
|
465 |
//ordering function for ELp peer selection, chunk ID based
|
466 |
//can't be used as weight
|
467 |
double peerScoreELpID(struct nodeID **n){ |
468 |
struct chunkID_set *bmap;
|
469 |
int latest;
|
470 |
struct peer * p = nodeid_to_peer(*n, 0); |
471 |
if (!p) return 0; |
472 |
|
473 |
bmap = p->bmap; |
474 |
if (!bmap) return 0; |
475 |
latest = chunkID_set_get_latest(bmap); |
476 |
if (latest == INT_MIN) return 0; |
477 |
|
478 |
return -latest;
|
479 |
} |
480 |
|
481 |
/*
 * Chunk ordering score: higher chunk ids score higher.
 */
double chunkScoreChunkID(int *cid){
  double score = *cid;

  return score;
}
484 |
|
485 |
uint64_t get_chunk_deadline(int cid){
|
486 |
const struct chunk_attributes * ca; |
487 |
const struct chunk *c; |
488 |
|
489 |
c = cb_get_chunk(cb, cid); |
490 |
if (!c) return 0; |
491 |
|
492 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
493 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
494 |
return 0; |
495 |
} |
496 |
|
497 |
ca = (struct chunk_attributes *) c->attributes;
|
498 |
return ca->deadline;
|
499 |
} |
500 |
|
501 |
/*
 * Chunk ordering score by deadline: earlier deadlines score higher.
 */
double chunkScoreDL(int *cid){
  double deadline = (double)get_chunk_deadline(*cid);

  return -deadline;
}
504 |
|
505 |
/*
 * Chunk ordering score by timestamp: newer chunks score higher.
 */
double chunkScoreTimestamp(int *cid){
  double ts = (double)get_chunk_timestamp(*cid);

  return ts;
}
508 |
|
509 |
/*
 * Deliver the chunks a peer accepted in response to one of our offers.
 *
 * toid        - destination peer
 * cset_acc    - set of chunk ids the peer accepted
 * max_deliver - upper bound on chunks actually sent
 * trans_id    - id of the offer/accept transaction being answered
 */
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
  int i, d, cset_acc_size, res;
  struct peer *to = nodeid_to_peer(toid, 0);

  cset_acc_size = chunkID_set_size(cset_acc);
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0); //this only works if accepts are sent back even if 0 is accepted
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
    const struct chunk *c;
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
    c = cb_get_chunk(cb, chunkid);
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
      chunk_attributes_update_sending(c);
      res = sendChunk(toid, c, trans_id);
      if (res >= 0) {
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
        d++;
        reg_chunk_send(c->id);
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
      } else {
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
      }
    }
  }
}
533 |
|
534 |
int offer_peer_count()
|
535 |
{ |
536 |
return offer_per_tick;
|
537 |
} |
538 |
|
539 |
int offer_max_deliver(struct nodeID *n) |
540 |
{ |
541 |
|
542 |
if (!heuristics_distance_maxdeliver) return 1; |
543 |
|
544 |
#ifdef MONL
|
545 |
switch (get_hopcount(n)) {
|
546 |
case 0: return 5; |
547 |
case 1: return 2; |
548 |
default: return 1; |
549 |
} |
550 |
#else
|
551 |
return 1; |
552 |
#endif
|
553 |
} |
554 |
|
555 |
void send_offer()
|
556 |
{ |
557 |
struct chunk *buff;
|
558 |
int size, res, i, n;
|
559 |
struct peer *neighbours;
|
560 |
struct peerset *pset;
|
561 |
|
562 |
pset = get_peers(); |
563 |
n = peerset_size(pset); |
564 |
neighbours = peerset_get_peers(pset); |
565 |
dprintf("Send Offer: %d neighbours\n", n);
|
566 |
if (n == 0) return; |
567 |
buff = cb_get_chunks(cb, &size); |
568 |
if (size == 0) return; |
569 |
|
570 |
{ |
571 |
size_t selectedpeers_len = offer_peer_count(); |
572 |
int chunkids[size];
|
573 |
struct peer *nodeids[n];
|
574 |
struct peer *selectedpeers[selectedpeers_len];
|
575 |
|
576 |
//reduce load a little bit if there are losses on the path from this guy
|
577 |
double average_lossrate = get_average_lossrate_pset(pset);
|
578 |
average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
579 |
if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) { |
580 |
return;
|
581 |
} |
582 |
|
583 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
584 |
for (i = 0; i<n; i++) nodeids[i] = (neighbours+i); |
585 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER); |
586 |
|
587 |
for (i=0; i<selectedpeers_len ; i++){ |
588 |
int max_deliver = offer_max_deliver(selectedpeers[i]->id);
|
589 |
struct chunkID_set *my_bmap = cb_to_bmap(cb);
|
590 |
dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
|
591 |
res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++); |
592 |
chunkID_set_free(my_bmap); |
593 |
} |
594 |
} |
595 |
} |
596 |
|
597 |
|
598 |
/*
 * Push-mode sender: use the configured scheduler to pick one
 * (peer, chunk) pair, send our buffer map to that peer, then push the
 * chunk.  No transaction id is used in pure push mode.
 */
void send_chunk()
{
  struct chunk *buff;
  int size, res, i, n;
  struct peer *neighbours;
  struct peerset *pset;

  pset = get_peers();
  n = peerset_size(pset);
  neighbours = peerset_get_peers(pset);
  dprintf("Send Chunk: %d neighbours\n", n);
  if (n == 0) return;
  buff = cb_get_chunks(cb, &size);
  dprintf("\t %d chunks in buffer...\n", size);
  if (size == 0) return;

  /************ STUPID DUMB SCHEDULING ****************/
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
  /************ /STUPID DUMB SCHEDULING ****************/

  /************ USE SCHEDULER ****************/
  {
    size_t selectedpairs_len = 1;
    int chunkids[size];
    struct peer *nodeids[n];
    struct PeerChunk selectedpairs[1];

    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
    /************ /USE SCHEDULER ****************/

    for (i=0; i<selectedpairs_len ; i++){
      struct peer *p = selectedpairs[i].peer;
      // NOTE(review): c is assumed non-NULL because the scheduler only
      // selects ids taken from the buffer above -- confirm cb_get_chunk
      // cannot fail between cb_get_chunks and here
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
      dprintf("\t sending chunk[%d] to ", c->id);
      dprintf("%s\n", node_addr(p->id));

      // advertise our buffer state to the target before pushing
      send_bmap(p->id);

      chunk_attributes_update_sending(c);
      res = sendChunk(p->id, c, 0); //we do not use transactions in pure push
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
      dprintf("\tResult: %d\n", res);
      if (res>=0) {
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
        reg_chunk_send(c->id);
      } else {
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
      }
    }
  }
}