Statistics
| Branch: | Revision:

streamers / streaming.c @ 14893aa5

History | View | Annotate | Download (16.2 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <chunk.h> 
18
#include <chunkbuffer.h> 
19
#include <trade_msg_la.h>
20
#include <trade_msg_ha.h>
21
#include <peerset.h>
22
#include <peer.h>
23
#include <chunkidset.h>
24
#include <limits.h>
25
#include <trade_sig_ha.h>
26

    
27
#include "streaming.h"
28
#include "output.h"
29
#include "input.h"
30
#include "dbg.h"
31
#include "chunk_signaling.h"
32
#include "chunklock.h"
33
#include "topology.h"
34
#include "measures.h"
35
#include "scheduling.h"
36

    
37
#include "scheduler_la.h"
38

    
39
static bool heuristics_distance_maxdeliver = false;
40
static int bcast_after_receive_every = 0;
41
static bool neigh_on_chunk_recv = false;
42

    
43
struct chunk_attributes {
44
  uint64_t deadline;
45
  uint16_t deadline_increment;
46
  uint16_t hopcount;
47
} __attribute__((packed));
48

    
49
extern bool chunk_log;
50

    
51
struct chunk_buffer *cb;
52
static struct input_desc *input;
53
static int cb_size;
54
static int transid=0;
55

    
56
static int offer_per_tick = 1;        //N_p parameter of POLITO
57

    
58
int _needs(struct chunkID_set *cset, int cb_size, int cid);
59

    
60
uint64_t gettimeofday_in_us(void)
61
{
62
  struct timeval what_time; //to store the epoch time
63

    
64
  gettimeofday(&what_time, NULL);
65
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
66
}
67

    
68
void cb_print()
69
{
70
#ifdef DEBUG
71
  struct chunk *chunks;
72
  int num_chunks, i, id;
73
  chunks = cb_get_chunks(cb, &num_chunks);
74

    
75
  dprintf("\tchbuf :");
76
  i = 0;
77
  if(num_chunks) {
78
    id = chunks[0].id;
79
    dprintf(" %d-> ",id);
80
    while (i < num_chunks) {
81
      if (id == chunks[i].id) {
82
        dprintf("%d",id % 10);
83
        i++;
84
      } else if (chunk_islocked(id)) {
85
        dprintf("*");
86
      } else {
87
        dprintf(".");
88
      }
89
      id++;
90
    }
91
  }
92
  dprintf("\n");
93
#endif
94
}
95

    
96
void stream_init(int size, struct nodeID *myID)
97
{
98
  static char conf[32];
99

    
100
  cb_size = size;
101

    
102
  sprintf(conf, "size=%d", cb_size);
103
  cb = cb_init(conf);
104
  chunkDeliveryInit(myID);
105
  chunkSignalingInit(myID);
106
  init_measures();
107
}
108

    
109
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size, int buff_size)
110
{
111
  int flags = 0;
112

    
113
  if (memcmp(fname, "udp:", 4) == 0) {
114
    fname += 4;
115
    flags = INPUT_UDP;
116
  }
117
  if (loop) {
118
    flags |= INPUT_LOOP;
119
  }
120
  input = input_open(fname, flags, fds, fds_size);
121
  if (input == NULL) {
122
    return -1;
123
  }
124

    
125
  stream_init(buff_size, myID);
126
  return 0;
127
}
128

    
129
void chunk_attributes_fill(struct chunk* c)
130
{
131
  struct chunk_attributes * ca;
132

    
133
  assert(!c->attributes && c->attributes_size == 0);
134

    
135
  c->attributes_size = sizeof(struct chunk_attributes);
136
  c->attributes = ca = malloc(c->attributes_size);
137

    
138
  ca->deadline = c->timestamp;
139
  ca->deadline_increment = 2;
140
  ca->hopcount = 0;
141
}
142

    
143
int chunk_get_hopcount(struct chunk* c) {
144
  struct chunk_attributes * ca;
145

    
146
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
147
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
148
    return -1;
149
  }
150

    
151
  ca = (struct chunk_attributes *) c->attributes;
152
  return ca->hopcount;
153
}
154

    
155
void chunk_attributes_update_received(struct chunk* c)
156
{
157
  struct chunk_attributes * ca;
158

    
159
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
160
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
161
    return;
162
  }
163

    
164
  ca = (struct chunk_attributes *) c->attributes;
165
  ca->hopcount++;
166
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
167
}
168

    
169
void chunk_attributes_update_sending(struct chunk* c)
170
{
171
  struct chunk_attributes * ca;
172

    
173
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
174
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
175
    return;
176
  }
177

    
178
  ca = (struct chunk_attributes *) c->attributes;
179
  ca->deadline += ca->deadline_increment;
180
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
181
}
182

    
183
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
184
{
185
  struct chunk *chunks;
186
  int num_chunks, i;
187
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
188
  chunks = cb_get_chunks(chbuf, &num_chunks);
189

    
190
  for(i=num_chunks-1; i>=0; i--) {
191
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
192
  }
193
  return my_bmap;
194
}
195

    
196
// a simple implementation that request everything that we miss ... up to max deliver
197
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
198
  struct chunkID_set *cset_acc, *my_bmap;
199
  int i, d, cset_off_size;
200
  //double lossrate;
201
  struct peer *from = nodeid_to_peer(fromid, 0);
202

    
203
  cset_acc = chunkID_set_init("size=0");
204

    
205
  //reduce load a little bit if there are losses on the path from this guy
206
  //lossrate = get_lossrate_receive(from->id);
207
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
208
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
209
    my_bmap = cb_to_bmap(cb);
210
    cset_off_size = chunkID_set_size(cset_off);
211
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
212
      int chunkid = chunkID_set_get_chunk(cset_off, i);
213
      //dprintf("\tdo I need c%d ? :",chunkid);
214
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
215
        chunkID_set_add_chunk(cset_acc, chunkid);
216
        chunk_lock(chunkid,from);
217
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
218
#ifdef MONL
219
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
220
#endif
221
        dprintf("\n");
222
        d++;
223
      }
224
    }
225
    chunkID_set_free(my_bmap);
226
  //} else {
227
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
228
  //}
229

    
230
  return cset_acc;
231
}
232

    
233
void send_bmap(struct nodeID *toid)
234
{
235
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
236
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
237
  chunkID_set_free(my_bmap);
238
}
239

    
240
void bcast_bmap()
241
{
242
  int i, n;
243
  struct peer *neighbours;
244
  struct peerset *pset;
245
  struct chunkID_set *my_bmap;
246

    
247
  pset = get_peers();
248
  n = peerset_size(pset);
249
  neighbours = peerset_get_peers(pset);
250

    
251
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
252
  for (i = 0; i<n; i++) {
253
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
254
  }
255
  chunkID_set_free(my_bmap);
256
}
257

    
258
double get_average_lossrate_pset(struct peerset *pset)
259
{
260
  int i, n;
261
  struct peer *neighbours;
262

    
263
  n = peerset_size(pset);
264
  neighbours = peerset_get_peers(pset);
265
  {
266
    struct nodeID *nodeids[n];
267
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
268
#ifdef MONL
269
    return get_average_lossrate(nodeids, n);
270
#else
271
    return 0;
272
#endif
273
  }
274
}
275

    
276
void ack_chunk(struct chunk *c, struct nodeID *from)
277
{
278
  //reduce load a little bit if there are losses on the path from this guy
279
  double average_lossrate = get_average_lossrate_pset(get_peers());
280
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
281
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
282
    return;
283
  }
284
  send_bmap(from);        //send explicit ack
285
}
286

    
287
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
288
{
289
  int res;
290
  static struct chunk c;
291
  struct peer *p;
292
  static int bcast_cnt;
293
  int transid;
294

    
295
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
296
  if (res > 0) {
297
    chunk_attributes_update_received(&c);
298
    chunk_unlock(c.id);
299
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
300
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %lld hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
301
    output_deliver(&c);
302
    res = cb_add_chunk(cb, &c);
303
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
304
    cb_print();
305
    if (res < 0) {
306
      dprintf("\tchunk too old, buffer full with newer chunks\n");
307
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %lld\n", c.id, node_addr(from), gettimeofday_in_us());}
308
      free(c.data);
309
      free(c.attributes);
310
    }
311
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
312
    if (p) {        //now we have it almost sure
313
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
314
    }
315
    ack_chunk(&c,from);        //send explicit ack
316
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
317
       bcast_bmap();
318
    }
319
  } else {
320
    fprintf(stderr,"\tError: can't decode chunk!\n");
321
  }
322
}
323

    
324
struct chunk *generated_chunk(suseconds_t *delta)
325
{
326
  struct chunk *c;
327

    
328
  c = malloc(sizeof(struct chunk));
329
  if (!c) {
330
    fprintf(stderr, "Memory allocation error!\n");
331
    return NULL;
332
  }
333

    
334
  *delta = input_get(input, c);
335
  if (*delta < 0) {
336
    fprintf(stderr, "Error in input!\n");
337
    exit(-1);
338
  }
339
  if (c->data == NULL) {
340
    free(c);
341
    return NULL;
342
  }
343
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
344
  chunk_attributes_fill(c);
345
  return c;
346
}
347

    
348
int add_chunk(struct chunk *c)
349
{
350
  int res;
351

    
352
  res = cb_add_chunk(cb, c);
353
  if (res < 0) {
354
    free(c->data);
355
    free(c->attributes);
356
    free(c);
357
    return 0;
358
  }
359
  free(c);
360
  return 1;
361
}
362

    
363
/**
364
 *example function to filter chunks based on whether a given peer needs them.
365
 *
366
 * Looks at buffermap information received about the given peer.
367
 */
368
int needs(struct peer *n, int cid){
369
  struct peer * p = n;
370

    
371
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
372
  if (! p->bmap) {
373
    //dprintf("no bmap\n");
374
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
375
  }
376
  return _needs(p->bmap, p->cb_size, cid);
377
}
378

    
379
int _needs(struct chunkID_set *cset, int cb_size, int cid){
380
  if (cb_size == 0) { //if it declared it does not needs chunks
381
    return 0;
382
  }
383

    
384
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
385
    int missing, min;
386
    //@TODO: add some bmap_timestamp based logic
387

    
388
    if (chunkID_set_size(cset) == 0) {
389
      //dprintf("bmap empty\n");
390
      return 1;        // if the bmap seems empty, it needs the chunk
391
    }
392
    missing = cb_size - chunkID_set_size(cset);
393
    missing = missing < 0 ? 0 : missing;
394
    min = chunkID_set_get_earliest(cset);
395
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
396
    return (cid >= min - missing);
397
  }
398

    
399
  //dprintf("has it\n");
400
  return 0;
401
}
402

    
403
double peerWeightReceivedfrom(struct peer **n){
404
  struct peer * p = *n;
405
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
406
}
407

    
408
double peerWeightUniform(struct peer **n){
409
  return 1;
410
}
411

    
412
double peerWeightRtt(struct peer **n){
413
#ifdef MONL
414
  double rtt = get_rtt(*n->id);
415
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
416
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
417
#else
418
  return 1;
419
#endif
420
}
421

    
422
//ordering function for ELp peer selection, chunk ID based
423
//can't be used as weight
424
double peerScoreELpID(struct nodeID **n){
425
  struct chunkID_set *bmap;
426
  int latest;
427
  struct peer * p = nodeid_to_peer(*n, 0);
428
  if (!p) return 0;
429

    
430
  bmap = p->bmap;
431
  if (!bmap) return 0;
432
  latest = chunkID_set_get_latest(bmap);
433
  if (latest == INT_MIN) return 0;
434

    
435
  return -latest;
436
}
437

    
438
double chunkScoreChunkID(int *cid){
439
  return (double) *cid;
440
}
441

    
442
double getChunkTimestamp(int *cid){
443
  struct chunk *c = cb_get_chunk(cb, *cid);
444
  if (!c) return 0;
445

    
446
  return (double) c->timestamp;
447
}
448

    
449
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
450
  int i, d, cset_acc_size, res;
451
  struct peer *to = nodeid_to_peer(toid, 0);
452

    
453
  cset_acc_size = chunkID_set_size(cset_acc);
454
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
455
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
456
    struct chunk *c;
457
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
458
    c = cb_get_chunk(cb, chunkid);
459
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
460
      chunk_attributes_update_sending(c);
461
      res = sendChunk(toid, c, trans_id);
462
      if (res >= 0) {
463
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
464
        d++;
465
        reg_chunk_send(c->id);
466
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
467
      } else {
468
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
469
      }
470
    }
471
  }
472
}
473

    
474
int offer_peer_count()
475
{
476
  return offer_per_tick;
477
}
478

    
479
int offer_max_deliver(struct nodeID *n)
480
{
481

    
482
  if (!heuristics_distance_maxdeliver) return 1;
483

    
484
#ifdef MONL
485
  switch (get_hopcount(n)) {
486
    case 0: return 5;
487
    case 1: return 2;
488
    default: return 1;
489
  }
490
#else
491
  return 1;
492
#endif
493
}
494

    
495
void send_offer()
496
{
497
  struct chunk *buff;
498
  int size, res, i, n;
499
  struct peer *neighbours;
500
  struct peerset *pset;
501

    
502
  pset = get_peers();
503
  n = peerset_size(pset);
504
  neighbours = peerset_get_peers(pset);
505
  dprintf("Send Offer: %d neighbours\n", n);
506
  if (n == 0) return;
507
  buff = cb_get_chunks(cb, &size);
508
  if (size == 0) return;
509

    
510
  {
511
    size_t selectedpeers_len = offer_peer_count();
512
    int chunkids[size];
513
    struct peer *nodeids[n];
514
    struct peer *selectedpeers[selectedpeers_len];
515

    
516
    //reduce load a little bit if there are losses on the path from this guy
517
    double average_lossrate = get_average_lossrate_pset(pset);
518
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
519
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
520
      return;
521
    }
522

    
523
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
524
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
525
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
526

    
527
    for (i=0; i<selectedpeers_len ; i++){
528
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
529
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
530
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
531
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
532
      chunkID_set_free(my_bmap);
533
    }
534
  }
535
}
536

    
537

    
538
void send_chunk()
539
{
540
  struct chunk *buff;
541
  int size, res, i, n;
542
  struct peer *neighbours;
543
  struct peerset *pset;
544

    
545
  pset = get_peers();
546
  n = peerset_size(pset);
547
  neighbours = peerset_get_peers(pset);
548
  dprintf("Send Chunk: %d neighbours\n", n);
549
  if (n == 0) return;
550
  buff = cb_get_chunks(cb, &size);
551
  dprintf("\t %d chunks in buffer...\n", size);
552
  if (size == 0) return;
553

    
554
  /************ STUPID DUMB SCHEDULING ****************/
555
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
556
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
557
  /************ /STUPID DUMB SCHEDULING ****************/
558

    
559
  /************ USE SCHEDULER ****************/
560
  {
561
    size_t selectedpairs_len = 1;
562
    int chunkids[size];
563
    struct peer *nodeids[n];
564
    struct PeerChunk selectedpairs[1];
565
  
566
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
567
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
568
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
569
  /************ /USE SCHEDULER ****************/
570

    
571
    for (i=0; i<selectedpairs_len ; i++){
572
      struct peer *p = selectedpairs[i].peer;
573
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
574
      dprintf("\t sending chunk[%d] to ", c->id);
575
      dprintf("%s\n", node_addr(p->id));
576

    
577
      send_bmap(p->id);
578

    
579
      chunk_attributes_update_sending(c);
580
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
581
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
582
      dprintf("\tResult: %d\n", res);
583
      if (res>=0) {
584
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
585
        reg_chunk_send(c->id);
586
      } else {
587
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
588
      }
589
    }
590
  }
591
}