Statistics
| Branch: | Revision:

streamers / streaming.c @ 74a5d4ae

History | View | Annotate | Download (13.8 KB)

1 8fed7779 CsabaKiraly
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 89e893e2 Luca
#include <stdlib.h>
8
#include <stdio.h>
9
#include <stdint.h>
10 30a6e902 Csaba Kiraly
#include <stdbool.h>
11 4c0ba13e Csaba Kiraly
#include <math.h>
12 e99600d8 Csaba Kiraly
#include <assert.h>
13 89e893e2 Luca
14
#include <net_helper.h>
15
#include <chunk.h> 
16
#include <chunkbuffer.h> 
17
#include <trade_msg_la.h>
18
#include <trade_msg_ha.h>
19 0f35d029 Csaba Kiraly
#include <peerset.h>
20
#include <peer.h>
21 e98d8f50 Csaba Kiraly
#include <chunkidset.h>
22 ea084625 Csaba Kiraly
#include <limits.h>
23 6546a0c0 Alessandro Russo
#include <trade_sig_ha.h>
24 89e893e2 Luca
25
#include "streaming.h"
26
#include "output.h"
27
#include "input.h"
28 e64fc7e5 Luca
#include "dbg.h"
29 a1a9e662 Csaba Kiraly
#include "chunk_signaling.h"
30 abd2ef3b Csaba Kiraly
#include "chunklock.h"
31 fcb5c29b Csaba Kiraly
#include "topology.h"
32 4c0ba13e Csaba Kiraly
#include "measures.h"
33 89e893e2 Luca
34 4367dafd Csaba Kiraly
#include "scheduler_la.h"
35
36 e99600d8 Csaba Kiraly
struct chunk_attributes {
37 f5d9663e Csaba Kiraly
  uint64_t deadline;
38
  uint16_t deadline_increment;
39 e99600d8 Csaba Kiraly
  uint16_t hopcount;
40
} __attribute__((packed));
41
42 03dca3bf ArpadBakay
struct chunk_buffer *cb;
43 709f774c Luca
static struct input_desc *input;
44 8c1b2832 Csaba Kiraly
static int cb_size;
45 7ca3b176 Csaba Kiraly
static int transid=0;
46 89e893e2 Luca
47 dc87dca9 Csaba Kiraly
static int offer_per_tick = 1;        //N_p parameter of POLITO
48
49 742dfaec Csaba Kiraly
int _needs(struct chunkID_set *cset, int cb_size, int cid);
50
51 d3a242ab Csaba Kiraly
void cb_print()
52
{
53 b45e7201 Csaba Kiraly
#ifdef DEBUG
54 d3a242ab Csaba Kiraly
  struct chunk *chunks;
55
  int num_chunks, i, id;
56
  chunks = cb_get_chunks(cb, &num_chunks);
57
58
  dprintf("\tchbuf :");
59
  i = 0;
60
  if(num_chunks) {
61
    id = chunks[0].id;
62
    dprintf(" %d-> ",id);
63
    while (i < num_chunks) {
64
      if (id == chunks[i].id) {
65
        dprintf("%d",id % 10);
66
        i++;
67 2314ccb7 Csaba Kiraly
      } else if (chunk_islocked(id)) {
68
        dprintf("*");
69 d3a242ab Csaba Kiraly
      } else {
70
        dprintf(".");
71
      }
72
      id++;
73
    }
74
  }
75
  dprintf("\n");
76 b45e7201 Csaba Kiraly
#endif
77 d3a242ab Csaba Kiraly
}
78
79 6920fdab Luca
void stream_init(int size, struct nodeID *myID)
80 89e893e2 Luca
{
81
  char conf[32];
82
83 8c1b2832 Csaba Kiraly
  cb_size = size;
84
85
  sprintf(conf, "size=%d", cb_size);
86 89e893e2 Luca
  cb = cb_init(conf);
87 1b6a3ea7 Csaba Kiraly
  chunkDeliveryInit(myID);
88 513e75ef Alessandro Russo
  chunkSignalingInit(myID);
89 710648c6 Csaba Kiraly
  //init_measures();
90 7442ecb3 Luca
}
91
92 30a6e902 Csaba Kiraly
int source_init(const char *fname, struct nodeID *myID, bool loop)
93 7442ecb3 Luca
{
94 30a6e902 Csaba Kiraly
  input = input_open(fname, loop ? INPUT_LOOP : 0);
95 7442ecb3 Luca
  if (input == NULL) {
96
    return -1;
97
  }
98
99
  stream_init(1, myID);
100
  return 0;
101 89e893e2 Luca
}
102
103 e99600d8 Csaba Kiraly
void chunk_attributes_fill(struct chunk* c)
104
{
105
  struct chunk_attributes * ca;
106
107
  assert(!c->attributes && c->attributes_size == 0);
108
109
  c->attributes_size = sizeof(struct chunk_attributes);
110 74a5d4ae CsabaKiraly
  c->attributes = ca = malloc(c->attributes_size);
111 e99600d8 Csaba Kiraly
112 f5d9663e Csaba Kiraly
  ca->deadline = c->timestamp;
113
  ca->deadline_increment = 2;
114 e99600d8 Csaba Kiraly
  ca->hopcount = 0;
115
}
116
117 ccfc425d Csaba Kiraly
int chunk_get_hopcount(struct chunk* c) {
118
  struct chunk_attributes * ca;
119
120
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
121 cf6aaf5b Csaba Kiraly
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
122 ccfc425d Csaba Kiraly
    return -1;
123
  }
124
125
  ca = (struct chunk_attributes *) c->attributes;
126
  return ca->hopcount;
127
}
128
129 74a5d4ae CsabaKiraly
void chunk_attributes_update_received(struct chunk* c)
130 e99600d8 Csaba Kiraly
{
131
  struct chunk_attributes * ca;
132
133
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
134 cf6aaf5b Csaba Kiraly
    fprintf(stderr,"Warning, received chunk %d with strange attributes block\n", c->id);
135 e99600d8 Csaba Kiraly
    return;
136
  }
137
138
  ca = (struct chunk_attributes *) c->attributes;
139
  ca->hopcount++;
140
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
141
}
142
143 74a5d4ae CsabaKiraly
void chunk_attributes_update_sending(struct chunk* c)
144 f5d9663e Csaba Kiraly
{
145
  struct chunk_attributes * ca;
146
147
  if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) {
148 cf6aaf5b Csaba Kiraly
    fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
149 f5d9663e Csaba Kiraly
    return;
150
  }
151
152
  ca = (struct chunk_attributes *) c->attributes;
153
  ca->deadline += ca->deadline_increment;
154
  dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
155
}
156
157 2b97cbf1 Csaba Kiraly
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
158
{
159
  struct chunk *chunks;
160 74a5d4ae CsabaKiraly
  int num_chunks, i;
161 0781f344 Csaba Kiraly
  struct chunkID_set *my_bmap = chunkID_set_init("type=1");
162 2b97cbf1 Csaba Kiraly
  chunks = cb_get_chunks(chbuf, &num_chunks);
163
164 b5462b05 Csaba Kiraly
  for(i=num_chunks-1; i>=0; i--) {
165 2b97cbf1 Csaba Kiraly
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
166
  }
167
  return my_bmap;
168
}
169
170 d447f71d Csaba Kiraly
// a simple implementation that request everything that we miss ... up to max deliver
171 b5a5780a Csaba Kiraly
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver, int trans_id){
172 2b97cbf1 Csaba Kiraly
  struct chunkID_set *cset_acc, *my_bmap;
173 d447f71d Csaba Kiraly
  int i, d, cset_off_size;
174 8dd1eccd Csaba Kiraly
  //double lossrate;
175 d447f71d Csaba Kiraly
176 0781f344 Csaba Kiraly
  cset_acc = chunkID_set_init("size=0");
177 e735a1b1 Csaba Kiraly
178
  //reduce load a little bit if there are losses on the path from this guy
179 8dd1eccd Csaba Kiraly
  //lossrate = get_lossrate_receive(from->id);
180
  //lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
181
  //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
182 e735a1b1 Csaba Kiraly
    my_bmap = cb_to_bmap(cb);
183
    cset_off_size = chunkID_set_size(cset_off);
184
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
185
      int chunkid = chunkID_set_get_chunk(cset_off, i);
186 702769ac Csaba Kiraly
      //dprintf("\tdo I need c%d ? :",chunkid);
187 e735a1b1 Csaba Kiraly
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
188
        chunkID_set_add_chunk(cset_acc, chunkid);
189
        chunk_lock(chunkid,from);
190 cff93a07 Csaba Kiraly
        dtprintf("accepting %d from %s", chunkid, node_addr(from->id));
191
#ifdef MONL
192
        dprintf(", loss:%f rtt:%f", get_lossrate(from->id), get_rtt(from->id));
193
#endif
194
        dprintf("\n");
195 e735a1b1 Csaba Kiraly
        d++;
196
      }
197 d447f71d Csaba Kiraly
    }
198 e735a1b1 Csaba Kiraly
    chunkID_set_free(my_bmap);
199 8dd1eccd Csaba Kiraly
  //} else {
200
  //    dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(from->id), lossrate, get_rtt(from->id));
201
  //}
202 d447f71d Csaba Kiraly
203
  return cset_acc;
204
}
205
206 22ebd96d Csaba Kiraly
void send_bmap(struct peer *to)
207
{
208
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
209 513e75ef Alessandro Russo
   sendBufferMap(to->id,NULL, my_bmap, cb_size, 0);
210 c8c4c779 Csaba Kiraly
  chunkID_set_free(my_bmap);
211 a1a9e662 Csaba Kiraly
}
212
213 68330740 Csaba Kiraly
double get_average_lossrate_pset(struct peerset *pset)
214
{
215
  int i, n;
216
  struct peer *neighbours;
217
218
  n = peerset_size(pset);
219
  neighbours = peerset_get_peers(pset);
220
  {
221 74a5d4ae CsabaKiraly
    struct nodeID *nodeids[n];
222 68330740 Csaba Kiraly
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
223 5fc04950 Csaba Kiraly
#ifdef MONL
224 68330740 Csaba Kiraly
    return get_average_lossrate(nodeids, n);
225 5fc04950 Csaba Kiraly
#else
226
    return 0;
227
#endif
228 68330740 Csaba Kiraly
  }
229
}
230
231 5b95417d Csaba Kiraly
void ack_chunk(struct chunk *c, struct peer *p)
232
{
233
  //reduce load a little bit if there are losses on the path from this guy
234
  double average_lossrate = get_average_lossrate_pset(get_peers());
235
  average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
236
  if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) {
237
    return;
238
  }
239
  send_bmap(p);        //send explicit ack
240
}
241
242 74a5d4ae CsabaKiraly
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
243 89e893e2 Luca
{
244
  int res;
245 52b7c5ea Luca
  static struct chunk c;
246 30c9739a Csaba Kiraly
  struct peer *p;
247 89e893e2 Luca
248
  res = decodeChunk(&c, buff + 1, len - 1);
249
  if (res > 0) {
250 e99600d8 Csaba Kiraly
    chunk_attributes_update_received(&c);
251 efb3861d Csaba Kiraly
    reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c));
252 6ac1e106 Csaba Kiraly
    chunk_unlock(c.id);
253 13d85fc6 Csaba Kiraly
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
254 89e893e2 Luca
    output_deliver(&c);
255
    res = cb_add_chunk(cb, &c);
256 fc0260a4 Csaba Kiraly
    cb_print();
257 89e893e2 Luca
    if (res < 0) {
258 13d85fc6 Csaba Kiraly
      dprintf("\tchunk too old, buffer full with newer chunks\n");
259 89e893e2 Luca
      free(c.data);
260
      free(c.attributes);
261
    }
262 fcb5c29b Csaba Kiraly
    p = nodeid_to_peer(from,1);
263 30c9739a Csaba Kiraly
    if (p) {        //now we have it almost sure
264
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
265 5b95417d Csaba Kiraly
      ack_chunk(&c,p);        //send explicit ack
266 30c9739a Csaba Kiraly
    }
267 ae3c4aa9 Csaba Kiraly
  } else {
268
    fprintf(stderr,"\tError: can't decode chunk!\n");
269 89e893e2 Luca
  }
270
}
271
272 afdc8db4 Luca Abeni
int generated_chunk(suseconds_t *delta)
273 89e893e2 Luca
{
274
  int res;
275
  struct chunk c;
276
277 afdc8db4 Luca Abeni
  *delta = input_get(input, &c);
278
  if (*delta < 0) {
279 4bb789ed Luca
    fprintf(stderr, "Error in input!\n");
280
    exit(-1);
281
  }
282 afdc8db4 Luca Abeni
  if (c.data == NULL) {
283
    return 0;
284 4bb789ed Luca
  }
285 af45500a Csaba Kiraly
  dprintf("Generated chunk %d of %d bytes\n",c.id, c.size);
286 e99600d8 Csaba Kiraly
  chunk_attributes_fill(&c);
287 5fb5ecd4 Luca
  res = cb_add_chunk(cb, &c);
288
  if (res < 0) {
289
    free(c.data);
290
    free(c.attributes);
291 89e893e2 Luca
  }
292 ce80b058 Luca Abeni
293 afdc8db4 Luca Abeni
  return 1;
294 89e893e2 Luca
}
295
296 4367dafd Csaba Kiraly
/**
297 d74bc79c Csaba Kiraly
 *example function to filter chunks based on whether a given peer needs them.
298
 *
299
 * Looks at buffermap information received about the given peer.
300 4367dafd Csaba Kiraly
 */
301 74a5d4ae CsabaKiraly
int needs(struct nodeID *n, int cid){
302 1b7da906 Csaba Kiraly
  struct peer * p = nodeid_to_peer(n, 0);
303
  if (!p) return 1; // if we don't know this peer, but we assume it needs the chunk (aggressive behaviour!)
304
305 3cd33bb0 Csaba Kiraly
  //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
306 d74bc79c Csaba Kiraly
  if (! p->bmap) {
307 3cd33bb0 Csaba Kiraly
    //dprintf("no bmap\n");
308 d74bc79c Csaba Kiraly
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
309
  }
310 1b7da906 Csaba Kiraly
  return _needs(p->bmap, p->cb_size, cid);
311 742dfaec Csaba Kiraly
}
312 d74bc79c Csaba Kiraly
313 742dfaec Csaba Kiraly
int _needs(struct chunkID_set *cset, int cb_size, int cid){
314
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
315 d74bc79c Csaba Kiraly
    int missing, min;
316
    //@TODO: add some bmap_timestamp based logic
317
318 742dfaec Csaba Kiraly
    if (chunkID_set_size(cset) == 0) {
319 3cd33bb0 Csaba Kiraly
      //dprintf("bmap empty\n");
320 d74bc79c Csaba Kiraly
      return 1;        // if the bmap seems empty, it needs the chunk
321
    }
322 742dfaec Csaba Kiraly
    missing = cb_size - chunkID_set_size(cset);
323 d74bc79c Csaba Kiraly
    missing = missing < 0 ? 0 : missing;
324 eb42de41 Csaba Kiraly
    min = chunkID_set_get_earliest(cset);
325 3cd33bb0 Csaba Kiraly
      //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
326 742dfaec Csaba Kiraly
    return (cid >= min - missing);
327 d74bc79c Csaba Kiraly
  }
328
329 3cd33bb0 Csaba Kiraly
  //dprintf("has it\n");
330 d74bc79c Csaba Kiraly
  return 0;
331 4367dafd Csaba Kiraly
}
332 d74bc79c Csaba Kiraly
333 74a5d4ae CsabaKiraly
double peerWeightReceivedfrom(struct nodeID **n){
334 1b7da906 Csaba Kiraly
  struct peer * p = nodeid_to_peer(*n, 0);
335
  if (!p) return 0;
336
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
337 4367dafd Csaba Kiraly
}
338 eb4ecf1c Csaba Kiraly
339 74a5d4ae CsabaKiraly
double peerWeightUniform(struct nodeID **n){
340 eb4ecf1c Csaba Kiraly
  return 1;
341
}
342
343 74a5d4ae CsabaKiraly
double peerWeightRtt(struct nodeID **n){
344 d4a680a0 Csaba Kiraly
#ifdef MONL
345 1b7da906 Csaba Kiraly
  double rtt = get_rtt(*n);
346
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
347 3ffbcf2d Csaba Kiraly
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
348 d4a680a0 Csaba Kiraly
#else
349
  return 1;
350 5fc04950 Csaba Kiraly
#endif
351 d4a680a0 Csaba Kiraly
}
352 4c0ba13e Csaba Kiraly
353 ea084625 Csaba Kiraly
//ordering function for ELp peer selection, chunk ID based
354
//can't be used as weight
355 1b7da906 Csaba Kiraly
double peerScoreELpID(struct nodeID **n){
356 ea084625 Csaba Kiraly
  struct chunkID_set *bmap;
357
  int latest;
358 1b7da906 Csaba Kiraly
  struct peer * p = nodeid_to_peer(*n, 0);
359
  if (!p) return 0;
360 ea084625 Csaba Kiraly
361 1b7da906 Csaba Kiraly
  bmap = p->bmap;
362 ea084625 Csaba Kiraly
  if (!bmap) return 0;
363
  latest = chunkID_set_get_latest(bmap);
364
  if (latest == INT_MIN) return 0;
365
366
  return -latest;
367
}
368
369 74a5d4ae CsabaKiraly
double getChunkTimestamp(int *cid){
370
  struct chunk *c = cb_get_chunk(cb, *cid);
371 1b7da906 Csaba Kiraly
  if (!c) return 0;
372
373
  return (double) c->timestamp;
374 4367dafd Csaba Kiraly
}
375
376 b5a5780a Csaba Kiraly
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver, int trans_id){
377 8e85b948 Alessandro Russo
  int i, d, cset_acc_size, res;
378 d447f71d Csaba Kiraly
379
  cset_acc_size = chunkID_set_size(cset_acc);
380 e2c563e7 Csaba Kiraly
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);        //this only works if accepts are sent back even if 0 is accepted
381 d447f71d Csaba Kiraly
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
382 74a5d4ae CsabaKiraly
    struct chunk *c;
383 d447f71d Csaba Kiraly
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
384
    c = cb_get_chunk(cb, chunkid);
385 1b7da906 Csaba Kiraly
    if (c && needs(to->id, chunkid) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
386 f5d9663e Csaba Kiraly
      chunk_attributes_update_sending(c);
387 8e85b948 Alessandro Russo
      res = sendChunk(to->id, c);
388 d447f71d Csaba Kiraly
      if (res >= 0) {
389
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
390
        d++;
391 f740dafb Csaba Kiraly
        reg_chunk_send(c->id);
392 96b8d0a8 Csaba Kiraly
      } else {
393
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
394 d447f71d Csaba Kiraly
      }
395
    }
396
  }
397
}
398
399 dc87dca9 Csaba Kiraly
int offer_peer_count()
400
{
401
  return offer_per_tick;
402
}
403
404 74a5d4ae CsabaKiraly
int offer_max_deliver(struct nodeID *n)
405 960017bf Csaba Kiraly
{
406 5fc04950 Csaba Kiraly
#ifdef MONL
407 1b7da906 Csaba Kiraly
  switch (get_hopcount(n)) {
408 960017bf Csaba Kiraly
    case 0: return 5;
409
    case 1: return 2;
410
    default: return 1;
411
  }
412 5fc04950 Csaba Kiraly
#else
413
  return 1;
414
#endif
415 960017bf Csaba Kiraly
}
416
417 fcb5c29b Csaba Kiraly
void send_offer()
418 43355360 Csaba Kiraly
{
419
  struct chunk *buff;
420 74a5d4ae CsabaKiraly
  int size, res, i, n;
421 43355360 Csaba Kiraly
  struct peer *neighbours;
422 fcb5c29b Csaba Kiraly
  struct peerset *pset;
423 43355360 Csaba Kiraly
424 fcb5c29b Csaba Kiraly
  pset = get_peers();
425 43355360 Csaba Kiraly
  n = peerset_size(pset);
426
  neighbours = peerset_get_peers(pset);
427
  dprintf("Send Offer: %d neighbours\n", n);
428
  if (n == 0) return;
429
  buff = cb_get_chunks(cb, &size);
430
  if (size == 0) return;
431
432
  {
433 dc87dca9 Csaba Kiraly
    size_t selectedpeers_len = offer_peer_count();
434 1b7da906 Csaba Kiraly
    int chunkids[size];
435 74a5d4ae CsabaKiraly
    struct nodeID *nodeids[n];
436
    struct nodeID *selectedpeers[selectedpeers_len];
437 43355360 Csaba Kiraly
438 abbef5a5 Csaba Kiraly
    //reduce load a little bit if there are losses on the path from this guy
439 68330740 Csaba Kiraly
    double average_lossrate = get_average_lossrate_pset(pset);
440 abbef5a5 Csaba Kiraly
    average_lossrate = finite(average_lossrate) ? average_lossrate : 0;        //start agressively, assuming 0 loss
441 f76dc385 Csaba Kiraly
    if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) {
442 abbef5a5 Csaba Kiraly
      return;
443
    }
444
445 1b7da906 Csaba Kiraly
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
446
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
447 74a5d4ae CsabaKiraly
    selectPeersForChunks(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, needs, (transid % 2) ? peerWeightReceivedfrom : peerWeightRtt);        //select a peer that needs at least one of our chunks
448 43355360 Csaba Kiraly
449
    for (i=0; i<selectedpeers_len ; i++){
450 960017bf Csaba Kiraly
      int max_deliver = offer_max_deliver(selectedpeers[i]);
451 43355360 Csaba Kiraly
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
452 1b7da906 Csaba Kiraly
      dprintf("\t sending offer(%d) to %s\n", transid, node_addr(selectedpeers[i]));
453
      res = offerChunks(selectedpeers[i], my_bmap, max_deliver, transid++);
454 ddedf85f Csaba Kiraly
      chunkID_set_free(my_bmap);
455 43355360 Csaba Kiraly
    }
456
  }
457
}
458
459 4367dafd Csaba Kiraly
460 fcb5c29b Csaba Kiraly
void send_chunk()
461 89e893e2 Luca
{
462
  struct chunk *buff;
463 74a5d4ae CsabaKiraly
  int size, res, i, n;
464 924226c0 Luca Abeni
  struct peer *neighbours;
465 fcb5c29b Csaba Kiraly
  struct peerset *pset;
466 89e893e2 Luca
467 fcb5c29b Csaba Kiraly
  pset = get_peers();
468 0f35d029 Csaba Kiraly
  n = peerset_size(pset);
469
  neighbours = peerset_get_peers(pset);
470 e64fc7e5 Luca
  dprintf("Send Chunk: %d neighbours\n", n);
471 89e893e2 Luca
  if (n == 0) return;
472
  buff = cb_get_chunks(cb, &size);
473 e64fc7e5 Luca
  dprintf("\t %d chunks in buffer...\n", size);
474 89e893e2 Luca
  if (size == 0) return;
475
476
  /************ STUPID DUMB SCHEDULING ****************/
477 4367dafd Csaba Kiraly
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
478
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
479 89e893e2 Luca
  /************ /STUPID DUMB SCHEDULING ****************/
480
481 4367dafd Csaba Kiraly
  /************ USE SCHEDULER ****************/
482 924226c0 Luca Abeni
  {
483
    size_t selectedpairs_len = 1;
484 1b7da906 Csaba Kiraly
    int chunkids[size];
485 74a5d4ae CsabaKiraly
    struct nodeID *nodeids[n];
486 924226c0 Luca Abeni
    struct PeerChunk selectedpairs[1];
487
  
488 1b7da906 Csaba Kiraly
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
489
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
490
    schedSelectPeerFirst(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, needs, peerWeightRtt, getChunkTimestamp);
491 4367dafd Csaba Kiraly
  /************ /USE SCHEDULER ****************/
492
493 924226c0 Luca Abeni
    for (i=0; i<selectedpairs_len ; i++){
494 1b7da906 Csaba Kiraly
      struct peer *p = nodeid_to_peer(selectedpairs[i].peer, 0);
495 74a5d4ae CsabaKiraly
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
496 924226c0 Luca Abeni
      dprintf("\t sending chunk[%d] to ", c->id);
497
      dprintf("%s\n", node_addr(p->id));
498
499 449e156e Csaba Kiraly
      send_bmap(p);
500 f5d9663e Csaba Kiraly
      chunk_attributes_update_sending(c);
501 924226c0 Luca Abeni
      res = sendChunk(p->id, c);
502 13d85fc6 Csaba Kiraly
      dprintf("\tResult: %d\n", res);
503 924226c0 Luca Abeni
      if (res>=0) {
504
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
505 f740dafb Csaba Kiraly
        reg_chunk_send(c->id);
506 96b8d0a8 Csaba Kiraly
      } else {
507
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
508 924226c0 Luca Abeni
      }
509 0f35d029 Csaba Kiraly
    }
510 4367dafd Csaba Kiraly
  }
511 89e893e2 Luca
}