pstreamer / src / streaming.c @ fe735c05
History | View | Annotate | Download (29.8 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010-2011 Luca Abeni
|
3 |
* Copyright (c) 2010-2011 Csaba Kiraly
|
4 |
* Copyright (c) 2017 Luca Baldesi
|
5 |
*
|
6 |
* This file is part of PeerStreamer.
|
7 |
*
|
8 |
* PeerStreamer is free software: you can redistribute it and/or
|
9 |
* modify it under the terms of the GNU Affero General Public License as
|
10 |
* published by the Free Software Foundation, either version 3 of the
|
11 |
* License, or (at your option) any later version.
|
12 |
*
|
13 |
* PeerStreamer is distributed in the hope that it will be useful,
|
14 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
15 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
|
16 |
* General Public License for more details.
|
17 |
*
|
18 |
* You should have received a copy of the GNU Affero General Public License
|
19 |
* along with PeerStreamer. If not, see <http://www.gnu.org/licenses/>.
|
20 |
*
|
21 |
*/
|
22 |
#include <sys/time.h> |
23 |
#include <stdlib.h> |
24 |
#include <stdio.h> |
25 |
#include <stdint.h> |
26 |
#include <stdbool.h> |
27 |
#include <math.h> |
28 |
#include <assert.h> |
29 |
#include <string.h> |
30 |
#include <inttypes.h> |
31 |
|
32 |
#include <net_helper.h> |
33 |
#include <chunk.h> |
34 |
#include <chunkbuffer.h> |
35 |
#include <trade_msg_la.h> |
36 |
#include <trade_msg_ha.h> |
37 |
#include <peerset.h> |
38 |
#include <peer.h> |
39 |
#include <chunkidset.h> |
40 |
#include <limits.h> |
41 |
#include <trade_sig_ha.h> |
42 |
#ifdef CHUNK_ATTRIB_CHUNKER
|
43 |
#include <chunkiser_attrib.h> |
44 |
#endif
|
45 |
|
46 |
#include "streaming.h" |
47 |
#include "output.h" |
48 |
#include "input.h" |
49 |
#include "dbg.h" |
50 |
#include "chunk_signaling.h" |
51 |
#include "chunklock.h" |
52 |
#include "topology.h" |
53 |
#include "measures.h" |
54 |
#include "net_helpers.h" |
55 |
#include "scheduling.h" |
56 |
#include "transaction.h" |
57 |
|
58 |
#include "scheduler_la.h" |
59 |
|
60 |
# define CB_SIZE_TIME_UNLIMITED 1e12 |
61 |
|
62 |
void selectPeersForChunks(SchedOrdering ordering, schedPeerID *peers, size_t peers_len, schedChunkID *chunks, size_t chunks_len, schedPeerID *selected, size_t *selected_len, filterFunction filter, peerEvaluateFunction evaluate);
|
63 |
|
64 |
struct chunk_attributes {
|
65 |
uint64_t deadline; |
66 |
uint16_t deadline_increment; |
67 |
uint16_t hopcount; |
68 |
} __attribute__((packed)); |
69 |
|
70 |
struct streaming_context {
|
71 |
struct chunk_buffer *cb;
|
72 |
struct input_desc *input;
|
73 |
struct chunk_locks * ch_locks;
|
74 |
struct service_times_element * transactions;
|
75 |
const struct psinstance * ps; |
76 |
int cb_size;
|
77 |
int bcast_after_receive_every;
|
78 |
bool neigh_on_chunk_recv;
|
79 |
bool send_bmap_before_push;
|
80 |
uint64_t CB_SIZE_TIME; |
81 |
uint32_t chunk_loss_interval; |
82 |
}; |
83 |
|
84 |
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config) |
85 |
{ |
86 |
struct streaming_context * stc;
|
87 |
static char conf[80]; |
88 |
|
89 |
stc = malloc(sizeof(struct streaming_context)); |
90 |
stc->bcast_after_receive_every = 0;
|
91 |
stc->neigh_on_chunk_recv = false;
|
92 |
stc->send_bmap_before_push = false;
|
93 |
stc->transactions = NULL;
|
94 |
stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED; //in millisec, defaults to unlimited
|
95 |
stc->chunk_loss_interval = 0; // disable self-lossy feature (for experiments) |
96 |
|
97 |
stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
|
98 |
|
99 |
stc->ps = ps; |
100 |
stc->cb_size = psinstance_chunkbuffer_size(ps); |
101 |
sprintf(conf, "size=%d", stc->cb_size);
|
102 |
stc->cb = cb_init(conf); |
103 |
chunkDeliveryInit(psinstance_nodeid(ps)); |
104 |
chunkSignalingInit(psinstance_nodeid(ps)); |
105 |
stc->ch_locks = chunk_locks_create(); |
106 |
return stc;
|
107 |
} |
108 |
|
109 |
void streaming_destroy(struct streaming_context ** stc) |
110 |
{ |
111 |
if (stc && *stc)
|
112 |
{ |
113 |
if((*stc)->input)
|
114 |
input_close((*stc)->input); |
115 |
if(((*stc)->ch_locks))
|
116 |
chunk_locks_destroy(&((*stc)->ch_locks)); |
117 |
if(((*stc)->transactions))
|
118 |
transaction_destroy(&((*stc)->transactions)); |
119 |
if(((*stc)->cb))
|
120 |
cb_destroy((*stc)->cb); |
121 |
free((*stc)); |
122 |
} |
123 |
} |
124 |
|
125 |
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid); |
126 |
|
127 |
uint64_t gettimeofday_in_us(void)
|
128 |
{ |
129 |
struct timeval what_time; //to store the epoch time |
130 |
|
131 |
gettimeofday(&what_time, NULL);
|
132 |
return what_time.tv_sec * 1000000ULL + what_time.tv_usec; |
133 |
} |
134 |
|
135 |
void cb_print(const struct streaming_context * stc) |
136 |
{ |
137 |
#ifdef DEBUG
|
138 |
struct chunk *chunks;
|
139 |
int num_chunks, i, id;
|
140 |
chunks = cb_get_chunks(stc->cb, &num_chunks); |
141 |
|
142 |
dprintf("\tchbuf :");
|
143 |
i = 0;
|
144 |
if(num_chunks) {
|
145 |
id = chunks[0].id;
|
146 |
dprintf(" %d-> ",id);
|
147 |
while (i < num_chunks) {
|
148 |
if (id == chunks[i].id) {
|
149 |
dprintf("%d",id % 10); |
150 |
i++; |
151 |
} else if (chunk_islocked(stc->ch_locks, id)) { |
152 |
dprintf("*");
|
153 |
} else {
|
154 |
dprintf(".");
|
155 |
} |
156 |
id++; |
157 |
} |
158 |
} |
159 |
dprintf("\n");
|
160 |
#endif
|
161 |
} |
162 |
|
163 |
void chunk_attributes_fill(struct chunk* c) |
164 |
{ |
165 |
struct chunk_attributes * ca;
|
166 |
int priority = 1; |
167 |
|
168 |
assert((!c->attributes && c->attributes_size == 0)
|
169 |
#ifdef CHUNK_ATTRIB_CHUNKER
|
170 |
|| chunk_attributes_chunker_verify(c->attributes, c->attributes_size) |
171 |
#endif
|
172 |
); |
173 |
|
174 |
#ifdef CHUNK_ATTRIB_CHUNKER
|
175 |
if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
|
176 |
priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
|
177 |
free(c->attributes); |
178 |
c->attributes = NULL;
|
179 |
c->attributes_size = 0;
|
180 |
} |
181 |
#endif
|
182 |
|
183 |
c->attributes_size = sizeof(struct chunk_attributes); |
184 |
c->attributes = ca = malloc(c->attributes_size); |
185 |
|
186 |
ca->deadline = c->id; |
187 |
ca->deadline_increment = priority * 2;
|
188 |
ca->hopcount = 0;
|
189 |
} |
190 |
|
191 |
int chunk_get_hopcount(const struct chunk* c) { |
192 |
struct chunk_attributes * ca;
|
193 |
|
194 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
195 |
fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
196 |
return -1; |
197 |
} |
198 |
|
199 |
ca = (struct chunk_attributes *) c->attributes;
|
200 |
return ca->hopcount;
|
201 |
} |
202 |
|
203 |
void chunk_attributes_update_received(struct chunk* c) |
204 |
{ |
205 |
struct chunk_attributes * ca;
|
206 |
|
207 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
208 |
fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
209 |
return;
|
210 |
} |
211 |
|
212 |
ca = (struct chunk_attributes *) c->attributes;
|
213 |
ca->hopcount++; |
214 |
dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
|
215 |
} |
216 |
|
217 |
void chunk_attributes_update_sending(const struct chunk* c) |
218 |
{ |
219 |
struct chunk_attributes * ca;
|
220 |
|
221 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
222 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
223 |
return;
|
224 |
} |
225 |
|
226 |
ca = (struct chunk_attributes *) c->attributes;
|
227 |
ca->deadline += ca->deadline_increment; |
228 |
dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
|
229 |
} |
230 |
|
231 |
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf) |
232 |
{ |
233 |
struct chunk *chunks;
|
234 |
int num_chunks, i;
|
235 |
struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap"); |
236 |
chunks = cb_get_chunks(chbuf, &num_chunks); |
237 |
|
238 |
for(i=num_chunks-1; i>=0; i--) { |
239 |
chunkID_set_add_chunk(my_bmap, chunks[i].id); |
240 |
} |
241 |
return my_bmap;
|
242 |
} |
243 |
|
244 |
// a simple implementation that request everything that we miss ... up to max deliver
|
245 |
struct chunkID_set *get_chunks_to_accept(const struct streaming_context * stc, struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){ |
246 |
struct chunkID_set *cset_acc, *my_bmap;
|
247 |
int i, d, cset_off_size;
|
248 |
//double lossrate;
|
249 |
struct peer *from = nodeid_to_peer(psinstance_topology(stc->ps), fromid, 0); |
250 |
|
251 |
cset_acc = chunkID_set_init("size=0");
|
252 |
|
253 |
//reduce load a little bit if there are losses on the path from this guy
|
254 |
//lossrate = get_lossrate_receive(from->id);
|
255 |
//lossrate = finite(lossrate) ? lossrate : 0; //start agressively, assuming 0 loss
|
256 |
//if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
|
257 |
my_bmap = cb_to_bmap(stc->cb); |
258 |
cset_off_size = chunkID_set_size(cset_off); |
259 |
for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) { |
260 |
int chunkid = chunkID_set_get_chunk(cset_off, i);
|
261 |
//dprintf("\tdo I need c%d ? :",chunkid);
|
262 |
if (!chunk_islocked(stc->ch_locks, chunkid) && _needs(stc, my_bmap, stc->cb_size, chunkid)) {
|
263 |
chunkID_set_add_chunk(cset_acc, chunkid); |
264 |
chunk_lock(stc->ch_locks, chunkid,from); |
265 |
dtprintf("accepting %d from %s", chunkid, nodeid_static_str(fromid));
|
266 |
#ifdef MONL
|
267 |
dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
|
268 |
#endif
|
269 |
dprintf("\n");
|
270 |
d++; |
271 |
} |
272 |
} |
273 |
chunkID_set_free(my_bmap); |
274 |
//} else {
|
275 |
// dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr_tr(fromid), lossrate, get_rtt(fromid));
|
276 |
//}
|
277 |
|
278 |
reg_offer_accept_in(psinstance_measures(stc->ps), chunkID_set_size(cset_acc) > 0 ? 1 : 0); |
279 |
|
280 |
return cset_acc;
|
281 |
} |
282 |
|
283 |
void send_bmap(const struct streaming_context *stc, const struct nodeID *toid) |
284 |
{ |
285 |
struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
|
286 |
sendBufferMap(psinstance_nodeid(stc->ps), toid, NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0); |
287 |
#ifdef LOG_SIGNAL
|
288 |
log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT"); |
289 |
#endif
|
290 |
chunkID_set_free(my_bmap); |
291 |
} |
292 |
|
293 |
void bcast_bmap(const struct streaming_context * stc) |
294 |
{ |
295 |
int i, n;
|
296 |
struct peer **neighbours;
|
297 |
struct peerset *pset;
|
298 |
struct chunkID_set *my_bmap;
|
299 |
|
300 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
301 |
n = peerset_size(pset); |
302 |
neighbours = peerset_get_peers(pset); |
303 |
|
304 |
my_bmap = cb_to_bmap(stc->cb); //cache our bmap for faster processing
|
305 |
for (i = 0; i<n; i++) { |
306 |
sendBufferMap(psinstance_nodeid(stc->ps),neighbours[i]->id,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0); |
307 |
#ifdef LOG_SIGNAL
|
308 |
log_signal(psinstance_nodeid(stc->ps),neighbours[i]->id,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT"); |
309 |
#endif
|
310 |
} |
311 |
chunkID_set_free(my_bmap); |
312 |
} |
313 |
|
314 |
void send_ack(const struct streaming_context * stc, struct nodeID *toid, uint16_t trans_id) |
315 |
{ |
316 |
struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
|
317 |
sendAck(psinstance_nodeid(stc->ps),toid, my_bmap,trans_id); |
318 |
#ifdef LOG_SIGNAL
|
319 |
log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),trans_id,sig_ack,"SENT");
|
320 |
#endif
|
321 |
chunkID_set_free(my_bmap); |
322 |
} |
323 |
|
324 |
double get_average_lossrate_pset(struct peerset *pset) |
325 |
{ |
326 |
return 0; |
327 |
} |
328 |
|
329 |
void ack_chunk(const struct streaming_context * stc, struct chunk *c, struct nodeID *from, uint16_t trans_id) |
330 |
{ |
331 |
//reduce load a little bit if there are losses on the path from this guy
|
332 |
double average_lossrate = get_average_lossrate_pset(topology_get_neighbours(psinstance_topology(stc->ps)));
|
333 |
average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
334 |
if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) { |
335 |
return;
|
336 |
} |
337 |
send_ack(stc, from, trans_id); //send explicit ack
|
338 |
} |
339 |
|
340 |
void received_chunk(struct streaming_context * stc, struct nodeID *from, const uint8_t *buff, int len) |
341 |
{ |
342 |
int res;
|
343 |
static struct chunk c; |
344 |
struct peer *p;
|
345 |
static int bcast_cnt; |
346 |
uint16_t transid; |
347 |
|
348 |
res = parseChunkMsg(buff + 1, len - 1, &c, &transid); |
349 |
if (res > 0) { |
350 |
if (stc->chunk_loss_interval && c.id % stc->chunk_loss_interval == 0) { |
351 |
fprintf(stderr,"[NOISE] Chunk %d discarded >:)\n",c.id);
|
352 |
free(c.data); |
353 |
free(c.attributes); |
354 |
return;
|
355 |
} |
356 |
chunk_attributes_update_received(&c); |
357 |
chunk_unlock(stc->ch_locks, c.id); |
358 |
dprintf("Received chunk %d from peer: %s\n", c.id, nodeid_static_str(from));
|
359 |
#ifdef LOG_CHUNK
|
360 |
log_chunk(from,psinstance_nodeid(stc->ps),&c,"RECEIVED");
|
361 |
#endif
|
362 |
//{fprintf(stderr, "TEO: Peer %s received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", node_addr_tr(get_my_addr()),c.id, node_addr_tr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
|
363 |
output_deliver(psinstance_output(stc->ps), &c); |
364 |
res = cb_add_chunk(stc->cb, &c); |
365 |
reg_chunk_receive(psinstance_measures(stc->ps),c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE); |
366 |
cb_print(stc); |
367 |
if (res < 0) { |
368 |
dprintf("\tchunk too old, buffer full with newer chunks\n");
|
369 |
#ifdef LOG_CHUNK
|
370 |
log_chunk_error(from,psinstance_nodeid(stc->ps),&c,res); //{fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr_tr(from), gettimeofday_in_us());}
|
371 |
#endif
|
372 |
free(c.data); |
373 |
free(c.attributes); |
374 |
} |
375 |
p = nodeid_to_peer(psinstance_topology(stc->ps), from, stc->neigh_on_chunk_recv); |
376 |
if (p) { //now we have it almost sure |
377 |
chunkID_set_add_chunk(p->bmap,c.id); //don't send it back
|
378 |
gettimeofday(&p->bmap_timestamp, NULL);
|
379 |
} |
380 |
ack_chunk(stc, &c, from, transid); //send explicit ack
|
381 |
if (stc->bcast_after_receive_every && bcast_cnt % stc->bcast_after_receive_every == 0) { |
382 |
bcast_bmap(stc); |
383 |
} |
384 |
} else {
|
385 |
fprintf(stderr,"\tError: can't decode chunk!\n");
|
386 |
} |
387 |
} |
388 |
|
389 |
struct chunk *generated_chunk(struct streaming_context * stc, suseconds_t *delta) |
390 |
{ |
391 |
struct chunk *c;
|
392 |
|
393 |
c = malloc(sizeof(struct chunk)); |
394 |
if (!c) {
|
395 |
fprintf(stderr, "Memory allocation error!\n");
|
396 |
return NULL; |
397 |
} |
398 |
memset(c, 0, sizeof(struct chunk)); |
399 |
|
400 |
*delta = (suseconds_t)input_get(stc->input, c); |
401 |
if (*delta < 0) { |
402 |
fprintf(stderr, "Error in input!\n");
|
403 |
exit(-1);
|
404 |
} |
405 |
if (c->data == NULL) { |
406 |
free(c); |
407 |
return NULL; |
408 |
} |
409 |
dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
|
410 |
chunk_attributes_fill(c); |
411 |
return c;
|
412 |
} |
413 |
|
414 |
int add_chunk(struct streaming_context * stc, struct chunk *c) |
415 |
{ |
416 |
int res;
|
417 |
|
418 |
if (stc && c && stc->cb)
|
419 |
{ |
420 |
res = cb_add_chunk(stc->cb, c); |
421 |
if (res < 0) { |
422 |
free(c->data); |
423 |
free(c->attributes); |
424 |
free(c); |
425 |
return 0; |
426 |
} |
427 |
// free(c);
|
428 |
return 1; |
429 |
} |
430 |
return 0; |
431 |
} |
432 |
|
433 |
uint64_t get_chunk_timestamp(const struct streaming_context * stc, int cid){ |
434 |
const struct chunk *c = cb_get_chunk(stc->cb, cid); |
435 |
if (!c) return 0; |
436 |
|
437 |
return c->timestamp;
|
438 |
} |
439 |
|
440 |
void print_chunkID_set(struct chunkID_set *cset) |
441 |
{ |
442 |
uint32_t * ptr = (uint32_t *) cset; |
443 |
uint32_t n_elements,i; |
444 |
int * data = (int *) &(ptr[3]); |
445 |
fprintf(stderr,"[DEBUG] Chunk ID set type: %d\n",ptr[0]); |
446 |
fprintf(stderr,"[DEBUG] Chunk ID set size: %d\n",ptr[1]); |
447 |
n_elements = ptr[2];
|
448 |
fprintf(stderr,"[DEBUG] Chunk ID n_elements: %d\n",n_elements);
|
449 |
fprintf(stderr,"[DEBUG] Chunk ID elements: [");
|
450 |
for (i=0;i<n_elements;i++) |
451 |
fprintf(stderr,".%d.",data[i]);
|
452 |
|
453 |
fprintf(stderr,"]\n");
|
454 |
} |
455 |
|
456 |
/**
|
457 |
*example function to filter chunks based on whether a given peer needs them.
|
458 |
*
|
459 |
* Looks at buffermap information received about the given peer.
|
460 |
*/
|
461 |
int needs_old(const struct streaming_context * stc, struct peer *n, int cid){ |
462 |
struct peer * p = n;
|
463 |
|
464 |
if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
|
465 |
uint64_t ts; |
466 |
ts = get_chunk_timestamp(stc, cid); |
467 |
if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) { //if we don't know the timestamp, we accept |
468 |
return 0; |
469 |
} |
470 |
} |
471 |
|
472 |
//dprintf("\t%s needs c%d ? :",node_addr_tr(p->id),c->id);
|
473 |
if (! p->bmap) { // this will never happen since the pset module initializes bmap |
474 |
//dprintf("no bmap\n");
|
475 |
return 1; // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!) |
476 |
} |
477 |
|
478 |
// fprintf(stderr,"[DEBUG] Evaluating Peer %s, CB_SIZE: %d\n",node_addr_tr(n->id),p->cb_size); // DEBUG
|
479 |
// print_chunkID_set(p->bmap); // DEBUG
|
480 |
|
481 |
return _needs(stc, p->bmap, p->cb_size, cid);
|
482 |
} |
483 |
|
484 |
/**
|
485 |
* Function checking if chunkID_set cset may need chunk with id cid
|
486 |
* @cset: target cset
|
487 |
* @cb_size: maximum allowed numer of chunks. In the case of offer it indicates
|
488 |
* the maximum capacity in of the receiving peer (so it's 0 for the source)
|
489 |
* @cid: target chunk identifier
|
490 |
*/
|
491 |
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid){ |
492 |
|
493 |
if (cb_size == 0) { //if it declared it does not needs chunks |
494 |
return 0; |
495 |
} |
496 |
|
497 |
if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
|
498 |
uint64_t ts; |
499 |
ts = get_chunk_timestamp(stc, cid); |
500 |
if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) { //if we don't know the timestamp, we accept |
501 |
return 0; |
502 |
} |
503 |
} |
504 |
|
505 |
if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk |
506 |
int missing, min;
|
507 |
//@TODO: add some bmap_timestamp based logic
|
508 |
|
509 |
if (chunkID_set_size(cset) == 0) { |
510 |
//dprintf("bmap empty\n");
|
511 |
return 1; // if the bmap seems empty, it needs the chunk |
512 |
} |
513 |
missing = stc->cb_size - chunkID_set_size(cset); |
514 |
missing = missing < 0 ? 0 : missing; |
515 |
min = chunkID_set_get_earliest(cset); |
516 |
//dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
|
517 |
return (cid >= min - missing);
|
518 |
} |
519 |
|
520 |
//dprintf("has it\n");
|
521 |
return 0; |
522 |
} |
523 |
|
524 |
int needs(struct peer *p, int cid) |
525 |
{ |
526 |
int min;
|
527 |
|
528 |
if (p->cb_size == 0) // it does not have capacity |
529 |
return 0; |
530 |
if (chunkID_set_check(p->bmap,cid) < 0) // not in bmap |
531 |
{ |
532 |
if(p->cb_size > chunkID_set_size(p->bmap)) // it has room for chunks anyway |
533 |
{ |
534 |
min = chunkID_set_get_earliest(p->bmap) - p->cb_size + chunkID_set_size(p->bmap); |
535 |
min = min < 0 ? 0 : min; |
536 |
if (cid >= min)
|
537 |
return 1; |
538 |
} |
539 |
if((int)chunkID_set_get_earliest(p->bmap) < cid) // our is reasonably new |
540 |
return 1; |
541 |
} |
542 |
return 0; |
543 |
} |
544 |
|
545 |
double peerWeightReceivedfrom(struct peer **n){ |
546 |
struct peer * p = *n;
|
547 |
return timerisset(&p->bmap_timestamp) ? 1 : 0.1; |
548 |
} |
549 |
|
550 |
double peerWeightUniform(struct peer **n){ |
551 |
return 1; |
552 |
} |
553 |
|
554 |
double peerWeightLoss(struct peer **n){ |
555 |
return 1; |
556 |
} |
557 |
|
558 |
//double peerWeightRtt(struct peer **n){
|
559 |
//#ifdef MONL
|
560 |
// double rtt = get_rtt((*n)->id);
|
561 |
// //dprintf("RTT to %s: %f\n", node_addr_tr(p->id), rtt);
|
562 |
// return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
|
563 |
//#else
|
564 |
// return 1;
|
565 |
//#endif
|
566 |
//}
|
567 |
|
568 |
//ordering function for ELp peer selection, chunk ID based
|
569 |
//can't be used as weight
|
570 |
//double peerScoreELpID(struct nodeID **n){
|
571 |
// struct chunkID_set *bmap;
|
572 |
// int latest;
|
573 |
// struct peer * p = nodeid_to_peer(*n, 0);
|
574 |
// if (!p) return 0;
|
575 |
//
|
576 |
// bmap = p->bmap;
|
577 |
// if (!bmap) return 0;
|
578 |
// latest = chunkID_set_get_latest(bmap);
|
579 |
// if (latest == INT_MIN) return 0;
|
580 |
//
|
581 |
// return -latest;
|
582 |
//}
|
583 |
|
584 |
double chunkScoreChunkID(int *cid){ |
585 |
return (double) *cid; |
586 |
} |
587 |
|
588 |
uint64_t get_chunk_deadline(const struct streaming_context * stc, int cid){ |
589 |
const struct chunk_attributes * ca; |
590 |
const struct chunk *c; |
591 |
|
592 |
c = cb_get_chunk(stc->cb, cid); |
593 |
if (!c) return 0; |
594 |
|
595 |
if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
596 |
fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
597 |
return 0; |
598 |
} |
599 |
|
600 |
ca = (struct chunk_attributes *) c->attributes;
|
601 |
return ca->deadline;
|
602 |
} |
603 |
|
604 |
//double chunkScoreDL(int *cid){
|
605 |
// return - (double)get_chunk_deadline(*cid);
|
606 |
//}
|
607 |
|
608 |
//double chunkScoreTimestamp(int *cid){
|
609 |
// return (double) get_chunk_timestamp(*cid);
|
610 |
//}
|
611 |
|
612 |
void send_accepted_chunks(const struct streaming_context * stc, struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){ |
613 |
int i, d, cset_acc_size, res;
|
614 |
struct peer *to = nodeid_to_peer(psinstance_topology(stc->ps), toid, 0); |
615 |
|
616 |
transaction_reg_accept(stc->transactions, trans_id, toid); |
617 |
|
618 |
cset_acc_size = chunkID_set_size(cset_acc); |
619 |
reg_offer_accept_out(psinstance_measures(stc->ps),cset_acc_size > 0 ? 1 : 0); //this only works if accepts are sent back even if 0 is accepted |
620 |
for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) { |
621 |
const struct chunk *c; |
622 |
int chunkid = chunkID_set_get_chunk(cset_acc, i);
|
623 |
c = cb_get_chunk(stc->cb, chunkid); |
624 |
if (!c) { // we should have the chunk |
625 |
dprintf("%s asked for chunk %d we do not own anymore\n", nodeid_static_str(toid), chunkid);
|
626 |
continue;
|
627 |
} |
628 |
if (!to || needs(to, chunkid)) { //he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification |
629 |
chunk_attributes_update_sending(c); |
630 |
res = sendChunk(psinstance_nodeid(stc->ps),toid, c, trans_id); |
631 |
if (res >= 0) { |
632 |
if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive |
633 |
d++; |
634 |
reg_chunk_send(psinstance_measures(stc->ps),c->id); |
635 |
#ifdef LOG_CHUNK
|
636 |
log_chunk(psinstance_nodeid(stc->ps), toid,c, "SENT_ACCEPTED");
|
637 |
#endif
|
638 |
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(toid), gettimeofday_in_us(), res, c->size);}
|
639 |
} else {
|
640 |
fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
641 |
} |
642 |
} |
643 |
} |
644 |
} |
645 |
|
646 |
int offer_peer_count(const struct streaming_context * stc) |
647 |
{ |
648 |
return psinstance_num_offers(stc->ps);
|
649 |
} |
650 |
|
651 |
int offer_max_deliver(const struct streaming_context * stc, const struct nodeID *n) |
652 |
{ |
653 |
return psinstance_chunks_per_offer(stc->ps);
|
654 |
} |
655 |
|
656 |
////get the rtt. Currenly only MONL version is supported
|
657 |
//static double get_rtt_of(struct nodeID* n){
|
658 |
//#ifdef MONL
|
659 |
// return get_rtt(n);
|
660 |
//#else
|
661 |
// return NAN;
|
662 |
//#endif
|
663 |
//}
|
664 |
|
665 |
#define DEFAULT_RTT_ESTIMATE 0.5 |
666 |
|
667 |
struct chunkID_set *compose_offer_cset(const struct streaming_context * stc, const struct peer *p) |
668 |
{ |
669 |
int num_chunks, j;
|
670 |
uint64_t smallest_ts; //, largest_ts;
|
671 |
double dt;
|
672 |
struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap"); |
673 |
struct chunk *chunks = cb_get_chunks(stc->cb, &num_chunks);
|
674 |
|
675 |
dt = DEFAULT_RTT_ESTIMATE; |
676 |
dt *= 1e6; //convert to usec |
677 |
|
678 |
smallest_ts = chunks[0].timestamp;
|
679 |
// largest_ts = chunks[num_chunks-1].timestamp;
|
680 |
|
681 |
//add chunks in latest...earliest order
|
682 |
if (psinstance_is_source(stc->ps)) {
|
683 |
j = (num_chunks-1) * 3/4; //do not send offers for the latest chunks from the source |
684 |
} else {
|
685 |
j = num_chunks-1;
|
686 |
} |
687 |
for(; j>=0; j--) { |
688 |
if (chunks[j].timestamp > smallest_ts + dt)
|
689 |
chunkID_set_add_chunk(my_bmap, chunks[j].id); |
690 |
} |
691 |
|
692 |
return my_bmap;
|
693 |
} |
694 |
|
695 |
|
696 |
void send_offer(struct streaming_context * stc) |
697 |
{ |
698 |
struct chunk *buff;
|
699 |
size_t size=0, i, n;
|
700 |
struct peer **neighbours;
|
701 |
struct peerset *pset;
|
702 |
|
703 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
704 |
n = peerset_size(pset); |
705 |
neighbours = peerset_get_peers(pset); |
706 |
// dprintf("Send Offer: %lu neighbours\n", n);
|
707 |
if (n == 0) return; |
708 |
buff = cb_get_chunks(stc->cb, (int *)&size);
|
709 |
if (size == 0) return; |
710 |
|
711 |
size_t selectedpeers_len = offer_peer_count(stc); |
712 |
int chunkids[size];
|
713 |
struct peer *nodeids[n];
|
714 |
struct peer *selectedpeers[selectedpeers_len];
|
715 |
|
716 |
//reduce load a little bit if there are losses on the path from this guy
|
717 |
double average_lossrate = get_average_lossrate_pset(pset);
|
718 |
average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
719 |
if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) { |
720 |
return;
|
721 |
} |
722 |
|
723 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
724 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i]; |
725 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER); |
726 |
|
727 |
for (i=0; i<selectedpeers_len ; i++){ |
728 |
int transid = transaction_create(&(stc->transactions), selectedpeers[i]->id);
|
729 |
int max_deliver = offer_max_deliver(stc, selectedpeers[i]->id);
|
730 |
struct chunkID_set *offer_cset = compose_offer_cset(stc, selectedpeers[i]);
|
731 |
dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), selectedpeers[i]->cb_size);
|
732 |
offerChunks(psinstance_nodeid(stc->ps),selectedpeers[i]->id, offer_cset, max_deliver, transid++); |
733 |
#ifdef LOG_SIGNAL
|
734 |
log_signal(psinstance_nodeid(stc->ps),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
|
735 |
#endif
|
736 |
chunkID_set_free(offer_cset); |
737 |
} |
738 |
} |
739 |
|
740 |
void log_chunk_error(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,int error) |
741 |
{ |
742 |
switch (error) {
|
743 |
case E_CB_OLD:
|
744 |
log_chunk(from,to,c,"TOO_OLD");
|
745 |
break;
|
746 |
case E_CB_DUPLICATE:
|
747 |
log_chunk(from,to,c,"DUPLICATED");
|
748 |
break;
|
749 |
default:
|
750 |
log_chunk(from,to,c,"ERROR");
|
751 |
} |
752 |
} |
753 |
|
754 |
void log_neighbourhood(const struct streaming_context * stc) |
755 |
{ |
756 |
struct peerset * pset;
|
757 |
const struct peer * p; |
758 |
int psetsize,i;
|
759 |
uint64_t now; |
760 |
char me[NODE_STR_LENGTH];
|
761 |
|
762 |
node_addr(psinstance_nodeid(stc->ps), me, NODE_STR_LENGTH); |
763 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
764 |
psetsize = peerset_size(pset); |
765 |
now = gettimeofday_in_us(); |
766 |
peerset_for_each(pset,p,i) |
767 |
fprintf(stderr,"[NEIGHBOURHOOD],%"PRIu64",%s,%s,%d\n",now,me,nodeid_static_str(p->id),psetsize); |
768 |
|
769 |
} |
770 |
|
771 |
void log_chunk(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,const char * note) |
772 |
{ |
773 |
// semantic: [CHUNK_LOG],log_date,sender,receiver,id,size(bytes),chunk_timestamp,hopcount,notes
|
774 |
char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
|
775 |
node_addr(from,sndr,NODE_STR_LENGTH); |
776 |
node_addr(to,rcvr,NODE_STR_LENGTH); |
777 |
|
778 |
fprintf(stderr,"[CHUNK_LOG],%"PRIu64",%s,%s,%d,%d,%"PRIu64",%i,%s\n",gettimeofday_in_us(),sndr,rcvr,c->id,c->size,c->timestamp,chunk_get_hopcount(c),note); |
779 |
} |
780 |
|
781 |
void log_signal(const struct nodeID *fromid,const struct nodeID *toid,const int cidset_size,uint16_t trans_id,enum signaling_type type,const char *flag) |
782 |
{ |
783 |
char typestr[24]; |
784 |
char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
|
785 |
node_addr(fromid,sndr,NODE_STR_LENGTH); |
786 |
node_addr(toid,rcvr,NODE_STR_LENGTH); |
787 |
|
788 |
switch (type)
|
789 |
{ |
790 |
case sig_offer:
|
791 |
sprintf(typestr,"%s","OFFER_SIG"); |
792 |
break;
|
793 |
case sig_accept:
|
794 |
sprintf(typestr,"%s","ACCEPT_SIG"); |
795 |
break;
|
796 |
case sig_request:
|
797 |
sprintf(typestr,"%s","REQUEST_SIG"); |
798 |
break;
|
799 |
case sig_deliver:
|
800 |
sprintf(typestr,"%s","DELIVER_SIG"); |
801 |
break;
|
802 |
case sig_send_buffermap:
|
803 |
sprintf(typestr,"%s","SEND_BMAP_SIG"); |
804 |
break;
|
805 |
case sig_request_buffermap:
|
806 |
sprintf(typestr,"%s","REQUEST_BMAP_SIG"); |
807 |
break;
|
808 |
case sig_ack:
|
809 |
sprintf(typestr,"%s","CHUNK_ACK_SIG"); |
810 |
break;
|
811 |
default:
|
812 |
sprintf(typestr,"%s","UNKNOWN_SIG"); |
813 |
|
814 |
} |
815 |
fprintf(stderr,"[OFFER_LOG],%s,%s,%d,%s,%s\n",sndr,rcvr,trans_id,typestr,flag);
|
816 |
} |
817 |
|
818 |
int peer_chunk_dispatch(struct streaming_context * stc, const struct PeerChunk *pairs,const size_t num_pairs) |
819 |
{ |
820 |
int transid, res,success = 0; |
821 |
size_t i; |
822 |
const struct peer * target_peer; |
823 |
const struct chunk * target_chunk; |
824 |
|
825 |
for (i=0; i<num_pairs ; i++){ |
826 |
target_peer = pairs[i].peer; |
827 |
target_chunk = cb_get_chunk(stc->cb, pairs[i].chunk); |
828 |
|
829 |
if (stc->send_bmap_before_push) {
|
830 |
send_bmap(stc, target_peer->id); |
831 |
} |
832 |
chunk_attributes_update_sending(target_chunk); |
833 |
transid = transaction_create(&(stc->transactions), target_peer->id); |
834 |
res = sendChunk(psinstance_nodeid(stc->ps),target_peer->id, target_chunk, transid); //we use transactions in order to register acks for push
|
835 |
if (res>=0) { |
836 |
#ifdef LOG_CHUNK
|
837 |
log_chunk(psinstance_nodeid(stc->ps), target_peer->id, target_chunk,"SENT");
|
838 |
#endif
|
839 |
// chunkID_set_add_chunk((target_peer)->bmap,target_chunk->id); //don't send twice ... assuming that it will actually arrive
|
840 |
reg_chunk_send(psinstance_measures(stc->ps),target_chunk->id); |
841 |
success++; |
842 |
} else {
|
843 |
fprintf(stderr,"ERROR sending chunk %d\n",target_chunk->id);
|
844 |
} |
845 |
} |
846 |
return success;
|
847 |
|
848 |
} |
849 |
|
850 |
int inject_chunk(struct streaming_context * stc, const struct chunk * target_chunk,const int multiplicity) |
851 |
/*injects a specific chunk in the overlay and return the number of injected copies*/
|
852 |
{ |
853 |
struct peerset *pset;
|
854 |
struct peer ** peers, ** dst_peers;
|
855 |
int peers_num;
|
856 |
double (* peer_evaluation) (struct peer **n); |
857 |
size_t i, selectedpairs_len = multiplicity; |
858 |
struct PeerChunk * selectedpairs;
|
859 |
|
860 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
861 |
peers_num = peerset_size(pset); |
862 |
peers = peerset_get_peers(pset); |
863 |
peer_evaluation = SCHED_PEER; |
864 |
|
865 |
//SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
|
866 |
dst_peers = (struct peer **) malloc(sizeof(struct peer* ) * multiplicity); |
867 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation); |
868 |
|
869 |
selectedpairs = (struct PeerChunk *) malloc(sizeof(struct PeerChunk) * selectedpairs_len); |
870 |
for ( i=0; i<selectedpairs_len; i++) |
871 |
{ |
872 |
selectedpairs[i].peer = dst_peers[i]; |
873 |
selectedpairs[i].chunk = target_chunk->id; |
874 |
} |
875 |
|
876 |
peer_chunk_dispatch(stc, selectedpairs,selectedpairs_len); |
877 |
|
878 |
free(selectedpairs); |
879 |
free(dst_peers); |
880 |
return selectedpairs_len;
|
881 |
} |
882 |
|
883 |
void send_chunk(struct streaming_context * stc) |
884 |
{ |
885 |
struct chunk *buff;
|
886 |
int size, res, i, n;
|
887 |
struct peer **neighbours;
|
888 |
struct peerset *pset;
|
889 |
|
890 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
891 |
n = peerset_size(pset); |
892 |
neighbours = peerset_get_peers(pset); |
893 |
dprintf("Send Chunk: %d neighbours\n", n);
|
894 |
if (n == 0) return; |
895 |
buff = cb_get_chunks(stc->cb, &size); |
896 |
dprintf("\t %d chunks in buffer...\n", size);
|
897 |
if (size == 0) return; |
898 |
|
899 |
/************ STUPID DUMB SCHEDULING ****************/
|
900 |
//target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
|
901 |
//c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
|
902 |
/************ /STUPID DUMB SCHEDULING ****************/
|
903 |
|
904 |
/************ USE SCHEDULER ****************/
|
905 |
{ |
906 |
size_t selectedpairs_len = 1;
|
907 |
int chunkids[size];
|
908 |
int transid;
|
909 |
struct peer *nodeids[n];
|
910 |
struct PeerChunk selectedpairs[1]; |
911 |
|
912 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
913 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i]; |
914 |
SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
|
915 |
/************ /USE SCHEDULER ****************/
|
916 |
|
917 |
for (i=0; i<(int)selectedpairs_len ; i++){ |
918 |
struct peer *p = selectedpairs[i].peer;
|
919 |
const struct chunk *c = cb_get_chunk(stc->cb, selectedpairs[i].chunk); |
920 |
dprintf("\t sending chunk[%d] to ", c->id);
|
921 |
dprintf("%s\n", nodeid_static_str(p->id));
|
922 |
|
923 |
if (stc->send_bmap_before_push) {
|
924 |
send_bmap(stc, p->id); |
925 |
} |
926 |
|
927 |
chunk_attributes_update_sending(c); |
928 |
transid = transaction_create(&(stc->transactions), p->id); |
929 |
res = sendChunk(psinstance_nodeid(stc->ps),p->id, c, transid); //we use transactions in order to register acks for push
|
930 |
// res = sendChunk(p->id, c, 0); //we do not use transactions in pure push
|
931 |
dprintf("\tResult: %d\n", res);
|
932 |
if (res>=0) { |
933 |
#ifdef LOG_CHUNK
|
934 |
log_chunk(psinstance_nodeid(stc->ps),p->id,c,"SENT");
|
935 |
#endif
|
936 |
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(p->id), gettimeofday_in_us(), res, c->size);}
|
937 |
chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
|
938 |
reg_chunk_send(psinstance_measures(stc->ps),c->id); |
939 |
} else {
|
940 |
fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
941 |
} |
942 |
} |
943 |
} |
944 |
} |