Statistics
| Branch: | Revision:

streamers / streaming.c @ 9a1f5816

History | View | Annotate | Download (26.7 KB)

1
/*
2
 * Copyright (c) 2010-2011 Luca Abeni
3
 * Copyright (c) 2010-2011 Csaba Kiraly
4
 *
5
 * This file is part of PeerStreamer.
6
 *
7
 * PeerStreamer is free software: you can redistribute it and/or
8
 * modify it under the terms of the GNU Affero General Public License as
9
 * published by the Free Software Foundation, either version 3 of the
10
 * License, or (at your option) any later version.
11
 *
12
 * PeerStreamer is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
15
 * General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU Affero General Public License
18
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
19
 *
20
 */
21
#include <sys/time.h>
22
#include <stdlib.h>
23
#include <stdio.h>
24
#include <stdint.h>
25
#include <stdbool.h>
26
#include <math.h>
27
#include <assert.h>
28
#include <string.h>
29
#include <inttypes.h>
30

    
31
#include <net_helper.h>
32
#include <chunk.h> 
33
#include <chunkbuffer.h> 
34
#include <trade_msg_la.h>
35
#include <trade_msg_ha.h>
36
#include <peerset.h>
37
#include <peer.h>
38
#include <chunkidset.h>
39
#include <limits.h>
40
#include <trade_sig_ha.h>
41
#ifdef CHUNK_ATTRIB_CHUNKER
42
#include <chunkiser_attrib.h>
43
#endif
44

    
45
#include "streaming.h"
46
#include "streamer.h"
47
#include "output.h"
48
#include "input.h"
49
#include "dbg.h"
50
#include "chunk_signaling.h"
51
#include "chunklock.h"
52
#include "topology.h"
53
#include "measures.h"
54
#include "scheduling.h"
55
#include "transaction.h"
56
#include "node_addr.h"
57

    
58
#include "scheduler_la.h"
59

    
60
# define CB_SIZE_TIME_UNLIMITED 1e12
61
uint64_t CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
62

    
63
static bool heuristics_distance_maxdeliver = false;
64
static int bcast_after_receive_every = 0;
65
static bool neigh_on_chunk_recv = false;
66
static bool send_bmap_before_push = false;
67

    
68
struct chunk_attributes {
69
  uint64_t deadline;
70
  uint16_t deadline_increment;
71
  uint16_t hopcount;
72
} __attribute__((packed));
73

    
74
extern bool chunk_log;
75
extern bool signal_log;
76
extern bool push_strategy;
77
extern unsigned int chunk_loss_interval;
78
extern int chunks_per_offer;
79

    
80
struct chunk_buffer *cb;
81
static struct input_desc *input;
82
static int cb_size;
83

    
84
static int offer_per_tick = 1;        //N_p parameter of POLITO
85

    
86
int _needs(struct chunkID_set *cset, int cb_size, int cid);
87

    
88
uint64_t gettimeofday_in_us(void)
89
{
90
  struct timeval what_time; //to store the epoch time
91

    
92
  gettimeofday(&what_time, NULL);
93
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
94
}
95

    
96
void cb_print()
97
{
98
#ifdef DEBUG
99
  struct chunk *chunks;
100
  int num_chunks, i, id;
101
  chunks = cb_get_chunks(cb, &num_chunks);
102

    
103
  dprintf("\tchbuf :");
104
  i = 0;
105
  if(num_chunks) {
106
    id = chunks[0].id;
107
    dprintf(" %d-> ",id);
108
    while (i < num_chunks) {
109
      if (id == chunks[i].id) {
110
        dprintf("%d",id % 10);
111
        i++;
112
      } else if (chunk_islocked(id)) {
113
        dprintf("*");
114
      } else {
115
        dprintf(".");
116
      }
117
      id++;
118
    }
119
  }
120
  dprintf("\n");
121
#endif
122
}
123

    
124
void stream_init(int size, struct nodeID *myID)
125
{
126
  static char conf[32];
127

    
128
  cb_size = size;
129

    
130
  sprintf(conf, "size=%d", cb_size);
131
  cb = cb_init(conf);
132
  chunkDeliveryInit(myID);
133
  chunkSignalingInit(myID);
134
  init_measures();
135
}
136

    
137
int source_init(const char *fname, struct nodeID *myID, int *fds, int fds_size, int buff_size)
138
{
139
  input = input_open(fname, fds, fds_size);
140
  if (input == NULL) {
141
    return -1;
142
  }
143

    
144
  stream_init(buff_size, myID);
145
  return 0;
146
}
147

    
148
void chunk_attributes_fill(struct chunk* c)
149
{
150
  struct chunk_attributes * ca;
151
  int priority = 1;
152

    
153
  assert((!c->attributes && c->attributes_size == 0)
154
#ifdef CHUNK_ATTRIB_CHUNKER
155
      || chunk_attributes_chunker_verify(c->attributes, c->attributes_size)
156
#endif
157
  );
158

    
159
#ifdef CHUNK_ATTRIB_CHUNKER
160
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
161
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
162
    free(c->attributes);
163
    c->attributes = NULL;
164
    c->attributes_size = 0;
165
  }
166
#endif
167

    
168
  c->attributes_size = sizeof(struct chunk_attributes);
169
  c->attributes = ca = malloc(c->attributes_size);
170

    
171
  ca->deadline = c->id;
172
  ca->deadline_increment = priority * 2;
173
  ca->hopcount = 0;
174
}
175

    
176
int chunk_get_hopcount(const struct chunk* c) {
177
  struct chunk_attributes * ca;
178

    
179
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
180
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
181
    return -1;
182
  }
183

    
184
  ca = (struct chunk_attributes *) c->attributes;
185
  return ca->hopcount;
186
}
187

    
188
void chunk_attributes_update_received(struct chunk* c)
189
{
190
  struct chunk_attributes * ca;
191

    
192
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
193
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
194
    return;
195
  }
196

    
197
  ca = (struct chunk_attributes *) c->attributes;
198
  ca->hopcount++;
199
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
200
}
201

    
202
void chunk_attributes_update_sending(const struct chunk* c)
203
{
204
  struct chunk_attributes * ca;
205

    
206
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
207
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
208
    return;
209
  }
210

    
211
  ca = (struct chunk_attributes *) c->attributes;
212
  ca->deadline += ca->deadline_increment;
213
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
214
}
215

    
216
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
217
{
218
  struct chunk *chunks;
219
  int num_chunks, i;
220
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
221
  chunks = cb_get_chunks(chbuf, &num_chunks);
222

    
223
  for(i=num_chunks-1; i>=0; i--) {
224
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
225
  }
226
  return my_bmap;
227
}
228

    
229
// a simple implementation that request everything that we miss ... up to max deliver
230
struct chunkID_set *get_chunks_to_accept(const struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
231
  struct chunkID_set *cset_acc, *my_bmap;
232
  int i, d, cset_off_size;
233
  //double lossrate;
234
  struct peer *from = nodeid_to_peer(fromid, 0);
235

    
236
  cset_acc = chunkID_set_init("size=0");
237

    
238
  //reduce load a little bit if there are losses on the path from this guy
239
  //lossrate = get_lossrate_receive(from->id);
240
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
241
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
242
    my_bmap = cb_to_bmap(cb);
243
    cset_off_size = chunkID_set_size(cset_off);
244
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
245
      int chunkid = chunkID_set_get_chunk(cset_off, i);
246
      //dprintf("\tdo I need c%d ? :",chunkid);
247
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
248
        chunkID_set_add_chunk(cset_acc, chunkid);
249
        chunk_lock(chunkid,from);
250
        dtprintf("accepting %d from %s", chunkid, node_addr_tr(fromid));
251
#ifdef MONL
252
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
253
#endif
254
        dprintf("\n");
255
        d++;
256
      }
257
    }
258
    chunkID_set_free(my_bmap);
259
  //} else {
260
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr_tr(fromid), lossrate, get_rtt(fromid));
261
  //}
262

    
263
  reg_offer_accept_in(chunkID_set_size(cset_acc) > 0 ? 1 : 0);
264

    
265
  return cset_acc;
266
}
267

    
268
void send_bmap(const struct nodeID *toid)
269
{
270
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
271
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
272
         if (signal_log) log_signal(get_my_addr(),toid,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
273
  chunkID_set_free(my_bmap);
274
}
275

    
276
void bcast_bmap()
277
{
278
  int i, n;
279
  struct peer **neighbours;
280
  struct peerset *pset;
281
  struct chunkID_set *my_bmap;
282

    
283
  pset = topology_get_neighbours();
284
  n = peerset_size(pset);
285
  neighbours = peerset_get_peers(pset);
286

    
287
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
288
  for (i = 0; i<n; i++) {
289
    sendBufferMap(neighbours[i]->id,NULL, my_bmap, input ? 0 : cb_size, 0);
290
                 if (signal_log) log_signal(get_my_addr(),neighbours[i]->id,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
291
  }
292
  chunkID_set_free(my_bmap);
293
}
294

    
295
void send_ack(struct nodeID *toid, uint16_t trans_id)
296
{
297
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
298
  sendAck(toid, my_bmap,trans_id);
299
        if (signal_log) log_signal(get_my_addr(),toid,chunkID_set_size(my_bmap),trans_id,sig_ack,"SENT");
300
  chunkID_set_free(my_bmap);
301
}
302

    
303
double get_average_lossrate_pset(struct peerset *pset)
304
{
305
#ifdef MONL
306
  int i, n;
307
  struct peer **neighbours;
308

    
309
  n = peerset_size(pset);
310
  neighbours = peerset_get_peers(pset);
311
  {
312
    struct nodeID *nodeids[n];
313
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i]->id;
314
    return get_average_lossrate(nodeids, n);
315
        }
316
#else
317
  return 0;
318
#endif
319
}
320

    
321
void ack_chunk(struct chunk *c, struct nodeID *from, uint16_t trans_id)
322
{
323
  //reduce load a little bit if there are losses on the path from this guy
324
  double average_lossrate = get_average_lossrate_pset(topology_get_neighbours());
325
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
326
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
327
    return;
328
  }
329
  send_ack(from, trans_id);        //send explicit ack
330
}
331

    
332
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
333
{
334
  int res;
335
  static struct chunk c;
336
  struct peer *p;
337
  static int bcast_cnt;
338
  uint16_t transid;
339

    
340
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
341
  if (res > 0) {
342
                if (chunk_loss_interval && c.id % chunk_loss_interval == 0) {
343
                        fprintf(stderr,"[NOISE] Chunk %d discarded >:)\n",c.id);
344
                        free(c.data);
345
                        free(c.attributes);
346
                        return;
347
                }
348
    chunk_attributes_update_received(&c);
349
    chunk_unlock(c.id);
350
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr_tr(from));
351
    if(chunk_log) log_chunk(from,get_my_addr(),&c,"RECEIVED");
352
//{fprintf(stderr, "TEO: Peer %s received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", node_addr_tr(get_my_addr()),c.id, node_addr_tr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
353
    output_deliver(&c);
354
    res = cb_add_chunk(cb, &c);
355
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
356
    cb_print();
357
    if (res < 0) {
358
      dprintf("\tchunk too old, buffer full with newer chunks\n");
359
      if(chunk_log) log_chunk_error(from,get_my_addr(),&c,res); //{fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr_tr(from), gettimeofday_in_us());}
360
      free(c.data);
361
      free(c.attributes);
362
    }
363
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
364
    if (p) {        //now we have it almost sure
365
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
366
      gettimeofday(&p->bmap_timestamp, NULL);
367
    }
368
    ack_chunk(&c, from, transid);        //send explicit ack
369
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
370
       bcast_bmap();
371
    }
372
  } else {
373
    fprintf(stderr,"\tError: can't decode chunk!\n");
374
  }
375
}
376

    
377
struct chunk *generated_chunk(suseconds_t *delta)
378
{
379
  struct chunk *c;
380

    
381
  c = malloc(sizeof(struct chunk));
382
  if (!c) {
383
    fprintf(stderr, "Memory allocation error!\n");
384
    return NULL;
385
  }
386

    
387
  *delta = (suseconds_t)input_get(input, c);
388
  if (*delta < 0) {
389
    fprintf(stderr, "Error in input!\n");
390
    exit(-1);
391
  }
392
  if (c->data == NULL) {
393
    free(c);
394
    return NULL;
395
  }
396
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
397
  chunk_attributes_fill(c);
398
  return c;
399
}
400

    
401
int add_chunk(struct chunk *c)
402
{
403
  int res;
404

    
405
  res = cb_add_chunk(cb, c);
406
  if (res < 0) {
407
    free(c->data);
408
    free(c->attributes);
409
    free(c);
410
    return 0;
411
  }
412
 // free(c);
413
  return 1;
414
}
415

    
416
uint64_t get_chunk_timestamp(int cid){
417
  const struct chunk *c = cb_get_chunk(cb, cid);
418
  if (!c) return 0;
419

    
420
  return c->timestamp;
421
}
422

    
423
void print_chunkID_set(struct chunkID_set *cset)
424
{
425
        uint32_t * ptr = (uint32_t *) cset;
426
        uint32_t n_elements,i;
427
        int * data = (int *) &(ptr[3]);
428
        fprintf(stderr,"[DEBUG] Chunk ID set type: %d\n",ptr[0]);
429
        fprintf(stderr,"[DEBUG] Chunk ID set size: %d\n",ptr[1]);
430
        n_elements = ptr[2];
431
        fprintf(stderr,"[DEBUG] Chunk ID n_elements: %d\n",n_elements);
432
        fprintf(stderr,"[DEBUG] Chunk ID elements: [");
433
        for (i=0;i<n_elements;i++)
434
                fprintf(stderr,".%d.",data[i]);
435

    
436
        fprintf(stderr,"]\n");
437
}
438

    
439
/**
440
 *example function to filter chunks based on whether a given peer needs them.
441
 *
442
 * Looks at buffermap information received about the given peer.
443
 */
444
int needs(struct peer *n, int cid){
445
  struct peer * p = n;
446

    
447
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
448
    uint64_t ts;
449
    ts = get_chunk_timestamp(cid);
450
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
451
      return 0;
452
    }
453
  }
454

    
455
  //dprintf("\t%s needs c%d ? :",node_addr_tr(p->id),c->id);
456
  if (! p->bmap) { // this will never happen since the pset module initializes bmap
457
    //dprintf("no bmap\n");
458
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
459
  }
460
        
461
//        fprintf(stderr,"[DEBUG] Evaluating Peer %s, CB_SIZE: %d\n",node_addr_tr(n->id),p->cb_size); // DEBUG
462
//        print_chunkID_set(p->bmap);                                                                                                                                                                        // DEBUG
463

    
464
  return _needs(p->bmap, p->cb_size, cid);
465
}
466

    
467
/**
468
 * Function checking if chunkID_set cset may need chunk with id cid
469
 * @cset: target cset
470
 * @cb_size: maximum allowed numer of chunks. In the case of offer it indicates
471
 *         the maximum capacity in of the receiving peer (so it's 0 for the source)
472
 * @cid: target chunk identifier
473
 */
474
int _needs(struct chunkID_set *cset, int cb_size, int cid){
475

    
476
  if (cb_size == 0) { //if it declared it does not needs chunks
477
    return 0;
478
  }
479

    
480
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
481
    uint64_t ts;
482
    ts = get_chunk_timestamp(cid);
483
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
484
      return 0;
485
    }
486
  }
487

    
488
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
489
    int missing, min;
490
    //@TODO: add some bmap_timestamp based logic
491

    
492
    if (chunkID_set_size(cset) == 0) {
493
      //dprintf("bmap empty\n");
494
      return 1;        // if the bmap seems empty, it needs the chunk
495
    }
496
    missing = cb_size - chunkID_set_size(cset);
497
    missing = missing < 0 ? 0 : missing;
498
    min = chunkID_set_get_earliest(cset);
499
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
500
    return (cid >= min - missing);
501
  }
502

    
503
  //dprintf("has it\n");
504
  return 0;
505
}
506

    
507
double peerWeightReceivedfrom(struct peer **n){
508
  struct peer * p = *n;
509
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
510
}
511

    
512
double peerWeightUniform(struct peer **n){
513
  return 1;
514
}
515

    
516
double peerWeightLoss(struct peer **n){
517
  return get_reception_rate_measure((*n)->id);
518
}
519

    
520
double peerWeightRtt(struct peer **n){
521
#ifdef MONL
522
  double rtt = get_rtt((*n)->id);
523
  //dprintf("RTT to %s: %f\n", node_addr_tr(p->id), rtt);
524
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
525
#else
526
  return 1;
527
#endif
528
}
529

    
530
//ordering function for ELp peer selection, chunk ID based
531
//can't be used as weight
532
double peerScoreELpID(struct nodeID **n){
533
  struct chunkID_set *bmap;
534
  int latest;
535
  struct peer * p = nodeid_to_peer(*n, 0);
536
  if (!p) return 0;
537

    
538
  bmap = p->bmap;
539
  if (!bmap) return 0;
540
  latest = chunkID_set_get_latest(bmap);
541
  if (latest == INT_MIN) return 0;
542

    
543
  return -latest;
544
}
545

    
546
double chunkScoreChunkID(int *cid){
547
  return (double) *cid;
548
}
549

    
550
uint64_t get_chunk_deadline(int cid){
551
  const struct chunk_attributes * ca;
552
  const struct chunk *c;
553

    
554
  c = cb_get_chunk(cb, cid);
555
  if (!c) return 0;
556

    
557
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
558
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
559
    return 0;
560
  }
561

    
562
  ca = (struct chunk_attributes *) c->attributes;
563
  return ca->deadline;
564
}
565

    
566
double chunkScoreDL(int *cid){
567
  return - (double)get_chunk_deadline(*cid);
568
}
569

    
570
double chunkScoreTimestamp(int *cid){
571
  return (double) get_chunk_timestamp(*cid);
572
}
573

    
574
void send_accepted_chunks(const struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
575
  int i, d, cset_acc_size, res;
576
  struct peer *to = nodeid_to_peer(toid, 0);
577

    
578
  transaction_reg_accept(trans_id, toid);
579

    
580
  cset_acc_size = chunkID_set_size(cset_acc);
581
  reg_offer_accept_out(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
582
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
583
    const struct chunk *c;
584
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
585
    c = cb_get_chunk(cb, chunkid);
586
    if (!c) {        // we should have the chunk
587
      dprintf("%s asked for chunk %d we do not own anymore\n", node_addr_tr(toid), chunkid);
588
      continue;
589
    }
590
    if (!to || needs(to, chunkid)) {        //he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
591
      chunk_attributes_update_sending(c);
592
      res = sendChunk(toid, c, trans_id);
593
      if (res >= 0) {
594
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
595
        d++;
596
        reg_chunk_send(c->id);
597
              if(chunk_log) log_chunk(get_my_addr(),toid,c,"SENT_ACCEPTED");
598
        //{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(toid), gettimeofday_in_us(), res, c->size);}
599
      } else {
600
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
601
      }
602
    }
603
  }
604
}
605

    
606
int offer_peer_count()
607
{
608
  return offer_per_tick;
609
}
610

    
611
int offer_max_deliver(struct nodeID *n)
612
{
613

    
614
        if (chunks_per_offer) return chunks_per_offer;
615
  if (!heuristics_distance_maxdeliver) return 1;
616

    
617
#ifdef MONL
618
  switch (get_hopcount(n)) {
619
    case 0: return 5;
620
    case 1: return 2;
621
    default: return 1;
622
  }
623
#else
624
  return 1;
625
#endif
626
}
627

    
628
//get the rtt. Currenly only MONL version is supported
629
static double get_rtt_of(struct nodeID* n){
630
#ifdef MONL
631
  return get_rtt(n);
632
#else
633
  return NAN;
634
#endif
635
}
636

    
637
#define DEFAULT_RTT_ESTIMATE 0.5
638

    
639
static struct chunkID_set *compose_offer_cset(struct peer *p)
640
{
641
  int num_chunks, j;
642
  uint64_t smallest_ts; //, largest_ts;
643
  double dt;
644
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
645
  struct chunk *chunks = cb_get_chunks(cb, &num_chunks);
646

    
647
  if (p) {
648
    dt = get_rtt_of(p->id);
649
  } else {
650
    dt = NAN;
651
  }
652
  if (isnan(dt)) dt = DEFAULT_RTT_ESTIMATE;
653
  dt *= 1e6;        //convert to usec
654

    
655
  smallest_ts = chunks[0].timestamp;
656
//  largest_ts = chunks[num_chunks-1].timestamp;
657

    
658
  //add chunks in latest...earliest order
659
  if (am_i_source()) {
660
    j = (num_chunks-1) * 3/4;        //do not send offers for the latest chunks from the source
661
  } else {
662
    j = num_chunks-1;
663
  }
664
  for(; j>=0; j--) {
665
    if (chunks[j].timestamp > smallest_ts + dt)
666
    chunkID_set_add_chunk(my_bmap, chunks[j].id);
667
  }
668

    
669
  return my_bmap;
670
}
671

    
672

    
673
void send_offer()
674
{
675
  struct chunk *buff;
676
  int size,  i, n;
677
  struct peer **neighbours;
678
  struct peerset *pset;
679

    
680
  pset = topology_get_neighbours();
681
  n = peerset_size(pset);
682
  neighbours = peerset_get_peers(pset);
683
  dprintf("Send Offer: %d neighbours\n", n);
684
  if (n == 0) return;
685
  buff = cb_get_chunks(cb, &size);
686
  if (size == 0) return;
687

    
688
  {
689
    size_t selectedpeers_len = offer_peer_count();
690
    int chunkids[size];
691
    struct peer *nodeids[n];
692
    struct peer *selectedpeers[selectedpeers_len];
693

    
694
    //reduce load a little bit if there are losses on the path from this guy
695
    double average_lossrate = get_average_lossrate_pset(pset);
696
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
697
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
698
      return;
699
    }
700

    
701
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
702
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
703
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
704

    
705
    for (i=0; i<selectedpeers_len ; i++){
706
      int transid = transaction_create(selectedpeers[i]->id);
707
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
708
      struct chunkID_set *offer_cset = compose_offer_cset(selectedpeers[i]);
709
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr_tr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
710
      offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
711
                        if (signal_log) log_signal(get_my_addr(),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
712
      chunkID_set_free(offer_cset);
713
    }
714
  }
715
}
716

    
717
void log_chunk_error(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,int error)
718
{
719
        switch (error) {
720
                case E_CB_OLD:
721
                log_chunk(from,get_my_addr(),c,"TOO_OLD");
722
                break;
723
                case E_CB_DUPLICATE:
724
                log_chunk(from,get_my_addr(),c,"DUPLICATED");
725
                break;
726
                default:
727
                log_chunk(from,get_my_addr(),c,"ERROR");
728
        } 
729
}
730
void log_chunk(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,const char * note)
731
{
732
        // semantic: [CHUNK_LOG],log_date,sender,receiver,id,size(bytes),chunk_timestamp,hopcount,notes
733
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
734
        node_addr(from,sndr,NODE_STR_LENGTH);
735
        node_addr(to,rcvr,NODE_STR_LENGTH);
736

    
737
        fprintf(stderr,"[CHUNK_LOG],%"PRIu64",%s,%s,%d,%d,%"PRIu64",%i,%s\n",gettimeofday_in_us(),sndr,rcvr,c->id,c->size,c->timestamp,chunk_get_hopcount(c),note);
738
}
739

    
740
void log_signal(const struct nodeID *fromid,const struct nodeID *toid,const int cidset_size,uint16_t trans_id,enum signaling_type type,const char *flag)
741
{
742
        char typestr[24];
743
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
744
        node_addr(fromid,sndr,NODE_STR_LENGTH);
745
        node_addr(toid,rcvr,NODE_STR_LENGTH);
746

    
747
        switch (type)
748
        {
749
                case sig_offer:
750
                        sprintf(typestr,"%s","OFFER_SIG");
751
                        break;
752
                case sig_accept:
753
                        sprintf(typestr,"%s","ACCEPT_SIG");
754
                        break;
755
                case sig_request:
756
                        sprintf(typestr,"%s","REQUEST_SIG");
757
                        break;
758
                case sig_deliver:
759
                        sprintf(typestr,"%s","DELIVER_SIG");
760
                        break;
761
                case sig_send_buffermap:
762
                        sprintf(typestr,"%s","SEND_BMAP_SIG");
763
                        break;
764
                case sig_request_buffermap:
765
                        sprintf(typestr,"%s","REQUEST_BMAP_SIG");
766
                        break;
767
                case sig_ack:
768
                        sprintf(typestr,"%s","CHUNK_ACK_SIG");
769
                        break;
770
                default:
771
                        sprintf(typestr,"%s","UNKNOWN_SIG");
772

    
773
        }
774
        fprintf(stderr,"[OFFER_LOG],%s,%s,%d,%d,%s,%s\n",sndr,rcvr,cb_size,trans_id,typestr,flag);
775
}
776

    
777
int peer_chunk_dispatch(const struct PeerChunk  *pairs,const size_t num_pairs)
778
{
779
        int i, transid, res,success = 0;
780
        const struct peer * target_peer;
781
        const struct chunk * target_chunk;
782

    
783
        for (i=0; i<num_pairs ; i++){
784
                target_peer = pairs[i].peer;
785
                target_chunk = cb_get_chunk(cb, pairs[i].chunk);
786

    
787
                if (send_bmap_before_push) {
788
                        send_bmap(target_peer->id);
789
                }
790
                chunk_attributes_update_sending(target_chunk);
791
                transid = transaction_create(target_peer->id);
792
                res = sendChunk(target_peer->id, target_chunk, transid);        //we use transactions in order to register acks for push
793
                if (res>=0) {
794
                        if(chunk_log) log_chunk(get_my_addr(),target_peer->id,target_chunk,"SENT");
795
//                        chunkID_set_add_chunk((target_peer)->bmap,target_chunk->id); //don't send twice ... assuming that it will actually arrive
796
                        reg_chunk_send(target_chunk->id);
797
                        success++;
798
                } else {
799
                        fprintf(stderr,"ERROR sending chunk %d\n",target_chunk->id);
800
                }
801
        }
802
        return success;
803

    
804
}
805

    
806
int inject_chunk(const struct chunk * target_chunk,const int multiplicity)
807
/*injects a specific chunk in the overlay and return the number of injected copies*/
808
{
809
        struct peerset *pset;
810
        struct peer ** peers, ** dst_peers;
811
        int peers_num, i;
812
        double (* peer_evaluation) (struct peer **n);
813
        size_t selectedpairs_len = multiplicity;
814
        struct PeerChunk  * selectedpairs;
815

    
816
        pset = topology_get_neighbours();
817
        peers_num = peerset_size(pset);
818
        peers = peerset_get_peers(pset);
819
        peer_evaluation = push_strategy ? peerWeightLoss : SCHED_PEER;
820

    
821
  //SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
822
         dst_peers = (struct peer **) malloc(sizeof(struct peer* ) *  multiplicity);
823
         selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
824

    
825
        selectedpairs = (struct PeerChunk *)  malloc(sizeof(struct PeerChunk) * selectedpairs_len);
826
        for ( i=0; i<selectedpairs_len; i++)
827
        {
828
                selectedpairs[i].peer = dst_peers[i];
829
                selectedpairs[i].chunk = target_chunk->id;
830
        }
831

    
832
        peer_chunk_dispatch(selectedpairs,selectedpairs_len);
833

    
834
        free(selectedpairs);
835
        free(dst_peers);
836
        return selectedpairs_len;
837
}
838

    
839
void send_chunk()
840
{
841
  struct chunk *buff;
842
  int size, res, i, n;
843
  struct peer **neighbours;
844
  struct peerset *pset;
845

    
846
  pset = topology_get_neighbours();
847
  n = peerset_size(pset);
848
  neighbours = peerset_get_peers(pset);
849
  dprintf("Send Chunk: %d neighbours\n", n);
850
  if (n == 0) return;
851
  buff = cb_get_chunks(cb, &size);
852
  dprintf("\t %d chunks in buffer...\n", size);
853
  if (size == 0) return;
854

    
855
  /************ STUPID DUMB SCHEDULING ****************/
856
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
857
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
858
  /************ /STUPID DUMB SCHEDULING ****************/
859

    
860
  /************ USE SCHEDULER ****************/
861
  {
862
    size_t selectedpairs_len = 1;
863
    int chunkids[size];
864
                int transid;
865
    struct peer *nodeids[n];
866
    struct PeerChunk selectedpairs[1];
867
  
868
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
869
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
870
                if (push_strategy){
871
            SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peerWeightLoss, SCHED_CHUNK);
872
//                        fprintf(stderr,"[DEBUG] using push strategy.\n");
873
                }
874
                else
875
            SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
876
  /************ /USE SCHEDULER ****************/
877

    
878
    for (i=0; i<selectedpairs_len ; i++){
879
      struct peer *p = selectedpairs[i].peer;
880
      const struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
881
      dprintf("\t sending chunk[%d] to ", c->id);
882
      dprintf("%s\n", node_addr_tr(p->id));
883

    
884
      if (send_bmap_before_push) {
885
        send_bmap(p->id);
886
      }
887

    
888
      chunk_attributes_update_sending(c);
889
      transid = transaction_create(p->id);
890
      res = sendChunk(p->id, c, transid);        //we use transactions in order to register acks for push
891
//      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
892
      dprintf("\tResult: %d\n", res);
893
      if (res>=0) {
894
              if(chunk_log) log_chunk(get_my_addr(),p->id,c,"SENT");
895
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(p->id), gettimeofday_in_us(), res, c->size);}
896
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
897
        reg_chunk_send(c->id);
898
      } else {
899
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
900
      }
901
    }
902
  }
903
}