Statistics
| Branch: | Revision:

streamers / streaming.c @ 08f76adc

History | View | Annotate | Download (17.1 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15
#include <inttypes.h>
16

    
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22
#include <peerset.h>
23
#include <peer.h>
24
#include <chunkidset.h>
25
#include <limits.h>
26
#include <trade_sig_ha.h>
27
#include <chunkiser_attrib.h>
28

    
29
#include "streaming.h"
30
#include "output.h"
31
#include "input.h"
32
#include "dbg.h"
33
#include "chunk_signaling.h"
34
#include "chunklock.h"
35
#include "topology.h"
36
#include "measures.h"
37
#include "scheduling.h"
38

    
39
#include "scheduler_la.h"
40

    
41
static bool heuristics_distance_maxdeliver = false;
42
static int bcast_after_receive_every = 0;
43
static bool neigh_on_chunk_recv = false;
44

    
45
struct chunk_attributes {
46
  uint64_t deadline;
47
  uint16_t deadline_increment;
48
  uint16_t hopcount;
49
} __attribute__((packed));
50

    
51
extern bool chunk_log;
52

    
53
struct chunk_buffer *cb;
54
static struct input_desc *input;
55
static int cb_size;
56
static int transid=0;
57

    
58
static int offer_per_tick = 1;        //N_p parameter of POLITO
59

    
60
int _needs(struct chunkID_set *cset, int cb_size, int cid);
61

    
62
uint64_t gettimeofday_in_us(void)
63
{
64
  struct timeval what_time; //to store the epoch time
65

    
66
  gettimeofday(&what_time, NULL);
67
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
68
}
69

    
70
void cb_print()
71
{
72
#ifdef DEBUG
73
  struct chunk *chunks;
74
  int num_chunks, i, id;
75
  chunks = cb_get_chunks(cb, &num_chunks);
76

    
77
  dprintf("\tchbuf :");
78
  i = 0;
79
  if(num_chunks) {
80
    id = chunks[0].id;
81
    dprintf(" %d-> ",id);
82
    while (i < num_chunks) {
83
      if (id == chunks[i].id) {
84
        dprintf("%d",id % 10);
85
        i++;
86
      } else if (chunk_islocked(id)) {
87
        dprintf("*");
88
      } else {
89
        dprintf(".");
90
      }
91
      id++;
92
    }
93
  }
94
  dprintf("\n");
95
#endif
96
}
97

    
98
void stream_init(int size, struct nodeID *myID)
99
{
100
  static char conf[32];
101

    
102
  cb_size = size;
103

    
104
  sprintf(conf, "size=%d", cb_size);
105
  cb = cb_init(conf);
106
  chunkDeliveryInit(myID);
107
  chunkSignalingInit(myID);
108
  init_measures();
109
}
110

    
111
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
112
{
113
  int flags = 0;
114

    
115
  if (memcmp(fname, "udp:", 4) == 0) {
116
    fname += 4;
117
    flags = INPUT_UDP;
118
  }
119
  if (loop) {
120
    flags |= INPUT_LOOP;
121
  }
122
  input = input_open(fname, flags, fds, fds_size);
123
  if (input == NULL) {
124
    return -1;
125
  }
126

    
127
  stream_init(1, myID);
128
  return 0;
129
}
130

    
131
void chunk_attributes_fill(struct chunk* c)
132
{
133
  struct chunk_attributes * ca;
134
  int priority = 1;
135

    
136
  assert((!c->attributes && c->attributes_size == 0) ||
137
         chunk_attributes_chunker_verify(c->attributes, c->attributes_size));
138

    
139
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
140
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
141
    free(c->attributes);
142
    c->attributes = NULL;
143
    c->attributes_size = 0;
144
  }
145

    
146
  c->attributes_size = sizeof(struct chunk_attributes);
147
  c->attributes = ca = malloc(c->attributes_size);
148

    
149
  ca->deadline = c->id;
150
  ca->deadline_increment = priority * 2;
151
  ca->hopcount = 0;
152
}
153

    
154
int chunk_get_hopcount(struct chunk* c) {
155
  struct chunk_attributes * ca;
156

    
157
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
158
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
159
    return -1;
160
  }
161

    
162
  ca = (struct chunk_attributes *) c->attributes;
163
  return ca->hopcount;
164
}
165

    
166
void chunk_attributes_update_received(struct chunk* c)
167
{
168
  struct chunk_attributes * ca;
169

    
170
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
171
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
172
    return;
173
  }
174

    
175
  ca = (struct chunk_attributes *) c->attributes;
176
  ca->hopcount++;
177
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
178
}
179

    
180
void chunk_attributes_update_sending(const struct chunk* c)
181
{
182
  struct chunk_attributes * ca;
183

    
184
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
185
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
186
    return;
187
  }
188

    
189
  ca = (struct chunk_attributes *) c->attributes;
190
  ca->deadline += ca->deadline_increment;
191
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
192
}
193

    
194
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
195
{
196
  struct chunk *chunks;
197
  int num_chunks, i;
198
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
199
  chunks = cb_get_chunks(chbuf, &num_chunks);
200

    
201
  for(i=num_chunks-1; i>=0; i--) {
202
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
203
  }
204
  return my_bmap;
205
}
206

    
207
// a simple implementation that request everything that we miss ... up to max deliver
208
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
209
  struct chunkID_set *cset_acc, *my_bmap;
210
  int i, d, cset_off_size;
211
  //double lossrate;
212
  struct peer *from = nodeid_to_peer(fromid, 0);
213

    
214
  cset_acc = chunkID_set_init("size=0");
215

    
216
  //reduce load a little bit if there are losses on the path from this guy
217
  //lossrate = get_lossrate_receive(from->id);
218
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
219
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
220
    my_bmap = cb_to_bmap(cb);
221
    cset_off_size = chunkID_set_size(cset_off);
222
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
223
      int chunkid = chunkID_set_get_chunk(cset_off, i);
224
      //dprintf("\tdo I need c%d ? :",chunkid);
225
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
226
        chunkID_set_add_chunk(cset_acc, chunkid);
227
        chunk_lock(chunkid,from);
228
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
229
#ifdef MONL
230
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
231
#endif
232
        dprintf("\n");
233
        d++;
234
      }
235
    }
236
    chunkID_set_free(my_bmap);
237
  //} else {
238
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
239
  //}
240

    
241
  return cset_acc;
242
}
243

    
244
void send_bmap(struct nodeID *toid)
245
{
246
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
247
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
248
  chunkID_set_free(my_bmap);
249
}
250

    
251
void bcast_bmap()
252
{
253
  int i, n;
254
  struct peer *neighbours;
255
  struct peerset *pset;
256
  struct chunkID_set *my_bmap;
257

    
258
  pset = get_peers();
259
  n = peerset_size(pset);
260
  neighbours = peerset_get_peers(pset);
261

    
262
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
263
  for (i = 0; i<n; i++) {
264
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
265
  }
266
  chunkID_set_free(my_bmap);
267
}
268

    
269
double get_average_lossrate_pset(struct peerset *pset)
270
{
271
  int i, n;
272
  struct peer *neighbours;
273

    
274
  n = peerset_size(pset);
275
  neighbours = peerset_get_peers(pset);
276
  {
277
    struct nodeID *nodeids[n];
278
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
279
#ifdef MONL
280
    return get_average_lossrate(nodeids, n);
281
#else
282
    return 0;
283
#endif
284
  }
285
}
286

    
287
void ack_chunk(struct chunk *c, struct nodeID *from)
288
{
289
  //reduce load a little bit if there are losses on the path from this guy
290
  double average_lossrate = get_average_lossrate_pset(get_peers());
291
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
292
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
293
    return;
294
  }
295
  send_bmap(from);        //send explicit ack
296
}
297

    
298
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
299
{
300
  int res;
301
  static struct chunk c;
302
  struct peer *p;
303
  static int bcast_cnt;
304
  uint16_t transid;
305

    
306
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
307
  if (res > 0) {
308
    chunk_attributes_update_received(&c);
309
    chunk_unlock(c.id);
310
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
311
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
312
    output_deliver(&c);
313
    res = cb_add_chunk(cb, &c);
314
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
315
    cb_print();
316
    if (res < 0) {
317
      dprintf("\tchunk too old, buffer full with newer chunks\n");
318
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
319
      free(c.data);
320
      free(c.attributes);
321
    }
322
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
323
    if (p) {        //now we have it almost sure
324
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
325
    }
326
    ack_chunk(&c,from);        //send explicit ack
327
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
328
       bcast_bmap();
329
    }
330
  } else {
331
    fprintf(stderr,"\tError: can't decode chunk!\n");
332
  }
333
}
334

    
335
struct chunk *generated_chunk(suseconds_t *delta)
336
{
337
  struct chunk *c;
338

    
339
  c = malloc(sizeof(struct chunk));
340
  if (!c) {
341
    fprintf(stderr, "Memory allocation error!\n");
342
    return NULL;
343
  }
344

    
345
  *delta = input_get(input, c);
346
  if (*delta < 0) {
347
    fprintf(stderr, "Error in input!\n");
348
    exit(-1);
349
  }
350
  if (c->data == NULL) {
351
    free(c);
352
    return NULL;
353
  }
354
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
355
  chunk_attributes_fill(c);
356
  return c;
357
}
358

    
359
int add_chunk(struct chunk *c)
360
{
361
  int res;
362

    
363
  res = cb_add_chunk(cb, c);
364
  if (res < 0) {
365
    free(c->data);
366
    free(c->attributes);
367
    free(c);
368
    return 0;
369
  }
370
  free(c);
371
  return 1;
372
}
373

    
374
/**
375
 *example function to filter chunks based on whether a given peer needs them.
376
 *
377
 * Looks at buffermap information received about the given peer.
378
 */
379
int needs(struct peer *n, int cid){
380
  struct peer * p = n;
381

    
382
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
383
  if (! p->bmap) {
384
    //dprintf("no bmap\n");
385
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
386
  }
387
  return _needs(p->bmap, p->cb_size, cid);
388
}
389

    
390
int _needs(struct chunkID_set *cset, int cb_size, int cid){
391
  if (cb_size == 0) { //if it declared it does not needs chunks
392
    return 0;
393
  }
394

    
395
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
396
    int missing, min;
397
    //@TODO: add some bmap_timestamp based logic
398

    
399
    if (chunkID_set_size(cset) == 0) {
400
      //dprintf("bmap empty\n");
401
      return 1;        // if the bmap seems empty, it needs the chunk
402
    }
403
    missing = cb_size - chunkID_set_size(cset);
404
    missing = missing < 0 ? 0 : missing;
405
    min = chunkID_set_get_earliest(cset);
406
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
407
    return (cid >= min - missing);
408
  }
409

    
410
  //dprintf("has it\n");
411
  return 0;
412
}
413

    
414
double peerWeightReceivedfrom(struct peer **n){
415
  struct peer * p = *n;
416
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
417
}
418

    
419
double peerWeightUniform(struct peer **n){
420
  return 1;
421
}
422

    
423
double peerWeightRtt(struct peer **n){
424
#ifdef MONL
425
  double rtt = get_rtt(*n->id);
426
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
427
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
428
#else
429
  return 1;
430
#endif
431
}
432

    
433
//ordering function for ELp peer selection, chunk ID based
434
//can't be used as weight
435
double peerScoreELpID(struct nodeID **n){
436
  struct chunkID_set *bmap;
437
  int latest;
438
  struct peer * p = nodeid_to_peer(*n, 0);
439
  if (!p) return 0;
440

    
441
  bmap = p->bmap;
442
  if (!bmap) return 0;
443
  latest = chunkID_set_get_latest(bmap);
444
  if (latest == INT_MIN) return 0;
445

    
446
  return -latest;
447
}
448

    
449
double chunkScoreChunkID(int *cid){
450
  return (double) *cid;
451
}
452

    
453
uint64_t get_chunk_deadline(int cid){
454
  const struct chunk_attributes * ca;
455
  const struct chunk *c;
456

    
457
  c = cb_get_chunk(cb, cid);
458
  if (!c) return 0;
459

    
460
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
461
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
462
    return 0;
463
  }
464

    
465
  ca = (struct chunk_attributes *) c->attributes;
466
  return ca->deadline;
467
}
468

    
469
double chunkScoreDL(int *cid){
470
  return - (double)get_chunk_deadline(*cid);
471
}
472

    
473
double getChunkTimestamp(int *cid){
474
  const struct chunk *c = cb_get_chunk(cb, *cid);
475
  if (!c) return 0;
476

    
477
  return (double) c->timestamp;
478
}
479

    
480
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
481
  int i, d, cset_acc_size, res;
482
  struct peer *to = nodeid_to_peer(toid, 0);
483

    
484
  cset_acc_size = chunkID_set_size(cset_acc);
485
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
486
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
487
    const struct chunk *c;
488
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
489
    c = cb_get_chunk(cb, chunkid);
490
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
491
      chunk_attributes_update_sending(c);
492
      res = sendChunk(toid, c, trans_id);
493
      if (res >= 0) {
494
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
495
        d++;
496
        reg_chunk_send(c->id);
497
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
498
      } else {
499
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
500
      }
501
    }
502
  }
503
}
504

    
505
int offer_peer_count()
506
{
507
  return offer_per_tick;
508
}
509

    
510
int offer_max_deliver(struct nodeID *n)
511
{
512

    
513
  if (!heuristics_distance_maxdeliver) return 1;
514

    
515
#ifdef MONL
516
  switch (get_hopcount(n)) {
517
    case 0: return 5;
518
    case 1: return 2;
519
    default: return 1;
520
  }
521
#else
522
  return 1;
523
#endif
524
}
525

    
526
void send_offer()
527
{
528
  struct chunk *buff;
529
  int size, res, i, n;
530
  struct peer *neighbours;
531
  struct peerset *pset;
532

    
533
  pset = get_peers();
534
  n = peerset_size(pset);
535
  neighbours = peerset_get_peers(pset);
536
  dprintf("Send Offer: %d neighbours\n", n);
537
  if (n == 0) return;
538
  buff = cb_get_chunks(cb, &size);
539
  if (size == 0) return;
540

    
541
  {
542
    size_t selectedpeers_len = offer_peer_count();
543
    int chunkids[size];
544
    struct peer *nodeids[n];
545
    struct peer *selectedpeers[selectedpeers_len];
546

    
547
    //reduce load a little bit if there are losses on the path from this guy
548
    double average_lossrate = get_average_lossrate_pset(pset);
549
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
550
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
551
      return;
552
    }
553

    
554
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
555
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
556
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
557

    
558
    for (i=0; i<selectedpeers_len ; i++){
559
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
560
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
561
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
562
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
563
      chunkID_set_free(my_bmap);
564
    }
565
  }
566
}
567

    
568

    
569
void send_chunk()
570
{
571
  struct chunk *buff;
572
  int size, res, i, n;
573
  struct peer *neighbours;
574
  struct peerset *pset;
575

    
576
  pset = get_peers();
577
  n = peerset_size(pset);
578
  neighbours = peerset_get_peers(pset);
579
  dprintf("Send Chunk: %d neighbours\n", n);
580
  if (n == 0) return;
581
  buff = cb_get_chunks(cb, &size);
582
  dprintf("\t %d chunks in buffer...\n", size);
583
  if (size == 0) return;
584

    
585
  /************ STUPID DUMB SCHEDULING ****************/
586
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
587
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
588
  /************ /STUPID DUMB SCHEDULING ****************/
589

    
590
  /************ USE SCHEDULER ****************/
591
  {
592
    size_t selectedpairs_len = 1;
593
    int chunkids[size];
594
    struct peer *nodeids[n];
595
    struct PeerChunk selectedpairs[1];
596
  
597
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
598
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
599
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
600
  /************ /USE SCHEDULER ****************/
601

    
602
    for (i=0; i<selectedpairs_len ; i++){
603
      struct peer *p = selectedpairs[i].peer;
604
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
605
      dprintf("\t sending chunk[%d] to ", c->id);
606
      dprintf("%s\n", node_addr(p->id));
607

    
608
      send_bmap(p->id);
609

    
610
      chunk_attributes_update_sending(c);
611
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
612
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
613
      dprintf("\tResult: %d\n", res);
614
      if (res>=0) {
615
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
616
        reg_chunk_send(c->id);
617
      } else {
618
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
619
      }
620
    }
621
  }
622
}