Statistics
| Branch: | Revision:

streamers / streaming.c @ 14e5c21e

History | View | Annotate | Download (15.4 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14

    
15
#include <net_helper.h>
16
#include <chunk.h> 
17
#include <chunkbuffer.h> 
18
#include <trade_msg_la.h>
19
#include <trade_msg_ha.h>
20
#include <peerset.h>
21
#include <peer.h>
22
#include <chunkidset.h>
23
#include <limits.h>
24
#include <trade_sig_ha.h>
25

    
26
#include "streaming.h"
27
#include "output.h"
28
#include "input.h"
29
#include "dbg.h"
30
#include "chunk_signaling.h"
31
#include "chunklock.h"
32
#include "topology.h"
33
#include "measures.h"
34

    
35
#include "scheduler_la.h"
36

    
37
struct chunk_attributes {
38
  uint64_t deadline;
39
  uint16_t deadline_increment;
40
  uint16_t hopcount;
41
} __attribute__((packed));
42

    
43
extern bool chunk_log;
44

    
45
struct chunk_buffer *cb;
46
static struct input_desc *input;
47
static int cb_size;
48
static int transid=0;
49

    
50
static int offer_per_tick = 1;        //N_p parameter of POLITO
51

    
52
int _needs(struct chunkID_set *cset, int cb_size, int cid);
53

    
54
uint64_t gettimeofday_in_us(void)
55
{
56
  struct timeval what_time; //to store the epoch time
57

    
58
  gettimeofday(&what_time, NULL);
59
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
60
}
61

    
62
void cb_print()
63
{
64
#ifdef DEBUG
65
  struct chunk *chunks;
66
  int num_chunks, i, id;
67
  chunks = cb_get_chunks(cb, &num_chunks);
68

    
69
  dprintf("\tchbuf :");
70
  i = 0;
71
  if(num_chunks) {
72
    id = chunks[0].id;
73
    dprintf(" %d-> ",id);
74
    while (i < num_chunks) {
75
      if (id == chunks[i].id) {
76
        dprintf("%d",id % 10);
77
        i++;
78
      } else if (chunk_islocked(id)) {
79
        dprintf("*");
80
      } else {
81
        dprintf(".");
82
      }
83
      id++;
84
    }
85
  }
86
  dprintf("\n");
87
#endif
88
}
89

    
90
void stream_init(int size, struct nodeID *myID)
91
{
92
  static char conf[32];
93

    
94
  cb_size = size;
95

    
96
  sprintf(conf, "size=%d", cb_size);
97
  cb = cb_init(conf);
98
  chunkDeliveryInit(myID);
99
  chunkSignalingInit(myID);
100
  init_measures();
101
}
102

    
103
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
104
{
105
  input = input_open(fname, loop ? INPUT_LOOP : 0, fds, fds_size);
106
  if (input == NULL) {
107
    return -1;
108
  }
109

    
110
  stream_init(1, myID);
111
  return 0;
112
}
113

    
114
void chunk_attributes_fill(struct chunk* c)
115
{
116
  struct chunk_attributes * ca;
117

    
118
  assert(!c->attributes && c->attributes_size == 0);
119

    
120
  c->attributes_size = sizeof(struct chunk_attributes);
121
  c->attributes = ca = malloc(c->attributes_size);
122

    
123
  ca->deadline = c->timestamp;
124
  ca->deadline_increment = 2;
125
  ca->hopcount = 0;
126
}
127

    
128
int chunk_get_hopcount(struct chunk* c) {
129
  struct chunk_attributes * ca;
130

    
131
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
132
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
133
    return -1;
134
  }
135

    
136
  ca = (struct chunk_attributes *) c->attributes;
137
  return ca->hopcount;
138
}
139

    
140
void chunk_attributes_update_received(struct chunk* c)
141
{
142
  struct chunk_attributes * ca;
143

    
144
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
145
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
146
    return;
147
  }
148

    
149
  ca = (struct chunk_attributes *) c->attributes;
150
  ca->hopcount++;
151
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
152
}
153

    
154
void chunk_attributes_update_sending(struct chunk* c)
155
{
156
  struct chunk_attributes * ca;
157

    
158
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
159
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
160
    return;
161
  }
162

    
163
  ca = (struct chunk_attributes *) c->attributes;
164
  ca->deadline += ca->deadline_increment;
165
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
166
}
167

    
168
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
169
{
170
  struct chunk *chunks;
171
  int num_chunks, i;
172
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
173
  chunks = cb_get_chunks(chbuf, &num_chunks);
174

    
175
  for(i=num_chunks-1; i>=0; i--) {
176
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
177
  }
178
  return my_bmap;
179
}
180

    
181
// a simple implementation that request everything that we miss ... up to max deliver
182
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver, int trans_id){
183
  struct chunkID_set *cset_acc, *my_bmap;
184
  int i, d, cset_off_size;
185
  //double lossrate;
186

    
187
  cset_acc = chunkID_set_init("size=0");
188

    
189
  //reduce load a little bit if there are losses on the path from this guy
190
  //lossrate = get_lossrate_receive(from->id);
191
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
192
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
193
    my_bmap = cb_to_bmap(cb);
194
    cset_off_size = chunkID_set_size(cset_off);
195
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
196
      int chunkid = chunkID_set_get_chunk(cset_off, i);
197
      //dprintf("\tdo I need c%d ? :",chunkid);
198
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
199
        chunkID_set_add_chunk(cset_acc, chunkid);
200
        chunk_lock(chunkid,from);
201
        dtprintf("accepting %d from %s", chunkid, node_addr(from->id));
202
#ifdef MONL
203
        dprintf(", loss:%f rtt:%f", get_lossrate(from->id), get_rtt(from->id));
204
#endif
205
        dprintf("\n");
206
        d++;
207
      }
208
    }
209
    chunkID_set_free(my_bmap);
210
  //} else {
211
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(from->id), lossrate, get_rtt(from->id));
212
  //}
213

    
214
  return cset_acc;
215
}
216

    
217
void send_bmap(struct peer *to)
218
{
219
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
220
   sendBufferMap(to->id,NULL, my_bmap, input ? 0 : cb_size, 0);
221
  chunkID_set_free(my_bmap);
222
}
223

    
224
double get_average_lossrate_pset(struct peerset *pset)
225
{
226
  int i, n;
227
  struct peer *neighbours;
228

    
229
  n = peerset_size(pset);
230
  neighbours = peerset_get_peers(pset);
231
  {
232
    struct nodeID *nodeids[n];
233
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
234
#ifdef MONL
235
    return get_average_lossrate(nodeids, n);
236
#else
237
    return 0;
238
#endif
239
  }
240
}
241

    
242
void ack_chunk(struct chunk *c, struct peer *p)
243
{
244
  //reduce load a little bit if there are losses on the path from this guy
245
  double average_lossrate = get_average_lossrate_pset(get_peers());
246
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
247
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
248
    return;
249
  }
250
  send_bmap(p);        //send explicit ack
251
}
252

    
253
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
254
{
255
  int res;
256
  static struct chunk c;
257
  struct peer *p;
258

    
259
  res = decodeChunk(&c, buff + 1, len - 1);
260
  if (res > 0) {
261
    chunk_attributes_update_received(&c);
262
    chunk_unlock(c.id);
263
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
264
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %lld hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
265
    output_deliver(&c);
266
    res = cb_add_chunk(cb, &c);
267
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
268
    cb_print();
269
    if (res < 0) {
270
      dprintf("\tchunk too old, buffer full with newer chunks\n");
271
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %lld\n", c.id, node_addr(from), gettimeofday_in_us());}
272
      free(c.data);
273
      free(c.attributes);
274
    }
275
    p = nodeid_to_peer(from,1);
276
    if (p) {        //now we have it almost sure
277
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
278
      ack_chunk(&c,p);        //send explicit ack
279
    }
280
  } else {
281
    fprintf(stderr,"\tError: can't decode chunk!\n");
282
  }
283
}
284

    
285
struct chunk *generated_chunk(suseconds_t *delta)
286
{
287
  struct chunk *c;
288

    
289
  c = malloc(sizeof(struct chunk));
290
  if (!c) {
291
    fprintf(stderr, "Memory allocation error!\n");
292
    return NULL;
293
  }
294

    
295
  *delta = input_get(input, c);
296
  if (*delta < 0) {
297
    fprintf(stderr, "Error in input!\n");
298
    exit(-1);
299
  }
300
  if (c->data == NULL) {
301
    free(c);
302
    return NULL;
303
  }
304
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
305
  chunk_attributes_fill(c);
306
  return c;
307
}
308

    
309
int add_chunk(struct chunk *c)
310
{
311
  int res;
312

    
313
  res = cb_add_chunk(cb, c);
314
  if (res < 0) {
315
    free(c->data);
316
    free(c->attributes);
317
    free(c);
318
    return 0;
319
  }
320
  free(c);
321
  return 1;
322
}
323

    
324
/**
325
 *example function to filter chunks based on whether a given peer needs them.
326
 *
327
 * Looks at buffermap information received about the given peer.
328
 */
329
int needs(struct nodeID *n, int cid){
330
  struct peer * p = nodeid_to_peer(n, 0);
331
  if (!p) return 1; // if we don't know this peer, but we assume it needs the chunk (aggressive behaviour!)
332

    
333
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
334
  if (! p->bmap) {
335
    //dprintf("no bmap\n");
336
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
337
  }
338
  return _needs(p->bmap, p->cb_size, cid);
339
}
340

    
341
int _needs(struct chunkID_set *cset, int cb_size, int cid){
342
  if (cb_size == 0) { //if it declared it does not needs chunks
343
    return 0;
344
  }
345

    
346
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
347
    int missing, min;
348
    //@TODO: add some bmap_timestamp based logic
349

    
350
    if (chunkID_set_size(cset) == 0) {
351
      //dprintf("bmap empty\n");
352
      return 1;        // if the bmap seems empty, it needs the chunk
353
    }
354
    missing = cb_size - chunkID_set_size(cset);
355
    missing = missing < 0 ? 0 : missing;
356
    min = chunkID_set_get_earliest(cset);
357
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
358
    return (cid >= min - missing);
359
  }
360

    
361
  //dprintf("has it\n");
362
  return 0;
363
}
364

    
365
double peerWeightReceivedfrom(struct nodeID **n){
366
  struct peer * p = nodeid_to_peer(*n, 0);
367
  if (!p) return 0;
368
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
369
}
370

    
371
double peerWeightUniform(struct nodeID **n){
372
  return 1;
373
}
374

    
375
double peerWeightRtt(struct nodeID **n){
376
#ifdef MONL
377
  double rtt = get_rtt(*n);
378
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
379
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
380
#else
381
  return 1;
382
#endif
383
}
384

    
385
//ordering function for ELp peer selection, chunk ID based
386
//can't be used as weight
387
double peerScoreELpID(struct nodeID **n){
388
  struct chunkID_set *bmap;
389
  int latest;
390
  struct peer * p = nodeid_to_peer(*n, 0);
391
  if (!p) return 0;
392

    
393
  bmap = p->bmap;
394
  if (!bmap) return 0;
395
  latest = chunkID_set_get_latest(bmap);
396
  if (latest == INT_MIN) return 0;
397

    
398
  return -latest;
399
}
400

    
401
double getChunkTimestamp(int *cid){
402
  struct chunk *c = cb_get_chunk(cb, *cid);
403
  if (!c) return 0;
404

    
405
  return (double) c->timestamp;
406
}
407

    
408
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver, int trans_id){
409
  int i, d, cset_acc_size, res;
410

    
411
  cset_acc_size = chunkID_set_size(cset_acc);
412
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
413
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
414
    struct chunk *c;
415
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
416
    c = cb_get_chunk(cb, chunkid);
417
    if (c && needs(to->id, chunkid) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
418
      chunk_attributes_update_sending(c);
419
      res = sendChunk(to->id, c);
420
      if (res >= 0) {
421
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
422
        d++;
423
        reg_chunk_send(c->id);
424
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld Result: %d\n", c->id, node_addr(to->id), gettimeofday_in_us(), res);}
425
      } else {
426
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
427
      }
428
    }
429
  }
430
}
431

    
432
int offer_peer_count()
433
{
434
  return offer_per_tick;
435
}
436

    
437
int offer_max_deliver(struct nodeID *n)
438
{
439
#ifdef MONL
440
  switch (get_hopcount(n)) {
441
    case 0: return 5;
442
    case 1: return 2;
443
    default: return 1;
444
  }
445
#else
446
  return 1;
447
#endif
448
}
449

    
450
void send_offer()
451
{
452
  struct chunk *buff;
453
  int size, res, i, n;
454
  struct peer *neighbours;
455
  struct peerset *pset;
456

    
457
  pset = get_peers();
458
  n = peerset_size(pset);
459
  neighbours = peerset_get_peers(pset);
460
  dprintf("Send Offer: %d neighbours\n", n);
461
  if (n == 0) return;
462
  buff = cb_get_chunks(cb, &size);
463
  if (size == 0) return;
464

    
465
  {
466
    size_t selectedpeers_len = offer_peer_count();
467
    int chunkids[size];
468
    struct nodeID *nodeids[n];
469
    struct nodeID *selectedpeers[selectedpeers_len];
470

    
471
    //reduce load a little bit if there are losses on the path from this guy
472
    double average_lossrate = get_average_lossrate_pset(pset);
473
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
474
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
475
      return;
476
    }
477

    
478
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
479
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
480
    selectPeersForChunks(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, needs, (transid % 2) ? peerWeightReceivedfrom : peerWeightRtt);        //select a peer that needs at least one of our chunks
481

    
482
    for (i=0; i<selectedpeers_len ; i++){
483
      int max_deliver = offer_max_deliver(selectedpeers[i]);
484
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
485
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]), nodeid_to_peer(selectedpeers[i],0)->cb_size);
486
      res = offerChunks(selectedpeers[i], my_bmap, max_deliver, transid++);
487
      chunkID_set_free(my_bmap);
488
    }
489
  }
490
}
491

    
492

    
493
void send_chunk()
494
{
495
  struct chunk *buff;
496
  int size, res, i, n;
497
  struct peer *neighbours;
498
  struct peerset *pset;
499

    
500
  pset = get_peers();
501
  n = peerset_size(pset);
502
  neighbours = peerset_get_peers(pset);
503
  dprintf("Send Chunk: %d neighbours\n", n);
504
  if (n == 0) return;
505
  buff = cb_get_chunks(cb, &size);
506
  dprintf("\t %d chunks in buffer...\n", size);
507
  if (size == 0) return;
508

    
509
  /************ STUPID DUMB SCHEDULING ****************/
510
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
511
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
512
  /************ /STUPID DUMB SCHEDULING ****************/
513

    
514
  /************ USE SCHEDULER ****************/
515
  {
516
    size_t selectedpairs_len = 1;
517
    int chunkids[size];
518
    struct nodeID *nodeids[n];
519
    struct PeerChunk selectedpairs[1];
520
  
521
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
522
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
523
    schedSelectPeerFirst(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, needs, peerWeightRtt, getChunkTimestamp);
524
  /************ /USE SCHEDULER ****************/
525

    
526
    for (i=0; i<selectedpairs_len ; i++){
527
      struct peer *p = nodeid_to_peer(selectedpairs[i].peer, 0);
528
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
529
      dprintf("\t sending chunk[%d] to ", c->id);
530
      dprintf("%s\n", node_addr(p->id));
531

    
532
      send_bmap(p);
533
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld ", c->id, node_addr(p->id), gettimeofday_in_us());}
534

    
535
      chunk_attributes_update_sending(c);
536
      res = sendChunk(p->id, c);
537
      if(chunk_log){fprintf(stderr, "Result: %d Size: %d bytes\n", res, c->size);}
538
      dprintf("\tResult: %d\n", res);
539
      if (res>=0) {
540
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
541
        reg_chunk_send(c->id);
542
      } else {
543
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
544
      }
545
    }
546
  }
547
}