Statistics
| Branch: | Revision:

streamers / streaming.c @ e3f73a68

History | View | Annotate | Download (18.7 KB)

1 8fed7779 CsabaKiraly
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 1cd06c26 CsabaKiraly
#include <sys/time.h>
8 89e893e2 Luca
#include <stdlib.h>
9
#include <stdio.h>
10
#include <stdint.h>
11 30a6e902 Csaba Kiraly
#include <stdbool.h>
12 4c0ba13e Csaba Kiraly
#include <math.h>
13 e99600d8 Csaba Kiraly
#include <assert.h>
14 f14985ba Luca Abeni
#include <string.h>
15 84ff82ba CsabaKiraly
#include <inttypes.h>
16 89e893e2 Luca
17
#include <net_helper.h>
18
#include <chunk.h> 
19
#include <chunkbuffer.h> 
20
#include <trade_msg_la.h>
21
#include <trade_msg_ha.h>
22 0f35d029 Csaba Kiraly
#include <peerset.h>
23
#include <peer.h>
24 e98d8f50 Csaba Kiraly
#include <chunkidset.h>
25 ea084625 Csaba Kiraly
#include <limits.h>
26 6546a0c0 Alessandro Russo
#include <trade_sig_ha.h>
27 58811a24 Csaba Kiraly
#include <chunkiser_attrib.h>
28 89e893e2 Luca
29
#include "streaming.h"
30 d5f60f90 Csaba Kiraly
#include "streamer.h"
31 89e893e2 Luca
#include "output.h"
32
#include "input.h"
33 e64fc7e5 Luca
#include "dbg.h"
34 b44ae8d2 CsabaKiraly
#include "chunk_signaling.h"
35 abd2ef3b Csaba Kiraly
#include "chunklock.h"
36 fcb5c29b Csaba Kiraly
#include "topology.h"
37 4c0ba13e Csaba Kiraly
#include "measures.h"
38 0fec1310 Csaba Kiraly
#include "scheduling.h"
39 50f1ec20 Csaba Kiraly
#include "transaction.h"
40 89e893e2 Luca
41 4367dafd Csaba Kiraly
#include "scheduler_la.h"
42
43 9fd49433 Csaba Kiraly
# define CB_SIZE_TIME_UNLIMITED 1e12
44
uint64_t CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;        //in millisec, defaults to unlimited
45 d263a301 Csaba Kiraly
46 d88f86de Csaba Kiraly
static bool heuristics_distance_maxdeliver = false;
47 a204d648 Csaba Kiraly
static int bcast_after_receive_every = 0;
48 59b87dad Csaba Kiraly
static bool neigh_on_chunk_recv = false;
49 0e5fe687 Csaba Kiraly
static bool send_bmap_before_push = false;
50 10fd812c Csaba Kiraly
51 e99600d8 Csaba Kiraly
struct chunk_attributes {
52 f5d9663e Csaba Kiraly
  uint64_t deadline;
53
  uint16_t deadline_increment;
54 e99600d8 Csaba Kiraly
  uint16_t hopcount;
55
} __attribute__((packed));
56
57 1cd06c26 CsabaKiraly
extern bool chunk_log;
58 bc1ddc15 MatteoSammarco
59 03dca3bf ArpadBakay
struct chunk_buffer *cb;
60 709f774c Luca
static struct input_desc *input;
61 8c1b2832 Csaba Kiraly
static int cb_size;
62 89e893e2 Luca
63 dc87dca9 Csaba Kiraly
static int offer_per_tick = 1;        //N_p parameter of POLITO
64
65 742dfaec Csaba Kiraly
int _needs(struct chunkID_set *cset, int cb_size, int cid);
66
67 1cd06c26 CsabaKiraly
uint64_t gettimeofday_in_us(void)
68
{
69
  struct timeval what_time; //to store the epoch time
70
71
  gettimeofday(&what_time, NULL);
72
  return what_time.tv_sec * 1000000ULL + what_time.tv_usec;
73
}
74
75 d3a242ab Csaba Kiraly
void cb_print()
76
{
77 b45e7201 Csaba Kiraly
#ifdef DEBUG
78 d3a242ab Csaba Kiraly
  struct chunk *chunks;
79
  int num_chunks, i, id;
80
  chunks = cb_get_chunks(cb, &num_chunks);
81
82
  dprintf("\tchbuf :");
83
  i = 0;
84
  if(num_chunks) {
85
    id = chunks[0].id;
86
    dprintf(" %d-> ",id);
87
    while (i < num_chunks) {
88
      if (id == chunks[i].id) {
89
        dprintf("%d",id % 10);
90
        i++;
91 2314ccb7 Csaba Kiraly
      } else if (chunk_islocked(id)) {
92
        dprintf("*");
93 d3a242ab Csaba Kiraly
      } else {
94
        dprintf(".");
95
      }
96
      id++;
97
    }
98
  }
99
  dprintf("\n");
100 b45e7201 Csaba Kiraly
#endif
101 d3a242ab Csaba Kiraly
}
102
103 6920fdab Luca
void stream_init(int size, struct nodeID *myID)
104 89e893e2 Luca
{
105 8612d586 CsabaKiraly
  static char conf[32];
106 89e893e2 Luca
107 8c1b2832 Csaba Kiraly
  cb_size = size;
108
109
  sprintf(conf, "size=%d", cb_size);
110 89e893e2 Luca
  cb = cb_init(conf);
111 1b6a3ea7 Csaba Kiraly
  chunkDeliveryInit(myID);
112 513e75ef Alessandro Russo
  chunkSignalingInit(myID);
113 95faef91 CsabaKiraly
  init_measures();
114 7442ecb3 Luca
}
115
116 03de31e0 Csaba Kiraly
int source_init(const char *fname, struct nodeID *myID, int *fds, int fds_size, int buff_size)
117 7442ecb3 Luca
{
118 03de31e0 Csaba Kiraly
  input = input_open(fname, fds, fds_size);
119 7442ecb3 Luca
  if (input == NULL) {
120
    return -1;
121
  }
122
123 14893aa5 CsabaKiraly
  stream_init(buff_size, myID);
124 7442ecb3 Luca
  return 0;
125 89e893e2 Luca
}
126
127 e99600d8 Csaba Kiraly
void chunk_attributes_fill(struct chunk* c)
128
{
129
  struct chunk_attributes * ca;
130 58811a24 Csaba Kiraly
  int priority = 1;
131 e99600d8 Csaba Kiraly
132 58811a24 Csaba Kiraly
  assert((!c->attributes && c->attributes_size == 0) ||
133
         chunk_attributes_chunker_verify(c->attributes, c->attributes_size));
134
135
  if (chunk_attributes_chunker_verify(c->attributes, c->attributes_size)) {
136
    priority = ((struct chunk_attributes_chunker*) c->attributes)->priority;
137
    free(c->attributes);
138
    c->attributes = NULL;
139
    c->attributes_size = 0;
140
  }
141 e99600d8 Csaba Kiraly
142
  c->attributes_size = sizeof(struct chunk_attributes);
143 74a5d4ae CsabaKiraly
  c->attributes = ca = malloc(c->attributes_size);
144 e99600d8 Csaba Kiraly
145 e13e2d0e Csaba Kiraly
  ca->deadline = c->id;
146 58811a24 Csaba Kiraly
  ca->deadline_increment = priority * 2;
147 e99600d8 Csaba Kiraly
  ca->hopcount = 0;
148
}
149
150 ccfc425d Csaba Kiraly
int chunk_get_hopcount(struct chunk* c) {
151
  struct chunk_attributes * ca;
152
153
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
154 e3f73a68 ArpadBakay
    fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%u\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
155 ccfc425d Csaba Kiraly
    return -1;
156
  }
157
158
  ca = (struct chunk_attributes *) c->attributes;
159
  return ca->hopcount;
160
}
161
162 74a5d4ae CsabaKiraly
void chunk_attributes_update_received(struct chunk* c)
163 e99600d8 Csaba Kiraly
{
164
  struct chunk_attributes * ca;
165
166
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
167 e3f73a68 ArpadBakay
    fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%u\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes));
168 e99600d8 Csaba Kiraly
    return;
169
  }
170
171
  ca = (struct chunk_attributes *) c->attributes;
172
  ca->hopcount++;
173
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
174
}
175
176 43dd1a10 Csaba Kiraly
void chunk_attributes_update_sending(const struct chunk* c)
177 f5d9663e Csaba Kiraly
{
178
  struct chunk_attributes * ca;
179
180
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
181 cf6aaf5b Csaba Kiraly
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
182 f5d9663e Csaba Kiraly
    return;
183
  }
184
185
  ca = (struct chunk_attributes *) c->attributes;
186
  ca->deadline += ca->deadline_increment;
187 65626579 Csaba Kiraly
  dprintf("Sending chunk %d with deadline %lu (increment: %d)\n", c->id, ca->deadline, ca->deadline_increment);
188 f5d9663e Csaba Kiraly
}
189
190 2b97cbf1 Csaba Kiraly
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
191
{
192
  struct chunk *chunks;
193 74a5d4ae CsabaKiraly
  int num_chunks, i;
194 b31bd3a2 CsabaKiraly
  struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
195 2b97cbf1 Csaba Kiraly
  chunks = cb_get_chunks(chbuf, &num_chunks);
196
197 b5462b05 Csaba Kiraly
  for(i=num_chunks-1; i>=0; i--) {
198 2b97cbf1 Csaba Kiraly
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
199
  }
200
  return my_bmap;
201
}
202
203 d447f71d Csaba Kiraly
// a simple implementation that request everything that we miss ... up to max deliver
204 8e750be6 Csaba Kiraly
struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){
205 2b97cbf1 Csaba Kiraly
  struct chunkID_set *cset_acc, *my_bmap;
206 d447f71d Csaba Kiraly
  int i, d, cset_off_size;
207 8dd1eccd Csaba Kiraly
  //double lossrate;
208 f9630d20 Csaba Kiraly
  struct peer *from = nodeid_to_peer(fromid, 0);
209 d447f71d Csaba Kiraly
210 0781f344 Csaba Kiraly
  cset_acc = chunkID_set_init("size=0");
211 e735a1b1 Csaba Kiraly
212
  //reduce load a little bit if there are losses on the path from this guy
213 8dd1eccd Csaba Kiraly
  //lossrate = get_lossrate_receive(from->id);
214
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
215
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
216 e735a1b1 Csaba Kiraly
    my_bmap = cb_to_bmap(cb);
217
    cset_off_size = chunkID_set_size(cset_off);
218
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
219
      int chunkid = chunkID_set_get_chunk(cset_off, i);
220 702769ac Csaba Kiraly
      //dprintf("\tdo I need c%d ? :",chunkid);
221 e735a1b1 Csaba Kiraly
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
222
        chunkID_set_add_chunk(cset_acc, chunkid);
223
        chunk_lock(chunkid,from);
224 f9630d20 Csaba Kiraly
        dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
225 cff93a07 Csaba Kiraly
#ifdef MONL
226 f9630d20 Csaba Kiraly
        dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
227 cff93a07 Csaba Kiraly
#endif
228
        dprintf("\n");
229 e735a1b1 Csaba Kiraly
        d++;
230
      }
231 d447f71d Csaba Kiraly
    }
232 e735a1b1 Csaba Kiraly
    chunkID_set_free(my_bmap);
233 8dd1eccd Csaba Kiraly
  //} else {
234 f9630d20 Csaba Kiraly
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
235 8dd1eccd Csaba Kiraly
  //}
236 d447f71d Csaba Kiraly
237 f8373956 Csaba Kiraly
  reg_offer_accept_in(chunkID_set_size(cset_acc) > 0 ? 1 : 0);
238
239 d447f71d Csaba Kiraly
  return cset_acc;
240
}
241
242 b0225995 Csaba Kiraly
void send_bmap(struct nodeID *toid)
243 22ebd96d Csaba Kiraly
{
244
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
245 b0225995 Csaba Kiraly
   sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0);
246 c8c4c779 Csaba Kiraly
  chunkID_set_free(my_bmap);
247 a1a9e662 Csaba Kiraly
}
248
249 a204d648 Csaba Kiraly
void bcast_bmap()
250
{
251
  int i, n;
252
  struct peer *neighbours;
253
  struct peerset *pset;
254 ba99da01 Csaba Kiraly
  struct chunkID_set *my_bmap;
255 a204d648 Csaba Kiraly
256
  pset = get_peers();
257
  n = peerset_size(pset);
258
  neighbours = peerset_get_peers(pset);
259
260 ba99da01 Csaba Kiraly
  my_bmap = cb_to_bmap(cb);        //cache our bmap for faster processing
261 a204d648 Csaba Kiraly
  for (i = 0; i<n; i++) {
262 ba99da01 Csaba Kiraly
    sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0);
263 a204d648 Csaba Kiraly
  }
264 ba99da01 Csaba Kiraly
  chunkID_set_free(my_bmap);
265 a204d648 Csaba Kiraly
}
266
267 0ebdcf82 Csaba Kiraly
void send_ack(struct nodeID *toid, uint16_t trans_id)
268
{
269
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
270
  sendAck(toid, my_bmap,trans_id);
271
  chunkID_set_free(my_bmap);
272
}
273
274 68330740 Csaba Kiraly
double get_average_lossrate_pset(struct peerset *pset)
275
{
276
  int i, n;
277
  struct peer *neighbours;
278
279
  n = peerset_size(pset);
280
  neighbours = peerset_get_peers(pset);
281
  {
282 74a5d4ae CsabaKiraly
    struct nodeID *nodeids[n];
283 68330740 Csaba Kiraly
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
284 5fc04950 Csaba Kiraly
#ifdef MONL
285 68330740 Csaba Kiraly
    return get_average_lossrate(nodeids, n);
286 5fc04950 Csaba Kiraly
#else
287
    return 0;
288
#endif
289 68330740 Csaba Kiraly
  }
290
}
291
292 37ba82c3 Csaba Kiraly
void ack_chunk(struct chunk *c, struct nodeID *from, uint16_t trans_id)
293 5b95417d Csaba Kiraly
{
294
  //reduce load a little bit if there are losses on the path from this guy
295
  double average_lossrate = get_average_lossrate_pset(get_peers());
296
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
297
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
298
    return;
299
  }
300 37ba82c3 Csaba Kiraly
  send_ack(from, trans_id);        //send explicit ack
301 5b95417d Csaba Kiraly
}
302
303 74a5d4ae CsabaKiraly
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
304 89e893e2 Luca
{
305
  int res;
306 52b7c5ea Luca
  static struct chunk c;
307 30c9739a Csaba Kiraly
  struct peer *p;
308 a204d648 Csaba Kiraly
  static int bcast_cnt;
309 397f3860 Csaba Kiraly
  uint16_t transid;
310 89e893e2 Luca
311 a1c01ccf Csaba Kiraly
  res = parseChunkMsg(buff + 1, len - 1, &c, &transid);
312 89e893e2 Luca
  if (res > 0) {
313 e99600d8 Csaba Kiraly
    chunk_attributes_update_received(&c);
314 6ac1e106 Csaba Kiraly
    chunk_unlock(c.id);
315 13d85fc6 Csaba Kiraly
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
316 6853bc18 Csaba Kiraly
    if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i Size: %d bytes\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c), c.size);}
317 89e893e2 Luca
    output_deliver(&c);
318
    res = cb_add_chunk(cb, &c);
319 14e5c21e Csaba Kiraly
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE);
320 fc0260a4 Csaba Kiraly
    cb_print();
321 89e893e2 Luca
    if (res < 0) {
322 13d85fc6 Csaba Kiraly
      dprintf("\tchunk too old, buffer full with newer chunks\n");
323 84ff82ba CsabaKiraly
      if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());}
324 89e893e2 Luca
      free(c.data);
325
      free(c.attributes);
326
    }
327 13d4c180 Csaba Kiraly
    p = nodeid_to_peer(from, neigh_on_chunk_recv);
328 30c9739a Csaba Kiraly
    if (p) {        //now we have it almost sure
329
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
330 c4aa43f8 Csaba Kiraly
      gettimeofday(&p->bmap_timestamp, NULL);
331 30c9739a Csaba Kiraly
    }
332 37ba82c3 Csaba Kiraly
    ack_chunk(&c, from, transid);        //send explicit ack
333 a204d648 Csaba Kiraly
    if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) {
334
       bcast_bmap();
335
    }
336 ae3c4aa9 Csaba Kiraly
  } else {
337
    fprintf(stderr,"\tError: can't decode chunk!\n");
338 89e893e2 Luca
  }
339
}
340
341 685225b2 CsabaKiraly
struct chunk *generated_chunk(suseconds_t *delta)
342 89e893e2 Luca
{
343 685225b2 CsabaKiraly
  struct chunk *c;
344
345
  c = malloc(sizeof(struct chunk));
346
  if (!c) {
347
    fprintf(stderr, "Memory allocation error!\n");
348
    return NULL;
349
  }
350 89e893e2 Luca
351 685225b2 CsabaKiraly
  *delta = input_get(input, c);
352 afdc8db4 Luca Abeni
  if (*delta < 0) {
353 4bb789ed Luca
    fprintf(stderr, "Error in input!\n");
354
    exit(-1);
355
  }
356 685225b2 CsabaKiraly
  if (c->data == NULL) {
357
    free(c);
358
    return NULL;
359 4bb789ed Luca
  }
360 685225b2 CsabaKiraly
  dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
361
  chunk_attributes_fill(c);
362
  return c;
363
}
364
365
int add_chunk(struct chunk *c)
366
{
367
  int res;
368
369
  res = cb_add_chunk(cb, c);
370 5fb5ecd4 Luca
  if (res < 0) {
371 685225b2 CsabaKiraly
    free(c->data);
372
    free(c->attributes);
373
    free(c);
374
    return 0;
375 89e893e2 Luca
  }
376 685225b2 CsabaKiraly
  free(c);
377 afdc8db4 Luca Abeni
  return 1;
378 89e893e2 Luca
}
379
380 2d4a594b Csaba Kiraly
uint64_t get_chunk_timestamp(int cid){
381
  const struct chunk *c = cb_get_chunk(cb, cid);
382
  if (!c) return 0;
383
384
  return c->timestamp;
385
}
386
387 4367dafd Csaba Kiraly
/**
388 d74bc79c Csaba Kiraly
 *example function to filter chunks based on whether a given peer needs them.
389
 *
390
 * Looks at buffermap information received about the given peer.
391 4367dafd Csaba Kiraly
 */
392 7b86e7d9 Csaba Kiraly
int needs(struct peer *n, int cid){
393
  struct peer * p = n;
394 1b7da906 Csaba Kiraly
395 a115d090 Csaba Kiraly
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
396
    uint64_t ts;
397
    ts = get_chunk_timestamp(cid);
398
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
399
      return 0;
400
    }
401 d263a301 Csaba Kiraly
  }
402 1b7da906 Csaba Kiraly
403 3cd33bb0 Csaba Kiraly
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
404 d74bc79c Csaba Kiraly
  if (! p->bmap) {
405 3cd33bb0 Csaba Kiraly
    //dprintf("no bmap\n");
406 d74bc79c Csaba Kiraly
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
407
  }
408 b44ae8d2 CsabaKiraly
  return _needs(p->bmap, p->cb_size, cid);
409 742dfaec Csaba Kiraly
}
410 d74bc79c Csaba Kiraly
411 742dfaec Csaba Kiraly
int _needs(struct chunkID_set *cset, int cb_size, int cid){
412 3656c531 Csaba Kiraly
413 7c2ac59e CsabaKiraly
  if (cb_size == 0) { //if it declared it does not needs chunks
414
    return 0;
415
  }
416
417 a115d090 Csaba Kiraly
  if (CB_SIZE_TIME < CB_SIZE_TIME_UNLIMITED) {
418
    uint64_t ts;
419
    ts = get_chunk_timestamp(cid);
420
    if (ts && (ts < gettimeofday_in_us() - CB_SIZE_TIME)) {        //if we don't know the timestamp, we accept
421
      return 0;
422
    }
423 d263a301 Csaba Kiraly
  }
424
425 742dfaec Csaba Kiraly
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
426 d74bc79c Csaba Kiraly
    int missing, min;
427
    //@TODO: add some bmap_timestamp based logic
428
429 742dfaec Csaba Kiraly
    if (chunkID_set_size(cset) == 0) {
430 3cd33bb0 Csaba Kiraly
      //dprintf("bmap empty\n");
431 d74bc79c Csaba Kiraly
      return 1;        // if the bmap seems empty, it needs the chunk
432
    }
433 742dfaec Csaba Kiraly
    missing = cb_size - chunkID_set_size(cset);
434 d74bc79c Csaba Kiraly
    missing = missing < 0 ? 0 : missing;
435 eb42de41 Csaba Kiraly
    min = chunkID_set_get_earliest(cset);
436 3cd33bb0 Csaba Kiraly
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
437 742dfaec Csaba Kiraly
    return (cid >= min - missing);
438 d74bc79c Csaba Kiraly
  }
439
440 3cd33bb0 Csaba Kiraly
  //dprintf("has it\n");
441 d74bc79c Csaba Kiraly
  return 0;
442 4367dafd Csaba Kiraly
}
443 d74bc79c Csaba Kiraly
444 7b86e7d9 Csaba Kiraly
double peerWeightReceivedfrom(struct peer **n){
445
  struct peer * p = *n;
446 1b7da906 Csaba Kiraly
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
447 4367dafd Csaba Kiraly
}
448 eb4ecf1c Csaba Kiraly
449 8bb39b0b Csaba Kiraly
double peerWeightUniform(struct peer **n){
450 eb4ecf1c Csaba Kiraly
  return 1;
451
}
452
453 7b86e7d9 Csaba Kiraly
double peerWeightRtt(struct peer **n){
454 d4a680a0 Csaba Kiraly
#ifdef MONL
455 9dc9369a Csaba Kiraly
  double rtt = get_rtt((*n)->id);
456 1b7da906 Csaba Kiraly
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
457 3ffbcf2d Csaba Kiraly
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
458 d4a680a0 Csaba Kiraly
#else
459
  return 1;
460 5fc04950 Csaba Kiraly
#endif
461 d4a680a0 Csaba Kiraly
}
462 4c0ba13e Csaba Kiraly
463 ea084625 Csaba Kiraly
//ordering function for ELp peer selection, chunk ID based
464
//can't be used as weight
465 1b7da906 Csaba Kiraly
double peerScoreELpID(struct nodeID **n){
466 ea084625 Csaba Kiraly
  struct chunkID_set *bmap;
467
  int latest;
468 1b7da906 Csaba Kiraly
  struct peer * p = nodeid_to_peer(*n, 0);
469
  if (!p) return 0;
470 ea084625 Csaba Kiraly
471 1b7da906 Csaba Kiraly
  bmap = p->bmap;
472 ea084625 Csaba Kiraly
  if (!bmap) return 0;
473
  latest = chunkID_set_get_latest(bmap);
474
  if (latest == INT_MIN) return 0;
475
476
  return -latest;
477
}
478
479 97ab5c93 Csaba Kiraly
double chunkScoreChunkID(int *cid){
480
  return (double) *cid;
481
}
482
483 54da4e2b Csaba Kiraly
uint64_t get_chunk_deadline(int cid){
484 08f76adc Csaba Kiraly
  const struct chunk_attributes * ca;
485
  const struct chunk *c;
486 54da4e2b Csaba Kiraly
487
  c = cb_get_chunk(cb, cid);
488 1b7da906 Csaba Kiraly
  if (!c) return 0;
489
490 54da4e2b Csaba Kiraly
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
491
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
492
    return 0;
493
  }
494
495
  ca = (struct chunk_attributes *) c->attributes;
496
  return ca->deadline;
497
}
498
499 e13e2d0e Csaba Kiraly
double chunkScoreDL(int *cid){
500 05a1dc0a Csaba Kiraly
  return - (double)get_chunk_deadline(*cid);
501 e13e2d0e Csaba Kiraly
}
502
503 2d4a594b Csaba Kiraly
double chunkScoreTimestamp(int *cid){
504
  return (double) get_chunk_timestamp(*cid);
505 4367dafd Csaba Kiraly
}
506
507 8e750be6 Csaba Kiraly
void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){
508 8e85b948 Alessandro Russo
  int i, d, cset_acc_size, res;
509 b13977c0 Csaba Kiraly
  struct peer *to = nodeid_to_peer(toid, 0);
510 d447f71d Csaba Kiraly
511 fbd478bb Csaba Kiraly
  transaction_reg_accept(trans_id, toid);
512 50f1ec20 Csaba Kiraly
513 d447f71d Csaba Kiraly
  cset_acc_size = chunkID_set_size(cset_acc);
514 f8373956 Csaba Kiraly
  reg_offer_accept_out(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
515 d447f71d Csaba Kiraly
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
516 43dd1a10 Csaba Kiraly
    const struct chunk *c;
517 d447f71d Csaba Kiraly
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
518
    c = cb_get_chunk(cb, chunkid);
519 f8286367 Csaba Kiraly
    if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
520 f5d9663e Csaba Kiraly
      chunk_attributes_update_sending(c);
521 033319c7 Csaba Kiraly
      res = sendChunk(toid, c, trans_id);
522 d447f71d Csaba Kiraly
      if (res >= 0) {
523 f8286367 Csaba Kiraly
        if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
524 d447f71d Csaba Kiraly
        d++;
525 f740dafb Csaba Kiraly
        reg_chunk_send(c->id);
526 6853bc18 Csaba Kiraly
        if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(toid), gettimeofday_in_us(), res, c->size);}
527 96b8d0a8 Csaba Kiraly
      } else {
528
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
529 d447f71d Csaba Kiraly
      }
530
    }
531
  }
532
}
533
534 dc87dca9 Csaba Kiraly
int offer_peer_count()
535
{
536
  return offer_per_tick;
537
}
538
539 74a5d4ae CsabaKiraly
int offer_max_deliver(struct nodeID *n)
540 960017bf Csaba Kiraly
{
541 10fd812c Csaba Kiraly
542
  if (!heuristics_distance_maxdeliver) return 1;
543
544 5fc04950 Csaba Kiraly
#ifdef MONL
545 1b7da906 Csaba Kiraly
  switch (get_hopcount(n)) {
546 960017bf Csaba Kiraly
    case 0: return 5;
547
    case 1: return 2;
548
    default: return 1;
549
  }
550 5fc04950 Csaba Kiraly
#else
551
  return 1;
552
#endif
553 960017bf Csaba Kiraly
}
554
555 aa2355c3 Csaba Kiraly
static struct chunkID_set * compose_offer_cset(void)
556
{
557 d5f60f90 Csaba Kiraly
  if (am_i_source()) {
558
    struct chunk *chunks;
559
    int num_chunks, j;
560
    struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap");
561
    chunks = cb_get_chunks(cb, &num_chunks);
562
    for(j=((num_chunks-1)*3)/4; j>=0; j--) {
563
      chunkID_set_add_chunk(my_bmap, chunks[j].id);
564
    }
565
    return my_bmap;
566
  } else {
567
    return cb_to_bmap(cb);
568
  }
569 aa2355c3 Csaba Kiraly
}
570
571
572 fcb5c29b Csaba Kiraly
void send_offer()
573 43355360 Csaba Kiraly
{
574
  struct chunk *buff;
575 74a5d4ae CsabaKiraly
  int size, res, i, n;
576 43355360 Csaba Kiraly
  struct peer *neighbours;
577 fcb5c29b Csaba Kiraly
  struct peerset *pset;
578 43355360 Csaba Kiraly
579 fcb5c29b Csaba Kiraly
  pset = get_peers();
580 43355360 Csaba Kiraly
  n = peerset_size(pset);
581
  neighbours = peerset_get_peers(pset);
582
  dprintf("Send Offer: %d neighbours\n", n);
583
  if (n == 0) return;
584
  buff = cb_get_chunks(cb, &size);
585
  if (size == 0) return;
586
587
  {
588 dc87dca9 Csaba Kiraly
    size_t selectedpeers_len = offer_peer_count();
589 1b7da906 Csaba Kiraly
    int chunkids[size];
590 8bb39b0b Csaba Kiraly
    struct peer *nodeids[n];
591
    struct peer *selectedpeers[selectedpeers_len];
592 43355360 Csaba Kiraly
593 abbef5a5 Csaba Kiraly
    //reduce load a little bit if there are losses on the path from this guy
594 68330740 Csaba Kiraly
    double average_lossrate = get_average_lossrate_pset(pset);
595 abbef5a5 Csaba Kiraly
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
596 f76dc385 Csaba Kiraly
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
597 abbef5a5 Csaba Kiraly
      return;
598
    }
599
600 ffcc70ad CsabaKiraly
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
601 8bb39b0b Csaba Kiraly
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
602 e3f73a68 ArpadBakay
    schedSelectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
603 43355360 Csaba Kiraly
604
    for (i=0; i<selectedpeers_len ; i++){
605 aeb87217 Csaba Kiraly
      int transid = transaction_create(selectedpeers[i]->id);
606 8bb39b0b Csaba Kiraly
      int max_deliver = offer_max_deliver(selectedpeers[i]->id);
607 aa2355c3 Csaba Kiraly
      struct chunkID_set *offer_cset = compose_offer_cset();
608 8bb39b0b Csaba Kiraly
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
609 aa2355c3 Csaba Kiraly
      res = offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
610
      chunkID_set_free(offer_cset);
611 43355360 Csaba Kiraly
    }
612
  }
613
}
614
615 4367dafd Csaba Kiraly
616 fcb5c29b Csaba Kiraly
void send_chunk()
617 89e893e2 Luca
{
618
  struct chunk *buff;
619 74a5d4ae CsabaKiraly
  int size, res, i, n;
620 924226c0 Luca Abeni
  struct peer *neighbours;
621 fcb5c29b Csaba Kiraly
  struct peerset *pset;
622 1cd06c26 CsabaKiraly
623 fcb5c29b Csaba Kiraly
  pset = get_peers();
624 0f35d029 Csaba Kiraly
  n = peerset_size(pset);
625
  neighbours = peerset_get_peers(pset);
626 e64fc7e5 Luca
  dprintf("Send Chunk: %d neighbours\n", n);
627 89e893e2 Luca
  if (n == 0) return;
628
  buff = cb_get_chunks(cb, &size);
629 e64fc7e5 Luca
  dprintf("\t %d chunks in buffer...\n", size);
630 89e893e2 Luca
  if (size == 0) return;
631
632
  /************ STUPID DUMB SCHEDULING ****************/
633 4367dafd Csaba Kiraly
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
634
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
635 89e893e2 Luca
  /************ /STUPID DUMB SCHEDULING ****************/
636
637 4367dafd Csaba Kiraly
  /************ USE SCHEDULER ****************/
638 924226c0 Luca Abeni
  {
639
    size_t selectedpairs_len = 1;
640 1b7da906 Csaba Kiraly
    int chunkids[size];
641 7b86e7d9 Csaba Kiraly
    struct peer *nodeids[n];
642 924226c0 Luca Abeni
    struct PeerChunk selectedpairs[1];
643
  
644 96e30de6 Csaba Kiraly
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
645 7b86e7d9 Csaba Kiraly
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i);
646 96e30de6 Csaba Kiraly
    SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK);
647 4367dafd Csaba Kiraly
  /************ /USE SCHEDULER ****************/
648
649 924226c0 Luca Abeni
    for (i=0; i<selectedpairs_len ; i++){
650 7b86e7d9 Csaba Kiraly
      struct peer *p = selectedpairs[i].peer;
651 e3f73a68 ArpadBakay
      const struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
652 924226c0 Luca Abeni
      dprintf("\t sending chunk[%d] to ", c->id);
653
      dprintf("%s\n", node_addr(p->id));
654
655 219e975e Csaba Kiraly
      if (send_bmap_before_push) {
656
        send_bmap(p->id);
657
      }
658 bc1ddc15 MatteoSammarco
659 f5d9663e Csaba Kiraly
      chunk_attributes_update_sending(c);
660 033319c7 Csaba Kiraly
      res = sendChunk(p->id, c, 0);        //we do not use transactions in pure push
661 84ff82ba CsabaKiraly
      if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);}
662 13d85fc6 Csaba Kiraly
      dprintf("\tResult: %d\n", res);
663 924226c0 Luca Abeni
      if (res>=0) {
664
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
665 f740dafb Csaba Kiraly
        reg_chunk_send(c->id);
666 96b8d0a8 Csaba Kiraly
      } else {
667
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
668 924226c0 Luca Abeni
      }
669 0f35d029 Csaba Kiraly
    }
670 4367dafd Csaba Kiraly
  }
671 89e893e2 Luca
}