Statistics
| Branch: | Revision:

streamers / streaming.c @ aa2355c3

History | View | Annotate | Download (18.3 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15
#include <inttypes.h>
16

    
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22
#include <peerset.h>
23
#include <peer.h>
24
#include <chunkidset.h>
25
#include <limits.h>
26
#include <trade_sig_ha.h>
27
#include <chunkiser_attrib.h>
28

    
29
#include "streaming.h"
30
#include "output.h"
31
#include "input.h"
32
#include "dbg.h"
33
#include "chunk_signaling.h"
34
#include "chunklock.h"
35
#include "topology.h"
36
#include "measures.h"
37
#include "scheduling.h"
38
#include "transaction.h"
39

    
40
#include "scheduler_la.h"
41

    
42
# define CB_SIZE_TIME_UNLIMITED 1e12
43
uint64_t CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
44

    
45
static bool heuristics_distance_maxdeliver = false;
46
static int bcast_after_receive_every = 0;
47
static bool neigh_on_chunk_recv = false;
48
static bool send_bmap_before_push = false;
49

    
50
struct chunk_attributes {
51
  uint64_t deadline;
52
  uint16_t deadline_increment;
53
  uint16_t hopcount;
54
} __attribute__((packed));
55

    
56
extern bool chunk_log;
57

    
58
struct chunk_buffer *cb;
59
static struct input_desc *input;
60
static int cb_size;
61

    
62
static int offer_per_tick = 1;        //N_p parameter of POLITO
63

    
64
int _needs(struct chunkID_set *cset, int cb_size, int cid);
65

    
66
uint64_t gettimeofday_in_us(void)
67
{
68
  struct timeval what_time; //to store the epoch time
69

    
70
  gettimeofday(&what_time, NULL);
71
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
72
}
73

    
74
void cb_print()
75
{
76
#ifdef DEBUG
77
  struct chunk *chunks;
78
  int num_chunks, i, id;
79
  chunks = cb_get_chunks(cb, &num_chunks);
80

    
81
  dprintf("\tchbuf :");
82
  i = 0;
83
  if(num_chunks) {
84
    id = chunks[0].id;
85
    dprintf(" %d-> ",id);
86
    while (i < num_chunks) {
87
      if (id == chunks[i].id) {
88
        dprintf("%d",id % 10);
89
        i++;
90
      } else if (chunk_islocked(id)) {
91
        dprintf("*");
92
      } else {
93
        dprintf(".");
94
      }
95
      id++;
96
    }
97
  }
98
  dprintf("\n");
99
#endif
100
}
101

    
102
void stream_init(int size, struct nodeID *myID)
103
{
104
  static char conf[32];
105

    
106
  cb_size = size;
107

    
108
  sprintf(conf, "size=%d", cb_size);
109
  cb = cb_init(conf);
110
  chunkDeliveryInit(myID);
111
  chunkSignalingInit(myID);
112
  init_measures();
113
}
114

    
115
int source_init(const char *fname, struct nodeID *myID, int *fds, int fds_size, int buff_size)
116
{
117
  input = input_open(fname, fds, fds_size);
118
  if (input == NULL) {
119
    return -1;
120
  }
121

    
122
  stream_init(buff_size, myID);
123
  return 0;
124
}
125

    
126
void chunk_attributes_fill(struct chunk* c)
127
{
128
  struct chunk_attributes * ca;
129
  int priority = 1;
130

    
131
  assert((!c->attributes && c->attributes_size == 0) ||
132
         chunk_attributes_chunker_verify(c->attributes, c->attributes_size));
133

    
134
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
135
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
136
    free(c->attributes);
137
    c->attributes = NULL;
138
    c->attributes_size = 0;
139
  }
140

    
141
  c->attributes_size = sizeof(struct chunk_attributes);
142
  c->attributes = ca = malloc(c->attributes_size);
143

    
144
  ca->deadline = c->id;
145
  ca->deadline_increment = priority * 2;
146
  ca->hopcount = 0;
147
}
148

    
149
int chunk_get_hopcount(struct chunk* c) {
150
  struct chunk_attributes * ca;
151

    
152
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
153
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
154
    return -1;
155
  }
156

    
157
  ca = (struct chunk_attributes *) c->attributes;
158
  return ca->hopcount;
159
}
160

    
161
void chunk_attributes_update_received(struct chunk* c)
162
{
163
  struct chunk_attributes * ca;
164

    
165
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
166
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
167
    return;
168
  }
169

    
170
  ca = (struct chunk_attributes *) c->attributes;
171
  ca->hopcount++;
172
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
173
}
174

    
175
void chunk_attributes_update_sending(const struct chunk* c)
176
{
177
  struct chunk_attributes * ca;
178

    
179
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
180
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
181
    return;
182
  }
183

    
184
  ca = (struct chunk_attributes *) c->attributes;
185
  ca->deadline += ca->deadline_increment;
186
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
187
}
188

    
189
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
190
{
191
  struct chunk *chunks;
192
  int num_chunks, i;
193
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
194
  chunks = cb_get_chunks(chbuf, &num_chunks);
195

    
196
  for(i=num_chunks-1; i>=0; i--) {
197
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
198
  }
199
  return my_bmap;
200
}
201

    
202
// a simple implementation that request everything that we miss ... up to max deliver
203
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
204
  struct chunkID_set *cset_acc, *my_bmap;
205
  int i, d, cset_off_size;
206
  //double lossrate;
207
  struct peer *from = nodeid_to_peer(fromid, 0);
208

    
209
  cset_acc = chunkID_set_init("size=0");
210

    
211
  //reduce load a little bit if there are losses on the path from this guy
212
  //lossrate = get_lossrate_receive(from->id);
213
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
214
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
215
    my_bmap = cb_to_bmap(cb);
216
    cset_off_size = chunkID_set_size(cset_off);
217
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
218
      int chunkid = chunkID_set_get_chunk(cset_off, i);
219
      //dprintf("\tdo I need c%d ? :",chunkid);
220
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
221
        chunkID_set_add_chunk(cset_acc, chunkid);
222
        chunk_lock(chunkid,from);
223
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
224
#ifdef MONL
225
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
226
#endif
227
        dprintf("\n");
228
        d++;
229
      }
230
    }
231
    chunkID_set_free(my_bmap);
232
  //} else {
233
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
234
  //}
235

    
236
  reg_offer_accept_in(chunkID_set_size(cset_acc) > 0 ? 1 : 0);
237

    
238
  return cset_acc;
239
}
240

    
241
void send_bmap(struct nodeID *toid)
242
{
243
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
244
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
245
  chunkID_set_free(my_bmap);
246
}
247

    
248
void bcast_bmap()
249
{
250
  int i, n;
251
  struct peer *neighbours;
252
  struct peerset *pset;
253
  struct chunkID_set *my_bmap;
254

    
255
  pset = get_peers();
256
  n = peerset_size(pset);
257
  neighbours = peerset_get_peers(pset);
258

    
259
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
260
  for (i = 0; i<n; i++) {
261
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
262
  }
263
  chunkID_set_free(my_bmap);
264
}
265

    
266
void send_ack(struct nodeID *toid, uint16_t trans_id)
267
{
268
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
269
  sendAck(toid, my_bmap,trans_id);
270
  chunkID_set_free(my_bmap);
271
}
272

    
273
double get_average_lossrate_pset(struct peerset *pset)
274
{
275
  int i, n;
276
  struct peer *neighbours;
277

    
278
  n = peerset_size(pset);
279
  neighbours = peerset_get_peers(pset);
280
  {
281
    struct nodeID *nodeids[n];
282
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
283
#ifdef MONL
284
    return get_average_lossrate(nodeids, n);
285
#else
286
    return 0;
287
#endif
288
  }
289
}
290

    
291
void ack_chunk(struct chunk *c, struct nodeID *from, uint16_t trans_id)
292
{
293
  //reduce load a little bit if there are losses on the path from this guy
294
  double average_lossrate = get_average_lossrate_pset(get_peers());
295
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
296
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
297
    return;
298
  }
299
  send_ack(from, trans_id);        //send explicit ack
300
}
301

    
302
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
303
{
304
  int res;
305
  static struct chunk c;
306
  struct peer *p;
307
  static int bcast_cnt;
308
  uint16_t transid;
309

    
310
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
311
  if (res > 0) {
312
    chunk_attributes_update_received(&c);
313
    chunk_unlock(c.id);
314
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
315
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
316
    output_deliver(&c);
317
    res = cb_add_chunk(cb, &c);
318
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
319
    cb_print();
320
    if (res < 0) {
321
      dprintf("\tchunk too old, buffer full with newer chunks\n");
322
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
323
      free(c.data);
324
      free(c.attributes);
325
    }
326
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
327
    if (p) {        //now we have it almost sure
328
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
329
    }
330
    ack_chunk(&c, from, transid);        //send explicit ack
331
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
332
       bcast_bmap();
333
    }
334
  } else {
335
    fprintf(stderr,"\tError: can't decode chunk!\n");
336
  }
337
}
338

    
339
struct chunk *generated_chunk(suseconds_t *delta)
340
{
341
  struct chunk *c;
342

    
343
  c = malloc(sizeof(struct chunk));
344
  if (!c) {
345
    fprintf(stderr, "Memory allocation error!\n");
346
    return NULL;
347
  }
348

    
349
  *delta = input_get(input, c);
350
  if (*delta < 0) {
351
    fprintf(stderr, "Error in input!\n");
352
    exit(-1);
353
  }
354
  if (c->data == NULL) {
355
    free(c);
356
    return NULL;
357
  }
358
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
359
  chunk_attributes_fill(c);
360
  return c;
361
}
362

    
363
int add_chunk(struct chunk *c)
364
{
365
  int res;
366

    
367
  res = cb_add_chunk(cb, c);
368
  if (res < 0) {
369
    free(c->data);
370
    free(c->attributes);
371
    free(c);
372
    return 0;
373
  }
374
  free(c);
375
  return 1;
376
}
377

    
378
uint64_t get_chunk_timestamp(int cid){
379
  const struct chunk *c = cb_get_chunk(cb, cid);
380
  if (!c) return 0;
381

    
382
  return c->timestamp;
383
}
384

    
385
/**
386
 *example function to filter chunks based on whether a given peer needs them.
387
 *
388
 * Looks at buffermap information received about the given peer.
389
 */
390
int needs(struct peer *n, int cid){
391
  struct peer * p = n;
392

    
393
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
394
    uint64_t ts;
395
    ts = get_chunk_timestamp(cid);
396
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
397
      return 0;
398
    }
399
  }
400

    
401
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
402
  if (! p->bmap) {
403
    //dprintf("no bmap\n");
404
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
405
  }
406
  return _needs(p->bmap, p->cb_size, cid);
407
}
408

    
409
int _needs(struct chunkID_set *cset, int cb_size, int cid){
410

    
411
  if (cb_size == 0) { //if it declared it does not needs chunks
412
    return 0;
413
  }
414

    
415
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
416
    uint64_t ts;
417
    ts = get_chunk_timestamp(cid);
418
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
419
      return 0;
420
    }
421
  }
422

    
423
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
424
    int missing, min;
425
    //@TODO: add some bmap_timestamp based logic
426

    
427
    if (chunkID_set_size(cset) == 0) {
428
      //dprintf("bmap empty\n");
429
      return 1;        // if the bmap seems empty, it needs the chunk
430
    }
431
    missing = cb_size - chunkID_set_size(cset);
432
    missing = missing < 0 ? 0 : missing;
433
    min = chunkID_set_get_earliest(cset);
434
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
435
    return (cid >= min - missing);
436
  }
437

    
438
  //dprintf("has it\n");
439
  return 0;
440
}
441

    
442
double peerWeightReceivedfrom(struct peer **n){
443
  struct peer * p = *n;
444
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
445
}
446

    
447
double peerWeightUniform(struct peer **n){
448
  return 1;
449
}
450

    
451
double peerWeightRtt(struct peer **n){
452
#ifdef MONL
453
  double rtt = get_rtt((*n)->id);
454
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
455
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
456
#else
457
  return 1;
458
#endif
459
}
460

    
461
//ordering function for ELp peer selection, chunk ID based
462
//can't be used as weight
463
double peerScoreELpID(struct nodeID **n){
464
  struct chunkID_set *bmap;
465
  int latest;
466
  struct peer * p = nodeid_to_peer(*n, 0);
467
  if (!p) return 0;
468

    
469
  bmap = p->bmap;
470
  if (!bmap) return 0;
471
  latest = chunkID_set_get_latest(bmap);
472
  if (latest == INT_MIN) return 0;
473

    
474
  return -latest;
475
}
476

    
477
double chunkScoreChunkID(int *cid){
478
  return (double) *cid;
479
}
480

    
481
uint64_t get_chunk_deadline(int cid){
482
  const struct chunk_attributes * ca;
483
  const struct chunk *c;
484

    
485
  c = cb_get_chunk(cb, cid);
486
  if (!c) return 0;
487

    
488
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
489
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
490
    return 0;
491
  }
492

    
493
  ca = (struct chunk_attributes *) c->attributes;
494
  return ca->deadline;
495
}
496

    
497
double chunkScoreDL(int *cid){
498
  return - (double)get_chunk_deadline(*cid);
499
}
500

    
501
double chunkScoreTimestamp(int *cid){
502
  return (double) get_chunk_timestamp(*cid);
503
}
504

    
505
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
506
  int i, d, cset_acc_size, res;
507
  struct peer *to = nodeid_to_peer(toid, 0);
508

    
509
  transaction_reg_accept(trans_id, toid);
510

    
511
  cset_acc_size = chunkID_set_size(cset_acc);
512
  reg_offer_accept_out(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
513
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
514
    const struct chunk *c;
515
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
516
    c = cb_get_chunk(cb, chunkid);
517
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
518
      chunk_attributes_update_sending(c);
519
      res = sendChunk(toid, c, trans_id);
520
      if (res >= 0) {
521
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
522
        d++;
523
        reg_chunk_send(c->id);
524
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(toid), gettimeofday_in_us(), res, c->size);}
525
      } else {
526
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
527
      }
528
    }
529
  }
530
}
531

    
532
int offer_peer_count()
533
{
534
  return offer_per_tick;
535
}
536

    
537
int offer_max_deliver(struct nodeID *n)
538
{
539

    
540
  if (!heuristics_distance_maxdeliver) return 1;
541

    
542
#ifdef MONL
543
  switch (get_hopcount(n)) {
544
    case 0: return 5;
545
    case 1: return 2;
546
    default: return 1;
547
  }
548
#else
549
  return 1;
550
#endif
551
}
552

    
553
static struct chunkID_set * compose_offer_cset(void)
554
{
555
  return cb_to_bmap(cb);
556
}
557

    
558

    
559
void send_offer()
560
{
561
  struct chunk *buff;
562
  int size, res, i, n;
563
  struct peer *neighbours;
564
  struct peerset *pset;
565

    
566
  pset = get_peers();
567
  n = peerset_size(pset);
568
  neighbours = peerset_get_peers(pset);
569
  dprintf("Send Offer: %d neighbours\n", n);
570
  if (n == 0) return;
571
  buff = cb_get_chunks(cb, &size);
572
  if (size == 0) return;
573

    
574
  {
575
    size_t selectedpeers_len = offer_peer_count();
576
    int chunkids[size];
577
    struct peer *nodeids[n];
578
    struct peer *selectedpeers[selectedpeers_len];
579

    
580
    //reduce load a little bit if there are losses on the path from this guy
581
    double average_lossrate = get_average_lossrate_pset(pset);
582
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
583
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
584
      return;
585
    }
586

    
587
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
588
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
589
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
590

    
591
    for (i=0; i<selectedpeers_len ; i++){
592
      int transid = transaction_create(selectedpeers[i]->id);
593
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
594
      struct chunkID_set *offer_cset = compose_offer_cset();
595
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
596
      res = offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
597
      chunkID_set_free(offer_cset);
598
    }
599
  }
600
}
601

    
602

    
603
void send_chunk()
604
{
605
  struct chunk *buff;
606
  int size, res, i, n;
607
  struct peer *neighbours;
608
  struct peerset *pset;
609

    
610
  pset = get_peers();
611
  n = peerset_size(pset);
612
  neighbours = peerset_get_peers(pset);
613
  dprintf("Send Chunk: %d neighbours\n", n);
614
  if (n == 0) return;
615
  buff = cb_get_chunks(cb, &size);
616
  dprintf("\t %d chunks in buffer...\n", size);
617
  if (size == 0) return;
618

    
619
  /************ STUPID DUMB SCHEDULING ****************/
620
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
621
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
622
  /************ /STUPID DUMB SCHEDULING ****************/
623

    
624
  /************ USE SCHEDULER ****************/
625
  {
626
    size_t selectedpairs_len = 1;
627
    int chunkids[size];
628
    struct peer *nodeids[n];
629
    struct PeerChunk selectedpairs[1];
630
  
631
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
632
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
633
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
634
  /************ /USE SCHEDULER ****************/
635

    
636
    for (i=0; i<selectedpairs_len ; i++){
637
      struct peer *p = selectedpairs[i].peer;
638
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
639
      dprintf("\t sending chunk[%d] to ", c->id);
640
      dprintf("%s\n", node_addr(p->id));
641

    
642
      if (send_bmap_before_push) {
643
        send_bmap(p->id);
644
      }
645

    
646
      chunk_attributes_update_sending(c);
647
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
648
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
649
      dprintf("\tResult: %d\n", res);
650
      if (res>=0) {
651
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
652
        reg_chunk_send(c->id);
653
      } else {
654
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
655
      }
656
    }
657
  }
658
}