Statistics
| Branch: | Revision:

streamers / streaming.c @ a1c01ccf

History | View | Annotate | Download (16.1 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14

    
15
#include <net_helper.h>
16
#include <chunk.h> 
17
#include <chunkbuffer.h> 
18
#include <trade_msg_la.h>
19
#include <trade_msg_ha.h>
20
#include <peerset.h>
21
#include <peer.h>
22
#include <chunkidset.h>
23
#include <limits.h>
24
#include <trade_sig_ha.h>
25

    
26
#include "streaming.h"
27
#include "output.h"
28
#include "input.h"
29
#include "dbg.h"
30
#include "chunk_signaling.h"
31
#include "chunklock.h"
32
#include "topology.h"
33
#include "measures.h"
34
#include "scheduling.h"
35

    
36
#include "scheduler_la.h"
37

    
38
static bool heuristics_distance_maxdeliver = false;
39
static int bcast_after_receive_every = 0;
40
static bool neigh_on_chunk_recv = false;
41

    
42
struct chunk_attributes {
43
  uint64_t deadline;
44
  uint16_t deadline_increment;
45
  uint16_t hopcount;
46
} __attribute__((packed));
47

    
48
extern bool chunk_log;
49

    
50
struct chunk_buffer *cb;
51
static struct input_desc *input;
52
static int cb_size;
53
static int transid=0;
54

    
55
static int offer_per_tick = 1;        //N_p parameter of POLITO
56

    
57
int _needs(struct chunkID_set *cset, int cb_size, int cid);
58

    
59
uint64_t gettimeofday_in_us(void)
60
{
61
  struct timeval what_time; //to store the epoch time
62

    
63
  gettimeofday(&what_time, NULL);
64
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
65
}
66

    
67
void cb_print()
68
{
69
#ifdef DEBUG
70
  struct chunk *chunks;
71
  int num_chunks, i, id;
72
  chunks = cb_get_chunks(cb, &num_chunks);
73

    
74
  dprintf("\tchbuf :");
75
  i = 0;
76
  if(num_chunks) {
77
    id = chunks[0].id;
78
    dprintf(" %d-> ",id);
79
    while (i < num_chunks) {
80
      if (id == chunks[i].id) {
81
        dprintf("%d",id % 10);
82
        i++;
83
      } else if (chunk_islocked(id)) {
84
        dprintf("*");
85
      } else {
86
        dprintf(".");
87
      }
88
      id++;
89
    }
90
  }
91
  dprintf("\n");
92
#endif
93
}
94

    
95
void stream_init(int size, struct nodeID *myID)
96
{
97
  static char conf[32];
98

    
99
  cb_size = size;
100

    
101
  sprintf(conf, "size=%d", cb_size);
102
  cb = cb_init(conf);
103
  chunkDeliveryInit(myID);
104
  chunkSignalingInit(myID);
105
  init_measures();
106
}
107

    
108
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
109
{
110
  input = input_open(fname, loop ? INPUT_LOOP : 0, fds, fds_size);
111
  if (input == NULL) {
112
    return -1;
113
  }
114

    
115
  stream_init(1, myID);
116
  return 0;
117
}
118

    
119
void chunk_attributes_fill(struct chunk* c)
120
{
121
  struct chunk_attributes * ca;
122

    
123
  assert(!c->attributes && c->attributes_size == 0);
124

    
125
  c->attributes_size = sizeof(struct chunk_attributes);
126
  c->attributes = ca = malloc(c->attributes_size);
127

    
128
  ca->deadline = c->timestamp;
129
  ca->deadline_increment = 2;
130
  ca->hopcount = 0;
131
}
132

    
133
int chunk_get_hopcount(struct chunk* c) {
134
  struct chunk_attributes * ca;
135

    
136
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
137
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
138
    return -1;
139
  }
140

    
141
  ca = (struct chunk_attributes *) c->attributes;
142
  return ca->hopcount;
143
}
144

    
145
void chunk_attributes_update_received(struct chunk* c)
146
{
147
  struct chunk_attributes * ca;
148

    
149
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
150
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
151
    return;
152
  }
153

    
154
  ca = (struct chunk_attributes *) c->attributes;
155
  ca->hopcount++;
156
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
157
}
158

    
159
void chunk_attributes_update_sending(struct chunk* c)
160
{
161
  struct chunk_attributes * ca;
162

    
163
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
164
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
165
    return;
166
  }
167

    
168
  ca = (struct chunk_attributes *) c->attributes;
169
  ca->deadline += ca->deadline_increment;
170
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
171
}
172

    
173
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
174
{
175
  struct chunk *chunks;
176
  int num_chunks, i;
177
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
178
  chunks = cb_get_chunks(chbuf, &num_chunks);
179

    
180
  for(i=num_chunks-1; i>=0; i--) {
181
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
182
  }
183
  return my_bmap;
184
}
185

    
186
// a simple implementation that request everything that we miss ... up to max deliver
187
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
188
  struct chunkID_set *cset_acc, *my_bmap;
189
  int i, d, cset_off_size;
190
  //double lossrate;
191
  struct peer *from = nodeid_to_peer(fromid, 0);
192

    
193
  cset_acc = chunkID_set_init("size=0");
194

    
195
  //reduce load a little bit if there are losses on the path from this guy
196
  //lossrate = get_lossrate_receive(from->id);
197
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
198
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
199
    my_bmap = cb_to_bmap(cb);
200
    cset_off_size = chunkID_set_size(cset_off);
201
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
202
      int chunkid = chunkID_set_get_chunk(cset_off, i);
203
      //dprintf("\tdo I need c%d ? :",chunkid);
204
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
205
        chunkID_set_add_chunk(cset_acc, chunkid);
206
        chunk_lock(chunkid,from);
207
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
208
#ifdef MONL
209
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
210
#endif
211
        dprintf("\n");
212
        d++;
213
      }
214
    }
215
    chunkID_set_free(my_bmap);
216
  //} else {
217
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
218
  //}
219

    
220
  return cset_acc;
221
}
222

    
223
void send_bmap(struct nodeID *toid)
224
{
225
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
226
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
227
  chunkID_set_free(my_bmap);
228
}
229

    
230
void bcast_bmap()
231
{
232
  int i, n;
233
  struct peer *neighbours;
234
  struct peerset *pset;
235
  struct chunkID_set *my_bmap;
236

    
237
  pset = get_peers();
238
  n = peerset_size(pset);
239
  neighbours = peerset_get_peers(pset);
240

    
241
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
242
  for (i = 0; i<n; i++) {
243
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
244
  }
245
  chunkID_set_free(my_bmap);
246
}
247

    
248
double get_average_lossrate_pset(struct peerset *pset)
249
{
250
  int i, n;
251
  struct peer *neighbours;
252

    
253
  n = peerset_size(pset);
254
  neighbours = peerset_get_peers(pset);
255
  {
256
    struct nodeID *nodeids[n];
257
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
258
#ifdef MONL
259
    return get_average_lossrate(nodeids, n);
260
#else
261
    return 0;
262
#endif
263
  }
264
}
265

    
266
void ack_chunk(struct chunk *c, struct nodeID *from)
267
{
268
  //reduce load a little bit if there are losses on the path from this guy
269
  double average_lossrate = get_average_lossrate_pset(get_peers());
270
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
271
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
272
    return;
273
  }
274
  send_bmap(from);        //send explicit ack
275
}
276

    
277
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
278
{
279
  int res;
280
  static struct chunk c;
281
  struct peer *p;
282
  static int bcast_cnt;
283
  int transid;
284

    
285
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
286
  if (res > 0) {
287
    chunk_attributes_update_received(&c);
288
    chunk_unlock(c.id);
289
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
290
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %lld hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
291
    output_deliver(&c);
292
    res = cb_add_chunk(cb, &c);
293
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
294
    cb_print();
295
    if (res < 0) {
296
      dprintf("\tchunk too old, buffer full with newer chunks\n");
297
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %lld\n", c.id, node_addr(from), gettimeofday_in_us());}
298
      free(c.data);
299
      free(c.attributes);
300
    }
301
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
302
    if (p) {        //now we have it almost sure
303
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
304
    }
305
    ack_chunk(&c,from);        //send explicit ack
306
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
307
       bcast_bmap();
308
    }
309
  } else {
310
    fprintf(stderr,"\tError: can't decode chunk!\n");
311
  }
312
}
313

    
314
struct chunk *generated_chunk(suseconds_t *delta)
315
{
316
  struct chunk *c;
317

    
318
  c = malloc(sizeof(struct chunk));
319
  if (!c) {
320
    fprintf(stderr, "Memory allocation error!\n");
321
    return NULL;
322
  }
323

    
324
  *delta = input_get(input, c);
325
  if (*delta < 0) {
326
    fprintf(stderr, "Error in input!\n");
327
    exit(-1);
328
  }
329
  if (c->data == NULL) {
330
    free(c);
331
    return NULL;
332
  }
333
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
334
  chunk_attributes_fill(c);
335
  return c;
336
}
337

    
338
int add_chunk(struct chunk *c)
339
{
340
  int res;
341

    
342
  res = cb_add_chunk(cb, c);
343
  if (res < 0) {
344
    free(c->data);
345
    free(c->attributes);
346
    free(c);
347
    return 0;
348
  }
349
  free(c);
350
  return 1;
351
}
352

    
353
/**
354
 *example function to filter chunks based on whether a given peer needs them.
355
 *
356
 * Looks at buffermap information received about the given peer.
357
 */
358
int needs(struct peer *n, int cid){
359
  struct peer * p = n;
360

    
361
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
362
  if (! p->bmap) {
363
    //dprintf("no bmap\n");
364
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
365
  }
366
  return _needs(p->bmap, p->cb_size, cid);
367
}
368

    
369
int _needs(struct chunkID_set *cset, int cb_size, int cid){
370
  if (cb_size == 0) { //if it declared it does not needs chunks
371
    return 0;
372
  }
373

    
374
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
375
    int missing, min;
376
    //@TODO: add some bmap_timestamp based logic
377

    
378
    if (chunkID_set_size(cset) == 0) {
379
      //dprintf("bmap empty\n");
380
      return 1;        // if the bmap seems empty, it needs the chunk
381
    }
382
    missing = cb_size - chunkID_set_size(cset);
383
    missing = missing < 0 ? 0 : missing;
384
    min = chunkID_set_get_earliest(cset);
385
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
386
    return (cid >= min - missing);
387
  }
388

    
389
  //dprintf("has it\n");
390
  return 0;
391
}
392

    
393
double peerWeightReceivedfrom(struct peer **n){
394
  struct peer * p = *n;
395
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
396
}
397

    
398
double peerWeightUniform(struct peer **n){
399
  return 1;
400
}
401

    
402
double peerWeightRtt(struct peer **n){
403
#ifdef MONL
404
  double rtt = get_rtt(*n->id);
405
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
406
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
407
#else
408
  return 1;
409
#endif
410
}
411

    
412
//ordering function for ELp peer selection, chunk ID based
413
//can't be used as weight
414
double peerScoreELpID(struct nodeID **n){
415
  struct chunkID_set *bmap;
416
  int latest;
417
  struct peer * p = nodeid_to_peer(*n, 0);
418
  if (!p) return 0;
419

    
420
  bmap = p->bmap;
421
  if (!bmap) return 0;
422
  latest = chunkID_set_get_latest(bmap);
423
  if (latest == INT_MIN) return 0;
424

    
425
  return -latest;
426
}
427

    
428
double chunkScoreChunkID(int *cid){
429
  return (double) *cid;
430
}
431

    
432
double getChunkTimestamp(int *cid){
433
  struct chunk *c = cb_get_chunk(cb, *cid);
434
  if (!c) return 0;
435

    
436
  return (double) c->timestamp;
437
}
438

    
439
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
440
  int i, d, cset_acc_size, res;
441
  struct peer *to = nodeid_to_peer(toid, 0);
442

    
443
  cset_acc_size = chunkID_set_size(cset_acc);
444
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
445
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
446
    struct chunk *c;
447
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
448
    c = cb_get_chunk(cb, chunkid);
449
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
450
      chunk_attributes_update_sending(c);
451
      res = sendChunk(toid, c, trans_id);
452
      if (res >= 0) {
453
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
454
        d++;
455
        reg_chunk_send(c->id);
456
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
457
      } else {
458
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
459
      }
460
    }
461
  }
462
}
463

    
464
int offer_peer_count()
465
{
466
  return offer_per_tick;
467
}
468

    
469
int offer_max_deliver(struct nodeID *n)
470
{
471

    
472
  if (!heuristics_distance_maxdeliver) return 1;
473

    
474
#ifdef MONL
475
  switch (get_hopcount(n)) {
476
    case 0: return 5;
477
    case 1: return 2;
478
    default: return 1;
479
  }
480
#else
481
  return 1;
482
#endif
483
}
484

    
485
void send_offer()
486
{
487
  struct chunk *buff;
488
  int size, res, i, n;
489
  struct peer *neighbours;
490
  struct peerset *pset;
491

    
492
  pset = get_peers();
493
  n = peerset_size(pset);
494
  neighbours = peerset_get_peers(pset);
495
  dprintf("Send Offer: %d neighbours\n", n);
496
  if (n == 0) return;
497
  buff = cb_get_chunks(cb, &size);
498
  if (size == 0) return;
499

    
500
  {
501
    size_t selectedpeers_len = offer_peer_count();
502
    int chunkids[size];
503
    struct peer *nodeids[n];
504
    struct peer *selectedpeers[selectedpeers_len];
505

    
506
    //reduce load a little bit if there are losses on the path from this guy
507
    double average_lossrate = get_average_lossrate_pset(pset);
508
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
509
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
510
      return;
511
    }
512

    
513
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
514
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
515
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
516

    
517
    for (i=0; i<selectedpeers_len ; i++){
518
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
519
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
520
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
521
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
522
      chunkID_set_free(my_bmap);
523
    }
524
  }
525
}
526

    
527

    
528
void send_chunk()
529
{
530
  struct chunk *buff;
531
  int size, res, i, n;
532
  struct peer *neighbours;
533
  struct peerset *pset;
534

    
535
  pset = get_peers();
536
  n = peerset_size(pset);
537
  neighbours = peerset_get_peers(pset);
538
  dprintf("Send Chunk: %d neighbours\n", n);
539
  if (n == 0) return;
540
  buff = cb_get_chunks(cb, &size);
541
  dprintf("\t %d chunks in buffer...\n", size);
542
  if (size == 0) return;
543

    
544
  /************ STUPID DUMB SCHEDULING ****************/
545
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
546
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
547
  /************ /STUPID DUMB SCHEDULING ****************/
548

    
549
  /************ USE SCHEDULER ****************/
550
  {
551
    size_t selectedpairs_len = 1;
552
    int chunkids[size];
553
    struct peer *nodeids[n];
554
    struct PeerChunk selectedpairs[1];
555
  
556
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
557
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
558
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
559
  /************ /USE SCHEDULER ****************/
560

    
561
    for (i=0; i<selectedpairs_len ; i++){
562
      struct peer *p = selectedpairs[i].peer;
563
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
564
      dprintf("\t sending chunk[%d] to ", c->id);
565
      dprintf("%s\n", node_addr(p->id));
566

    
567
      send_bmap(p->id);
568

    
569
      chunk_attributes_update_sending(c);
570
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
571
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
572
      dprintf("\tResult: %d\n", res);
573
      if (res>=0) {
574
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
575
        reg_chunk_send(c->id);
576
      } else {
577
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
578
      }
579
    }
580
  }
581
}