Statistics
| Branch: | Revision:

streamers / streaming.c @ 65626579

History | View | Annotate | Download (16.4 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15
#include <inttypes.h>
16

    
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22
#include <peerset.h>
23
#include <peer.h>
24
#include <chunkidset.h>
25
#include <limits.h>
26
#include <trade_sig_ha.h>
27

    
28
#include "streaming.h"
29
#include "output.h"
30
#include "input.h"
31
#include "dbg.h"
32
#include "chunk_signaling.h"
33
#include "chunklock.h"
34
#include "topology.h"
35
#include "measures.h"
36
#include "scheduling.h"
37

    
38
#include "scheduler_la.h"
39

    
40
static bool heuristics_distance_maxdeliver = false;
41
static int bcast_after_receive_every = 0;
42
static bool neigh_on_chunk_recv = false;
43

    
44
struct chunk_attributes {
45
  uint64_t deadline;
46
  uint16_t deadline_increment;
47
  uint16_t hopcount;
48
} __attribute__((packed));
49

    
50
extern bool chunk_log;
51

    
52
struct chunk_buffer *cb;
53
static struct input_desc *input;
54
static int cb_size;
55
static int transid=0;
56

    
57
static int offer_per_tick = 1;        //N_p parameter of POLITO
58

    
59
int _needs(struct chunkID_set *cset, int cb_size, int cid);
60

    
61
uint64_t gettimeofday_in_us(void)
62
{
63
  struct timeval what_time; //to store the epoch time
64

    
65
  gettimeofday(&what_time, NULL);
66
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
67
}
68

    
69
void cb_print()
70
{
71
#ifdef DEBUG
72
  struct chunk *chunks;
73
  int num_chunks, i, id;
74
  chunks = cb_get_chunks(cb, &num_chunks);
75

    
76
  dprintf("\tchbuf :");
77
  i = 0;
78
  if(num_chunks) {
79
    id = chunks[0].id;
80
    dprintf(" %d-> ",id);
81
    while (i < num_chunks) {
82
      if (id == chunks[i].id) {
83
        dprintf("%d",id % 10);
84
        i++;
85
      } else if (chunk_islocked(id)) {
86
        dprintf("*");
87
      } else {
88
        dprintf(".");
89
      }
90
      id++;
91
    }
92
  }
93
  dprintf("\n");
94
#endif
95
}
96

    
97
void stream_init(int size, struct nodeID *myID)
98
{
99
  static char conf[32];
100

    
101
  cb_size = size;
102

    
103
  sprintf(conf, "size=%d", cb_size);
104
  cb = cb_init(conf);
105
  chunkDeliveryInit(myID);
106
  chunkSignalingInit(myID);
107
  init_measures();
108
}
109

    
110
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
111
{
112
  int flags = 0;
113

    
114
  if (memcmp(fname, "udp:", 4) == 0) {
115
    fname += 4;
116
    flags = INPUT_UDP;
117
  }
118
  if (memcmp(fname, "ipb:", 4) == 0) {
119
    fname += 4;
120
    flags = INPUT_IPB;
121
  }
122
  if (loop) {
123
    flags |= INPUT_LOOP;
124
  }
125
  input = input_open(fname, flags, fds, fds_size);
126
  if (input == NULL) {
127
    return -1;
128
  }
129

    
130
  stream_init(1, myID);
131
  return 0;
132
}
133

    
134
void chunk_attributes_fill(struct chunk* c)
135
{
136
  struct chunk_attributes * ca;
137

    
138
  assert(!c->attributes && c->attributes_size == 0);
139

    
140
  c->attributes_size = sizeof(struct chunk_attributes);
141
  c->attributes = ca = malloc(c->attributes_size);
142

    
143
  ca->deadline = c->timestamp;
144
  ca->deadline_increment = 2;
145
  ca->hopcount = 0;
146
}
147

    
148
int chunk_get_hopcount(struct chunk* c) {
149
  struct chunk_attributes * ca;
150

    
151
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
152
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
153
    return -1;
154
  }
155

    
156
  ca = (struct chunk_attributes *) c->attributes;
157
  return ca->hopcount;
158
}
159

    
160
void chunk_attributes_update_received(struct chunk* c)
161
{
162
  struct chunk_attributes * ca;
163

    
164
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
165
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
166
    return;
167
  }
168

    
169
  ca = (struct chunk_attributes *) c->attributes;
170
  ca->hopcount++;
171
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
172
}
173

    
174
void chunk_attributes_update_sending(const struct chunk* c)
175
{
176
  struct chunk_attributes * ca;
177

    
178
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
179
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
180
    return;
181
  }
182

    
183
  ca = (struct chunk_attributes *) c->attributes;
184
  ca->deadline += ca->deadline_increment;
185
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
186
}
187

    
188
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
189
{
190
  struct chunk *chunks;
191
  int num_chunks, i;
192
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
193
  chunks = cb_get_chunks(chbuf, &num_chunks);
194

    
195
  for(i=num_chunks-1; i>=0; i--) {
196
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
197
  }
198
  return my_bmap;
199
}
200

    
201
// a simple implementation that request everything that we miss ... up to max deliver
202
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
203
  struct chunkID_set *cset_acc, *my_bmap;
204
  int i, d, cset_off_size;
205
  //double lossrate;
206
  struct peer *from = nodeid_to_peer(fromid, 0);
207

    
208
  cset_acc = chunkID_set_init("size=0");
209

    
210
  //reduce load a little bit if there are losses on the path from this guy
211
  //lossrate = get_lossrate_receive(from->id);
212
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
213
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
214
    my_bmap = cb_to_bmap(cb);
215
    cset_off_size = chunkID_set_size(cset_off);
216
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
217
      int chunkid = chunkID_set_get_chunk(cset_off, i);
218
      //dprintf("\tdo I need c%d ? :",chunkid);
219
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
220
        chunkID_set_add_chunk(cset_acc, chunkid);
221
        chunk_lock(chunkid,from);
222
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
223
#ifdef MONL
224
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
225
#endif
226
        dprintf("\n");
227
        d++;
228
      }
229
    }
230
    chunkID_set_free(my_bmap);
231
  //} else {
232
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
233
  //}
234

    
235
  return cset_acc;
236
}
237

    
238
void send_bmap(struct nodeID *toid)
239
{
240
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
241
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
242
  chunkID_set_free(my_bmap);
243
}
244

    
245
void bcast_bmap()
246
{
247
  int i, n;
248
  struct peer *neighbours;
249
  struct peerset *pset;
250
  struct chunkID_set *my_bmap;
251

    
252
  pset = get_peers();
253
  n = peerset_size(pset);
254
  neighbours = peerset_get_peers(pset);
255

    
256
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
257
  for (i = 0; i<n; i++) {
258
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
259
  }
260
  chunkID_set_free(my_bmap);
261
}
262

    
263
double get_average_lossrate_pset(struct peerset *pset)
264
{
265
  int i, n;
266
  struct peer *neighbours;
267

    
268
  n = peerset_size(pset);
269
  neighbours = peerset_get_peers(pset);
270
  {
271
    struct nodeID *nodeids[n];
272
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
273
#ifdef MONL
274
    return get_average_lossrate(nodeids, n);
275
#else
276
    return 0;
277
#endif
278
  }
279
}
280

    
281
void ack_chunk(struct chunk *c, struct nodeID *from)
282
{
283
  //reduce load a little bit if there are losses on the path from this guy
284
  double average_lossrate = get_average_lossrate_pset(get_peers());
285
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
286
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
287
    return;
288
  }
289
  send_bmap(from);        //send explicit ack
290
}
291

    
292
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
293
{
294
  int res;
295
  static struct chunk c;
296
  struct peer *p;
297
  static int bcast_cnt;
298
  uint16_t transid;
299

    
300
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
301
  if (res > 0) {
302
    chunk_attributes_update_received(&c);
303
    chunk_unlock(c.id);
304
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
305
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
306
    output_deliver(&c);
307
    res = cb_add_chunk(cb, &c);
308
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
309
    cb_print();
310
    if (res < 0) {
311
      dprintf("\tchunk too old, buffer full with newer chunks\n");
312
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
313
      free(c.data);
314
      free(c.attributes);
315
    }
316
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
317
    if (p) {        //now we have it almost sure
318
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
319
    }
320
    ack_chunk(&c,from);        //send explicit ack
321
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
322
       bcast_bmap();
323
    }
324
  } else {
325
    fprintf(stderr,"\tError: can't decode chunk!\n");
326
  }
327
}
328

    
329
struct chunk *generated_chunk(suseconds_t *delta)
330
{
331
  struct chunk *c;
332

    
333
  c = malloc(sizeof(struct chunk));
334
  if (!c) {
335
    fprintf(stderr, "Memory allocation error!\n");
336
    return NULL;
337
  }
338

    
339
  *delta = input_get(input, c);
340
  if (*delta < 0) {
341
    fprintf(stderr, "Error in input!\n");
342
    exit(-1);
343
  }
344
  if (c->data == NULL) {
345
    free(c);
346
    return NULL;
347
  }
348
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
349
  chunk_attributes_fill(c);
350
  return c;
351
}
352

    
353
int add_chunk(struct chunk *c)
354
{
355
  int res;
356

    
357
  res = cb_add_chunk(cb, c);
358
  if (res < 0) {
359
    free(c->data);
360
    free(c->attributes);
361
    free(c);
362
    return 0;
363
  }
364
  free(c);
365
  return 1;
366
}
367

    
368
/**
369
 *example function to filter chunks based on whether a given peer needs them.
370
 *
371
 * Looks at buffermap information received about the given peer.
372
 */
373
int needs(struct peer *n, int cid){
374
  struct peer * p = n;
375

    
376
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
377
  if (! p->bmap) {
378
    //dprintf("no bmap\n");
379
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
380
  }
381
  return _needs(p->bmap, p->cb_size, cid);
382
}
383

    
384
int _needs(struct chunkID_set *cset, int cb_size, int cid){
385
  if (cb_size == 0) { //if it declared it does not needs chunks
386
    return 0;
387
  }
388

    
389
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
390
    int missing, min;
391
    //@TODO: add some bmap_timestamp based logic
392

    
393
    if (chunkID_set_size(cset) == 0) {
394
      //dprintf("bmap empty\n");
395
      return 1;        // if the bmap seems empty, it needs the chunk
396
    }
397
    missing = cb_size - chunkID_set_size(cset);
398
    missing = missing < 0 ? 0 : missing;
399
    min = chunkID_set_get_earliest(cset);
400
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
401
    return (cid >= min - missing);
402
  }
403

    
404
  //dprintf("has it\n");
405
  return 0;
406
}
407

    
408
double peerWeightReceivedfrom(struct peer **n){
409
  struct peer * p = *n;
410
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
411
}
412

    
413
double peerWeightUniform(struct peer **n){
414
  return 1;
415
}
416

    
417
double peerWeightRtt(struct peer **n){
418
#ifdef MONL
419
  double rtt = get_rtt(*n->id);
420
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
421
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
422
#else
423
  return 1;
424
#endif
425
}
426

    
427
//ordering function for ELp peer selection, chunk ID based
428
//can't be used as weight
429
double peerScoreELpID(struct nodeID **n){
430
  struct chunkID_set *bmap;
431
  int latest;
432
  struct peer * p = nodeid_to_peer(*n, 0);
433
  if (!p) return 0;
434

    
435
  bmap = p->bmap;
436
  if (!bmap) return 0;
437
  latest = chunkID_set_get_latest(bmap);
438
  if (latest == INT_MIN) return 0;
439

    
440
  return -latest;
441
}
442

    
443
double chunkScoreChunkID(int *cid){
444
  return (double) *cid;
445
}
446

    
447
double getChunkTimestamp(int *cid){
448
  const struct chunk *c = cb_get_chunk(cb, *cid);
449
  if (!c) return 0;
450

    
451
  return (double) c->timestamp;
452
}
453

    
454
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
455
  int i, d, cset_acc_size, res;
456
  struct peer *to = nodeid_to_peer(toid, 0);
457

    
458
  cset_acc_size = chunkID_set_size(cset_acc);
459
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
460
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
461
    const struct chunk *c;
462
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
463
    c = cb_get_chunk(cb, chunkid);
464
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
465
      chunk_attributes_update_sending(c);
466
      res = sendChunk(toid, c, trans_id);
467
      if (res >= 0) {
468
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
469
        d++;
470
        reg_chunk_send(c->id);
471
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
472
      } else {
473
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
474
      }
475
    }
476
  }
477
}
478

    
479
int offer_peer_count()
480
{
481
  return offer_per_tick;
482
}
483

    
484
int offer_max_deliver(struct nodeID *n)
485
{
486

    
487
  if (!heuristics_distance_maxdeliver) return 1;
488

    
489
#ifdef MONL
490
  switch (get_hopcount(n)) {
491
    case 0: return 5;
492
    case 1: return 2;
493
    default: return 1;
494
  }
495
#else
496
  return 1;
497
#endif
498
}
499

    
500
void send_offer()
501
{
502
  struct chunk *buff;
503
  int size, res, i, n;
504
  struct peer *neighbours;
505
  struct peerset *pset;
506

    
507
  pset = get_peers();
508
  n = peerset_size(pset);
509
  neighbours = peerset_get_peers(pset);
510
  dprintf("Send Offer: %d neighbours\n", n);
511
  if (n == 0) return;
512
  buff = cb_get_chunks(cb, &size);
513
  if (size == 0) return;
514

    
515
  {
516
    size_t selectedpeers_len = offer_peer_count();
517
    int chunkids[size];
518
    struct peer *nodeids[n];
519
    struct peer *selectedpeers[selectedpeers_len];
520

    
521
    //reduce load a little bit if there are losses on the path from this guy
522
    double average_lossrate = get_average_lossrate_pset(pset);
523
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
524
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
525
      return;
526
    }
527

    
528
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
529
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
530
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
531

    
532
    for (i=0; i<selectedpeers_len ; i++){
533
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
534
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
535
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
536
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
537
      chunkID_set_free(my_bmap);
538
    }
539
  }
540
}
541

    
542

    
543
void send_chunk()
544
{
545
  struct chunk *buff;
546
  int size, res, i, n;
547
  struct peer *neighbours;
548
  struct peerset *pset;
549

    
550
  pset = get_peers();
551
  n = peerset_size(pset);
552
  neighbours = peerset_get_peers(pset);
553
  dprintf("Send Chunk: %d neighbours\n", n);
554
  if (n == 0) return;
555
  buff = cb_get_chunks(cb, &size);
556
  dprintf("\t %d chunks in buffer...\n", size);
557
  if (size == 0) return;
558

    
559
  /************ STUPID DUMB SCHEDULING ****************/
560
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
561
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
562
  /************ /STUPID DUMB SCHEDULING ****************/
563

    
564
  /************ USE SCHEDULER ****************/
565
  {
566
    size_t selectedpairs_len = 1;
567
    int chunkids[size];
568
    struct peer *nodeids[n];
569
    struct PeerChunk selectedpairs[1];
570
  
571
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
572
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
573
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
574
  /************ /USE SCHEDULER ****************/
575

    
576
    for (i=0; i<selectedpairs_len ; i++){
577
      struct peer *p = selectedpairs[i].peer;
578
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
579
      dprintf("\t sending chunk[%d] to ", c->id);
580
      dprintf("%s\n", node_addr(p->id));
581

    
582
      send_bmap(p->id);
583

    
584
      chunk_attributes_update_sending(c);
585
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
586
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
587
      dprintf("\tResult: %d\n", res);
588
      if (res>=0) {
589
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
590
        reg_chunk_send(c->id);
591
      } else {
592
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
593
      }
594
    }
595
  }
596
}