Statistics
| Branch: | Revision:

streamers / streaming.c @ 37ba82c3

History | View | Annotate | Download (16.5 KB)

1 8fed7779 CsabaKiraly
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 1cd06c26 CsabaKiraly
#include <sys/time.h>
8 89e893e2 Luca
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11 30a6e902 Csaba Kiraly
#include <stdbool.h>
12 4c0ba13e Csaba Kiraly
#include <math.h>
13 e99600d8 Csaba Kiraly
#include <assert.h>
14 f14985ba Luca Abeni
#include <string.h>
15 84ff82ba CsabaKiraly
#include <inttypes.h>
16 89e893e2 Luca
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22 0f35d029 Csaba Kiraly
#include <peerset.h>
23
#include <peer.h>
24 e98d8f50 Csaba Kiraly
#include <chunkidset.h>
25 ea084625 Csaba Kiraly
#include <limits.h>
26 6546a0c0 Alessandro Russo
#include <trade_sig_ha.h>
27 89e893e2 Luca
28
#include "streaming.h"
29
#include "output.h"
30
#include "input.h"
31 e64fc7e5 Luca
#include "dbg.h"
32 b44ae8d2 CsabaKiraly
#include "chunk_signaling.h"
33 abd2ef3b Csaba Kiraly
#include "chunklock.h"
34 fcb5c29b Csaba Kiraly
#include "topology.h"
35 4c0ba13e Csaba Kiraly
#include "measures.h"
36 0fec1310 Csaba Kiraly
#include "scheduling.h"
37 89e893e2 Luca
38 4367dafd Csaba Kiraly
#include "scheduler_la.h"
39
40 d88f86de Csaba Kiraly
static bool heuristics_distance_maxdeliver = false;
41 a204d648 Csaba Kiraly
static int bcast_after_receive_every = 0;
42 59b87dad Csaba Kiraly
static bool neigh_on_chunk_recv = false;
43 10fd812c Csaba Kiraly
44 e99600d8 Csaba Kiraly
struct chunk_attributes {
45 f5d9663e Csaba Kiraly
  uint64_t deadline;
46
  uint16_t deadline_increment;
47 e99600d8 Csaba Kiraly
  uint16_t hopcount;
48
} __attribute__((packed));
49
50 1cd06c26 CsabaKiraly
extern bool chunk_log;
51 bc1ddc15 MatteoSammarco
52 03dca3bf ArpadBakay
struct chunk_buffer *cb;
53 709f774c Luca
static struct input_desc *input;
54 8c1b2832 Csaba Kiraly
static int cb_size;
55 7ca3b176 Csaba Kiraly
static int transid=0;
56 89e893e2 Luca
57 dc87dca9 Csaba Kiraly
static int offer_per_tick = 1;        //N_p parameter of POLITO
58
59 742dfaec Csaba Kiraly
int _needs(struct chunkID_set *cset, int cb_size, int cid);
60
61 1cd06c26 CsabaKiraly
uint64_t gettimeofday_in_us(void)
62
{
63
  struct timeval what_time; //to store the epoch time
64
65
  gettimeofday(&what_time, NULL);
66
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
67
}
68
69 d3a242ab Csaba Kiraly
void cb_print()
70
{
71 b45e7201 Csaba Kiraly
#ifdef DEBUG
72 d3a242ab Csaba Kiraly
  struct chunk *chunks;
73
  int num_chunks, i, id;
74
  chunks = cb_get_chunks(cb, &num_chunks);
75
76
  dprintf("\tchbuf :");
77
  i = 0;
78
  if(num_chunks) {
79
    id = chunks[0].id;
80
    dprintf(" %d-> ",id);
81
    while (i < num_chunks) {
82
      if (id == chunks[i].id) {
83
        dprintf("%d",id % 10);
84
        i++;
85 2314ccb7 Csaba Kiraly
      } else if (chunk_islocked(id)) {
86
        dprintf("*");
87 d3a242ab Csaba Kiraly
      } else {
88
        dprintf(".");
89
      }
90
      id++;
91
    }
92
  }
93
  dprintf("\n");
94 b45e7201 Csaba Kiraly
#endif
95 d3a242ab Csaba Kiraly
}
96
97 6920fdab Luca
void stream_init(int size, struct nodeID *myID)
98 89e893e2 Luca
{
99 8612d586 CsabaKiraly
  static char conf[32];
100 89e893e2 Luca
101 8c1b2832 Csaba Kiraly
  cb_size = size;
102
103
  sprintf(conf, "size=%d", cb_size);
104 89e893e2 Luca
  cb = cb_init(conf);
105 1b6a3ea7 Csaba Kiraly
  chunkDeliveryInit(myID);
106 513e75ef Alessandro Russo
  chunkSignalingInit(myID);
107 95faef91 CsabaKiraly
  init_measures();
108 7442ecb3 Luca
}
109
110 c9370421 CsabaKiraly
int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size)
111 7442ecb3 Luca
{
112 f14985ba Luca Abeni
  int flags = 0;
113
114
  if (memcmp(fname, "udp:", 4) == 0) {
115
    fname += 4;
116
    flags = INPUT_UDP;
117
  }
118
  if (loop) {
119
    flags |= INPUT_LOOP;
120
  }
121
  input = input_open(fname, flags, fds, fds_size);
122 7442ecb3 Luca
  if (input == NULL) {
123
    return -1;
124
  }
125
126
  stream_init(1, myID);
127
  return 0;
128 89e893e2 Luca
}
129
130 e99600d8 Csaba Kiraly
void chunk_attributes_fill(struct chunk* c)
131
{
132
  struct chunk_attributes * ca;
133
134
  assert(!c->attributes && c->attributes_size == 0);
135
136
  c->attributes_size = sizeof(struct chunk_attributes);
137 74a5d4ae CsabaKiraly
  c->attributes = ca = malloc(c->attributes_size);
138 e99600d8 Csaba Kiraly
139 f5d9663e Csaba Kiraly
  ca->deadline = c->timestamp;
140
  ca->deadline_increment = 2;
141 e99600d8 Csaba Kiraly
  ca->hopcount = 0;
142
}
143
144 ccfc425d Csaba Kiraly
int chunk_get_hopcount(struct chunk* c) {
145
  struct chunk_attributes * ca;
146
147
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
148 84ff82ba CsabaKiraly
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
149 ccfc425d Csaba Kiraly
    return -1;
150
  }
151
152
  ca = (struct chunk_attributes *) c->attributes;
153
  return ca->hopcount;
154
}
155
156 74a5d4ae CsabaKiraly
void chunk_attributes_update_received(struct chunk* c)
157 e99600d8 Csaba Kiraly
{
158
  struct chunk_attributes * ca;
159
160
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
161 84ff82ba CsabaKiraly
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
162 e99600d8 Csaba Kiraly
    return;
163
  }
164
165
  ca = (struct chunk_attributes *) c->attributes;
166
  ca->hopcount++;
167
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
168
}
169
170 43dd1a10 Csaba Kiraly
void chunk_attributes_update_sending(const struct chunk* c)
171 f5d9663e Csaba Kiraly
{
172
  struct chunk_attributes * ca;
173
174
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
175 cf6aaf5b Csaba Kiraly
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
176 f5d9663e Csaba Kiraly
    return;
177
  }
178
179
  ca = (struct chunk_attributes *) c->attributes;
180
  ca->deadline += ca->deadline_increment;
181
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
182
}
183
184 2b97cbf1 Csaba Kiraly
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
185
{
186
  struct chunk *chunks;
187 74a5d4ae CsabaKiraly
  int num_chunks, i;
188 b31bd3a2 CsabaKiraly
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
189 2b97cbf1 Csaba Kiraly
  chunks = cb_get_chunks(chbuf, &num_chunks);
190
191 b5462b05 Csaba Kiraly
  for(i=num_chunks-1; i>=0; i--) {
192 2b97cbf1 Csaba Kiraly
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
193
  }
194
  return my_bmap;
195
}
196
197 d447f71d Csaba Kiraly
// a simple implementation that request everything that we miss ... up to max deliver
198 8e750be6 Csaba Kiraly
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
199 2b97cbf1 Csaba Kiraly
  struct chunkID_set *cset_acc, *my_bmap;
200 d447f71d Csaba Kiraly
  int i, d, cset_off_size;
201 8dd1eccd Csaba Kiraly
  //double lossrate;
202 f9630d20 Csaba Kiraly
  struct peer *from = nodeid_to_peer(fromid, 0);
203 d447f71d Csaba Kiraly
204 0781f344 Csaba Kiraly
  cset_acc = chunkID_set_init("size=0");
205 e735a1b1 Csaba Kiraly
206
  //reduce load a little bit if there are losses on the path from this guy
207 8dd1eccd Csaba Kiraly
  //lossrate = get_lossrate_receive(from->id);
208
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
209
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
210 e735a1b1 Csaba Kiraly
    my_bmap = cb_to_bmap(cb);
211
    cset_off_size = chunkID_set_size(cset_off);
212
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
213
      int chunkid = chunkID_set_get_chunk(cset_off, i);
214 702769ac Csaba Kiraly
      //dprintf("\tdo I need c%d ? :",chunkid);
215 e735a1b1 Csaba Kiraly
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
216
        chunkID_set_add_chunk(cset_acc, chunkid);
217
        chunk_lock(chunkid,from);
218 f9630d20 Csaba Kiraly
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
219 cff93a07 Csaba Kiraly
#ifdef MONL
220 f9630d20 Csaba Kiraly
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
221 cff93a07 Csaba Kiraly
#endif
222
        dprintf("\n");
223 e735a1b1 Csaba Kiraly
        d++;
224
      }
225 d447f71d Csaba Kiraly
    }
226 e735a1b1 Csaba Kiraly
    chunkID_set_free(my_bmap);
227 8dd1eccd Csaba Kiraly
  //} else {
228 f9630d20 Csaba Kiraly
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
229 8dd1eccd Csaba Kiraly
  //}
230 d447f71d Csaba Kiraly
231
  return cset_acc;
232
}
233
234 b0225995 Csaba Kiraly
void send_bmap(struct nodeID *toid)
235 22ebd96d Csaba Kiraly
{
236
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
237 b0225995 Csaba Kiraly
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
238 c8c4c779 Csaba Kiraly
  chunkID_set_free(my_bmap);
239 a1a9e662 Csaba Kiraly
}
240
241 a204d648 Csaba Kiraly
void bcast_bmap()
242
{
243
  int i, n;
244
  struct peer *neighbours;
245
  struct peerset *pset;
246 ba99da01 Csaba Kiraly
  struct chunkID_set *my_bmap;
247 a204d648 Csaba Kiraly
248
  pset = get_peers();
249
  n = peerset_size(pset);
250
  neighbours = peerset_get_peers(pset);
251
252 ba99da01 Csaba Kiraly
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
253 a204d648 Csaba Kiraly
  for (i = 0; i<n; i++) {
254 ba99da01 Csaba Kiraly
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
255 a204d648 Csaba Kiraly
  }
256 ba99da01 Csaba Kiraly
  chunkID_set_free(my_bmap);
257 a204d648 Csaba Kiraly
}
258
259 0ebdcf82 Csaba Kiraly
void send_ack(struct nodeID *toid, uint16_t trans_id)
260
{
261
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
262
  sendAck(toid, my_bmap,trans_id);
263
  chunkID_set_free(my_bmap);
264
}
265
266 68330740 Csaba Kiraly
double get_average_lossrate_pset(struct peerset *pset)
267
{
268
  int i, n;
269
  struct peer *neighbours;
270
271
  n = peerset_size(pset);
272
  neighbours = peerset_get_peers(pset);
273
  {
274 74a5d4ae CsabaKiraly
    struct nodeID *nodeids[n];
275 68330740 Csaba Kiraly
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
276 5fc04950 Csaba Kiraly
#ifdef MONL
277 68330740 Csaba Kiraly
    return get_average_lossrate(nodeids, n);
278 5fc04950 Csaba Kiraly
#else
279
    return 0;
280
#endif
281 68330740 Csaba Kiraly
  }
282
}
283
284 37ba82c3 Csaba Kiraly
void ack_chunk(struct chunk *c, struct nodeID *from, uint16_t trans_id)
285 5b95417d Csaba Kiraly
{
286
  //reduce load a little bit if there are losses on the path from this guy
287
  double average_lossrate = get_average_lossrate_pset(get_peers());
288
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
289
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
290
    return;
291
  }
292 37ba82c3 Csaba Kiraly
  send_ack(from, trans_id);        //send explicit ack
293 5b95417d Csaba Kiraly
}
294
295 74a5d4ae CsabaKiraly
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
296 89e893e2 Luca
{
297
  int res;
298 52b7c5ea Luca
  static struct chunk c;
299 30c9739a Csaba Kiraly
  struct peer *p;
300 a204d648 Csaba Kiraly
  static int bcast_cnt;
301 397f3860 Csaba Kiraly
  uint16_t transid;
302 89e893e2 Luca
303 a1c01ccf Csaba Kiraly
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
304 89e893e2 Luca
  if (res > 0) {
305 e99600d8 Csaba Kiraly
    chunk_attributes_update_received(&c);
306 6ac1e106 Csaba Kiraly
    chunk_unlock(c.id);
307 13d85fc6 Csaba Kiraly
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
308 84ff82ba CsabaKiraly
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));}
309 89e893e2 Luca
    output_deliver(&c);
310
    res = cb_add_chunk(cb, &c);
311 14e5c21e Csaba Kiraly
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
312 fc0260a4 Csaba Kiraly
    cb_print();
313 89e893e2 Luca
    if (res < 0) {
314 13d85fc6 Csaba Kiraly
      dprintf("\tchunk too old, buffer full with newer chunks\n");
315 84ff82ba CsabaKiraly
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
316 89e893e2 Luca
      free(c.data);
317
      free(c.attributes);
318
    }
319 13d4c180 Csaba Kiraly
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
320 30c9739a Csaba Kiraly
    if (p) {        //now we have it almost sure
321
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
322
    }
323 37ba82c3 Csaba Kiraly
    ack_chunk(&c, from, transid);        //send explicit ack
324 a204d648 Csaba Kiraly
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
325
       bcast_bmap();
326
    }
327 ae3c4aa9 Csaba Kiraly
  } else {
328
    fprintf(stderr,"\tError: can't decode chunk!\n");
329 89e893e2 Luca
  }
330
}
331
332 685225b2 CsabaKiraly
struct chunk *generated_chunk(suseconds_t *delta)
333 89e893e2 Luca
{
334 685225b2 CsabaKiraly
  struct chunk *c;
335
336
  c = malloc(sizeof(struct chunk));
337
  if (!c) {
338
    fprintf(stderr, "Memory allocation error!\n");
339
    return NULL;
340
  }
341 89e893e2 Luca
342 685225b2 CsabaKiraly
  *delta = input_get(input, c);
343 afdc8db4 Luca Abeni
  if (*delta < 0) {
344 4bb789ed Luca
    fprintf(stderr, "Error in input!\n");
345
    exit(-1);
346
  }
347 685225b2 CsabaKiraly
  if (c->data == NULL) {
348
    free(c);
349
    return NULL;
350 4bb789ed Luca
  }
351 685225b2 CsabaKiraly
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
352
  chunk_attributes_fill(c);
353
  return c;
354
}
355
356
int add_chunk(struct chunk *c)
357
{
358
  int res;
359
360
  res = cb_add_chunk(cb, c);
361 5fb5ecd4 Luca
  if (res < 0) {
362 685225b2 CsabaKiraly
    free(c->data);
363
    free(c->attributes);
364
    free(c);
365
    return 0;
366 89e893e2 Luca
  }
367 685225b2 CsabaKiraly
  free(c);
368 afdc8db4 Luca Abeni
  return 1;
369 89e893e2 Luca
}
370
371 4367dafd Csaba Kiraly
/**
372 d74bc79c Csaba Kiraly
 *example function to filter chunks based on whether a given peer needs them.
373
 *
374
 * Looks at buffermap information received about the given peer.
375 4367dafd Csaba Kiraly
 */
376 7b86e7d9 Csaba Kiraly
int needs(struct peer *n, int cid){
377
  struct peer * p = n;
378 1b7da906 Csaba Kiraly
379 3cd33bb0 Csaba Kiraly
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
380 d74bc79c Csaba Kiraly
  if (! p->bmap) {
381 3cd33bb0 Csaba Kiraly
    //dprintf("no bmap\n");
382 d74bc79c Csaba Kiraly
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
383
  }
384 b44ae8d2 CsabaKiraly
  return _needs(p->bmap, p->cb_size, cid);
385 742dfaec Csaba Kiraly
}
386 d74bc79c Csaba Kiraly
387 742dfaec Csaba Kiraly
int _needs(struct chunkID_set *cset, int cb_size, int cid){
388 7c2ac59e CsabaKiraly
  if (cb_size == 0) { //if it declared it does not needs chunks
389
    return 0;
390
  }
391
392 742dfaec Csaba Kiraly
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
393 d74bc79c Csaba Kiraly
    int missing, min;
394
    //@TODO: add some bmap_timestamp based logic
395
396 742dfaec Csaba Kiraly
    if (chunkID_set_size(cset) == 0) {
397 3cd33bb0 Csaba Kiraly
      //dprintf("bmap empty\n");
398 d74bc79c Csaba Kiraly
      return 1;        // if the bmap seems empty, it needs the chunk
399
    }
400 742dfaec Csaba Kiraly
    missing = cb_size - chunkID_set_size(cset);
401 d74bc79c Csaba Kiraly
    missing = missing < 0 ? 0 : missing;
402 eb42de41 Csaba Kiraly
    min = chunkID_set_get_earliest(cset);
403 3cd33bb0 Csaba Kiraly
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
404 742dfaec Csaba Kiraly
    return (cid >= min - missing);
405 d74bc79c Csaba Kiraly
  }
406
407 3cd33bb0 Csaba Kiraly
  //dprintf("has it\n");
408 d74bc79c Csaba Kiraly
  return 0;
409 4367dafd Csaba Kiraly
}
410 d74bc79c Csaba Kiraly
411 7b86e7d9 Csaba Kiraly
double peerWeightReceivedfrom(struct peer **n){
412
  struct peer * p = *n;
413 1b7da906 Csaba Kiraly
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
414 4367dafd Csaba Kiraly
}
415 eb4ecf1c Csaba Kiraly
416 8bb39b0b Csaba Kiraly
double peerWeightUniform(struct peer **n){
417 eb4ecf1c Csaba Kiraly
  return 1;
418
}
419
420 7b86e7d9 Csaba Kiraly
double peerWeightRtt(struct peer **n){
421 d4a680a0 Csaba Kiraly
#ifdef MONL
422 7b86e7d9 Csaba Kiraly
  double rtt = get_rtt(*n->id);
423 1b7da906 Csaba Kiraly
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
424 3ffbcf2d Csaba Kiraly
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
425 d4a680a0 Csaba Kiraly
#else
426
  return 1;
427 5fc04950 Csaba Kiraly
#endif
428 d4a680a0 Csaba Kiraly
}
429 4c0ba13e Csaba Kiraly
430 ea084625 Csaba Kiraly
//ordering function for ELp peer selection, chunk ID based
431
//can't be used as weight
432 1b7da906 Csaba Kiraly
double peerScoreELpID(struct nodeID **n){
433 ea084625 Csaba Kiraly
  struct chunkID_set *bmap;
434
  int latest;
435 1b7da906 Csaba Kiraly
  struct peer * p = nodeid_to_peer(*n, 0);
436
  if (!p) return 0;
437 ea084625 Csaba Kiraly
438 1b7da906 Csaba Kiraly
  bmap = p->bmap;
439 ea084625 Csaba Kiraly
  if (!bmap) return 0;
440
  latest = chunkID_set_get_latest(bmap);
441
  if (latest == INT_MIN) return 0;
442
443
  return -latest;
444
}
445
446 97ab5c93 Csaba Kiraly
double chunkScoreChunkID(int *cid){
447
  return (double) *cid;
448
}
449
450 74a5d4ae CsabaKiraly
double getChunkTimestamp(int *cid){
451 43dd1a10 Csaba Kiraly
  const struct chunk *c = cb_get_chunk(cb, *cid);
452 1b7da906 Csaba Kiraly
  if (!c) return 0;
453
454
  return (double) c->timestamp;
455 4367dafd Csaba Kiraly
}
456
457 8e750be6 Csaba Kiraly
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
458 8e85b948 Alessandro Russo
  int i, d, cset_acc_size, res;
459 b13977c0 Csaba Kiraly
  struct peer *to = nodeid_to_peer(toid, 0);
460 d447f71d Csaba Kiraly
461
  cset_acc_size = chunkID_set_size(cset_acc);
462 e2c563e7 Csaba Kiraly
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
463 d447f71d Csaba Kiraly
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
464 43dd1a10 Csaba Kiraly
    const struct chunk *c;
465 d447f71d Csaba Kiraly
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
466
    c = cb_get_chunk(cb, chunkid);
467 f8286367 Csaba Kiraly
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
468 f5d9663e Csaba Kiraly
      chunk_attributes_update_sending(c);
469 033319c7 Csaba Kiraly
      res = sendChunk(toid, c, trans_id);
470 d447f71d Csaba Kiraly
      if (res >= 0) {
471 f8286367 Csaba Kiraly
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
472 d447f71d Csaba Kiraly
        d++;
473 f740dafb Csaba Kiraly
        reg_chunk_send(c->id);
474 0adbb1ef Csaba Kiraly
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);}
475 96b8d0a8 Csaba Kiraly
      } else {
476
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
477 d447f71d Csaba Kiraly
      }
478
    }
479
  }
480
}
481
482 dc87dca9 Csaba Kiraly
int offer_peer_count()
483
{
484
  return offer_per_tick;
485
}
486
487 74a5d4ae CsabaKiraly
int offer_max_deliver(struct nodeID *n)
488 960017bf Csaba Kiraly
{
489 10fd812c Csaba Kiraly
490
  if (!heuristics_distance_maxdeliver) return 1;
491
492 5fc04950 Csaba Kiraly
#ifdef MONL
493 1b7da906 Csaba Kiraly
  switch (get_hopcount(n)) {
494 960017bf Csaba Kiraly
    case 0: return 5;
495
    case 1: return 2;
496
    default: return 1;
497
  }
498 5fc04950 Csaba Kiraly
#else
499
  return 1;
500
#endif
501 960017bf Csaba Kiraly
}
502
503 fcb5c29b Csaba Kiraly
void send_offer()
504 43355360 Csaba Kiraly
{
505
  struct chunk *buff;
506 74a5d4ae CsabaKiraly
  int size, res, i, n;
507 43355360 Csaba Kiraly
  struct peer *neighbours;
508 fcb5c29b Csaba Kiraly
  struct peerset *pset;
509 43355360 Csaba Kiraly
510 fcb5c29b Csaba Kiraly
  pset = get_peers();
511 43355360 Csaba Kiraly
  n = peerset_size(pset);
512
  neighbours = peerset_get_peers(pset);
513
  dprintf("Send Offer: %d neighbours\n", n);
514
  if (n == 0) return;
515
  buff = cb_get_chunks(cb, &size);
516
  if (size == 0) return;
517
518
  {
519 dc87dca9 Csaba Kiraly
    size_t selectedpeers_len = offer_peer_count();
520 1b7da906 Csaba Kiraly
    int chunkids[size];
521 8bb39b0b Csaba Kiraly
    struct peer *nodeids[n];
522
    struct peer *selectedpeers[selectedpeers_len];
523 43355360 Csaba Kiraly
524 abbef5a5 Csaba Kiraly
    //reduce load a little bit if there are losses on the path from this guy
525 68330740 Csaba Kiraly
    double average_lossrate = get_average_lossrate_pset(pset);
526 abbef5a5 Csaba Kiraly
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
527 f76dc385 Csaba Kiraly
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
528 abbef5a5 Csaba Kiraly
      return;
529
    }
530
531 ffcc70ad CsabaKiraly
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
532 8bb39b0b Csaba Kiraly
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
533 851fa962 Csaba Kiraly
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
534 43355360 Csaba Kiraly
535
    for (i=0; i<selectedpeers_len ; i++){
536 8bb39b0b Csaba Kiraly
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
537 43355360 Csaba Kiraly
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
538 8bb39b0b Csaba Kiraly
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
539
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
540 ddedf85f Csaba Kiraly
      chunkID_set_free(my_bmap);
541 43355360 Csaba Kiraly
    }
542
  }
543
}
544
545 4367dafd Csaba Kiraly
546 fcb5c29b Csaba Kiraly
void send_chunk()
547 89e893e2 Luca
{
548
  struct chunk *buff;
549 74a5d4ae CsabaKiraly
  int size, res, i, n;
550 924226c0 Luca Abeni
  struct peer *neighbours;
551 fcb5c29b Csaba Kiraly
  struct peerset *pset;
552 1cd06c26 CsabaKiraly
553 fcb5c29b Csaba Kiraly
  pset = get_peers();
554 0f35d029 Csaba Kiraly
  n = peerset_size(pset);
555
  neighbours = peerset_get_peers(pset);
556 e64fc7e5 Luca
  dprintf("Send Chunk: %d neighbours\n", n);
557 89e893e2 Luca
  if (n == 0) return;
558
  buff = cb_get_chunks(cb, &size);
559 e64fc7e5 Luca
  dprintf("\t %d chunks in buffer...\n", size);
560 89e893e2 Luca
  if (size == 0) return;
561
562
  /************ STUPID DUMB SCHEDULING ****************/
563 4367dafd Csaba Kiraly
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
564
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
565 89e893e2 Luca
  /************ /STUPID DUMB SCHEDULING ****************/
566
567 4367dafd Csaba Kiraly
  /************ USE SCHEDULER ****************/
568 924226c0 Luca Abeni
  {
569
    size_t selectedpairs_len = 1;
570 1b7da906 Csaba Kiraly
    int chunkids[size];
571 7b86e7d9 Csaba Kiraly
    struct peer *nodeids[n];
572 924226c0 Luca Abeni
    struct PeerChunk selectedpairs[1];
573
  
574 1b7da906 Csaba Kiraly
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
575 7b86e7d9 Csaba Kiraly
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
576 0fec1310 Csaba Kiraly
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
577 4367dafd Csaba Kiraly
  /************ /USE SCHEDULER ****************/
578
579 924226c0 Luca Abeni
    for (i=0; i<selectedpairs_len ; i++){
580 7b86e7d9 Csaba Kiraly
      struct peer *p = selectedpairs[i].peer;
581 74a5d4ae CsabaKiraly
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
582 924226c0 Luca Abeni
      dprintf("\t sending chunk[%d] to ", c->id);
583
      dprintf("%s\n", node_addr(p->id));
584
585 b0225995 Csaba Kiraly
      send_bmap(p->id);
586 bc1ddc15 MatteoSammarco
587 f5d9663e Csaba Kiraly
      chunk_attributes_update_sending(c);
588 033319c7 Csaba Kiraly
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
589 84ff82ba CsabaKiraly
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
590 13d85fc6 Csaba Kiraly
      dprintf("\tResult: %d\n", res);
591 924226c0 Luca Abeni
      if (res>=0) {
592
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
593 f740dafb Csaba Kiraly
        reg_chunk_send(c->id);
594 96b8d0a8 Csaba Kiraly
      } else {
595
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
596 924226c0 Luca Abeni
      }
597 0f35d029 Csaba Kiraly
    }
598 4367dafd Csaba Kiraly
  }
599 89e893e2 Luca
}