Statistics
| Branch: | Revision:

streamers / streaming.c @ 45c3eb04

History | View | Annotate | Download (16 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h> //ENST
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11
#include <stdbool.h>
12
#include <math.h>
13
#include <assert.h>
14

    
15
#include <net_helper.h>
16
#include <chunk.h> 
17
#include <chunkbuffer.h> 
18
#include <trade_msg_la.h>
19
#include <trade_msg_ha.h>
20
#include <peerset.h>
21
#include <peer.h>
22
#include <chunkidset.h>
23
#include <limits.h>
24
#include <trade_sig_ha.h>
25

    
26
#include "streaming.h"
27
#include "output.h"
28
#include "input.h"
29
#include "dbg.h"
30
#include "chunk_signaling.h"
31
#include "chunklock.h"
32
#include "topology.h"
33
#include "measures.h"
34

    
35
#include "scheduler_la.h"
36

    
37
struct chunk_attributes {
38
  uint64_t deadline;
39
  uint16_t deadline_increment;
40
  uint16_t hopcount;
41
} __attribute__((packed));
42

    
43
//___________ENST_____________
44
struct timeval what_time; //to store the epoch time
45
uint64_t time_now; //to evaluate system time in us precision
46
int hc; //variable for storing the hopcount
47
extern bool log_on;
48
//____________________________
49

    
50
struct chunk_buffer *cb;
51
static struct input_desc *input;
52
static int cb_size;
53
static int transid=0;
54

    
55
static int offer_per_tick = 1;        //N_p parameter of POLITO
56

    
57
int _needs(struct chunkID_set *cset, int cb_size, int cid);
58

    
59
void cb_print()
60
{
61
#ifdef DEBUG
62
  struct chunk *chunks;
63
  int num_chunks, i, id;
64
  chunks = cb_get_chunks(cb, &num_chunks);
65

    
66
  dprintf("\tchbuf :");
67
  i = 0;
68
  if(num_chunks) {
69
    id = chunks[0].id;
70
    dprintf(" %d-> ",id);
71
    while (i < num_chunks) {
72
      if (id == chunks[i].id) {
73
        dprintf("%d",id % 10);
74
        i++;
75
      } else if (chunk_islocked(id)) {
76
        dprintf("*");
77
      } else {
78
        dprintf(".");
79
      }
80
      id++;
81
    }
82
  }
83
  dprintf("\n");
84
#endif
85
}
86

    
87
void stream_init(int size, struct nodeID *myID)
88
{
89
  char conf[32];
90

    
91
  cb_size = size;
92

    
93
  sprintf(conf, "size=%d", cb_size);
94
  cb = cb_init(conf);
95
  chunkDeliveryInit(myID);
96
  chunkSignalingInit(myID);
97
  init_measures();
98
}
99

    
100
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
101
{
102
  input = input_open(fname, loop ? INPUT_LOOP : 0, fds, fds_size);
103
  if (input == NULL) {
104
    return -1;
105
  }
106

    
107
  stream_init(1, myID);
108
  return 0;
109
}
110

    
111
void chunk_attributes_fill(struct chunk* c)
112
{
113
  struct chunk_attributes * ca;
114

    
115
  assert(!c->attributes && c->attributes_size == 0);
116

    
117
  c->attributes_size = sizeof(struct chunk_attributes);
118
  c->attributes = ca = malloc(c->attributes_size);
119

    
120
  ca->deadline = c->timestamp;
121
  ca->deadline_increment = 2;
122
  ca->hopcount = 0;
123
}
124

    
125
int chunk_get_hopcount(struct chunk* c) {
126
  struct chunk_attributes * ca;
127

    
128
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
129
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
130
    return -1;
131
  }
132

    
133
  ca = (struct chunk_attributes *) c->attributes;
134
  return ca->hopcount;
135
}
136

    
137
void chunk_attributes_update_received(struct chunk* c)
138
{
139
  struct chunk_attributes * ca;
140

    
141
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
142
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%d\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
143
    return;
144
  }
145

    
146
  ca = (struct chunk_attributes *) c->attributes;
147
  ca->hopcount++;
148
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
149
}
150

    
151
void chunk_attributes_update_sending(struct chunk* c)
152
{
153
  struct chunk_attributes * ca;
154

    
155
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
156
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
157
    return;
158
  }
159

    
160
  ca = (struct chunk_attributes *) c->attributes;
161
  ca->deadline += ca->deadline_increment;
162
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
163
}
164

    
165
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
166
{
167
  struct chunk *chunks;
168
  int num_chunks, i;
169
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
170
  chunks = cb_get_chunks(chbuf, &num_chunks);
171

    
172
  for(i=num_chunks-1; i>=0; i--) {
173
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
174
  }
175
  return my_bmap;
176
}
177

    
178
// a simple implementation that request everything that we miss ... up to max deliver
179
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver, int trans_id){
180
  struct chunkID_set *cset_acc, *my_bmap;
181
  int i, d, cset_off_size;
182
  //double lossrate;
183

    
184
  cset_acc = chunkID_set_init("size=0");
185

    
186
  //reduce load a little bit if there are losses on the path from this guy
187
  //lossrate = get_lossrate_receive(from->id);
188
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
189
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
190
    my_bmap = cb_to_bmap(cb);
191
    cset_off_size = chunkID_set_size(cset_off);
192
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
193
      int chunkid = chunkID_set_get_chunk(cset_off, i);
194
      //dprintf("\tdo I need c%d ? :",chunkid);
195
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
196
        chunkID_set_add_chunk(cset_acc, chunkid);
197
        chunk_lock(chunkid,from);
198
        dtprintf("accepting %d from %s", chunkid, node_addr(from->id));
199
#ifdef MONL
200
        dprintf(", loss:%f rtt:%f", get_lossrate(from->id), get_rtt(from->id));
201
#endif
202
        dprintf("\n");
203
        d++;
204
      }
205
    }
206
    chunkID_set_free(my_bmap);
207
  //} else {
208
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(from->id), lossrate, get_rtt(from->id));
209
  //}
210

    
211
  return cset_acc;
212
}
213

    
214
void send_bmap(struct peer *to)
215
{
216
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
217
   sendBufferMap(to->id,NULL, my_bmap, input ? 0 : cb_size, 0);
218
  chunkID_set_free(my_bmap);
219
}
220

    
221
double get_average_lossrate_pset(struct peerset *pset)
222
{
223
  int i, n;
224
  struct peer *neighbours;
225

    
226
  n = peerset_size(pset);
227
  neighbours = peerset_get_peers(pset);
228
  {
229
    struct nodeID *nodeids[n];
230
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
231
#ifdef MONL
232
    return get_average_lossrate(nodeids, n);
233
#else
234
    return 0;
235
#endif
236
  }
237
}
238

    
239
void ack_chunk(struct chunk *c, struct peer *p)
240
{
241
  //reduce load a little bit if there are losses on the path from this guy
242
  double average_lossrate = get_average_lossrate_pset(get_peers());
243
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
244
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
245
    return;
246
  }
247
  send_bmap(p);        //send explicit ack
248
}
249

    
250
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
251
{
252
  int res;
253
  static struct chunk c;
254
  struct peer *p;
255

    
256
  res = decodeChunk(&c, buff + 1, len - 1);
257
  if (res > 0) {
258
    chunk_attributes_update_received(&c);
259
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c));
260
    chunk_unlock(c.id);
261
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
262
    //____________ENST__________________________
263
    gettimeofday(&what_time, NULL);        
264
    time_now= what_time.tv_sec * 1000000ULL + what_time.tv_usec;
265
    hc = chunk_get_hopcount(&c);
266
    if(log_on){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %lld hopcount: %i\n", c.id, node_addr(from), time_now, hc);}
267
    //_________________________________________            
268
    output_deliver(&c);
269
    res = cb_add_chunk(cb, &c);
270
    cb_print();
271
    if (res < 0) {
272
      dprintf("\tchunk too old, buffer full with newer chunks\n");
273
      if(log_on){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %lld\n", c.id, node_addr(from), time_now);} //ENST
274
      free(c.data);
275
      free(c.attributes);
276
    }
277
    p = nodeid_to_peer(from,1);
278
    if (p) {        //now we have it almost sure
279
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
280
      ack_chunk(&c,p);        //send explicit ack
281
    }
282
  } else {
283
    fprintf(stderr,"\tError: can't decode chunk!\n");
284
  }
285
}
286

    
287
struct chunk *generated_chunk(suseconds_t *delta)
288
{
289
  struct chunk *c;
290

    
291
  c = malloc(sizeof(struct chunk));
292
  if (!c) {
293
    fprintf(stderr, "Memory allocation error!\n");
294
    return NULL;
295
  }
296

    
297
  *delta = input_get(input, c);
298
  if (*delta < 0) {
299
    fprintf(stderr, "Error in input!\n");
300
    exit(-1);
301
  }
302
  if (c->data == NULL) {
303
    free(c);
304
    return NULL;
305
  }
306
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
307
  chunk_attributes_fill(c);
308
  return c;
309
}
310

    
311
int add_chunk(struct chunk *c)
312
{
313
  int res;
314

    
315
  res = cb_add_chunk(cb, c);
316
  if (res < 0) {
317
    free(c->data);
318
    free(c->attributes);
319
    free(c);
320
    return 0;
321
  }
322
  free(c);
323
  return 1;
324
}
325

    
326
/**
327
 *example function to filter chunks based on whether a given peer needs them.
328
 *
329
 * Looks at buffermap information received about the given peer.
330
 */
331
int needs(struct nodeID *n, int cid){
332
  struct peer * p = nodeid_to_peer(n, 0);
333
  if (!p) return 1; // if we don't know this peer, but we assume it needs the chunk (aggressive behaviour!)
334

    
335
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
336
  if (! p->bmap) {
337
    //dprintf("no bmap\n");
338
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
339
  }
340
  return _needs(p->bmap, p->cb_size, cid);
341
}
342

    
343
int _needs(struct chunkID_set *cset, int cb_size, int cid){
344
  if (cb_size == 0) { //if it declared it does not needs chunks
345
    return 0;
346
  }
347

    
348
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
349
    int missing, min;
350
    //@TODO: add some bmap_timestamp based logic
351

    
352
    if (chunkID_set_size(cset) == 0) {
353
      //dprintf("bmap empty\n");
354
      return 1;        // if the bmap seems empty, it needs the chunk
355
    }
356
    missing = cb_size - chunkID_set_size(cset);
357
    missing = missing < 0 ? 0 : missing;
358
    min = chunkID_set_get_earliest(cset);
359
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
360
    return (cid >= min - missing);
361
  }
362

    
363
  //dprintf("has it\n");
364
  return 0;
365
}
366

    
367
double peerWeightReceivedfrom(struct nodeID **n){
368
  struct peer * p = nodeid_to_peer(*n, 0);
369
  if (!p) return 0;
370
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
371
}
372

    
373
double peerWeightUniform(struct nodeID **n){
374
  return 1;
375
}
376

    
377
double peerWeightRtt(struct nodeID **n){
378
#ifdef MONL
379
  double rtt = get_rtt(*n);
380
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
381
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
382
#else
383
  return 1;
384
#endif
385
}
386

    
387
//ordering function for ELp peer selection, chunk ID based
388
//can't be used as weight
389
double peerScoreELpID(struct nodeID **n){
390
  struct chunkID_set *bmap;
391
  int latest;
392
  struct peer * p = nodeid_to_peer(*n, 0);
393
  if (!p) return 0;
394

    
395
  bmap = p->bmap;
396
  if (!bmap) return 0;
397
  latest = chunkID_set_get_latest(bmap);
398
  if (latest == INT_MIN) return 0;
399

    
400
  return -latest;
401
}
402

    
403
double getChunkTimestamp(int *cid){
404
  struct chunk *c = cb_get_chunk(cb, *cid);
405
  if (!c) return 0;
406

    
407
  return (double) c->timestamp;
408
}
409

    
410
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver, int trans_id){
411
  int i, d, cset_acc_size, res;
412

    
413
  cset_acc_size = chunkID_set_size(cset_acc);
414
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
415
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
416
    struct chunk *c;
417
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
418
    c = cb_get_chunk(cb, chunkid);
419
    if (c && needs(to->id, chunkid) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
420
      chunk_attributes_update_sending(c);
421
      res = sendChunk(to->id, c);
422
      if (res >= 0) {
423
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
424
        d++;
425
        reg_chunk_send(c->id);
426
        //___________________ENST__________________________________
427
        gettimeofday(&what_time, NULL);        
428
        time_now = what_time.tv_sec * 1000000ULL + what_time.tv_usec;        
429
        if(log_on){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld Result: %d\n", c->id, node_addr(to->id), time_now, res);}
430
        //________________________________________________________
431

    
432
      } else {
433
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
434
      }
435
    }
436
  }
437
}
438

    
439
int offer_peer_count()
440
{
441
  return offer_per_tick;
442
}
443

    
444
int offer_max_deliver(struct nodeID *n)
445
{
446
#ifdef MONL
447
  switch (get_hopcount(n)) {
448
    case 0: return 5;
449
    case 1: return 2;
450
    default: return 1;
451
  }
452
#else
453
  return 1;
454
#endif
455
}
456

    
457
void send_offer()
458
{
459
  struct chunk *buff;
460
  int size, res, i, n;
461
  struct peer *neighbours;
462
  struct peerset *pset;
463

    
464
  pset = get_peers();
465
  n = peerset_size(pset);
466
  neighbours = peerset_get_peers(pset);
467
  dprintf("Send Offer: %d neighbours\n", n);
468
  if (n == 0) return;
469
  buff = cb_get_chunks(cb, &size);
470
  if (size == 0) return;
471

    
472
  {
473
    size_t selectedpeers_len = offer_peer_count();
474
    int chunkids[size];
475
    struct nodeID *nodeids[n];
476
    struct nodeID *selectedpeers[selectedpeers_len];
477

    
478
    //reduce load a little bit if there are losses on the path from this guy
479
    double average_lossrate = get_average_lossrate_pset(pset);
480
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
481
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
482
      return;
483
    }
484

    
485
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
486
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
487
    selectPeersForChunks(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, needs, (transid % 2) ? peerWeightReceivedfrom : peerWeightRtt);        //select a peer that needs at least one of our chunks
488

    
489
    for (i=0; i<selectedpeers_len ; i++){
490
      int max_deliver = offer_max_deliver(selectedpeers[i]);
491
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
492
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]), nodeid_to_peer(selectedpeers[i],0)->cb_size);
493
      res = offerChunks(selectedpeers[i], my_bmap, max_deliver, transid++);
494
      chunkID_set_free(my_bmap);
495
    }
496
  }
497
}
498

    
499

    
500
void send_chunk()
501
{
502
  struct chunk *buff;
503
  int size, res, i, n;
504
  struct peer *neighbours;
505
  struct peerset *pset;
506
  //extern bool log_on;
507
        
508
  pset = get_peers();
509
  n = peerset_size(pset);
510
  neighbours = peerset_get_peers(pset);
511
  dprintf("Send Chunk: %d neighbours\n", n);
512
  if (n == 0) return;
513
  buff = cb_get_chunks(cb, &size);
514
  dprintf("\t %d chunks in buffer...\n", size);
515
  if (size == 0) return;
516

    
517
  /************ STUPID DUMB SCHEDULING ****************/
518
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
519
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
520
  /************ /STUPID DUMB SCHEDULING ****************/
521

    
522
  /************ USE SCHEDULER ****************/
523
  {
524
    size_t selectedpairs_len = 1;
525
    int chunkids[size];
526
    struct nodeID *nodeids[n];
527
    struct PeerChunk selectedpairs[1];
528
  
529
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
530
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
531
    schedSelectPeerFirst(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, needs, peerWeightRtt, getChunkTimestamp);
532
  /************ /USE SCHEDULER ****************/
533

    
534
    for (i=0; i<selectedpairs_len ; i++){
535
      struct peer *p = nodeid_to_peer(selectedpairs[i].peer, 0);
536
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
537
      dprintf("\t sending chunk[%d] to ", c->id);
538
      dprintf("%s\n", node_addr(p->id));
539

    
540
      send_bmap(p);
541
      //______________ENST_______________________
542
      gettimeofday(&what_time, NULL);                  
543
      time_now = what_time.tv_sec * 1000000ULL + what_time.tv_usec;
544
      if(log_on){
545
      fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %lld ", c->id, node_addr(p->id), time_now);
546
      }
547
      //________________________________________
548

    
549
      chunk_attributes_update_sending(c);
550
      res = sendChunk(p->id, c);
551
      if(log_on){fprintf(stderr, "Result: %d Size: %d bytes\n", res, c->size);} //ENST
552
      dprintf("\tResult: %d\n", res);
553
      if (res>=0) {
554
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
555
        reg_chunk_send(c->id);
556
      } else {
557
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
558
      }
559
    }
560
  }
561
}