Statistics
| Branch: | Revision:

streamers / streaming.c @ 3730fda0

History | View | Annotate | Download (16.6 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15
#include <inttypes.h>
16

    
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22
#include <peerset.h>
23
#include <peer.h>
24
#include <chunkidset.h>
25
#include <limits.h>
26
#include <trade_sig_ha.h>
27

    
28
#include "streaming.h"
29
#include "output.h"
30
#include "input.h"
31
#include "dbg.h"
32
#include "chunk_signaling.h"
33
#include "chunklock.h"
34
#include "topology.h"
35
#include "measures.h"
36
#include "scheduling.h"
37

    
38
#include "scheduler_la.h"
39

    
40
uint64_t CB_SIZE_TIME = 4*1e6;
41

    
42
static bool heuristics_distance_maxdeliver = false;
43
static int bcast_after_receive_every = 0;
44
static bool neigh_on_chunk_recv = false;
45

    
46
struct chunk_attributes {
47
  uint64_t deadline;
48
  uint16_t deadline_increment;
49
  uint16_t hopcount;
50
} __attribute__((packed));
51

    
52
extern bool chunk_log;
53

    
54
struct chunk_buffer *cb;
55
static struct input_desc *input;
56
static int cb_size;
57
static int transid=0;
58

    
59
static int offer_per_tick = 1;        //N_p parameter of POLITO
60

    
61
int _needs(struct chunkID_set *cset, int cb_size, int cid);
62

    
63
uint64_t gettimeofday_in_us(void)
64
{
65
  struct timeval what_time; //to store the epoch time
66

    
67
  gettimeofday(&what_time, NULL);
68
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
69
}
70

    
71
void cb_print()
72
{
73
#ifdef DEBUG
74
  struct chunk *chunks;
75
  int num_chunks, i, id;
76
  chunks = cb_get_chunks(cb, &num_chunks);
77

    
78
  dprintf("\tchbuf :");
79
  i = 0;
80
  if(num_chunks) {
81
    id = chunks[0].id;
82
    dprintf(" %d-> ",id);
83
    while (i < num_chunks) {
84
      if (id == chunks[i].id) {
85
        dprintf("%d",id % 10);
86
        i++;
87
      } else if (chunk_islocked(id)) {
88
        dprintf("*");
89
      } else {
90
        dprintf(".");
91
      }
92
      id++;
93
    }
94
  }
95
  dprintf("\n");
96
#endif
97
}
98

    
99
void stream_init(int size, struct nodeID *myID)
100
{
101
  static char conf[32];
102

    
103
  cb_size = size;
104

    
105
  sprintf(conf, "size=%d", cb_size);
106
  cb = cb_init(conf);
107
  chunkDeliveryInit(myID);
108
  chunkSignalingInit(myID);
109
  init_measures();
110
}
111

    
112
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
113
{
114
  int flags = 0;
115

    
116
  if (memcmp(fname, "udp:", 4) == 0) {
117
    fname += 4;
118
    flags = INPUT_UDP;
119
  }
120
  if (loop) {
121
    flags |= INPUT_LOOP;
122
  }
123
  input = input_open(fname, flags, fds, fds_size);
124
  if (input == NULL) {
125
    return -1;
126
  }
127

    
128
  stream_init(1, myID);
129
  return 0;
130
}
131

    
132
void chunk_attributes_fill(struct chunk* c)
133
{
134
  struct chunk_attributes * ca;
135

    
136
  assert(!c->attributes && c->attributes_size == 0);
137

    
138
  c->attributes_size = sizeof(struct chunk_attributes);
139
  c->attributes = ca = malloc(c->attributes_size);
140

    
141
  ca->deadline = c->timestamp;
142
  ca->deadline_increment = 2;
143
  ca->hopcount = 0;
144
}
145

    
146
int chunk_get_hopcount(struct chunk* c) {
147
  struct chunk_attributes * ca;
148

    
149
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
150
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
151
    return -1;
152
  }
153

    
154
  ca = (struct chunk_attributes *) c->attributes;
155
  return ca->hopcount;
156
}
157

    
158
void chunk_attributes_update_received(struct chunk* c)
159
{
160
  struct chunk_attributes * ca;
161

    
162
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
163
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
164
    return;
165
  }
166

    
167
  ca = (struct chunk_attributes *) c->attributes;
168
  ca->hopcount++;
169
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
170
}
171

    
172
void chunk_attributes_update_sending(const struct chunk* c)
173
{
174
  struct chunk_attributes * ca;
175

    
176
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
177
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
178
    return;
179
  }
180

    
181
  ca = (struct chunk_attributes *) c->attributes;
182
  ca->deadline += ca->deadline_increment;
183
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
184
}
185

    
186
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
187
{
188
  struct chunk *chunks;
189
  int num_chunks, i;
190
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
191
  chunks = cb_get_chunks(chbuf, &num_chunks);
192

    
193
  for(i=num_chunks-1; i>=0; i--) {
194
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
195
  }
196
  return my_bmap;
197
}
198

    
199
// a simple implementation that request everything that we miss ... up to max deliver
200
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
201
  struct chunkID_set *cset_acc, *my_bmap;
202
  int i, d, cset_off_size;
203
  //double lossrate;
204
  struct peer *from = nodeid_to_peer(fromid, 0);
205

    
206
  cset_acc = chunkID_set_init("size=0");
207

    
208
  //reduce load a little bit if there are losses on the path from this guy
209
  //lossrate = get_lossrate_receive(from->id);
210
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
211
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
212
    my_bmap = cb_to_bmap(cb);
213
    cset_off_size = chunkID_set_size(cset_off);
214
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
215
      int chunkid = chunkID_set_get_chunk(cset_off, i);
216
      //dprintf("\tdo I need c%d ? :",chunkid);
217
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
218
        chunkID_set_add_chunk(cset_acc, chunkid);
219
        chunk_lock(chunkid,from);
220
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
221
#ifdef MONL
222
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
223
#endif
224
        dprintf("\n");
225
        d++;
226
      }
227
    }
228
    chunkID_set_free(my_bmap);
229
  //} else {
230
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
231
  //}
232

    
233
  return cset_acc;
234
}
235

    
236
void send_bmap(struct nodeID *toid)
237
{
238
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
239
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
240
  chunkID_set_free(my_bmap);
241
}
242

    
243
void bcast_bmap()
244
{
245
  int i, n;
246
  struct peer *neighbours;
247
  struct peerset *pset;
248
  struct chunkID_set *my_bmap;
249

    
250
  pset = get_peers();
251
  n = peerset_size(pset);
252
  neighbours = peerset_get_peers(pset);
253

    
254
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
255
  for (i = 0; i<n; i++) {
256
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
257
  }
258
  chunkID_set_free(my_bmap);
259
}
260

    
261
double get_average_lossrate_pset(struct peerset *pset)
262
{
263
  int i, n;
264
  struct peer *neighbours;
265

    
266
  n = peerset_size(pset);
267
  neighbours = peerset_get_peers(pset);
268
  {
269
    struct nodeID *nodeids[n];
270
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
271
#ifdef MONL
272
    return get_average_lossrate(nodeids, n);
273
#else
274
    return 0;
275
#endif
276
  }
277
}
278

    
279
void ack_chunk(struct chunk *c, struct nodeID *from)
280
{
281
  //reduce load a little bit if there are losses on the path from this guy
282
  double average_lossrate = get_average_lossrate_pset(get_peers());
283
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
284
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
285
    return;
286
  }
287
  send_bmap(from);        //send explicit ack
288
}
289

    
290
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
291
{
292
  int res;
293
  static struct chunk c;
294
  struct peer *p;
295
  static int bcast_cnt;
296
  uint16_t transid;
297

    
298
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
299
  if (res > 0) {
300
    chunk_attributes_update_received(&c);
301
    chunk_unlock(c.id);
302
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
303
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
304
    output_deliver(&c);
305
    res = cb_add_chunk(cb, &c);
306
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
307
    cb_print();
308
    if (res < 0) {
309
      dprintf("\tchunk too old, buffer full with newer chunks\n");
310
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
311
      free(c.data);
312
      free(c.attributes);
313
    }
314
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
315
    if (p) {        //now we have it almost sure
316
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
317
    }
318
    ack_chunk(&c,from);        //send explicit ack
319
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
320
       bcast_bmap();
321
    }
322
  } else {
323
    fprintf(stderr,"\tError: can't decode chunk!\n");
324
  }
325
}
326

    
327
struct chunk *generated_chunk(suseconds_t *delta)
328
{
329
  struct chunk *c;
330

    
331
  c = malloc(sizeof(struct chunk));
332
  if (!c) {
333
    fprintf(stderr, "Memory allocation error!\n");
334
    return NULL;
335
  }
336

    
337
  *delta = input_get(input, c);
338
  if (*delta < 0) {
339
    fprintf(stderr, "Error in input!\n");
340
    exit(-1);
341
  }
342
  if (c->data == NULL) {
343
    free(c);
344
    return NULL;
345
  }
346
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
347
  chunk_attributes_fill(c);
348
  return c;
349
}
350

    
351
int add_chunk(struct chunk *c)
352
{
353
  int res;
354

    
355
  res = cb_add_chunk(cb, c);
356
  if (res < 0) {
357
    free(c->data);
358
    free(c->attributes);
359
    free(c);
360
    return 0;
361
  }
362
  free(c);
363
  return 1;
364
}
365

    
366
uint64_t get_chunk_timestamp(int cid){
367
  const struct chunk *c = cb_get_chunk(cb, cid);
368
  if (!c) return 0;
369

    
370
  return c->timestamp;
371
}
372

    
373
/**
374
 *example function to filter chunks based on whether a given peer needs them.
375
 *
376
 * Looks at buffermap information received about the given peer.
377
 */
378
int needs(struct peer *n, int cid){
379
  struct peer * p = n;
380

    
381
  if (get_chunk_timestamp(cid) < gettimeofday_in_us() - CB_SIZE_TIME) {
382
    return 0;
383
  }
384

    
385
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
386
  if (! p->bmap) {
387
    //dprintf("no bmap\n");
388
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
389
  }
390
  return _needs(p->bmap, p->cb_size, cid);
391
}
392

    
393
int _needs(struct chunkID_set *cset, int cb_size, int cid){
394
  if (cb_size == 0) { //if it declared it does not needs chunks
395
    return 0;
396
  }
397

    
398
  if (get_chunk_timestamp(cid) < gettimeofday_in_us() - CB_SIZE_TIME) {
399
    return 0;
400
  }
401

    
402
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
403
    int missing, min;
404
    //@TODO: add some bmap_timestamp based logic
405

    
406
    if (chunkID_set_size(cset) == 0) {
407
      //dprintf("bmap empty\n");
408
      return 1;        // if the bmap seems empty, it needs the chunk
409
    }
410
    missing = cb_size - chunkID_set_size(cset);
411
    missing = missing < 0 ? 0 : missing;
412
    min = chunkID_set_get_earliest(cset);
413
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
414
    return (cid >= min - missing);
415
  }
416

    
417
  //dprintf("has it\n");
418
  return 0;
419
}
420

    
421
double peerWeightReceivedfrom(struct peer **n){
422
  struct peer * p = *n;
423
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
424
}
425

    
426
double peerWeightUniform(struct peer **n){
427
  return 1;
428
}
429

    
430
double peerWeightRtt(struct peer **n){
431
#ifdef MONL
432
  double rtt = get_rtt(*n->id);
433
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
434
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
435
#else
436
  return 1;
437
#endif
438
}
439

    
440
//ordering function for ELp peer selection, chunk ID based
441
//can't be used as weight
442
double peerScoreELpID(struct nodeID **n){
443
  struct chunkID_set *bmap;
444
  int latest;
445
  struct peer * p = nodeid_to_peer(*n, 0);
446
  if (!p) return 0;
447

    
448
  bmap = p->bmap;
449
  if (!bmap) return 0;
450
  latest = chunkID_set_get_latest(bmap);
451
  if (latest == INT_MIN) return 0;
452

    
453
  return -latest;
454
}
455

    
456
double chunkScoreChunkID(int *cid){
457
  return (double) *cid;
458
}
459

    
460
double chunkScoreTimestamp(int *cid){
461
  return (double) get_chunk_timestamp(*cid);
462
}
463

    
464
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
465
  int i, d, cset_acc_size, res;
466
  struct peer *to = nodeid_to_peer(toid, 0);
467

    
468
  cset_acc_size = chunkID_set_size(cset_acc);
469
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
470
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
471
    const struct chunk *c;
472
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
473
    c = cb_get_chunk(cb, chunkid);
474
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
475
      chunk_attributes_update_sending(c);
476
      res = sendChunk(toid, c, trans_id);
477
      if (res >= 0) {
478
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
479
        d++;
480
        reg_chunk_send(c->id);
481
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
482
      } else {
483
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
484
      }
485
    }
486
  }
487
}
488

    
489
int offer_peer_count()
490
{
491
  return offer_per_tick;
492
}
493

    
494
int offer_max_deliver(struct nodeID *n)
495
{
496

    
497
  if (!heuristics_distance_maxdeliver) return 1;
498

    
499
#ifdef MONL
500
  switch (get_hopcount(n)) {
501
    case 0: return 5;
502
    case 1: return 2;
503
    default: return 1;
504
  }
505
#else
506
  return 1;
507
#endif
508
}
509

    
510
void send_offer()
511
{
512
  struct chunk *buff;
513
  int size, res, i, n;
514
  struct peer *neighbours;
515
  struct peerset *pset;
516

    
517
  pset = get_peers();
518
  n = peerset_size(pset);
519
  neighbours = peerset_get_peers(pset);
520
  dprintf("Send Offer: %d neighbours\n", n);
521
  if (n == 0) return;
522
  buff = cb_get_chunks(cb, &size);
523
  if (size == 0) return;
524

    
525
  {
526
    size_t selectedpeers_len = offer_peer_count();
527
    int chunkids[size];
528
    struct peer *nodeids[n];
529
    struct peer *selectedpeers[selectedpeers_len];
530

    
531
    //reduce load a little bit if there are losses on the path from this guy
532
    double average_lossrate = get_average_lossrate_pset(pset);
533
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
534
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
535
      return;
536
    }
537

    
538
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
539
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
540
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
541

    
542
    for (i=0; i<selectedpeers_len ; i++){
543
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
544
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
545
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
546
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
547
      chunkID_set_free(my_bmap);
548
    }
549
  }
550
}
551

    
552

    
553
void send_chunk()
554
{
555
  struct chunk *buff;
556
  int size, res, i, n;
557
  struct peer *neighbours;
558
  struct peerset *pset;
559

    
560
  pset = get_peers();
561
  n = peerset_size(pset);
562
  neighbours = peerset_get_peers(pset);
563
  dprintf("Send Chunk: %d neighbours\n", n);
564
  if (n == 0) return;
565
  buff = cb_get_chunks(cb, &size);
566
  dprintf("\t %d chunks in buffer...\n", size);
567
  if (size == 0) return;
568

    
569
  /************ STUPID DUMB SCHEDULING ****************/
570
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
571
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
572
  /************ /STUPID DUMB SCHEDULING ****************/
573

    
574
  /************ USE SCHEDULER ****************/
575
  {
576
    size_t selectedpairs_len = 1;
577
    int chunkids[size];
578
    struct peer *nodeids[n];
579
    struct PeerChunk selectedpairs[1];
580
  
581
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
582
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
583
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
584
  /************ /USE SCHEDULER ****************/
585

    
586
    for (i=0; i<selectedpairs_len ; i++){
587
      struct peer *p = selectedpairs[i].peer;
588
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
589
      dprintf("\t sending chunk[%d] to ", c->id);
590
      dprintf("%s\n", node_addr(p->id));
591

    
592
      send_bmap(p->id);
593

    
594
      chunk_attributes_update_sending(c);
595
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
596
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
597
      dprintf("\tResult: %d\n", res);
598
      if (res>=0) {
599
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
600
        reg_chunk_send(c->id);
601
      } else {
602
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
603
      }
604
    }
605
  }
606
}