streamers / topology.c @ f6e9f400
History | View | Annotate | Download (14 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Csaba Kiraly
|
3 |
* Copyright (c) 2010 Luca Abeni
|
4 |
*
|
5 |
* This is free software; see gpl-3.0.txt
|
6 |
*/
|
7 |
#include <stdint.h> |
8 |
#include <stdio.h> |
9 |
#include <sys/time.h> |
10 |
#include <time.h> |
11 |
#include <stdlib.h> |
12 |
#include <string.h> |
13 |
|
14 |
#include <math.h> |
15 |
#include <net_helper.h> |
16 |
#include <peerset.h> |
17 |
#include <peer.h> |
18 |
#include <grapes_msg_types.h> |
19 |
#include <topmanager.h> |
20 |
#include <tman.h> |
21 |
|
22 |
#include "compatibility/timer.h" |
23 |
|
24 |
#include "topology.h" |
25 |
#include "nodeid_set.h" |
26 |
#include "streaming.h" |
27 |
#include "dbg.h" |
28 |
#include "measures.h" |
29 |
#include "streamer.h" |
30 |
|
31 |
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
|
32 |
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
|
33 |
|
34 |
double desired_bw = 0; |
35 |
double desired_rtt = 0.2; |
36 |
double alpha_target = 0.5; |
37 |
double topo_mem = 0; |
38 |
|
39 |
bool topo_out = true; //peer selects out-neighbours |
40 |
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional) |
41 |
|
42 |
bool topo_keep_best = false; |
43 |
bool topo_add_best = false; |
44 |
|
45 |
int NEIGHBORHOOD_TARGET_SIZE = 30; |
46 |
double NEIGHBORHOOD_ROTATE_RATIO = 1.0; |
47 |
#define TMAN_MAX_IDLE 10 |
48 |
#define TMAN_LOG_EVERY 1000 |
49 |
|
50 |
#define STREAMER_TOPOLOGY_MSG_ADD 0 |
51 |
#define STREAMER_TOPOLOGY_MSG_REMOVE 1 |
52 |
|
53 |
static struct peerset *pset; |
54 |
static struct timeval tout_bmap = {20, 0}; |
55 |
static int counter = 0; |
56 |
static int simpleRanker (const void *tin, const void *p1in, const void *p2in); |
57 |
static tmanRankingFunction rankFunct = simpleRanker;
|
58 |
|
59 |
struct metadata {
|
60 |
uint16_t cb_size; |
61 |
uint16_t cps; |
62 |
float capacity;
|
63 |
float recv_delay;
|
64 |
} __attribute__((packed)); |
65 |
|
66 |
static struct metadata my_metadata; |
67 |
static int cnt = 0; |
68 |
static struct nodeID *me = NULL; |
69 |
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY}; |
70 |
static struct nodeID ** neighbors; |
71 |
|
72 |
static void update_metadata(void) { |
73 |
my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
|
74 |
my_metadata.recv_delay = get_receive_delay(); |
75 |
my_metadata.cps = get_chunks_per_sec(); |
76 |
my_metadata.capacity = get_capacity(); |
77 |
} |
78 |
|
79 |
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) { |
80 |
|
81 |
double t,p1,p2;
|
82 |
t = *((const double *)tin); |
83 |
p1 = *((const double *)p1in); |
84 |
p2 = *((const double *)p2in); |
85 |
|
86 |
if (isnan(t) || (isnan(p1) && isnan(p2))) return 0; |
87 |
else if (isnan(p1)) return 2; |
88 |
else if (isnan(p2)) return 1; |
89 |
else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2; |
90 |
|
91 |
} |
92 |
|
93 |
int topologyInit(struct nodeID *myID, const char *config) |
94 |
{ |
95 |
int i;
|
96 |
for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++) |
97 |
bind_msg_type(mTypes[i]); |
98 |
update_metadata(); |
99 |
me = myID; |
100 |
return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0)); |
101 |
} |
102 |
|
103 |
void topologyShutdown(void) |
104 |
{ |
105 |
} |
106 |
|
107 |
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size) |
108 |
{ |
109 |
// TODO: check this!! Just to use this function to bootstrap ncast...
|
110 |
struct metadata m = {0}; //TODO: check what metadata option should mean |
111 |
|
112 |
if (counter < TMAN_MAX_IDLE)
|
113 |
return topAddNeighbour(neighbour,&m,sizeof(m)); |
114 |
else return tmanAddNeighbour(neighbour,&m,sizeof(m)); |
115 |
} |
116 |
|
117 |
static int topoParseData(const uint8_t *buff, int len) |
118 |
{ |
119 |
int res = -1,ncs = 0,msize; |
120 |
const struct nodeID **n; const void *m; |
121 |
if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) { |
122 |
res = topParseData(buff,len); |
123 |
// if (counter <= TMAN_MAX_IDLE)
|
124 |
// counter++;
|
125 |
} |
126 |
if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN)) |
127 |
{ |
128 |
n = topGetNeighbourhood(&ncs); |
129 |
if (ncs) {
|
130 |
m = topGetMetadata(&msize); |
131 |
res = tmanParseData(buff,len,n,ncs,m,msize); |
132 |
} |
133 |
} |
134 |
return res;
|
135 |
} |
136 |
|
137 |
static const struct nodeID **topoGetNeighbourhood(int *n) |
138 |
{ |
139 |
int i; double d; |
140 |
if (counter > TMAN_MAX_IDLE) {
|
141 |
uint8_t *mdata; int msize;
|
142 |
*n = tmanGetNeighbourhoodSize(); |
143 |
if (neighbors) free(neighbors);
|
144 |
neighbors = calloc(*n,sizeof(struct nodeID *)); |
145 |
tmanGetMetadata(&msize); |
146 |
mdata = calloc(*n,msize); |
147 |
tmanGivePeers(*n,neighbors,(void *)mdata);
|
148 |
|
149 |
if (cnt % TMAN_LOG_EVERY == 0) { |
150 |
fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
|
151 |
for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) { |
152 |
d = *((double *)(mdata+i*msize));
|
153 |
fprintf(stderr,"abouttopublish,%s,",node_addr(me));
|
154 |
fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
|
155 |
} |
156 |
fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
|
157 |
} |
158 |
|
159 |
free(mdata); |
160 |
return (const struct nodeID **)neighbors; |
161 |
} |
162 |
else
|
163 |
return topGetNeighbourhood(n);
|
164 |
} |
165 |
|
166 |
static void topoAddToBL (struct nodeID *id) |
167 |
{ |
168 |
if (counter >= TMAN_MAX_IDLE)
|
169 |
tmanAddToBlackList(id); |
170 |
// else
|
171 |
topAddToBlackList(id); |
172 |
} |
173 |
|
174 |
//TODO: send metadata as well
|
175 |
static int send_topo_msg(struct nodeID *dst, uint8_t type) |
176 |
{ |
177 |
char msg[2]; |
178 |
msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
|
179 |
msg[1] = type;
|
180 |
return send_to_peer(me, dst, msg, 2); |
181 |
} |
182 |
|
183 |
static void add_peer(const struct nodeID *id, const struct metadata *m, bool local, bool remote) |
184 |
{ |
185 |
if (local) {
|
186 |
dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1); |
187 |
peerset_add_peer(pset, id); |
188 |
if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
|
189 |
if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
|
190 |
/* add measures here */
|
191 |
add_measures(id); |
192 |
send_bmap(id); |
193 |
} |
194 |
if (remote) {
|
195 |
dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
|
196 |
send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD); |
197 |
} |
198 |
} |
199 |
|
200 |
static void remove_peer(const struct nodeID *id, bool local, bool remote) |
201 |
{ |
202 |
if (local) {
|
203 |
dprintf("Removing %s from neighbourhood!\n", node_addr(id));
|
204 |
/* add measures here */
|
205 |
delete_measures(id); |
206 |
peerset_remove_peer(pset, id); |
207 |
} |
208 |
if (remote) {
|
209 |
dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
|
210 |
send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE); |
211 |
} |
212 |
} |
213 |
|
214 |
//get the rtt. Currenly only MONL version is supported
|
215 |
static double get_rtt_of(const struct nodeID* n){ |
216 |
#ifdef MONL
|
217 |
return get_rtt(n);
|
218 |
#else
|
219 |
return NAN;
|
220 |
#endif
|
221 |
} |
222 |
|
223 |
//get the declared capacity of a node
|
224 |
static double get_capacity_of(const struct nodeID* n){ |
225 |
struct peer *p = peerset_get_peer(pset, n);
|
226 |
if (p) {
|
227 |
return p->capacity;
|
228 |
} |
229 |
|
230 |
return NAN;
|
231 |
} |
232 |
|
233 |
//returns: 1:yes 0:no -1:unknown
|
234 |
int desiredness(const struct nodeID* n) { |
235 |
double rtt = get_rtt_of(n);
|
236 |
double bw = get_capacity_of(n);
|
237 |
|
238 |
if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) { |
239 |
return -1; |
240 |
} else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) { |
241 |
return 1; |
242 |
} |
243 |
|
244 |
return 0; |
245 |
} |
246 |
|
247 |
bool is_desired(const struct nodeID* n) { |
248 |
return (desiredness(n) == 1); |
249 |
} |
250 |
|
251 |
int cmp_rtt(const struct nodeID* a, const struct nodeID* b) { |
252 |
double ra, rb;
|
253 |
ra = get_rtt_of(a); |
254 |
rb = get_rtt_of(a); |
255 |
if ((isnan(ra) && isnan(rb)) || ra == rb) return 0; |
256 |
else if (isnan(rb) || ra < rb) return -1; |
257 |
else return 1; |
258 |
} |
259 |
|
260 |
int vcmp_rtt(const void* a, const void* b) { |
261 |
return cmp_rtt((const struct nodeID*)a, (const struct nodeID*)b); |
262 |
} |
263 |
|
264 |
// currently it just makes the peerset grow
|
265 |
void update_peers(struct nodeID *from, const uint8_t *buff, int len) |
266 |
{ |
267 |
int n_ids, metasize, i, newids_size, max_ids;
|
268 |
static const struct nodeID **newids; |
269 |
static const struct metadata *metas; |
270 |
struct peer *peers;
|
271 |
struct timeval tnow, told;
|
272 |
static const struct nodeID **savedids; |
273 |
static int savedids_size; |
274 |
|
275 |
if timerisset(&tout_bmap) {
|
276 |
gettimeofday(&tnow, NULL);
|
277 |
timersub(&tnow, &tout_bmap, &told); |
278 |
peers = peerset_get_peers(pset); |
279 |
for (i = 0; i < peerset_size(pset); i++) { |
280 |
if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
|
281 |
( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <) ) ) { |
282 |
ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
|
283 |
//if (peerset_size(pset) > 1) { // avoid dropping our last link to the world
|
284 |
topoAddToBL(peers[i].id); |
285 |
remove_peer(peers[i--].id, true, true); |
286 |
//}
|
287 |
} |
288 |
} |
289 |
} |
290 |
|
291 |
if (cnt++ % 100 == 0) { |
292 |
update_metadata(); |
293 |
if (counter > TMAN_MAX_IDLE) {
|
294 |
tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
|
295 |
} |
296 |
} |
297 |
|
298 |
//handle explicit add/remove messages
|
299 |
if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) { |
300 |
dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
|
301 |
if (len != 2) { |
302 |
fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
|
303 |
return;
|
304 |
} |
305 |
switch (buff[1]) { |
306 |
case STREAMER_TOPOLOGY_MSG_ADD:
|
307 |
ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
|
308 |
add_peer(from, NULL, true, false); |
309 |
break;
|
310 |
case STREAMER_TOPOLOGY_MSG_REMOVE:
|
311 |
ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
|
312 |
remove_peer(from, true, false); |
313 |
break;
|
314 |
default:
|
315 |
fprintf(stderr, "Bad streamer topo message received!\n");
|
316 |
} |
317 |
reg_neigh_size(peerset_size(pset)); |
318 |
return;
|
319 |
} |
320 |
|
321 |
topoParseData(buff, len); |
322 |
|
323 |
if (!buff) {
|
324 |
reg_neigh_size(peerset_size(pset)); |
325 |
return;
|
326 |
} |
327 |
|
328 |
peers = peerset_get_peers(pset); |
329 |
n_ids = peerset_size(pset); |
330 |
newids = topoGetNeighbourhood(&newids_size); //TODO handle both tman and topo
|
331 |
metas = topGetMetadata(&metasize); //TODO: check metasize
|
332 |
max_ids = n_ids + savedids_size + newids_size; |
333 |
ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
|
334 |
{ |
335 |
int desired_part;
|
336 |
const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids]; |
337 |
int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
|
338 |
nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids; |
339 |
|
340 |
if (topo_out) {
|
341 |
for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) { |
342 |
oldids[oldids_size++] = peers[i].id; |
343 |
fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
|
344 |
} |
345 |
} else {
|
346 |
for (i = 0, oldids_size = 0; i < savedids_size; i++) { |
347 |
oldids[oldids_size++] = savedids[i]; |
348 |
fprintf(stderr," %s - RTT: %f\n", node_addr(savedids[i]) , get_rtt_of(savedids[i]));
|
349 |
} |
350 |
savedids_size = 0;
|
351 |
free(savedids); |
352 |
} |
353 |
|
354 |
// select the topo_mem portion of peers to be kept (uniform random)
|
355 |
if (topo_keep_best) {
|
356 |
qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_rtt); |
357 |
} else {
|
358 |
nidset_shuffle(oldids, oldids_size); |
359 |
} |
360 |
keep_size = selecteds_size = (int) (topo_mem * oldids_size);
|
361 |
memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0])); |
362 |
|
363 |
// compose list of known nodeids
|
364 |
nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size); |
365 |
|
366 |
// compose list of candidate nodeids
|
367 |
nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size); |
368 |
|
369 |
// select the alpha_target portion of desired peers
|
370 |
desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0); |
371 |
nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired); |
372 |
if (topo_add_best) {
|
373 |
qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_rtt); |
374 |
} else {
|
375 |
nidset_shuffle(desireds, desireds_size); |
376 |
} |
377 |
nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part)); |
378 |
|
379 |
// random from the rest
|
380 |
nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size); |
381 |
nidset_shuffle(others, others_size); |
382 |
random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
|
383 |
nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size); |
384 |
|
385 |
fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
|
386 |
selecteds_size, nodeids_size, |
387 |
keep_size, oldids_size, |
388 |
MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part, |
389 |
random_size, others_size); |
390 |
// add new ones
|
391 |
nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size); |
392 |
for (i = 0; i < toadds_size; i++) { |
393 |
int j;
|
394 |
//searching for the metadata
|
395 |
if (nidset_find(&j, newids, newids_size, toadds[i])) {
|
396 |
fprintf(stderr," adding %s\n", node_addr(toadds[i]));
|
397 |
add_peer(newids[j], &metas[j], topo_out, topo_in); |
398 |
} else {
|
399 |
fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
|
400 |
} |
401 |
} |
402 |
|
403 |
// finally, remove those not needed
|
404 |
fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
|
405 |
nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size); |
406 |
for (i = 0; i < toremoves_size; i++) { |
407 |
fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
|
408 |
remove_peer(toremoves[i], topo_out, topo_in); |
409 |
} |
410 |
fprintf(stderr,"Topo remove end\n");
|
411 |
|
412 |
if (!topo_out) {
|
413 |
savedids = malloc(selecteds_size * sizeof(savedids[0])); //TODO: handle errors |
414 |
for (i = 0, savedids_size = 0; i < selecteds_size; i++) { |
415 |
savedids[savedids_size++] = nodeid_dup(selecteds[i]); |
416 |
} |
417 |
for (i = 0; i < oldids_size; i++) { |
418 |
nodeid_free(oldids[i]); |
419 |
} |
420 |
} |
421 |
} |
422 |
|
423 |
reg_neigh_size(peerset_size(pset)); |
424 |
} |
425 |
|
426 |
struct peer *nodeid_to_peer(const struct nodeID* id, int reg) |
427 |
{ |
428 |
struct peer *p = peerset_get_peer(pset, id);
|
429 |
if (!p) {
|
430 |
//fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
|
431 |
if (reg) {
|
432 |
add_peer(id,NULL, topo_out, false); |
433 |
fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
|
434 |
p = peerset_get_peer(pset,id); |
435 |
} |
436 |
} |
437 |
|
438 |
return p;
|
439 |
} |
440 |
|
441 |
int peers_init(void) |
442 |
{ |
443 |
fprintf(stderr,"peers_init\n");
|
444 |
pset = peerset_init(0);
|
445 |
return pset ? 1 : 0; |
446 |
} |
447 |
|
448 |
struct peerset *get_peers(void) |
449 |
{ |
450 |
return pset;
|
451 |
} |