Statistics
| Branch: | Revision:

pstreamer / src / streaming.c @ dc99f5c4

History | View | Annotate | Download (31.3 KB)

1
/*
2
 * Copyright (c) 2010-2011 Luca Abeni
3
 * Copyright (c) 2010-2011 Csaba Kiraly
4
 * Copyright (c) 2017 Luca Baldesi
5
 *
6
 * This file is part of PeerStreamer.
7
 *
8
 * PeerStreamer is free software: you can redistribute it and/or
9
 * modify it under the terms of the GNU Affero General Public License as
10
 * published by the Free Software Foundation, either version 3 of the
11
 * License, or (at your option) any later version.
12
 *
13
 * PeerStreamer is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
16
 * General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU Affero General Public License
19
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
20
 *
21
 */
22
#include <sys/time.h>
23
#include <stdlib.h>
24
#include <stdio.h>
25
#include <stdint.h>
26
#include <stdbool.h>
27
#include <math.h>
28
#include <assert.h>
29
#include <string.h>
30
#include <inttypes.h>
31

    
32
#include <net_helper.h>
33
#include <chunk.h> 
34
#include <chunkbuffer.h> 
35
#include <trade_msg_la.h>
36
#include <trade_msg_ha.h>
37
#include <peerset.h>
38
#include <peer.h>
39
#include <chunkidset.h>
40
#include <limits.h>
41
#include <trade_sig_ha.h>
42
#ifdef CHUNK_ATTRIB_CHUNKER
43
#include <chunkiser_attrib.h>
44
#endif
45

    
46
#include "streaming.h"
47
#include "output.h"
48
#include "input.h"
49
#include "dbg.h"
50
#include "chunk_signaling.h"
51
#include "chunklock.h"
52
#include "topology.h"
53
#include "measures.h"
54
#include "net_helpers.h"
55
#include "scheduling.h"
56
#include "transaction.h"
57
#include "peer_metadata.h"
58

    
59
#include "scheduler_la.h"
60
#include "grapes_config.h"
61

    
62
# define CB_SIZE_TIME_UNLIMITED 1e12
63

    
64
void selectPeersForChunks(SchedOrdering ordering, schedPeerID *peers, size_t peers_len, schedChunkID *chunks, size_t chunks_len, schedPeerID *selected, size_t *selected_len, filterFunction filter, peerEvaluateFunction evaluate);
65

    
66
struct chunk_attributes {
67
  uint64_t deadline;
68
  uint16_t deadline_increment;
69
  uint16_t hopcount;
70
} __attribute__((packed));
71

    
72
enum distribution_type {DIST_UNIFORM, DIST_TURBO};
73

    
74
struct streaming_context {
75
        struct chunk_buffer *cb;
76
        struct input_desc *input;
77
        struct chunk_locks * ch_locks;
78
        struct service_times_element * transactions;
79
        const struct psinstance * ps;
80
        int cb_size;
81
        int bcast_after_receive_every;
82
        bool neigh_on_chunk_recv;
83
        bool send_bmap_before_push;
84
        uint64_t CB_SIZE_TIME;
85
        uint32_t chunk_loss_interval;
86
        enum distribution_type dist_type;
87
        int offer_per_period;
88
};
89

    
90
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config)
91
{
92
        struct streaming_context * stc;
93
        struct tag * tags;
94
        static char conf[80];
95

    
96
        stc = malloc(sizeof(struct streaming_context));
97
        stc->ps = ps;
98
        stc->bcast_after_receive_every = 0;
99
        stc->neigh_on_chunk_recv = false;
100
        stc->send_bmap_before_push = false;
101
        stc->transactions = NULL;
102
        stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
103
        stc->chunk_loss_interval = 0;  // disable self-lossy feature (for experiments)
104
        stc->dist_type = DIST_UNIFORM;
105

    
106
        tags = grapes_config_parse(config);
107
        if (strcmp(grapes_config_value_str_default(tags, "dist_type", ""), "turbo") == 0)
108
                stc->dist_type = DIST_TURBO;
109
        grapes_config_value_int_default(tags, "offer_per_period", &(stc->offer_per_period), 1);
110
        free(tags);
111

    
112
        stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
113
        stc->cb_size = psinstance_chunkbuffer_size(ps);
114
        sprintf(conf, "size=%d", stc->cb_size);
115
        stc->cb = cb_init(conf);
116
        stc->ch_locks = chunk_locks_create();
117

    
118
        chunkDeliveryInit(psinstance_nodeid(ps));
119
        chunkSignalingInit(psinstance_nodeid(ps));
120
        return stc;
121
}
122

    
123
void streaming_destroy(struct streaming_context ** stc)
124
{
125
        if (stc && *stc)
126
        {
127
                if((*stc)->input)
128
                        input_close((*stc)->input);
129
                if(((*stc)->ch_locks))
130
                        chunk_locks_destroy(&((*stc)->ch_locks));
131
                if(((*stc)->transactions))
132
                        transactions_destroy((*stc)->transactions);
133
                if(((*stc)->cb))
134
                        cb_destroy((*stc)->cb);
135
                free((*stc));
136
        }
137
}
138

    
139
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid);
140

    
141
uint64_t gettimeofday_in_us(void)
142
{
143
  struct timeval what_time; //to store the epoch time
144

    
145
  gettimeofday(&what_time, NULL);
146
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
147
}
148

    
149
void cb_print(const struct streaming_context * stc)
150
{
151
#ifdef DEBUG
152
  struct chunk *chunks;
153
  int num_chunks, i, id;
154
  chunks = cb_get_chunks(stc->cb, &num_chunks);
155

    
156
  dprintf("\tchbuf :");
157
  i = 0;
158
  if(num_chunks) {
159
    id = chunks[0].id;
160
    dprintf(" %d-> ",id);
161
    while (i < num_chunks) {
162
      if (id == chunks[i].id) {
163
        dprintf("%d",id % 10);
164
        i++;
165
      } else if (chunk_islocked(stc->ch_locks, id)) {
166
        dprintf("*");
167
      } else {
168
        dprintf(".");
169
      }
170
      id++;
171
    }
172
  }
173
  dprintf("\n");
174
#endif
175
}
176

    
177
void chunk_attributes_fill(struct chunk* c)
178
{
179
  struct chunk_attributes * ca;
180
  int priority = 1;
181

    
182
  assert((!c->attributes && c->attributes_size == 0)
183
#ifdef CHUNK_ATTRIB_CHUNKER
184
      || chunk_attributes_chunker_verify(c->attributes, c->attributes_size)
185
#endif
186
  );
187

    
188
#ifdef CHUNK_ATTRIB_CHUNKER
189
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
190
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
191
    free(c->attributes);
192
    c->attributes = NULL;
193
    c->attributes_size = 0;
194
  }
195
#endif
196

    
197
  c->attributes_size = sizeof(struct chunk_attributes);
198
  c->attributes = ca = malloc(c->attributes_size);
199

    
200
  ca->deadline = c->id;
201
  ca->deadline_increment = priority * 2;
202
  ca->hopcount = 0;
203
}
204

    
205
int chunk_get_hopcount(const struct chunk* c) {
206
  struct chunk_attributes * ca;
207

    
208
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
209
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
210
    return -1;
211
  }
212

    
213
  ca = (struct chunk_attributes *) c->attributes;
214
  return ca->hopcount;
215
}
216

    
217
void chunk_attributes_update_received(struct chunk* c)
218
{
219
  struct chunk_attributes * ca;
220

    
221
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
222
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
223
    return;
224
  }
225

    
226
  ca = (struct chunk_attributes *) c->attributes;
227
  ca->hopcount++;
228
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
229
}
230

    
231
void chunk_attributes_update_sending(const struct chunk* c)
232
{
233
  struct chunk_attributes * ca;
234

    
235
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
236
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
237
    return;
238
  }
239

    
240
  ca = (struct chunk_attributes *) c->attributes;
241
  ca->deadline += ca->deadline_increment;
242
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
243
}
244

    
245
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
246
{
247
  struct chunk *chunks;
248
  int num_chunks, i;
249
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
250
  chunks = cb_get_chunks(chbuf, &num_chunks);
251

    
252
  for(i=num_chunks-1; i>=0; i--) {
253
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
254
  }
255
  return my_bmap;
256
}
257

    
258
// a simple implementation that request everything that we miss ... up to max deliver
259
struct chunkID_set *get_chunks_to_accept(const struct streaming_context * stc, struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
260
  struct chunkID_set *cset_acc, *my_bmap;
261
  int i, d, cset_off_size;
262
  //double lossrate;
263
  struct peer *from = nodeid_to_peer(psinstance_topology(stc->ps), fromid, 0);
264

    
265
  cset_acc = chunkID_set_init("size=0");
266

    
267
  //reduce load a little bit if there are losses on the path from this guy
268
  //lossrate = get_lossrate_receive(from->id);
269
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
270
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
271
    my_bmap = cb_to_bmap(stc->cb);
272
    cset_off_size = chunkID_set_size(cset_off);
273
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
274
      int chunkid = chunkID_set_get_chunk(cset_off, i);
275
      //dprintf("\tdo I need c%d ? :",chunkid);
276
      if (!chunk_islocked(stc->ch_locks, chunkid) && _needs(stc, my_bmap, stc->cb_size, chunkid)) {
277
        chunkID_set_add_chunk(cset_acc, chunkid);
278
        chunk_lock(stc->ch_locks, chunkid,from);
279
        dtprintf("accepting %d from %s", chunkid, nodeid_static_str(fromid));
280
#ifdef MONL
281
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
282
#endif
283
        dprintf("\n");
284
        d++;
285
      }
286
    }
287
    chunkID_set_free(my_bmap);
288
  //} else {
289
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr_tr(fromid), lossrate, get_rtt(fromid));
290
  //}
291

    
292
  reg_offer_accept_in(psinstance_measures(stc->ps), chunkID_set_size(cset_acc) > 0 ? 1 : 0);
293

    
294
  return cset_acc;
295
}
296

    
297
void send_bmap(const struct streaming_context *stc, const struct nodeID *toid)
298
{
299
  struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
300
   sendBufferMap(toid,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
301
#ifdef LOG_SIGNAL
302
        log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
303
#endif
304
  chunkID_set_free(my_bmap);
305
}
306

    
307
void bcast_bmap(const struct streaming_context * stc)
308
{
309
  int i, n;
310
  struct peer **neighbours;
311
  struct peerset *pset;
312
  struct chunkID_set *my_bmap;
313

    
314
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
315
  n = peerset_size(pset);
316
  neighbours = peerset_get_peers(pset);
317

    
318
  my_bmap = cb_to_bmap(stc->cb);        //cache our bmap for faster processing
319
  for (i = 0; i<n; i++) {
320
    sendBufferMap(neighbours[i]->id,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
321
#ifdef LOG_SIGNAL
322
                 log_signal(psinstance_nodeid(stc->ps),neighbours[i]->id,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
323
#endif
324
  }
325
  chunkID_set_free(my_bmap);
326
}
327

    
328
void send_ack(const struct streaming_context * stc, struct nodeID *toid, uint16_t trans_id)
329
{
330
  struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
331
  sendAck(toid, my_bmap,trans_id);
332
#ifdef LOG_SIGNAL
333
        log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),trans_id,sig_ack,"SENT");
334
#endif
335
  chunkID_set_free(my_bmap);
336
}
337

    
338
double get_average_lossrate_pset(struct peerset *pset)
339
{
340
  return 0;
341
}
342

    
343
void ack_chunk(const struct streaming_context * stc, struct chunk *c, struct nodeID *from, uint16_t trans_id)
344
{
345
  //reduce load a little bit if there are losses on the path from this guy
346
  double average_lossrate = get_average_lossrate_pset(topology_get_neighbours(psinstance_topology(stc->ps)));
347
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
348
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
349
    return;
350
  }
351
  send_ack(stc, from, trans_id);        //send explicit ack
352
}
353

    
354
void received_chunk(struct streaming_context * stc, struct nodeID *from, const uint8_t *buff, int len)
355
{
356
  int res;
357
  static struct chunk c;
358
  struct peer *p;
359
  static int bcast_cnt;
360
  uint16_t transid;
361

    
362
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
363
  if (res > 0) {
364
                if (stc->chunk_loss_interval && c.id % stc->chunk_loss_interval == 0) {
365
                        fprintf(stderr,"[NOISE] Chunk %d discarded >:)\n",c.id);
366
                        free(c.data);
367
                        free(c.attributes);
368
                        return;
369
                }
370
    chunk_attributes_update_received(&c);
371
    chunk_unlock(stc->ch_locks, c.id);
372
    dprintf("Received chunk %d from peer: %s\n", c.id, nodeid_static_str(from));
373
#ifdef LOG_CHUNK
374
    log_chunk(from,psinstance_nodeid(stc->ps),&c,"RECEIVED");
375
#endif
376
//{fprintf(stderr, "TEO: Peer %s received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", node_addr_tr(get_my_addr()),c.id, node_addr_tr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
377
    output_deliver(psinstance_output(stc->ps), &c);
378
    res = cb_add_chunk(stc->cb, &c);
379
    reg_chunk_receive(psinstance_measures(stc->ps),c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
380
    cb_print(stc);
381
    if (res < 0) {
382
      dprintf("\tchunk too old, buffer full with newer chunks\n");
383
#ifdef LOG_CHUNK
384
      log_chunk_error(from,psinstance_nodeid(stc->ps),&c,res); //{fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr_tr(from), gettimeofday_in_us());}
385
#endif
386
      free(c.data);
387
      free(c.attributes);
388
    }
389
    p = nodeid_to_peer(psinstance_topology(stc->ps), from, stc->neigh_on_chunk_recv);
390
    if (p) {        //now we have it almost sure
391
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
392
      gettimeofday(&p->bmap_timestamp, NULL);
393
    }
394
    ack_chunk(stc, &c, from, transid);        //send explicit ack
395
    if (stc->bcast_after_receive_every && bcast_cnt % stc->bcast_after_receive_every == 0) {
396
       bcast_bmap(stc);
397
    }
398
  } else {
399
    fprintf(stderr,"\tError: can't decode chunk!\n");
400
  }
401
}
402

    
403
struct chunk *generated_chunk(struct streaming_context * stc, suseconds_t *delta)
404
{
405
  struct chunk *c;
406

    
407
  c = malloc(sizeof(struct chunk));
408
  if (!c) {
409
    fprintf(stderr, "Memory allocation error!\n");
410
    return NULL;
411
  }
412
  memset(c, 0, sizeof(struct chunk));
413

    
414
  *delta = (suseconds_t)input_get(stc->input, c);
415
  if (*delta < 0) {
416
    fprintf(stderr, "Error in input!\n");
417
    exit(-1);
418
  }
419
  if (c->data == NULL) {
420
    free(c);
421
    return NULL;
422
  }
423
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
424
  chunk_attributes_fill(c);
425
  return c;
426
}
427

    
428
int add_chunk(struct streaming_context * stc, struct chunk *c)
429
{
430
  int res;
431

    
432
  if (stc && c && stc->cb)
433
  {
434
          res = cb_add_chunk(stc->cb, c);
435
          if (res < 0) {
436
            free(c->data);
437
            free(c->attributes);
438
            free(c);
439
            return 0;
440
          }
441
         // free(c);
442
          return 1;
443
  }
444
  return 0;
445
}
446

    
447
uint64_t get_chunk_timestamp(const struct streaming_context * stc, int cid){
448
  const struct chunk *c = cb_get_chunk(stc->cb, cid);
449
  if (!c) return 0;
450

    
451
  return c->timestamp;
452
}
453

    
454
void print_chunkID_set(struct chunkID_set *cset)
455
{
456
        uint32_t * ptr = (uint32_t *) cset;
457
        uint32_t n_elements,i;
458
        int * data = (int *) &(ptr[3]);
459
        fprintf(stderr,"[DEBUG] Chunk ID set type: %d\n",ptr[0]);
460
        fprintf(stderr,"[DEBUG] Chunk ID set size: %d\n",ptr[1]);
461
        n_elements = ptr[2];
462
        fprintf(stderr,"[DEBUG] Chunk ID n_elements: %d\n",n_elements);
463
        fprintf(stderr,"[DEBUG] Chunk ID elements: [");
464
        for (i=0;i<n_elements;i++)
465
                fprintf(stderr,".%d.",data[i]);
466

    
467
        fprintf(stderr,"]\n");
468
}
469

    
470
/**
471
 *example function to filter chunks based on whether a given peer needs them.
472
 *
473
 * Looks at buffermap information received about the given peer.
474
 */
475
int needs_old(const struct streaming_context * stc, struct peer *n, int cid){
476
  struct peer * p = n;
477

    
478
  if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
479
    uint64_t ts;
480
    ts = get_chunk_timestamp(stc, cid);
481
    if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
482
      return 0;
483
    }
484
  }
485

    
486
  //dprintf("\t%s needs c%d ? :",node_addr_tr(p->id),c->id);
487
  if (! p->bmap) { // this will never happen since the pset module initializes bmap
488
    //dprintf("no bmap\n");
489
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
490
  }
491
        
492
//        fprintf(stderr,"[DEBUG] Evaluating Peer %s, CB_SIZE: %d\n",node_addr_tr(n->id),p->cb_size); // DEBUG
493
//        print_chunkID_set(p->bmap);                                                                                                                                                                        // DEBUG
494

    
495
  return _needs(stc, p->bmap, peer_cb_size(p), cid);
496
}
497

    
498
/**
499
 * Function checking if chunkID_set cset may need chunk with id cid
500
 * @cset: target cset
501
 * @cb_size: maximum allowed numer of chunks. In the case of offer it indicates
502
 *         the maximum capacity in of the receiving peer (so it's 0 for the source)
503
 * @cid: target chunk identifier
504
 */
505
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid){
506

    
507
  if (cb_size == 0) { //if it declared it does not needs chunks
508
    return 0;
509
  }
510

    
511
  if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
512
    uint64_t ts;
513
    ts = get_chunk_timestamp(stc, cid);
514
    if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
515
      return 0;
516
    }
517
  }
518

    
519
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
520
    int missing, min;
521
    //@TODO: add some bmap_timestamp based logic
522

    
523
    if (chunkID_set_size(cset) == 0) {
524
      //dprintf("bmap empty\n");
525
      return 1;        // if the bmap seems empty, it needs the chunk
526
    }
527
    missing = stc->cb_size - chunkID_set_size(cset);
528
    missing = missing < 0 ? 0 : missing;
529
    min = chunkID_set_get_earliest(cset);
530
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
531
    return (cid >= min - missing);
532
  }
533

    
534
  //dprintf("has it\n");
535
  return 0;
536
}
537

    
538
int needs(struct peer *p, int cid)
539
{
540
        int min;
541

    
542
        if (peer_cb_size(p) == 0) // it does not have capacity
543
                return 0;
544
        if (chunkID_set_check(p->bmap,cid) < 0)  // not in bmap
545
        {
546
                if(peer_cb_size(p) > chunkID_set_size(p->bmap)) // it has room for chunks anyway
547
                {
548
                        min = chunkID_set_get_earliest(p->bmap) - peer_cb_size(p) + chunkID_set_size(p->bmap);
549
                        min = min < 0 ? 0 : min;
550
                        if (cid >= min)
551
                                return 1;
552
                }
553
                if((int)chunkID_set_get_earliest(p->bmap) < cid)  // our is reasonably new
554
                        return 1;
555
        }
556
        return 0;
557
}
558

    
559
double peerWeightReceivedfrom(struct peer **n){
560
  struct peer * p = *n;
561
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
562
}
563

    
564
double peerWeightUniform(struct peer **n){
565
  return 1;
566
}
567

    
568
double peerWeightNeigh(struct peer **n){
569
        double res;
570
        res = (double)(((struct metadata *)(*n)->metadata)->neigh_size);
571
        if (res)
572
                return res;
573
        return 1;
574
}
575

    
576
double peerWeightInvNeigh(struct peer **n){
577
        double res;
578
        res = peerWeightNeigh(n);
579
        return 1/res;
580
}
581

    
582
double peerWeightLoss(struct peer **n){
583
  return 1;
584
}
585

    
586
//double peerWeightRtt(struct peer **n){
587
//#ifdef MONL
588
//  double rtt = get_rtt((*n)->id);
589
//  //dprintf("RTT to %s: %f\n", node_addr_tr(p->id), rtt);
590
//  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
591
//#else
592
//  return 1;
593
//#endif
594
//}
595

    
596
//ordering function for ELp peer selection, chunk ID based
597
//can't be used as weight
598
//double peerScoreELpID(struct nodeID **n){
599
//  struct chunkID_set *bmap;
600
//  int latest;
601
//  struct peer * p = nodeid_to_peer(*n, 0);
602
//  if (!p) return 0;
603
//
604
//  bmap = p->bmap;
605
//  if (!bmap) return 0;
606
//  latest = chunkID_set_get_latest(bmap);
607
//  if (latest == INT_MIN) return 0;
608
//
609
//  return -latest;
610
//}
611

    
612
double chunkScoreChunkID(int *cid){
613
  return (double) *cid;
614
}
615

    
616
uint64_t get_chunk_deadline(const struct streaming_context * stc, int cid){
617
  const struct chunk_attributes * ca;
618
  const struct chunk *c;
619

    
620
  c = cb_get_chunk(stc->cb, cid);
621
  if (!c) return 0;
622

    
623
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
624
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
625
    return 0;
626
  }
627

    
628
  ca = (struct chunk_attributes *) c->attributes;
629
  return ca->deadline;
630
}
631

    
632
//double chunkScoreDL(int *cid){
633
//  return - (double)get_chunk_deadline(*cid);
634
//}
635

    
636
//double chunkScoreTimestamp(int *cid){
637
//  return (double) get_chunk_timestamp(*cid);
638
//}
639

    
640
void send_accepted_chunks(const struct streaming_context * stc, struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
641
  int i, d, cset_acc_size, res;
642
  struct peer *to = nodeid_to_peer(psinstance_topology(stc->ps), toid, 0);
643

    
644
  transaction_reg_accept(stc->transactions, trans_id, toid);
645

    
646
  cset_acc_size = chunkID_set_size(cset_acc);
647
  reg_offer_accept_out(psinstance_measures(stc->ps),cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
648
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
649
    const struct chunk *c;
650
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
651
    c = cb_get_chunk(stc->cb, chunkid);
652
    if (!c) {        // we should have the chunk
653
      dprintf("%s asked for chunk %d we do not own anymore\n", nodeid_static_str(toid), chunkid);
654
      continue;
655
    }
656
    if (!to || needs(to, chunkid)) {        //he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
657
      chunk_attributes_update_sending(c);
658
      res = sendChunk(toid, c, trans_id);
659
      if (res >= 0) {
660
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
661
        d++;
662
        reg_chunk_send(psinstance_measures(stc->ps),c->id);
663
#ifdef LOG_CHUNK
664
              log_chunk(psinstance_nodeid(stc->ps), toid,c, "SENT_ACCEPTED");
665
#endif
666
        //{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(toid), gettimeofday_in_us(), res, c->size);}
667
      } else {
668
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
669
      }
670
    }
671
  }
672
}
673

    
674
int offer_peer_count(const struct streaming_context * stc)
675
{
676
  return psinstance_num_offers(stc->ps);
677
}
678

    
679
int offer_max_deliver(const struct streaming_context * stc, const struct nodeID *n)
680
{
681
  return psinstance_chunks_per_offer(stc->ps);
682
}
683

    
684
////get the rtt. Currenly only MONL version is supported
685
//static double get_rtt_of(struct nodeID* n){
686
//#ifdef MONL
687
//  return get_rtt(n);
688
//#else
689
//  return NAN;
690
//#endif
691
//}
692

    
693
#define DEFAULT_RTT_ESTIMATE 0.5
694

    
695
struct chunkID_set *compose_offer_cset(const struct streaming_context * stc, const struct peer *p)
696
{
697
  int num_chunks, j;
698
  uint64_t smallest_ts; //, largest_ts;
699
  double dt;
700
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
701
  struct chunk *chunks = cb_get_chunks(stc->cb, &num_chunks);
702

    
703
  dt = DEFAULT_RTT_ESTIMATE;
704
  dt *= 1e6;        //convert to usec
705

    
706
  smallest_ts = chunks[0].timestamp;
707
//  largest_ts = chunks[num_chunks-1].timestamp;
708

    
709
  //add chunks in latest...earliest order
710
  if (psinstance_is_source(stc->ps)) {
711
    j = (num_chunks-1) * 3/4;        //do not send offers for the latest chunks from the source
712
  } else {
713
    j = num_chunks-1;
714
  }
715
  for(; j>=0; j--) {
716
    if (chunks[j].timestamp > smallest_ts + dt)
717
    chunkID_set_add_chunk(my_bmap, chunks[j].id);
718
  }
719

    
720
  return my_bmap;
721
}
722

    
723

    
724
void send_offer(const struct streaming_context * stc)
725
{
726
  struct chunk *buff;
727
  size_t size=0,  i, n;
728
  struct peer **neighbours;
729
  struct peerset *pset;
730

    
731
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
732
  n = peerset_size(pset);
733
  neighbours = peerset_get_peers(pset);
734
  // dprintf("Send Offer: %lu neighbours\n", n);
735
  if (n == 0) return;
736
  buff = cb_get_chunks(stc->cb, (int *)&size);
737
  if (size == 0) return;
738

    
739
    size_t selectedpeers_len = offer_peer_count(stc);
740
    int chunkids[size];
741
    struct peer *nodeids[n];
742
    struct peer *selectedpeers[selectedpeers_len];
743

    
744
    //reduce load a little bit if there are losses on the path from this guy
745
    double average_lossrate = get_average_lossrate_pset(pset);
746
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
747
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
748
      return;
749
    }
750

    
751
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
752
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
753
    if (stc->dist_type == DIST_TURBO)
754
            selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightInvNeigh);
755
    else
756
            selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightUniform);
757

    
758
    for (i=0; i<selectedpeers_len ; i++){
759
      int transid = transaction_create(stc->transactions, selectedpeers[i]->id);
760
      int max_deliver = offer_max_deliver(stc, selectedpeers[i]->id);
761
      struct chunkID_set *offer_cset = compose_offer_cset(stc, selectedpeers[i]);
762
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), peer_cb_size(selectedpeers[i]));
763
      offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
764
#ifdef LOG_SIGNAL
765
                        log_signal(psinstance_nodeid(stc->ps),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
766
#endif
767
      chunkID_set_free(offer_cset);
768
    }
769
}
770

    
771
void log_chunk_error(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,int error)
772
{
773
        switch (error) {
774
                case E_CB_OLD:
775
                log_chunk(from,to,c,"TOO_OLD");
776
                break;
777
                case E_CB_DUPLICATE:
778
                log_chunk(from,to,c,"DUPLICATED");
779
                break;
780
                default:
781
                log_chunk(from,to,c,"ERROR");
782
        } 
783
}
784

    
785
void log_neighbourhood(const struct streaming_context * stc)
786
{
787
  struct peerset * pset;
788
  const struct peer * p;
789
  int psetsize,i;
790
  uint64_t now;
791
  char me[NODE_STR_LENGTH];
792

    
793
  node_addr(psinstance_nodeid(stc->ps), me, NODE_STR_LENGTH);
794
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
795
  psetsize = peerset_size(pset);
796
  now = gettimeofday_in_us();
797
  peerset_for_each(pset,p,i)
798
    fprintf(stderr,"[NEIGHBOURHOOD],%"PRIu64",%s,%s,%d\n",now,me,nodeid_static_str(p->id),psetsize);
799

    
800
}
801

    
802
void log_chunk(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,const char * note)
803
{
804
        // semantic: [CHUNK_LOG],log_date,sender,receiver,id,size(bytes),chunk_timestamp,hopcount,notes
805
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
806
        node_addr(from,sndr,NODE_STR_LENGTH);
807
        node_addr(to,rcvr,NODE_STR_LENGTH);
808

    
809
        fprintf(stderr,"[CHUNK_LOG],%"PRIu64",%s,%s,%d,%d,%"PRIu64",%i,%s\n",gettimeofday_in_us(),sndr,rcvr,c->id,c->size,c->timestamp,chunk_get_hopcount(c),note);
810
}
811

    
812
void log_signal(const struct nodeID *fromid,const struct nodeID *toid,const int cidset_size,uint16_t trans_id,enum signaling_type type,const char *flag)
813
{
814
        char typestr[24];
815
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
816
        node_addr(fromid,sndr,NODE_STR_LENGTH);
817
        node_addr(toid,rcvr,NODE_STR_LENGTH);
818

    
819
        switch (type)
820
        {
821
                case sig_offer:
822
                        sprintf(typestr,"%s","OFFER_SIG");
823
                        break;
824
                case sig_accept:
825
                        sprintf(typestr,"%s","ACCEPT_SIG");
826
                        break;
827
                case sig_request:
828
                        sprintf(typestr,"%s","REQUEST_SIG");
829
                        break;
830
                case sig_deliver:
831
                        sprintf(typestr,"%s","DELIVER_SIG");
832
                        break;
833
                case sig_send_buffermap:
834
                        sprintf(typestr,"%s","SEND_BMAP_SIG");
835
                        break;
836
                case sig_request_buffermap:
837
                        sprintf(typestr,"%s","REQUEST_BMAP_SIG");
838
                        break;
839
                case sig_ack:
840
                        sprintf(typestr,"%s","CHUNK_ACK_SIG");
841
                        break;
842
                default:
843
                        sprintf(typestr,"%s","UNKNOWN_SIG");
844

    
845
        }
846
        fprintf(stderr,"[OFFER_LOG],%s,%s,%d,%s,%s\n",sndr,rcvr,trans_id,typestr,flag);
847
}
848

    
849
int peer_chunk_dispatch(const struct streaming_context * stc, const struct PeerChunk  *pairs,const size_t num_pairs)
850
{
851
        int transid, res,success = 0;
852
        size_t i;
853
        const struct peer * target_peer;
854
        const struct chunk * target_chunk;
855

    
856
        for (i=0; i<num_pairs ; i++){
857
                target_peer = pairs[i].peer;
858
                target_chunk = cb_get_chunk(stc->cb, pairs[i].chunk);
859

    
860
                if (stc->send_bmap_before_push) {
861
                        send_bmap(stc, target_peer->id);
862
                }
863
                chunk_attributes_update_sending(target_chunk);
864
                transid = transaction_create(stc->transactions, target_peer->id);
865
                res = sendChunk(target_peer->id, target_chunk, transid);        //we use transactions in order to register acks for push
866
                if (res>=0) {
867
#ifdef LOG_CHUNK
868
                        log_chunk(psinstance_nodeid(stc->ps), target_peer->id, target_chunk,"SENT");
869
#endif
870
//                        chunkID_set_add_chunk((target_peer)->bmap,target_chunk->id); //don't send twice ... assuming that it will actually arrive
871
                        reg_chunk_send(psinstance_measures(stc->ps),target_chunk->id);
872
                        success++;
873
                } else {
874
                        fprintf(stderr,"ERROR sending chunk %d\n",target_chunk->id);
875
                }
876
        }
877
        return success;
878

    
879
}
880

    
881
int inject_chunk(const struct streaming_context * stc, const struct chunk * target_chunk,const int multiplicity)
882
/*injects a specific chunk in the overlay and return the number of injected copies*/
883
{
884
        struct peerset *pset;
885
        struct peer ** peers, ** dst_peers;
886
        int peers_num;
887
        double (* peer_evaluation) (struct peer **n);
888
        size_t i, selectedpairs_len = multiplicity;
889
        struct PeerChunk  * selectedpairs;
890

    
891
        pset = topology_get_neighbours(psinstance_topology(stc->ps));
892
        peers_num = peerset_size(pset);
893
        peers = peerset_get_peers(pset);
894
        peer_evaluation = SCHED_PEER;
895

    
896
  //SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
897
         dst_peers = (struct peer **) malloc(sizeof(struct peer* ) *  multiplicity);
898
        if (stc->dist_type == DIST_TURBO)
899
                selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peerWeightNeigh);
900
        else
901
                selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
902

    
903
        selectedpairs = (struct PeerChunk *)  malloc(sizeof(struct PeerChunk) * selectedpairs_len);
904
        for ( i=0; i<selectedpairs_len; i++)
905
        {
906
                selectedpairs[i].peer = dst_peers[i];
907
                selectedpairs[i].chunk = target_chunk->id;
908
        }
909

    
910
        peer_chunk_dispatch(stc, selectedpairs,selectedpairs_len);
911

    
912
        free(selectedpairs);
913
        free(dst_peers);
914
        return selectedpairs_len;
915
}
916

    
917
void send_chunk(const struct streaming_context * stc)
918
{
919
  struct chunk *buff;
920
  int size, res, i, n;
921
  struct peer **neighbours;
922
  struct peerset *pset;
923

    
924
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
925
  n = peerset_size(pset);
926
  neighbours = peerset_get_peers(pset);
927
  dprintf("Send Chunk: %d neighbours\n", n);
928
  if (n == 0) return;
929
  buff = cb_get_chunks(stc->cb, &size);
930
  dprintf("\t %d chunks in buffer...\n", size);
931
  if (size == 0) return;
932

    
933
  /************ STUPID DUMB SCHEDULING ****************/
934
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
935
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
936
  /************ /STUPID DUMB SCHEDULING ****************/
937

    
938
  /************ USE SCHEDULER ****************/
939
  {
940
    size_t selectedpairs_len = 1;
941
    int chunkids[size];
942
                int transid;
943
    struct peer *nodeids[n];
944
    struct PeerChunk selectedpairs[1];
945
  
946
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
947
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
948
            SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
949
  /************ /USE SCHEDULER ****************/
950

    
951
    for (i=0; i<(int)selectedpairs_len ; i++){
952
      struct peer *p = selectedpairs[i].peer;
953
      const struct chunk *c = cb_get_chunk(stc->cb, selectedpairs[i].chunk);
954
      dprintf("\t sending chunk[%d] to ", c->id);
955
      dprintf("%s\n", nodeid_static_str(p->id));
956

    
957
      if (stc->send_bmap_before_push) {
958
        send_bmap(stc, p->id);
959
      }
960

    
961
      chunk_attributes_update_sending(c);
962
      transid = transaction_create(stc->transactions, p->id);
963
      res = sendChunk(p->id, c, transid);        //we use transactions in order to register acks for push
964
//      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
965
      dprintf("\tResult: %d\n", res);
966
      if (res>=0) {
967
#ifdef LOG_CHUNK
968
              log_chunk(psinstance_nodeid(stc->ps),p->id,c,"SENT");
969
#endif
970
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(p->id), gettimeofday_in_us(), res, c->size);}
971
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
972
        reg_chunk_send(psinstance_measures(stc->ps),c->id);
973
      } else {
974
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
975
      }
976
    }
977
  }
978
}
979

    
980
suseconds_t streaming_offer_interval(const struct streaming_context *stc)
981
{
982
        struct peerset *pset;
983
        const struct peer * p;
984
        int i;
985
        double load = 0;
986
        suseconds_t offer_int;
987

    
988
        offer_int = chunk_interval_measure(psinstance_measures(stc->ps));
989
        
990
        if (stc->dist_type == DIST_TURBO) {
991
                pset = topology_get_neighbours(psinstance_topology(stc->ps));
992
                peerset_for_each(pset,p,i)
993
                        load += peerWeightInvNeigh((struct peer **)&p);
994
                offer_int /= load;
995
        }
996

    
997
        return offer_int / stc->offer_per_period;
998
}