Statistics
| Branch: | Revision:

nepatest_popbabel / test_code / dummyrouting.py @ cafb67a9

History | View | Annotate | Download (13.8 KB)

1

    
2
import sys
3
sys.path.append('../')
4
from network_builder import *
5
from os import kill, path, makedirs
6
from matplotlib.pyplot import ion
7
from collections import defaultdict
8
import random 
9
import time
10
import ast
11
import networkx as nx
12
import code
13

    
14
import signal
15

    
16
from test_generic import *
17

    
18
class OptimizeGraphChoice:
    """Helper class to optimize the choice of the graphs used to run
    the simulations.

    Attributes:
        failures: upper bound on the number of failure runs (run ids) that
            are scheduled. The default is just a number larger than the
            size of any graph, i.e. "no limit".
        shuffle: when True, the topologies available for a run id are
            shuffled before being assigned.
    """

    def __init__(self, failures=1000000):
        # just an initialization number > of the size of the graph
        self.failures = failures
        self.shuffle = False

    def compute_topology_failure_maps(self, graph_dict, min_run_number):
        """Choose which topologies to emulate and which run ids each covers.

        Args:
            graph_dict: mapping topology identifier -> graph, each graph
                being handed to get_emulation_runs_per_topology().
            min_run_number: number of repetitions wanted for each run id.

        Returns:
            A defaultdict(list) mapping each selected topology to the list
            of run ids (failure indexes) it has to be emulated with.
            Topologies that are not needed are absent from the mapping.
        """
        # run id -> topologies that support at least that many failures
        failure_map = defaultdict(list)
        # topology -> number of failures it supports
        topo_failures = {}
        for topo, graph in graph_dict.items():
            failure_number = len(self.get_emulation_runs_per_topology(graph))
            topo_failures[topo] = failure_number
            for idx in range(failure_number):
                failure_map[idx].append(topo)

        runs_by_id = sorted(failure_map.items(), key=lambda item: item[0])

        # number of run ids backed by more than min_run_number topologies
        max_fail = sum(
            1 for _, topos in runs_by_id if len(topos) > min_run_number)
        print("The maximum number of failures avilable with ")
        print("%s run(s) is  %s" % (min_run_number, max_fail))

        # file -> failure list
        file_dict = defaultdict(list)

        # never negative, so that it is safe as a list size and slice bound
        min_fail = max(0, min(max_fail, self.failures))

        # how many selected topologies already cover each run id
        failure_counter = [0] * min_fail

        for idx, file_list in runs_by_id[:min_fail]:
            if self.shuffle:
                # if i shuffle i can not compare opt/non-opt simulations
                random.shuffle(file_list)
            for f in file_list:
                if failure_counter[idx] >= min_run_number:
                    # this run id already has enough repetitions
                    break
                if f in file_dict:
                    continue
                # a newly selected topology is used for every run id from
                # the current one up to the last one it supports
                rem_runs = list(range(idx, min(topo_failures[f], min_fail)))
                file_dict[f] = rem_runs
                for r in rem_runs:
                    failure_counter[r] += 1

        # one row per selected topology: 1 if it is used for that run id
        for f, runs in sorted(file_dict.items(), key=lambda item: len(item[1])):
            print("%s %s" % (
                f, [1 if x in runs else 0 for x in range(min_fail)]))

        return file_dict

    def get_emulation_runs_per_topology(self, graph):
        """Return the groups of nodes that can fail, ordered by centrality.

        1) identify the 2-core of the network (i.e. remove leaf nodes up to
           when there are any)
        2) from the list of nodes in the 2-core, remove the articulation
           points (i.e. nodes that will partition the 2-core in multiple
           disconnected components)
        3) for each remaining node, find the tree that is rooted on it (i.e.
           remove node from the graph, all the disconnected components
           that remain form such tree).
        4) each node + tree can fail all together.

        In this script this is necessary because not all the graphs support
        the same number or run_ids. Some graph may allow 10 runs
        (10 failures) while other 8 failures. I pre-parse the topology to
        identify this number, then i run the simulations with the sufficient
        number of graphs that allow me to have a minimum number of
        repetitions for each run_id (for each failure). The minimum number
        is taken from the runs parameter in command line.

        Args:
            graph: a networkx graph. Its self loops are removed IN PLACE.

        Returns:
            A list of lists, sorted by decreasing betweenness centrality of
            the first element. Each inner list is [node] followed by the
            nodes that get cut off from the largest component when that
            node is removed.
        """
        # FIXME remove self loops from original graphs
        # Recent networkx only offers the module-level function, old
        # releases only the graph method. The edges are copied into a list
        # because the graph is modified while they are removed.
        selfloop_edges = getattr(nx, "selfloop_edges", None)
        if selfloop_edges is not None:
            loops = list(selfloop_edges(graph))
        else:
            loops = list(graph.selfloop_edges())
        graph.remove_edges_from(loops)

        two_core = nx.k_core(graph, 2)
        # a set gives constant-time membership tests below
        art = set(nx.articulation_points(two_core))
        cent_list = sorted(
                nx.betweenness_centrality(two_core).items(),
                key=lambda item: -item[1])

        fail_candidates = [node for node, _ in cent_list if node not in art]

        fallible_nodes = []
        for n in fail_candidates:
            gg = graph.copy()
            gg.remove_node(n)
            # connected_components() gives no ordering guarantee in recent
            # networkx: sort explicitly so that comp[0] is the largest
            # component and comp[1:] are the parts cut off by removing n
            comp = sorted(nx.connected_components(gg), key=len, reverse=True)
            isolated_nodes = [x for component in comp[1:] for x in component]
            fallible_nodes.append([n] + isolated_nodes)

        return fallible_nodes
118

    
119

    
120
class dummyRoutingTest(MininetTest):
    """Emulation test that runs the dummy routing daemon on every host.

    The attributes read here but not set in this class (stopAllNodes,
    nodeCrashed, startLog, stopNode, stopLog, logInterval, verbose,
    centralityTuning, dump, graph, runId) are set by the
    dummyRoutingRandomTest subclass. prefix, duration, pendingProc, bgCmd,
    sendSig, killAll and getAllHosts are not defined in this file and are
    presumably provided by MininetTest -- verify against test_generic.
    """

    def __init__(self, mininet, duration=10):
        # NOTE(review): `path` is the module-level name in scope here; the
        # only visible binding is `from os import path`, unless a later
        # star import rebinds it -- confirm this is what MininetTest wants.
        super(dummyRoutingTest, self).__init__(mininet, path, duration)
        # groups of nodes still to be crashed, one group per run
        self.centList = []

    def _launchLogged(self, host, cmd, logfile):
        """Run cmd in background on host with stdout/stderr sent to logfile.

        Returns whatever self.bgCmd() returns.
        """
        # The redirections are passed in a fixed order instead of relying
        # on the iteration order of a dict.
        return self.bgCmd(host, True, cmd, '>', logfile, '2>', logfile)

    def launchSniffer(self, host):
        """Start tcpdump on host, logging to <prefix><host>-dump.log."""
        cmd = "tcpdump -i any -n -X -e "
        logfile = self.prefix + host.name + "-dump.log"
        return self._launchLogged(host, cmd, logfile)

    def launchdummyRouting(self, host,  args):
        """Start the routing daemon on host, logging to <prefix><host>.log.

        Args:
            host: the host to run the daemon on.
            args: command line arguments, appended verbatim to the command.
        """
        cmd = "../dummy_routing_protocol/routingdaemon.py " + args
        logfile = self.prefix + host.name + ".log"
        return self._launchLogged(host, cmd, logfile)

    def runTest(self):
        """Execute one emulation run per group of nodes to be crashed.

        For each run the daemons are started, then the start-logging,
        stop-node and stop-logging signals are sent at their configured
        times (a time <= 0 disables the event), and finally every pending
        process is terminated.

        NOTE(review): when stopAllNodes is not set the run list is empty
        and no run is performed at all -- confirm this is intended.
        """
        info("*** Launching dummyRouting test\n")
        info("Data folder: "+self.prefix+"\n")

        run_list = []

        if self.stopAllNodes:
            if isinstance(self.stopAllNodes, list):
                # the centrality ranking is expensive: compute it only once
                candidates = self.getCentrality()
                self.centList = [candidates[idx] for idx in self.stopAllNodes]
                run_list = list(self.stopAllNodes)
            elif isinstance(self.stopAllNodes, int):
                self.centList = self.getCentrality()[:self.stopAllNodes]
                # the graph may offer fewer candidates than requested:
                # never schedule more runs than there are nodes to crash
                run_list = list(range(len(self.centList)))

        for runid in run_list:
            info("\nStarting run " + str(runid) + "\n")
            self.runId = str(runid)
            if self.stopAllNodes:
                self.nodeCrashed = self.centList.pop(0)

            if not self.startRun():
                # some times process are not killed in time, UDP
                # ports are still occupied and the next run can not
                # start correctly. I kill everything, wait some time, try
                # to restart. If something still goes wrong i stop the
                # emulation
                self.killAll()
                time.sleep(10)
                info("\nWARNING: run_id " + str(runid) + " could not start, retrying...\n")
                if not self.startRun():
                    error("\nERROR: run_id " + str(runid) + " could not start!" + \
                            "please check the logs\n")
                    sys.exit(1)

            # (absolute time, log label, arguments for sendSignal).
            # SIGUSR1 is interpreted by the daemons as "start (or stop)
            # logging", SIGTSTP as "crash". A list is used rather than a
            # dict keyed by time, so that two events configured for the
            # same instant are both delivered.
            events = [
                (self.startLog, "Start logging",
                    {"sig": signal.SIGUSR1}),
                (self.stopNode,
                    "Stopping node(s) " + str(self.nodeCrashed) + " ",
                    {"sig": signal.SIGTSTP, "hostName": self.nodeCrashed}),
                (self.stopLog, "Stop logging",
                    {"sig": signal.SIGUSR1}),
            ]
            # sorted() is stable: simultaneous events keep the order above
            enabled = sorted(
                [ev for ev in events if ev[0] > 0], key=lambda ev: ev[0])

            # turn absolute times into delays relative to the previous event
            eventList = []
            relativeTime = 0
            for when, label, kwargs in enabled:
                eventList.append(
                    [when - relativeTime, label, self.sendSignal, kwargs])
                relativeTime = when

            # the last event may be scheduled after the end of the run
            waitTime = max(0, self.duration - relativeTime)

            for event in eventList:
                time.sleep(event[0])
                print(event)
                event[2](**event[3])
                info(event[1] + str(time.time()) + "\n")
            time.sleep(waitTime)
            # iterate on a snapshot, in case the pending processes change
            # while they are being signalled
            for pid in list(self.pendingProc):
                self.sendSig(pid, signal.SIGTERM)
            time.sleep(2)
            # in case some process got stuck:
            self.killAll()
            time.sleep(2)

    def getCentrality(self):
        """Return the groups of nodes of self.graph that can be crashed, as
        computed by OptimizeGraphChoice.get_emulation_runs_per_topology()."""
        o = OptimizeGraphChoice()
        return o.get_emulation_runs_per_topology(self.graph)

    def startRun(self):
        """Launch the routing daemon (and the sniffer, if enabled) on
        every host.

        Returns:
            The value returned by the launch of the last daemon, or None
            when there is no host at all. runTest() treats a false value
            as a failed start.
        """
        launch_pid = None
        for h in self.getAllHosts():
            args = " --runid=" + str(self.runId)
            for option in (self.logInterval, self.verbose,
                           self.centralityTuning):
                if option != "":
                    args += " " + option

            launch_pid = self.launchdummyRouting(h, args)

            if self.dump:
                self.launchSniffer(h)

        return launch_pid

    def sendSignal(self, sig, hostName=""):
        """Send sig to the pending processes.

        Args:
            sig: the signal number.
            hostName: the names of the hosts whose processes get the
                signal (a list, or a single name as a string). When empty
                the signal is sent to every pending process.
        """
        if hostName and isinstance(hostName, str):
            # a bare string would otherwise be iterated char by char
            hostName = [hostName]
        # iterate on a snapshot, in case the pending processes change
        # while they are being signalled
        for pid, h in list(self.pendingProc.items()):
            if hostName:
                for host in hostName:
                    if host == h.name:
                        print("sending signal to host: %s , pid %s"
                              % (host, pid))
                        self.sendSig(pid, sig)
            # send to all
            else:
                self.sendSig(pid, sig)

    def parseTime(self, timeString):
        """Convert a time option into a string holding a number of seconds.

        "10s" -> "10", "2m" -> "120"; anything else is returned unchanged
        (as a string).
        """
        if timeString.endswith('s'):
            return str(timeString[:-1])
        if timeString.endswith('m'):
            return str(int(timeString[:-1]) * 60)
        return str(timeString)
311

    
312
class dummyRoutingRandomTest(dummyRoutingTest):
    """dummyRoutingTest configured from a dictionary of string options."""

    def __init__(self, mininet, name, args):
        """Parse the test options and initialize the test.

        Args:
            mininet: the network object; its `gg` attribute is used as the
                topology graph and `gg.name` to derive the local prefix.
            name: passed to self.setPrefix().
            args: dictionary of options. "duration" is mandatory. The
                optional keys are "dumpWithTCPDump", "startLog", "stopLog",
                "logInterval", "verbose", "centralityTuning", "stopNode",
                "nodeCrashed" and "stopAllNodes". Times accept the "s" and
                "m" suffixes (see parseTime).
        """
        self.graph = mininet.gg
        self.dump = "dumpWithTCPDump" in args

        # Passing the start time to the daemons on the command line doesn't
        # work. If processes are started one after the other there is
        # misalignment in the relative log time. I use a signal instead,
        # sent at these times; -1 disables the event.
        if "startLog" in args:
            self.startLog = float(self.parseTime(args["startLog"]))
        else:
            self.startLog = -1

        if "stopLog" in args:
            self.stopLog = float(self.parseTime(args["stopLog"]))
        else:
            self.stopLog = -1

        # the following are command line fragments for the routing daemon
        if "logInterval" in args:
            self.logInterval = "--loginterval " \
                    + self.parseTime(args["logInterval"])
        else:
            self.logInterval = ""

        self.verbose = "-v " if "verbose" in args else ""
        self.centralityTuning = "-c " if "centralityTuning" in args else ""

        if "stopNode" in args:
            self.stopNode = int(self.parseTime(args["stopNode"]))
        else:
            self.stopNode = -1

        if "nodeCrashed" in args:
            self.nodeCrashed = [args["nodeCrashed"]]
        else:
            self.nodeCrashed = ""

        if "stopAllNodes" in args:
            info("Going to stop all the nodes in sequence")
            # convert option in a python object
            if args['stopAllNodes'] == '':
                # stop all the nodes
                self.stopAllNodes = len(self.graph)
            else:
                try:
                    s = ast.literal_eval(args["stopAllNodes"])
                except (ValueError, SyntaxError):
                    # literal_eval raises SyntaxError, not only ValueError,
                    # on malformed input
                    error("Option " + args["stopAllNodes"] + " is not valid")
                    sys.exit(1)
                # bool is a subclass of int, but True/False are not valid
                if isinstance(s, int) and not isinstance(s, bool):
                    if s <= 0:
                        error("\nPlease stopAllNodes must be > 0\n")
                        sys.exit(1)
                    self.stopAllNodes = s
                    info("... limited to " + args["stopAllNodes"] + " node(s).")
                elif isinstance(s, list) and s:
                    self.stopAllNodes = s
                    info("... limited to the list of nodes: " + str(s))
                else:
                    error("Option " + args["stopAllNodes"] + " is not valid")
                    # without a valid value self.stopAllNodes would be left
                    # undefined and the test would crash later on
                    sys.exit(1)

            self.nodeCrashed = []
        else:
            self.stopAllNodes = False

        self.runId = 0

        duration = int(self.parseTime(args["duration"]))

        super(dummyRoutingRandomTest, self).__init__(mininet, duration)
        # topology file name, without directory and extension
        self.localPrefix = os.path.basename(os.path.splitext(
            mininet.gg.name)[0])
        self.setPrefix(name)
401

    
402