streamers / topology.c @ c7ebbd34
History | View | Annotate | Download (11.5 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Csaba Kiraly
|
3 |
* Copyright (c) 2010 Luca Abeni
|
4 |
*
|
5 |
* This is free software; see gpl-3.0.txt
|
6 |
*/
|
7 |
#include <stdint.h> |
8 |
#include <stdio.h> |
9 |
#include <sys/time.h> |
10 |
#include <time.h> |
11 |
#include <stdlib.h> |
12 |
#include <string.h> |
13 |
|
14 |
#include <math.h> |
15 |
#include <net_helper.h> |
16 |
#include <peerset.h> |
17 |
#include <peer.h> |
18 |
#include <grapes_msg_types.h> |
19 |
#include <topmanager.h> |
20 |
#include <tman.h> |
21 |
|
22 |
#include "compatibility/timer.h" |
23 |
|
24 |
#include "topology.h" |
25 |
#include "streaming.h" |
26 |
#include "dbg.h" |
27 |
#include "measures.h" |
28 |
#include "streamer.h" |
29 |
|
30 |
#define MIN(A,B) ((A) < (B)) ? (A) : (B)
|
31 |
|
32 |
double desired_rtt = 0.2; |
33 |
double alpha_target = 0.5; |
34 |
|
35 |
int NEIGHBORHOOD_TARGET_SIZE = 20; |
36 |
double NEIGHBORHOOD_ROTATE_RATIO = 1.0; |
37 |
#define TMAN_MAX_IDLE 10 |
38 |
#define TMAN_LOG_EVERY 1000 |
39 |
|
40 |
static struct peerset *pset; |
41 |
static struct timeval tout_bmap = {10, 0}; |
42 |
static int counter = 0; |
43 |
static int simpleRanker (const void *tin, const void *p1in, const void *p2in); |
44 |
static tmanRankingFunction rankFunct = simpleRanker;
|
45 |
struct metadata {
|
46 |
uint16_t cb_size; |
47 |
uint16_t cps; |
48 |
float recv_delay;
|
49 |
}; |
50 |
static struct metadata my_metadata; |
51 |
static int cnt = 0; |
52 |
static struct nodeID *me = NULL; |
53 |
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN}; |
54 |
static struct nodeID ** neighbors; |
55 |
|
56 |
static void update_metadata(void) { |
57 |
|
58 |
my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
|
59 |
my_metadata.recv_delay = get_receive_delay(); |
60 |
my_metadata.cps = get_chunks_per_sec(); |
61 |
} |
62 |
|
63 |
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) { |
64 |
|
65 |
double t,p1,p2;
|
66 |
t = *((const double *)tin); |
67 |
p1 = *((const double *)p1in); |
68 |
p2 = *((const double *)p2in); |
69 |
|
70 |
if (isnan(t) || (isnan(p1) && isnan(p2))) return 0; |
71 |
else if (isnan(p1)) return 2; |
72 |
else if (isnan(p2)) return 1; |
73 |
else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2; |
74 |
|
75 |
} |
76 |
|
77 |
int topologyInit(struct nodeID *myID, const char *config) |
78 |
{ |
79 |
int i;
|
80 |
for (i=0;i<2;i++) |
81 |
bind_msg_type(mTypes[i]); |
82 |
update_metadata(); |
83 |
me = myID; |
84 |
return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0)); |
85 |
} |
86 |
|
87 |
void topologyShutdown(void) |
88 |
{ |
89 |
} |
90 |
|
91 |
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size) |
92 |
{ |
93 |
// TODO: check this!! Just to use this function to bootstrap ncast...
|
94 |
struct metadata m = {0}; //TODO: check what metadata option should mean |
95 |
|
96 |
if (counter < TMAN_MAX_IDLE)
|
97 |
return topAddNeighbour(neighbour,&m,sizeof(m)); |
98 |
else return tmanAddNeighbour(neighbour,&m,sizeof(m)); |
99 |
} |
100 |
|
101 |
static int topoParseData(const uint8_t *buff, int len) |
102 |
{ |
103 |
int res = -1,ncs = 0,msize; |
104 |
const struct nodeID **n; const void *m; |
105 |
if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) { |
106 |
res = topParseData(buff,len); |
107 |
// if (counter <= TMAN_MAX_IDLE)
|
108 |
// counter++;
|
109 |
} |
110 |
if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN)) |
111 |
{ |
112 |
n = topGetNeighbourhood(&ncs); |
113 |
if (ncs) {
|
114 |
m = topGetMetadata(&msize); |
115 |
res = tmanParseData(buff,len,n,ncs,m,msize); |
116 |
} |
117 |
} |
118 |
return res;
|
119 |
} |
120 |
|
121 |
static const struct nodeID **topoGetNeighbourhood(int *n) |
122 |
{ |
123 |
int i; double d; |
124 |
if (counter > TMAN_MAX_IDLE) {
|
125 |
uint8_t *mdata; int msize;
|
126 |
*n = tmanGetNeighbourhoodSize(); |
127 |
if (neighbors) free(neighbors);
|
128 |
neighbors = calloc(*n,sizeof(struct nodeID *)); |
129 |
tmanGetMetadata(&msize); |
130 |
mdata = calloc(*n,msize); |
131 |
tmanGivePeers(*n,neighbors,(void *)mdata);
|
132 |
|
133 |
if (cnt % TMAN_LOG_EVERY == 0) { |
134 |
fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
|
135 |
for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) { |
136 |
d = *((double *)(mdata+i*msize));
|
137 |
fprintf(stderr,"abouttopublish,%s,",node_addr(me));
|
138 |
fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
|
139 |
} |
140 |
fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
|
141 |
} |
142 |
|
143 |
free(mdata); |
144 |
return (const struct nodeID **)neighbors; |
145 |
} |
146 |
else
|
147 |
return topGetNeighbourhood(n);
|
148 |
} |
149 |
|
150 |
static void topoAddToBL (struct nodeID *id) |
151 |
{ |
152 |
if (counter >= TMAN_MAX_IDLE)
|
153 |
tmanAddToBlackList(id); |
154 |
// else
|
155 |
topAddToBlackList(id); |
156 |
} |
157 |
|
158 |
void add_peer(const struct nodeID *id, const struct metadata *m) |
159 |
{ |
160 |
dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1); |
161 |
peerset_add_peer(pset, id); |
162 |
if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
|
163 |
/* add measures here */
|
164 |
add_measures(id); |
165 |
send_bmap(id); |
166 |
} |
167 |
|
168 |
void remove_peer(const struct nodeID *id) |
169 |
{ |
170 |
dprintf("Removing %s from neighbourhood!\n", node_addr(id));
|
171 |
/* add measures here */
|
172 |
delete_measures(id); |
173 |
peerset_remove_peer(pset, id); |
174 |
} |
175 |
|
176 |
//get the rtt. Currenly only MONL version is supported
|
177 |
static double get_rtt_of(const struct nodeID* n){ |
178 |
#ifdef MONL
|
179 |
return get_rtt(n);
|
180 |
#else
|
181 |
return NAN;
|
182 |
#endif
|
183 |
} |
184 |
|
185 |
//returns: 1:yes 0:no -1:unknown
|
186 |
int desiredness(const struct nodeID* n) { |
187 |
double rtt = get_rtt_of(n);
|
188 |
|
189 |
return isnan(rtt) ? -1 : ((rtt <= desired_rtt) ? 1 : 0); |
190 |
} |
191 |
|
192 |
bool is_desired(const struct nodeID* n) { |
193 |
return (desiredness(n) == 1); |
194 |
} |
195 |
|
196 |
// The usual shuffle
|
197 |
static void shuffle(void *base, size_t nmemb, size_t size) { |
198 |
int i;
|
199 |
unsigned char t[size]; |
200 |
unsigned char* b = base; |
201 |
|
202 |
for (i = nmemb - 1; i > 0; i--) { |
203 |
int newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1); |
204 |
memcpy(t, b + size * newpos, size); |
205 |
memmove(b + size * newpos, b + size * i, size); |
206 |
memcpy(b + size * i, t, size); |
207 |
} |
208 |
} |
209 |
|
210 |
static void nidset_shuffle(const struct nodeID **base, size_t nmemb) { |
211 |
shuffle(base, nmemb, sizeof(struct nodeID *)); |
212 |
} |
213 |
|
214 |
static int nidset_filter(const struct nodeID **dst, size_t *dst_size, const struct nodeID **src, size_t src_size, bool(*f)(const struct nodeID *)) { |
215 |
size_t i; |
216 |
size_t max_size = *dst_size; |
217 |
*dst_size = 0;
|
218 |
|
219 |
for (i = 0; i < src_size; i++) { |
220 |
if (f(src[i])) {
|
221 |
if (*dst_size < max_size) {
|
222 |
dst[(*dst_size)++] = src[i]; |
223 |
} else {
|
224 |
return -1; |
225 |
} |
226 |
} |
227 |
} |
228 |
|
229 |
return 0; |
230 |
} |
231 |
|
232 |
// B \ A
|
233 |
static int nidset_complement(const struct nodeID **dst, size_t *dst_size, const struct nodeID **bs, size_t bs_size, const struct nodeID **as, size_t as_size) { |
234 |
size_t i, j; |
235 |
size_t max_size = *dst_size; |
236 |
*dst_size = 0;
|
237 |
|
238 |
for (i = 0; i < bs_size; i++) { |
239 |
for (j = 0; j < as_size; j++) { |
240 |
if (bs[i] == as[j]) {
|
241 |
break;
|
242 |
} |
243 |
} |
244 |
if (j >= as_size) {
|
245 |
if (*dst_size < max_size) {
|
246 |
dst[(*dst_size)++] = bs[i]; |
247 |
} else {
|
248 |
return -1; |
249 |
} |
250 |
} |
251 |
} |
252 |
|
253 |
return 0; |
254 |
} |
255 |
|
256 |
static bool nidset_find(size_t *i, const struct nodeID **ids, size_t ids_size, const struct nodeID *id) { |
257 |
for (*i = 0; *i < ids_size; (*i)++) { |
258 |
if (ids[*i] == id) {
|
259 |
return true; |
260 |
} |
261 |
} |
262 |
return false; |
263 |
} |
264 |
|
265 |
static int nidset_add(const struct nodeID **dst, size_t *dst_size, const struct nodeID **as, size_t as_size, const struct nodeID **bs, size_t bs_size) { |
266 |
size_t i; |
267 |
size_t max_size = *dst_size; |
268 |
|
269 |
i = MIN(as_size, max_size); |
270 |
memcpy(dst, as, i * sizeof(struct nodeID*)); |
271 |
*dst_size = i; |
272 |
if (i < as_size) return -1; |
273 |
|
274 |
i = MIN(bs_size, max_size - *dst_size); |
275 |
memcpy(dst + *dst_size , bs, i * sizeof(struct nodeID*)); |
276 |
*dst_size += i; |
277 |
if (i < bs_size) return -1; |
278 |
|
279 |
return 0; |
280 |
} |
281 |
|
282 |
static int nidset_add_i(const struct nodeID **dst, size_t *dst_size, size_t max_size, const struct nodeID **as, size_t as_size) { |
283 |
size_t i; |
284 |
|
285 |
i = MIN(as_size, max_size - *dst_size); |
286 |
memcpy(dst + *dst_size , as, i * sizeof(struct nodeID*)); |
287 |
*dst_size += i; |
288 |
if (i < as_size) return -1; |
289 |
|
290 |
return 0; |
291 |
} |
292 |
|
293 |
// currently it just makes the peerset grow
|
294 |
void update_peers(struct nodeID *from, const uint8_t *buff, int len) |
295 |
{ |
296 |
int n_ids, metasize, i;
|
297 |
static const struct nodeID **ids; |
298 |
static const struct metadata *metas; |
299 |
struct peer *peers;
|
300 |
struct timeval tnow, told;
|
301 |
|
302 |
// dprintf("Update peers: topo_msg:%d, ",len);
|
303 |
// if (from) {
|
304 |
// dprintf("from:%s, ",node_addr(from));
|
305 |
// if (peerset_check(pset, from) < 0) {
|
306 |
// topAddNeighbour(from, NULL, 0); //@TODO: this is agressive
|
307 |
// if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
|
308 |
// add_peer(from);
|
309 |
// }
|
310 |
// }
|
311 |
// }
|
312 |
|
313 |
if (cnt++ % 100 == 0) { |
314 |
update_metadata(); |
315 |
if (counter > TMAN_MAX_IDLE) {
|
316 |
tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
|
317 |
} |
318 |
} |
319 |
|
320 |
topoParseData(buff, len); |
321 |
|
322 |
if (!buff) {
|
323 |
reg_neigh_size(peerset_size(pset)); |
324 |
return;
|
325 |
} |
326 |
|
327 |
fprintf(stderr,"Topo modify start\n");
|
328 |
peers = peerset_get_peers(pset); |
329 |
for (i = 0; i < peerset_size(pset); i++) { |
330 |
fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
|
331 |
} |
332 |
|
333 |
ids = topoGetNeighbourhood(&n_ids); //TODO handle both tman and topo
|
334 |
metas = topGetMetadata(&metasize); //TODO: check metasize
|
335 |
for(i = 0; i < n_ids; i++) { |
336 |
if(peerset_check(pset, ids[i]) < 0) { |
337 |
if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
|
338 |
add_peer(ids[i],&metas[i]); |
339 |
} else { //rotate neighbourhood |
340 |
if (rand()/((double)RAND_MAX + 1) < NEIGHBORHOOD_ROTATE_RATIO) { |
341 |
add_peer(ids[i],&metas[i]); |
342 |
} |
343 |
} |
344 |
} |
345 |
} |
346 |
|
347 |
if timerisset(&tout_bmap) {
|
348 |
gettimeofday(&tnow, NULL);
|
349 |
timersub(&tnow, &tout_bmap, &told); |
350 |
peers = peerset_get_peers(pset); |
351 |
for (i = 0; i < peerset_size(pset); i++) { |
352 |
if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
|
353 |
( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <) ) ) { |
354 |
//if (peerset_size(pset) > 1) { // avoid dropping our last link to the world
|
355 |
topoAddToBL(peers[i].id); |
356 |
remove_peer(peers[i--].id); |
357 |
//}
|
358 |
} |
359 |
} |
360 |
} |
361 |
|
362 |
|
363 |
n_ids = peerset_size(pset); |
364 |
{ |
365 |
int desired_part;
|
366 |
const struct nodeID *nodeids[n_ids], *desireds[n_ids], *selecteds[n_ids], *others[n_ids], *toremoves[n_ids]; |
367 |
size_t nodeids_size, desireds_size, selecteds_size, others_size, toremoves_size; |
368 |
nodeids_size = desireds_size = selecteds_size = others_size = toremoves_size = n_ids; |
369 |
|
370 |
//compose list of nodeids
|
371 |
peers = peerset_get_peers(pset); |
372 |
for (i = 0; i < n_ids; i++) { |
373 |
nodeids[i] = peers[i].id; |
374 |
} |
375 |
|
376 |
// select the alpha_target portion of desired peers
|
377 |
desired_part = alpha_target * NEIGHBORHOOD_TARGET_SIZE; |
378 |
nidset_filter(desireds, &desireds_size, nodeids, nodeids_size, is_desired); |
379 |
nidset_shuffle(desireds, desireds_size); |
380 |
selecteds_size = MIN(desireds_size,desired_part); |
381 |
memcpy(selecteds, desireds, selecteds_size * sizeof(selecteds[0])); |
382 |
|
383 |
// random from the rest
|
384 |
nidset_complement(others, &others_size, nodeids, nodeids_size, selecteds, selecteds_size); |
385 |
nidset_shuffle(others, others_size); |
386 |
nidset_add_i(selecteds, &selecteds_size, n_ids, others, NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, NEIGHBORHOOD_TARGET_SIZE - selecteds_size) : others_size); |
387 |
|
388 |
// finally, remove those not needed
|
389 |
fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
|
390 |
nidset_complement(toremoves, &toremoves_size, nodeids, nodeids_size, selecteds, selecteds_size); |
391 |
for (i = 0; i < toremoves_size; i++) { |
392 |
fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
|
393 |
remove_peer(toremoves[i]); |
394 |
} |
395 |
fprintf(stderr,"Topo remove end\n");
|
396 |
} |
397 |
|
398 |
reg_neigh_size(peerset_size(pset)); |
399 |
} |
400 |
|
401 |
struct peer *nodeid_to_peer(const struct nodeID* id, int reg) |
402 |
{ |
403 |
struct peer *p = peerset_get_peer(pset, id);
|
404 |
if (!p) {
|
405 |
//fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
|
406 |
if (reg) {
|
407 |
add_peer(id,NULL);
|
408 |
p = peerset_get_peer(pset,id); |
409 |
} |
410 |
} |
411 |
|
412 |
return p;
|
413 |
} |
414 |
|
415 |
int peers_init(void) |
416 |
{ |
417 |
fprintf(stderr,"peers_init\n");
|
418 |
pset = peerset_init(0);
|
419 |
return pset ? 1 : 0; |
420 |
} |
421 |
|
422 |
struct peerset *get_peers(void) |
423 |
{ |
424 |
return pset;
|
425 |
} |