283 |
283 |
void send_bmap(const struct streaming_context *stc, const struct nodeID *toid)
|
284 |
284 |
{
|
285 |
285 |
struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
|
286 |
|
sendBufferMap(toid,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
|
|
286 |
sendBufferMap(psinstance_nodeid(stc->ps), toid, NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
|
287 |
287 |
#ifdef LOG_SIGNAL
|
288 |
288 |
log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
|
289 |
289 |
#endif
|
... | ... | |
303 |
303 |
|
304 |
304 |
my_bmap = cb_to_bmap(stc->cb); //cache our bmap for faster processing
|
305 |
305 |
for (i = 0; i<n; i++) {
|
306 |
|
sendBufferMap(neighbours[i]->id,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
|
|
306 |
sendBufferMap(psinstance_nodeid(stc->ps),neighbours[i]->id,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
|
307 |
307 |
#ifdef LOG_SIGNAL
|
308 |
308 |
log_signal(psinstance_nodeid(stc->ps),neighbours[i]->id,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
|
309 |
309 |
#endif
|
... | ... | |
314 |
314 |
void send_ack(const struct streaming_context * stc, struct nodeID *toid, uint16_t trans_id)
|
315 |
315 |
{
|
316 |
316 |
struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
|
317 |
|
sendAck(toid, my_bmap,trans_id);
|
|
317 |
sendAck(psinstance_nodeid(stc->ps),toid, my_bmap,trans_id);
|
318 |
318 |
#ifdef LOG_SIGNAL
|
319 |
319 |
log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),trans_id,sig_ack,"SENT");
|
320 |
320 |
#endif
|
... | ... | |
627 |
627 |
}
|
628 |
628 |
if (!to || needs(to, chunkid)) { //he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
|
629 |
629 |
chunk_attributes_update_sending(c);
|
630 |
|
res = sendChunk(toid, c, trans_id);
|
|
630 |
res = sendChunk(psinstance_nodeid(stc->ps),toid, c, trans_id);
|
631 |
631 |
if (res >= 0) {
|
632 |
632 |
if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
|
633 |
633 |
d++;
|
... | ... | |
729 |
729 |
int max_deliver = offer_max_deliver(stc, selectedpeers[i]->id);
|
730 |
730 |
struct chunkID_set *offer_cset = compose_offer_cset(stc, selectedpeers[i]);
|
731 |
731 |
dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), selectedpeers[i]->cb_size);
|
732 |
|
offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
|
|
732 |
offerChunks(psinstance_nodeid(stc->ps),selectedpeers[i]->id, offer_cset, max_deliver, transid++);
|
733 |
733 |
#ifdef LOG_SIGNAL
|
734 |
734 |
log_signal(psinstance_nodeid(stc->ps),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
|
735 |
735 |
#endif
|
... | ... | |
831 |
831 |
}
|
832 |
832 |
chunk_attributes_update_sending(target_chunk);
|
833 |
833 |
transid = transaction_create(&(stc->transactions), target_peer->id);
|
834 |
|
res = sendChunk(target_peer->id, target_chunk, transid); //we use transactions in order to register acks for push
|
|
834 |
res = sendChunk(psinstance_nodeid(stc->ps),target_peer->id, target_chunk, transid); //we use transactions in order to register acks for push
|
835 |
835 |
if (res>=0) {
|
836 |
836 |
#ifdef LOG_CHUNK
|
837 |
837 |
log_chunk(psinstance_nodeid(stc->ps), target_peer->id, target_chunk,"SENT");
|
... | ... | |
926 |
926 |
|
927 |
927 |
chunk_attributes_update_sending(c);
|
928 |
928 |
transid = transaction_create(&(stc->transactions), p->id);
|
929 |
|
res = sendChunk(p->id, c, transid); //we use transactions in order to register acks for push
|
|
929 |
res = sendChunk(psinstance_nodeid(stc->ps),p->id, c, transid); //we use transactions in order to register acks for push
|
930 |
930 |
// res = sendChunk(p->id, c, 0); //we do not use transactions in pure push
|
931 |
931 |
dprintf("\tResult: %d\n", res);
|
932 |
932 |
if (res>=0) {
|