Statistics
| Branch: | Revision:

streamers / streaming.c @ a2cb3546

History | View | Annotate | Download (18.8 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15
#include <inttypes.h>
16

    
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22
#include <peerset.h>
23
#include <peer.h>
24
#include <chunkidset.h>
25
#include <limits.h>
26
#include <trade_sig_ha.h>
27
#ifdef CHUNK_ATTRIB_CHUNKER
28
#include <chunkiser_attrib.h>
29
#endif
30

    
31
#include "streaming.h"
32
#include "streamer.h"
33
#include "output.h"
34
#include "input.h"
35
#include "dbg.h"
36
#include "chunk_signaling.h"
37
#include "chunklock.h"
38
#include "topology.h"
39
#include "measures.h"
40
#include "scheduling.h"
41
#include "transaction.h"
42
#include "node_addr.h"
43

    
44
#include "scheduler_la.h"
45

    
46
# define CB_SIZE_TIME_UNLIMITED 1e12
47
uint64_t CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
48

    
49
static bool heuristics_distance_maxdeliver = false;
50
static int bcast_after_receive_every = 0;
51
static bool neigh_on_chunk_recv = false;
52
static bool send_bmap_before_push = false;
53

    
54
struct chunk_attributes {
55
  uint64_t deadline;
56
  uint16_t deadline_increment;
57
  uint16_t hopcount;
58
} __attribute__((packed));
59

    
60
extern bool chunk_log;
61

    
62
struct chunk_buffer *cb;
63
static struct input_desc *input;
64
static int cb_size;
65

    
66
static int offer_per_tick = 1;        //N_p parameter of POLITO
67

    
68
int _needs(struct chunkID_set *cset, int cb_size, int cid);
69

    
70
uint64_t gettimeofday_in_us(void)
71
{
72
  struct timeval what_time; //to store the epoch time
73

    
74
  gettimeofday(&what_time, NULL);
75
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
76
}
77

    
78
void cb_print()
79
{
80
#ifdef DEBUG
81
  struct chunk *chunks;
82
  int num_chunks, i, id;
83
  chunks = cb_get_chunks(cb, &num_chunks);
84

    
85
  dprintf("\tchbuf :");
86
  i = 0;
87
  if(num_chunks) {
88
    id = chunks[0].id;
89
    dprintf(" %d-> ",id);
90
    while (i < num_chunks) {
91
      if (id == chunks[i].id) {
92
        dprintf("%d",id % 10);
93
        i++;
94
      } else if (chunk_islocked(id)) {
95
        dprintf("*");
96
      } else {
97
        dprintf(".");
98
      }
99
      id++;
100
    }
101
  }
102
  dprintf("\n");
103
#endif
104
}
105

    
106
void stream_init(int size, struct nodeID *myID)
107
{
108
  static char conf[32];
109

    
110
  cb_size = size;
111

    
112
  sprintf(conf, "size=%d", cb_size);
113
  cb = cb_init(conf);
114
  chunkDeliveryInit(myID);
115
  chunkSignalingInit(myID);
116
  init_measures();
117
}
118

    
119
int source_init(const char *fname, struct nodeID *myID, int *fds, int fds_size, int buff_size)
120
{
121
  input = input_open(fname, fds, fds_size);
122
  if (input == NULL) {
123
    return -1;
124
  }
125

    
126
  stream_init(buff_size, myID);
127
  return 0;
128
}
129

    
130
void chunk_attributes_fill(struct chunk* c)
131
{
132
  struct chunk_attributes * ca;
133
  int priority = 1;
134

    
135
  assert((!c->attributes && c->attributes_size == 0)
136
#ifdef CHUNK_ATTRIB_CHUNKER
137
      || chunk_attributes_chunker_verify(c->attributes, c->attributes_size)
138
#endif
139
  );
140

    
141
#ifdef CHUNK_ATTRIB_CHUNKER
142
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
143
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
144
    free(c->attributes);
145
    c->attributes = NULL;
146
    c->attributes_size = 0;
147
  }
148
#endif
149

    
150
  c->attributes_size = sizeof(struct chunk_attributes);
151
  c->attributes = ca = malloc(c->attributes_size);
152

    
153
  ca->deadline = c->id;
154
  ca->deadline_increment = priority * 2;
155
  ca->hopcount = 0;
156
}
157

    
158
int chunk_get_hopcount(struct chunk* c) {
159
  struct chunk_attributes * ca;
160

    
161
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
162
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
163
    return -1;
164
  }
165

    
166
  ca = (struct chunk_attributes *) c->attributes;
167
  return ca->hopcount;
168
}
169

    
170
void chunk_attributes_update_received(struct chunk* c)
171
{
172
  struct chunk_attributes * ca;
173

    
174
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
175
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
176
    return;
177
  }
178

    
179
  ca = (struct chunk_attributes *) c->attributes;
180
  ca->hopcount++;
181
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
182
}
183

    
184
void chunk_attributes_update_sending(const struct chunk* c)
185
{
186
  struct chunk_attributes * ca;
187

    
188
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
189
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
190
    return;
191
  }
192

    
193
  ca = (struct chunk_attributes *) c->attributes;
194
  ca->deadline += ca->deadline_increment;
195
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
196
}
197

    
198
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
199
{
200
  struct chunk *chunks;
201
  int num_chunks, i;
202
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
203
  chunks = cb_get_chunks(chbuf, &num_chunks);
204

    
205
  for(i=num_chunks-1; i>=0; i--) {
206
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
207
  }
208
  return my_bmap;
209
}
210

    
211
// a simple implementation that request everything that we miss ... up to max deliver
212
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
213
  struct chunkID_set *cset_acc, *my_bmap;
214
  int i, d, cset_off_size;
215
  //double lossrate;
216
  struct peer *from = nodeid_to_peer(fromid, 0);
217

    
218
  cset_acc = chunkID_set_init("size=0");
219

    
220
  //reduce load a little bit if there are losses on the path from this guy
221
  //lossrate = get_lossrate_receive(from->id);
222
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
223
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
224
    my_bmap = cb_to_bmap(cb);
225
    cset_off_size = chunkID_set_size(cset_off);
226
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
227
      int chunkid = chunkID_set_get_chunk(cset_off, i);
228
      //dprintf("\tdo I need c%d ? :",chunkid);
229
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
230
        chunkID_set_add_chunk(cset_acc, chunkid);
231
        chunk_lock(chunkid,from);
232
        dtprintf("accepting %d from %s", chunkid, node_addr_tr(fromid));
233
#ifdef MONL
234
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
235
#endif
236
        dprintf("\n");
237
        d++;
238
      }
239
    }
240
    chunkID_set_free(my_bmap);
241
  //} else {
242
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr_tr(fromid), lossrate, get_rtt(fromid));
243
  //}
244

    
245
  reg_offer_accept_in(chunkID_set_size(cset_acc) > 0 ? 1 : 0);
246

    
247
  return cset_acc;
248
}
249

    
250
void send_bmap(struct nodeID *toid)
251
{
252
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
253
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
254
  chunkID_set_free(my_bmap);
255
}
256

    
257
void bcast_bmap()
258
{
259
  int i, n;
260
  struct peer *neighbours;
261
  struct peerset *pset;
262
  struct chunkID_set *my_bmap;
263

    
264
  pset = get_peers();
265
  n = peerset_size(pset);
266
  neighbours = peerset_get_peers(pset);
267

    
268
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
269
  for (i = 0; i<n; i++) {
270
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
271
  }
272
  chunkID_set_free(my_bmap);
273
}
274

    
275
void send_ack(struct nodeID *toid, uint16_t trans_id)
276
{
277
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
278
  sendAck(toid, my_bmap,trans_id);
279
  chunkID_set_free(my_bmap);
280
}
281

    
282
double get_average_lossrate_pset(struct peerset *pset)
283
{
284
  int i, n;
285
  struct peer *neighbours;
286

    
287
  n = peerset_size(pset);
288
  neighbours = peerset_get_peers(pset);
289
  {
290
    struct nodeID *nodeids[n];
291
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
292
#ifdef MONL
293
    return get_average_lossrate(nodeids, n);
294
#else
295
    return 0;
296
#endif
297
  }
298
}
299

    
300
void ack_chunk(struct chunk *c, struct nodeID *from, uint16_t trans_id)
301
{
302
  //reduce load a little bit if there are losses on the path from this guy
303
  double average_lossrate = get_average_lossrate_pset(get_peers());
304
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
305
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
306
    return;
307
  }
308
  send_ack(from, trans_id);        //send explicit ack
309
}
310

    
311
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
312
{
313
  int res;
314
  static struct chunk c;
315
  struct peer *p;
316
  static int bcast_cnt;
317
  uint16_t transid;
318

    
319
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
320
  if (res > 0) {
321
    chunk_attributes_update_received(&c);
322
    chunk_unlock(c.id);
323
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr_tr(from));
324
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", c.id, node_addr_tr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
325
    output_deliver(&c);
326
    res = cb_add_chunk(cb, &c);
327
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
328
    cb_print();
329
    if (res < 0) {
330
      dprintf("\tchunk too old, buffer full with newer chunks\n");
331
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr_tr(from), gettimeofday_in_us());}
332
      free(c.data);
333
      free(c.attributes);
334
    }
335
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
336
    if (p) {        //now we have it almost sure
337
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
338
      gettimeofday(&p->bmap_timestamp, NULL);
339
    }
340
    ack_chunk(&c, from, transid);        //send explicit ack
341
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
342
       bcast_bmap();
343
    }
344
  } else {
345
    fprintf(stderr,"\tError: can't decode chunk!\n");
346
  }
347
}
348

    
349
struct chunk *generated_chunk(suseconds_t *delta)
350
{
351
  struct chunk *c;
352

    
353
  c = malloc(sizeof(struct chunk));
354
  if (!c) {
355
    fprintf(stderr, "Memory allocation error!\n");
356
    return NULL;
357
  }
358

    
359
  *delta = input_get(input, c);
360
  if (*delta < 0) {
361
    fprintf(stderr, "Error in input!\n");
362
    exit(-1);
363
  }
364
  if (c->data == NULL) {
365
    free(c);
366
    return NULL;
367
  }
368
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
369
  chunk_attributes_fill(c);
370
  return c;
371
}
372

    
373
int add_chunk(struct chunk *c)
374
{
375
  int res;
376

    
377
  res = cb_add_chunk(cb, c);
378
  if (res < 0) {
379
    free(c->data);
380
    free(c->attributes);
381
    free(c);
382
    return 0;
383
  }
384
  free(c);
385
  return 1;
386
}
387

    
388
uint64_t get_chunk_timestamp(int cid){
389
  const struct chunk *c = cb_get_chunk(cb, cid);
390
  if (!c) return 0;
391

    
392
  return c->timestamp;
393
}
394

    
395
/**
396
 *example function to filter chunks based on whether a given peer needs them.
397
 *
398
 * Looks at buffermap information received about the given peer.
399
 */
400
int needs(struct peer *n, int cid){
401
  struct peer * p = n;
402

    
403
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
404
    uint64_t ts;
405
    ts = get_chunk_timestamp(cid);
406
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
407
      return 0;
408
    }
409
  }
410

    
411
  //dprintf("\t%s needs c%d ? :",node_addr_tr(p->id),c->id);
412
  if (! p->bmap) {
413
    //dprintf("no bmap\n");
414
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
415
  }
416
  return _needs(p->bmap, p->cb_size, cid);
417
}
418

    
419
int _needs(struct chunkID_set *cset, int cb_size, int cid){
420

    
421
  if (cb_size == 0) { //if it declared it does not needs chunks
422
    return 0;
423
  }
424

    
425
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
426
    uint64_t ts;
427
    ts = get_chunk_timestamp(cid);
428
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
429
      return 0;
430
    }
431
  }
432

    
433
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
434
    int missing, min;
435
    //@TODO: add some bmap_timestamp based logic
436

    
437
    if (chunkID_set_size(cset) == 0) {
438
      //dprintf("bmap empty\n");
439
      return 1;        // if the bmap seems empty, it needs the chunk
440
    }
441
    missing = cb_size - chunkID_set_size(cset);
442
    missing = missing < 0 ? 0 : missing;
443
    min = chunkID_set_get_earliest(cset);
444
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
445
    return (cid >= min - missing);
446
  }
447

    
448
  //dprintf("has it\n");
449
  return 0;
450
}
451

    
452
double peerWeightReceivedfrom(struct peer **n){
453
  struct peer * p = *n;
454
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
455
}
456

    
457
double peerWeightUniform(struct peer **n){
458
  return 1;
459
}
460

    
461
double peerWeightRtt(struct peer **n){
462
#ifdef MONL
463
  double rtt = get_rtt((*n)->id);
464
  //dprintf("RTT to %s: %f\n", node_addr_tr(p->id), rtt);
465
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
466
#else
467
  return 1;
468
#endif
469
}
470

    
471
//ordering function for ELp peer selection, chunk ID based
472
//can't be used as weight
473
double peerScoreELpID(struct nodeID **n){
474
  struct chunkID_set *bmap;
475
  int latest;
476
  struct peer * p = nodeid_to_peer(*n, 0);
477
  if (!p) return 0;
478

    
479
  bmap = p->bmap;
480
  if (!bmap) return 0;
481
  latest = chunkID_set_get_latest(bmap);
482
  if (latest == INT_MIN) return 0;
483

    
484
  return -latest;
485
}
486

    
487
double chunkScoreChunkID(int *cid){
488
  return (double) *cid;
489
}
490

    
491
uint64_t get_chunk_deadline(int cid){
492
  const struct chunk_attributes * ca;
493
  const struct chunk *c;
494

    
495
  c = cb_get_chunk(cb, cid);
496
  if (!c) return 0;
497

    
498
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
499
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
500
    return 0;
501
  }
502

    
503
  ca = (struct chunk_attributes *) c->attributes;
504
  return ca->deadline;
505
}
506

    
507
double chunkScoreDL(int *cid){
508
  return - (double)get_chunk_deadline(*cid);
509
}
510

    
511
double chunkScoreTimestamp(int *cid){
512
  return (double) get_chunk_timestamp(*cid);
513
}
514

    
515
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
516
  int i, d, cset_acc_size, res;
517
  struct peer *to = nodeid_to_peer(toid, 0);
518

    
519
  transaction_reg_accept(trans_id, toid);
520

    
521
  cset_acc_size = chunkID_set_size(cset_acc);
522
  reg_offer_accept_out(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
523
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
524
    const struct chunk *c;
525
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
526
    c = cb_get_chunk(cb, chunkid);
527
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
528
      chunk_attributes_update_sending(c);
529
      res = sendChunk(toid, c, trans_id);
530
      if (res >= 0) {
531
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
532
        d++;
533
        reg_chunk_send(c->id);
534
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(toid), gettimeofday_in_us(), res, c->size);}
535
      } else {
536
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
537
      }
538
    }
539
  }
540
}
541

    
542
int offer_peer_count()
543
{
544
  return offer_per_tick;
545
}
546

    
547
int offer_max_deliver(struct nodeID *n)
548
{
549

    
550
  if (!heuristics_distance_maxdeliver) return 1;
551

    
552
#ifdef MONL
553
  switch (get_hopcount(n)) {
554
    case 0: return 5;
555
    case 1: return 2;
556
    default: return 1;
557
  }
558
#else
559
  return 1;
560
#endif
561
}
562

    
563
static struct chunkID_set * compose_offer_cset(void)
564
{
565
  if (am_i_source()) {
566
    struct chunk *chunks;
567
    int num_chunks, j;
568
    struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
569
    chunks = cb_get_chunks(cb, &num_chunks);
570
    for(j=((num_chunks-1)*3)/4; j>=0; j--) {
571
      chunkID_set_add_chunk(my_bmap, chunks[j].id);
572
    }
573
    return my_bmap;
574
  } else {
575
    return cb_to_bmap(cb);
576
  }
577
}
578

    
579

    
580
void send_offer()
581
{
582
  struct chunk *buff;
583
  int size, res, i, n;
584
  struct peer *neighbours;
585
  struct peerset *pset;
586

    
587
  pset = get_peers();
588
  n = peerset_size(pset);
589
  neighbours = peerset_get_peers(pset);
590
  dprintf("Send Offer: %d neighbours\n", n);
591
  if (n == 0) return;
592
  buff = cb_get_chunks(cb, &size);
593
  if (size == 0) return;
594

    
595
  {
596
    size_t selectedpeers_len = offer_peer_count();
597
    int chunkids[size];
598
    struct peer *nodeids[n];
599
    struct peer *selectedpeers[selectedpeers_len];
600

    
601
    //reduce load a little bit if there are losses on the path from this guy
602
    double average_lossrate = get_average_lossrate_pset(pset);
603
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
604
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
605
      return;
606
    }
607

    
608
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
609
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
610
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
611

    
612
    for (i=0; i<selectedpeers_len ; i++){
613
      int transid = transaction_create(selectedpeers[i]->id);
614
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
615
      struct chunkID_set *offer_cset = compose_offer_cset();
616
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr_tr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
617
      res = offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
618
      chunkID_set_free(offer_cset);
619
    }
620
  }
621
}
622

    
623

    
624
void send_chunk()
625
{
626
  struct chunk *buff;
627
  int size, res, i, n;
628
  struct peer *neighbours;
629
  struct peerset *pset;
630

    
631
  pset = get_peers();
632
  n = peerset_size(pset);
633
  neighbours = peerset_get_peers(pset);
634
  dprintf("Send Chunk: %d neighbours\n", n);
635
  if (n == 0) return;
636
  buff = cb_get_chunks(cb, &size);
637
  dprintf("\t %d chunks in buffer...\n", size);
638
  if (size == 0) return;
639

    
640
  /************ STUPID DUMB SCHEDULING ****************/
641
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
642
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
643
  /************ /STUPID DUMB SCHEDULING ****************/
644

    
645
  /************ USE SCHEDULER ****************/
646
  {
647
    size_t selectedpairs_len = 1;
648
    int chunkids[size];
649
    struct peer *nodeids[n];
650
    struct PeerChunk selectedpairs[1];
651
  
652
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
653
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
654
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
655
  /************ /USE SCHEDULER ****************/
656

    
657
    for (i=0; i<selectedpairs_len ; i++){
658
      struct peer *p = selectedpairs[i].peer;
659
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
660
      dprintf("\t sending chunk[%d] to ", c->id);
661
      dprintf("%s\n", node_addr_tr(p->id));
662

    
663
      if (send_bmap_before_push) {
664
        send_bmap(p->id);
665
      }
666

    
667
      chunk_attributes_update_sending(c);
668
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
669
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(p->id), gettimeofday_in_us(), res, c->size);}
670
      dprintf("\tResult: %d\n", res);
671
      if (res>=0) {
672
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
673
        reg_chunk_send(c->id);
674
      } else {
675
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
676
      }
677
    }
678
  }
679
}