pstreamer / src / topology.c @ bdfb47bf
History | View | Annotate | Download (13.7 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2014-2017 Luca Baldesi
|
3 |
*
|
4 |
* This file is part of PeerStreamer.
|
5 |
*
|
6 |
* PeerStreamer is free software: you can redistribute it and/or
|
7 |
* modify it under the terms of the GNU Affero General Public License as
|
8 |
* published by the Free Software Foundation, either version 3 of the
|
9 |
* License, or (at your option) any later version.
|
10 |
*
|
11 |
* PeerStreamer is distributed in the hope that it will be useful,
|
12 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
|
14 |
* General Public License for more details.
|
15 |
*
|
16 |
* You should have received a copy of the GNU Affero General Public License
|
17 |
* along with PeerStreamer. If not, see <http://www.gnu.org/licenses/>.
|
18 |
*
|
19 |
*/
|
20 |
#include <stdint.h> |
21 |
#include <stdio.h> |
22 |
#include <sys/time.h> |
23 |
#include <time.h> |
24 |
#include <stdlib.h> |
25 |
#include <string.h> |
26 |
//
|
27 |
#include <math.h> |
28 |
#include <net_helper.h> |
29 |
#include <peerset.h> |
30 |
#include <peersampler.h> |
31 |
#include <peer.h> |
32 |
#include <grapes_msg_types.h> |
33 |
#include<grapes_config.h> |
34 |
//
|
35 |
#include "compatibility/timer.h" |
36 |
//
|
37 |
#include "topology.h" |
38 |
#include "net_helpers.h" |
39 |
#include "dbg.h" |
40 |
#include "measures.h" |
41 |
#include "streaming.h" |
42 |
|
43 |
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
|
44 |
enum neighbourhood_msg_t {NEIGHBOURHOOD_ADD, NEIGHBOURHOOD_REMOVE, NEIGHBOURHOOD_QUIT};
|
45 |
|
46 |
#define DEFAULT_PEER_CBSIZE 50 |
47 |
|
48 |
#ifndef NAN //NAN is missing in some old math.h versions |
49 |
#define NAN (0.0/0.0) |
50 |
#endif
|
51 |
|
52 |
struct metadata {
|
53 |
uint16_t cb_size; |
54 |
} __attribute__((packed)); |
55 |
|
56 |
enum peer_choice {PEER_CHOICE_RANDOM, PEER_CHOICE_BEST, PEER_CHOICE_WORST};
|
57 |
|
58 |
struct topology {
|
59 |
double desired_bw;
|
60 |
double desired_rtt;
|
61 |
double alpha_target;
|
62 |
double topo_mem;
|
63 |
bool topo_out;
|
64 |
bool topo_in;
|
65 |
bool topo_keep_best;
|
66 |
bool topo_add_best;
|
67 |
int neighbourhood_target_size;
|
68 |
struct timeval tout_bmap;
|
69 |
struct metadata my_metadata;
|
70 |
|
71 |
const struct psinstance * ps; |
72 |
struct psample_context * tc;
|
73 |
struct peerset * neighbourhood;
|
74 |
struct peerset * swarm_bucket;
|
75 |
struct peerset * locked_neighs;
|
76 |
}; |
77 |
|
78 |
struct peerset * topology_get_neighbours(struct topology * t) |
79 |
{ |
80 |
return t->neighbourhood;
|
81 |
} |
82 |
|
83 |
void peerset_print(const struct peerset * pset, const char * name) |
84 |
{ |
85 |
const struct peer * p; |
86 |
int i;
|
87 |
if(name) fprintf(stderr,"%s\n",name); |
88 |
if(pset)
|
89 |
peerset_for_each(pset,p,i) |
90 |
fprintf(stderr, "\t%s\n", nodeid_static_str(p->id));
|
91 |
} |
92 |
|
93 |
void update_metadata(struct topology * t) |
94 |
{ |
95 |
t->my_metadata.cb_size = psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps);
|
96 |
} |
97 |
|
98 |
struct peer * topology_get_peer(struct topology * t, const struct nodeID * id) |
99 |
{ |
100 |
struct peer * p = NULL; |
101 |
p = peerset_get_peer(t->swarm_bucket,id); |
102 |
if(p == NULL) |
103 |
p = peerset_get_peer(t->neighbourhood,id); |
104 |
return p;
|
105 |
} |
106 |
|
107 |
int topology_init(struct topology * t, const struct psinstance * ps, const char *config) |
108 |
{ |
109 |
struct tag * tags;
|
110 |
int peer_timeout;
|
111 |
|
112 |
tags = grapes_config_parse(config); |
113 |
|
114 |
bind_msg_type(MSG_TYPE_NEIGHBOURHOOD); |
115 |
bind_msg_type(MSG_TYPE_TOPOLOGY); |
116 |
|
117 |
grapes_config_value_int_default(tags, "peer_timeout", &peer_timeout, 10); |
118 |
t->tout_bmap.tv_sec = peer_timeout; |
119 |
t->tout_bmap.tv_usec = 0;
|
120 |
t->ps = ps; |
121 |
|
122 |
t->neighbourhood = peerset_init(0);
|
123 |
t->swarm_bucket = peerset_init(0);
|
124 |
t->locked_neighs = peerset_init(0);
|
125 |
|
126 |
t->desired_bw = 0; //TODO: turn on capacity measurement and set meaningful default value |
127 |
t->desired_rtt = 0.2; |
128 |
t->alpha_target = 0.4; |
129 |
t->topo_mem = 0.7; |
130 |
t->topo_out = true; //peer selects out-neighbours |
131 |
t->topo_in = true; //peer selects in-neighbours (combined means bidirectional) |
132 |
t->topo_keep_best = false;
|
133 |
t->topo_add_best = false;
|
134 |
t->neighbourhood_target_size = 30;
|
135 |
|
136 |
update_metadata(t); |
137 |
t->tc = psample_init(psinstance_nodeid(ps), &(t->my_metadata), sizeof(struct metadata), config); |
138 |
|
139 |
free(tags); |
140 |
return t->tc && t->neighbourhood && t->swarm_bucket ? 1 : 0; |
141 |
} |
142 |
|
143 |
struct topology * topology_create(const struct psinstance *ps, const char *config) |
144 |
{ |
145 |
struct topology * t = NULL; |
146 |
t = malloc(sizeof(struct topology)); |
147 |
topology_init(t, ps, config); |
148 |
return t;
|
149 |
} |
150 |
|
151 |
/*useful during bootstrap*/
|
152 |
int topology_node_insert(struct topology * t, struct nodeID *id) |
153 |
{ |
154 |
struct metadata m = {0}; |
155 |
if (topology_get_peer(t, id) == NULL) |
156 |
peerset_add_peer(t->swarm_bucket,id); |
157 |
return psample_add_peer(t->tc,id,&m,sizeof(m)); |
158 |
} |
159 |
|
160 |
void topology_peer_set_metadata(struct peer *p, const struct metadata *m) |
161 |
{ |
162 |
if (p)
|
163 |
{ |
164 |
if (m)
|
165 |
{ |
166 |
p->cb_size = m->cb_size; |
167 |
} |
168 |
else
|
169 |
{ |
170 |
p->cb_size = DEFAULT_PEER_CBSIZE; |
171 |
} |
172 |
|
173 |
} |
174 |
} |
175 |
|
176 |
struct peer * neighbourhood_add_peer(struct topology * t, const struct nodeID *id) |
177 |
{ |
178 |
struct peer * p = NULL; |
179 |
if (id)
|
180 |
{ |
181 |
p = peerset_pop_peer(t->swarm_bucket,id); |
182 |
if(p)
|
183 |
peerset_push_peer(t->neighbourhood,p); |
184 |
else
|
185 |
{ |
186 |
peerset_add_peer(t->neighbourhood,id); |
187 |
p = peerset_get_peer(t->neighbourhood,id); |
188 |
peerset_push_peer(t->locked_neighs,p); |
189 |
} |
190 |
measures_add_node(psinstance_measures(t->ps), p->id); |
191 |
// fprintf(stderr,"[DEBUG] sending bmap to peer %s \n",nodeid_static_str(id));
|
192 |
send_bmap(psinstance_streaming(t->ps), id); |
193 |
} |
194 |
return p;
|
195 |
} |
196 |
|
197 |
void neighbourhood_remove_peer(struct topology * t, const struct nodeID *id) |
198 |
{ |
199 |
struct peer *p=NULL; |
200 |
if(id)
|
201 |
{ |
202 |
p = peerset_pop_peer(t->neighbourhood,id); |
203 |
if(p)
|
204 |
peerset_push_peer(t->swarm_bucket,p); |
205 |
|
206 |
peerset_pop_peer(t->locked_neighs,id); |
207 |
} |
208 |
} |
209 |
|
210 |
void topology_remove_peer(struct topology * t, const struct nodeID *id) |
211 |
{ |
212 |
if(t && id)
|
213 |
{ |
214 |
peerset_pop_peer(t->locked_neighs, id); |
215 |
psample_remove_peer(t->tc, id); |
216 |
peerset_remove_peer(t->neighbourhood, id); |
217 |
// peerset_remove_peer(t->swarm_bucket, id);
|
218 |
} |
219 |
} |
220 |
|
221 |
void neighbourhood_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len) |
222 |
{ |
223 |
struct metadata m = {0}; |
224 |
struct peer *p = NULL; |
225 |
|
226 |
switch(buff[0]) { |
227 |
case NEIGHBOURHOOD_ADD:
|
228 |
// fprintf(stderr,"[DEBUG] adding peer %s from message\n",nodeid_static_str(from));
|
229 |
p = neighbourhood_add_peer(t, from); |
230 |
if (len >= (sizeof(struct metadata) + 1)) |
231 |
{ |
232 |
memmove(&m,buff+1,sizeof(struct metadata)); |
233 |
topology_peer_set_metadata(p,&m); |
234 |
} |
235 |
break;
|
236 |
|
237 |
case NEIGHBOURHOOD_REMOVE:
|
238 |
neighbourhood_remove_peer(t, from); |
239 |
break;
|
240 |
case NEIGHBOURHOOD_QUIT:
|
241 |
topology_remove_peer(t, from); |
242 |
break;
|
243 |
default:
|
244 |
dprintf("Unknown neighbourhood message type");
|
245 |
} |
246 |
} |
247 |
|
248 |
void topology_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len) |
249 |
{ |
250 |
switch(buff[0]) { |
251 |
case MSG_TYPE_NEIGHBOURHOOD:
|
252 |
if (t->topo_in)
|
253 |
{ |
254 |
neighbourhood_message_parse(t, from, buff+1,len);
|
255 |
reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood)); |
256 |
} |
257 |
break;
|
258 |
case MSG_TYPE_TOPOLOGY:
|
259 |
psample_parse_data(t->tc,buff,len); |
260 |
//fprintf(stderr,"[DEBUG] received TOPO message\n");
|
261 |
break;
|
262 |
default:
|
263 |
fprintf(stderr,"Unknown topology message type");
|
264 |
} |
265 |
} |
266 |
|
267 |
void topology_sample_peers(struct topology * t) |
268 |
{ |
269 |
int sample_nodes_num,sample_metas_num,i;
|
270 |
const struct nodeID * const * sample_nodes; |
271 |
struct metadata const * sample_metas; |
272 |
struct peer * p;
|
273 |
|
274 |
//fprintf(stderr,"[DEBUG] starting peer sampling\n");
|
275 |
sample_nodes = psample_get_cache(t->tc,&sample_nodes_num); |
276 |
sample_metas = psample_get_metadata(t->tc,&sample_metas_num); |
277 |
for (i=0;i<sample_nodes_num;i++) |
278 |
{ |
279 |
//fprintf(stderr,"[DEBUG] sampled node: %s\n",nodeid_static_str(sample_nodes[i]));
|
280 |
p = topology_get_peer(t, sample_nodes[i]); |
281 |
if(p==NULL) |
282 |
{ |
283 |
peerset_add_peer(t->swarm_bucket,sample_nodes[i]); |
284 |
p = topology_get_peer(t, sample_nodes[i]); |
285 |
} |
286 |
topology_peer_set_metadata(p,&(sample_metas[i])); |
287 |
} |
288 |
} |
289 |
|
290 |
void topology_blacklist_add(struct topology * t, struct nodeID * id) |
291 |
{ |
292 |
} |
293 |
|
294 |
void neighbourhood_drop_unactives(struct topology * t, struct timeval * bmap_timeout) |
295 |
{ |
296 |
struct timeval tnow, told;
|
297 |
struct peer *const *peers; |
298 |
int i;
|
299 |
gettimeofday(&tnow, NULL);
|
300 |
timersub(&tnow, bmap_timeout, &told); |
301 |
peers = peerset_get_peers(t->neighbourhood); |
302 |
for (i = 0; i < peerset_size(t->neighbourhood); i++) { |
303 |
if ( (!timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->creation_timestamp, &told, <) ) ||
|
304 |
( timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->bmap_timestamp, &told, <) ) ) { |
305 |
dprintf("Topo: dropping inactive %s (peersset_size: %d)\n", nodeid_static_str(peers[i]->id), peerset_size(t->neighbourhood));
|
306 |
// if (peerset_size(t->neighbourhood) > 1) { // avoid dropping our last link to the world
|
307 |
topology_blacklist_add(t, peers[i]->id); |
308 |
topology_remove_peer(t, peers[i]->id); |
309 |
// }
|
310 |
} |
311 |
} |
312 |
|
313 |
} |
314 |
|
315 |
void array_shuffle(void *base, int nmemb, int size) { |
316 |
int i,newpos;
|
317 |
unsigned char t[size]; |
318 |
unsigned char* b = base; |
319 |
|
320 |
for (i = nmemb - 1; i > 0; i--) { |
321 |
newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1); |
322 |
memcpy(t, b + size * newpos, size); |
323 |
memmove(b + size * newpos, b + size * i, size); |
324 |
memcpy(b + size * i, t, size); |
325 |
} |
326 |
} |
327 |
|
328 |
double get_rtt_of(struct topology *t, const struct nodeID* n){ |
329 |
return NAN;
|
330 |
} |
331 |
|
332 |
double get_capacity_of(struct topology *t, const struct nodeID* n){ |
333 |
struct peer *p = topology_get_peer(t, n);
|
334 |
if (p) {
|
335 |
return p->capacity;
|
336 |
} |
337 |
|
338 |
return NAN;
|
339 |
} |
340 |
|
341 |
int neighbourhood_send_msg(struct topology *t, const struct peer * p,uint8_t type) |
342 |
{ |
343 |
uint8_t * msg; |
344 |
int res;
|
345 |
msg = malloc(sizeof(struct metadata)+2); |
346 |
msg[0] = MSG_TYPE_NEIGHBOURHOOD;
|
347 |
msg[1] = type;
|
348 |
memmove(msg+2,&(t->my_metadata),sizeof(struct metadata)); |
349 |
res = send_to_peer(psinstance_nodeid(t->ps), p->id, msg, sizeof(struct metadata)+2); |
350 |
free(msg); |
351 |
return res;
|
352 |
} |
353 |
|
354 |
void topology_quit_overlay(struct topology *t) |
355 |
{ |
356 |
const struct peer * p; |
357 |
int i;
|
358 |
|
359 |
dprintf("Notifying known peers of quitting...\n");
|
360 |
peerset_for_each(t->neighbourhood, p, i) |
361 |
neighbourhood_send_msg(t, p, NEIGHBOURHOOD_QUIT); |
362 |
peerset_for_each(t->swarm_bucket, p, i) |
363 |
neighbourhood_send_msg(t, p, NEIGHBOURHOOD_QUIT); |
364 |
} |
365 |
|
366 |
void peerset_destroy_reference_copy(struct peerset ** pset) |
367 |
{ |
368 |
while (peerset_size(*pset))
|
369 |
peerset_pop_peer(*pset,(peerset_get_peers(*pset)[0])->id);
|
370 |
|
371 |
peerset_destroy(pset); |
372 |
} |
373 |
|
374 |
struct peerset * peerset_create_reference_copy(struct peerset * pset) |
375 |
{ |
376 |
struct peerset * ns;
|
377 |
const struct peer * p; |
378 |
int i;
|
379 |
|
380 |
ns = peerset_init(0);
|
381 |
peerset_for_each(pset,p,i) |
382 |
peerset_push_peer(ns, (struct peer *)p);
|
383 |
return ns;
|
384 |
} |
385 |
|
386 |
struct peer *nodeid_to_peer(struct topology * t, struct nodeID *id,int reg) |
387 |
{ |
388 |
struct peer * p;
|
389 |
p = topology_get_peer(t, id); |
390 |
if(p==NULL && reg) |
391 |
{ |
392 |
topology_node_insert(t, id); |
393 |
neighbourhood_add_peer(t, id); |
394 |
p = topology_get_peer(t, id); |
395 |
if(t->topo_out)
|
396 |
neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD); |
397 |
} |
398 |
return p;
|
399 |
} |
400 |
|
401 |
/* move num peers from pset1 to pset2 after applying the filtering_mask function and following the given criterion */
|
402 |
void topology_move_peers(struct peerset * pset1, struct peerset * pset2,int num,enum peer_choice criterion,bool (*filter_mask)(const struct peer *),int (*cmp_peer)(const void* p0, const void* p1)) |
403 |
{ |
404 |
struct peer * const * const_peers; |
405 |
struct peer ** peers;
|
406 |
struct peer *p;
|
407 |
int peers_num,i,j;
|
408 |
|
409 |
peers_num = peerset_size(pset1); |
410 |
const_peers = peerset_get_peers(pset1); |
411 |
peers = (struct peer **)malloc(sizeof(struct peer *)*peers_num); |
412 |
if (filter_mask)
|
413 |
{ |
414 |
for(i = 0,j = 0; i<peers_num; i++) |
415 |
if (filter_mask(const_peers[i]))
|
416 |
peers[j++] = const_peers[i]; |
417 |
peers_num = j; |
418 |
} else
|
419 |
memmove(peers,const_peers,peers_num*sizeof(struct peer*)); |
420 |
|
421 |
if (criterion != PEER_CHOICE_RANDOM && cmp_peer != NULL) { |
422 |
//fprintf(stderr,"[DEBUG] choosen the qsort\n");
|
423 |
qsort(peers, peers_num, sizeof(struct peer*), cmp_peer); |
424 |
} else {
|
425 |
array_shuffle(peers, peers_num, sizeof(struct peer *)); |
426 |
} |
427 |
for (i=0; i<peers_num && i<num; i++) |
428 |
{ |
429 |
if (criterion == PEER_CHOICE_WORST)
|
430 |
p = peerset_pop_peer(pset1,(peers[peers_num -i -1])->id);
|
431 |
else
|
432 |
p = peerset_pop_peer(pset1,(peers[i])->id); |
433 |
peerset_push_peer(pset2,p); |
434 |
} |
435 |
free(peers); |
436 |
} |
437 |
|
438 |
void peerset_reference_copy_add(struct peerset * dst, struct peerset * src) |
439 |
{ |
440 |
const struct peer *p; |
441 |
int i;
|
442 |
|
443 |
peerset_for_each(src,p,i) |
444 |
peerset_push_peer(dst, (struct peer *)p);
|
445 |
} |
446 |
|
447 |
void topology_signal_change(struct topology *t, const struct peerset const * old_neighs) |
448 |
{ |
449 |
const struct peer * p; |
450 |
int i;
|
451 |
reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood)); |
452 |
|
453 |
// advertise changes
|
454 |
if(t->topo_out)
|
455 |
{ |
456 |
peerset_for_each(t->neighbourhood,p,i) |
457 |
{ |
458 |
if(peerset_check(old_neighs,p->id) < 0) |
459 |
neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD); |
460 |
} |
461 |
peerset_for_each(old_neighs,p,i) |
462 |
{ |
463 |
if(peerset_check(t->neighbourhood,p->id) < 0) |
464 |
neighbourhood_send_msg(t,p,NEIGHBOURHOOD_REMOVE); |
465 |
} |
466 |
} |
467 |
} |
468 |
|
469 |
|
470 |
void topology_update_random(struct topology * t) |
471 |
{ |
472 |
int discard_num;
|
473 |
int others_num;
|
474 |
|
475 |
discard_num = (int)((1-t->topo_mem) * peerset_size(t->neighbourhood)); |
476 |
topology_move_peers(t->neighbourhood,t->swarm_bucket,discard_num,PEER_CHOICE_RANDOM,NULL,NULL); |
477 |
|
478 |
others_num = MAX(t->neighbourhood_target_size-peerset_size(t->neighbourhood),0);
|
479 |
topology_move_peers(t->swarm_bucket,t->neighbourhood,others_num,PEER_CHOICE_RANDOM,NULL,NULL); |
480 |
} |
481 |
|
482 |
|
483 |
void topology_update(struct topology * t) |
484 |
{ |
485 |
struct peerset * old_neighs;
|
486 |
const struct peer * p; |
487 |
int i;
|
488 |
|
489 |
psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages |
490 |
|
491 |
update_metadata(t); |
492 |
topology_sample_peers(t); |
493 |
|
494 |
if timerisset(&(t->tout_bmap) )
|
495 |
neighbourhood_drop_unactives(t, &(t->tout_bmap)); |
496 |
|
497 |
old_neighs = peerset_create_reference_copy(t->neighbourhood); |
498 |
|
499 |
topology_update_random(t); |
500 |
|
501 |
topology_signal_change(t, old_neighs); |
502 |
peerset_destroy_reference_copy(&old_neighs); |
503 |
|
504 |
peerset_for_each(t->swarm_bucket,p,i) |
505 |
peerset_pop_peer(t->locked_neighs,p->id); |
506 |
peerset_clear(t->swarm_bucket,0); // we don't remember past peers |
507 |
} |
508 |
|
509 |
void topology_destroy(struct topology **t) |
510 |
{ |
511 |
topology_quit_overlay(*t); |
512 |
if (t && *t)
|
513 |
{ |
514 |
if(((*t)->locked_neighs))
|
515 |
peerset_destroy_reference_copy(&((*t)->locked_neighs)); |
516 |
if(((*t)->neighbourhood))
|
517 |
peerset_destroy(&((*t)->neighbourhood)); |
518 |
if(((*t)->swarm_bucket))
|
519 |
peerset_destroy(&((*t)->swarm_bucket)); |
520 |
if(((*t)->tc))
|
521 |
psample_destroy(&((*t)->tc)); |
522 |
free(*t); |
523 |
} |
524 |
} |