Statistics
| Branch: | Tag: | Revision:

mininet / mininet / node.py @ 63ae13fc

History | View | Annotate | Download (57.7 KB)

1
"""
2
Node objects for Mininet.
3

4
Nodes provide a simple abstraction for interacting with hosts, switches
5
and controllers. Local nodes are simply one or more processes on the local
6
machine.
7

8
Node: superclass for all (primarily local) network nodes.
9

10
Host: a virtual host. By default, a host is simply a shell; commands
11
    may be sent using Cmd (which waits for output), or using sendCmd(),
12
    which returns immediately, allowing subsequent monitoring using
13
    monitor(). Examples of how to run experiments using this
14
    functionality are provided in the examples/ directory. By default,
15
    hosts share the root file system, but they may also specify private
16
    directories.
17

18
CPULimitedHost: a virtual host whose CPU bandwidth is limited by
19
    RT or CFS bandwidth limiting.
20

21
Switch: superclass for switch nodes.
22

23
UserSwitch: a switch using the user-space switch from the OpenFlow
24
    reference implementation.
25

26
OVSSwitch: a switch using the Open vSwitch OpenFlow-compatible switch
27
    implementation (openvswitch.org).
28

29
OVSBridge: an Ethernet bridge implemented using Open vSwitch.
30
    Supports STP.
31

32
IVSSwitch: OpenFlow switch using the Indigo Virtual Switch.
33

34
Controller: superclass for OpenFlow controllers. The default controller
35
    is controller(8) from the reference implementation.
36

37
OVSController: The test controller from Open vSwitch.
38

39
NOXController: a controller node using NOX (noxrepo.org).
40

41
Ryu: The Ryu controller (https://osrg.github.io/ryu/)
42

43
RemoteController: a remote controller node, which may use any
44
    arbitrary OpenFlow-compatible controller, and which is not
45
    created or managed by Mininet.
46

47
Future enhancements:
48

49
- Possibly make Node, Switch and Controller more abstract so that
50
  they can be used for both local and remote nodes
51

52
- Create proxy objects for remote nodes (Mininet: Cluster Edition)
53
"""
54

    
55
import os
56
import pty
57
import re
58
import signal
59
import select
60
from subprocess import Popen, PIPE
61
from time import sleep
62

    
63
from mininet.log import info, error, warn, debug
64
from mininet.util import ( quietRun, errRun, errFail, moveIntf, isShellBuiltin,
65
                           numCores, retry, mountCgroups )
66
from mininet.moduledeps import moduleDeps, pathCheck, TUN
67
from mininet.link import Link, Intf, TCIntf, OVSIntf
68
from re import findall
69
from distutils.version import StrictVersion
70

    
71
class Node( object ):
72
    """A virtual network node is simply a shell in a network namespace.
73
       We communicate with it using pipes."""
74

    
75
    portBase = 0  # Nodes always start with eth0/port0, even in OF 1.0
76

    
77
    def __init__( self, name, inNamespace=True, **params ):
78
        """name: name of node
79
           inNamespace: in network namespace?
80
           privateDirs: list of private directory strings or tuples
81
           params: Node parameters (see config() for details)"""
82

    
83
        # Make sure class actually works
84
        self.checkSetup()
85

    
86
        self.name = params.get( 'name', name )
87
        self.privateDirs = params.get( 'privateDirs', [] )
88
        self.inNamespace = params.get( 'inNamespace', inNamespace )
89

    
90
        # Stash configuration parameters for future reference
91
        self.params = params
92

    
93
        self.intfs = {}  # dict of port numbers to interfaces
94
        self.ports = {}  # dict of interfaces to port numbers
95
                         # replace with Port objects, eventually ?
96
        self.nameToIntf = {}  # dict of interface names to Intfs
97

    
98
        # Make pylint happy
99
        ( self.shell, self.execed, self.pid, self.stdin, self.stdout,
100
            self.lastPid, self.lastCmd, self.pollOut ) = (
101
                None, None, None, None, None, None, None, None )
102
        self.waiting = False
103
        self.readbuf = ''
104

    
105
        # Start command interpreter shell
106
        self.startShell()
107
        self.mountPrivateDirs()
108

    
109
    # File descriptor to node mapping support
110
    # Class variables and methods
111

    
112
    inToNode = {}  # mapping of input fds to nodes
113
    outToNode = {}  # mapping of output fds to nodes
114

    
115
    @classmethod
116
    def fdToNode( cls, fd ):
117
        """Return node corresponding to given file descriptor.
118
           fd: file descriptor
119
           returns: node"""
120
        node = cls.outToNode.get( fd )
121
        return node or cls.inToNode.get( fd )
122

    
123
    # Command support via shell process in namespace
124
    def startShell( self, mnopts=None ):
125
        "Start a shell process for running commands"
126
        if self.shell:
127
            error( "%s: shell is already running\n" % self.name )
128
            return
129
        # mnexec: (c)lose descriptors, (d)etach from tty,
130
        # (p)rint pid, and run in (n)amespace
131
        opts = '-cd' if mnopts is None else mnopts
132
        if self.inNamespace:
133
            opts += 'n'
134
        # bash -i: force interactive
135
        # -s: pass $* to shell, and make process easy to find in ps
136
        # prompt is set to sentinel chr( 127 )
137
        cmd = [ 'mnexec', opts, 'env', 'PS1=' + chr( 127 ),
138
                'bash', '--norc', '-is', 'mininet:' + self.name ]
139
        # Spawn a shell subprocess in a pseudo-tty, to disable buffering
140
        # in the subprocess and insulate it from signals (e.g. SIGINT)
141
        # received by the parent
142
        master, slave = pty.openpty()
143
        self.shell = self._popen( cmd, stdin=slave, stdout=slave, stderr=slave,
144
                                  close_fds=False )
145
        self.stdin = os.fdopen( master, 'rw' )
146
        self.stdout = self.stdin
147
        self.pid = self.shell.pid
148
        self.pollOut = select.poll()
149
        self.pollOut.register( self.stdout )
150
        # Maintain mapping between file descriptors and nodes
151
        # This is useful for monitoring multiple nodes
152
        # using select.poll()
153
        self.outToNode[ self.stdout.fileno() ] = self
154
        self.inToNode[ self.stdin.fileno() ] = self
155
        self.execed = False
156
        self.lastCmd = None
157
        self.lastPid = None
158
        self.readbuf = ''
159
        # Wait for prompt
160
        while True:
161
            data = self.read( 1024 )
162
            if data[ -1 ] == chr( 127 ):
163
                break
164
            self.pollOut.poll()
165
        self.waiting = False
166
        # +m: disable job control notification
167
        self.cmd( 'unset HISTFILE; stty -echo; set +m' )
168

    
169
    def mountPrivateDirs( self ):
170
        "mount private directories"
171
        for directory in self.privateDirs:
172
            if isinstance( directory, tuple ):
173
                # mount given private directory
174
                privateDir = directory[ 1 ] % self.__dict__
175
                mountPoint = directory[ 0 ]
176
                self.cmd( 'mkdir -p %s' % privateDir )
177
                self.cmd( 'mkdir -p %s' % mountPoint )
178
                self.cmd( 'mount --bind %s %s' %
179
                               ( privateDir, mountPoint ) )
180
            else:
181
                # mount temporary filesystem on directory
182
                self.cmd( 'mkdir -p %s' % directory )
183
                self.cmd( 'mount -n -t tmpfs tmpfs %s' % directory )
184

    
185
    def unmountPrivateDirs( self ):
186
        "mount private directories"
187
        for directory in self.privateDirs:
188
            if isinstance( directory, tuple ):
189
                self.cmd( 'umount ', directory[ 0 ] )
190
            else:
191
                self.cmd( 'umount ', directory )
192

    
193
    def _popen( self, cmd, **params ):
194
        """Internal method: spawn and return a process
195
            cmd: command to run (list)
196
            params: parameters to Popen()"""
197
        # Leave this is as an instance method for now
198
        assert self
199
        return Popen( cmd, **params )
200

    
201
    def cleanup( self ):
202
        "Help python collect its garbage."
203
        # We used to do this, but it slows us down:
204
        # Intfs may end up in root NS
205
        # for intfName in self.intfNames():
206
        # if self.name in intfName:
207
        # quietRun( 'ip link del ' + intfName )
208
        self.shell = None
209

    
210
    # Subshell I/O, commands and control
211

    
212
    def read( self, maxbytes=1024 ):
213
        """Buffered read from node, non-blocking.
214
           maxbytes: maximum number of bytes to return"""
215
        count = len( self.readbuf )
216
        if count < maxbytes:
217
            data = os.read( self.stdout.fileno(), maxbytes - count )
218
            self.readbuf += data
219
        if maxbytes >= len( self.readbuf ):
220
            result = self.readbuf
221
            self.readbuf = ''
222
        else:
223
            result = self.readbuf[ :maxbytes ]
224
            self.readbuf = self.readbuf[ maxbytes: ]
225
        return result
226

    
227
    def readline( self ):
228
        """Buffered readline from node, non-blocking.
229
           returns: line (minus newline) or None"""
230
        self.readbuf += self.read( 1024 )
231
        if '\n' not in self.readbuf:
232
            return None
233
        pos = self.readbuf.find( '\n' )
234
        line = self.readbuf[ 0: pos ]
235
        self.readbuf = self.readbuf[ pos + 1: ]
236
        return line
237

    
238
    def write( self, data ):
239
        """Write data to node.
240
           data: string"""
241
        os.write( self.stdin.fileno(), data )
242

    
243
    def terminate( self ):
244
        "Send kill signal to Node and clean up after it."
245
        self.unmountPrivateDirs()
246
        if self.shell:
247
            if self.shell.poll() is None:
248
                os.killpg( self.shell.pid, signal.SIGHUP )
249
        self.cleanup()
250

    
251
    def stop( self, deleteIntfs=False ):
252
        """Stop node.
253
           deleteIntfs: delete interfaces? (False)"""
254
        if deleteIntfs:
255
            self.deleteIntfs()
256
        self.terminate()
257

    
258
    def waitReadable( self, timeoutms=None ):
259
        """Wait until node's output is readable.
260
           timeoutms: timeout in ms or None to wait indefinitely."""
261
        if len( self.readbuf ) == 0:
262
            self.pollOut.poll( timeoutms )
263

    
264
    def sendCmd( self, *args, **kwargs ):
265
        """Send a command, followed by a command to echo a sentinel,
266
           and return without waiting for the command to complete.
267
           args: command and arguments, or string
268
           printPid: print command's PID? (False)"""
269
        assert self.shell and not self.waiting
270
        printPid = kwargs.get( 'printPid', False )
271
        # Allow sendCmd( [ list ] )
272
        if len( args ) == 1 and isinstance( args[ 0 ], list ):
273
            cmd = args[ 0 ]
274
        # Allow sendCmd( cmd, arg1, arg2... )
275
        elif len( args ) > 0:
276
            cmd = args
277
        # Convert to string
278
        if not isinstance( cmd, str ):
279
            cmd = ' '.join( [ str( c ) for c in cmd ] )
280
        if not re.search( r'\w', cmd ):
281
            # Replace empty commands with something harmless
282
            cmd = 'echo -n'
283
        self.lastCmd = cmd
284
        # if a builtin command is backgrounded, it still yields a PID
285
        if len( cmd ) > 0 and cmd[ -1 ] == '&':
286
            # print ^A{pid}\n so monitor() can set lastPid
287
            cmd += ' printf "\\001%d\\012" $! '
288
        elif printPid and not isShellBuiltin( cmd ):
289
            cmd = 'mnexec -p ' + cmd
290
        self.write( cmd + '\n' )
291
        self.lastPid = None
292
        self.waiting = True
293

    
294
    def sendInt( self, intr=chr( 3 ) ):
295
        "Interrupt running command."
296
        debug( 'sendInt: writing chr(%d)\n' % ord( intr ) )
297
        self.write( intr )
298

    
299
    def monitor( self, timeoutms=None, findPid=True ):
300
        """Monitor and return the output of a command.
301
           Set self.waiting to False if command has completed.
302
           timeoutms: timeout in ms or None to wait indefinitely
303
           findPid: look for PID from mnexec -p"""
304
        self.waitReadable( timeoutms )
305
        data = self.read( 1024 )
306
        pidre = r'\[\d+\] \d+\r\n'
307
        # Look for PID
308
        marker = chr( 1 ) + r'\d+\r\n'
309
        if findPid and chr( 1 ) in data:
310
            # suppress the job and PID of a backgrounded command
311
            if re.findall( pidre, data ):
312
                data = re.sub( pidre, '', data )
313
            # Marker can be read in chunks; continue until all of it is read
314
            while not re.findall( marker, data ):
315
                data += self.read( 1024 )
316
            markers = re.findall( marker, data )
317
            if markers:
318
                self.lastPid = int( markers[ 0 ][ 1: ] )
319
                data = re.sub( marker, '', data )
320
        # Look for sentinel/EOF
321
        if len( data ) > 0 and data[ -1 ] == chr( 127 ):
322
            self.waiting = False
323
            data = data[ :-1 ]
324
        elif chr( 127 ) in data:
325
            self.waiting = False
326
            data = data.replace( chr( 127 ), '' )
327
        return data
328

    
329
    def waitOutput( self, verbose=False, findPid=True ):
330
        """Wait for a command to complete.
331
           Completion is signaled by a sentinel character, ASCII(127)
332
           appearing in the output stream.  Wait for the sentinel and return
333
           the output, including trailing newline.
334
           verbose: print output interactively"""
335
        log = info if verbose else debug
336
        output = ''
337
        while self.waiting:
338
            data = self.monitor( findPid=findPid )
339
            output += data
340
            log( data )
341
        return output
342

    
343
    def cmd( self, *args, **kwargs ):
344
        """Send a command, wait for output, and return it.
345
           cmd: string"""
346
        verbose = kwargs.get( 'verbose', False )
347
        log = info if verbose else debug
348
        log( '*** %s : %s\n' % ( self.name, args ) )
349
        if self.shell:
350
            self.sendCmd( *args, **kwargs )
351
            return self.waitOutput( verbose )
352
        else:
353
            warn( '(%s exited - ignoring cmd%s)\n' % ( self, args ) )
354

    
355
    def cmdPrint( self, *args):
356
        """Call cmd and printing its output
357
           cmd: string"""
358
        return self.cmd( *args, **{ 'verbose': True } )
359

    
360
    def popen( self, *args, **kwargs ):
361
        """Return a Popen() object in our namespace
362
           args: Popen() args, single list, or string
363
           kwargs: Popen() keyword args"""
364
        defaults = { 'stdout': PIPE, 'stderr': PIPE,
365
                     'mncmd':
366
                     [ 'mnexec', '-da', str( self.pid ) ] }
367
        defaults.update( kwargs )
368
        if len( args ) == 1:
369
            if isinstance( args[ 0 ], list ):
370
                # popen([cmd, arg1, arg2...])
371
                cmd = args[ 0 ]
372
            elif isinstance( args[ 0 ], basestring ):
373
                # popen("cmd arg1 arg2...")
374
                cmd = args[ 0 ].split()
375
            else:
376
                raise Exception( 'popen() requires a string or list' )
377
        elif len( args ) > 0:
378
            # popen( cmd, arg1, arg2... )
379
            cmd = list( args )
380
        # Attach to our namespace  using mnexec -a
381
        cmd = defaults.pop( 'mncmd' ) + cmd
382
        # Shell requires a string, not a list!
383
        if defaults.get( 'shell', False ):
384
            cmd = ' '.join( cmd )
385
        popen = self._popen( cmd, **defaults )
386
        return popen
387

    
388
    def pexec( self, *args, **kwargs ):
389
        """Execute a command using popen
390
           returns: out, err, exitcode"""
391
        popen = self.popen( *args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
392
                            **kwargs )
393
        # Warning: this can fail with large numbers of fds!
394
        out, err = popen.communicate()
395
        exitcode = popen.wait()
396
        return out, err, exitcode
397

    
398
    # Interface management, configuration, and routing
399

    
400
    # BL notes: This might be a bit redundant or over-complicated.
401
    # However, it does allow a bit of specialization, including
402
    # changing the canonical interface names. It's also tricky since
403
    # the real interfaces are created as veth pairs, so we can't
404
    # make a single interface at a time.
405

    
406
    def newPort( self ):
407
        "Return the next port number to allocate."
408
        if len( self.ports ) > 0:
409
            return max( self.ports.values() ) + 1
410
        return self.portBase
411

    
412
    def addIntf( self, intf, port=None, moveIntfFn=moveIntf ):
413
        """Add an interface.
414
           intf: interface
415
           port: port number (optional, typically OpenFlow port number)
416
           moveIntfFn: function to move interface (optional)"""
417
        if port is None:
418
            port = self.newPort()
419
        self.intfs[ port ] = intf
420
        self.ports[ intf ] = port
421
        self.nameToIntf[ intf.name ] = intf
422
        debug( '\n' )
423
        debug( 'added intf %s (%d) to node %s\n' % (
424
                intf, port, self.name ) )
425
        if self.inNamespace:
426
            debug( 'moving', intf, 'into namespace for', self.name, '\n' )
427
            moveIntfFn( intf.name, self  )
428

    
429
    def defaultIntf( self ):
430
        "Return interface for lowest port"
431
        ports = self.intfs.keys()
432
        if ports:
433
            return self.intfs[ min( ports ) ]
434
        else:
435
            warn( '*** defaultIntf: warning:', self.name,
436
                  'has no interfaces\n' )
437

    
438
    def intf( self, intf=None ):
439
        """Return our interface object with given string name,
440
           default intf if name is falsy (None, empty string, etc).
441
           or the input intf arg.
442

443
        Having this fcn return its arg for Intf objects makes it
444
        easier to construct functions with flexible input args for
445
        interfaces (those that accept both string names and Intf objects).
446
        """
447
        if not intf:
448
            return self.defaultIntf()
449
        elif isinstance( intf, basestring):
450
            return self.nameToIntf[ intf ]
451
        else:
452
            return intf
453

    
454
    def connectionsTo( self, node):
455
        "Return [ intf1, intf2... ] for all intfs that connect self to node."
456
        # We could optimize this if it is important
457
        connections = []
458
        for intf in self.intfList():
459
            link = intf.link
460
            if link:
461
                node1, node2 = link.intf1.node, link.intf2.node
462
                if node1 == self and node2 == node:
463
                    connections += [ ( intf, link.intf2 ) ]
464
                elif node1 == node and node2 == self:
465
                    connections += [ ( intf, link.intf1 ) ]
466
        return connections
467

    
468
    def deleteIntfs( self, checkName=True ):
469
        """Delete all of our interfaces.
470
           checkName: only delete interfaces that contain our name"""
471
        # In theory the interfaces should go away after we shut down.
472
        # However, this takes time, so we're better off removing them
473
        # explicitly so that we won't get errors if we run before they
474
        # have been removed by the kernel. Unfortunately this is very slow,
475
        # at least with Linux kernels before 2.6.33
476
        for intf in self.intfs.values():
477
            # Protect against deleting hardware interfaces
478
            if ( self.name in intf.name ) or ( not checkName ):
479
                intf.delete()
480
                info( '.' )
481

    
482
    # Routing support
483

    
484
    def setARP( self, ip, mac ):
485
        """Add an ARP entry.
486
           ip: IP address as string
487
           mac: MAC address as string"""
488
        result = self.cmd( 'arp', '-s', ip, mac )
489
        return result
490

    
491
    def setHostRoute( self, ip, intf ):
492
        """Add route to host.
493
           ip: IP address as dotted decimal
494
           intf: string, interface name"""
495
        return self.cmd( 'route add -host', ip, 'dev', intf )
496

    
497
    def setDefaultRoute( self, intf=None ):
498
        """Set the default route to go through intf.
499
           intf: Intf or {dev <intfname> via <gw-ip> ...}"""
500
        # Note setParam won't call us if intf is none
501
        if isinstance( intf, basestring ) and ' ' in intf:
502
            params = intf
503
        else:
504
            params = 'dev %s' % intf
505
        # Do this in one line in case we're messing with the root namespace
506
        self.cmd( 'ip route del default; ip route add default', params )
507

    
508
    # Convenience and configuration methods
509

    
510
    def setMAC( self, mac, intf=None ):
511
        """Set the MAC address for an interface.
512
           intf: intf or intf name
513
           mac: MAC address as string"""
514
        return self.intf( intf ).setMAC( mac )
515

    
516
    def setIP( self, ip, prefixLen=8, intf=None, **kwargs ):
517
        """Set the IP address for an interface.
518
           intf: intf or intf name
519
           ip: IP address as a string
520
           prefixLen: prefix length, e.g. 8 for /8 or 16M addrs
521
           kwargs: any additional arguments for intf.setIP"""
522
        return self.intf( intf ).setIP( ip, prefixLen, **kwargs )
523

    
524
    def IP( self, intf=None ):
525
        "Return IP address of a node or specific interface."
526
        return self.intf( intf ).IP()
527

    
528
    def MAC( self, intf=None ):
529
        "Return MAC address of a node or specific interface."
530
        return self.intf( intf ).MAC()
531

    
532
    def intfIsUp( self, intf=None ):
533
        "Check if an interface is up."
534
        return self.intf( intf ).isUp()
535

    
536
    # The reason why we configure things in this way is so
537
    # That the parameters can be listed and documented in
538
    # the config method.
539
    # Dealing with subclasses and superclasses is slightly
540
    # annoying, but at least the information is there!
541

    
542
    def setParam( self, results, method, **param ):
543
        """Internal method: configure a *single* parameter
544
           results: dict of results to update
545
           method: config method name
546
           param: arg=value (ignore if value=None)
547
           value may also be list or dict"""
548
        name, value = param.items()[ 0 ]
549
        if value is None:
550
            return
551
        f = getattr( self, method, None )
552
        if not f:
553
            return
554
        if isinstance( value, list ):
555
            result = f( *value )
556
        elif isinstance( value, dict ):
557
            result = f( **value )
558
        else:
559
            result = f( value )
560
        results[ name ] = result
561
        return result
562

    
563
    def config( self, mac=None, ip=None,
564
                defaultRoute=None, lo='up', **_params ):
565
        """Configure Node according to (optional) parameters:
566
           mac: MAC address for default interface
567
           ip: IP address for default interface
568
           ifconfig: arbitrary interface configuration
569
           Subclasses should override this method and call
570
           the parent class's config(**params)"""
571
        # If we were overriding this method, we would call
572
        # the superclass config method here as follows:
573
        # r = Parent.config( **_params )
574
        r = {}
575
        self.setParam( r, 'setMAC', mac=mac )
576
        self.setParam( r, 'setIP', ip=ip )
577
        self.setParam( r, 'setDefaultRoute', defaultRoute=defaultRoute )
578
        # This should be examined
579
        self.cmd( 'ifconfig lo ' + lo )
580
        return r
581

    
582
    def configDefault( self, **moreParams ):
583
        "Configure with default parameters"
584
        self.params.update( moreParams )
585
        self.config( **self.params )
586

    
587
    # This is here for backward compatibility
588
    def linkTo( self, node, link=Link ):
589
        """(Deprecated) Link to another node
590
           replace with Link( node1, node2)"""
591
        return link( self, node )
592

    
593
    # Other methods
594

    
595
    def intfList( self ):
596
        "List of our interfaces sorted by port number"
597
        return [ self.intfs[ p ] for p in sorted( self.intfs.iterkeys() ) ]
598

    
599
    def intfNames( self ):
600
        "The names of our interfaces sorted by port number"
601
        return [ str( i ) for i in self.intfList() ]
602

    
603
    def __repr__( self ):
604
        "More informative string representation"
605
        intfs = ( ','.join( [ '%s:%s' % ( i.name, i.IP() )
606
                              for i in self.intfList() ] ) )
607
        return '<%s %s: %s pid=%s> ' % (
608
            self.__class__.__name__, self.name, intfs, self.pid )
609

    
610
    def __str__( self ):
611
        "Abbreviated string representation"
612
        return self.name
613

    
614
    # Automatic class setup support
615

    
616
    isSetup = False
617

    
618
    @classmethod
619
    def checkSetup( cls ):
620
        "Make sure our class and superclasses are set up"
621
        while cls and not getattr( cls, 'isSetup', True ):
622
            cls.setup()
623
            cls.isSetup = True
624
            # Make pylint happy
625
            cls = getattr( type( cls ), '__base__', None )
626

    
627
    @classmethod
628
    def setup( cls ):
629
        "Make sure our class dependencies are available"
630
        pathCheck( 'mnexec', 'ifconfig', moduleName='Mininet')
631

    
632
class Host( Node ):
633
    "A host is simply a Node"
634
    pass
635

    
636
class CPULimitedHost( Host ):
637

    
638
    "CPU limited host"
639

    
640
    def __init__( self, name, sched='cfs', **kwargs ):
641
        Host.__init__( self, name, **kwargs )
642
        # Initialize class if necessary
643
        if not CPULimitedHost.inited:
644
            CPULimitedHost.init()
645
        # Create a cgroup and move shell into it
646
        self.cgroup = 'cpu,cpuacct,cpuset:/' + self.name
647
        errFail( 'cgcreate -g ' + self.cgroup )
648
        # We don't add ourselves to a cpuset because you must
649
        # specify the cpu and memory placement first
650
        errFail( 'cgclassify -g cpu,cpuacct:/%s %s' % ( self.name, self.pid ) )
651
        # BL: Setting the correct period/quota is tricky, particularly
652
        # for RT. RT allows very small quotas, but the overhead
653
        # seems to be high. CFS has a mininimum quota of 1 ms, but
654
        # still does better with larger period values.
655
        self.period_us = kwargs.get( 'period_us', 100000 )
656
        self.sched = sched
657
        if sched == 'rt':
658
            self.checkRtGroupSched()
659
            self.rtprio = 20
660

    
661
    def cgroupSet( self, param, value, resource='cpu' ):
662
        "Set a cgroup parameter and return its value"
663
        cmd = 'cgset -r %s.%s=%s /%s' % (
664
            resource, param, value, self.name )
665
        quietRun( cmd )
666
        nvalue = int( self.cgroupGet( param, resource ) )
667
        if nvalue != value:
668
            error( '*** error: cgroupSet: %s set to %s instead of %s\n'
669
                   % ( param, nvalue, value ) )
670
        return nvalue
671

    
672
    def cgroupGet( self, param, resource='cpu' ):
673
        "Return value of cgroup parameter"
674
        cmd = 'cgget -r %s.%s /%s' % (
675
            resource, param, self.name )
676
        return int( quietRun( cmd ).split()[ -1 ] )
677

    
678
    def cgroupDel( self ):
679
        "Clean up our cgroup"
680
        # info( '*** deleting cgroup', self.cgroup, '\n' )
681
        _out, _err, exitcode = errRun( 'cgdelete -r ' + self.cgroup )
682
        return exitcode == 0  # success condition
683

    
684
    def popen( self, *args, **kwargs ):
685
        """Return a Popen() object in node's namespace
686
           args: Popen() args, single list, or string
687
           kwargs: Popen() keyword args"""
688
        # Tell mnexec to execute command in our cgroup
689
        mncmd = [ 'mnexec', '-g', self.name,
690
                  '-da', str( self.pid ) ]
691
        # if our cgroup is not given any cpu time,
692
        # we cannot assign the RR Scheduler.
693
        if self.sched == 'rt':
694
            if int( self.cgroupGet( 'rt_runtime_us', 'cpu' ) ) <= 0:
695
                mncmd += [ '-r', str( self.rtprio ) ]
696
            else:
697
                debug( '*** error: not enough cpu time available for %s.' %
698
                       self.name, 'Using cfs scheduler for subprocess\n' )
699
        return Host.popen( self, *args, mncmd=mncmd, **kwargs )
700

    
701
    def cleanup( self ):
702
        "Clean up Node, then clean up our cgroup"
703
        super( CPULimitedHost, self ).cleanup()
704
        retry( retries=3, delaySecs=1, fn=self.cgroupDel )
705

    
706
    _rtGroupSched = False   # internal class var: Is CONFIG_RT_GROUP_SCHED set?
707

    
708
    @classmethod
709
    def checkRtGroupSched( cls ):
710
        "Check (Ubuntu,Debian) kernel config for CONFIG_RT_GROUP_SCHED for RT"
711
        if not cls._rtGroupSched:
712
            release = quietRun( 'uname -r' ).strip('\r\n')
713
            output = quietRun( 'grep CONFIG_RT_GROUP_SCHED /boot/config-%s' %
714
                               release )
715
            if output == '# CONFIG_RT_GROUP_SCHED is not set\n':
716
                error( '\n*** error: please enable RT_GROUP_SCHED '
717
                       'in your kernel\n' )
718
                exit( 1 )
719
            cls._rtGroupSched = True
720

    
721
    def chrt( self ):
722
        "Set RT scheduling priority"
723
        quietRun( 'chrt -p %s %s' % ( self.rtprio, self.pid ) )
724
        result = quietRun( 'chrt -p %s' % self.pid )
725
        firstline = result.split( '\n' )[ 0 ]
726
        lastword = firstline.split( ' ' )[ -1 ]
727
        if lastword != 'SCHED_RR':
728
            error( '*** error: could not assign SCHED_RR to %s\n' % self.name )
729
        return lastword
730

    
731
    def rtInfo( self, f ):
732
        "Internal method: return parameters for RT bandwidth"
733
        pstr, qstr = 'rt_period_us', 'rt_runtime_us'
734
        # RT uses wall clock time for period and quota
735
        quota = int( self.period_us * f )
736
        return pstr, qstr, self.period_us, quota
737

    
738
    def cfsInfo( self, f ):
739
        "Internal method: return parameters for CFS bandwidth"
740
        pstr, qstr = 'cfs_period_us', 'cfs_quota_us'
741
        # CFS uses wall clock time for period and CPU time for quota.
742
        quota = int( self.period_us * f * numCores() )
743
        period = self.period_us
744
        if f > 0 and quota < 1000:
745
            debug( '(cfsInfo: increasing default period) ' )
746
            quota = 1000
747
            period = int( quota / f / numCores() )
748
        # Reset to unlimited on negative quota
749
        if quota < 0:
750
            quota = -1
751
        return pstr, qstr, period, quota
752

    
753
    # BL comment:
754
    # This may not be the right API,
755
    # since it doesn't specify CPU bandwidth in "absolute"
756
    # units the way link bandwidth is specified.
757
    # We should use MIPS or SPECINT or something instead.
758
    # Alternatively, we should change from system fraction
759
    # to CPU seconds per second, essentially assuming that
760
    # all CPUs are the same.
761

    
762
    def setCPUFrac( self, f, sched=None ):
763
        """Set overall CPU fraction for this host
764
           f: CPU bandwidth limit (positive fraction, or -1 for cfs unlimited)
765
           sched: 'rt' or 'cfs'
766
           Note 'cfs' requires CONFIG_CFS_BANDWIDTH,
767
           and 'rt' requires CONFIG_RT_GROUP_SCHED"""
768
        if not sched:
769
            sched = self.sched
770
        if sched == 'rt':
771
            if not f or f < 0:
772
                raise Exception( 'Please set a positive CPU fraction'
773
                                 ' for sched=rt\n' )
774
            pstr, qstr, period, quota = self.rtInfo( f )
775
        elif sched == 'cfs':
776
            pstr, qstr, period, quota = self.cfsInfo( f )
777
        else:
778
            return
779
        # Set cgroup's period and quota
780
        setPeriod = self.cgroupSet( pstr, period )
781
        setQuota = self.cgroupSet( qstr, quota )
782
        if sched == 'rt':
783
            # Set RT priority if necessary
784
            sched = self.chrt()
785
        info( '(%s %d/%dus) ' % ( sched, setQuota, setPeriod ) )
786

    
787
    def setCPUs( self, cores, mems=0 ):
788
        "Specify (real) cores that our cgroup can run on"
789
        if not cores:
790
            return
791
        if isinstance( cores, list ):
792
            cores = ','.join( [ str( c ) for c in cores ] )
793
        self.cgroupSet( resource='cpuset', param='cpus',
794
                        value=cores )
795
        # Memory placement is probably not relevant, but we
796
        # must specify it anyway
797
        self.cgroupSet( resource='cpuset', param='mems',
798
                        value=mems)
799
        # We have to do this here after we've specified
800
        # cpus and mems
801
        errFail( 'cgclassify -g cpuset:/%s %s' % (
802
                 self.name, self.pid ) )
803

    
804
    def config( self, cpu=-1, cores=None, **params ):
805
        """cpu: desired overall system CPU fraction
806
           cores: (real) core(s) this host can run on
807
           params: parameters for Node.config()"""
808
        r = Node.config( self, **params )
809
        # Was considering cpu={'cpu': cpu , 'sched': sched}, but
810
        # that seems redundant
811
        self.setParam( r, 'setCPUFrac', cpu=cpu )
812
        self.setParam( r, 'setCPUs', cores=cores )
813
        return r
814

    
815
    inited = False
816

    
817
    @classmethod
818
    def init( cls ):
819
        "Initialization for CPULimitedHost class"
820
        mountCgroups()
821
        cls.inited = True
822

    
823

    
824
# Some important things to note:
825
#
826
# The "IP" address which setIP() assigns to the switch is not
827
# an "IP address for the switch" in the sense of IP routing.
828
# Rather, it is the IP address for the control interface,
829
# on the control network, and it is only relevant to the
830
# controller. If you are running in the root namespace
831
# (which is the only way to run OVS at the moment), the
832
# control interface is the loopback interface, and you
833
# normally never want to change its IP address!
834
#
835
# In general, you NEVER want to attempt to use Linux's
836
# network stack (i.e. ifconfig) to "assign" an IP address or
837
# MAC address to a switch data port. Instead, you "assign"
838
# the IP and MAC addresses in the controller by specifying
839
# packets that you want to receive or send. The "MAC" address
840
# reported by ifconfig for a switch data port is essentially
841
# meaningless. It is important to understand this if you
842
# want to create a functional router using OpenFlow.
843

    
844
class Switch( Node ):
845
    """A Switch is a Node that is running (or has execed?)
846
       an OpenFlow switch."""
847

    
848
    portBase = 1  # Switches start with port 1 in OpenFlow
849
    dpidLen = 16  # digits in dpid passed to switch
850

    
851
    def __init__( self, name, dpid=None, opts='', listenPort=None, **params):
852
        """dpid: dpid hex string (or None to derive from name, e.g. s1 -> 1)
853
           opts: additional switch options
854
           listenPort: port to listen on for dpctl connections"""
855
        Node.__init__( self, name, **params )
856
        self.dpid = self.defaultDpid( dpid )
857
        self.opts = opts
858
        self.listenPort = listenPort
859
        if not self.inNamespace:
860
            self.controlIntf = Intf( 'lo', self, port=0 )
861

    
862
    def defaultDpid( self, dpid=None ):
863
        "Return correctly formatted dpid from dpid or switch name (s1 -> 1)"
864
        if dpid:
865
            # Remove any colons and make sure it's a good hex number
866
            dpid = dpid.translate( None, ':' )
867
            assert len( dpid ) <= self.dpidLen and int( dpid, 16 ) >= 0
868
        else:
869
            # Use hex of the first number in the switch name
870
            nums = re.findall( r'\d+', self.name )
871
            if nums:
872
                dpid = hex( int( nums[ 0 ] ) )[ 2: ]
873
            else:
874
                raise Exception( 'Unable to derive default datapath ID - '
875
                                 'please either specify a dpid or use a '
876
                                 'canonical switch name such as s23.' )
877
        return '0' * ( self.dpidLen - len( dpid ) ) + dpid
878

    
879
    def defaultIntf( self ):
880
        "Return control interface"
881
        if self.controlIntf:
882
            return self.controlIntf
883
        else:
884
            return Node.defaultIntf( self )
885

    
886
    def sendCmd( self, *cmd, **kwargs ):
887
        """Send command to Node.
888
           cmd: string"""
889
        kwargs.setdefault( 'printPid', False )
890
        if not self.execed:
891
            return Node.sendCmd( self, *cmd, **kwargs )
892
        else:
893
            error( '*** Error: %s has execed and cannot accept commands' %
894
                   self.name )
895

    
896
    def connected( self ):
897
        "Is the switch connected to a controller? (override this method)"
898
        # Assume that we are connected by default to whatever we need to
899
        # be connected to. This should be overridden by any OpenFlow
900
        # switch, but not by a standalone bridge.
901
        debug( 'Assuming', repr( self ), 'is connected to a controller\n' )
902
        return True
903

    
904
    def stop( self, deleteIntfs=True ):
905
        """Stop switch
906
           deleteIntfs: delete interfaces? (True)"""
907
        if deleteIntfs:
908
            self.deleteIntfs()
909

    
910
    def __repr__( self ):
911
        "More informative string representation"
912
        intfs = ( ','.join( [ '%s:%s' % ( i.name, i.IP() )
913
                              for i in self.intfList() ] ) )
914
        return '<%s %s: %s pid=%s> ' % (
915
            self.__class__.__name__, self.name, intfs, self.pid )
916

    
917

    
918
class UserSwitch( Switch ):
919
    "User-space switch."
920

    
921
    dpidLen = 12
922

    
923
    def __init__( self, name, dpopts='--no-slicing', **kwargs ):
924
        """Init.
925
           name: name for the switch
926
           dpopts: additional arguments to ofdatapath (--no-slicing)"""
927
        Switch.__init__( self, name, **kwargs )
928
        pathCheck( 'ofdatapath', 'ofprotocol',
929
                   moduleName='the OpenFlow reference user switch' +
930
                              '(openflow.org)' )
931
        if self.listenPort:
932
            self.opts += ' --listen=ptcp:%i ' % self.listenPort
933
        else:
934
            self.opts += ' --listen=punix:/tmp/%s.listen' % self.name
935
        self.dpopts = dpopts
936

    
937
    @classmethod
938
    def setup( cls ):
939
        "Ensure any dependencies are loaded; if not, try to load them."
940
        if not os.path.exists( '/dev/net/tun' ):
941
            moduleDeps( add=TUN )
942

    
943
    def dpctl( self, *args ):
944
        "Run dpctl command"
945
        listenAddr = None
946
        if not self.listenPort:
947
            listenAddr = 'unix:/tmp/%s.listen' % self.name
948
        else:
949
            listenAddr = 'tcp:127.0.0.1:%i' % self.listenPort
950
        return self.cmd( 'dpctl ' + ' '.join( args ) +
951
                         ' ' + listenAddr )
952

    
953
    def connected( self ):
954
        "Is the switch connected to a controller?"
955
        status = self.dpctl( 'status' )
956
        return ( 'remote.is-connected=true' in status and
957
                 'local.is-connected=true' in status )
958

    
959
    @staticmethod
960
    def TCReapply( intf ):
961
        """Unfortunately user switch and Mininet are fighting
962
           over tc queuing disciplines. To resolve the conflict,
963
           we re-create the user switch's configuration, but as a
964
           leaf of the TCIntf-created configuration."""
965
        if isinstance( intf, TCIntf ):
966
            ifspeed = 10000000000  # 10 Gbps
967
            minspeed = ifspeed * 0.001
968

    
969
            res = intf.config( **intf.params )
970

    
971
            if res is None:  # link may not have TC parameters
972
                return
973

    
974
            # Re-add qdisc, root, and default classes user switch created, but
975
            # with new parent, as setup by Mininet's TCIntf
976
            parent = res['parent']
977
            intf.tc( "%s qdisc add dev %s " + parent +
978
                     " handle 1: htb default 0xfffe" )
979
            intf.tc( "%s class add dev %s classid 1:0xffff parent 1: htb rate "
980
                     + str(ifspeed) )
981
            intf.tc( "%s class add dev %s classid 1:0xfffe parent 1:0xffff " +
982
                     "htb rate " + str(minspeed) + " ceil " + str(ifspeed) )
983

    
984
    def start( self, controllers ):
985
        """Start OpenFlow reference user datapath.
986
           Log to /tmp/sN-{ofd,ofp}.log.
987
           controllers: list of controller objects"""
988
        # Add controllers
989
        clist = ','.join( [ 'tcp:%s:%d' % ( c.IP(), c.port )
990
                            for c in controllers ] )
991
        ofdlog = '/tmp/' + self.name + '-ofd.log'
992
        ofplog = '/tmp/' + self.name + '-ofp.log'
993
        intfs = [ str( i ) for i in self.intfList() if not i.IP() ]
994
        self.cmd( 'ofdatapath -i ' + ','.join( intfs ) +
995
                  ' punix:/tmp/' + self.name + ' -d %s ' % self.dpid +
996
                  self.dpopts +
997
                  ' 1> ' + ofdlog + ' 2> ' + ofdlog + ' &' )
998
        self.cmd( 'ofprotocol unix:/tmp/' + self.name +
999
                  ' ' + clist +
1000
                  ' --fail=closed ' + self.opts +
1001
                  ' 1> ' + ofplog + ' 2>' + ofplog + ' &' )
1002
        if "no-slicing" not in self.dpopts:
1003
            # Only TCReapply if slicing is enable
1004
            sleep(1)  # Allow ofdatapath to start before re-arranging qdisc's
1005
            for intf in self.intfList():
1006
                if not intf.IP():
1007
                    self.TCReapply( intf )
1008

    
1009
    def stop( self, deleteIntfs=True ):
1010
        """Stop OpenFlow reference user datapath.
1011
           deleteIntfs: delete interfaces? (True)"""
1012
        self.cmd( 'kill %ofdatapath' )
1013
        self.cmd( 'kill %ofprotocol' )
1014
        super( UserSwitch, self ).stop( deleteIntfs )
1015

    
1016

    
1017
class OVSSwitch( Switch ):
1018
    "Open vSwitch switch. Depends on ovs-vsctl."
1019

    
1020
    def __init__( self, name, failMode='secure', datapath='kernel',
1021
                  inband=False, protocols=None,
1022
                  reconnectms=1000, stp=False, batch=False, **params ):
1023
        """name: name for switch
1024
           failMode: controller loss behavior (secure|open)
1025
           datapath: userspace or kernel mode (kernel|user)
1026
           inband: use in-band control (False)
1027
           protocols: use specific OpenFlow version(s) (e.g. OpenFlow13)
1028
                      Unspecified (or old OVS version) uses OVS default
1029
           reconnectms: max reconnect timeout in ms (0/None for default)
1030
           stp: enable STP (False, requires failMode=standalone)
1031
           batch: enable batch startup (False)"""
1032
        Switch.__init__( self, name, **params )
1033
        self.failMode = failMode
1034
        self.datapath = datapath
1035
        self.inband = inband
1036
        self.protocols = protocols
1037
        self.reconnectms = reconnectms
1038
        self.stp = stp
1039
        self._uuids = []  # controller UUIDs
1040
        self.batch = batch
1041
        self.commands = []  # saved commands for batch startup
1042

    
1043
    @classmethod
1044
    def setup( cls ):
1045
        "Make sure Open vSwitch is installed and working"
1046
        pathCheck( 'ovs-vsctl',
1047
                   moduleName='Open vSwitch (openvswitch.org)')
1048
        # This should no longer be needed, and it breaks
1049
        # with OVS 1.7 which has renamed the kernel module:
1050
        #  moduleDeps( subtract=OF_KMOD, add=OVS_KMOD )
1051
        out, err, exitcode = errRun( 'ovs-vsctl -t 1 show' )
1052
        if exitcode:
1053
            error( out + err +
1054
                   'ovs-vsctl exited with code %d\n' % exitcode +
1055
                   '*** Error connecting to ovs-db with ovs-vsctl\n'
1056
                   'Make sure that Open vSwitch is installed, '
1057
                   'that ovsdb-server is running, and that\n'
1058
                   '"ovs-vsctl show" works correctly.\n'
1059
                   'You may wish to try '
1060
                   '"service openvswitch-switch start".\n' )
1061
            exit( 1 )
1062
        version = quietRun( 'ovs-vsctl --version' )
1063
        cls.OVSVersion = findall( r'\d+\.\d+', version )[ 0 ]
1064

    
1065
    @classmethod
1066
    def isOldOVS( cls ):
1067
        "Is OVS ersion < 1.10?"
1068
        return ( StrictVersion( cls.OVSVersion ) <
1069
                 StrictVersion( '1.10' ) )
1070

    
1071
    def dpctl( self, *args ):
1072
        "Run ovs-ofctl command"
1073
        return self.cmd( 'ovs-ofctl', args[ 0 ], self, *args[ 1: ] )
1074

    
1075
    def vsctl( self, *args, **kwargs ):
1076
        "Run ovs-vsctl command (or queue for later execution)"
1077
        if self.batch:
1078
            cmd = ' '.join( str( arg ).strip() for arg in args )
1079
            self.commands.append( cmd )
1080
        else:
1081
            return self.cmd( 'ovs-vsctl', *args, **kwargs )
1082

    
1083
    @staticmethod
1084
    def TCReapply( intf ):
1085
        """Unfortunately OVS and Mininet are fighting
1086
           over tc queuing disciplines. As a quick hack/
1087
           workaround, we clear OVS's and reapply our own."""
1088
        if isinstance( intf, TCIntf ):
1089
            intf.config( **intf.params )
1090

    
1091
    def attach( self, intf ):
1092
        "Connect a data port"
1093
        self.vsctl( 'add-port', self, intf )
1094
        self.cmd( 'ifconfig', intf, 'up' )
1095
        self.TCReapply( intf )
1096

    
1097
    def detach( self, intf ):
1098
        "Disconnect a data port"
1099
        self.vsctl( 'del-port', self, intf )
1100

    
1101
    def controllerUUIDs( self, update=False ):
1102
        """Return ovsdb UUIDs for our controllers
1103
           update: update cached value"""
1104
        if not self._uuids or update:
1105
            controllers = self.cmd( 'ovs-vsctl -- get Bridge', self,
1106
                                    'Controller' ).strip()
1107
            if controllers.startswith( '[' ) and controllers.endswith( ']' ):
1108
                controllers = controllers[ 1 : -1 ]
1109
                if controllers:
1110
                    self._uuids = [ c.strip()
1111
                                    for c in controllers.split( ',' ) ]
1112
        return self._uuids
1113

    
1114
    def connected( self ):
1115
        "Are we connected to at least one of our controllers?"
1116
        for uuid in self.controllerUUIDs():
1117
            if 'true' in self.vsctl( '-- get Controller',
1118
                                     uuid, 'is_connected' ):
1119
                return True
1120
        return self.failMode == 'standalone'
1121

    
1122
    def intfOpts( self, intf ):
1123
        "Return OVS interface options for intf"
1124
        opts = ''
1125
        if not self.isOldOVS():
1126
            # ofport_request is not supported on old OVS
1127
            opts += ' ofport_request=%s' % self.ports[ intf ]
1128
            # Patch ports don't work well with old OVS
1129
            if isinstance( intf, OVSIntf ):
1130
                intf1, intf2 = intf.link.intf1, intf.link.intf2
1131
                peer = intf1 if intf1 != intf else intf2
1132
                opts += ' type=patch options:peer=%s' % peer
1133
        return '' if not opts else ' -- set Interface %s' % intf + opts
1134

    
1135
    def bridgeOpts( self ):
1136
        "Return OVS bridge options"
1137
        opts = ( ' other_config:datapath-id=%s' % self.dpid +
1138
                 ' fail_mode=%s' % self.failMode )
1139
        if not self.inband:
1140
            opts += ' other-config:disable-in-band=true'
1141
        if self.datapath == 'user':
1142
            opts += ' datapath_type=netdev'
1143
        if self.protocols and not self.isOldOVS():
1144
            opts += ' protocols=%s' % self.protocols
1145
        if self.stp and self.failMode == 'standalone':
1146
            opts += ' stp_enable=true'
1147
        return opts
1148

    
1149
    def start( self, controllers ):
1150
        "Start up a new OVS OpenFlow switch using ovs-vsctl"
1151
        if self.inNamespace:
1152
            raise Exception(
1153
                'OVS kernel switch does not work in a namespace' )
1154
        int( self.dpid, 16 )  # DPID must be a hex string
1155
        # Command to add interfaces
1156
        intfs = ''.join( ' -- add-port %s %s' % ( self, intf ) +
1157
                         self.intfOpts( intf )
1158
                         for intf in self.intfList()
1159
                         if self.ports[ intf ] and not intf.IP() )
1160
        # Command to create controller entries
1161
        clist = [ ( self.name + c.name, '%s:%s:%d' %
1162
                  ( c.protocol, c.IP(), c.port ) )
1163
                  for c in controllers ]
1164
        if self.listenPort:
1165
            clist.append( ( self.name + '-listen',
1166
                            'ptcp:%s' % self.listenPort ) )
1167
        ccmd = '-- --id=@%s create Controller target=\\"%s\\"'
1168
        if self.reconnectms:
1169
            ccmd += ' max_backoff=%d' % self.reconnectms
1170
        cargs = ' '.join( ccmd % ( name, target )
1171
                          for name, target in clist )
1172
        # Controller ID list
1173
        cids = ','.join( '@%s' % name for name, _target in clist )
1174
        # Try to delete any existing bridges with the same name
1175
        if not self.isOldOVS():
1176
            cargs += ' -- --if-exists del-br %s' % self
1177
        # One ovs-vsctl command to rule them all!
1178
        self.vsctl( cargs +
1179
                    ' -- add-br %s' % self +
1180
                    ' -- set bridge %s controller=[%s]' % ( self, cids  ) +
1181
                    self.bridgeOpts() +
1182
                    intfs )
1183
        # If necessary, restore TC config overwritten by OVS
1184
        if not self.batch:
1185
            for intf in self.intfList():
1186
                self.TCReapply( intf )
1187

    
1188
    # This should be ~ int( quietRun( 'getconf ARG_MAX' ) ),
1189
    # but the real limit seems to be much lower
1190
    argmax = 128000
1191

    
1192
    @classmethod
1193
    def batchStartup( cls, switches, run=errRun ):
1194
        """Batch startup for OVS
1195
           switches: switches to start up
1196
           run: function to run commands (errRun)"""
1197
        info( '...' )
1198
        cmds = 'ovs-vsctl'
1199
        for switch in switches:
1200
            if switch.isOldOVS():
1201
                # Ideally we'd optimize this also
1202
                run( 'ovs-vsctl del-br %s' % switch )
1203
            for cmd in switch.commands:
1204
                cmd = cmd.strip()
1205
                # Don't exceed ARG_MAX
1206
                if len( cmds ) + len( cmd ) >= cls.argmax:
1207
                    run( cmds, shell=True )
1208
                    cmds = 'ovs-vsctl'
1209
                cmds += ' ' + cmd
1210
                switch.cmds = []
1211
                switch.batch = False
1212
        if cmds:
1213
            run( cmds, shell=True )
1214
        # Reapply link config if necessary...
1215
        for switch in switches:
1216
            for intf in switch.intfs.itervalues():
1217
                if isinstance( intf, TCIntf ):
1218
                    intf.config( **intf.params )
1219
        return switches
1220

    
1221
    def stop( self, deleteIntfs=True ):
1222
        """Terminate OVS switch.
1223
           deleteIntfs: delete interfaces? (True)"""
1224
        self.cmd( 'ovs-vsctl del-br', self )
1225
        if self.datapath == 'user':
1226
            self.cmd( 'ip link del', self )
1227
        super( OVSSwitch, self ).stop( deleteIntfs )
1228

    
1229
    @classmethod
1230
    def batchShutdown( cls, switches, run=errRun ):
1231
        "Shut down a list of OVS switches"
1232
        delcmd = 'del-br %s'
1233
        if switches and not switches[ 0 ].isOldOVS():
1234
            delcmd = '--if-exists ' + delcmd
1235
        # First, delete them all from ovsdb
1236
        run( 'ovs-vsctl ' +
1237
             ' -- '.join( delcmd % s for s in switches ) )
1238
        # Next, shut down all of the processes
1239
        pids = ' '.join( str( switch.pid ) for switch in switches )
1240
        run( 'kill -HUP ' + pids )
1241
        for switch in switches:
1242
            switch.shell = None
1243
        return switches
1244

    
1245

    
1246
OVSKernelSwitch = OVSSwitch
1247

    
1248

    
1249
class OVSBridge( OVSSwitch ):
1250
    "OVSBridge is an OVSSwitch in standalone/bridge mode"
1251

    
1252
    def __init__( self, *args, **kwargs ):
1253
        """stp: enable Spanning Tree Protocol (False)
1254
           see OVSSwitch for other options"""
1255
        kwargs.update( failMode='standalone' )
1256
        OVSSwitch.__init__( self, *args, **kwargs )
1257

    
1258
    def start( self, controllers ):
1259
        "Start bridge, ignoring controllers argument"
1260
        OVSSwitch.start( self, controllers=[] )
1261

    
1262
    def connected( self ):
1263
        "Are we forwarding yet?"
1264
        if self.stp:
1265
            status = self.dpctl( 'show' )
1266
            return 'STP_FORWARD' in status and not 'STP_LEARN' in status
1267
        else:
1268
            return True
1269

    
1270

    
1271
class IVSSwitch( Switch ):
1272
    "Indigo Virtual Switch"
1273

    
1274
    def __init__( self, name, verbose=False, **kwargs ):
1275
        Switch.__init__( self, name, **kwargs )
1276
        self.verbose = verbose
1277

    
1278
    @classmethod
1279
    def setup( cls ):
1280
        "Make sure IVS is installed"
1281
        pathCheck( 'ivs-ctl', 'ivs',
1282
                   moduleName="Indigo Virtual Switch (projectfloodlight.org)" )
1283
        out, err, exitcode = errRun( 'ivs-ctl show' )
1284
        if exitcode:
1285
            error( out + err +
1286
                   'ivs-ctl exited with code %d\n' % exitcode +
1287
                   '*** The openvswitch kernel module might '
1288
                   'not be loaded. Try modprobe openvswitch.\n' )
1289
            exit( 1 )
1290

    
1291
    @classmethod
1292
    def batchShutdown( cls, switches ):
1293
        "Kill each IVS switch, to be waited on later in stop()"
1294
        for switch in switches:
1295
            switch.cmd( 'kill %ivs' )
1296
        return switches
1297

    
1298
    def start( self, controllers ):
1299
        "Start up a new IVS switch"
1300
        args = ['ivs']
1301
        args.extend( ['--name', self.name] )
1302
        args.extend( ['--dpid', self.dpid] )
1303
        if self.verbose:
1304
            args.extend( ['--verbose'] )
1305
        for intf in self.intfs.values():
1306
            if not intf.IP():
1307
                args.extend( ['-i', intf.name] )
1308
        for c in controllers:
1309
            args.extend( ['-c', '%s:%d' % (c.IP(), c.port)] )
1310
        if self.listenPort:
1311
            args.extend( ['--listen', '127.0.0.1:%i' % self.listenPort] )
1312
        args.append( self.opts )
1313

    
1314
        logfile = '/tmp/ivs.%s.log' % self.name
1315

    
1316
        self.cmd( ' '.join(args) + ' >' + logfile + ' 2>&1 </dev/null &' )
1317

    
1318
    def stop( self, deleteIntfs=True ):
1319
        """Terminate IVS switch.
1320
           deleteIntfs: delete interfaces? (True)"""
1321
        self.cmd( 'kill %ivs' )
1322
        self.cmd( 'wait' )
1323
        super( IVSSwitch, self ).stop( deleteIntfs )
1324

    
1325
    def attach( self, intf ):
1326
        "Connect a data port"
1327
        self.cmd( 'ivs-ctl', 'add-port', '--datapath', self.name, intf )
1328

    
1329
    def detach( self, intf ):
1330
        "Disconnect a data port"
1331
        self.cmd( 'ivs-ctl', 'del-port', '--datapath', self.name, intf )
1332

    
1333
    def dpctl( self, *args ):
1334
        "Run dpctl command"
1335
        if not self.listenPort:
1336
            return "can't run dpctl without passive listening port"
1337
        return self.cmd( 'ovs-ofctl ' + ' '.join( args ) +
1338
                         ' tcp:127.0.0.1:%i' % self.listenPort )
1339

    
1340

    
1341
class Controller( Node ):
1342
    """A Controller is a Node that is running (or has execed?) an
1343
       OpenFlow controller."""
1344

    
1345
    def __init__( self, name, inNamespace=False, command='controller',
1346
                  cargs='-v ptcp:%d', cdir=None, ip="127.0.0.1",
1347
                  port=6633, protocol='tcp', **params ):
1348
        self.command = command
1349
        self.cargs = cargs
1350
        self.cdir = cdir
1351
        # Accept 'ip:port' syntax as shorthand
1352
        if ':' in ip:
1353
            ip, port = ip.split( ':' )
1354
            port = int( port )
1355
        self.ip = ip
1356
        self.port = port
1357
        self.protocol = protocol
1358
        Node.__init__( self, name, inNamespace=inNamespace,
1359
                       ip=ip, **params  )
1360
        self.checkListening()
1361

    
1362
    def checkListening( self ):
1363
        "Make sure no controllers are running on our port"
1364
        # Verify that Telnet is installed first:
1365
        out, _err, returnCode = errRun( "which telnet" )
1366
        if 'telnet' not in out or returnCode != 0:
1367
            raise Exception( "Error running telnet to check for listening "
1368
                             "controllers; please check that it is "
1369
                             "installed." )
1370
        listening = self.cmd( "echo A | telnet -e A %s %d" %
1371
                              ( self.ip, self.port ) )
1372
        if 'Connected' in listening:
1373
            servers = self.cmd( 'netstat -natp' ).split( '\n' )
1374
            pstr = ':%d ' % self.port
1375
            clist = servers[ 0:1 ] + [ s for s in servers if pstr in s ]
1376
            raise Exception( "Please shut down the controller which is"
1377
                             " running on port %d:\n" % self.port +
1378
                             '\n'.join( clist ) )
1379

    
1380
    def start( self ):
1381
        """Start <controller> <args> on controller.
1382
           Log to /tmp/cN.log"""
1383
        pathCheck( self.command )
1384
        cout = '/tmp/' + self.name + '.log'
1385
        if self.cdir is not None:
1386
            self.cmd( 'cd ' + self.cdir )
1387
        self.cmd( self.command + ' ' + self.cargs % self.port +
1388
                  ' 1>' + cout + ' 2>' + cout + ' &' )
1389
        self.execed = False
1390

    
1391
    def stop( self, *args, **kwargs ):
1392
        "Stop controller."
1393
        self.cmd( 'kill %' + self.command )
1394
        self.cmd( 'wait %' + self.command )
1395
        super( Controller, self ).stop( *args, **kwargs )
1396

    
1397
    def IP( self, intf=None ):
1398
        "Return IP address of the Controller"
1399
        if self.intfs:
1400
            ip = Node.IP( self, intf )
1401
        else:
1402
            ip = self.ip
1403
        return ip
1404

    
1405
    def __repr__( self ):
1406
        "More informative string representation"
1407
        return '<%s %s: %s:%s pid=%s> ' % (
1408
            self.__class__.__name__, self.name,
1409
            self.IP(), self.port, self.pid )
1410

    
1411
    @classmethod
1412
    def isAvailable( cls ):
1413
        "Is controller available?"
1414
        return quietRun( 'which controller' )
1415

    
1416

    
1417
class OVSController( Controller ):
1418
    "Open vSwitch controller"
1419
    def __init__( self, name, command='ovs-controller', **kwargs ):
1420
        if quietRun( 'which test-controller' ):
1421
            command = 'test-controller'
1422
        Controller.__init__( self, name, command=command, **kwargs )
1423

    
1424
    @classmethod
1425
    def isAvailable( cls ):
1426
        return ( quietRun( 'which ovs-controller' ) or
1427
                 quietRun( 'which test-controller' ) )
1428

    
1429
class NOX( Controller ):
1430
    "Controller to run a NOX application."
1431

    
1432
    def __init__( self, name, *noxArgs, **kwargs ):
1433
        """Init.
1434
           name: name to give controller
1435
           noxArgs: arguments (strings) to pass to NOX"""
1436
        if not noxArgs:
1437
            warn( 'warning: no NOX modules specified; '
1438
                  'running packetdump only\n' )
1439
            noxArgs = [ 'packetdump' ]
1440
        elif type( noxArgs ) not in ( list, tuple ):
1441
            noxArgs = [ noxArgs ]
1442

    
1443
        if 'NOX_CORE_DIR' not in os.environ:
1444
            exit( 'exiting; please set missing NOX_CORE_DIR env var' )
1445
        noxCoreDir = os.environ[ 'NOX_CORE_DIR' ]
1446

    
1447
        Controller.__init__( self, name,
1448
                             command=noxCoreDir + '/nox_core',
1449
                             cargs='--libdir=/usr/local/lib -v -i ptcp:%s ' +
1450
                             ' '.join( noxArgs ),
1451
                             cdir=noxCoreDir,
1452
                             **kwargs )
1453

    
1454
class Ryu( Controller ):
1455
    "Controller to run Ryu application"
1456
    def __init__( self, name, *ryuArgs, **kwargs ):
1457
        """Init.
1458
        name: name to give controller.
1459
        ryuArgs: arguments and modules to pass to Ryu"""
1460
        homeDir = quietRun( 'printenv HOME' ).strip( '\r\n' )
1461
        ryuCoreDir = '%s/ryu/ryu/app/' % homeDir
1462
        if not ryuArgs:
1463
            warn( 'warning: no Ryu modules specified; '
1464
                  'running simple_switch only\n' )
1465
            ryuArgs = [ ryuCoreDir + 'simple_switch.py' ]
1466
        elif type( ryuArgs ) not in ( list, tuple ):
1467
            ryuArgs = [ ryuArgs ]
1468

    
1469
        Controller.__init__( self, name,
1470
                             command='ryu-manager',
1471
                             cargs='--ofp-tcp-listen-port %s ' +
1472
                             ' '.join( ryuArgs ),
1473
                             cdir=ryuCoreDir,
1474
                             **kwargs )
1475

    
1476

    
1477
class RemoteController( Controller ):
1478
    "Controller running outside of Mininet's control."
1479

    
1480
    def __init__( self, name, ip='127.0.0.1',
1481
                  port=6633, **kwargs):
1482
        """Init.
1483
           name: name to give controller
1484
           ip: the IP address where the remote controller is
1485
           listening
1486
           port: the port where the remote controller is listening"""
1487
        Controller.__init__( self, name, ip=ip, port=port, **kwargs )
1488

    
1489
    def start( self ):
1490
        "Overridden to do nothing."
1491
        return
1492

    
1493
    def stop( self ):
1494
        "Overridden to do nothing."
1495
        return
1496

    
1497
    def checkListening( self ):
1498
        "Warn if remote controller is not accessible"
1499
        listening = self.cmd( "echo A | telnet -e A %s %d" %
1500
                              ( self.ip, self.port ) )
1501
        if 'Connected' not in listening:
1502
            warn( "Unable to contact the remote controller"
1503
                  " at %s:%d\n" % ( self.ip, self.port ) )
1504

    
1505

    
1506
DefaultControllers = ( Controller, OVSController )
1507

    
1508
def findController( controllers=DefaultControllers ):
1509
    "Return first available controller from list, if any"
1510
    for controller in controllers:
1511
        if controller.isAvailable():
1512
            return controller
1513

    
1514
def DefaultController( name, controllers=DefaultControllers, **kwargs ):
1515
    "Find a controller that is available and instantiate it"
1516
    controller = findController( controllers )
1517
    if not controller:
1518
        raise Exception( 'Could not find a default OpenFlow controller' )
1519
    return controller( name, **kwargs )
1520

    
1521
def NullController( *_args, **_kwargs ):
1522
    "Nonexistent controller - simply returns None"
1523
    return None