streamers / streaming.c @ bb1ef440
History | View | Annotate | Download (16.3 KB)
1 | 8fed7779 | CsabaKiraly | /*
|
---|---|---|---|
2 | * Copyright (c) 2010 Luca Abeni
|
||
3 | * Copyright (c) 2010 Csaba Kiraly
|
||
4 | *
|
||
5 | * This is free software; see gpl-3.0.txt
|
||
6 | */
|
||
7 | 1cd06c26 | CsabaKiraly | #include <sys/time.h> |
8 | 89e893e2 | Luca | #include <stdlib.h> |
9 | #include <stdio.h> |
||
10 | #include <stdint.h> |
||
11 | 30a6e902 | Csaba Kiraly | #include <stdbool.h> |
12 | 4c0ba13e | Csaba Kiraly | #include <math.h> |
13 | e99600d8 | Csaba Kiraly | #include <assert.h> |
14 | f14985ba | Luca Abeni | #include <string.h> |
15 | 84ff82ba | CsabaKiraly | #include <inttypes.h> |
16 | 89e893e2 | Luca | |
17 | #include <net_helper.h> |
||
18 | #include <chunk.h> |
||
19 | #include <chunkbuffer.h> |
||
20 | #include <trade_msg_la.h> |
||
21 | #include <trade_msg_ha.h> |
||
22 | 0f35d029 | Csaba Kiraly | #include <peerset.h> |
23 | #include <peer.h> |
||
24 | e98d8f50 | Csaba Kiraly | #include <chunkidset.h> |
25 | ea084625 | Csaba Kiraly | #include <limits.h> |
26 | 6546a0c0 | Alessandro Russo | #include <trade_sig_ha.h> |
27 | 89e893e2 | Luca | |
28 | #include "streaming.h" |
||
29 | #include "output.h" |
||
30 | #include "input.h" |
||
31 | e64fc7e5 | Luca | #include "dbg.h" |
32 | b44ae8d2 | CsabaKiraly | #include "chunk_signaling.h" |
33 | abd2ef3b | Csaba Kiraly | #include "chunklock.h" |
34 | fcb5c29b | Csaba Kiraly | #include "topology.h" |
35 | 4c0ba13e | Csaba Kiraly | #include "measures.h" |
36 | 0fec1310 | Csaba Kiraly | #include "scheduling.h" |
37 | 89e893e2 | Luca | |
38 | 4367dafd | Csaba Kiraly | #include "scheduler_la.h" |
39 | |||
40 | d88f86de | Csaba Kiraly | static bool heuristics_distance_maxdeliver = false; |
41 | a204d648 | Csaba Kiraly | static int bcast_after_receive_every = 0; |
42 | 59b87dad | Csaba Kiraly | static bool neigh_on_chunk_recv = false; |
43 | 10fd812c | Csaba Kiraly | |
44 | e99600d8 | Csaba Kiraly | struct chunk_attributes {
|
45 | f5d9663e | Csaba Kiraly | uint64_t deadline; |
46 | uint16_t deadline_increment; |
||
47 | e99600d8 | Csaba Kiraly | uint16_t hopcount; |
48 | } __attribute__((packed)); |
||
49 | |||
50 | 1cd06c26 | CsabaKiraly | extern bool chunk_log; |
51 | bc1ddc15 | MatteoSammarco | |
52 | 03dca3bf | ArpadBakay | struct chunk_buffer *cb;
|
53 | 709f774c | Luca | static struct input_desc *input; |
54 | 8c1b2832 | Csaba Kiraly | static int cb_size; |
55 | 7ca3b176 | Csaba Kiraly | static int transid=0; |
56 | 89e893e2 | Luca | |
57 | dc87dca9 | Csaba Kiraly | static int offer_per_tick = 1; //N_p parameter of POLITO |
58 | |||
59 | 742dfaec | Csaba Kiraly | int _needs(struct chunkID_set *cset, int cb_size, int cid); |
60 | |||
61 | 1cd06c26 | CsabaKiraly | uint64_t gettimeofday_in_us(void)
|
62 | { |
||
63 | struct timeval what_time; //to store the epoch time |
||
64 | |||
65 | gettimeofday(&what_time, NULL);
|
||
66 | return what_time.tv_sec * 1000000ULL + what_time.tv_usec; |
||
67 | } |
||
68 | |||
69 | d3a242ab | Csaba Kiraly | void cb_print()
|
70 | { |
||
71 | b45e7201 | Csaba Kiraly | #ifdef DEBUG
|
72 | d3a242ab | Csaba Kiraly | struct chunk *chunks;
|
73 | int num_chunks, i, id;
|
||
74 | chunks = cb_get_chunks(cb, &num_chunks); |
||
75 | |||
76 | dprintf("\tchbuf :");
|
||
77 | i = 0;
|
||
78 | if(num_chunks) {
|
||
79 | id = chunks[0].id;
|
||
80 | dprintf(" %d-> ",id);
|
||
81 | while (i < num_chunks) {
|
||
82 | if (id == chunks[i].id) {
|
||
83 | dprintf("%d",id % 10); |
||
84 | i++; |
||
85 | 2314ccb7 | Csaba Kiraly | } else if (chunk_islocked(id)) { |
86 | dprintf("*");
|
||
87 | d3a242ab | Csaba Kiraly | } else {
|
88 | dprintf(".");
|
||
89 | } |
||
90 | id++; |
||
91 | } |
||
92 | } |
||
93 | dprintf("\n");
|
||
94 | b45e7201 | Csaba Kiraly | #endif
|
95 | d3a242ab | Csaba Kiraly | } |
96 | |||
97 | 6920fdab | Luca | void stream_init(int size, struct nodeID *myID) |
98 | 89e893e2 | Luca | { |
99 | 8612d586 | CsabaKiraly | static char conf[32]; |
100 | 89e893e2 | Luca | |
101 | 8c1b2832 | Csaba Kiraly | cb_size = size; |
102 | |||
103 | sprintf(conf, "size=%d", cb_size);
|
||
104 | 89e893e2 | Luca | cb = cb_init(conf); |
105 | 1b6a3ea7 | Csaba Kiraly | chunkDeliveryInit(myID); |
106 | 513e75ef | Alessandro Russo | chunkSignalingInit(myID); |
107 | 95faef91 | CsabaKiraly | init_measures(); |
108 | 7442ecb3 | Luca | } |
109 | |||
110 | c9370421 | CsabaKiraly | int source_init(const char *fname, struct nodeID *myID, bool loop, int *fds, int fds_size) |
111 | 7442ecb3 | Luca | { |
112 | f14985ba | Luca Abeni | int flags = 0; |
113 | |||
114 | if (memcmp(fname, "udp:", 4) == 0) { |
||
115 | fname += 4;
|
||
116 | flags = INPUT_UDP; |
||
117 | } |
||
118 | if (loop) {
|
||
119 | flags |= INPUT_LOOP; |
||
120 | } |
||
121 | dceef344 | Csaba Kiraly | |
122 | f14985ba | Luca Abeni | input = input_open(fname, flags, fds, fds_size); |
123 | 7442ecb3 | Luca | if (input == NULL) { |
124 | return -1; |
||
125 | } |
||
126 | |||
127 | stream_init(1, myID);
|
||
128 | return 0; |
||
129 | 89e893e2 | Luca | } |
130 | |||
131 | e99600d8 | Csaba Kiraly | void chunk_attributes_fill(struct chunk* c) |
132 | { |
||
133 | struct chunk_attributes * ca;
|
||
134 | |||
135 | assert(!c->attributes && c->attributes_size == 0);
|
||
136 | |||
137 | c->attributes_size = sizeof(struct chunk_attributes); |
||
138 | 74a5d4ae | CsabaKiraly | c->attributes = ca = malloc(c->attributes_size); |
139 | e99600d8 | Csaba Kiraly | |
140 | f5d9663e | Csaba Kiraly | ca->deadline = c->timestamp; |
141 | ca->deadline_increment = 2;
|
||
142 | e99600d8 | Csaba Kiraly | ca->hopcount = 0;
|
143 | } |
||
144 | |||
145 | ccfc425d | Csaba Kiraly | int chunk_get_hopcount(struct chunk* c) { |
146 | struct chunk_attributes * ca;
|
||
147 | |||
148 | if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
||
149 | 84ff82ba | CsabaKiraly | fprintf(stderr,"Warning, chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
150 | ccfc425d | Csaba Kiraly | return -1; |
151 | } |
||
152 | |||
153 | ca = (struct chunk_attributes *) c->attributes;
|
||
154 | return ca->hopcount;
|
||
155 | } |
||
156 | |||
157 | 74a5d4ae | CsabaKiraly | void chunk_attributes_update_received(struct chunk* c) |
158 | e99600d8 | Csaba Kiraly | { |
159 | struct chunk_attributes * ca;
|
||
160 | |||
161 | if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
||
162 | 84ff82ba | CsabaKiraly | fprintf(stderr,"Warning, received chunk %d with strange attributes block. Size:%d expected:%lu\n", c->id, c->attributes ? c->attributes_size : 0, sizeof(struct chunk_attributes)); |
163 | e99600d8 | Csaba Kiraly | return;
|
164 | } |
||
165 | |||
166 | ca = (struct chunk_attributes *) c->attributes;
|
||
167 | ca->hopcount++; |
||
168 | dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
|
||
169 | } |
||
170 | |||
171 | 43dd1a10 | Csaba Kiraly | void chunk_attributes_update_sending(const struct chunk* c) |
172 | f5d9663e | Csaba Kiraly | { |
173 | struct chunk_attributes * ca;
|
||
174 | |||
175 | if (!c->attributes || c->attributes_size != sizeof(struct chunk_attributes)) { |
||
176 | cf6aaf5b | Csaba Kiraly | fprintf(stderr,"Warning, chunk %d with strange attributes block\n", c->id);
|
177 | f5d9663e | Csaba Kiraly | return;
|
178 | } |
||
179 | |||
180 | ca = (struct chunk_attributes *) c->attributes;
|
||
181 | ca->deadline += ca->deadline_increment; |
||
182 | dprintf("Sending chunk %d with deadline %lu\n", c->id, ca->deadline);
|
||
183 | } |
||
184 | |||
185 | 2b97cbf1 | Csaba Kiraly | struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf) |
186 | { |
||
187 | struct chunk *chunks;
|
||
188 | 74a5d4ae | CsabaKiraly | int num_chunks, i;
|
189 | b31bd3a2 | CsabaKiraly | struct chunkID_set *my_bmap = chunkID_set_init("type=bitmap"); |
190 | 2b97cbf1 | Csaba Kiraly | chunks = cb_get_chunks(chbuf, &num_chunks); |
191 | |||
192 | b5462b05 | Csaba Kiraly | for(i=num_chunks-1; i>=0; i--) { |
193 | 2b97cbf1 | Csaba Kiraly | chunkID_set_add_chunk(my_bmap, chunks[i].id); |
194 | } |
||
195 | return my_bmap;
|
||
196 | } |
||
197 | |||
198 | d447f71d | Csaba Kiraly | // a simple implementation that request everything that we miss ... up to max deliver
|
199 | 8e750be6 | Csaba Kiraly | struct chunkID_set *get_chunks_to_accept(struct nodeID *fromid, const struct chunkID_set *cset_off, int max_deliver, uint16_t trans_id){ |
200 | 2b97cbf1 | Csaba Kiraly | struct chunkID_set *cset_acc, *my_bmap;
|
201 | d447f71d | Csaba Kiraly | int i, d, cset_off_size;
|
202 | 8dd1eccd | Csaba Kiraly | //double lossrate;
|
203 | f9630d20 | Csaba Kiraly | struct peer *from = nodeid_to_peer(fromid, 0); |
204 | d447f71d | Csaba Kiraly | |
205 | 0781f344 | Csaba Kiraly | cset_acc = chunkID_set_init("size=0");
|
206 | e735a1b1 | Csaba Kiraly | |
207 | //reduce load a little bit if there are losses on the path from this guy
|
||
208 | 8dd1eccd | Csaba Kiraly | //lossrate = get_lossrate_receive(from->id);
|
209 | //lossrate = finite(lossrate) ? lossrate : 0; //start agressively, assuming 0 loss
|
||
210 | //if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
|
||
211 | e735a1b1 | Csaba Kiraly | my_bmap = cb_to_bmap(cb); |
212 | cset_off_size = chunkID_set_size(cset_off); |
||
213 | for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) { |
||
214 | int chunkid = chunkID_set_get_chunk(cset_off, i);
|
||
215 | 702769ac | Csaba Kiraly | //dprintf("\tdo I need c%d ? :",chunkid);
|
216 | e735a1b1 | Csaba Kiraly | if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
|
217 | chunkID_set_add_chunk(cset_acc, chunkid); |
||
218 | chunk_lock(chunkid,from); |
||
219 | f9630d20 | Csaba Kiraly | dtprintf("accepting %d from %s", chunkid, node_addr(fromid));
|
220 | cff93a07 | Csaba Kiraly | #ifdef MONL
|
221 | f9630d20 | Csaba Kiraly | dprintf(", loss:%f rtt:%f", get_lossrate(fromid), get_rtt(fromid));
|
222 | cff93a07 | Csaba Kiraly | #endif
|
223 | dprintf("\n");
|
||
224 | e735a1b1 | Csaba Kiraly | d++; |
225 | } |
||
226 | d447f71d | Csaba Kiraly | } |
227 | e735a1b1 | Csaba Kiraly | chunkID_set_free(my_bmap); |
228 | 8dd1eccd | Csaba Kiraly | //} else {
|
229 | f9630d20 | Csaba Kiraly | // dtprintf("accepting -- from %s loss:%f rtt:%f\n", node_addr(fromid), lossrate, get_rtt(fromid));
|
230 | 8dd1eccd | Csaba Kiraly | //}
|
231 | d447f71d | Csaba Kiraly | |
232 | return cset_acc;
|
||
233 | } |
||
234 | |||
235 | b0225995 | Csaba Kiraly | void send_bmap(struct nodeID *toid) |
236 | 22ebd96d | Csaba Kiraly | { |
237 | struct chunkID_set *my_bmap = cb_to_bmap(cb);
|
||
238 | b0225995 | Csaba Kiraly | sendBufferMap(toid,NULL, my_bmap, input ? 0 : cb_size, 0); |
239 | c8c4c779 | Csaba Kiraly | chunkID_set_free(my_bmap); |
240 | a1a9e662 | Csaba Kiraly | } |
241 | |||
242 | a204d648 | Csaba Kiraly | void bcast_bmap()
|
243 | { |
||
244 | int i, n;
|
||
245 | struct peer *neighbours;
|
||
246 | struct peerset *pset;
|
||
247 | ba99da01 | Csaba Kiraly | struct chunkID_set *my_bmap;
|
248 | a204d648 | Csaba Kiraly | |
249 | pset = get_peers(); |
||
250 | n = peerset_size(pset); |
||
251 | neighbours = peerset_get_peers(pset); |
||
252 | |||
253 | ba99da01 | Csaba Kiraly | my_bmap = cb_to_bmap(cb); //cache our bmap for faster processing
|
254 | a204d648 | Csaba Kiraly | for (i = 0; i<n; i++) { |
255 | ba99da01 | Csaba Kiraly | sendBufferMap(neighbours[i].id,NULL, my_bmap, input ? 0 : cb_size, 0); |
256 | a204d648 | Csaba Kiraly | } |
257 | ba99da01 | Csaba Kiraly | chunkID_set_free(my_bmap); |
258 | a204d648 | Csaba Kiraly | } |
259 | |||
260 | 68330740 | Csaba Kiraly | double get_average_lossrate_pset(struct peerset *pset) |
261 | { |
||
262 | int i, n;
|
||
263 | struct peer *neighbours;
|
||
264 | |||
265 | n = peerset_size(pset); |
||
266 | neighbours = peerset_get_peers(pset); |
||
267 | { |
||
268 | 74a5d4ae | CsabaKiraly | struct nodeID *nodeids[n];
|
269 | 68330740 | Csaba Kiraly | for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id; |
270 | 5fc04950 | Csaba Kiraly | #ifdef MONL
|
271 | 68330740 | Csaba Kiraly | return get_average_lossrate(nodeids, n);
|
272 | 5fc04950 | Csaba Kiraly | #else
|
273 | return 0; |
||
274 | #endif
|
||
275 | 68330740 | Csaba Kiraly | } |
276 | } |
||
277 | |||
278 | fa3d2720 | Csaba Kiraly | void ack_chunk(struct chunk *c, struct nodeID *from) |
279 | 5b95417d | Csaba Kiraly | { |
280 | //reduce load a little bit if there are losses on the path from this guy
|
||
281 | double average_lossrate = get_average_lossrate_pset(get_peers());
|
||
282 | average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
||
283 | if (rand()/((double)RAND_MAX + 1) < 1 * average_lossrate ) { |
||
284 | return;
|
||
285 | } |
||
286 | fa3d2720 | Csaba Kiraly | send_bmap(from); //send explicit ack
|
287 | 5b95417d | Csaba Kiraly | } |
288 | |||
289 | 74a5d4ae | CsabaKiraly | void received_chunk(struct nodeID *from, const uint8_t *buff, int len) |
290 | 89e893e2 | Luca | { |
291 | int res;
|
||
292 | 52b7c5ea | Luca | static struct chunk c; |
293 | 30c9739a | Csaba Kiraly | struct peer *p;
|
294 | a204d648 | Csaba Kiraly | static int bcast_cnt; |
295 | 397f3860 | Csaba Kiraly | uint16_t transid; |
296 | 89e893e2 | Luca | |
297 | a1c01ccf | Csaba Kiraly | res = parseChunkMsg(buff + 1, len - 1, &c, &transid); |
298 | 89e893e2 | Luca | if (res > 0) { |
299 | e99600d8 | Csaba Kiraly | chunk_attributes_update_received(&c); |
300 | 6ac1e106 | Csaba Kiraly | chunk_unlock(c.id); |
301 | 13d85fc6 | Csaba Kiraly | dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
|
302 | 84ff82ba | CsabaKiraly | if(chunk_log){fprintf(stderr, "TEO: Received chunk %d from peer: %s at: %"PRIu64" hopcount: %i\n", c.id, node_addr(from), gettimeofday_in_us(), chunk_get_hopcount(&c));} |
303 | 89e893e2 | Luca | output_deliver(&c); |
304 | res = cb_add_chunk(cb, &c); |
||
305 | 14e5c21e | Csaba Kiraly | reg_chunk_receive(c.id, c.timestamp, chunk_get_hopcount(&c), res==E_CB_OLD, res==E_CB_DUPLICATE); |
306 | fc0260a4 | Csaba Kiraly | cb_print(); |
307 | 89e893e2 | Luca | if (res < 0) { |
308 | 13d85fc6 | Csaba Kiraly | dprintf("\tchunk too old, buffer full with newer chunks\n");
|
309 | 84ff82ba | CsabaKiraly | if(chunk_log){fprintf(stderr, "TEO: Received chunk: %d too old (buffer full with newer chunks) from peer: %s at: %"PRIu64"\n", c.id, node_addr(from), gettimeofday_in_us());} |
310 | 89e893e2 | Luca | free(c.data); |
311 | free(c.attributes); |
||
312 | } |
||
313 | 13d4c180 | Csaba Kiraly | p = nodeid_to_peer(from, neigh_on_chunk_recv); |
314 | 30c9739a | Csaba Kiraly | if (p) { //now we have it almost sure |
315 | chunkID_set_add_chunk(p->bmap,c.id); //don't send it back
|
||
316 | } |
||
317 | c3e8369f | Csaba Kiraly | ack_chunk(&c,from); //send explicit ack
|
318 | a204d648 | Csaba Kiraly | if (bcast_after_receive_every && bcast_cnt % bcast_after_receive_every == 0) { |
319 | bcast_bmap(); |
||
320 | } |
||
321 | ae3c4aa9 | Csaba Kiraly | } else {
|
322 | fprintf(stderr,"\tError: can't decode chunk!\n");
|
||
323 | 89e893e2 | Luca | } |
324 | } |
||
325 | |||
326 | 685225b2 | CsabaKiraly | struct chunk *generated_chunk(suseconds_t *delta)
|
327 | 89e893e2 | Luca | { |
328 | 685225b2 | CsabaKiraly | struct chunk *c;
|
329 | |||
330 | c = malloc(sizeof(struct chunk)); |
||
331 | if (!c) {
|
||
332 | fprintf(stderr, "Memory allocation error!\n");
|
||
333 | return NULL; |
||
334 | } |
||
335 | 89e893e2 | Luca | |
336 | 685225b2 | CsabaKiraly | *delta = input_get(input, c); |
337 | afdc8db4 | Luca Abeni | if (*delta < 0) { |
338 | 4bb789ed | Luca | fprintf(stderr, "Error in input!\n");
|
339 | exit(-1);
|
||
340 | } |
||
341 | 685225b2 | CsabaKiraly | if (c->data == NULL) { |
342 | free(c); |
||
343 | return NULL; |
||
344 | 4bb789ed | Luca | } |
345 | 685225b2 | CsabaKiraly | dprintf("Generated chunk %d of %d bytes\n",c->id, c->size);
|
346 | chunk_attributes_fill(c); |
||
347 | return c;
|
||
348 | } |
||
349 | |||
350 | int add_chunk(struct chunk *c) |
||
351 | { |
||
352 | int res;
|
||
353 | |||
354 | res = cb_add_chunk(cb, c); |
||
355 | 5fb5ecd4 | Luca | if (res < 0) { |
356 | 685225b2 | CsabaKiraly | free(c->data); |
357 | free(c->attributes); |
||
358 | free(c); |
||
359 | return 0; |
||
360 | 89e893e2 | Luca | } |
361 | 685225b2 | CsabaKiraly | free(c); |
362 | afdc8db4 | Luca Abeni | return 1; |
363 | 89e893e2 | Luca | } |
364 | |||
365 | 4367dafd | Csaba Kiraly | /**
|
366 | d74bc79c | Csaba Kiraly | *example function to filter chunks based on whether a given peer needs them.
|
367 | *
|
||
368 | * Looks at buffermap information received about the given peer.
|
||
369 | 4367dafd | Csaba Kiraly | */
|
370 | 7b86e7d9 | Csaba Kiraly | int needs(struct peer *n, int cid){ |
371 | struct peer * p = n;
|
||
372 | 1b7da906 | Csaba Kiraly | |
373 | 3cd33bb0 | Csaba Kiraly | //dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
|
374 | d74bc79c | Csaba Kiraly | if (! p->bmap) {
|
375 | 3cd33bb0 | Csaba Kiraly | //dprintf("no bmap\n");
|
376 | d74bc79c | Csaba Kiraly | return 1; // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!) |
377 | } |
||
378 | b44ae8d2 | CsabaKiraly | return _needs(p->bmap, p->cb_size, cid);
|
379 | 742dfaec | Csaba Kiraly | } |
380 | d74bc79c | Csaba Kiraly | |
381 | 742dfaec | Csaba Kiraly | int _needs(struct chunkID_set *cset, int cb_size, int cid){ |
382 | 7c2ac59e | CsabaKiraly | if (cb_size == 0) { //if it declared it does not needs chunks |
383 | return 0; |
||
384 | } |
||
385 | |||
386 | 742dfaec | Csaba Kiraly | if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk |
387 | d74bc79c | Csaba Kiraly | int missing, min;
|
388 | //@TODO: add some bmap_timestamp based logic
|
||
389 | |||
390 | 742dfaec | Csaba Kiraly | if (chunkID_set_size(cset) == 0) { |
391 | 3cd33bb0 | Csaba Kiraly | //dprintf("bmap empty\n");
|
392 | d74bc79c | Csaba Kiraly | return 1; // if the bmap seems empty, it needs the chunk |
393 | } |
||
394 | 742dfaec | Csaba Kiraly | missing = cb_size - chunkID_set_size(cset); |
395 | d74bc79c | Csaba Kiraly | missing = missing < 0 ? 0 : missing; |
396 | eb42de41 | Csaba Kiraly | min = chunkID_set_get_earliest(cset); |
397 | 3cd33bb0 | Csaba Kiraly | //dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
|
398 | 742dfaec | Csaba Kiraly | return (cid >= min - missing);
|
399 | d74bc79c | Csaba Kiraly | } |
400 | |||
401 | 3cd33bb0 | Csaba Kiraly | //dprintf("has it\n");
|
402 | d74bc79c | Csaba Kiraly | return 0; |
403 | 4367dafd | Csaba Kiraly | } |
404 | d74bc79c | Csaba Kiraly | |
405 | 7b86e7d9 | Csaba Kiraly | double peerWeightReceivedfrom(struct peer **n){ |
406 | struct peer * p = *n;
|
||
407 | 1b7da906 | Csaba Kiraly | return timerisset(&p->bmap_timestamp) ? 1 : 0.1; |
408 | 4367dafd | Csaba Kiraly | } |
409 | eb4ecf1c | Csaba Kiraly | |
410 | 8bb39b0b | Csaba Kiraly | double peerWeightUniform(struct peer **n){ |
411 | eb4ecf1c | Csaba Kiraly | return 1; |
412 | } |
||
413 | |||
414 | 7b86e7d9 | Csaba Kiraly | double peerWeightRtt(struct peer **n){ |
415 | d4a680a0 | Csaba Kiraly | #ifdef MONL
|
416 | 7b86e7d9 | Csaba Kiraly | double rtt = get_rtt(*n->id);
|
417 | 1b7da906 | Csaba Kiraly | //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
|
418 | 3ffbcf2d | Csaba Kiraly | return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1; |
419 | d4a680a0 | Csaba Kiraly | #else
|
420 | return 1; |
||
421 | 5fc04950 | Csaba Kiraly | #endif
|
422 | d4a680a0 | Csaba Kiraly | } |
423 | 4c0ba13e | Csaba Kiraly | |
424 | ea084625 | Csaba Kiraly | //ordering function for ELp peer selection, chunk ID based
|
425 | //can't be used as weight
|
||
426 | 1b7da906 | Csaba Kiraly | double peerScoreELpID(struct nodeID **n){ |
427 | ea084625 | Csaba Kiraly | struct chunkID_set *bmap;
|
428 | int latest;
|
||
429 | 1b7da906 | Csaba Kiraly | struct peer * p = nodeid_to_peer(*n, 0); |
430 | if (!p) return 0; |
||
431 | ea084625 | Csaba Kiraly | |
432 | 1b7da906 | Csaba Kiraly | bmap = p->bmap; |
433 | ea084625 | Csaba Kiraly | if (!bmap) return 0; |
434 | latest = chunkID_set_get_latest(bmap); |
||
435 | if (latest == INT_MIN) return 0; |
||
436 | |||
437 | return -latest;
|
||
438 | } |
||
439 | |||
440 | 97ab5c93 | Csaba Kiraly | double chunkScoreChunkID(int *cid){ |
441 | return (double) *cid; |
||
442 | } |
||
443 | |||
444 | 74a5d4ae | CsabaKiraly | double getChunkTimestamp(int *cid){ |
445 | 43dd1a10 | Csaba Kiraly | const struct chunk *c = cb_get_chunk(cb, *cid); |
446 | 1b7da906 | Csaba Kiraly | if (!c) return 0; |
447 | |||
448 | return (double) c->timestamp; |
||
449 | 4367dafd | Csaba Kiraly | } |
450 | |||
451 | 8e750be6 | Csaba Kiraly | void send_accepted_chunks(struct nodeID *toid, struct chunkID_set *cset_acc, int max_deliver, uint16_t trans_id){ |
452 | 8e85b948 | Alessandro Russo | int i, d, cset_acc_size, res;
|
453 | b13977c0 | Csaba Kiraly | struct peer *to = nodeid_to_peer(toid, 0); |
454 | d447f71d | Csaba Kiraly | |
455 | cset_acc_size = chunkID_set_size(cset_acc); |
||
456 | e2c563e7 | Csaba Kiraly | reg_offer_accept(cset_acc_size > 0 ? 1 : 0); //this only works if accepts are sent back even if 0 is accepted |
457 | d447f71d | Csaba Kiraly | for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) { |
458 | 43dd1a10 | Csaba Kiraly | const struct chunk *c; |
459 | d447f71d | Csaba Kiraly | int chunkid = chunkID_set_get_chunk(cset_acc, i);
|
460 | c = cb_get_chunk(cb, chunkid); |
||
461 | f8286367 | Csaba Kiraly | if (c && (!to || needs(to, chunkid)) ) {// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification |
462 | f5d9663e | Csaba Kiraly | chunk_attributes_update_sending(c); |
463 | 033319c7 | Csaba Kiraly | res = sendChunk(toid, c, trans_id); |
464 | d447f71d | Csaba Kiraly | if (res >= 0) { |
465 | f8286367 | Csaba Kiraly | if(to) chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive |
466 | d447f71d | Csaba Kiraly | d++; |
467 | f740dafb | Csaba Kiraly | reg_chunk_send(c->id); |
468 | 0adbb1ef | Csaba Kiraly | if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d\n", c->id, node_addr(toid), gettimeofday_in_us(), res);} |
469 | 96b8d0a8 | Csaba Kiraly | } else {
|
470 | fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
||
471 | d447f71d | Csaba Kiraly | } |
472 | } |
||
473 | } |
||
474 | } |
||
475 | |||
476 | dc87dca9 | Csaba Kiraly | int offer_peer_count()
|
477 | { |
||
478 | return offer_per_tick;
|
||
479 | } |
||
480 | |||
481 | 74a5d4ae | CsabaKiraly | int offer_max_deliver(struct nodeID *n) |
482 | 960017bf | Csaba Kiraly | { |
483 | 10fd812c | Csaba Kiraly | |
484 | if (!heuristics_distance_maxdeliver) return 1; |
||
485 | |||
486 | 5fc04950 | Csaba Kiraly | #ifdef MONL
|
487 | 1b7da906 | Csaba Kiraly | switch (get_hopcount(n)) {
|
488 | 960017bf | Csaba Kiraly | case 0: return 5; |
489 | case 1: return 2; |
||
490 | default: return 1; |
||
491 | } |
||
492 | 5fc04950 | Csaba Kiraly | #else
|
493 | return 1; |
||
494 | #endif
|
||
495 | 960017bf | Csaba Kiraly | } |
496 | |||
497 | fcb5c29b | Csaba Kiraly | void send_offer()
|
498 | 43355360 | Csaba Kiraly | { |
499 | struct chunk *buff;
|
||
500 | 74a5d4ae | CsabaKiraly | int size, res, i, n;
|
501 | 43355360 | Csaba Kiraly | struct peer *neighbours;
|
502 | fcb5c29b | Csaba Kiraly | struct peerset *pset;
|
503 | 43355360 | Csaba Kiraly | |
504 | fcb5c29b | Csaba Kiraly | pset = get_peers(); |
505 | 43355360 | Csaba Kiraly | n = peerset_size(pset); |
506 | neighbours = peerset_get_peers(pset); |
||
507 | dprintf("Send Offer: %d neighbours\n", n);
|
||
508 | if (n == 0) return; |
||
509 | buff = cb_get_chunks(cb, &size); |
||
510 | if (size == 0) return; |
||
511 | |||
512 | { |
||
513 | dc87dca9 | Csaba Kiraly | size_t selectedpeers_len = offer_peer_count(); |
514 | 1b7da906 | Csaba Kiraly | int chunkids[size];
|
515 | 8bb39b0b | Csaba Kiraly | struct peer *nodeids[n];
|
516 | struct peer *selectedpeers[selectedpeers_len];
|
||
517 | 43355360 | Csaba Kiraly | |
518 | abbef5a5 | Csaba Kiraly | //reduce load a little bit if there are losses on the path from this guy
|
519 | 68330740 | Csaba Kiraly | double average_lossrate = get_average_lossrate_pset(pset);
|
520 | abbef5a5 | Csaba Kiraly | average_lossrate = finite(average_lossrate) ? average_lossrate : 0; //start agressively, assuming 0 loss |
521 | f76dc385 | Csaba Kiraly | if (rand()/((double)RAND_MAX + 1) < 10 * average_lossrate ) { |
522 | abbef5a5 | Csaba Kiraly | return;
|
523 | } |
||
524 | |||
525 | ffcc70ad | CsabaKiraly | for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
526 | 8bb39b0b | Csaba Kiraly | for (i = 0; i<n; i++) nodeids[i] = (neighbours+i); |
527 | 851fa962 | Csaba Kiraly | selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER); |
528 | 43355360 | Csaba Kiraly | |
529 | for (i=0; i<selectedpeers_len ; i++){ |
||
530 | 8bb39b0b | Csaba Kiraly | int max_deliver = offer_max_deliver(selectedpeers[i]->id);
|
531 | 43355360 | Csaba Kiraly | struct chunkID_set *my_bmap = cb_to_bmap(cb);
|
532 | 8bb39b0b | Csaba Kiraly | dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, node_addr(selectedpeers[i]->id), selectedpeers[i]->cb_size);
|
533 | res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++); |
||
534 | ddedf85f | Csaba Kiraly | chunkID_set_free(my_bmap); |
535 | 43355360 | Csaba Kiraly | } |
536 | } |
||
537 | } |
||
538 | |||
539 | 4367dafd | Csaba Kiraly | |
540 | fcb5c29b | Csaba Kiraly | void send_chunk()
|
541 | 89e893e2 | Luca | { |
542 | struct chunk *buff;
|
||
543 | 74a5d4ae | CsabaKiraly | int size, res, i, n;
|
544 | 924226c0 | Luca Abeni | struct peer *neighbours;
|
545 | fcb5c29b | Csaba Kiraly | struct peerset *pset;
|
546 | 1cd06c26 | CsabaKiraly | |
547 | fcb5c29b | Csaba Kiraly | pset = get_peers(); |
548 | 0f35d029 | Csaba Kiraly | n = peerset_size(pset); |
549 | neighbours = peerset_get_peers(pset); |
||
550 | e64fc7e5 | Luca | dprintf("Send Chunk: %d neighbours\n", n);
|
551 | 89e893e2 | Luca | if (n == 0) return; |
552 | buff = cb_get_chunks(cb, &size); |
||
553 | e64fc7e5 | Luca | dprintf("\t %d chunks in buffer...\n", size);
|
554 | 89e893e2 | Luca | if (size == 0) return; |
555 | |||
556 | /************ STUPID DUMB SCHEDULING ****************/
|
||
557 | 4367dafd | Csaba Kiraly | //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
|
558 | //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
|
||
559 | 89e893e2 | Luca | /************ /STUPID DUMB SCHEDULING ****************/
|
560 | |||
561 | 4367dafd | Csaba Kiraly | /************ USE SCHEDULER ****************/
|
562 | 924226c0 | Luca Abeni | { |
563 | size_t selectedpairs_len = 1;
|
||
564 | 1b7da906 | Csaba Kiraly | int chunkids[size];
|
565 | 7b86e7d9 | Csaba Kiraly | struct peer *nodeids[n];
|
566 | 924226c0 | Luca Abeni | struct PeerChunk selectedpairs[1]; |
567 | |||
568 | 1b7da906 | Csaba Kiraly | for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id; |
569 | 7b86e7d9 | Csaba Kiraly | for (i = 0; i<n; i++) nodeids[i] = (neighbours+i); |
570 | 0fec1310 | Csaba Kiraly | SCHED_TYPE(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpairs, &selectedpairs_len, SCHED_NEEDS, SCHED_PEER, SCHED_CHUNK); |
571 | 4367dafd | Csaba Kiraly | /************ /USE SCHEDULER ****************/
|
572 | |||
573 | 924226c0 | Luca Abeni | for (i=0; i<selectedpairs_len ; i++){ |
574 | 7b86e7d9 | Csaba Kiraly | struct peer *p = selectedpairs[i].peer;
|
575 | 74a5d4ae | CsabaKiraly | struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
|
576 | 924226c0 | Luca Abeni | dprintf("\t sending chunk[%d] to ", c->id);
|
577 | dprintf("%s\n", node_addr(p->id));
|
||
578 | |||
579 | b0225995 | Csaba Kiraly | send_bmap(p->id); |
580 | bc1ddc15 | MatteoSammarco | |
581 | f5d9663e | Csaba Kiraly | chunk_attributes_update_sending(c); |
582 | 033319c7 | Csaba Kiraly | res = sendChunk(p->id, c, 0); //we do not use transactions in pure push |
583 | 84ff82ba | CsabaKiraly | if(chunk_log){fprintf(stderr, "TEO: Sending chunk %d to peer: %s at: %"PRIu64" Result: %d Size: %d bytes\n", c->id, node_addr(p->id), gettimeofday_in_us(), res, c->size);} |
584 | 13d85fc6 | Csaba Kiraly | dprintf("\tResult: %d\n", res);
|
585 | 924226c0 | Luca Abeni | if (res>=0) { |
586 | chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
|
||
587 | f740dafb | Csaba Kiraly | reg_chunk_send(c->id); |
588 | 96b8d0a8 | Csaba Kiraly | } else {
|
589 | fprintf(stderr,"ERROR sending chunk %d\n",c->id);
|
||
590 | 924226c0 | Luca Abeni | } |
591 | 0f35d029 | Csaba Kiraly | } |
592 | 4367dafd | Csaba Kiraly | } |
593 | 89e893e2 | Luca | } |