Statistics
| Branch: | Revision:

pstreamer / src / streaming.c @ 56d5986f

History | View | Annotate | Download (29.7 KB)

1
/*
2
 * Copyright (c) 2010-2011 Luca Abeni
3
 * Copyright (c) 2010-2011 Csaba Kiraly
4
 * Copyright (c) 2017 Luca Baldesi
5
 *
6
 * This file is part of PeerStreamer.
7
 *
8
 * PeerStreamer is free software: you can redistribute it and/or
9
 * modify it under the terms of the GNU Affero General Public License as
10
 * published by the Free Software Foundation, either version 3 of the
11
 * License, or (at your option) any later version.
12
 *
13
 * PeerStreamer is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
16
 * General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU Affero General Public License
19
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
20
 *
21
 */
22
#include <sys/time.h>
23
#include <stdlib.h>
24
#include <stdio.h>
25
#include <stdint.h>
26
#include <stdbool.h>
27
#include <math.h>
28
#include <assert.h>
29
#include <string.h>
30
#include <inttypes.h>
31

    
32
#include <net_helper.h>
33
#include <chunk.h> 
34
#include <chunkbuffer.h> 
35
#include <trade_msg_la.h>
36
#include <trade_msg_ha.h>
37
#include <peerset.h>
38
#include <peer.h>
39
#include <chunkidset.h>
40
#include <limits.h>
41
#include <trade_sig_ha.h>
42
#ifdef CHUNK_ATTRIB_CHUNKER
43
#include <chunkiser_attrib.h>
44
#endif
45

    
46
#include "streaming.h"
47
#include "output.h"
48
#include "input.h"
49
#include "dbg.h"
50
#include "chunk_signaling.h"
51
#include "chunklock.h"
52
#include "topology.h"
53
#include "measures.h"
54
#include "net_helpers.h"
55
#include "scheduling.h"
56
#include "transaction.h"
57
#include "peer_metadata.h"
58

    
59
#include "scheduler_la.h"
60

    
61
# define CB_SIZE_TIME_UNLIMITED 1e12
62

    
63
void selectPeersForChunks(SchedOrdering ordering, schedPeerID *peers, size_t peers_len, schedChunkID *chunks, size_t chunks_len, schedPeerID *selected, size_t *selected_len, filterFunction filter, peerEvaluateFunction evaluate);
64

    
65
struct chunk_attributes {
66
  uint64_t deadline;
67
  uint16_t deadline_increment;
68
  uint16_t hopcount;
69
} __attribute__((packed));
70

    
71
struct streaming_context {
72
        struct chunk_buffer *cb;
73
        struct input_desc *input;
74
        struct chunk_locks * ch_locks;
75
        struct service_times_element * transactions;
76
        const struct psinstance * ps;
77
        int cb_size;
78
        int bcast_after_receive_every;
79
        bool neigh_on_chunk_recv;
80
        bool send_bmap_before_push;
81
        uint64_t CB_SIZE_TIME;
82
        uint32_t chunk_loss_interval;
83
};
84

    
85
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config)
86
{
87
        struct streaming_context * stc;
88
        static char conf[80];
89

    
90
        stc = malloc(sizeof(struct streaming_context));
91
        stc->bcast_after_receive_every = 0;
92
        stc->neigh_on_chunk_recv = false;
93
        stc->send_bmap_before_push = false;
94
        stc->transactions = NULL;
95
        stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
96
        stc->chunk_loss_interval = 0;  // disable self-lossy feature (for experiments)
97

    
98
        stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
99

    
100
        stc->ps = ps;
101
        stc->cb_size = psinstance_chunkbuffer_size(ps);
102
        sprintf(conf, "size=%d", stc->cb_size);
103
        stc->cb = cb_init(conf);
104
        chunkDeliveryInit(psinstance_nodeid(ps));
105
        chunkSignalingInit(psinstance_nodeid(ps));
106
        stc->ch_locks = chunk_locks_create();
107
        return stc;
108
}
109

    
110
void streaming_destroy(struct streaming_context ** stc)
111
{
112
        if (stc && *stc)
113
        {
114
                if((*stc)->input)
115
                        input_close((*stc)->input);
116
                if(((*stc)->ch_locks))
117
                        chunk_locks_destroy(&((*stc)->ch_locks));
118
                if(((*stc)->transactions))
119
                        transactions_destroy((*stc)->transactions);
120
                if(((*stc)->cb))
121
                        cb_destroy((*stc)->cb);
122
                free((*stc));
123
        }
124
}
125

    
126
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid);
127

    
128
uint64_t gettimeofday_in_us(void)
129
{
130
  struct timeval what_time; //to store the epoch time
131

    
132
  gettimeofday(&what_time, NULL);
133
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
134
}
135

    
136
void cb_print(const struct streaming_context * stc)
137
{
138
#ifdef DEBUG
139
  struct chunk *chunks;
140
  int num_chunks, i, id;
141
  chunks = cb_get_chunks(stc->cb, &num_chunks);
142

    
143
  dprintf("\tchbuf :");
144
  i = 0;
145
  if(num_chunks) {
146
    id = chunks[0].id;
147
    dprintf(" %d-> ",id);
148
    while (i < num_chunks) {
149
      if (id == chunks[i].id) {
150
        dprintf("%d",id % 10);
151
        i++;
152
      } else if (chunk_islocked(stc->ch_locks, id)) {
153
        dprintf("*");
154
      } else {
155
        dprintf(".");
156
      }
157
      id++;
158
    }
159
  }
160
  dprintf("\n");
161
#endif
162
}
163

    
164
void chunk_attributes_fill(struct chunk* c)
165
{
166
  struct chunk_attributes * ca;
167
  int priority = 1;
168

    
169
  assert((!c->attributes && c->attributes_size == 0)
170
#ifdef CHUNK_ATTRIB_CHUNKER
171
      || chunk_attributes_chunker_verify(c->attributes, c->attributes_size)
172
#endif
173
  );
174

    
175
#ifdef CHUNK_ATTRIB_CHUNKER
176
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
177
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
178
    free(c->attributes);
179
    c->attributes = NULL;
180
    c->attributes_size = 0;
181
  }
182
#endif
183

    
184
  c->attributes_size = sizeof(struct chunk_attributes);
185
  c->attributes = ca = malloc(c->attributes_size);
186

    
187
  ca->deadline = c->id;
188
  ca->deadline_increment = priority * 2;
189
  ca->hopcount = 0;
190
}
191

    
192
int chunk_get_hopcount(const struct chunk* c) {
193
  struct chunk_attributes * ca;
194

    
195
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
196
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
197
    return -1;
198
  }
199

    
200
  ca = (struct chunk_attributes *) c->attributes;
201
  return ca->hopcount;
202
}
203

    
204
void chunk_attributes_update_received(struct chunk* c)
205
{
206
  struct chunk_attributes * ca;
207

    
208
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
209
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
210
    return;
211
  }
212

    
213
  ca = (struct chunk_attributes *) c->attributes;
214
  ca->hopcount++;
215
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
216
}
217

    
218
void chunk_attributes_update_sending(const struct chunk* c)
219
{
220
  struct chunk_attributes * ca;
221

    
222
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
223
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
224
    return;
225
  }
226

    
227
  ca = (struct chunk_attributes *) c->attributes;
228
  ca->deadline += ca->deadline_increment;
229
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
230
}
231

    
232
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
233
{
234
  struct chunk *chunks;
235
  int num_chunks, i;
236
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
237
  chunks = cb_get_chunks(chbuf, &num_chunks);
238

    
239
  for(i=num_chunks-1; i>=0; i--) {
240
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
241
  }
242
  return my_bmap;
243
}
244

    
245
// a simple implementation that request everything that we miss ... up to max deliver
246
struct chunkID_set *get_chunks_to_accept(const struct streaming_context * stc, struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
247
  struct chunkID_set *cset_acc, *my_bmap;
248
  int i, d, cset_off_size;
249
  //double lossrate;
250
  struct peer *from = nodeid_to_peer(psinstance_topology(stc->ps), fromid, 0);
251

    
252
  cset_acc = chunkID_set_init("size=0");
253

    
254
  //reduce load a little bit if there are losses on the path from this guy
255
  //lossrate = get_lossrate_receive(from->id);
256
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
257
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
258
    my_bmap = cb_to_bmap(stc->cb);
259
    cset_off_size = chunkID_set_size(cset_off);
260
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
261
      int chunkid = chunkID_set_get_chunk(cset_off, i);
262
      //dprintf("\tdo I need c%d ? :",chunkid);
263
      if (!chunk_islocked(stc->ch_locks, chunkid) && _needs(stc, my_bmap, stc->cb_size, chunkid)) {
264
        chunkID_set_add_chunk(cset_acc, chunkid);
265
        chunk_lock(stc->ch_locks, chunkid,from);
266
        dtprintf("accepting %d from %s", chunkid, nodeid_static_str(fromid));
267
#ifdef MONL
268
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
269
#endif
270
        dprintf("\n");
271
        d++;
272
      }
273
    }
274
    chunkID_set_free(my_bmap);
275
  //} else {
276
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr_tr(fromid), lossrate, get_rtt(fromid));
277
  //}
278

    
279
  reg_offer_accept_in(psinstance_measures(stc->ps), chunkID_set_size(cset_acc) > 0 ? 1 : 0);
280

    
281
  return cset_acc;
282
}
283

    
284
void send_bmap(const struct streaming_context *stc, const struct nodeID *toid)
285
{
286
  struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
287
   sendBufferMap(toid,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
288
#ifdef LOG_SIGNAL
289
        log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
290
#endif
291
  chunkID_set_free(my_bmap);
292
}
293

    
294
void bcast_bmap(const struct streaming_context * stc)
295
{
296
  int i, n;
297
  struct peer **neighbours;
298
  struct peerset *pset;
299
  struct chunkID_set *my_bmap;
300

    
301
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
302
  n = peerset_size(pset);
303
  neighbours = peerset_get_peers(pset);
304

    
305
  my_bmap = cb_to_bmap(stc->cb);        //cache our bmap for faster processing
306
  for (i = 0; i<n; i++) {
307
    sendBufferMap(neighbours[i]->id,NULL, my_bmap, psinstance_is_source(stc->ps) ? 0 : stc->cb_size, 0);
308
#ifdef LOG_SIGNAL
309
                 log_signal(psinstance_nodeid(stc->ps),neighbours[i]->id,chunkID_set_size(my_bmap),0,sig_send_buffermap,"SENT");
310
#endif
311
  }
312
  chunkID_set_free(my_bmap);
313
}
314

    
315
void send_ack(const struct streaming_context * stc, struct nodeID *toid, uint16_t trans_id)
316
{
317
  struct chunkID_set *my_bmap = cb_to_bmap(stc->cb);
318
  sendAck(toid, my_bmap,trans_id);
319
#ifdef LOG_SIGNAL
320
        log_signal(psinstance_nodeid(stc->ps),toid,chunkID_set_size(my_bmap),trans_id,sig_ack,"SENT");
321
#endif
322
  chunkID_set_free(my_bmap);
323
}
324

    
325
double get_average_lossrate_pset(struct peerset *pset)
326
{
327
  return 0;
328
}
329

    
330
void ack_chunk(const struct streaming_context * stc, struct chunk *c, struct nodeID *from, uint16_t trans_id)
331
{
332
  //reduce load a little bit if there are losses on the path from this guy
333
  double average_lossrate = get_average_lossrate_pset(topology_get_neighbours(psinstance_topology(stc->ps)));
334
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
335
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
336
    return;
337
  }
338
  send_ack(stc, from, trans_id);        //send explicit ack
339
}
340

    
341
void received_chunk(struct streaming_context * stc, struct nodeID *from, const uint8_t *buff, int len)
342
{
343
  int res;
344
  static struct chunk c;
345
  struct peer *p;
346
  static int bcast_cnt;
347
  uint16_t transid;
348

    
349
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
350
  if (res > 0) {
351
                if (stc->chunk_loss_interval && c.id % stc->chunk_loss_interval == 0) {
352
                        fprintf(stderr,"[NOISE] Chunk %d discarded >:)\n",c.id);
353
                        free(c.data);
354
                        free(c.attributes);
355
                        return;
356
                }
357
    chunk_attributes_update_received(&c);
358
    chunk_unlock(stc->ch_locks, c.id);
359
    dprintf("Received chunk %d from peer: %s\n", c.id, nodeid_static_str(from));
360
#ifdef LOG_CHUNK
361
    log_chunk(from,psinstance_nodeid(stc->ps),&c,"RECEIVED");
362
#endif
363
//{fprintf(stderr, "TEO: Peer %s received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", node_addr_tr(get_my_addr()),c.id, node_addr_tr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
364
    output_deliver(psinstance_output(stc->ps), &c);
365
    res = cb_add_chunk(stc->cb, &c);
366
    reg_chunk_receive(psinstance_measures(stc->ps),c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
367
    cb_print(stc);
368
    if (res < 0) {
369
      dprintf("\tchunk too old, buffer full with newer chunks\n");
370
#ifdef LOG_CHUNK
371
      log_chunk_error(from,psinstance_nodeid(stc->ps),&c,res); //{fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr_tr(from), gettimeofday_in_us());}
372
#endif
373
      free(c.data);
374
      free(c.attributes);
375
    }
376
    p = nodeid_to_peer(psinstance_topology(stc->ps), from, stc->neigh_on_chunk_recv);
377
    if (p) {        //now we have it almost sure
378
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
379
      gettimeofday(&p->bmap_timestamp, NULL);
380
    }
381
    ack_chunk(stc, &c, from, transid);        //send explicit ack
382
    if (stc->bcast_after_receive_every && bcast_cnt % stc->bcast_after_receive_every == 0) {
383
       bcast_bmap(stc);
384
    }
385
  } else {
386
    fprintf(stderr,"\tError: can't decode chunk!\n");
387
  }
388
}
389

    
390
struct chunk *generated_chunk(struct streaming_context * stc, suseconds_t *delta)
391
{
392
  struct chunk *c;
393

    
394
  c = malloc(sizeof(struct chunk));
395
  if (!c) {
396
    fprintf(stderr, "Memory allocation error!\n");
397
    return NULL;
398
  }
399
  memset(c, 0, sizeof(struct chunk));
400

    
401
  *delta = (suseconds_t)input_get(stc->input, c);
402
  if (*delta < 0) {
403
    fprintf(stderr, "Error in input!\n");
404
    exit(-1);
405
  }
406
  if (c->data == NULL) {
407
    free(c);
408
    return NULL;
409
  }
410
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
411
  chunk_attributes_fill(c);
412
  return c;
413
}
414

    
415
int add_chunk(struct streaming_context * stc, struct chunk *c)
416
{
417
  int res;
418

    
419
  if (stc && c && stc->cb)
420
  {
421
          res = cb_add_chunk(stc->cb, c);
422
          if (res < 0) {
423
            free(c->data);
424
            free(c->attributes);
425
            free(c);
426
            return 0;
427
          }
428
         // free(c);
429
          return 1;
430
  }
431
  return 0;
432
}
433

    
434
uint64_t get_chunk_timestamp(const struct streaming_context * stc, int cid){
435
  const struct chunk *c = cb_get_chunk(stc->cb, cid);
436
  if (!c) return 0;
437

    
438
  return c->timestamp;
439
}
440

    
441
void print_chunkID_set(struct chunkID_set *cset)
442
{
443
        uint32_t * ptr = (uint32_t *) cset;
444
        uint32_t n_elements,i;
445
        int * data = (int *) &(ptr[3]);
446
        fprintf(stderr,"[DEBUG] Chunk ID set type: %d\n",ptr[0]);
447
        fprintf(stderr,"[DEBUG] Chunk ID set size: %d\n",ptr[1]);
448
        n_elements = ptr[2];
449
        fprintf(stderr,"[DEBUG] Chunk ID n_elements: %d\n",n_elements);
450
        fprintf(stderr,"[DEBUG] Chunk ID elements: [");
451
        for (i=0;i<n_elements;i++)
452
                fprintf(stderr,".%d.",data[i]);
453

    
454
        fprintf(stderr,"]\n");
455
}
456

    
457
/**
458
 *example function to filter chunks based on whether a given peer needs them.
459
 *
460
 * Looks at buffermap information received about the given peer.
461
 */
462
int needs_old(const struct streaming_context * stc, struct peer *n, int cid){
463
  struct peer * p = n;
464

    
465
  if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
466
    uint64_t ts;
467
    ts = get_chunk_timestamp(stc, cid);
468
    if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
469
      return 0;
470
    }
471
  }
472

    
473
  //dprintf("\t%s needs c%d ? :",node_addr_tr(p->id),c->id);
474
  if (! p->bmap) { // this will never happen since the pset module initializes bmap
475
    //dprintf("no bmap\n");
476
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
477
  }
478
        
479
//        fprintf(stderr,"[DEBUG] Evaluating Peer %s, CB_SIZE: %d\n",node_addr_tr(n->id),p->cb_size); // DEBUG
480
//        print_chunkID_set(p->bmap);                                                                                                                                                                        // DEBUG
481

    
482
  return _needs(stc, p->bmap, peer_cb_size(p), cid);
483
}
484

    
485
/**
486
 * Function checking if chunkID_set cset may need chunk with id cid
487
 * @cset: target cset
488
 * @cb_size: maximum allowed numer of chunks. In the case of offer it indicates
489
 *         the maximum capacity in of the receiving peer (so it's 0 for the source)
490
 * @cid: target chunk identifier
491
 */
492
int _needs(const struct streaming_context * stc, struct chunkID_set *cset, int cb_size, int cid){
493

    
494
  if (cb_size == 0) { //if it declared it does not needs chunks
495
    return 0;
496
  }
497

    
498
  if (stc->CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
499
    uint64_t ts;
500
    ts = get_chunk_timestamp(stc, cid);
501
    if (ts && (ts < gettimeofday_in_us() - stc->CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
502
      return 0;
503
    }
504
  }
505

    
506
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
507
    int missing, min;
508
    //@TODO: add some bmap_timestamp based logic
509

    
510
    if (chunkID_set_size(cset) == 0) {
511
      //dprintf("bmap empty\n");
512
      return 1;        // if the bmap seems empty, it needs the chunk
513
    }
514
    missing = stc->cb_size - chunkID_set_size(cset);
515
    missing = missing < 0 ? 0 : missing;
516
    min = chunkID_set_get_earliest(cset);
517
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
518
    return (cid >= min - missing);
519
  }
520

    
521
  //dprintf("has it\n");
522
  return 0;
523
}
524

    
525
int needs(struct peer *p, int cid)
526
{
527
        int min;
528

    
529
        if (peer_cb_size(p) == 0) // it does not have capacity
530
                return 0;
531
        if (chunkID_set_check(p->bmap,cid) < 0)  // not in bmap
532
        {
533
                if(peer_cb_size(p) > chunkID_set_size(p->bmap)) // it has room for chunks anyway
534
                {
535
                        min = chunkID_set_get_earliest(p->bmap) - peer_cb_size(p) + chunkID_set_size(p->bmap);
536
                        min = min < 0 ? 0 : min;
537
                        if (cid >= min)
538
                                return 1;
539
                }
540
                if((int)chunkID_set_get_earliest(p->bmap) < cid)  // our is reasonably new
541
                        return 1;
542
        }
543
        return 0;
544
}
545

    
546
double peerWeightReceivedfrom(struct peer **n){
547
  struct peer * p = *n;
548
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
549
}
550

    
551
double peerWeightUniform(struct peer **n){
552
  return 1;
553
}
554

    
555
double peerWeightLoss(struct peer **n){
556
  return 1;
557
}
558

    
559
//double peerWeightRtt(struct peer **n){
560
//#ifdef MONL
561
//  double rtt = get_rtt((*n)->id);
562
//  //dprintf("RTT to %s: %f\n", node_addr_tr(p->id), rtt);
563
//  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
564
//#else
565
//  return 1;
566
//#endif
567
//}
568

    
569
//ordering function for ELp peer selection, chunk ID based
570
//can't be used as weight
571
//double peerScoreELpID(struct nodeID **n){
572
//  struct chunkID_set *bmap;
573
//  int latest;
574
//  struct peer * p = nodeid_to_peer(*n, 0);
575
//  if (!p) return 0;
576
//
577
//  bmap = p->bmap;
578
//  if (!bmap) return 0;
579
//  latest = chunkID_set_get_latest(bmap);
580
//  if (latest == INT_MIN) return 0;
581
//
582
//  return -latest;
583
//}
584

    
585
double chunkScoreChunkID(int *cid){
586
  return (double) *cid;
587
}
588

    
589
uint64_t get_chunk_deadline(const struct streaming_context * stc, int cid){
590
  const struct chunk_attributes * ca;
591
  const struct chunk *c;
592

    
593
  c = cb_get_chunk(stc->cb, cid);
594
  if (!c) return 0;
595

    
596
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
597
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
598
    return 0;
599
  }
600

    
601
  ca = (struct chunk_attributes *) c->attributes;
602
  return ca->deadline;
603
}
604

    
605
//double chunkScoreDL(int *cid){
606
//  return - (double)get_chunk_deadline(*cid);
607
//}
608

    
609
//double chunkScoreTimestamp(int *cid){
610
//  return (double) get_chunk_timestamp(*cid);
611
//}
612

    
613
void send_accepted_chunks(const struct streaming_context * stc, struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
614
  int i, d, cset_acc_size, res;
615
  struct peer *to = nodeid_to_peer(psinstance_topology(stc->ps), toid, 0);
616

    
617
  transaction_reg_accept(stc->transactions, trans_id, toid);
618

    
619
  cset_acc_size = chunkID_set_size(cset_acc);
620
  reg_offer_accept_out(psinstance_measures(stc->ps),cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
621
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
622
    const struct chunk *c;
623
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
624
    c = cb_get_chunk(stc->cb, chunkid);
625
    if (!c) {        // we should have the chunk
626
      dprintf("%s asked for chunk %d we do not own anymore\n", nodeid_static_str(toid), chunkid);
627
      continue;
628
    }
629
    if (!to || needs(to, chunkid)) {        //he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
630
      chunk_attributes_update_sending(c);
631
      res = sendChunk(toid, c, trans_id);
632
      if (res >= 0) {
633
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
634
        d++;
635
        reg_chunk_send(psinstance_measures(stc->ps),c->id);
636
#ifdef LOG_CHUNK
637
              log_chunk(psinstance_nodeid(stc->ps), toid,c, "SENT_ACCEPTED");
638
#endif
639
        //{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(toid), gettimeofday_in_us(), res, c->size);}
640
      } else {
641
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
642
      }
643
    }
644
  }
645
}
646

    
647
int offer_peer_count(const struct streaming_context * stc)
648
{
649
  return psinstance_num_offers(stc->ps);
650
}
651

    
652
int offer_max_deliver(const struct streaming_context * stc, const struct nodeID *n)
653
{
654
  return psinstance_chunks_per_offer(stc->ps);
655
}
656

    
657
////get the rtt. Currenly only MONL version is supported
658
//static double get_rtt_of(struct nodeID* n){
659
//#ifdef MONL
660
//  return get_rtt(n);
661
//#else
662
//  return NAN;
663
//#endif
664
//}
665

    
666
#define DEFAULT_RTT_ESTIMATE 0.5
667

    
668
struct chunkID_set *compose_offer_cset(const struct streaming_context * stc, const struct peer *p)
669
{
670
  int num_chunks, j;
671
  uint64_t smallest_ts; //, largest_ts;
672
  double dt;
673
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
674
  struct chunk *chunks = cb_get_chunks(stc->cb, &num_chunks);
675

    
676
  dt = DEFAULT_RTT_ESTIMATE;
677
  dt *= 1e6;        //convert to usec
678

    
679
  smallest_ts = chunks[0].timestamp;
680
//  largest_ts = chunks[num_chunks-1].timestamp;
681

    
682
  //add chunks in latest...earliest order
683
  if (psinstance_is_source(stc->ps)) {
684
    j = (num_chunks-1) * 3/4;        //do not send offers for the latest chunks from the source
685
  } else {
686
    j = num_chunks-1;
687
  }
688
  for(; j>=0; j--) {
689
    if (chunks[j].timestamp > smallest_ts + dt)
690
    chunkID_set_add_chunk(my_bmap, chunks[j].id);
691
  }
692

    
693
  return my_bmap;
694
}
695

    
696

    
697
void send_offer(const struct streaming_context * stc)
698
{
699
  struct chunk *buff;
700
  size_t size=0,  i, n;
701
  struct peer **neighbours;
702
  struct peerset *pset;
703

    
704
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
705
  n = peerset_size(pset);
706
  neighbours = peerset_get_peers(pset);
707
  // dprintf("Send Offer: %lu neighbours\n", n);
708
  if (n == 0) return;
709
  buff = cb_get_chunks(stc->cb, (int *)&size);
710
  if (size == 0) return;
711

    
712
    size_t selectedpeers_len = offer_peer_count(stc);
713
    int chunkids[size];
714
    struct peer *nodeids[n];
715
    struct peer *selectedpeers[selectedpeers_len];
716

    
717
    //reduce load a little bit if there are losses on the path from this guy
718
    double average_lossrate = get_average_lossrate_pset(pset);
719
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
720
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
721
      return;
722
    }
723

    
724
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
725
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
726
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
727

    
728
    for (i=0; i<selectedpeers_len ; i++){
729
      int transid = transaction_create(stc->transactions, selectedpeers[i]->id);
730
      int max_deliver = offer_max_deliver(stc, selectedpeers[i]->id);
731
      struct chunkID_set *offer_cset = compose_offer_cset(stc, selectedpeers[i]);
732
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), peer_cb_size(selectedpeers[i]));
733
      offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
734
#ifdef LOG_SIGNAL
735
                        log_signal(psinstance_nodeid(stc->ps),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
736
#endif
737
      chunkID_set_free(offer_cset);
738
    }
739
}
740

    
741
void log_chunk_error(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,int error)
742
{
743
        switch (error) {
744
                case E_CB_OLD:
745
                log_chunk(from,to,c,"TOO_OLD");
746
                break;
747
                case E_CB_DUPLICATE:
748
                log_chunk(from,to,c,"DUPLICATED");
749
                break;
750
                default:
751
                log_chunk(from,to,c,"ERROR");
752
        } 
753
}
754

    
755
void log_neighbourhood(const struct streaming_context * stc)
756
{
757
  struct peerset * pset;
758
  const struct peer * p;
759
  int psetsize,i;
760
  uint64_t now;
761
  char me[NODE_STR_LENGTH];
762

    
763
  node_addr(psinstance_nodeid(stc->ps), me, NODE_STR_LENGTH);
764
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
765
  psetsize = peerset_size(pset);
766
  now = gettimeofday_in_us();
767
  peerset_for_each(pset,p,i)
768
    fprintf(stderr,"[NEIGHBOURHOOD],%"PRIu64",%s,%s,%d\n",now,me,nodeid_static_str(p->id),psetsize);
769

    
770
}
771

    
772
void log_chunk(const struct nodeID *from,const struct nodeID *to,const struct chunk *c,const char * note)
773
{
774
        // semantic: [CHUNK_LOG],log_date,sender,receiver,id,size(bytes),chunk_timestamp,hopcount,notes
775
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
776
        node_addr(from,sndr,NODE_STR_LENGTH);
777
        node_addr(to,rcvr,NODE_STR_LENGTH);
778

    
779
        fprintf(stderr,"[CHUNK_LOG],%"PRIu64",%s,%s,%d,%d,%"PRIu64",%i,%s\n",gettimeofday_in_us(),sndr,rcvr,c->id,c->size,c->timestamp,chunk_get_hopcount(c),note);
780
}
781

    
782
void log_signal(const struct nodeID *fromid,const struct nodeID *toid,const int cidset_size,uint16_t trans_id,enum signaling_type type,const char *flag)
783
{
784
        char typestr[24];
785
        char sndr[NODE_STR_LENGTH],rcvr[NODE_STR_LENGTH];
786
        node_addr(fromid,sndr,NODE_STR_LENGTH);
787
        node_addr(toid,rcvr,NODE_STR_LENGTH);
788

    
789
        switch (type)
790
        {
791
                case sig_offer:
792
                        sprintf(typestr,"%s","OFFER_SIG");
793
                        break;
794
                case sig_accept:
795
                        sprintf(typestr,"%s","ACCEPT_SIG");
796
                        break;
797
                case sig_request:
798
                        sprintf(typestr,"%s","REQUEST_SIG");
799
                        break;
800
                case sig_deliver:
801
                        sprintf(typestr,"%s","DELIVER_SIG");
802
                        break;
803
                case sig_send_buffermap:
804
                        sprintf(typestr,"%s","SEND_BMAP_SIG");
805
                        break;
806
                case sig_request_buffermap:
807
                        sprintf(typestr,"%s","REQUEST_BMAP_SIG");
808
                        break;
809
                case sig_ack:
810
                        sprintf(typestr,"%s","CHUNK_ACK_SIG");
811
                        break;
812
                default:
813
                        sprintf(typestr,"%s","UNKNOWN_SIG");
814

    
815
        }
816
        fprintf(stderr,"[OFFER_LOG],%s,%s,%d,%s,%s\n",sndr,rcvr,trans_id,typestr,flag);
817
}
818

    
819
int peer_chunk_dispatch(const struct streaming_context * stc, const struct PeerChunk  *pairs,const size_t num_pairs)
820
{
821
        int transid, res,success = 0;
822
        size_t i;
823
        const struct peer * target_peer;
824
        const struct chunk * target_chunk;
825

    
826
        for (i=0; i<num_pairs ; i++){
827
                target_peer = pairs[i].peer;
828
                target_chunk = cb_get_chunk(stc->cb, pairs[i].chunk);
829

    
830
                if (stc->send_bmap_before_push) {
831
                        send_bmap(stc, target_peer->id);
832
                }
833
                chunk_attributes_update_sending(target_chunk);
834
                transid = transaction_create(stc->transactions, target_peer->id);
835
                res = sendChunk(target_peer->id, target_chunk, transid);        //we use transactions in order to register acks for push
836
                if (res>=0) {
837
#ifdef LOG_CHUNK
838
                        log_chunk(psinstance_nodeid(stc->ps), target_peer->id, target_chunk,"SENT");
839
#endif
840
//                        chunkID_set_add_chunk((target_peer)->bmap,target_chunk->id); //don't send twice ... assuming that it will actually arrive
841
                        reg_chunk_send(psinstance_measures(stc->ps),target_chunk->id);
842
                        success++;
843
                } else {
844
                        fprintf(stderr,"ERROR sending chunk %d\n",target_chunk->id);
845
                }
846
        }
847
        return success;
848

    
849
}
850

    
851
int inject_chunk(const struct streaming_context * stc, const struct chunk * target_chunk,const int multiplicity)
852
/*injects a specific chunk in the overlay and return the number of injected copies*/
853
{
854
        struct peerset *pset;
855
        struct peer ** peers, ** dst_peers;
856
        int peers_num;
857
        double (* peer_evaluation) (struct peer **n);
858
        size_t i, selectedpairs_len = multiplicity;
859
        struct PeerChunk  * selectedpairs;
860

    
861
        pset = topology_get_neighbours(psinstance_topology(stc->ps));
862
        peers_num = peerset_size(pset);
863
        peers = peerset_get_peers(pset);
864
        peer_evaluation = SCHED_PEER;
865

    
866
  //SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
867
         dst_peers = (struct peer **) malloc(sizeof(struct peer* ) *  multiplicity);
868
         selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
869

    
870
        selectedpairs = (struct PeerChunk *)  malloc(sizeof(struct PeerChunk) * selectedpairs_len);
871
        for ( i=0; i<selectedpairs_len; i++)
872
        {
873
                selectedpairs[i].peer = dst_peers[i];
874
                selectedpairs[i].chunk = target_chunk->id;
875
        }
876

    
877
        peer_chunk_dispatch(stc, selectedpairs,selectedpairs_len);
878

    
879
        free(selectedpairs);
880
        free(dst_peers);
881
        return selectedpairs_len;
882
}
883

    
884
void send_chunk(const struct streaming_context * stc)
885
{
886
  struct chunk *buff;
887
  int size, res, i, n;
888
  struct peer **neighbours;
889
  struct peerset *pset;
890

    
891
  pset = topology_get_neighbours(psinstance_topology(stc->ps));
892
  n = peerset_size(pset);
893
  neighbours = peerset_get_peers(pset);
894
  dprintf("Send Chunk: %d neighbours\n", n);
895
  if (n == 0) return;
896
  buff = cb_get_chunks(stc->cb, &size);
897
  dprintf("\t %d chunks in buffer...\n", size);
898
  if (size == 0) return;
899

    
900
  /************ STUPID DUMB SCHEDULING ****************/
901
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
902
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
903
  /************ /STUPID DUMB SCHEDULING ****************/
904

    
905
  /************ USE SCHEDULER ****************/
906
  {
907
    size_t selectedpairs_len = 1;
908
    int chunkids[size];
909
                int transid;
910
    struct peer *nodeids[n];
911
    struct PeerChunk selectedpairs[1];
912
  
913
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
914
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
915
            SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
916
  /************ /USE SCHEDULER ****************/
917

    
918
    for (i=0; i<(int)selectedpairs_len ; i++){
919
      struct peer *p = selectedpairs[i].peer;
920
      const struct chunk *c = cb_get_chunk(stc->cb, selectedpairs[i].chunk);
921
      dprintf("\t sending chunk[%d] to ", c->id);
922
      dprintf("%s\n", nodeid_static_str(p->id));
923

    
924
      if (stc->send_bmap_before_push) {
925
        send_bmap(stc, p->id);
926
      }
927

    
928
      chunk_attributes_update_sending(c);
929
      transid = transaction_create(stc->transactions, p->id);
930
      res = sendChunk(p->id, c, transid);        //we use transactions in order to register acks for push
931
//      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
932
      dprintf("\tResult: %d\n", res);
933
      if (res>=0) {
934
#ifdef LOG_CHUNK
935
              log_chunk(psinstance_nodeid(stc->ps),p->id,c,"SENT");
936
#endif
937
//{fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr_tr(p->id), gettimeofday_in_us(), res, c->size);}
938
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
939
        reg_chunk_send(psinstance_measures(stc->ps),c->id);
940
      } else {
941
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
942
      }
943
    }
944
  }
945
}