Statistics
| Branch: | Revision:

streamers / streaming.c @ 6fa45de5

History | View | Annotate | Download (17.9 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15
#include <inttypes.h>
16

    
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22
#include <peerset.h>
23
#include <peer.h>
24
#include <chunkidset.h>
25
#include <limits.h>
26
#include <trade_sig_ha.h>
27
#include <chunkiser_attrib.h>
28

    
29
#include "streaming.h"
30
#include "output.h"
31
#include "input.h"
32
#include "dbg.h"
33
#include "chunk_signaling.h"
34
#include "chunklock.h"
35
#include "topology.h"
36
#include "measures.h"
37
#include "scheduling.h"
38

    
39
#include "scheduler_la.h"
40

    
41
# define CB_SIZE_TIME_UNLIMITED 1e12
42
uint64_t CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
43

    
44
static bool heuristics_distance_maxdeliver = false;
45
static int bcast_after_receive_every = 0;
46
static bool neigh_on_chunk_recv = true;
47

    
48
struct chunk_attributes {
49
  uint64_t deadline;
50
  uint16_t deadline_increment;
51
  uint16_t hopcount;
52
} __attribute__((packed));
53

    
54
extern bool chunk_log;
55

    
56
struct chunk_buffer *cb;
57
static struct input_desc *input;
58
static int cb_size;
59
static int transid=0;
60

    
61
static int offer_per_tick = 1;        //N_p parameter of POLITO
62

    
63
int _needs(struct chunkID_set *cset, int cb_size, int cid);
64

    
65
uint64_t gettimeofday_in_us(void)
66
{
67
  struct timeval what_time; //to store the epoch time
68

    
69
  gettimeofday(&what_time, NULL);
70
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
71
}
72

    
73
void cb_print()
74
{
75
#ifdef DEBUG
76
  struct chunk *chunks;
77
  int num_chunks, i, id;
78
  chunks = cb_get_chunks(cb, &num_chunks);
79

    
80
  dprintf("\tchbuf :");
81
  i = 0;
82
  if(num_chunks) {
83
    id = chunks[0].id;
84
    dprintf(" %d-> ",id);
85
    while (i < num_chunks) {
86
      if (id == chunks[i].id) {
87
        dprintf("%d",id % 10);
88
        i++;
89
      } else if (chunk_islocked(id)) {
90
        dprintf("*");
91
      } else {
92
        dprintf(".");
93
      }
94
      id++;
95
    }
96
  }
97
  dprintf("\n");
98
#endif
99
}
100

    
101
void stream_init(int size, struct nodeID *myID)
102
{
103
  static char conf[32];
104

    
105
  cb_size = size;
106

    
107
  sprintf(conf, "size=%d", cb_size);
108
  cb = cb_init(conf);
109
  chunkDeliveryInit(myID);
110
  chunkSignalingInit(myID);
111
  init_measures();
112
}
113

    
114
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
115
{
116
  int flags = 0;
117

    
118
  if (memcmp(fname, "udp:", 4) == 0) {
119
    fname += 4;
120
    flags = INPUT_UDP;
121
  }
122
  if (memcmp(fname, "ipb:", 4) == 0) {
123
    fname += 4;
124
    flags = INPUT_IPB;
125
  }
126
  if (loop) {
127
    flags |= INPUT_LOOP;
128
  }
129

    
130
  input = input_open(fname, flags, fds, fds_size);
131
  if (input == NULL) {
132
    return -1;
133
  }
134

    
135
  stream_init(1, myID);
136
  return 0;
137
}
138

    
139
void chunk_attributes_fill(struct chunk* c)
140
{
141
  struct chunk_attributes * ca;
142
  int priority = 1;
143

    
144
  assert((!c->attributes && c->attributes_size == 0) ||
145
         chunk_attributes_chunker_verify(c->attributes, c->attributes_size));
146

    
147
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
148
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
149
    free(c->attributes);
150
    c->attributes = NULL;
151
    c->attributes_size = 0;
152
  }
153

    
154
  c->attributes_size = sizeof(struct chunk_attributes);
155
  c->attributes = ca = malloc(c->attributes_size);
156

    
157
  ca->deadline = c->id;
158
  ca->deadline_increment = priority * 2;
159
  ca->hopcount = 0;
160
}
161

    
162
int chunk_get_hopcount(struct chunk* c) {
163
  struct chunk_attributes * ca;
164

    
165
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
166
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
167
    return -1;
168
  }
169

    
170
  ca = (struct chunk_attributes *) c->attributes;
171
  return ca->hopcount;
172
}
173

    
174
void chunk_attributes_update_received(struct chunk* c)
175
{
176
  struct chunk_attributes * ca;
177

    
178
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
179
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
180
    return;
181
  }
182

    
183
  ca = (struct chunk_attributes *) c->attributes;
184
  ca->hopcount++;
185
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
186
}
187

    
188
void chunk_attributes_update_sending(const struct chunk* c)
189
{
190
  struct chunk_attributes * ca;
191

    
192
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
193
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
194
    return;
195
  }
196

    
197
  ca = (struct chunk_attributes *) c->attributes;
198
  ca->deadline += ca->deadline_increment;
199
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
200
}
201

    
202
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
203
{
204
  struct chunk *chunks;
205
  int num_chunks, i;
206
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
207
  chunks = cb_get_chunks(chbuf, &num_chunks);
208

    
209
  for(i=num_chunks-1; i>=0; i--) {
210
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
211
  }
212
  return my_bmap;
213
}
214

    
215
// a simple implementation that request everything that we miss ... up to max deliver
216
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
217
  struct chunkID_set *cset_acc, *my_bmap;
218
  int i, d, cset_off_size;
219
  //double lossrate;
220
  struct peer *from = nodeid_to_peer(fromid, 0);
221

    
222
  cset_acc = chunkID_set_init("size=0");
223

    
224
  //reduce load a little bit if there are losses on the path from this guy
225
  //lossrate = get_lossrate_receive(from->id);
226
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
227
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
228
    my_bmap = cb_to_bmap(cb);
229
    cset_off_size = chunkID_set_size(cset_off);
230
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
231
      int chunkid = chunkID_set_get_chunk(cset_off, i);
232
      //dprintf("\tdo I need c%d ? :",chunkid);
233
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
234
        chunkID_set_add_chunk(cset_acc, chunkid);
235
        chunk_lock(chunkid,from);
236
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
237
#ifdef MONL
238
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
239
#endif
240
        dprintf("\n");
241
        d++;
242
      }
243
    }
244
    chunkID_set_free(my_bmap);
245
  //} else {
246
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
247
  //}
248

    
249
  return cset_acc;
250
}
251

    
252
void send_bmap(struct nodeID *toid)
253
{
254
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
255
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
256
  chunkID_set_free(my_bmap);
257
}
258

    
259
void bcast_bmap()
260
{
261
  int i, n;
262
  struct peer *neighbours;
263
  struct peerset *pset;
264
  struct chunkID_set *my_bmap;
265

    
266
  pset = get_peers();
267
  n = peerset_size(pset);
268
  neighbours = peerset_get_peers(pset);
269

    
270
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
271
  for (i = 0; i<n; i++) {
272
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
273
  }
274
  chunkID_set_free(my_bmap);
275
}
276

    
277
double get_average_lossrate_pset(struct peerset *pset)
278
{
279
  int i, n;
280
  struct peer *neighbours;
281

    
282
  n = peerset_size(pset);
283
  neighbours = peerset_get_peers(pset);
284
  {
285
    struct nodeID *nodeids[n];
286
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
287
#ifdef MONL
288
    return get_average_lossrate(nodeids, n);
289
#else
290
    return 0;
291
#endif
292
  }
293
}
294

    
295
void ack_chunk(struct chunk *c, struct nodeID *from)
296
{
297
  //reduce load a little bit if there are losses on the path from this guy
298
  double average_lossrate = get_average_lossrate_pset(get_peers());
299
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
300
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
301
    return;
302
  }
303
  send_bmap(from);        //send explicit ack
304
}
305

    
306
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
307
{
308
  int res;
309
  static struct chunk c;
310
  struct peer *p;
311
  static int bcast_cnt;
312
  uint16_t transid;
313

    
314
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
315
  if (res > 0) {
316
    chunk_attributes_update_received(&c);
317
    chunk_unlock(c.id);
318
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
319
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
320
    output_deliver(&c);
321
    res = cb_add_chunk(cb, &c);
322
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
323
    cb_print();
324
    if (res < 0) {
325
      dprintf("\tchunk too old, buffer full with newer chunks\n");
326
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
327
      free(c.data);
328
      free(c.attributes);
329
    }
330
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
331
    if (p) {        //now we have it almost sure
332
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
333
    }
334
    ack_chunk(&c,from);        //send explicit ack
335
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
336
       bcast_bmap();
337
    }
338
  } else {
339
    fprintf(stderr,"\tError: can't decode chunk!\n");
340
  }
341
}
342

    
343
struct chunk *generated_chunk(suseconds_t *delta)
344
{
345
  struct chunk *c;
346

    
347
  c = malloc(sizeof(struct chunk));
348
  if (!c) {
349
    fprintf(stderr, "Memory allocation error!\n");
350
    return NULL;
351
  }
352

    
353
  *delta = input_get(input, c);
354
  if (*delta < 0) {
355
    fprintf(stderr, "Error in input!\n");
356
    exit(-1);
357
  }
358
  if (c->data == NULL) {
359
    free(c);
360
    return NULL;
361
  }
362
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
363
  chunk_attributes_fill(c);
364
  return c;
365
}
366

    
367
int add_chunk(struct chunk *c)
368
{
369
  int res;
370

    
371
  res = cb_add_chunk(cb, c);
372
  if (res < 0) {
373
    free(c->data);
374
    free(c->attributes);
375
    free(c);
376
    return 0;
377
  }
378
  free(c);
379
  return 1;
380
}
381

    
382
uint64_t get_chunk_timestamp(int cid){
383
  const struct chunk *c = cb_get_chunk(cb, cid);
384
  if (!c) return 0;
385

    
386
  return c->timestamp;
387
}
388

    
389
/**
390
 *example function to filter chunks based on whether a given peer needs them.
391
 *
392
 * Looks at buffermap information received about the given peer.
393
 */
394
int needs(struct peer *n, int cid){
395
  struct peer * p = n;
396

    
397
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
398
    uint64_t ts;
399
    ts = get_chunk_timestamp(cid);
400
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
401
      return 0;
402
    }
403
  }
404

    
405
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
406
  if (! p->bmap) {
407
    //dprintf("no bmap\n");
408
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
409
  }
410
  return _needs(p->bmap, p->cb_size, cid);
411
}
412

    
413
int _needs(struct chunkID_set *cset, int cb_size, int cid){
414

    
415
  if (cb_size == 0) { //if it declared it does not needs chunks
416
    return 0;
417
  }
418

    
419
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
420
    uint64_t ts;
421
    ts = get_chunk_timestamp(cid);
422
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
423
      return 0;
424
    }
425
  }
426

    
427
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
428
    int missing, min;
429
    //@TODO: add some bmap_timestamp based logic
430

    
431
    if (chunkID_set_size(cset) == 0) {
432
      //dprintf("bmap empty\n");
433
      return 1;        // if the bmap seems empty, it needs the chunk
434
    }
435
    missing = cb_size - chunkID_set_size(cset);
436
    missing = missing < 0 ? 0 : missing;
437
    min = chunkID_set_get_earliest(cset);
438
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
439
    return (cid >= min - missing);
440
  }
441

    
442
  //dprintf("has it\n");
443
  return 0;
444
}
445

    
446
double peerWeightReceivedfrom(struct peer **n){
447
  struct peer * p = *n;
448
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
449
}
450

    
451
double peerWeightUniform(struct peer **n){
452
  return 1;
453
}
454

    
455
double peerWeightRtt(struct peer **n){
456
#ifdef MONL
457
  double rtt = get_rtt((*n)->id);
458
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
459
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
460
#else
461
  return 1;
462
#endif
463
}
464

    
465
//ordering function for ELp peer selection, chunk ID based
466
//can't be used as weight
467
double peerScoreELpID(struct nodeID **n){
468
  struct chunkID_set *bmap;
469
  int latest;
470
  struct peer * p = nodeid_to_peer(*n, 0);
471
  if (!p) return 0;
472

    
473
  bmap = p->bmap;
474
  if (!bmap) return 0;
475
  latest = chunkID_set_get_latest(bmap);
476
  if (latest == INT_MIN) return 0;
477

    
478
  return -latest;
479
}
480

    
481
double chunkScoreChunkID(int *cid){
482
  return (double) *cid;
483
}
484

    
485
uint64_t get_chunk_deadline(int cid){
486
  const struct chunk_attributes * ca;
487
  const struct chunk *c;
488

    
489
  c = cb_get_chunk(cb, cid);
490
  if (!c) return 0;
491

    
492
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
493
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
494
    return 0;
495
  }
496

    
497
  ca = (struct chunk_attributes *) c->attributes;
498
  return ca->deadline;
499
}
500

    
501
double chunkScoreDL(int *cid){
502
  return - (double)get_chunk_deadline(*cid);
503
}
504

    
505
double chunkScoreTimestamp(int *cid){
506
  return (double) get_chunk_timestamp(*cid);
507
}
508

    
509
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
510
  int i, d, cset_acc_size, res;
511
  struct peer *to = nodeid_to_peer(toid, 0);
512

    
513
  cset_acc_size = chunkID_set_size(cset_acc);
514
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
515
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
516
    const struct chunk *c;
517
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
518
    c = cb_get_chunk(cb, chunkid);
519
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
520
      chunk_attributes_update_sending(c);
521
      res = sendChunk(toid, c, trans_id);
522
      if (res >= 0) {
523
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
524
        d++;
525
        reg_chunk_send(c->id);
526
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
527
      } else {
528
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
529
      }
530
    }
531
  }
532
}
533

    
534
int offer_peer_count()
535
{
536
  return offer_per_tick;
537
}
538

    
539
int offer_max_deliver(struct nodeID *n)
540
{
541

    
542
  if (!heuristics_distance_maxdeliver) return 1;
543

    
544
#ifdef MONL
545
  switch (get_hopcount(n)) {
546
    case 0: return 5;
547
    case 1: return 2;
548
    default: return 1;
549
  }
550
#else
551
  return 1;
552
#endif
553
}
554

    
555
void send_offer()
556
{
557
  struct chunk *buff;
558
  int size, res, i, n;
559
  struct peer *neighbours;
560
  struct peerset *pset;
561

    
562
  pset = get_peers();
563
  n = peerset_size(pset);
564
  neighbours = peerset_get_peers(pset);
565
  dprintf("Send Offer: %d neighbours\n", n);
566
  if (n == 0) return;
567
  buff = cb_get_chunks(cb, &size);
568
  if (size == 0) return;
569

    
570
  {
571
    size_t selectedpeers_len = offer_peer_count();
572
    int chunkids[size];
573
    struct peer *nodeids[n];
574
    struct peer *selectedpeers[selectedpeers_len];
575

    
576
    //reduce load a little bit if there are losses on the path from this guy
577
    double average_lossrate = get_average_lossrate_pset(pset);
578
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
579
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
580
      return;
581
    }
582

    
583
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
584
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
585
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
586

    
587
    for (i=0; i<selectedpeers_len ; i++){
588
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
589
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
590
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
591
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
592
      chunkID_set_free(my_bmap);
593
    }
594
  }
595
}
596

    
597

    
598
void send_chunk()
599
{
600
  struct chunk *buff;
601
  int size, res, i, n;
602
  struct peer *neighbours;
603
  struct peerset *pset;
604

    
605
  pset = get_peers();
606
  n = peerset_size(pset);
607
  neighbours = peerset_get_peers(pset);
608
  dprintf("Send Chunk: %d neighbours\n", n);
609
  if (n == 0) return;
610
  buff = cb_get_chunks(cb, &size);
611
  dprintf("\t %d chunks in buffer...\n", size);
612
  if (size == 0) return;
613

    
614
  /************ STUPID DUMB SCHEDULING ****************/
615
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
616
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
617
  /************ /STUPID DUMB SCHEDULING ****************/
618

    
619
  /************ USE SCHEDULER ****************/
620
  {
621
    size_t selectedpairs_len = 1;
622
    int chunkids[size];
623
    struct peer *nodeids[n];
624
    struct PeerChunk selectedpairs[1];
625
  
626
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
627
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
628
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
629
  /************ /USE SCHEDULER ****************/
630

    
631
    for (i=0; i<selectedpairs_len ; i++){
632
      struct peer *p = selectedpairs[i].peer;
633
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
634
      dprintf("\t sending chunk[%d] to ", c->id);
635
      dprintf("%s\n", node_addr(p->id));
636

    
637
      send_bmap(p->id);
638

    
639
      chunk_attributes_update_sending(c);
640
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
641
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
642
      dprintf("\tResult: %d\n", res);
643
      if (res>=0) {
644
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
645
        reg_chunk_send(c->id);
646
      } else {
647
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
648
      }
649
    }
650
  }
651
}