Statistics
| Branch: | Revision:

pstreamer / src / streaming.c @ fe735c05

History | View | Annotate | Download (29.8 KB)

1
/*
2
 * Copyright (c) 2010-2011 Luca Abeni
3
 * Copyright (c) 2010-2011 Csaba Kiraly
4
 * Copyright (c) 2017 Luca Baldesi
5
 *
6
 * This file is part of PeerStreamer.
7
 *
8
 * PeerStreamer is free software: you can redistribute it and/or
9
 * modify it under the terms of the GNU Affero General Public License as
10
 * published by the Free Software Foundation, either version 3 of the
11
 * License, or (at your option) any later version.
12
 *
13
 * PeerStreamer is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
16
 * General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU Affero General Public License
19
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
20
 *
21
 */
22
#include <sys/time.h>
23
#include <stdlib.h>
24
#include <stdio.h>
25
#include <stdint.h>
26
#include <stdbool.h>
27
#include <math.h>
28
#include <assert.h>
29
#include <string.h>
30
#include <inttypes.h>
31

    
32
#include <net_helper.h>
33
#include <chunk.h> 
34
#include <chunkbuffer.h> 
35
#include <trade_msg_la.h>
36
#include <trade_msg_ha.h>
37
#include <peerset.h>
38
#include <peer.h>
39
#include <chunkidset.h>
40
#include <limits.h>
41
#include <trade_sig_ha.h>
42
#ifdef CHUNK_ATTRIB_CHUNKER
43
#include <chunkiser_attrib.h>
44
#endif
45

    
46
#include "streaming.h"
47
#include "output.h"
48
#include "input.h"
49
#include "dbg.h"
50
#include "chunk_signaling.h"
51
#include "chunklock.h"
52
#include "topology.h"
53
#include "measures.h"
54
#include "net_helpers.h"
55
#include "scheduling.h"
56
#include "transaction.h"
57

    
58
#include "scheduler_la.h"
59

    
60
# define CB_SIZE_TIME_UNLIMITED 1e12
61

    
62
void selectPeersForChunks(SchedOrdering ordering, schedPeerID *peers, size_t peers_len, schedChunkID *chunks, size_t chunks_len, schedPeerID *selected, size_t *selected_len, filterFunction filter, peerEvaluateFunction evaluate);
63

    
64
struct chunk_attributes {
65
  uint64_t deadline;
66
  uint16_t deadline_increment;
67
  uint16_t hopcount;
68
} __attribute__((packed));
69

    
70
struct streaming_context {
71
        struct chunk_buffer *cb;
72
        struct input_desc *input;
73
        struct chunk_locks * ch_locks;
74
        struct service_times_element * transactions;
75
        const struct psinstance * ps;
76
        int cb_size;
77
        int bcast_after_receive_every;
78
        bool neigh_on_chunk_recv;
79
        bool send_bmap_before_push;
80
        uint64_t CB_SIZE_TIME;
81
        uint32_t chunk_loss_interval;
82
};
83

    
84
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config)
85
{
86
        struct streaming_context * stc;
87
        static char conf[80];
88

    
89
        stc = malloc(sizeof(struct streaming_context));
90
        stc->bcast_after_receive_every = 0;
91
        stc->neigh_on_chunk_recv = false;
92
        stc->send_bmap_before_push = false;
93
        stc->transactions = NULL;
94
        stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
95
        stc->chunk_loss_interval = 0;  // disable self-lossy feature (for experiments)
96

    
97
        stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
98

    
99
        stc->ps = ps;
100
        stc->cb_size = psinstance_chunkbuffer_size(ps);
101
        sprintf(conf, "size=%d", stc->cb_size);
102
        stc->cb = cb_init(conf);
103
        chunkDeliveryInit(psinstance_nodeid(ps));
104
        chunkSignalingInit(psinstance_nodeid(ps));
105
        stc->ch_locks = chunk_locks_create();
106
        return stc;
107
}
108

    
109
void streaming_destroy(struct streaming_context ** stc)
110
{
111
        if (stc && *stc)
112
        {
113
                if((*stc)->input)
114
                        input_close((*stc)->input);
115
                if(((*stc)->ch_locks))
116
                        chunk_locks_destroy(&((*stc)->ch_locks));
117
                if(((*stc)->transactions))
118
                        transaction_destroy(&((*stc)->transactions));
119
                if(((*stc)->cb))
120
                        cb_destroy((*stc)->cb);
121
                free((*stc));
122
        }
123
}
124

    
125
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid);
126

    
127
uint64_t gettimeofday_in_us(void)
128
{
129
  struct timeval what_time; //to store the epoch time
130

    
131
  gettimeofday(&what_time, NULL);
132
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
133
}
134

    
135
void cb_print(const struct streaming_context * stc)
136
{
137
#ifdef DEBUG
138
  struct chunk *chunks;
139
  int num_chunks, i, id;
140
  chunks = cb_get_chunks(stc->cb, &num_chunks);
141

    
142
  dprintf("\tchbuf :");
143
  i = 0;
144
  if(num_chunks) {
145
    id = chunks[0].id;
146
    dprintf(" %d-> ",id);
147
    while (i < num_chunks) {
148
      if (id == chunks[i].id) {
149
        dprintf("%d",id % 10);
150
        i++;
151
      } else if (chunk_islocked(stc->ch_locks, id)) {
152
        dprintf("*");
153
      } else {
154
        dprintf(".");
155
      }
156
      id++;
157
    }
158
  }
159
  dprintf("\n");
160
#endif
161
}
162

    
163
void chunk_attributes_fill(struct chunk* c)
164
{
165
  struct chunk_attributes * ca;
166
  int priority = 1;
167

    
168
  assert((!c->attributes && c->attributes_size == 0)
169
#ifdef CHUNK_ATTRIB_CHUNKER
170
      || chunk_attributes_chunker_verify(c->attributes, c->attributes_size)
171
#endif
172
  );
173

    
174
#ifdef CHUNK_ATTRIB_CHUNKER
175
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
176
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
177
    free(c->attributes);
178
    c->attributes = NULL;
179
    c->attributes_size = 0;
180
  }
181
#endif
182

    
183
  c->attributes_size = sizeof(struct chunk_attributes);
184
  c->attributes = ca = malloc(c->attributes_size);
185

    
186
  ca->deadline = c->id;
187
  ca->deadline_increment = priority * 2;
188
  ca->hopcount = 0;
189
}
190

    
191
int chunk_get_hopcount(const struct chunk* c) {
192
  struct chunk_attributes * ca;
193

    
194
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
195
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
196
    return -1;
197
  }
198

    
199
  ca = (struct chunk_attributes *) c->attributes;
200
  return ca->hopcount;
201
}
202

    
203
void chunk_attributes_update_received(struct chunk* c)
204
{
205
  struct chunk_attributes * ca;
206

    
207
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
208
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
209
    return;
210
  }
211

    
212
  ca = (struct chunk_attributes *) c->attributes;
213
  ca->hopcount++;
214
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
215
}
216

    
217
void chunk_attributes_update_sending(const struct chunk* c)
218
{
219
  struct chunk_attributes * ca;
220

    
221
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
222
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
223
    return;
224
  }
225

    
226
  ca = (struct chunk_attributes *) c->attributes;
227
  ca->deadline += ca->deadline_increment;
228
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
229
}
230

    
231
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
232
{
233
  struct chunk *chunks;
234
  int num_chunks, i;
235
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
236
  chunks = cb_get_chunks(chbuf, &num_chunks);
237

    
238
  for(i=num_chunks-1; i>=0; i--) {
239
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
240
  }
241
  return my_bmap;
242
}
243

    
244
// a simple implementation that request everything that we miss ... up to max deliver
245
struct chunkID_set *get_chunks_to_accept(const struct streaming_context * stc, struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
246
  struct chunkID_set *cset_acc, *my_bmap;
247
  int i, d, cset_off_size;
248
  //double lossrate;
249
  struct peer *from = nodeid_to_peer(psinstance_topology(stc->ps), fromid, 0);
250

    
251
  cset_acc = chunkID_set_init("size=0");
252

    
253
  //reduce load a little bit if there are losses on the path from this guy
254
  //lossrate = get_lossrate_receive(from->id);
255
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
256
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
257
    my_bmap = cb_to_bmap(stc->cb);
258
    cset_off_size = chunkID_set_size(cset_off);
259
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
260
      int chunkid = chunkID_set_get_chunk(cset_off, i);
261
      //dprintf("\tdo I need c%d ? :",chunkid);
262
      if (!chunk_islocked(stc->ch_locks, chunkid) && _needs(stc, my_bmap, stc->cb_size, chunkid)) {
263
        chunkID_set_add_chunk(cset_acc, chunkid);
264
        chunk_lock(stc->ch_locks, chunkid,from);
265
        dtprintf("accepting %d from %s", chunkid, nodeid_static_str(fromid));
266
#ifdef MONL
267
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
268
#endif
269
        dprintf("\n");
270
        d++;
271
      }
272
    }
273
    chunkID_set_free(my_bmap);
274
  //} else {
275
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr_tr(fromid), lossrate, get_rtt(fromid));
276
  //}
277

    
278
  reg_offer_accept_in(psinstance_measures(stc->ps), chunkID_set_size(cset_acc) > 0 ? 1 : 0);
279

    
280
  return cset_acc;
281
}
282

    
283
void send_bmap(const struct streaming_context *stc, const struct nodeID *toid)
284
{
285
  struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
286
   sendBufferMap(psinstance_nodeid(stc->ps), toid, NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
287
#ifdef LOG_SIGNAL
288
        log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
289
#endif
290
  chunkID_set_free(my_bmap);
291
}
292

    
293
void bcast_bmap(const struct streaming_context * stc)
294
{
295
  int i, n;
296
  struct peer **neighbours;
297
  struct peerset *pset;
298
  struct chunkID_set *my_bmap;
299

    
300
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
301
  n = peerset_size(pset);
302
  neighbours = peerset_get_peers(pset);
303

    
304
  my_bmap = cb_to_bmap(stc->cb);        //cache our bmap for faster processing
305
  for (i = 0; i<n; i++) {
306
    sendBufferMap(psinstance_nodeid(stc->ps),neighbours[i]->id,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
307
#ifdef LOG_SIGNAL
308
                 log_signal(psinstance_nodeid(stc->ps),neighbours[i]->id,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
309
#endif
310
  }
311
  chunkID_set_free(my_bmap);
312
}
313

    
314
void send_ack(const struct streaming_context * stc, struct nodeID *toid, uint16_t trans_id)
315
{
316
  struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
317
  sendAck(psinstance_nodeid(stc->ps),toid, my_bmap,trans_id);
318
#ifdef LOG_SIGNAL
319
        log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),trans_id,sig_ack,"SENT");
320
#endif
321
  chunkID_set_free(my_bmap);
322
}
323

    
324
double get_average_lossrate_pset(struct peerset *pset)
325
{
326
  return 0;
327
}
328

    
329
void ack_chunk(const struct streaming_context * stc, struct chunk *c, struct nodeID *from, uint16_t trans_id)
330
{
331
  //reduce load a little bit if there are losses on the path from this guy
332
  double average_lossrate = get_average_lossrate_pset(topology_get_neighbours(psinstance_topology(stc->ps)));
333
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
334
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
335
    return;
336
  }
337
  send_ack(stc, from, trans_id);        //send explicit ack
338
}
339

    
340
void received_chunk(struct streaming_context * stc, struct nodeID *from, const uint8_t *buff, int len)
341
{
342
  int res;
343
  static struct chunk c;
344
  struct peer *p;
345
  static int bcast_cnt;
346
  uint16_t transid;
347

    
348
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
349
  if (res > 0) {
350
                if (stc->chunk_loss_interval && c.id % stc->chunk_loss_interval == 0) {
351
                        fprintf(stderr,"[NOISE] Chunk %d discarded >:)\n",c.id);
352
                        free(c.data);
353
                        free(c.attributes);
354
                        return;
355
                }
356
    chunk_attributes_update_received(&c);
357
    chunk_unlock(stc->ch_locks, c.id);
358
    dprintf("Received chunk %d from peer: %s\n", c.id, nodeid_static_str(from));
359
#ifdef LOG_CHUNK
360
    log_chunk(from,psinstance_nodeid(stc->ps),&c,"RECEIVED");
361
#endif
362
//{fprintf(stderr, "TEO: Peer %s received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", node_addr_tr(get_my_addr()),c.id, node_addr_tr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
363
    output_deliver(psinstance_output(stc->ps), &c);
364
    res = cb_add_chunk(stc->cb, &c);
365
    reg_chunk_receive(psinstance_measures(stc->ps),c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
366
    cb_print(stc);
367
    if (res < 0) {
368
      dprintf("\tchunk too old, buffer full with newer chunks\n");
369
#ifdef LOG_CHUNK
370
      log_chunk_error(from,psinstance_nodeid(stc->ps),&c,res); //{fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr_tr(from), gettimeofday_in_us());}
371
#endif
372
      free(c.data);
373
      free(c.attributes);
374
    }
375
    p = nodeid_to_peer(psinstance_topology(stc->ps), from, stc->neigh_on_chunk_recv);
376
    if (p) {        //now we have it almost sure
377
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
378
      gettimeofday(&p->bmap_timestamp, NULL);
379
    }
380
    ack_chunk(stc, &c, from, transid);        //send explicit ack
381
    if (stc->bcast_after_receive_every && bcast_cnt % stc->bcast_after_receive_every == 0) {
382
       bcast_bmap(stc);
383
    }
384
  } else {
385
    fprintf(stderr,"\tError: can't decode chunk!\n");
386
  }
387
}
388

    
389
struct chunk *generated_chunk(struct streaming_context * stc, suseconds_t *delta)
390
{
391
  struct chunk *c;
392

    
393
  c = malloc(sizeof(struct chunk));
394
  if (!c) {
395
    fprintf(stderr, "Memory allocation error!\n");
396
    return NULL;
397
  }
398
  memset(c, 0, sizeof(struct chunk));
399

    
400
  *delta = (suseconds_t)input_get(stc->input, c);
401
  if (*delta < 0) {
402
    fprintf(stderr, "Error in input!\n");
403
    exit(-1);
404
  }
405
  if (c->data == NULL) {
406
    free(c);
407
    return NULL;
408
  }
409
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
410
  chunk_attributes_fill(c);
411
  return c;
412
}
413

    
414
int add_chunk(struct streaming_context * stc, struct chunk *c)
415
{
416
  int res;
417

    
418
  if (stc && c && stc->cb)
419
  {
420
          res = cb_add_chunk(stc->cb, c);
421
          if (res < 0) {
422
            free(c->data);
423
            free(c->attributes);
424
            free(c);
425
            return 0;
426
          }
427
         // free(c);
428
          return 1;
429
  }
430
  return 0;
431
}
432

    
433
uint64_t get_chunk_timestamp(const struct streaming_context * stc, int cid){
434
  const struct chunk *c = cb_get_chunk(stc->cb, cid);
435
  if (!c) return 0;
436

    
437
  return c->timestamp;
438
}
439

    
440
void print_chunkID_set(struct chunkID_set *cset)
441
{
442
        uint32_t * ptr = (uint32_t *) cset;
443
        uint32_t n_elements,i;
444
        int * data = (int *) &(ptr[3]);
445
        fprintf(stderr,"[DEBUG] Chunk ID set type: %d\n",ptr[0]);
446
        fprintf(stderr,"[DEBUG] Chunk ID set size: %d\n",ptr[1]);
447
        n_elements = ptr[2];
448
        fprintf(stderr,"[DEBUG] Chunk ID n_elements: %d\n",n_elements);
449
        fprintf(stderr,"[DEBUG] Chunk ID elements: [");
450
        for (i=0;i<n_elements;i++)
451
                fprintf(stderr,".%d.",data[i]);
452

    
453
        fprintf(stderr,"]\n");
454
}
455

    
456
/**
457
 *example function to filter chunks based on whether a given peer needs them.
458
 *
459
 * Looks at buffermap information received about the given peer.
460
 */
461
int needs_old(const struct streaming_context * stc, struct peer *n, int cid){
462
  struct peer * p = n;
463

    
464
  if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
465
    uint64_t ts;
466
    ts = get_chunk_timestamp(stc, cid);
467
    if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
468
      return 0;
469
    }
470
  }
471

    
472
  //dprintf("\t%s needs c%d ? :",node_addr_tr(p->id),c->id);
473
  if (! p->bmap) { // this will never happen since the pset module initializes bmap
474
    //dprintf("no bmap\n");
475
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
476
  }
477
        
478
//        fprintf(stderr,"[DEBUG] Evaluating Peer %s, CB_SIZE: %d\n",node_addr_tr(n->id),p->cb_size); // DEBUG
479
//        print_chunkID_set(p->bmap);                                                                                                                                                                        // DEBUG
480

    
481
  return _needs(stc, p->bmap, p->cb_size, cid);
482
}
483

    
484
/**
485
 * Function checking if chunkID_set cset may need chunk with id cid
486
 * @cset: target cset
487
 * @cb_size: maximum allowed numer of chunks. In the case of offer it indicates
488
 *         the maximum capacity in of the receiving peer (so it's 0 for the source)
489
 * @cid: target chunk identifier
490
 */
491
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid){
492

    
493
  if (cb_size == 0) { //if it declared it does not needs chunks
494
    return 0;
495
  }
496

    
497
  if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
498
    uint64_t ts;
499
    ts = get_chunk_timestamp(stc, cid);
500
    if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
501
      return 0;
502
    }
503
  }
504

    
505
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
506
    int missing, min;
507
    //@TODO: add some bmap_timestamp based logic
508

    
509
    if (chunkID_set_size(cset) == 0) {
510
      //dprintf("bmap empty\n");
511
      return 1;        // if the bmap seems empty, it needs the chunk
512
    }
513
    missing = stc->cb_size - chunkID_set_size(cset);
514
    missing = missing < 0 ? 0 : missing;
515
    min = chunkID_set_get_earliest(cset);
516
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
517
    return (cid >= min - missing);
518
  }
519

    
520
  //dprintf("has it\n");
521
  return 0;
522
}
523

    
524
int needs(struct peer *p, int cid)
525
{
526
        int min;
527

    
528
        if (p->cb_size == 0) // it does not have capacity
529
                return 0;
530
        if (chunkID_set_check(p->bmap,cid) < 0)  // not in bmap
531
        {
532
                if(p->cb_size > chunkID_set_size(p->bmap)) // it has room for chunks anyway
533
                {
534
                        min = chunkID_set_get_earliest(p->bmap) - p->cb_size + chunkID_set_size(p->bmap);
535
                        min = min < 0 ? 0 : min;
536
                        if (cid >= min)
537
                                return 1;
538
                }
539
                if((int)chunkID_set_get_earliest(p->bmap) < cid)  // our is reasonably new
540
                        return 1;
541
        }
542
        return 0;
543
}
544

    
545
double peerWeightReceivedfrom(struct peer **n){
546
  struct peer * p = *n;
547
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
548
}
549

    
550
double peerWeightUniform(struct peer **n){
551
  return 1;
552
}
553

    
554
double peerWeightLoss(struct peer **n){
555
  return 1;
556
}
557

    
558
//double peerWeightRtt(struct peer **n){
559
//#ifdef MONL
560
//  double rtt = get_rtt((*n)->id);
561
//  //dprintf("RTT to %s: %f\n", node_addr_tr(p->id), rtt);
562
//  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
563
//#else
564
//  return 1;
565
//#endif
566
//}
567

    
568
//ordering function for ELp peer selection, chunk ID based
569
//can't be used as weight
570
//double peerScoreELpID(struct nodeID **n){
571
//  struct chunkID_set *bmap;
572
//  int latest;
573
//  struct peer * p = nodeid_to_peer(*n, 0);
574
//  if (!p) return 0;
575
//
576
//  bmap = p->bmap;
577
//  if (!bmap) return 0;
578
//  latest = chunkID_set_get_latest(bmap);
579
//  if (latest == INT_MIN) return 0;
580
//
581
//  return -latest;
582
//}
583

    
584
double chunkScoreChunkID(int *cid){
585
  return (double) *cid;
586
}
587

    
588
uint64_t get_chunk_deadline(const struct streaming_context * stc, int cid){
589
  const struct chunk_attributes * ca;
590
  const struct chunk *c;
591

    
592
  c = cb_get_chunk(stc->cb, cid);
593
  if (!c) return 0;
594

    
595
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
596
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
597
    return 0;
598
  }
599

    
600
  ca = (struct chunk_attributes *) c->attributes;
601
  return ca->deadline;
602
}
603

    
604
//double chunkScoreDL(int *cid){
605
//  return - (double)get_chunk_deadline(*cid);
606
//}
607

    
608
//double chunkScoreTimestamp(int *cid){
609
//  return (double) get_chunk_timestamp(*cid);
610
//}
611

    
612
void send_accepted_chunks(const struct streaming_context * stc, struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
613
  int i, d, cset_acc_size, res;
614
  struct peer *to = nodeid_to_peer(psinstance_topology(stc->ps), toid, 0);
615

    
616
  transaction_reg_accept(stc->transactions, trans_id, toid);
617

    
618
  cset_acc_size = chunkID_set_size(cset_acc);
619
  reg_offer_accept_out(psinstance_measures(stc->ps),cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
620
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
621
    const struct chunk *c;
622
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
623
    c = cb_get_chunk(stc->cb, chunkid);
624
    if (!c) {        // we should have the chunk
625
      dprintf("%s asked for chunk %d we do not own anymore\n", nodeid_static_str(toid), chunkid);
626
      continue;
627
    }
628
    if (!to || needs(to, chunkid)) {        //he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
629
      chunk_attributes_update_sending(c);
630
      res = sendChunk(psinstance_nodeid(stc->ps),toid, c, trans_id);
631
      if (res >= 0) {
632
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
633
        d++;
634
        reg_chunk_send(psinstance_measures(stc->ps),c->id);
635
#ifdef LOG_CHUNK
636
              log_chunk(psinstance_nodeid(stc->ps), toid,c, "SENT_ACCEPTED");
637
#endif
638
        //{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(toid), gettimeofday_in_us(), res, c->size);}
639
      } else {
640
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
641
      }
642
    }
643
  }
644
}
645

    
646
int offer_peer_count(const struct streaming_context * stc)
647
{
648
  return psinstance_num_offers(stc->ps);
649
}
650

    
651
int offer_max_deliver(const struct streaming_context * stc, const struct nodeID *n)
652
{
653
  return psinstance_chunks_per_offer(stc->ps);
654
}
655

    
656
////get the rtt. Currenly only MONL version is supported
657
//static double get_rtt_of(struct nodeID* n){
658
//#ifdef MONL
659
//  return get_rtt(n);
660
//#else
661
//  return NAN;
662
//#endif
663
//}
664

    
665
#define DEFAULT_RTT_ESTIMATE 0.5
666

    
667
struct chunkID_set *compose_offer_cset(const struct streaming_context * stc, const struct peer *p)
668
{
669
  int num_chunks, j;
670
  uint64_t smallest_ts; //, largest_ts;
671
  double dt;
672
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
673
  struct chunk *chunks = cb_get_chunks(stc->cb, &num_chunks);
674

    
675
  dt = DEFAULT_RTT_ESTIMATE;
676
  dt *= 1e6;        //convert to usec
677

    
678
  smallest_ts = chunks[0].timestamp;
679
//  largest_ts = chunks[num_chunks-1].timestamp;
680

    
681
  //add chunks in latest...earliest order
682
  if (psinstance_is_source(stc->ps)) {
683
    j = (num_chunks-1) * 3/4;        //do not send offers for the latest chunks from the source
684
  } else {
685
    j = num_chunks-1;
686
  }
687
  for(; j>=0; j--) {
688
    if (chunks[j].timestamp > smallest_ts + dt)
689
    chunkID_set_add_chunk(my_bmap, chunks[j].id);
690
  }
691

    
692
  return my_bmap;
693
}
694

    
695

    
696
void send_offer(struct streaming_context * stc)
697
{
698
  struct chunk *buff;
699
  size_t size=0,  i, n;
700
  struct peer **neighbours;
701
  struct peerset *pset;
702

    
703
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
704
  n = peerset_size(pset);
705
  neighbours = peerset_get_peers(pset);
706
  // dprintf("Send Offer: %lu neighbours\n", n);
707
  if (n == 0) return;
708
  buff = cb_get_chunks(stc->cb, (int *)&size);
709
  if (size == 0) return;
710

    
711
    size_t selectedpeers_len = offer_peer_count(stc);
712
    int chunkids[size];
713
    struct peer *nodeids[n];
714
    struct peer *selectedpeers[selectedpeers_len];
715

    
716
    //reduce load a little bit if there are losses on the path from this guy
717
    double average_lossrate = get_average_lossrate_pset(pset);
718
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
719
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
720
      return;
721
    }
722

    
723
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
724
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
725
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
726

    
727
    for (i=0; i<selectedpeers_len ; i++){
728
      int transid = transaction_create(&(stc->transactions), selectedpeers[i]->id);
729
      int max_deliver = offer_max_deliver(stc, selectedpeers[i]->id);
730
      struct chunkID_set *offer_cset = compose_offer_cset(stc, selectedpeers[i]);
731
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), selectedpeers[i]->cb_size);
732
      offerChunks(psinstance_nodeid(stc->ps),selectedpeers[i]->id, offer_cset, max_deliver, transid++);
733
#ifdef LOG_SIGNAL
734
                        log_signal(psinstance_nodeid(stc->ps),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
735
#endif
736
      chunkID_set_free(offer_cset);
737
    }
738
}
739

    
740
void log_chunk_error(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,int error)
741
{
742
        switch (error) {
743
                case E_CB_OLD:
744
                log_chunk(from,to,c,"TOO_OLD");
745
                break;
746
                case E_CB_DUPLICATE:
747
                log_chunk(from,to,c,"DUPLICATED");
748
                break;
749
                default:
750
                log_chunk(from,to,c,"ERROR");
751
        } 
752
}
753

    
754
void log_neighbourhood(const struct streaming_context * stc)
755
{
756
  struct peerset * pset;
757
  const struct peer * p;
758
  int psetsize,i;
759
  uint64_t now;
760
  char me[NODE_STR_LENGTH];
761

    
762
  node_addr(psinstance_nodeid(stc->ps), me, NODE_STR_LENGTH);
763
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
764
  psetsize = peerset_size(pset);
765
  now = gettimeofday_in_us();
766
  peerset_for_each(pset,p,i)
767
    fprintf(stderr,"[NEIGHBOURHOOD],%"PRIu64",%s,%s,%d\n",now,me,nodeid_static_str(p->id),psetsize);
768

    
769
}
770

    
771
void log_chunk(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,const char * note)
772
{
773
        // semantic: [CHUNK_LOG],log_date,sender,receiver,id,size(bytes),chunk_timestamp,hopcount,notes
774
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
775
        node_addr(from,sndr,NODE_STR_LENGTH);
776
        node_addr(to,rcvr,NODE_STR_LENGTH);
777

    
778
        fprintf(stderr,"[CHUNK_LOG],%"PRIu64",%s,%s,%d,%d,%"PRIu64",%i,%s\n",gettimeofday_in_us(),sndr,rcvr,c->id,c->size,c->timestamp,chunk_get_hopcount(c),note);
779
}
780

    
781
void log_signal(const struct nodeID *fromid,const struct nodeID *toid,const int cidset_size,uint16_t trans_id,enum signaling_type type,const char *flag)
782
{
783
        char typestr[24];
784
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
785
        node_addr(fromid,sndr,NODE_STR_LENGTH);
786
        node_addr(toid,rcvr,NODE_STR_LENGTH);
787

    
788
        switch (type)
789
        {
790
                case sig_offer:
791
                        sprintf(typestr,"%s","OFFER_SIG");
792
                        break;
793
                case sig_accept:
794
                        sprintf(typestr,"%s","ACCEPT_SIG");
795
                        break;
796
                case sig_request:
797
                        sprintf(typestr,"%s","REQUEST_SIG");
798
                        break;
799
                case sig_deliver:
800
                        sprintf(typestr,"%s","DELIVER_SIG");
801
                        break;
802
                case sig_send_buffermap:
803
                        sprintf(typestr,"%s","SEND_BMAP_SIG");
804
                        break;
805
                case sig_request_buffermap:
806
                        sprintf(typestr,"%s","REQUEST_BMAP_SIG");
807
                        break;
808
                case sig_ack:
809
                        sprintf(typestr,"%s","CHUNK_ACK_SIG");
810
                        break;
811
                default:
812
                        sprintf(typestr,"%s","UNKNOWN_SIG");
813

    
814
        }
815
        fprintf(stderr,"[OFFER_LOG],%s,%s,%d,%s,%s\n",sndr,rcvr,trans_id,typestr,flag);
816
}
817

    
818
int peer_chunk_dispatch(struct streaming_context * stc, const struct PeerChunk  *pairs,const size_t num_pairs)
819
{
820
        int transid, res,success = 0;
821
        size_t i;
822
        const struct peer * target_peer;
823
        const struct chunk * target_chunk;
824

    
825
        for (i=0; i<num_pairs ; i++){
826
                target_peer = pairs[i].peer;
827
                target_chunk = cb_get_chunk(stc->cb, pairs[i].chunk);
828

    
829
                if (stc->send_bmap_before_push) {
830
                        send_bmap(stc, target_peer->id);
831
                }
832
                chunk_attributes_update_sending(target_chunk);
833
                transid = transaction_create(&(stc->transactions), target_peer->id);
834
                res = sendChunk(psinstance_nodeid(stc->ps),target_peer->id, target_chunk, transid);        //we use transactions in order to register acks for push
835
                if (res>=0) {
836
#ifdef LOG_CHUNK
837
                        log_chunk(psinstance_nodeid(stc->ps), target_peer->id, target_chunk,"SENT");
838
#endif
839
//                        chunkID_set_add_chunk((target_peer)->bmap,target_chunk->id); //don't send twice ... assuming that it will actually arrive
840
                        reg_chunk_send(psinstance_measures(stc->ps),target_chunk->id);
841
                        success++;
842
                } else {
843
                        fprintf(stderr,"ERROR sending chunk %d\n",target_chunk->id);
844
                }
845
        }
846
        return success;
847

    
848
}
849

    
850
int inject_chunk(struct streaming_context * stc, const struct chunk * target_chunk,const int multiplicity)
851
/*injects a specific chunk in the overlay and return the number of injected copies*/
852
{
853
        struct peerset *pset;
854
        struct peer ** peers, ** dst_peers;
855
        int peers_num;
856
        double (* peer_evaluation) (struct peer **n);
857
        size_t i, selectedpairs_len = multiplicity;
858
        struct PeerChunk  * selectedpairs;
859

    
860
        pset = topology_get_neighbours(psinstance_topology(stc->ps));
861
        peers_num = peerset_size(pset);
862
        peers = peerset_get_peers(pset);
863
        peer_evaluation = SCHED_PEER;
864

    
865
  //SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
866
         dst_peers = (struct peer **) malloc(sizeof(struct peer* ) *  multiplicity);
867
         selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
868

    
869
        selectedpairs = (struct PeerChunk *)  malloc(sizeof(struct PeerChunk) * selectedpairs_len);
870
        for ( i=0; i<selectedpairs_len; i++)
871
        {
872
                selectedpairs[i].peer = dst_peers[i];
873
                selectedpairs[i].chunk = target_chunk->id;
874
        }
875

    
876
        peer_chunk_dispatch(stc, selectedpairs,selectedpairs_len);
877

    
878
        free(selectedpairs);
879
        free(dst_peers);
880
        return selectedpairs_len;
881
}
882

    
883
void send_chunk(struct streaming_context * stc)
884
{
885
  struct chunk *buff;
886
  int size, res, i, n;
887
  struct peer **neighbours;
888
  struct peerset *pset;
889

    
890
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
891
  n = peerset_size(pset);
892
  neighbours = peerset_get_peers(pset);
893
  dprintf("Send Chunk: %d neighbours\n", n);
894
  if (n == 0) return;
895
  buff = cb_get_chunks(stc->cb, &size);
896
  dprintf("\t %d chunks in buffer...\n", size);
897
  if (size == 0) return;
898

    
899
  /************ STUPID DUMB SCHEDULING ****************/
900
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
901
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
902
  /************ /STUPID DUMB SCHEDULING ****************/
903

    
904
  /************ USE SCHEDULER ****************/
905
  {
906
    size_t selectedpairs_len = 1;
907
    int chunkids[size];
908
                int transid;
909
    struct peer *nodeids[n];
910
    struct PeerChunk selectedpairs[1];
911
  
912
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
913
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
914
            SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
915
  /************ /USE SCHEDULER ****************/
916

    
917
    for (i=0; i<(int)selectedpairs_len ; i++){
918
      struct peer *p = selectedpairs[i].peer;
919
      const struct chunk *c = cb_get_chunk(stc->cb, selectedpairs[i].chunk);
920
      dprintf("\t sending chunk[%d] to ", c->id);
921
      dprintf("%s\n", nodeid_static_str(p->id));
922

    
923
      if (stc->send_bmap_before_push) {
924
        send_bmap(stc, p->id);
925
      }
926

    
927
      chunk_attributes_update_sending(c);
928
      transid = transaction_create(&(stc->transactions), p->id);
929
      res = sendChunk(psinstance_nodeid(stc->ps),p->id, c, transid);        //we use transactions in order to register acks for push
930
//      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
931
      dprintf("\tResult: %d\n", res);
932
      if (res>=0) {
933
#ifdef LOG_CHUNK
934
              log_chunk(psinstance_nodeid(stc->ps),p->id,c,"SENT");
935
#endif
936
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(p->id), gettimeofday_in_us(), res, c->size);}
937
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
938
        reg_chunk_send(psinstance_measures(stc->ps),c->id);
939
      } else {
940
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
941
      }
942
    }
943
  }
944
}