Statistics
| Branch: | Revision:

streamers / streaming.c @ c9370421

History | View | Annotate | Download (14.1 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdlib.h>
8
#include <stdio.h>
9
#include <stdint.h>
10
#include <stdbool.h>
11
#include <math.h>
12
#include <assert.h>
13

    
14
#include <net_helper.h>
15
#include <chunk.h> 
16
#include <chunkbuffer.h> 
17
#include <trade_msg_la.h>
18
#include <trade_msg_ha.h>
19
#include <peerset.h>
20
#include <peer.h>
21
#include <chunkidset.h>
22
#include <limits.h>
23
#include <trade_sig_ha.h>
24

    
25
#include "streaming.h"
26
#include "output.h"
27
#include "input.h"
28
#include "dbg.h"
29
#include "chunklock.h"
30
#include "topology.h"
31
#include "measures.h"
32

    
33
#include "scheduler_la.h"
34

    
35
struct chunk_attributes {
36
  uint64_t deadline;
37
  uint16_t deadline_increment;
38
  uint16_t hopcount;
39
} __attribute__((packed));
40

    
41
struct chunk_buffer *cb;
42
static struct input_desc *input;
43
static int cb_size;
44
static int transid=0;
45

    
46
static int offer_per_tick = 1;        //N_p parameter of POLITO
47

    
48
int _needs(struct chunkID_set *cset, int cb_size, int cid);
49

    
50
void cb_print()
51
{
52
#ifdef DEBUG
53
  struct chunk *chunks;
54
  int num_chunks, i, id;
55
  chunks = cb_get_chunks(cb, &num_chunks);
56

    
57
  dprintf("\tchbuf :");
58
  i = 0;
59
  if(num_chunks) {
60
    id = chunks[0].id;
61
    dprintf(" %d-> ",id);
62
    while (i < num_chunks) {
63
      if (id == chunks[i].id) {
64
        dprintf("%d",id % 10);
65
        i++;
66
      } else if (chunk_islocked(id)) {
67
        dprintf("*");
68
      } else {
69
        dprintf(".");
70
      }
71
      id++;
72
    }
73
  }
74
  dprintf("\n");
75
#endif
76
}
77

    
78
void stream_init(int size, struct nodeID *myID)
79
{
80
  char conf[32];
81

    
82
  cb_size = size;
83

    
84
  sprintf(conf, "size=%d", cb_size);
85
  cb = cb_init(conf);
86
  chunkDeliveryInit(myID);
87
  chunkSignalingInit(myID);
88
  init_measures();
89
}
90

    
91
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
92
{
93
  input = input_open(fname, loop ? INPUT_LOOP : 0, fds, fds_size);
94
  if (input == NULL) {
95
    return -1;
96
  }
97

    
98
  stream_init(1, myID);
99
  return 0;
100
}
101

    
102
void chunk_attributes_fill(struct chunk* c)
103
{
104
  struct chunk_attributes * ca;
105

    
106
  assert(!c->attributes && c->attributes_size == 0);
107

    
108
  c->attributes_size = sizeof(struct chunk_attributes);
109
  c->attributes = ca = malloc(c->attributes_size);
110

    
111
  ca->deadline = c->timestamp;
112
  ca->deadline_increment = 2;
113
  ca->hopcount = 0;
114
}
115

    
116
int chunk_get_hopcount(struct chunk* c) {
117
  struct chunk_attributes * ca;
118

    
119
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
120
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
121
    return -1;
122
  }
123

    
124
  ca = (struct chunk_attributes *) c->attributes;
125
  return ca->hopcount;
126
}
127

    
128
void chunk_attributes_update_received(struct chunk* c)
129
{
130
  struct chunk_attributes * ca;
131

    
132
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
133
    fprintf(stderr,"Warning, received chunk %d with strange attributes block\n", c->id);
134
    return;
135
  }
136

    
137
  ca = (struct chunk_attributes *) c->attributes;
138
  ca->hopcount++;
139
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
140
}
141

    
142
void chunk_attributes_update_sending(struct chunk* c)
143
{
144
  struct chunk_attributes * ca;
145

    
146
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
147
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
148
    return;
149
  }
150

    
151
  ca = (struct chunk_attributes *) c->attributes;
152
  ca->deadline += ca->deadline_increment;
153
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
154
}
155

    
156
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
157
{
158
  struct chunk *chunks;
159
  int num_chunks, i;
160
  struct chunkID_set *my_bmap = chunkID_set_init("type=1");
161
  chunks = cb_get_chunks(chbuf, &num_chunks);
162

    
163
  for(i=num_chunks-1; i>=0; i--) {
164
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
165
  }
166
  return my_bmap;
167
}
168

    
169
// a simple implementation that request everything that we miss ... up to max deliver
170
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver, int trans_id){
171
  struct chunkID_set *cset_acc, *my_bmap;
172
  int i, d, cset_off_size;
173
  //double lossrate;
174

    
175
  cset_acc = chunkID_set_init("size=0");
176

    
177
  //reduce load a little bit if there are losses on the path from this guy
178
  //lossrate = get_lossrate_receive(from->id);
179
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
180
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
181
    my_bmap = cb_to_bmap(cb);
182
    cset_off_size = chunkID_set_size(cset_off);
183
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
184
      int chunkid = chunkID_set_get_chunk(cset_off, i);
185
      //dprintf("\tdo I need c%d ? :",chunkid);
186
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
187
        chunkID_set_add_chunk(cset_acc, chunkid);
188
        chunk_lock(chunkid,from);
189
        dtprintf("accepting %d from %s", chunkid, node_addr(from->id));
190
#ifdef MONL
191
        dprintf(", loss:%f rtt:%f", get_lossrate(from->id), get_rtt(from->id));
192
#endif
193
        dprintf("\n");
194
        d++;
195
      }
196
    }
197
    chunkID_set_free(my_bmap);
198
  //} else {
199
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(from->id), lossrate, get_rtt(from->id));
200
  //}
201

    
202
  return cset_acc;
203
}
204

    
205
void send_bmap(struct peer *to)
206
{
207
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
208
   sendBufferMap(to->id,NULL, my_bmap, 0); //FIXME: should add cb_size
209
  chunkID_set_free(my_bmap);
210
}
211

    
212
double get_average_lossrate_pset(struct peerset *pset)
213
{
214
  int i, n;
215
  struct peer *neighbours;
216

    
217
  n = peerset_size(pset);
218
  neighbours = peerset_get_peers(pset);
219
  {
220
    struct nodeID *nodeids[n];
221
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
222
#ifdef MONL
223
    return get_average_lossrate(nodeids, n);
224
#else
225
    return 0;
226
#endif
227
  }
228
}
229

    
230
void ack_chunk(struct chunk *c, struct peer *p)
231
{
232
  //reduce load a little bit if there are losses on the path from this guy
233
  double average_lossrate = get_average_lossrate_pset(get_peers());
234
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
235
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
236
    return;
237
  }
238
  send_bmap(p);        //send explicit ack
239
}
240

    
241
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
242
{
243
  int res;
244
  static struct chunk c;
245
  struct peer *p;
246

    
247
  res = decodeChunk(&c, buff + 1, len - 1);
248
  if (res > 0) {
249
    chunk_attributes_update_received(&c);
250
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c));
251
    chunk_unlock(c.id);
252
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
253
    output_deliver(&c);
254
    res = cb_add_chunk(cb, &c);
255
    cb_print();
256
    if (res < 0) {
257
      dprintf("\tchunk too old, buffer full with newer chunks\n");
258
      free(c.data);
259
      free(c.attributes);
260
    }
261
    p = nodeid_to_peer(from,1);
262
    if (p) {        //now we have it almost sure
263
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
264
      ack_chunk(&c,p);        //send explicit ack
265
    }
266
  } else {
267
    fprintf(stderr,"\tError: can't decode chunk!\n");
268
  }
269
}
270

    
271
struct chunk *generated_chunk(suseconds_t *delta)
272
{
273
  struct chunk *c;
274

    
275
  c = malloc(sizeof(struct chunk));
276
  if (!c) {
277
    fprintf(stderr, "Memory allocation error!\n");
278
    return NULL;
279
  }
280

    
281
  *delta = input_get(input, c);
282
  if (*delta < 0) {
283
    fprintf(stderr, "Error in input!\n");
284
    exit(-1);
285
  }
286
  if (c->data == NULL) {
287
    free(c);
288
    return NULL;
289
  }
290
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
291
  chunk_attributes_fill(c);
292
  return c;
293
}
294

    
295
int add_chunk(struct chunk *c)
296
{
297
  int res;
298

    
299
  res = cb_add_chunk(cb, c);
300
  if (res < 0) {
301
    free(c->data);
302
    free(c->attributes);
303
    free(c);
304
    return 0;
305
  }
306
  free(c);
307
  return 1;
308
}
309

    
310
/**
311
 *example function to filter chunks based on whether a given peer needs them.
312
 *
313
 * Looks at buffermap information received about the given peer.
314
 */
315
int needs(struct nodeID *n, int cid){
316
  struct peer * p = nodeid_to_peer(n, 0);
317
  if (!p) return 1; // if we don't know this peer, but we assume it needs the chunk (aggressive behaviour!)
318

    
319
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
320
  if (! p->bmap) {
321
    //dprintf("no bmap\n");
322
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
323
  }
324
  return _needs(p->bmap, cb_size, cid);        //FIXME: cb_size should be p->cb_size
325
}
326

    
327
int _needs(struct chunkID_set *cset, int cb_size, int cid){
328
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
329
    int missing, min;
330
    //@TODO: add some bmap_timestamp based logic
331

    
332
    if (chunkID_set_size(cset) == 0) {
333
      //dprintf("bmap empty\n");
334
      return 1;        // if the bmap seems empty, it needs the chunk
335
    }
336
    missing = cb_size - chunkID_set_size(cset);
337
    missing = missing < 0 ? 0 : missing;
338
    min = chunkID_set_get_earliest(cset);
339
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
340
    return (cid >= min - missing);
341
  }
342

    
343
  //dprintf("has it\n");
344
  return 0;
345
}
346

    
347
double peerWeightReceivedfrom(struct nodeID **n){
348
  struct peer * p = nodeid_to_peer(*n, 0);
349
  if (!p) return 0;
350
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
351
}
352

    
353
double peerWeightUniform(struct nodeID **n){
354
  return 1;
355
}
356

    
357
double peerWeightRtt(struct nodeID **n){
358
#ifdef MONL
359
  double rtt = get_rtt(*n);
360
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
361
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
362
#else
363
  return 1;
364
#endif
365
}
366

    
367
//ordering function for ELp peer selection, chunk ID based
368
//can't be used as weight
369
double peerScoreELpID(struct nodeID **n){
370
  struct chunkID_set *bmap;
371
  int latest;
372
  struct peer * p = nodeid_to_peer(*n, 0);
373
  if (!p) return 0;
374

    
375
  bmap = p->bmap;
376
  if (!bmap) return 0;
377
  latest = chunkID_set_get_latest(bmap);
378
  if (latest == INT_MIN) return 0;
379

    
380
  return -latest;
381
}
382

    
383
double getChunkTimestamp(int *cid){
384
  struct chunk *c = cb_get_chunk(cb, *cid);
385
  if (!c) return 0;
386

    
387
  return (double) c->timestamp;
388
}
389

    
390
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver, int trans_id){
391
  int i, d, cset_acc_size, res;
392

    
393
  cset_acc_size = chunkID_set_size(cset_acc);
394
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
395
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
396
    struct chunk *c;
397
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
398
    c = cb_get_chunk(cb, chunkid);
399
    if (c && needs(to->id, chunkid) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
400
      chunk_attributes_update_sending(c);
401
      res = sendChunk(to->id, c);
402
      if (res >= 0) {
403
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
404
        d++;
405
        reg_chunk_send(c->id);
406
      } else {
407
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
408
      }
409
    }
410
  }
411
}
412

    
413
int offer_peer_count()
414
{
415
  return offer_per_tick;
416
}
417

    
418
int offer_max_deliver(struct nodeID *n)
419
{
420
#ifdef MONL
421
  switch (get_hopcount(n)) {
422
    case 0: return 5;
423
    case 1: return 2;
424
    default: return 1;
425
  }
426
#else
427
  return 1;
428
#endif
429
}
430

    
431
void send_offer()
432
{
433
  struct chunk *buff;
434
  int size, res, i, n;
435
  struct peer *neighbours;
436
  struct peerset *pset;
437

    
438
  pset = get_peers();
439
  n = peerset_size(pset);
440
  neighbours = peerset_get_peers(pset);
441
  dprintf("Send Offer: %d neighbours\n", n);
442
  if (n == 0) return;
443
  buff = cb_get_chunks(cb, &size);
444
  if (size == 0) return;
445

    
446
  {
447
    size_t selectedpeers_len = offer_peer_count();
448
    int chunkids[size];
449
    struct nodeID *nodeids[n];
450
    struct nodeID *selectedpeers[selectedpeers_len];
451

    
452
    //reduce load a little bit if there are losses on the path from this guy
453
    double average_lossrate = get_average_lossrate_pset(pset);
454
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
455
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
456
      return;
457
    }
458

    
459
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
460
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
461
    selectPeersForChunks(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, needs, (transid % 2) ? peerWeightReceivedfrom : peerWeightRtt);        //select a peer that needs at least one of our chunks
462

    
463
    for (i=0; i<selectedpeers_len ; i++){
464
      int max_deliver = offer_max_deliver(selectedpeers[i]);
465
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
466
      dprintf("\t sending offer(%d) to %s\n", transid, node_addr(selectedpeers[i]));
467
      res = offerChunks(selectedpeers[i], my_bmap, max_deliver, transid++);
468
      chunkID_set_free(my_bmap);
469
    }
470
  }
471
}
472

    
473

    
474
void send_chunk()
475
{
476
  struct chunk *buff;
477
  int size, res, i, n;
478
  struct peer *neighbours;
479
  struct peerset *pset;
480

    
481
  pset = get_peers();
482
  n = peerset_size(pset);
483
  neighbours = peerset_get_peers(pset);
484
  dprintf("Send Chunk: %d neighbours\n", n);
485
  if (n == 0) return;
486
  buff = cb_get_chunks(cb, &size);
487
  dprintf("\t %d chunks in buffer...\n", size);
488
  if (size == 0) return;
489

    
490
  /************ STUPID DUMB SCHEDULING ****************/
491
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
492
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
493
  /************ /STUPID DUMB SCHEDULING ****************/
494

    
495
  /************ USE SCHEDULER ****************/
496
  {
497
    size_t selectedpairs_len = 1;
498
    int chunkids[size];
499
    struct nodeID *nodeids[n];
500
    struct PeerChunk selectedpairs[1];
501
  
502
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
503
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
504
    schedSelectPeerFirst(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, needs, peerWeightRtt, getChunkTimestamp);
505
  /************ /USE SCHEDULER ****************/
506

    
507
    for (i=0; i<selectedpairs_len ; i++){
508
      struct peer *p = nodeid_to_peer(selectedpairs[i].peer, 0);
509
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
510
      dprintf("\t sending chunk[%d] to ", c->id);
511
      dprintf("%s\n", node_addr(p->id));
512

    
513
      send_bmap(p);
514
      chunk_attributes_update_sending(c);
515
      res = sendChunk(p->id, c);
516
      dprintf("\tResult: %d\n", res);
517
      if (res>=0) {
518
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
519
        reg_chunk_send(c->id);
520
      } else {
521
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
522
      }
523
    }
524
  }
525
}