Statistics
| Branch: | Revision:

streamers / streaming.c @ 9819f35f

History | View | Annotate | Download (18.3 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15
#include <inttypes.h>
16

    
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22
#include <peerset.h>
23
#include <peer.h>
24
#include <chunkidset.h>
25
#include <limits.h>
26
#include <trade_sig_ha.h>
27
#include <chunkiser_attrib.h>
28

    
29
#include "streaming.h"
30
#include "output.h"
31
#include "input.h"
32
#include "dbg.h"
33
#include "loop.h"
34
#include "hrc.h"
35
#include "chunk_signaling.h"
36
#include "chunklock.h"
37
#include "topology.h"
38
#include "measures.h"
39
#include "scheduling.h"
40
#include "transaction.h"
41

    
42
#include "scheduler_la.h"
43

    
44
#define CB_SIZE_TIME_UNLIMITED 1e12
45
uint64_t CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
46

    
47
static bool heuristics_distance_maxdeliver = false;
48
static int bcast_after_receive_every = 0;
49
static bool neigh_on_chunk_recv = false;
50

    
51
struct chunk_attributes {
52
  uint64_t deadline;
53
  uint16_t deadline_increment;
54
  uint16_t hopcount;
55
} __attribute__((packed));
56

    
57
extern bool chunk_log;
58

    
59
struct chunk_buffer *cb;
60
static struct input_desc *input;
61
static int cb_size;
62

    
63
int _needs(struct chunkID_set *cset, int cb_size, int cid);
64

    
65
uint64_t gettimeofday_in_us(void)
66
{
67
  struct timeval what_time; //to store the epoch time
68

    
69
  gettimeofday(&what_time, NULL);
70
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
71
}
72

    
73
void cb_print()
74
{
75
#ifdef DEBUG
76
  struct chunk *chunks;
77
  int num_chunks, i, id;
78
  chunks = cb_get_chunks(cb, &num_chunks);
79

    
80
  dprintf("\tchbuf :");
81
  i = 0;
82
  if(num_chunks) {
83
    id = chunks[0].id;
84
    dprintf(" %d-> ",id);
85
    while (i < num_chunks) {
86
      if (id == chunks[i].id) {
87
        dprintf("%d",id % 10);
88
        i++;
89
      } else if (chunk_islocked(id)) {
90
        dprintf("*");
91
      } else {
92
        dprintf(".");
93
      }
94
      id++;
95
    }
96
  }
97
  dprintf("\n");
98
#endif
99
}
100

    
101
void stream_init(int size, struct nodeID *myID)
102
{
103
  static char conf[32];
104

    
105
  cb_size = size;
106

    
107
  sprintf(conf, "size=%d", cb_size);
108
  cb = cb_init(conf);
109
  chunkDeliveryInit(myID);
110
  chunkSignalingInit(myID);
111
  init_measures();
112
}
113

    
114
int source_init(const char *fname, struct nodeID *myID, int *fds, int fds_size, int buff_size)
115
{
116
  input = input_open(fname, fds, fds_size);
117
  if (input == NULL) {
118
    return -1;
119
  }
120

    
121
  stream_init(buff_size, myID);
122
  return 0;
123
}
124

    
125
void chunk_attributes_fill(struct chunk* c)
126
{
127
  struct chunk_attributes * ca;
128
  int priority = 1;
129

    
130
  assert((!c->attributes && c->attributes_size == 0) ||
131
         chunk_attributes_chunker_verify(c->attributes, c->attributes_size));
132

    
133
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
134
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
135
    free(c->attributes);
136
    c->attributes = NULL;
137
    c->attributes_size = 0;
138
  }
139

    
140
  c->attributes_size = sizeof(struct chunk_attributes);
141
  c->attributes = ca = malloc(c->attributes_size);
142

    
143
  ca->deadline = c->id;
144
  ca->deadline_increment = priority * 2;
145
  ca->hopcount = 0;
146
}
147

    
148
int chunk_get_hopcount(struct chunk* c) {
149
  struct chunk_attributes * ca;
150

    
151
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
152
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
153
    return -1;
154
  }
155

    
156
  ca = (struct chunk_attributes *) c->attributes;
157
  return ca->hopcount;
158
}
159

    
160
void chunk_attributes_update_received(struct chunk* c)
161
{
162
  struct chunk_attributes * ca;
163

    
164
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
165
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
166
    return;
167
  }
168

    
169
  ca = (struct chunk_attributes *) c->attributes;
170
  ca->hopcount++;
171
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
172
}
173

    
174
void chunk_attributes_update_sending(const struct chunk* c)
175
{
176
  struct chunk_attributes * ca;
177

    
178
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
179
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
180
    return;
181
  }
182

    
183
  ca = (struct chunk_attributes *) c->attributes;
184
  ca->deadline += ca->deadline_increment;
185
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
186
}
187

    
188
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
189
{
190
  struct chunk *chunks;
191
  int num_chunks, i;
192
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
193
  chunks = cb_get_chunks(chbuf, &num_chunks);
194

    
195
  for(i=num_chunks-1; i>=0; i--) {
196
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
197
  }
198
  return my_bmap;
199
}
200

    
201
// a simple implementation that request everything that we miss ... up to max deliver
202
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
203
  struct chunkID_set *cset_acc, *my_bmap;
204
  int i, d, cset_off_size, offset;
205

    
206
  //double lossrate;
207
  struct peer *from = nodeid_to_peer_incoming(fromid, 0);
208

    
209
  cset_acc = chunkID_set_init("size=0");
210

    
211
    my_bmap = cb_to_bmap(cb);
212
    cset_off_size = chunkID_set_size(cset_off);
213
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
214
      int chunkid = chunkID_set_get_chunk(cset_off, i % cset_off_size);
215
      //dprintf("\tdo I need c%d ? :",chunkid);
216
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
217
        chunkID_set_add_chunk(cset_acc, chunkid);
218
        chunk_lock(chunkid,from);
219
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
220
#ifdef MONL
221
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
222
#endif
223
        dprintf("\n");
224
        d++;
225
      }
226
    }
227
    chunkID_set_free(my_bmap);
228

    
229
  reg_offer_accept_in(chunkID_set_size(cset_acc) > 0 ? 1 : 0);
230

    
231
  return cset_acc;
232
}
233

    
234
void send_bmap(struct nodeID *toid)
235
{
236
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
237
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
238
  chunkID_set_free(my_bmap);
239
}
240

    
241
void bcast_bmap()
242
{
243
  int i, n;
244
  struct peer *neighbours;
245
  struct peerset *pset;
246
  struct chunkID_set *my_bmap;
247

    
248
  pset = get_outgoing_peers();
249
  n = peerset_size(pset);
250
  neighbours = peerset_get_peers(pset);
251

    
252
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
253
  for (i = 0; i<n; i++) {
254
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
255
  }
256
  chunkID_set_free(my_bmap);
257
}
258

    
259
void send_ack(struct nodeID *toid, uint16_t trans_id)
260
{
261
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
262
  sendAck(toid, my_bmap,trans_id);
263
  chunkID_set_free(my_bmap);
264
}
265

    
266
double get_average_lossrate_pset(struct peerset *pset)
267
{
268
  int i, n;
269
  struct peer *neighbours;
270

    
271
  n = peerset_size(pset);
272
  neighbours = peerset_get_peers(pset);
273
  {
274
    struct nodeID *nodeids[n];
275
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
276
#ifdef MONL
277
    return get_average_lossrate(nodeids, n);
278
#else
279
    return 0;
280
#endif
281
  }
282
}
283

    
284
void ack_chunk(struct chunk *c, struct nodeID *from, uint16_t trans_id)
285
{
286
  send_ack(from, trans_id);        //send explicit ack
287
  dprintf("Send ACK for chunk from peer: %s, trans_id %d\n", node_addr(from), trans_id);
288
}
289

    
290
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
291
{
292
  int res;
293
  static struct chunk c;
294
  struct peer *p;
295
  static int bcast_cnt;
296
  uint16_t transid;
297

    
298
  // Check the list to find elements over timeout
299
  transactions_check();
300

    
301
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
302
//   dprintf("Received chunk from peer: %s, trans_id %d\n", node_addr(from), transid);
303
  if (res > 0) {
304
    chunk_attributes_update_received(&c);
305
    chunk_unlock(c.id);
306
    dprintf("Received chunk %d from peer: %s, trans_id %d\n", c.id, node_addr(from), transid);
307
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
308
    output_deliver(&c);
309
    res = cb_add_chunk(cb, &c);
310
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
311
    cb_print();
312
    if (res < 0) {
313
      dprintf("\tchunk too old, buffer full with newer chunks\n");
314
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
315
      free(c.data);
316
      free(c.attributes);
317
    }
318
    p = nodeid_to_peer_incoming(from, neigh_on_chunk_recv);
319
    if (p) {        //now we have it almost sure
320
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
321
    }
322
    ack_chunk(&c, from, transid);        //send explicit ack
323
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
324
       bcast_bmap();
325
    }
326
  } else {
327
    fprintf(stderr,"\tError: can't decode chunk!\n");
328
  }
329
}
330

    
331
struct chunk *generated_chunk(suseconds_t *delta)
332
{
333
  struct chunk *c;
334

    
335
  c = malloc(sizeof(struct chunk));
336
  if (!c) {
337
    fprintf(stderr, "Memory allocation error!\n");
338
    return NULL;
339
  }
340

    
341
  *delta = input_get(input, c);
342
  if (*delta < 0) {
343
    fprintf(stderr, "Error in input!\n");
344
    exit(-1);
345
  }
346
  if (c->data == NULL) {
347
    free(c);
348
    return NULL;
349
  }
350
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
351
  chunk_attributes_fill(c);
352
  return c;
353
}
354

    
355
int add_chunk(struct chunk *c)
356
{
357
  int res;
358

    
359
  res = cb_add_chunk(cb, c);
360
  if (res < 0) {
361
    free(c->data);
362
    free(c->attributes);
363
    free(c);
364
    return 0;
365
  }
366
  free(c);
367
  return 1;
368
}
369

    
370
uint64_t get_chunk_timestamp(int cid){
371
  const struct chunk *c = cb_get_chunk(cb, cid);
372
  if (!c) return 0;
373

    
374
  return c->timestamp;
375
}
376

    
377
/**
378
 *example function to filter chunks based on whether a given peer needs them.
379
 *
380
 * Looks at buffermap information received about the given peer.
381
 */
382
int needs(struct peer *n, int cid){
383
  struct peer * p = n;
384

    
385
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
386
    uint64_t ts;
387
    ts = get_chunk_timestamp(cid);
388
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
389
      return 0;
390
    }
391
  }
392

    
393
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
394
  if (! p->bmap) {
395
    //dprintf("no bmap\n");
396
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
397
  }
398
  return _needs(p->bmap, p->cb_size, cid);
399
}
400

    
401
int _needs(struct chunkID_set *cset, int cb_size, int cid){
402

    
403
  if (cb_size == 0) { //if it declared it does not needs chunks
404
    return 0;
405
  }
406

    
407
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
408
    uint64_t ts;
409
    ts = get_chunk_timestamp(cid);
410
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
411
      return 0;
412
    }
413
  }
414

    
415
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
416
    int missing, min;
417
    //@TODO: add some bmap_timestamp based logic
418

    
419
    if (chunkID_set_size(cset) == 0) {
420
      //dprintf("bmap empty\n");
421
      return 1;        // if the bmap seems empty, it needs the chunk
422
    }
423
    missing = cb_size - chunkID_set_size(cset);
424
    missing = missing < 0 ? 0 : missing;
425
    min = chunkID_set_get_earliest(cset);
426
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
427
    return (cid >= min - missing);
428
  }
429

    
430
  //dprintf("has it\n");
431
  return 0;
432
}
433

    
434
double peerWeightReceivedfrom(struct peer **n){
435
  struct peer * p = *n;
436
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
437
}
438

    
439
double peerWeightUniform(struct peer **n){
440
  return 1;
441
}
442

    
443
double peerWeightRtt(struct peer **n){
444
#ifdef MONL
445
  double rtt = get_rtt((*n)->id);
446
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
447
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
448
#else
449
  return 1;
450
#endif
451
}
452

    
453
//ordering function for ELp peer selection, chunk ID based
454
//can't be used as weight
455
double peerScoreELpID(struct nodeID **n){
456
  struct chunkID_set *bmap;
457
  int latest;
458
  struct peer * p = nodeid_to_peer(*n, 0);
459
  if (!p) return 0;
460

    
461
  bmap = p->bmap;
462
  if (!bmap) return 0;
463
  latest = chunkID_set_get_latest(bmap);
464
  if (latest == INT_MIN) return 0;
465

    
466
  return -latest;
467
}
468

    
469
double chunkScoreChunkID(int *cid){
470
  return (double) *cid;
471
}
472

    
473
uint64_t get_chunk_deadline(int cid){
474
  const struct chunk_attributes * ca;
475
  const struct chunk *c;
476

    
477
  c = cb_get_chunk(cb, cid);
478
  if (!c) return 0;
479

    
480
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
481
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
482
    return 0;
483
  }
484

    
485
  ca = (struct chunk_attributes *) c->attributes;
486
  return ca->deadline;
487
}
488

    
489
double chunkScoreDL(int *cid){
490
  return - (double)get_chunk_deadline(*cid);
491
}
492

    
493
double chunkScoreTimestamp(int *cid){
494
  return (double) get_chunk_timestamp(*cid);
495
}
496

    
497
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
498
  int i, d, cset_acc_size, res;
499
  struct peer *to = nodeid_to_peer_outgoing(toid, 0);
500
  cset_acc_size = chunkID_set_size(cset_acc);
501
  reg_offer_accept_out(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
502
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
503
    const struct chunk *c;
504
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
505
    c = cb_get_chunk(cb, chunkid);
506
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
507
      chunk_attributes_update_sending(c);
508
      res = sendChunk(toid, c, trans_id);
509
      dprintf("We are actually sending the chunk!!!, res %d\n", res);
510
      if (res >= 0) {
511
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
512
                d++;
513
        reg_chunk_send(c->id);
514
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(toid), gettimeofday_in_us(), res, c->size);}
515
        dprintf("Sending chunk %d to peer: %s, trans_id %d \n", c->id, node_addr(toid), trans_id);
516
      } else {
517
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
518
      }
519
    }
520
  }
521
}
522

    
523
int offer_max_deliver(struct nodeID *n)
524
{
525

    
526
  if (!heuristics_distance_maxdeliver) return 1;
527

    
528
#ifdef MONL
529
  switch (get_hopcount(n)) {
530
    case 0: return 5;
531
    case 1: return 2;
532
    default: return 1;
533
  }
534
#else
535
  return 1;
536
#endif
537
}
538

    
539
void send_offer()
540
{
541
  struct chunk *buff;
542
  int size, res, i, n;
543
  static int last_neighbor = 0;
544
  struct peer *neighbours;
545
  struct peerset *pset;
546
  uint16_t trans_id = 0;
547
  int offer_to_send = 0;
548

    
549
  // Check the list to find elements over timeout
550
  transactions_check();
551
  
552
#ifdef DEBUG
553
        struct timeval current_time_timeval;
554
        gettimeofday(&current_time_timeval, NULL);
555
#endif
556

    
557
  pset = get_outgoing_peers();
558
  n = peerset_size(pset);
559
  neighbours = peerset_get_peers(pset);
560
  if (n == 0) return;
561
  buff = cb_get_chunks(cb, &size);
562
  if (size == 0) return;
563

    
564
  {
565
    int chunkids[size];
566
    struct peer *nodeids[n];
567

    
568
    //reduce load a little bit if there are losses on the path from this guy
569
    double average_lossrate = get_average_lossrate_pset(pset);
570
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
571
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
572
      return;
573
    }
574

    
575
    for (i = 0;i < size; i++) 
576
            chunkids[size - 1 - i] = (buff+i)->id;
577
    for (i = 0; i<n; i++) 
578
            nodeids[i] = (neighbours+i);
579

    
580
  if (get_fixed_offer_threads() > 0)
581
        {
582
        offer_to_send = get_fixed_offer_threads() - get_running_offer_threads();
583
        dprintf("offer_to_send is fixed to %d, running %d\n", offer_to_send, get_running_offer_threads());
584
        }
585
  else
586
          offer_to_send = offer_threads();
587
  
588
  dprintf("Send Offer: %d neighbours, %d size, %d offer_to_send\n", n, size, offer_to_send);
589

    
590
  for (i = 0; i < offer_to_send; i++){
591
        last_neighbor = last_neighbor % n;
592
        int max_deliver = 1;
593
        struct chunkID_set *my_bmap = cb_to_bmap(cb); 
594
        
595
        trans_id = transaction_create(nodeids[last_neighbor]->id);
596
        res = offerChunks(nodeids[last_neighbor]->id, my_bmap, max_deliver, trans_id);
597
        chunkID_set_free(my_bmap);
598
        modify_running_offer_threads(true);
599
        
600
        reg_offers_in_flight(get_running_offer_threads());            
601
        dprintf("HRC: last_neighbor %d, peer %s, transid %d\n", last_neighbor, node_addr(nodeids[last_neighbor]->id), trans_id );
602
        last_neighbor++;
603
    }
604
  }
605
}
606

    
607

    
608
void send_chunk()
609
{
610
  struct chunk *buff;
611
  int size, res, i, n;
612
  struct peer *neighbours;
613
  struct peerset *pset;
614

    
615
  pset = get_outgoing_peers();
616
  n = peerset_size(pset);
617
  neighbours = peerset_get_peers(pset);
618
  dprintf("Send Chunk: %d neighbours\n", n);
619
  if (n == 0) return;
620
  buff = cb_get_chunks(cb, &size);
621
  dprintf("\t %d chunks in buffer...\n", size);
622
  if (size == 0) return;
623

    
624
  /************ STUPID DUMB SCHEDULING ****************/
625
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
626
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
627
  /************ /STUPID DUMB SCHEDULING ****************/
628

    
629
  /************ USE SCHEDULER ****************/
630
  {
631
    size_t selectedpairs_len = 1;
632
    int chunkids[size];
633
    struct peer *nodeids[n];
634
    struct PeerChunk selectedpairs[1];
635
  
636
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
637
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
638
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
639
  /************ /USE SCHEDULER ****************/
640

    
641
    for (i=0; i<selectedpairs_len ; i++){
642
      struct peer *p = selectedpairs[i].peer;
643
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
644
      dprintf("\t sending chunk[%d] to ", c->id);
645
      dprintf("%s\n", node_addr(p->id));
646

    
647
      send_bmap(p->id);
648

    
649
      chunk_attributes_update_sending(c);
650
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
651
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
652
      dprintf("\tResult: %d\n", res);
653
      if (res>=0) {
654
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
655
        reg_chunk_send(c->id);
656
      } else {
657
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
658
      }
659
    }
660
  }
661
}