nepatest_popbabel / test_code / dummyrouting.py @ 60ba786f
History | View | Annotate | Download (13.8 KB)
1 |
|
---|---|
2 |
import sys |
3 |
sys.path.append('../')
|
4 |
from network_builder import * |
5 |
from os import kill, path, makedirs |
6 |
from matplotlib.pyplot import ion |
7 |
from collections import defaultdict |
8 |
import random |
9 |
import time |
10 |
import ast |
11 |
import networkx as nx |
12 |
|
13 |
import signal |
14 |
|
15 |
from test_generic import * |
16 |
|
17 |
class OptimizeGraphChoice:
    """ Helper class to optimize the choice of the graphs (topologies)
    used to run the simulations.

    Not every graph supports the same number of node failures (run ids),
    so the topologies are pre-parsed to find how many failures each one
    supports, and the smallest set of graphs is selected that still gives
    the requested number of repetitions per failure index. """

    def __init__(self, failures=1000000):

        # upper bound on the number of failure indexes to schedule;
        # just an initialization number > of the size of the graph
        self.failures = failures
        # when True, candidate topology lists are shuffled before
        # selection; kept off so optimized / non-optimized simulation
        # sets stay comparable
        self.shuffle = False

    def compute_topology_failure_maps(self, graph_dict, min_run_number):
        """ Schedule topologies over failure indexes (run ids).

        graph_dict: {topology name: networkx graph}
        min_run_number: minimum repetitions wanted for each failure index.

        Returns a dict {topology name: list of failure indexes that
        topology is scheduled to serve}. """

        # failure index -> list of topologies supporting that many failures
        failure_map = defaultdict(list)
        # topology -> number of failures it supports
        topo_failures = {}
        for topo, graph in graph_dict.items():
            failure_list = self.get_emulation_runs_per_topology(graph)
            failure_number = len(failure_list)
            for idx in range(failure_number):
                failure_map[idx].append(topo)
            topo_failures[topo] = failure_number

        # count how many failure indexes are covered by more than
        # min_run_number topologies.
        # NOTE(review): Python 2 only — len(filter(...)) relies on
        # filter() returning a list; under Python 3 this would raise.
        max_fail = len(
            filter(lambda z: z>min_run_number,
                [len(y[1]) for y in sorted(
                    failure_map.items(), key = lambda x: x[0])])
            )
        print "The maximum number of failures avilable with "
        print min_run_number, "run(s) is ", max_fail

        # file -> failure list
        file_dict = defaultdict(list)

        min_fail = min(max_fail, self.failures)

        # failure_counter[idx] = how many topologies have been assigned
        # to failure index idx so far
        failure_counter = [0]*min_fail

        # greedily walk failure indexes in increasing order; each chosen
        # topology covers the contiguous range of run ids it supports
        for (idx, file_list) in \
                sorted(failure_map.items(), key = lambda x: x[0])[:min_fail]:
            if idx >= min_fail:
                break
            if self.shuffle:
                # if i shuffle i can not compare opt/non-opt simulations
                random.shuffle(file_list)
            for f in file_list:
                # this failure index already has enough repetitions
                if failure_counter[idx] >= min_run_number:
                    break
                # topology already scheduled (covers a range of runs)
                if f in file_dict:
                    continue
                # assign to f all run ids from idx up to the number of
                # failures f supports (capped at min_fail)
                rem_runs = range(idx, min(topo_failures[f], min_fail))
                file_dict[f] = rem_runs
                for r in rem_runs:
                    failure_counter[r] += 1

        # print a per-topology coverage bitmap, least-covering first
        for f,runs in sorted(file_dict.items(), key = lambda x: len(x[1])):
            print f, [1 if x in runs else 0 for x in range(min_fail)]

        return file_dict

    def get_emulation_runs_per_topology(self, graph):
        """ Return a list of nodes ordered by centrality that can be removed.

        1) identify the 2-core of the network (i.e. remove leaf nodes up to
           when there are any)
        2) from the list of nodes in the 2-core, remove the articulation
           points (i.e. nodes that will partition the 2-core in multiple
           disconnected components)
        3) for each remaining node, find the tree that is rooted on it (i.e.
           remove node from the graph, all the disconnected components
           that remain form such tree).
        4) each node + tree can fail all together.

        In this script this is necessary because not all the graphs support
        the same number of run_ids. Some graph may allow 10 runs (10
        failures) while other 8 failures. I pre-parse the topology to
        identify this number, then i run the simulations with the
        sufficient number of graphs that allow me to have a minimum number
        of repetitions for each run_id (for each failure). The minimum
        number is taken from the runs parameter in command line """

        # FIXME remove self loops from original graphs
        # NOTE(review): Graph.selfloop_edges() was removed in networkx
        # >= 2.4 (use nx.selfloop_edges(graph)); this targets an older
        # networkx release.
        graph.remove_edges_from(graph.selfloop_edges())
        two_core = nx.k_core(graph, 2)
        art = [n for n in nx.articulation_points(two_core)]
        # (node, betweenness) pairs, highest centrality first
        cent_list = sorted(
            [n for n in nx.betweenness_centrality(two_core).items() ],
            key = lambda x: -x[1])

        # candidates whose removal does not disconnect the 2-core
        fail_candidates = [n[0] for n in cent_list if n[0] not in art]

        fallible_nodes = []
        for n in fail_candidates:
            gg = graph.copy()
            gg.remove_node(n)
            comp = list(nx.connected_components(gg))
            # assumes comp[0] is the main component — older networkx
            # returned components sorted by size descending; TODO confirm
            # for the networkx version in use
            isolated_nodes = [x for component in comp[1:] for x in component]
            fallible_nodes.append([n] + isolated_nodes)

        return fallible_nodes
|
117 |
|
118 |
|
119 |
class dummyRoutingTest(MininetTest):
    """ Mininet test that launches the dummy routing daemon on every
    emulated host, schedules log-start / node-crash / log-stop events
    via UNIX signals, and tears everything down at the end of each run. """

    def __init__(self, mininet, duration=10):

        # 'path' here is the module imported from os at file top
        # (from os import kill, path, makedirs)
        super(dummyRoutingTest, self).__init__(mininet, path, duration)
        # per-run list of node groups to crash (filled by runTest when
        # stopAllNodes is active); each entry is [node] + its subtree
        self.centList = []


    def launchSniffer(self, host):
        """ Start a tcpdump capture on *host*, redirecting stdout and
        stderr to <prefix><host>-dump.log; returns the background pid. """

        cmd = "tcpdump -i any -n -X -e "

        logfile = self.prefix + host.name + "-dump.log"

        params = {}
        params['>'] = logfile
        params['2>'] = logfile


        # flatten {'>': f, '2>': f} into ('>', f, '2>', f) for bgCmd
        return self.bgCmd(host, True, cmd,
                          *reduce(lambda x, y: x + y, params.items()))


    def launchdummyRouting(self, host, args):
        """ Start the routing daemon on *host* with the given extra
        command-line *args*; logs go to <prefix><host>.log.
        Returns the background pid. """

        cmd = "../dummy_routing_protocol/routingdaemon.py " + \
              args

        logfile = self.prefix + host.name + ".log"

        params = {}
        params['>'] = logfile
        params['2>'] = logfile


        # flatten the redirection dict into positional args for bgCmd
        return self.bgCmd(host, True, cmd,
                          *reduce(lambda x, y: x + y, params.items()))

    def runTest(self):
        """ Main test loop: one emulation run per scheduled failure.

        For each run id: (re)start all daemons, build a time-ordered
        event list (start logging / crash node(s) / stop logging, all
        delivered as signals), play it back with relative sleeps, then
        terminate and clean up all processes. """

        info("*** Launching dummyRouting test\n")
        info("Data folder: "+self.prefix+"\n")

        run_list = []

        if self.stopAllNodes:
            # int: crash the stopAllNodes most central nodes, one per run
            if type(self.stopAllNodes) == int:
                self.centList = self.getCentrality()[:self.stopAllNodes]
                run_list = range(self.stopAllNodes)
            # list: crash exactly the node groups at the given indexes
            elif type(self.stopAllNodes) == list:
                for idx in self.stopAllNodes:
                    self.centList.append(self.getCentrality()[idx])
                run_list = self.stopAllNodes

        for runid in run_list:
            info("\nStarting run " + str(runid) + "\n")
            self.runId = str(runid)
            if self.stopAllNodes:
                # next node group ([node] + dependent subtree) to crash
                self.nodeCrashed = self.centList.pop(0)

            if not self.startRun():
                # some times process are not killed in time, UDP
                # ports are still occupied and the next run can not
                # start correctly. I kill everything, wait some time, try
                # to restart. If something still goes wrong i stop the emulation
                self.killAll()
                time.sleep(10)
                info("\nWARNING: run_id " + str(runid) + " could not start, retrying...\n")
                if not self.startRun():
                    error("\nERROR: run_id " + str(runid) + " could not start!" + \
                          "please check the logs\n")
                    sys.exit(1)



            # absolute event time -> [label, handler, handler kwargs].
            # SIGUSR1 toggles logging in the daemons, SIGTSTP means
            # "crash" for the targeted hosts. Times <= 0 disable an event.
            eventDict = {
                self.startLog:["Start logging", self.sendSignal,
                               {"sig":signal.SIGUSR1}],
                self.stopNode:["Stopping node(s) " + str(self.nodeCrashed) + " ",
                               self.sendSignal, {"sig":signal.SIGTSTP,
                                                 "hostName":self.nodeCrashed}],
                self.stopLog:["Stop logging", self.sendSignal,
                              {"sig":signal.SIGUSR1}]}

            # convert absolute times into consecutive relative delays
            eventList = []
            relativeTime = 0
            for e in sorted(eventDict.keys()):
                if e > 0:
                    data = eventDict[e]
                    eventList.append([e - relativeTime] + data)
                    relativeTime += (e - relativeTime)

            # remaining time to sleep after the last event
            waitTime = self.duration - relativeTime

            for event in eventList:
                sleep(event[0])
                print event
                event[2](**event[3])
                info(event[1] + str(time.time()) + "\n")
            sleep(waitTime)
            # graceful termination of all daemons/sniffers first...
            for pid in self.pendingProc.keys():
                self.sendSig(pid, signal.SIGTERM)
            time.sleep(2)
            # in case some process got stuck:
            self.killAll()
            time.sleep(2)
            # (legacy sleep-based event scheduling removed; events are
            # now driven by the eventList built above)




    def getCentrality(self):
        """ Return the centrality-ordered failure groups for the
        test graph (see OptimizeGraphChoice). """
        o = OptimizeGraphChoice()
        return o.get_emulation_runs_per_topology(self.graph)

    def startRun(self):
        """ Launch the routing daemon (and optional sniffer) on every
        host for the current runId; returns the pid of the last daemon
        launched (falsy on failure). """

        # NOTE(review): rNode is always "" here, so the two rNode
        # branches below are dead code — looks like a leftover from a
        # random-failure mode; confirm before removing.
        rNode = ""
        hostList = self.getAllHosts()

        if rNode:
            info("\nChosen node " + str(rNode) + " to fail\n")

        for h in hostList:
            # daemon command line assembled from the configured options
            args = " --runid=" + self.runId
            if self.logInterval != "":
                args += " " + self.logInterval
            if self.verbose != "":
                args += " " + self.verbose
            if self.centralityTuning != "":
                args += " " + self.centralityTuning

            launch_pid = self.launchdummyRouting(h, args)

            if self.dump:
                self.launchSniffer(h)

        if not self.nodeCrashed and rNode:
            self.nodeCrashed = [rNode]
        # NOTE(review): launch_pid is unbound if hostList is empty
        return launch_pid

    def sendSignal(self, sig, hostName=""):
        """ Send signal *sig* to the pending processes.

        hostName: iterable of host names to target; if empty/falsy the
        signal is sent to every pending process. """
        for pid, h in self.pendingProc.items():
            if hostName:
                for host in hostName:
                    if host == h.name:
                        print "sending signal to host:", host, ", pid", pid
                        self.sendSig(pid, sig)
            # send to all
            else:
                self.sendSig(pid, sig)


    def parseTime(self, timeString):
        """ Normalize a time option to a string of seconds:
        '30s' -> '30', '2m' -> '120', anything else passes through. """

        retString = ""
        if timeString.endswith('s'):
            retString = timeString[:-1]
        elif timeString.endswith('m'):
            retString = int(timeString[:-1])*60
        else:
            retString = timeString
        return str(retString)
310 |
|
311 |
class dummyRoutingRandomTest(dummyRoutingTest):
    """ Emulation test that runs the dummy routing daemon on every host
    and (optionally) crashes nodes in sequence, one node group per run.

    All configuration is parsed from the *args* option dict passed on
    the command line; see __init__ for the recognized keys. """

    def __init__(self, mininet, name, args):
        """ Configure the test from the option dict *args*.

        mininet: the mininet object (provides .gg, the networkx graph,
                 and .gg_name, the topology file name)
        name:    prefix for the data folder
        args:    {option name: option value} from the command line

        Exits the process (sys.exit(1)) on invalid option values. """

        self.graph = mininet.gg
        # enable per-host tcpdump capture when requested
        self.dump = "dumpWithTCPDump" in args

        # Log start/stop can not be passed to the daemons as a relative
        # command-line offset: processes start at slightly different
        # times, which would misalign relative log timestamps. Signals
        # are used instead (see runTest); -1 means "event disabled".
        if "startLog" in args:
            self.startLog = float(self.parseTime(args["startLog"]))
        else:
            self.startLog = -1

        if "stopLog" in args:
            self.stopLog = float(self.parseTime(args["stopLog"]))
        else:
            self.stopLog = -1

        if "logInterval" in args:
            self.logInterval = "--loginterval " \
                    + self.parseTime(args["logInterval"])
        else:
            self.logInterval = ""

        # boolean flags translated to daemon command-line switches
        self.verbose = "-v " if "verbose" in args else ""
        self.centralityTuning = "-c " if "centralityTuning" in args else ""

        if "stopNode" in args:
            self.stopNode = int(self.parseTime(args["stopNode"]))
        else:
            self.stopNode = -1

        if "nodeCrashed" in args:
            self.nodeCrashed = [args["nodeCrashed"]]
        else:
            self.nodeCrashed = ""

        if "stopAllNodes" in args:
            info("Going to stop all the nodes in sequence")
            if args['stopAllNodes'] == '':
                # no value given: stop every node of the graph in turn
                self.stopAllNodes = len(self.graph)
            else:
                # convert option in a python object (int or list).
                # literal_eval raises SyntaxError (not only ValueError)
                # on malformed input, so catch both.
                try:
                    s = ast.literal_eval(args["stopAllNodes"])
                except (ValueError, SyntaxError):
                    error("Option " + args["stopAllNodes"] + " is not valid")
                    # was exit(1): the site builtin is not guaranteed to
                    # exist; use sys.exit consistently with the rest of
                    # the file
                    sys.exit(1)
                if isinstance(s, int):
                    if s <= 0:
                        error("\nPlease stopAllNodes must be > 0\n")
                        sys.exit(1)
                    self.stopAllNodes = s
                    info("... limited to " + args["stopAllNodes"] + " node(s).")
                elif isinstance(s, list) and s:
                    self.stopAllNodes = s
                    info("... limited to the list of nodes: " + str(s))
                else:
                    error("Option " + args["stopAllNodes"] + " is not valid")
                    # BUGFIX: previously fell through without exiting,
                    # leaving self.stopAllNodes unset and causing an
                    # AttributeError later in runTest
                    sys.exit(1)

            # runTest fills this per run with the node group to crash
            self.nodeCrashed = []
        else:
            self.stopAllNodes = False

        self.runId = 0

        duration = int(self.parseTime(args["duration"]))

        super(dummyRoutingRandomTest, self).__init__(mininet, duration)
        # data-folder prefix derived from the topology file name
        self.localPrefix = os.path.basename(os.path.splitext(
            mininet.gg_name)[0])
        self.setPrefix(name + self.localPrefix)
400 |
|
401 |
|