streamers / topology.c @ 004f49eb
History | View | Annotate | Download (6.63 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Csaba Kiraly
|
3 |
* Copyright (c) 2010 Luca Abeni
|
4 |
*
|
5 |
* This is free software; see gpl-3.0.txt
|
6 |
*/
|
7 |
#include <stdint.h> |
8 |
#include <stdio.h> |
9 |
#include <sys/time.h> |
10 |
#include <time.h> |
11 |
#include <stdlib.h> |
12 |
|
13 |
#include <math.h> |
14 |
#include <net_helper.h> |
15 |
#include <peerset.h> |
16 |
#include <peer.h> |
17 |
#include <grapes_msg_types.h> |
18 |
#include <topmanager.h> |
19 |
#include <tman.h> |
20 |
|
21 |
#include "topology.h" |
22 |
#include "streaming.h" |
23 |
#include "dbg.h" |
24 |
#include "measures.h" |
25 |
|
26 |
int NEIGHBORHOOD_TARGET_SIZE = 20; |
27 |
double NEIGHBORHOOD_ROTATE_RATIO = 1.0; |
28 |
#define TMAN_MAX_IDLE 10 |
29 |
#define TMAN_LOG_EVERY 1000 |
30 |
|
31 |
static struct peerset *pset; |
32 |
static struct timeval tout_bmap = {0, 0}; |
33 |
static int counter = 0; |
34 |
static int simpleRanker (const void *tin, const void *p1in, const void *p2in); |
35 |
static tmanRankingFunction rankFunct = simpleRanker;
|
36 |
static double my_metadata; |
37 |
static int cnt = 0; |
38 |
static struct nodeID *me = NULL; |
39 |
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN}; |
40 |
static struct nodeID ** neighbors; |
41 |
|
42 |
static void update_metadata(void) { |
43 |
|
44 |
#ifndef MONL
|
45 |
my_metadata = 1 + (((double)rand() / (double)RAND_MAX)*1000); |
46 |
#endif
|
47 |
#ifdef MONL
|
48 |
my_metadata = get_receive_delay(); |
49 |
#endif
|
50 |
} |
51 |
|
52 |
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) { |
53 |
|
54 |
double t,p1,p2;
|
55 |
t = *((const double *)tin); |
56 |
p1 = *((const double *)p1in); |
57 |
p2 = *((const double *)p2in); |
58 |
|
59 |
if (isnan(t) || (isnan(p1) && isnan(p2))) return 0; |
60 |
else if (isnan(p1)) return 2; |
61 |
else if (isnan(p2)) return 1; |
62 |
else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2; |
63 |
|
64 |
} |
65 |
|
66 |
int topologyInit(struct nodeID *myID, const char *config) |
67 |
{ |
68 |
int i;
|
69 |
for (i=0;i<2;i++) |
70 |
bind_msg_type(mTypes[i]); |
71 |
update_metadata(); |
72 |
me = myID; |
73 |
return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0)); |
74 |
} |
75 |
|
76 |
void topologyShutdown(void) |
77 |
{ |
78 |
} |
79 |
|
80 |
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size) |
81 |
{ |
82 |
// TODO: check this!! Just to use this function to bootstrap ncast...
|
83 |
if (counter < TMAN_MAX_IDLE)
|
84 |
return topAddNeighbour(neighbour,metadata,metadata_size);
|
85 |
else return tmanAddNeighbour(neighbour,metadata,metadata_size); |
86 |
} |
87 |
|
88 |
static int topoParseData(const uint8_t *buff, int len) |
89 |
{ |
90 |
int res = -1,ncs = 0,msize; |
91 |
const struct nodeID **n; const void *m; |
92 |
if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) { |
93 |
res = topParseData(buff,len); |
94 |
// if (counter <= TMAN_MAX_IDLE)
|
95 |
// counter++;
|
96 |
} |
97 |
if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN)) |
98 |
{ |
99 |
n = topGetNeighbourhood(&ncs); |
100 |
if (ncs) {
|
101 |
m = topGetMetadata(&msize); |
102 |
res = tmanParseData(buff,len,n,ncs,m,msize); |
103 |
} |
104 |
} |
105 |
return res;
|
106 |
} |
107 |
|
108 |
static const struct nodeID **topoGetNeighbourhood(int *n) |
109 |
{ |
110 |
int i; double d; |
111 |
if (counter > TMAN_MAX_IDLE) {
|
112 |
uint8_t *mdata; int msize;
|
113 |
*n = tmanGetNeighbourhoodSize(); |
114 |
if (neighbors) free(neighbors);
|
115 |
neighbors = calloc(*n,sizeof(struct nodeID *)); |
116 |
tmanGetMetadata(&msize); |
117 |
mdata = calloc(*n,msize); |
118 |
tmanGivePeers(*n,neighbors,(void *)mdata);
|
119 |
|
120 |
if (cnt % TMAN_LOG_EVERY == 0) { |
121 |
fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata);
|
122 |
for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) { |
123 |
d = *((double *)(mdata+i*msize));
|
124 |
fprintf(stderr,"abouttopublish,%s,",node_addr(me));
|
125 |
fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
|
126 |
} |
127 |
fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
|
128 |
} |
129 |
|
130 |
free(mdata); |
131 |
return (const struct nodeID **)neighbors; |
132 |
} |
133 |
else
|
134 |
return topGetNeighbourhood(n);
|
135 |
} |
136 |
|
137 |
static void topoAddToBL (struct nodeID *id) |
138 |
{ |
139 |
if (counter >= TMAN_MAX_IDLE)
|
140 |
tmanAddToBlackList(id); |
141 |
// else
|
142 |
topAddToBlackList(id); |
143 |
} |
144 |
|
145 |
void add_peer(struct nodeID *id) |
146 |
{ |
147 |
dprintf("Adding %s to neighbourhood!\n", node_addr(id));
|
148 |
peerset_add_peer(pset, id); |
149 |
/* add measures here */
|
150 |
add_measures(id); |
151 |
send_bmap(id); |
152 |
} |
153 |
|
154 |
void remove_peer(struct nodeID *id) |
155 |
{ |
156 |
dprintf("Removing %s from neighbourhood!\n", node_addr(id));
|
157 |
/* add measures here */
|
158 |
delete_measures(id); |
159 |
peerset_remove_peer(pset, id); |
160 |
} |
161 |
|
162 |
// currently it just makes the peerset grow
|
163 |
void update_peers(struct nodeID *from, const uint8_t *buff, int len) |
164 |
{ |
165 |
int n_ids, i;
|
166 |
static const struct nodeID **ids; |
167 |
struct peer *peers;
|
168 |
struct timeval tnow, told;
|
169 |
|
170 |
// dprintf("Update peers: topo_msg:%d, ",len);
|
171 |
// if (from) {
|
172 |
// dprintf("from:%s, ",node_addr(from));
|
173 |
// if (peerset_check(pset, from) < 0) {
|
174 |
// topAddNeighbour(from, NULL, 0); //@TODO: this is agressive
|
175 |
// if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
|
176 |
// add_peer(from);
|
177 |
// }
|
178 |
// }
|
179 |
// }
|
180 |
|
181 |
if (cnt++ % 10000 == 0) { |
182 |
update_metadata(); |
183 |
if (counter > TMAN_MAX_IDLE) {
|
184 |
tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
|
185 |
} |
186 |
} |
187 |
|
188 |
topoParseData(buff, len); |
189 |
|
190 |
if (!buff) {
|
191 |
reg_neigh_size(peerset_size(pset)); |
192 |
return;
|
193 |
} |
194 |
|
195 |
ids = topoGetNeighbourhood(&n_ids); |
196 |
for(i = 0; i < n_ids; i++) { |
197 |
if(peerset_check(pset, ids[i]) < 0) { |
198 |
if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
|
199 |
add_peer(ids[i]); |
200 |
} else { //rotate neighbourhood |
201 |
if (rand()/((double)RAND_MAX + 1) < NEIGHBORHOOD_ROTATE_RATIO) { |
202 |
add_peer(ids[i]); |
203 |
} |
204 |
} |
205 |
} |
206 |
} |
207 |
|
208 |
if timerisset(&tout_bmap) {
|
209 |
gettimeofday(&tnow, NULL);
|
210 |
timersub(&tnow, &tout_bmap, &told); |
211 |
peers = peerset_get_peers(pset); |
212 |
for (i = 0; i < peerset_size(pset); i++) { |
213 |
if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
|
214 |
( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <) ) ) { |
215 |
//if (peerset_size(pset) > 1) { // avoid dropping our last link to the world
|
216 |
topoAddToBL(peers[i].id); |
217 |
remove_peer(peers[i--].id); |
218 |
//}
|
219 |
} |
220 |
} |
221 |
} |
222 |
|
223 |
while (NEIGHBORHOOD_TARGET_SIZE && peerset_size(pset) > NEIGHBORHOOD_TARGET_SIZE) { //reduce back neighbourhood to target size |
224 |
peers = peerset_get_peers(pset); |
225 |
remove_peer(peers[rand() % peerset_size(pset)].id); |
226 |
} |
227 |
|
228 |
reg_neigh_size(peerset_size(pset)); |
229 |
} |
230 |
|
231 |
struct peer *nodeid_to_peer(const struct nodeID* id, int reg) |
232 |
{ |
233 |
struct peer *p = peerset_get_peer(pset, id);
|
234 |
if (!p) {
|
235 |
//fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
|
236 |
if (reg) {
|
237 |
topoAddNeighbour(id, NULL, 0); |
238 |
add_peer(id); |
239 |
p = peerset_get_peer(pset,id); |
240 |
} |
241 |
} |
242 |
|
243 |
return p;
|
244 |
} |
245 |
|
246 |
int peers_init(void) |
247 |
{ |
248 |
fprintf(stderr,"peers_init\n");
|
249 |
pset = peerset_init(0);
|
250 |
return pset ? 1 : 0; |
251 |
} |
252 |
|
253 |
struct peerset *get_peers(void) |
254 |
{ |
255 |
return pset;
|
256 |
} |