Statistics
| Branch: | Revision:

streamers / streaming.c @ 03dca3bf

History | View | Annotate | Download (14.1 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdlib.h>
8
#include <stdio.h>
9
#include <stdint.h>
10
#include <stdbool.h>
11
#include <math.h>
12
#include <assert.h>
13

    
14
#include <net_helper.h>
15
#include <chunk.h> 
16
#include <chunkbuffer.h> 
17
#include <trade_msg_la.h>
18
#include <trade_msg_ha.h>
19
#include <peerset.h>
20
#include <peer.h>
21
#include <chunkidset.h>
22
#include <limits.h>
23
#include <trade_sig_ha.h>
24

    
25
#include "streaming.h"
26
#include "output.h"
27
#include "input.h"
28
#include "dbg.h"
29
#include "chunk_signaling.h"
30
#include "chunklock.h"
31
#include "topology.h"
32
#include "measures.h"
33

    
34
#include "scheduler_la.h"
35
#include "chunkbuffer.h"
36
#include "chunkbuffer_helper.h"
37

    
38
struct chunk_attributes {
39
  uint64_t deadline;
40
  uint16_t deadline_increment;
41
  uint16_t hopcount;
42
} __attribute__((packed));
43

    
44
struct chunk_buffer *cb;
45
static struct input_desc *input;
46
static int cb_size;
47
static int transid=0;
48

    
49
static int offer_per_tick = 1;        //N_p parameter of POLITO
50

    
51
int _needs(struct chunkID_set *cset, int cb_size, int cid);
52

    
53
void cb_print()
54
{
55
#ifdef DEBUG
56
  struct chunk *chunks;
57
  int num_chunks, i, id;
58
  chunks = cb_get_chunks(cb, &num_chunks);
59

    
60
  dprintf("\tchbuf :");
61
  i = 0;
62
  if(num_chunks) {
63
    id = chunks[0].id;
64
    dprintf(" %d-> ",id);
65
    while (i < num_chunks) {
66
      if (id == chunks[i].id) {
67
        dprintf("%d",id % 10);
68
        i++;
69
      } else if (chunk_islocked(id)) {
70
        dprintf("*");
71
      } else {
72
        dprintf(".");
73
      }
74
      id++;
75
    }
76
  }
77
  dprintf("\n");
78
#endif
79
}
80

    
81
void stream_init(int size, struct nodeID *myID)
82
{
83
  char conf[32];
84

    
85
  cb_size = size;
86

    
87
  sprintf(conf, "size=%d", cb_size);
88
  cb = cb_init(conf);
89
  chunkDeliveryInit(myID);
90
  chunkSignalingInit(myID);
91
  //init_measures();
92
}
93

    
94
int source_init(const char *fname, struct nodeID *myID, bool loop)
95
{
96
  input = input_open(fname, loop ? INPUT_LOOP : 0);
97
  if (input == NULL) {
98
    return -1;
99
  }
100

    
101
  stream_init(1, myID);
102
  return 0;
103
}
104

    
105
void chunk_attributes_fill(struct chunk* c)
106
{
107
  struct chunk_attributes * ca;
108

    
109
  assert(!c->attributes && c->attributes_size == 0);
110

    
111
  c->attributes_size = sizeof(struct chunk_attributes);
112
  c->attributes = ca = (struct chunk_attributes *)malloc(c->attributes_size);
113

    
114
  ca->deadline = c->timestamp;
115
  ca->deadline_increment = 2;
116
  ca->hopcount = 0;
117
}
118

    
119
int chunk_get_hopcount(struct chunk* c) {
120
  struct chunk_attributes * ca;
121

    
122
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
123
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
124
    return -1;
125
  }
126

    
127
  ca = (struct chunk_attributes *) c->attributes;
128
  return ca->hopcount;
129
}
130

    
131
void chunk_attributes_update_received(const struct chunk* c)
132
{
133
  struct chunk_attributes * ca;
134

    
135
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
136
    fprintf(stderr,"Warning, received chunk %d with strange attributes block\n", c->id);
137
    return;
138
  }
139

    
140
  ca = (struct chunk_attributes *) c->attributes;
141
  ca->hopcount++;
142
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
143
}
144

    
145
void chunk_attributes_update_sending(const struct chunk* c)
146
{
147
  struct chunk_attributes * ca;
148

    
149
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
150
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
151
    return;
152
  }
153

    
154
  ca = (struct chunk_attributes *) c->attributes;
155
  ca->deadline += ca->deadline_increment;
156
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
157
}
158

    
159
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
160
{
161
  struct chunk *chunks;
162
  size_t num_chunks;
163
  int i;
164
  struct chunkID_set *my_bmap = chunkID_set_init("type=1");
165
  chunks = cb_get_chunks(chbuf, &num_chunks);
166

    
167
  for(i=num_chunks-1; i>=0; i--) {
168
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
169
  }
170
  return my_bmap;
171
}
172

    
173
// a simple implementation that request everything that we miss ... up to max deliver
174
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver, int trans_id){
175
  struct chunkID_set *cset_acc, *my_bmap;
176
  int i, d, cset_off_size;
177
  //double lossrate;
178

    
179
  cset_acc = chunkID_set_init("size=0");
180

    
181
  //reduce load a little bit if there are losses on the path from this guy
182
  //lossrate = get_lossrate_receive(from->id);
183
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
184
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
185
    my_bmap = cb_to_bmap(cb);
186
    cset_off_size = chunkID_set_size(cset_off);
187
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
188
      int chunkid = chunkID_set_get_chunk(cset_off, i);
189
      //dprintf("\tdo I need c%d ? :",chunkid);
190
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
191
        chunkID_set_add_chunk(cset_acc, chunkid);
192
        chunk_lock(chunkid,from);
193
        dtprintf("accepting %d from %s", chunkid, node_addr(from->id));
194
#ifdef MONL
195
        dprintf(", loss:%f rtt:%f", get_lossrate(from->id), get_rtt(from->id));
196
#endif
197
        dprintf("\n");
198
        d++;
199
      }
200
    }
201
    chunkID_set_free(my_bmap);
202
  //} else {
203
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(from->id), lossrate, get_rtt(from->id));
204
  //}
205

    
206
  return cset_acc;
207
}
208

    
209
void send_bmap(struct peer *to)
210
{
211
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
212
   sendBufferMap(to->id,NULL, my_bmap, cb_size, 0);
213
  chunkID_set_free(my_bmap);
214
}
215

    
216
double get_average_lossrate_pset(struct peerset *pset)
217
{
218
  int i, n;
219
  struct peer *neighbours;
220

    
221
  n = peerset_size(pset);
222
  neighbours = peerset_get_peers(pset);
223
  {
224
    const struct nodeID *nodeids[n];
225
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
226
#ifdef MONL
227
    return get_average_lossrate(nodeids, n);
228
#else
229
    return 0;
230
#endif
231
  }
232
}
233

    
234
void ack_chunk(struct chunk *c, struct peer *p)
235
{
236
  //reduce load a little bit if there are losses on the path from this guy
237
  double average_lossrate = get_average_lossrate_pset(get_peers());
238
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
239
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
240
    return;
241
  }
242
  send_bmap(p);        //send explicit ack
243
}
244

    
245
void received_chunk(const struct nodeID *from, const uint8_t *buff, int len)
246
{
247
  int res;
248
  static struct chunk c;
249
  struct peer *p;
250

    
251
  res = decodeChunk(&c, buff + 1, len - 1);
252
  if (res > 0) {
253
    chunk_attributes_update_received(&c);
254
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c));
255
    chunk_unlock(c.id);
256
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
257
    output_deliver(&c);
258
    res = cb_add_chunk(cb, &c);
259
    cb_print();
260
    if (res < 0) {
261
      dprintf("\tchunk too old, buffer full with newer chunks\n");
262
      free(c.data);
263
      free(c.attributes);
264
    }
265
    p = nodeid_to_peer(from,1);
266
    if (p) {        //now we have it almost sure
267
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
268
      ack_chunk(&c,p);        //send explicit ack
269
    }
270
  } else {
271
    fprintf(stderr,"\tError: can't decode chunk!\n");
272
  }
273
}
274

    
275
int generated_chunk(suseconds_t *delta)
276
{
277
  int res;
278
  struct chunk c;
279

    
280
  *delta = input_get(input, &c);
281
  if (*delta < 0) {
282
    fprintf(stderr, "Error in input!\n");
283
    exit(-1);
284
  }
285
  if (c.data == NULL) {
286
    return 0;
287
  }
288
  dprintf("Generated chunk %d of %d bytes\n",c.id, c.size);
289
  chunk_attributes_fill(&c);
290
  res = cb_add_chunk(cb, &c);
291
  if (res < 0) {
292
    free(c.data);
293
    free(c.attributes);
294
  }
295

    
296
  return 1;
297
}
298

    
299
/**
300
 *example function to filter chunks based on whether a given peer needs them.
301
 *
302
 * Looks at buffermap information received about the given peer.
303
 */
304
int needs(const struct nodeID *n, int cid){
305
  struct peer * p = nodeid_to_peer(n, 0);
306
  if (!p) return 1; // if we don't know this peer, but we assume it needs the chunk (aggressive behaviour!)
307

    
308
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
309
  if (! p->bmap) {
310
    //dprintf("no bmap\n");
311
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
312
  }
313
  return _needs(p->bmap, p->cb_size, cid);
314
}
315

    
316
int _needs(struct chunkID_set *cset, int cb_size, int cid){
317
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
318
    int missing, min;
319
    //@TODO: add some bmap_timestamp based logic
320

    
321
    if (chunkID_set_size(cset) == 0) {
322
      //dprintf("bmap empty\n");
323
      return 1;        // if the bmap seems empty, it needs the chunk
324
    }
325
    missing = cb_size - chunkID_set_size(cset);
326
    missing = missing < 0 ? 0 : missing;
327
    min = chunkID_set_get_earliest(cset);
328
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
329
    return (cid >= min - missing);
330
  }
331

    
332
  //dprintf("has it\n");
333
  return 0;
334
}
335

    
336
double peerWeightReceivedfrom(const struct nodeID * const *n){
337
  struct peer * p = nodeid_to_peer(*n, 0);
338
  if (!p) return 0;
339
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
340
}
341

    
342
double peerWeightUniform(const struct nodeID * const *n){
343
  return 1;
344
}
345

    
346
double peerWeightRtt(const struct nodeID * const *n){
347
#ifdef MONL
348
  double rtt = get_rtt(*n);
349
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
350
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
351
#else
352
  return 1;
353
#endif
354
}
355

    
356
//ordering function for ELp peer selection, chunk ID based
357
//can't be used as weight
358
double peerScoreELpID(struct nodeID **n){
359
  struct chunkID_set *bmap;
360
  int latest;
361
  struct peer * p = nodeid_to_peer(*n, 0);
362
  if (!p) return 0;
363

    
364
  bmap = p->bmap;
365
  if (!bmap) return 0;
366
  latest = chunkID_set_get_latest(bmap);
367
  if (latest == INT_MIN) return 0;
368

    
369
  return -latest;
370
}
371

    
372
double getChunkTimestamp(const int *cid){
373
  const struct chunk *c = cb_get_chunk(cb, *cid);
374
  if (!c) return 0;
375

    
376
  return (double) c->timestamp;
377
}
378

    
379
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver, int trans_id){
380
  int i, d, cset_acc_size, res;
381

    
382
  cset_acc_size = chunkID_set_size(cset_acc);
383
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
384
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
385
    const struct chunk *c;
386
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
387
    c = cb_get_chunk(cb, chunkid);
388
    if (c && needs(to->id, chunkid) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
389
      chunk_attributes_update_sending(c);
390
      res = sendChunk(to->id, c);
391
      if (res >= 0) {
392
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
393
        d++;
394
        reg_chunk_send(c->id);
395
      } else {
396
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
397
      }
398
    }
399
  }
400
}
401

    
402
int offer_peer_count()
403
{
404
  return offer_per_tick;
405
}
406

    
407
int offer_max_deliver(const struct nodeID *n)
408
{
409
#ifdef MONL
410
  switch (get_hopcount(n)) {
411
    case 0: return 5;
412
    case 1: return 2;
413
    default: return 1;
414
  }
415
#else
416
  return 1;
417
#endif
418
}
419

    
420
void send_offer()
421
{
422
  struct chunk *buff;
423
  size_t size, i, n;
424
  int res;
425
  struct peer *neighbours;
426
  struct peerset *pset;
427

    
428
  pset = get_peers();
429
  n = peerset_size(pset);
430
  neighbours = peerset_get_peers(pset);
431
  dprintf("Send Offer: %d neighbours\n", n);
432
  if (n == 0) return;
433
  buff = cb_get_chunks(cb, &size);
434
  if (size == 0) return;
435

    
436
  {
437
    size_t selectedpeers_len = offer_peer_count();
438
    int chunkids[size];
439
    const struct nodeID *nodeids[n];
440
    const struct nodeID *selectedpeers[selectedpeers_len];
441

    
442
    //reduce load a little bit if there are losses on the path from this guy
443
    double average_lossrate = get_average_lossrate_pset(pset);
444
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
445
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
446
      return;
447
    }
448

    
449
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
450
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
451
    schedSelectPeersForChunks(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, needs, (transid % 2) ? peerWeightReceivedfrom : peerWeightRtt);        //select a peer that needs at least one of our chunks
452

    
453
    for (i=0; i<selectedpeers_len ; i++){
454
      int max_deliver = offer_max_deliver(selectedpeers[i]);
455
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
456
      dprintf("\t sending offer(%d) to %s\n", transid, node_addr(selectedpeers[i]));
457
      res = offerChunks(selectedpeers[i], my_bmap, max_deliver, transid++);
458
      chunkID_set_free(my_bmap);
459
    }
460
  }
461
}
462

    
463

    
464
void send_chunk()
465
{
466
  struct chunk *buff;
467
  size_t size, i, n;
468
  int res;
469
  struct peer *neighbours;
470
  struct peerset *pset;
471

    
472
  pset = get_peers();
473
  n = peerset_size(pset);
474
  neighbours = peerset_get_peers(pset);
475
  dprintf("Send Chunk: %d neighbours\n", n);
476
  if (n == 0) return;
477
  buff = cb_get_chunks(cb, &size);
478
  dprintf("\t %d chunks in buffer...\n", size);
479
  if (size == 0) return;
480

    
481
  /************ STUPID DUMB SCHEDULING ****************/
482
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
483
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
484
  /************ /STUPID DUMB SCHEDULING ****************/
485

    
486
  /************ USE SCHEDULER ****************/
487
  {
488
    size_t selectedpairs_len = 1;
489
    int chunkids[size];
490
    const struct nodeID *nodeids[n];
491
    struct PeerChunk selectedpairs[1];
492
  
493
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
494
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
495
    schedSelectPeerFirst(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, needs, peerWeightRtt, getChunkTimestamp);
496
  /************ /USE SCHEDULER ****************/
497

    
498
    for (i=0; i<selectedpairs_len ; i++){
499
      struct peer *p = nodeid_to_peer(selectedpairs[i].peer, 0);
500
      const struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
501
      dprintf("\t sending chunk[%d] to ", c->id);
502
      dprintf("%s\n", node_addr(p->id));
503

    
504
      send_bmap(p);
505
      chunk_attributes_update_sending(c);
506
      res = sendChunk(p->id, c);
507
      dprintf("\tResult: %d\n", res);
508
      if (res>=0) {
509
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
510
        reg_chunk_send(c->id);
511
      } else {
512
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
513
      }
514
    }
515
  }
516
}