Statistics
| Branch: | Revision:

nepatest_popbabel / test_code / dummyrouting.py @ 60ba786f

History | View | Annotate | Download (13.8 KB)

1

    
2
import sys
3
sys.path.append('../')
4
from network_builder import *
5
from os import kill, path, makedirs
6
from matplotlib.pyplot import ion
7
from collections import defaultdict
8
import random 
9
import time
10
import ast
11
import networkx as nx
12

    
13
import signal
14

    
15
from test_generic import *
16

    
17
class OptimizeGraphChoice:
18
    """ helper class to optimize the choice of the 
19
    graphs to run the simulations """
20

    
21
    def __init__(self, failures=1000000):
22

    
23
        # just an initialization number > of the size of the graph
24
        self.failures = failures
25
        self.shuffle = False
26

    
27
    def compute_topology_failure_maps(self, graph_dict, min_run_number):
28
        failure_map = defaultdict(list)
29
        topo_failures = {}
30
        for topo, graph in graph_dict.items():
31
            failure_list = self.get_emulation_runs_per_topology(graph)
32
            failure_number = len(failure_list)
33
            for idx in range(failure_number):
34
                failure_map[idx].append(topo)
35
                topo_failures[topo] = failure_number
36

    
37
        max_fail = len(
38
                filter(lambda z: z>min_run_number,
39
                [len(y[1]) for y in sorted(
40
                    failure_map.items(), key = lambda x: x[0])]) 
41
                )
42
        print "The maximum number of failures avilable with "
43
        print min_run_number, "run(s) is ", max_fail
44

    
45
        # file -> failure list
46
        file_dict = defaultdict(list)
47

    
48
        min_fail = min(max_fail, self.failures)
49

    
50
        #print filter(lambda z: z>min_run_number,
51
        #        [len(y[1]) for y in sorted(
52
        #            failure_map.items(), key = lambda x: x[0])]) 
53

    
54
        failure_counter = [0]*min_fail
55

    
56
        for (idx, file_list) in \
57
            sorted(failure_map.items(), key = lambda x: x[0])[:min_fail]:
58
                if idx >= min_fail:
59
                    break
60
                if self.shuffle:
61
                    # if i shuffle i can not compare opt/non-opt simulations
62
                    random.shuffle(file_list)
63
                for f in file_list:
64
                    if failure_counter[idx] >=  min_run_number:
65
                        break
66
                    if f in file_dict:
67
                        continue
68
                    rem_runs = range(idx, min(topo_failures[f], min_fail))
69
                    file_dict[f] = rem_runs
70
                    for r in rem_runs:
71
                        failure_counter[r] += 1
72

    
73
        for f,runs in sorted(file_dict.items(), key = lambda x: len(x[1])):
74
            print f, [1 if x in runs else 0 for x in range(min_fail)]
75

    
76
        return file_dict
77

    
78
    def get_emulation_runs_per_topology(self, graph):
79
        """ return a list of nodes ordered by centrality that can be removed.
80
        1) identify the 2-core of the network (i.e. remove leaf nodes up to
81
           when there are any)
82
        2) from the list of nodes in the 2-core, remove the articulation points
83
            (i.e. nodes that will partition the 2-core in multiple disconnected
84
            components)
85
        3) for each remaining node, find the tree that is rooted on it (i.e.
86
           remove node from the graph, all the disconnected components
87
           that remain form such tree). 
88
        4) each node + tree can fail all together.
89

90
        In this script this is necessary because not all the graphs support 
91
        the same number or run_ids. Some graph may allow 10 runs (10 failures)
92
        while other 8 failures. I pre-parse the topology to identify this number, 
93
        then i run the simulations with the sufficient number of graphs that 
94
        allow me to have a minimum number of repetitions for each run_id (for each
95
        failure). The minimum number is taken from the runs parameter in 
96
        command line """
97

    
98
        # FIXME remove self loops from original graphs
99
        graph.remove_edges_from(graph.selfloop_edges())
100
        two_core = nx.k_core(graph, 2)
101
        art = [n for n in nx.articulation_points(two_core)]
102
        cent_list = sorted(
103
                [n for n in nx.betweenness_centrality(two_core).items() ],
104
                key = lambda x: -x[1])
105

    
106
        fail_candidates = [n[0] for n in cent_list if n[0] not in art]
107

    
108
        fallible_nodes = []
109
        for n in fail_candidates:
110
            gg = graph.copy()
111
            gg.remove_node(n)
112
            comp = list(nx.connected_components(gg))
113
            isolated_nodes = [x for component in comp[1:] for x in component]
114
            fallible_nodes.append([n] + isolated_nodes) 
115

    
116
        return fallible_nodes
117

    
118

    
119
class dummyRoutingTest(MininetTest):
120

    
121
    def __init__(self, mininet, duration=10):
122

    
123
        super(dummyRoutingTest, self).__init__(mininet, path, duration)
124
        self.centList = []
125

    
126

    
127
    def launchSniffer(self, host):
128

    
129
        cmd = "tcpdump -i any -n -X -e "
130

    
131
        logfile = self.prefix + host.name + "-dump.log"
132

    
133
        params = {}
134
        params['>'] = logfile
135
        params['2>'] = logfile
136

    
137

    
138
        return self.bgCmd(host, True, cmd,
139
            *reduce(lambda x, y: x + y, params.items()))
140

    
141

    
142
    def launchdummyRouting(self, host,  args):
143

    
144
        cmd = "../dummy_routing_protocol/routingdaemon.py " + \
145
                args
146

    
147
        logfile = self.prefix + host.name + ".log"
148

    
149
        params = {}
150
        params['>'] = logfile
151
        params['2>'] = logfile
152

    
153

    
154
        return self.bgCmd(host, True, cmd,
155
            *reduce(lambda x, y: x + y, params.items()))
156

    
157
    def runTest(self):
158

    
159
        info("*** Launching dummyRouting test\n")
160
        info("Data folder: "+self.prefix+"\n")
161

    
162
        run_list = []
163

    
164
        if self.stopAllNodes:
165
            if type(self.stopAllNodes) == int:
166
                self.centList = self.getCentrality()[:self.stopAllNodes]
167
                run_list = range(self.stopAllNodes)
168
            elif type(self.stopAllNodes) == list:
169
                for idx in self.stopAllNodes:
170
                    self.centList.append(self.getCentrality()[idx])
171
                run_list = self.stopAllNodes
172

    
173
        for runid in run_list:
174
            info("\nStarting run " + str(runid) + "\n")
175
            self.runId = str(runid)
176
            if self.stopAllNodes:
177
                self.nodeCrashed = self.centList.pop(0)
178

    
179
            if not self.startRun():
180
                # some times process are not killed in time, UDP
181
                # ports are still occupied and the next run can not
182
                # start correctly. I kill everything, wait some time, try 
183
                # to restart. If something still goes wrong i stop the emulation
184
                self.killAll()
185
                time.sleep(10)
186
                info("\nWARNING: run_id " + str(runid) + " could not start, retrying...\n")
187
                if not self.startRun():
188
                    error("\nERROR: run_id " + str(runid) + " could not start!" + \
189
                            "please check the logs\n")
190
                    sys.exit(1)
191

    
192

    
193

    
194
            eventDict = {
195
                    self.startLog:["Start logging", self.sendSignal,
196
                        {"sig":signal.SIGUSR1}],
197
                    self.stopNode:["Stopping node(s) " + str(self.nodeCrashed) + " ",
198
                        self.sendSignal, {"sig":signal.SIGTSTP,
199
                        "hostName":self.nodeCrashed}],
200
                    self.stopLog:["Stop logging", self.sendSignal,
201
                        {"sig":signal.SIGUSR1}]}
202

    
203
            eventList = []
204
            relativeTime = 0
205
            for e in sorted(eventDict.keys()):
206
                if e > 0:
207
                    data = eventDict[e]
208
                    eventList.append([e - relativeTime] + data)
209
                    relativeTime += (e - relativeTime)
210

    
211
            waitTime = self.duration - relativeTime
212

    
213
            for event in eventList:
214
                sleep(event[0])
215
                print event
216
                event[2](**event[3])
217
                info(event[1] + str(time.time()) + "\n")
218
            sleep(waitTime)
219
            for pid in self.pendingProc.keys():
220
                self.sendSig(pid, signal.SIGTERM)
221
            time.sleep(2)
222
            # in case some process got stuck:
223
            self.killAll()
224
            time.sleep(2)
225
            #sendSignal(signal.SIGUSR2)
226
            #if self.startLog > 0:
227
            #    duration -= self.startLog
228
            #    sleep(self.startLog)
229
            #    print "XX", self.startLog, duration, time.time()
230
            #    # this is interpreted by the daemons as 
231
            #    # "start (or stop) logging"
232
            #    self.sendSignal(signal.SIGUSR1)
233
            #    info("\nStart logging now!\n") 
234
            #if self.stopNode > 0:
235
            #    crashTime = self.stopNode - self.startLog
236
            #    duration -= crashTime
237
            #    sleep(crashTime)
238
            #    print "XX", self.stopNode, duration, time.time()
239
            #    # this is interpreted as "crash"
240
            #    self.sendSignal(signal.SIGTSTP, self.nodeCrashed)
241
            #    info("\nSent crash signal to node " + str(self.nodeCrashed))
242
            #if self.stopLog > 0:
243
            #    stopTime = self.stopLog - (self.startLog + self.stopNode)
244
            #    duration -= stopTime
245
            #    print "XX", stopTime, duration, time.time()
246
            #    sleep(stopTime)
247
            #    self.sendSignal(signal.SIGUSR1)
248
            #    info("\nStop logging now!\n") 
249
            #print "XX", time.time(), duration
250
            #sleep(duration)
251
            ## this is interpreted by the daemons as 
252
            ## "restart a new run"
253
            #self.sendSignal(signal.SIGUSR2)
254

    
255

    
256

    
257

    
258
    def getCentrality(self):
259
        o = OptimizeGraphChoice()
260
        return o.get_emulation_runs_per_topology(self.graph)
261

    
262
    def startRun(self):
263

    
264
        rNode = ""
265
        hostList = self.getAllHosts()
266

    
267
        if rNode:
268
            info("\nChosen node " + str(rNode) + " to fail\n")
269

    
270
        for h in hostList:
271
            args = " --runid=" + self.runId
272
            if self.logInterval != "":
273
                args += " " + self.logInterval
274
            if self.verbose != "":
275
                args += " " + self.verbose
276
            if self.centralityTuning != "":
277
                args += " " + self.centralityTuning
278

    
279
            launch_pid = self.launchdummyRouting(h, args)
280

    
281
            if self.dump:
282
                self.launchSniffer(h)
283

    
284
        if not self.nodeCrashed and rNode:
285
            self.nodeCrashed = [rNode]
286
        return launch_pid
287

    
288
    def sendSignal(self, sig, hostName=""):
289
        for pid, h in self.pendingProc.items():
290
            if hostName:
291
                for host in hostName:
292
                    if host == h.name:
293
                        print "sending signal to host:", host, ", pid", pid
294
                        self.sendSig(pid, sig)
295
            # send to all 
296
            else:
297
                self.sendSig(pid, sig)
298

    
299

    
300
    def parseTime(self, timeString):
301

    
302
        retString = ""
303
        if timeString.endswith('s'):
304
            retString = timeString[:-1]
305
        elif timeString.endswith('m'):
306
            retString = int(timeString[:-1])*60
307
        else:
308
            retString = timeString
309
        return str(retString)
310

    
311
class dummyRoutingRandomTest(dummyRoutingTest):
312

    
313
    def __init__(self, mininet, name, args):
314

    
315
        self.graph = mininet.gg
316
        if "dumpWithTCPDump" in args.keys():
317
            self.dump = True
318
        else:
319
            self.dump = False
320
        # Doesn't work. If processes are started one after the other
321
        # there is misalignment in the relative log time. I use 
322
        # a signal instead.
323
        #if "startLog" in args.keys():
324
        #    self.startLog = "--startlog " + self.parseTime(args["startLog"])
325
        #else:
326
        #    self.startLog = ""
327

    
328
        if "startLog" in args.keys():
329
            self.startLog = float(self.parseTime(args["startLog"]))
330
        else:
331
            self.startLog = -1
332

    
333
        if "stopLog" in args.keys():
334
            self.stopLog = float(self.parseTime(args["stopLog"]))
335
        else:
336
            self.stopLog = -1
337

    
338
        if "logInterval" in args.keys():
339
            self.logInterval = "--loginterval " \
340
                    + self.parseTime(args["logInterval"])
341
        else:
342
            self.logInterval = ""
343

    
344
        if "verbose" in args.keys():
345
            self.verbose = "-v "
346
        else:
347
            self.verbose = ""
348

    
349
        if "centralityTuning" in args.keys():
350
            self.centralityTuning = "-c "
351
        else:
352
            self.centralityTuning = ""
353

    
354
        if "stopNode" in args.keys():
355
            self.stopNode = int(self.parseTime(args["stopNode"]))
356
        else:
357
            self.stopNode = -1
358

    
359
        if "nodeCrashed" in args.keys():
360
            self.nodeCrashed = [args["nodeCrashed"]]
361
        else:
362
            self.nodeCrashed = ""
363

    
364
        if "stopAllNodes" in args.keys():
365
            info("Going to stop all the nodes in sequence")
366
            # convert option in a python object
367
            if args['stopAllNodes'] == '':
368
                # stop all the nodes 
369
                self.stopAllNodes = len(self.graph)
370
            else:
371
                try:
372
                    s = ast.literal_eval(args["stopAllNodes"])
373
                except ValueError:
374
                    error("Option " + args["stopAllNodes"] + " is not valid")
375
                    exit(1)
376
                if type(s) == int:
377
                    if s <= 0:
378
                        error("\nPlease stopAllNodes must be > 0\n")
379
                        sys.exit(1)
380
                    self.stopAllNodes = s
381
                    info("... limited to " + args["stopAllNodes"] + " node(s).")
382
                elif type(s) == list and s:
383
                    self.stopAllNodes = s
384
                    info("... limited to the list of nodes: " + str(s))
385
                else:
386
                    error("Option " + args["stopAllNodes"] + " is not valid")
387

    
388
            self.nodeCrashed = []
389
        else:
390
            self.stopAllNodes = False
391

    
392
        self.runId = 0
393

    
394
        duration = int(self.parseTime(args["duration"]))
395

    
396
        super(dummyRoutingRandomTest, self).__init__(mininet, duration)
397
        self.localPrefix = os.path.basename(os.path.splitext(
398
            mininet.gg_name)[0])
399
        self.setPrefix(name + self.localPrefix)
400

    
401