streamers / streaming.c @ 784a92e2
History | View | Annotate | Download (16.7 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Luca Abeni
|
3 |
* Copyright (c) 2010 Csaba Kiraly
|
4 |
*
|
5 |
* This is free software; see gpl-3.0.txt
|
6 |
*/
|
7 |
#include <sys/time.h> |
8 |
#include <stdlib.h> |
9 |
#include <stdio.h> |
10 |
#include <stdint.h> |
11 |
#include <stdbool.h> |
12 |
#include <math.h> |
13 |
#include <assert.h> |
14 |
#include <string.h> |
15 |
#include <inttypes.h> |
16 |
|
17 |
#include <net_helper.h> |
18 |
#include <chunk.h> |
19 |
#include <chunkbuffer.h> |
20 |
#include <trade_msg_la.h> |
21 |
#include <trade_msg_ha.h> |
22 |
#include <peerset.h> |
23 |
#include <peer.h> |
24 |
#include <chunkidset.h> |
25 |
#include <limits.h> |
26 |
#include <trade_sig_ha.h> |
27 |
|
28 |
#include "streaming.h" |
29 |
#include "output.h" |
30 |
#include "input.h" |
31 |
#include "dbg.h" |
32 |
#include "chunk_signaling.h" |
33 |
#include "chunklock.h" |
34 |
#include "topology.h" |
35 |
#include "measures.h" |
36 |
#include "scheduling.h" |
37 |
|
38 |
#include "scheduler_la.h" |
39 |
|
40 |
static bool heuristics_distance_maxdeliver = false; |
41 |
static int bcast_after_receive_every = 0; |
42 |
static bool neigh_on_chunk_recv = false; |
43 |
|
44 |
struct chunk_attributes {
|
45 |
uint64_t deadline; |
46 |
uint16_t deadline_increment; |
47 |
uint16_t hopcount; |
48 |
} __attribute__((packed)); |
49 |
|
50 |
extern bool chunk_log; |
51 |
|
52 |
struct chunk_buffer *cb;
|
53 |
static struct input_desc *input; |
54 |
static int cb_size; |
55 |
static int transid=0; |
56 |
|
57 |
static int offer_per_tick = 1; //N_p parameter of POLITO |
58 |
|
59 |
int _needs(struct chunkID_set *cset, int cb_size, int cid); |
60 |
|
61 |
uint64_t gettimeofday_in_us(void)
|
62 |
{ |
63 |
struct timeval what_time; //to store the epoch time |
64 |
|
65 |
gettimeofday(&what_time, NULL);
|
66 |
return what_time.tv_sec * 1000000ULL + what_time.tv_usec; |
67 |
} |
68 |
|
69 |
void cb_print()
|
70 |
{ |
71 |
#ifdef DEBUG
|
72 |
struct chunk *chunks;
|
73 |
int num_chunks, i, id;
|
74 |
chunks = cb_get_chunks(cb, &num_chunks); |
75 |
|
76 |
dprintf("\tchbuf :");
|
77 |
i = 0;
|
78 |
if(num_chunks) {
|
79 |
id = chunks[0].id;
|
80 |
dprintf(" %d-> ",id);
|
81 |
while (i < num_chunks) {
|
82 |
if (id == chunks[i].id) {
|
83 |
dprintf("%d",id % 10); |
84 |
i++; |
85 |
} else if (chunk_islocked(id)) { |
86 |
dprintf("*");
|
87 |
} else {
|
88 |
dprintf(".");
|
89 |
} |
90 |
id++; |
91 |
} |
92 |
} |
93 |
dprintf("\n");
|
94 |
#endif
|
95 |
} |
96 |
|
97 |
void stream_init(int size, struct nodeID *myID) |
98 |
{ |
99 |
static char conf[32]; |
100 |
|
101 |
cb_size = size; |
102 |
|
103 |
sprintf(conf, "size=%d", cb_size);
|
104 |
cb = cb_init(conf); |
105 |
chunkDeliveryInit(myID); |
106 |
chunkSignalingInit(myID); |
107 |
init_measures(); |
108 |
} |
109 |
|
110 |
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size) |
111 |
{ |
112 |
int flags = 0; |
113 |
|
114 |
if (memcmp(fname, "udp:", 4) == 0) { |
115 |
fname += 4;
|
116 |
flags = INPUT_UDP; |
117 |
} |
118 |
if (loop) {
|
119 |
flags |= INPUT_LOOP; |
120 |
} |
121 |
input = input_open(fname, flags, fds, fds_size); |
122 |
if (input == NULL) { |
123 |
return -1; |
124 |
} |
125 |
|
126 |
stream_init(1, myID);
|
127 |
return 0; |
128 |
} |
129 |
|
130 |
void chunk_attributes_fill(struct chunk* c) |
131 |
{ |
132 |
struct chunk_attributes * ca;
|
133 |
static uint32_t counter = 0; |
134 |
|
135 |
assert(!c->attributes && c->attributes_size == 0);
|
136 |
|
137 |
c->attributes_size = sizeof(struct chunk_attributes); |
138 |
c->attributes = ca = malloc(c->attributes_size); |
139 |
|
140 |
ca->deadline = c->id; |
141 |
ca->deadline_increment = ((counter++ % 3) + 1) * 2; |
142 |
ca->hopcount = 0;
|
143 |
} |
144 |
|
145 |
int chunk_get_hopcount(struct chunk* c) { |
146 |
struct chunk_attributes * ca;
|
147 |
|
148 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
149 |
fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
150 |
return -1; |
151 |
} |
152 |
|
153 |
ca = (struct chunk_attributes *) c->attributes;
|
154 |
return ca->hopcount;
|
155 |
} |
156 |
|
157 |
void chunk_attributes_update_received(struct chunk* c) |
158 |
{ |
159 |
struct chunk_attributes * ca;
|
160 |
|
161 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
162 |
fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
163 |
return;
|
164 |
} |
165 |
|
166 |
ca = (struct chunk_attributes *) c->attributes;
|
167 |
ca->hopcount++; |
168 |
dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
|
169 |
} |
170 |
|
171 |
void chunk_attributes_update_sending(const struct chunk* c) |
172 |
{ |
173 |
struct chunk_attributes * ca;
|
174 |
|
175 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
176 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
177 |
return;
|
178 |
} |
179 |
|
180 |
ca = (struct chunk_attributes *) c->attributes;
|
181 |
ca->deadline += ca->deadline_increment; |
182 |
dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
|
183 |
} |
184 |
|
185 |
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf) |
186 |
{ |
187 |
struct chunk *chunks;
|
188 |
int num_chunks, i;
|
189 |
struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap"); |
190 |
chunks = cb_get_chunks(chbuf, &num_chunks); |
191 |
|
192 |
for(i=num_chunks-1; i>=0; i--) { |
193 |
chunkID_set_add_chunk(my_bmap, chunks[i].id); |
194 |
} |
195 |
return my_bmap;
|
196 |
} |
197 |
|
198 |
// a simple implementation that request everything that we miss ... up to max deliver
|
199 |
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){ |
200 |
struct chunkID_set *cset_acc, *my_bmap;
|
201 |
int i, d, cset_off_size;
|
202 |
//double lossrate;
|
203 |
struct peer *from = nodeid_to_peer(fromid, 0); |
204 |
|
205 |
cset_acc = chunkID_set_init("size=0");
|
206 |
|
207 |
//reduce load a little bit if there are losses on the path from this guy
|
208 |
//lossrate = get_lossrate_receive(from->id);
|
209 |
//lossrate = finite(lossrate) ? lossrate : 0; //start agressively, assuming 0 loss
|
210 |
//if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
|
211 |
my_bmap = cb_to_bmap(cb); |
212 |
cset_off_size = chunkID_set_size(cset_off); |
213 |
for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) { |
214 |
int chunkid = chunkID_set_get_chunk(cset_off, i);
|
215 |
//dprintf("\tdo I need c%d ? :",chunkid);
|
216 |
if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
|
217 |
chunkID_set_add_chunk(cset_acc, chunkid); |
218 |
chunk_lock(chunkid,from); |
219 |
dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
|
220 |
#ifdef MONL
|
221 |
dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
|
222 |
#endif
|
223 |
dprintf("\n");
|
224 |
d++; |
225 |
} |
226 |
} |
227 |
chunkID_set_free(my_bmap); |
228 |
//} else {
|
229 |
// dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
|
230 |
//}
|
231 |
|
232 |
return cset_acc;
|
233 |
} |
234 |
|
235 |
void send_bmap(struct nodeID *toid) |
236 |
{ |
237 |
struct chunkID_set *my_bmap = cb_to_bmap(cb);
|
238 |
sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0); |
239 |
chunkID_set_free(my_bmap); |
240 |
} |
241 |
|
242 |
void bcast_bmap()
|
243 |
{ |
244 |
int i, n;
|
245 |
struct peer *neighbours;
|
246 |
struct peerset *pset;
|
247 |
struct chunkID_set *my_bmap;
|
248 |
|
249 |
pset = get_peers(); |
250 |
n = peerset_size(pset); |
251 |
neighbours = peerset_get_peers(pset); |
252 |
|
253 |
my_bmap = cb_to_bmap(cb); //cache our bmap for faster processing
|
254 |
for (i = 0; i<n; i++) { |
255 |
sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0); |
256 |
} |
257 |
chunkID_set_free(my_bmap); |
258 |
} |
259 |
|
260 |
double get_average_lossrate_pset(struct peerset *pset) |
261 |
{ |
262 |
int i, n;
|
263 |
struct peer *neighbours;
|
264 |
|
265 |
n = peerset_size(pset); |
266 |
neighbours = peerset_get_peers(pset); |
267 |
{ |
268 |
struct nodeID *nodeids[n];
|
269 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id; |
270 |
#ifdef MONL
|
271 |
return get_average_lossrate(nodeids, n);
|
272 |
#else
|
273 |
return 0; |
274 |
#endif
|
275 |
} |
276 |
} |
277 |
|
278 |
void ack_chunk(struct chunk *c, struct nodeID *from) |
279 |
{ |
280 |
//reduce load a little bit if there are losses on the path from this guy
|
281 |
double average_lossrate = get_average_lossrate_pset(get_peers());
|
282 |
average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
283 |
if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) { |
284 |
return;
|
285 |
} |
286 |
send_bmap(from); //send explicit ack
|
287 |
} |
288 |
|
289 |
void received_chunk(struct nodeID *from, const uint8_t *buff, int len) |
290 |
{ |
291 |
int res;
|
292 |
static struct chunk c; |
293 |
struct peer *p;
|
294 |
static int bcast_cnt; |
295 |
uint16_t transid; |
296 |
|
297 |
res = parseChunkMsg(buff + 1, len - 1, &c, &transid); |
298 |
if (res > 0) { |
299 |
chunk_attributes_update_received(&c); |
300 |
chunk_unlock(c.id); |
301 |
dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
|
302 |
if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));} |
303 |
output_deliver(&c); |
304 |
res = cb_add_chunk(cb, &c); |
305 |
reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE); |
306 |
cb_print(); |
307 |
if (res < 0) { |
308 |
dprintf("\tchunk too old, buffer full with newer chunks\n");
|
309 |
if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());} |
310 |
free(c.data); |
311 |
free(c.attributes); |
312 |
} |
313 |
p = nodeid_to_peer(from, neigh_on_chunk_recv); |
314 |
if (p) { //now we have it almost sure |
315 |
chunkID_set_add_chunk(p->bmap,c.id); //don't send it back
|
316 |
} |
317 |
ack_chunk(&c,from); //send explicit ack
|
318 |
if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) { |
319 |
bcast_bmap(); |
320 |
} |
321 |
} else {
|
322 |
fprintf(stderr,"\tError: can't decode chunk!\n");
|
323 |
} |
324 |
} |
325 |
|
326 |
struct chunk *generated_chunk(suseconds_t *delta)
|
327 |
{ |
328 |
struct chunk *c;
|
329 |
|
330 |
c = malloc(sizeof(struct chunk)); |
331 |
if (!c) {
|
332 |
fprintf(stderr, "Memory allocation error!\n");
|
333 |
return NULL; |
334 |
} |
335 |
|
336 |
*delta = input_get(input, c); |
337 |
if (*delta < 0) { |
338 |
fprintf(stderr, "Error in input!\n");
|
339 |
exit(-1);
|
340 |
} |
341 |
if (c->data == NULL) { |
342 |
free(c); |
343 |
return NULL; |
344 |
} |
345 |
dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
|
346 |
chunk_attributes_fill(c); |
347 |
return c;
|
348 |
} |
349 |
|
350 |
int add_chunk(struct chunk *c) |
351 |
{ |
352 |
int res;
|
353 |
|
354 |
res = cb_add_chunk(cb, c); |
355 |
if (res < 0) { |
356 |
free(c->data); |
357 |
free(c->attributes); |
358 |
free(c); |
359 |
return 0; |
360 |
} |
361 |
free(c); |
362 |
return 1; |
363 |
} |
364 |
|
365 |
/**
|
366 |
*example function to filter chunks based on whether a given peer needs them.
|
367 |
*
|
368 |
* Looks at buffermap information received about the given peer.
|
369 |
*/
|
370 |
int needs(struct peer *n, int cid){ |
371 |
struct peer * p = n;
|
372 |
|
373 |
//dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
|
374 |
if (! p->bmap) {
|
375 |
//dprintf("no bmap\n");
|
376 |
return 1; // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!) |
377 |
} |
378 |
return _needs(p->bmap, p->cb_size, cid);
|
379 |
} |
380 |
|
381 |
int _needs(struct chunkID_set *cset, int cb_size, int cid){ |
382 |
if (cb_size == 0) { //if it declared it does not needs chunks |
383 |
return 0; |
384 |
} |
385 |
|
386 |
if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk |
387 |
int missing, min;
|
388 |
//@TODO: add some bmap_timestamp based logic
|
389 |
|
390 |
if (chunkID_set_size(cset) == 0) { |
391 |
//dprintf("bmap empty\n");
|
392 |
return 1; // if the bmap seems empty, it needs the chunk |
393 |
} |
394 |
missing = cb_size - chunkID_set_size(cset); |
395 |
missing = missing < 0 ? 0 : missing; |
396 |
min = chunkID_set_get_earliest(cset); |
397 |
//dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
|
398 |
return (cid >= min - missing);
|
399 |
} |
400 |
|
401 |
//dprintf("has it\n");
|
402 |
return 0; |
403 |
} |
404 |
|
405 |
double peerWeightReceivedfrom(struct peer **n){ |
406 |
struct peer * p = *n;
|
407 |
return timerisset(&p->bmap_timestamp) ? 1 : 0.1; |
408 |
} |
409 |
|
410 |
double peerWeightUniform(struct peer **n){ |
411 |
return 1; |
412 |
} |
413 |
|
414 |
double peerWeightRtt(struct peer **n){ |
415 |
#ifdef MONL
|
416 |
double rtt = get_rtt(*n->id);
|
417 |
//dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
|
418 |
return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1; |
419 |
#else
|
420 |
return 1; |
421 |
#endif
|
422 |
} |
423 |
|
424 |
//ordering function for ELp peer selection, chunk ID based
|
425 |
//can't be used as weight
|
426 |
double peerScoreELpID(struct nodeID **n){ |
427 |
struct chunkID_set *bmap;
|
428 |
int latest;
|
429 |
struct peer * p = nodeid_to_peer(*n, 0); |
430 |
if (!p) return 0; |
431 |
|
432 |
bmap = p->bmap; |
433 |
if (!bmap) return 0; |
434 |
latest = chunkID_set_get_latest(bmap); |
435 |
if (latest == INT_MIN) return 0; |
436 |
|
437 |
return -latest;
|
438 |
} |
439 |
|
440 |
double chunkScoreChunkID(int *cid){ |
441 |
return (double) *cid; |
442 |
} |
443 |
|
444 |
double chunkScoreDL(int *cid){ |
445 |
struct chunk_attributes * ca;
|
446 |
struct chunk *c;
|
447 |
|
448 |
c = cb_get_chunk(cb, *cid); |
449 |
if (!c) return 0; |
450 |
|
451 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
452 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
453 |
return;
|
454 |
} |
455 |
|
456 |
ca = (struct chunk_attributes *) c->attributes;
|
457 |
return - (double)ca->deadline; |
458 |
} |
459 |
|
460 |
double getChunkTimestamp(int *cid){ |
461 |
const struct chunk *c = cb_get_chunk(cb, *cid); |
462 |
if (!c) return 0; |
463 |
|
464 |
return (double) c->timestamp; |
465 |
} |
466 |
|
467 |
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){ |
468 |
int i, d, cset_acc_size, res;
|
469 |
struct peer *to = nodeid_to_peer(toid, 0); |
470 |
|
471 |
cset_acc_size = chunkID_set_size(cset_acc); |
472 |
reg_offer_accept(cset_acc_size > 0 ? 1 : 0); //this only works if accepts are sent back even if 0 is accepted |
473 |
for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) { |
474 |
const struct chunk *c; |
475 |
int chunkid = chunkID_set_get_chunk(cset_acc, i);
|
476 |
c = cb_get_chunk(cb, chunkid); |
477 |
if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification |
478 |
chunk_attributes_update_sending(c); |
479 |
res = sendChunk(toid, c, trans_id); |
480 |
if (res >= 0) { |
481 |
if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive |
482 |
d++; |
483 |
reg_chunk_send(c->id); |
484 |
if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);} |
485 |
} else {
|
486 |
fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
487 |
} |
488 |
} |
489 |
} |
490 |
} |
491 |
|
492 |
int offer_peer_count()
|
493 |
{ |
494 |
return offer_per_tick;
|
495 |
} |
496 |
|
497 |
int offer_max_deliver(struct nodeID *n) |
498 |
{ |
499 |
|
500 |
if (!heuristics_distance_maxdeliver) return 1; |
501 |
|
502 |
#ifdef MONL
|
503 |
switch (get_hopcount(n)) {
|
504 |
case 0: return 5; |
505 |
case 1: return 2; |
506 |
default: return 1; |
507 |
} |
508 |
#else
|
509 |
return 1; |
510 |
#endif
|
511 |
} |
512 |
|
513 |
void send_offer()
|
514 |
{ |
515 |
struct chunk *buff;
|
516 |
int size, res, i, n;
|
517 |
struct peer *neighbours;
|
518 |
struct peerset *pset;
|
519 |
|
520 |
pset = get_peers(); |
521 |
n = peerset_size(pset); |
522 |
neighbours = peerset_get_peers(pset); |
523 |
dprintf("Send Offer: %d neighbours\n", n);
|
524 |
if (n == 0) return; |
525 |
buff = cb_get_chunks(cb, &size); |
526 |
if (size == 0) return; |
527 |
|
528 |
{ |
529 |
size_t selectedpeers_len = offer_peer_count(); |
530 |
int chunkids[size];
|
531 |
struct peer *nodeids[n];
|
532 |
struct peer *selectedpeers[selectedpeers_len];
|
533 |
|
534 |
//reduce load a little bit if there are losses on the path from this guy
|
535 |
double average_lossrate = get_average_lossrate_pset(pset);
|
536 |
average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
537 |
if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) { |
538 |
return;
|
539 |
} |
540 |
|
541 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
542 |
for (i = 0; i<n; i++) nodeids[i] = (neighbours+i); |
543 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER); |
544 |
|
545 |
for (i=0; i<selectedpeers_len ; i++){ |
546 |
int max_deliver = offer_max_deliver(selectedpeers[i]->id);
|
547 |
struct chunkID_set *my_bmap = cb_to_bmap(cb);
|
548 |
dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
|
549 |
res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++); |
550 |
chunkID_set_free(my_bmap); |
551 |
} |
552 |
} |
553 |
} |
554 |
|
555 |
|
556 |
void send_chunk()
|
557 |
{ |
558 |
struct chunk *buff;
|
559 |
int size, res, i, n;
|
560 |
struct peer *neighbours;
|
561 |
struct peerset *pset;
|
562 |
|
563 |
pset = get_peers(); |
564 |
n = peerset_size(pset); |
565 |
neighbours = peerset_get_peers(pset); |
566 |
dprintf("Send Chunk: %d neighbours\n", n);
|
567 |
if (n == 0) return; |
568 |
buff = cb_get_chunks(cb, &size); |
569 |
dprintf("\t %d chunks in buffer...\n", size);
|
570 |
if (size == 0) return; |
571 |
|
572 |
/************ STUPID DUMB SCHEDULING ****************/
|
573 |
//target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
|
574 |
//c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
|
575 |
/************ /STUPID DUMB SCHEDULING ****************/
|
576 |
|
577 |
/************ USE SCHEDULER ****************/
|
578 |
{ |
579 |
size_t selectedpairs_len = 1;
|
580 |
int chunkids[size];
|
581 |
struct peer *nodeids[n];
|
582 |
struct PeerChunk selectedpairs[1]; |
583 |
|
584 |
for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id; |
585 |
for (i = 0; i<n; i++) nodeids[i] = (neighbours+i); |
586 |
SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK); |
587 |
/************ /USE SCHEDULER ****************/
|
588 |
|
589 |
for (i=0; i<selectedpairs_len ; i++){ |
590 |
struct peer *p = selectedpairs[i].peer;
|
591 |
struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
|
592 |
dprintf("\t sending chunk[%d] to ", c->id);
|
593 |
dprintf("%s\n", node_addr(p->id));
|
594 |
|
595 |
send_bmap(p->id); |
596 |
|
597 |
chunk_attributes_update_sending(c); |
598 |
res = sendChunk(p->id, c, 0); //we do not use transactions in pure push |
599 |
if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);} |
600 |
dprintf("\tResult: %d\n", res);
|
601 |
if (res>=0) { |
602 |
chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
|
603 |
reg_chunk_send(c->id); |
604 |
} else {
|
605 |
fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
606 |
} |
607 |
} |
608 |
} |
609 |
} |