Statistics
| Branch: | Tag: | Revision:

mininet / mininet / node.py @ master

History | View | Annotate | Download (58.8 KB)

1
"""
2
Node objects for Mininet.
3

4
Nodes provide a simple abstraction for interacting with hosts, switches
5
and controllers. Local nodes are simply one or more processes on the local
6
machine.
7

8
Node: superclass for all (primarily local) network nodes.
9

10
Host: a virtual host. By default, a host is simply a shell; commands
11
    may be sent using Cmd (which waits for output), or using sendCmd(),
12
    which returns immediately, allowing subsequent monitoring using
13
    monitor(). Examples of how to run experiments using this
14
    functionality are provided in the examples/ directory. By default,
15
    hosts share the root file system, but they may also specify private
16
    directories.
17

18
CPULimitedHost: a virtual host whose CPU bandwidth is limited by
19
    RT or CFS bandwidth limiting.
20

21
Switch: superclass for switch nodes.
22

23
UserSwitch: a switch using the user-space switch from the OpenFlow
24
    reference implementation.
25

26
OVSSwitch: a switch using the Open vSwitch OpenFlow-compatible switch
27
    implementation (openvswitch.org).
28

29
OVSBridge: an Ethernet bridge implemented using Open vSwitch.
30
    Supports STP.
31

32
IVSSwitch: OpenFlow switch using the Indigo Virtual Switch.
33

34
Controller: superclass for OpenFlow controllers. The default controller
35
    is controller(8) from the reference implementation.
36

37
OVSController: The test controller from Open vSwitch.
38

39
NOXController: a controller node using NOX (noxrepo.org).
40

41
Ryu: The Ryu controller (https://osrg.github.io/ryu/)
42

43
RemoteController: a remote controller node, which may use any
44
    arbitrary OpenFlow-compatible controller, and which is not
45
    created or managed by Mininet.
46

47
Future enhancements:
48

49
- Possibly make Node, Switch and Controller more abstract so that
50
  they can be used for both local and remote nodes
51

52
- Create proxy objects for remote nodes (Mininet: Cluster Edition)
53
"""
54

    
55
import os
56
import pty
57
import re
58
import signal
59
import select
60
from subprocess import Popen, PIPE
61
from time import sleep
62

    
63
from mininet.log import info, error, warn, debug
64
from mininet.util import ( quietRun, errRun, errFail, moveIntf, isShellBuiltin,
65
                           numCores, retry, mountCgroups )
66
from mininet.moduledeps import moduleDeps, pathCheck, TUN
67
from mininet.link import Link, Intf, TCIntf, OVSIntf
68
from re import findall
69
from distutils.version import StrictVersion
70

    
71
class Node( object ):
72
    """A virtual network node is simply a shell in a network namespace.
73
       We communicate with it using pipes."""
74

    
75
    portBase = 0  # Nodes always start with eth0/port0, even in OF 1.0
76

    
77
    def __init__( self, name, inNamespace=True, **params ):
78
        """name: name of node
79
           inNamespace: in network namespace?
80
           privateDirs: list of private directory strings or tuples
81
           params: Node parameters (see config() for details)"""
82

    
83
        # Make sure class actually works
84
        self.checkSetup()
85

    
86
        self.name = params.get( 'name', name )
87
        self.privateDirs = params.get( 'privateDirs', [] )
88
        self.inNamespace = params.get( 'inNamespace', inNamespace )
89

    
90
        # Stash configuration parameters for future reference
91
        self.params = params
92

    
93
        self.intfs = {}  # dict of port numbers to interfaces
94
        self.ports = {}  # dict of interfaces to port numbers
95
                         # replace with Port objects, eventually ?
96
        self.nameToIntf = {}  # dict of interface names to Intfs
97

    
98
        # Make pylint happy
99
        ( self.shell, self.execed, self.pid, self.stdin, self.stdout,
100
            self.lastPid, self.lastCmd, self.pollOut ) = (
101
                None, None, None, None, None, None, None, None )
102
        self.waiting = False
103
        self.readbuf = ''
104

    
105
        # Start command interpreter shell
106
        self.startShell()
107
        self.mountPrivateDirs()
108

    
109
    # File descriptor to node mapping support
110
    # Class variables and methods
111

    
112
    inToNode = {}  # mapping of input fds to nodes
113
    outToNode = {}  # mapping of output fds to nodes
114

    
115
    @classmethod
116
    def fdToNode( cls, fd ):
117
        """Return node corresponding to given file descriptor.
118
           fd: file descriptor
119
           returns: node"""
120
        node = cls.outToNode.get( fd )
121
        return node or cls.inToNode.get( fd )
122

    
123
    # Command support via shell process in namespace
124
    def startShell( self, mnopts=None ):
125
        "Start a shell process for running commands"
126
        if self.shell:
127
            error( "%s: shell is already running\n" % self.name )
128
            return
129
        # mnexec: (c)lose descriptors, (d)etach from tty,
130
        # (p)rint pid, and run in (n)amespace
131
        opts = '-cd' if mnopts is None else mnopts
132
        if self.inNamespace:
133
            opts += 'n'
134
        # bash -i: force interactive
135
        # -s: pass $* to shell, and make process easy to find in ps
136
        # prompt is set to sentinel chr( 127 )
137
        cmd = [ 'mnexec', opts, 'env', 'PS1=' + chr( 127 ),
138
                'bash', '--norc', '-is', 'mininet:' + self.name ]
139
        # Spawn a shell subprocess in a pseudo-tty, to disable buffering
140
        # in the subprocess and insulate it from signals (e.g. SIGINT)
141
        # received by the parent
142
        master, slave = pty.openpty()
143
        self.shell = self._popen( cmd, stdin=slave, stdout=slave, stderr=slave,
144
                                  close_fds=False )
145
        self.stdin = os.fdopen( master, 'rw' )
146
        self.stdout = self.stdin
147
        self.pid = self.shell.pid
148
        self.pollOut = select.poll()
149
        self.pollOut.register( self.stdout )
150
        # Maintain mapping between file descriptors and nodes
151
        # This is useful for monitoring multiple nodes
152
        # using select.poll()
153
        self.outToNode[ self.stdout.fileno() ] = self
154
        self.inToNode[ self.stdin.fileno() ] = self
155
        self.execed = False
156
        self.lastCmd = None
157
        self.lastPid = None
158
        self.readbuf = ''
159
        # Wait for prompt
160
        while True:
161
            data = self.read( 1024 )
162
            if data[ -1 ] == chr( 127 ):
163
                break
164
            self.pollOut.poll()
165
        self.waiting = False
166
        # +m: disable job control notification
167
        self.cmd( 'unset HISTFILE; stty -echo; set +m' )
168

    
169
    def mountPrivateDirs( self ):
170
        "mount private directories"
171
        # Avoid expanding a string into a list of chars
172
        assert not isinstance( self.privateDirs, basestring )
173
        for directory in self.privateDirs:
174
            if isinstance( directory, tuple ):
175
                # mount given private directory
176
                privateDir = directory[ 1 ] % self.__dict__
177
                mountPoint = directory[ 0 ]
178
                self.cmd( 'mkdir -p %s' % privateDir )
179
                self.cmd( 'mkdir -p %s' % mountPoint )
180
                self.cmd( 'mount --bind %s %s' %
181
                               ( privateDir, mountPoint ) )
182
            else:
183
                # mount temporary filesystem on directory
184
                self.cmd( 'mkdir -p %s' % directory )
185
                self.cmd( 'mount -n -t tmpfs tmpfs %s' % directory )
186

    
187
    def unmountPrivateDirs( self ):
188
        "mount private directories"
189
        for directory in self.privateDirs:
190
            if isinstance( directory, tuple ):
191
                self.cmd( 'umount ', directory[ 0 ] )
192
            else:
193
                self.cmd( 'umount ', directory )
194

    
195
    def _popen( self, cmd, **params ):
196
        """Internal method: spawn and return a process
197
            cmd: command to run (list)
198
            params: parameters to Popen()"""
199
        # Leave this is as an instance method for now
200
        assert self
201
        return Popen( cmd, **params )
202

    
203
    def cleanup( self ):
204
        "Help python collect its garbage."
205
        # We used to do this, but it slows us down:
206
        # Intfs may end up in root NS
207
        # for intfName in self.intfNames():
208
        # if self.name in intfName:
209
        # quietRun( 'ip link del ' + intfName )
210
        self.shell = None
211

    
212
    # Subshell I/O, commands and control
213

    
214
    def read( self, maxbytes=1024 ):
215
        """Buffered read from node, potentially blocking.
216
           maxbytes: maximum number of bytes to return"""
217
        count = len( self.readbuf )
218
        if count < maxbytes:
219
            data = os.read( self.stdout.fileno(), maxbytes - count )
220
            self.readbuf += data
221
        if maxbytes >= len( self.readbuf ):
222
            result = self.readbuf
223
            self.readbuf = ''
224
        else:
225
            result = self.readbuf[ :maxbytes ]
226
            self.readbuf = self.readbuf[ maxbytes: ]
227
        return result
228

    
229
    def readline( self ):
230
        """Buffered readline from node, potentially blocking.
231
           returns: line (minus newline) or None"""
232
        self.readbuf += self.read( 1024 )
233
        if '\n' not in self.readbuf:
234
            return None
235
        pos = self.readbuf.find( '\n' )
236
        line = self.readbuf[ 0: pos ]
237
        self.readbuf = self.readbuf[ pos + 1: ]
238
        return line
239

    
240
    def write( self, data ):
241
        """Write data to node.
242
           data: string"""
243
        os.write( self.stdin.fileno(), data )
244

    
245
    def terminate( self ):
246
        "Send kill signal to Node and clean up after it."
247
        self.unmountPrivateDirs()
248
        if self.shell:
249
            if self.shell.poll() is None:
250
                os.killpg( self.shell.pid, signal.SIGHUP )
251
        self.cleanup()
252

    
253
    def stop( self, deleteIntfs=False ):
254
        """Stop node.
255
           deleteIntfs: delete interfaces? (False)"""
256
        if deleteIntfs:
257
            self.deleteIntfs()
258
        self.terminate()
259

    
260
    def waitReadable( self, timeoutms=None ):
261
        """Wait until node's output is readable.
262
           timeoutms: timeout in ms or None to wait indefinitely.
263
           returns: result of poll()"""
264
        if len( self.readbuf ) == 0:
265
            return self.pollOut.poll( timeoutms )
266

    
267
    def sendCmd( self, *args, **kwargs ):
268
        """Send a command, followed by a command to echo a sentinel,
269
           and return without waiting for the command to complete.
270
           args: command and arguments, or string
271
           printPid: print command's PID? (False)"""
272
        assert self.shell and not self.waiting
273
        printPid = kwargs.get( 'printPid', False )
274
        # Allow sendCmd( [ list ] )
275
        if len( args ) == 1 and isinstance( args[ 0 ], list ):
276
            cmd = args[ 0 ]
277
        # Allow sendCmd( cmd, arg1, arg2... )
278
        elif len( args ) > 0:
279
            cmd = args
280
        # Convert to string
281
        if not isinstance( cmd, str ):
282
            cmd = ' '.join( [ str( c ) for c in cmd ] )
283
        if not re.search( r'\w', cmd ):
284
            # Replace empty commands with something harmless
285
            cmd = 'echo -n'
286
        self.lastCmd = cmd
287
        # if a builtin command is backgrounded, it still yields a PID
288
        if len( cmd ) > 0 and cmd[ -1 ] == '&':
289
            # print ^A{pid}\n so monitor() can set lastPid
290
            cmd += ' printf "\\001%d\\012" $! '
291
        elif printPid and not isShellBuiltin( cmd ):
292
            cmd = 'mnexec -p ' + cmd
293
        self.write( cmd + '\n' )
294
        self.lastPid = None
295
        self.waiting = True
296

    
297
    def sendInt( self, intr=chr( 3 ) ):
298
        "Interrupt running command."
299
        debug( 'sendInt: writing chr(%d)\n' % ord( intr ) )
300
        self.write( intr )
301

    
302
    def monitor( self, timeoutms=None, findPid=True ):
303
        """Monitor and return the output of a command.
304
           Set self.waiting to False if command has completed.
305
           timeoutms: timeout in ms or None to wait indefinitely
306
           findPid: look for PID from mnexec -p"""
307
        ready = self.waitReadable( timeoutms )
308
        if not ready:
309
            return ''
310
        data = self.read( 1024 )
311
        pidre = r'\[\d+\] \d+\r\n'
312
        # Look for PID
313
        marker = chr( 1 ) + r'\d+\r\n'
314
        if findPid and chr( 1 ) in data:
315
            # suppress the job and PID of a backgrounded command
316
            if re.findall( pidre, data ):
317
                data = re.sub( pidre, '', data )
318
            # Marker can be read in chunks; continue until all of it is read
319
            while not re.findall( marker, data ):
320
                data += self.read( 1024 )
321
            markers = re.findall( marker, data )
322
            if markers:
323
                self.lastPid = int( markers[ 0 ][ 1: ] )
324
                data = re.sub( marker, '', data )
325
        # Look for sentinel/EOF
326
        if len( data ) > 0 and data[ -1 ] == chr( 127 ):
327
            self.waiting = False
328
            data = data[ :-1 ]
329
        elif chr( 127 ) in data:
330
            self.waiting = False
331
            data = data.replace( chr( 127 ), '' )
332
        return data
333

    
334
    def waitOutput( self, verbose=False, findPid=True ):
335
        """Wait for a command to complete.
336
           Completion is signaled by a sentinel character, ASCII(127)
337
           appearing in the output stream.  Wait for the sentinel and return
338
           the output, including trailing newline.
339
           verbose: print output interactively"""
340
        log = info if verbose else debug
341
        output = ''
342
        while self.waiting:
343
            data = self.monitor( findPid=findPid )
344
            output += data
345
            log( data )
346
        return output
347

    
348
    def cmd( self, *args, **kwargs ):
349
        """Send a command, wait for output, and return it.
350
           cmd: string"""
351
        verbose = kwargs.get( 'verbose', False )
352
        log = info if verbose else debug
353
        log( '*** %s : %s\n' % ( self.name, args ) )
354
        if self.shell:
355
            self.sendCmd( *args, **kwargs )
356
            return self.waitOutput( verbose )
357
        else:
358
            warn( '(%s exited - ignoring cmd%s)\n' % ( self, args ) )
359

    
360
    def cmdPrint( self, *args):
361
        """Call cmd and printing its output
362
           cmd: string"""
363
        return self.cmd( *args, **{ 'verbose': True } )
364

    
365
    def popen( self, *args, **kwargs ):
366
        """Return a Popen() object in our namespace
367
           args: Popen() args, single list, or string
368
           kwargs: Popen() keyword args"""
369
        defaults = { 'stdout': PIPE, 'stderr': PIPE,
370
                     'mncmd':
371
                     [ 'mnexec', '-da', str( self.pid ) ] }
372
        defaults.update( kwargs )
373
        if len( args ) == 1:
374
            if isinstance( args[ 0 ], list ):
375
                # popen([cmd, arg1, arg2...])
376
                cmd = args[ 0 ]
377
            elif isinstance( args[ 0 ], basestring ):
378
                # popen("cmd arg1 arg2...")
379
                cmd = args[ 0 ].split()
380
            else:
381
                raise Exception( 'popen() requires a string or list' )
382
        elif len( args ) > 0:
383
            # popen( cmd, arg1, arg2... )
384
            cmd = list( args )
385
        # Attach to our namespace  using mnexec -a
386
        cmd = defaults.pop( 'mncmd' ) + cmd
387
        # Shell requires a string, not a list!
388
        if defaults.get( 'shell', False ):
389
            cmd = ' '.join( cmd )
390
        popen = self._popen( cmd, **defaults )
391
        return popen
392

    
393
    def pexec( self, *args, **kwargs ):
394
        """Execute a command using popen
395
           returns: out, err, exitcode"""
396
        popen = self.popen( *args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
397
                            **kwargs )
398
        # Warning: this can fail with large numbers of fds!
399
        out, err = popen.communicate()
400
        exitcode = popen.wait()
401
        return out, err, exitcode
402

    
403
    # Interface management, configuration, and routing
404

    
405
    # BL notes: This might be a bit redundant or over-complicated.
406
    # However, it does allow a bit of specialization, including
407
    # changing the canonical interface names. It's also tricky since
408
    # the real interfaces are created as veth pairs, so we can't
409
    # make a single interface at a time.
410

    
411
    def newPort( self ):
412
        "Return the next port number to allocate."
413
        if len( self.ports ) > 0:
414
            return max( self.ports.values() ) + 1
415
        return self.portBase
416

    
417
    def addIntf( self, intf, port=None, moveIntfFn=moveIntf ):
418
        """Add an interface.
419
           intf: interface
420
           port: port number (optional, typically OpenFlow port number)
421
           moveIntfFn: function to move interface (optional)"""
422
        if port is None:
423
            port = self.newPort()
424
        self.intfs[ port ] = intf
425
        self.ports[ intf ] = port
426
        self.nameToIntf[ intf.name ] = intf
427
        debug( '\n' )
428
        debug( 'added intf %s (%d) to node %s\n' % (
429
                intf, port, self.name ) )
430
        if self.inNamespace:
431
            debug( 'moving', intf, 'into namespace for', self.name, '\n' )
432
            moveIntfFn( intf.name, self  )
433

    
434
    def defaultIntf( self ):
435
        "Return interface for lowest port"
436
        ports = self.intfs.keys()
437
        if ports:
438
            return self.intfs[ min( ports ) ]
439
        else:
440
            warn( '*** defaultIntf: warning:', self.name,
441
                  'has no interfaces\n' )
442

    
443
    def intf( self, intf=None ):
444
        """Return our interface object with given string name,
445
           default intf if name is falsy (None, empty string, etc).
446
           or the input intf arg.
447

448
        Having this fcn return its arg for Intf objects makes it
449
        easier to construct functions with flexible input args for
450
        interfaces (those that accept both string names and Intf objects).
451
        """
452
        if not intf:
453
            return self.defaultIntf()
454
        elif isinstance( intf, basestring):
455
            return self.nameToIntf[ intf ]
456
        else:
457
            return intf
458

    
459
    def connectionsTo( self, node):
460
        "Return [ intf1, intf2... ] for all intfs that connect self to node."
461
        # We could optimize this if it is important
462
        connections = []
463
        for intf in self.intfList():
464
            link = intf.link
465
            if link:
466
                node1, node2 = link.intf1.node, link.intf2.node
467
                if node1 == self and node2 == node:
468
                    connections += [ ( intf, link.intf2 ) ]
469
                elif node1 == node and node2 == self:
470
                    connections += [ ( intf, link.intf1 ) ]
471
        return connections
472

    
473
    def deleteIntfs( self, checkName=True ):
474
        """Delete all of our interfaces.
475
           checkName: only delete interfaces that contain our name"""
476
        # In theory the interfaces should go away after we shut down.
477
        # However, this takes time, so we're better off removing them
478
        # explicitly so that we won't get errors if we run before they
479
        # have been removed by the kernel. Unfortunately this is very slow,
480
        # at least with Linux kernels before 2.6.33
481
        for intf in self.intfs.values():
482
            # Protect against deleting hardware interfaces
483
            if ( self.name in intf.name ) or ( not checkName ):
484
                intf.delete()
485
                info( '.' )
486

    
487
    # Routing support
488

    
489
    def setARP( self, ip, mac ):
490
        """Add an ARP entry.
491
           ip: IP address as string
492
           mac: MAC address as string"""
493
        result = self.cmd( 'arp', '-s', ip, mac )
494
        return result
495

    
496
    def setHostRoute( self, ip, intf ):
497
        """Add route to host.
498
           ip: IP address as dotted decimal
499
           intf: string, interface name"""
500
        return self.cmd( 'route add -host', ip, 'dev', intf )
501

    
502
    def setDefaultRoute( self, intf=None ):
503
        """Set the default route to go through intf.
504
           intf: Intf or {dev <intfname> via <gw-ip> ...}"""
505
        # Note setParam won't call us if intf is none
506
        if isinstance( intf, basestring ) and ' ' in intf:
507
            params = intf
508
        else:
509
            params = 'dev %s' % intf
510
        # Do this in one line in case we're messing with the root namespace
511
        self.cmd( 'ip route del default; ip route add default', params )
512

    
513
    # Convenience and configuration methods
514

    
515
    def setMAC( self, mac, intf=None ):
516
        """Set the MAC address for an interface.
517
           intf: intf or intf name
518
           mac: MAC address as string"""
519
        return self.intf( intf ).setMAC( mac )
520

    
521
    def setIP( self, ip, prefixLen=8, intf=None, **kwargs ):
522
        """Set the IP address for an interface.
523
           intf: intf or intf name
524
           ip: IP address as a string
525
           prefixLen: prefix length, e.g. 8 for /8 or 16M addrs
526
           kwargs: any additional arguments for intf.setIP"""
527
        return self.intf( intf ).setIP( ip, prefixLen, **kwargs )
528

    
529
    def IP( self, intf=None ):
530
        "Return IP address of a node or specific interface."
531
        return self.intf( intf ).IP()
532

    
533
    def MAC( self, intf=None ):
534
        "Return MAC address of a node or specific interface."
535
        return self.intf( intf ).MAC()
536

    
537
    def intfIsUp( self, intf=None ):
538
        "Check if an interface is up."
539
        return self.intf( intf ).isUp()
540

    
541
    # The reason why we configure things in this way is so
542
    # That the parameters can be listed and documented in
543
    # the config method.
544
    # Dealing with subclasses and superclasses is slightly
545
    # annoying, but at least the information is there!
546

    
547
    def setParam( self, results, method, **param ):
548
        """Internal method: configure a *single* parameter
549
           results: dict of results to update
550
           method: config method name
551
           param: arg=value (ignore if value=None)
552
           value may also be list or dict"""
553
        name, value = param.items()[ 0 ]
554
        if value is None:
555
            return
556
        f = getattr( self, method, None )
557
        if not f:
558
            return
559
        if isinstance( value, list ):
560
            result = f( *value )
561
        elif isinstance( value, dict ):
562
            result = f( **value )
563
        else:
564
            result = f( value )
565
        results[ name ] = result
566
        return result
567

    
568
    def config( self, mac=None, ip=None,
569
                defaultRoute=None, lo='up', **_params ):
570
        """Configure Node according to (optional) parameters:
571
           mac: MAC address for default interface
572
           ip: IP address for default interface
573
           ifconfig: arbitrary interface configuration
574
           Subclasses should override this method and call
575
           the parent class's config(**params)"""
576
        # If we were overriding this method, we would call
577
        # the superclass config method here as follows:
578
        # r = Parent.config( **_params )
579
        r = {}
580
        self.setParam( r, 'setMAC', mac=mac )
581
        self.setParam( r, 'setIP', ip=ip )
582
        self.setParam( r, 'setDefaultRoute', defaultRoute=defaultRoute )
583
        # This should be examined
584
        self.cmd( 'ifconfig lo ' + lo )
585
        return r
586

    
587
    def configDefault( self, **moreParams ):
588
        "Configure with default parameters"
589
        self.params.update( moreParams )
590
        self.config( **self.params )
591

    
592
    # This is here for backward compatibility
593
    def linkTo( self, node, link=Link ):
594
        """(Deprecated) Link to another node
595
           replace with Link( node1, node2)"""
596
        return link( self, node )
597

    
598
    # Other methods
599

    
600
    def intfList( self ):
601
        "List of our interfaces sorted by port number"
602
        return [ self.intfs[ p ] for p in sorted( self.intfs.iterkeys() ) ]
603

    
604
    def intfNames( self ):
605
        "The names of our interfaces sorted by port number"
606
        return [ str( i ) for i in self.intfList() ]
607

    
608
    def __repr__( self ):
609
        "More informative string representation"
610
        intfs = ( ','.join( [ '%s:%s' % ( i.name, i.IP() )
611
                              for i in self.intfList() ] ) )
612
        return '<%s %s: %s pid=%s> ' % (
613
            self.__class__.__name__, self.name, intfs, self.pid )
614

    
615
    def __str__( self ):
616
        "Abbreviated string representation"
617
        return self.name
618

    
619
    # Automatic class setup support
620

    
621
    isSetup = False
622

    
623
    @classmethod
624
    def checkSetup( cls ):
625
        "Make sure our class and superclasses are set up"
626
        while cls and not getattr( cls, 'isSetup', True ):
627
            cls.setup()
628
            cls.isSetup = True
629
            # Make pylint happy
630
            cls = getattr( type( cls ), '__base__', None )
631

    
632
    @classmethod
633
    def setup( cls ):
634
        "Make sure our class dependencies are available"
635
        pathCheck( 'mnexec', 'ifconfig', moduleName='Mininet')
636

    
637
class Host( Node ):
638
    "A host is simply a Node"
639
    pass
640

    
641
class CPULimitedHost( Host ):
642

    
643
    "CPU limited host"
644

    
645
    def __init__( self, name, sched='cfs', **kwargs ):
646
        Host.__init__( self, name, **kwargs )
647
        # Initialize class if necessary
648
        if not CPULimitedHost.inited:
649
            CPULimitedHost.init()
650
        # Create a cgroup and move shell into it
651
        self.cgroup = 'cpu,cpuacct,cpuset:/' + self.name
652
        errFail( 'cgcreate -g ' + self.cgroup )
653
        # We don't add ourselves to a cpuset because you must
654
        # specify the cpu and memory placement first
655
        errFail( 'cgclassify -g cpu,cpuacct:/%s %s' % ( self.name, self.pid ) )
656
        # BL: Setting the correct period/quota is tricky, particularly
657
        # for RT. RT allows very small quotas, but the overhead
658
        # seems to be high. CFS has a mininimum quota of 1 ms, but
659
        # still does better with larger period values.
660
        self.period_us = kwargs.get( 'period_us', 100000 )
661
        self.sched = sched
662
        if sched == 'rt':
663
            self.checkRtGroupSched()
664
            self.rtprio = 20
665

    
666
    def cgroupSet( self, param, value, resource='cpu' ):
667
        "Set a cgroup parameter and return its value"
668
        cmd = 'cgset -r %s.%s=%s /%s' % (
669
            resource, param, value, self.name )
670
        quietRun( cmd )
671
        nvalue = int( self.cgroupGet( param, resource ) )
672
        if nvalue != value:
673
            error( '*** error: cgroupSet: %s set to %s instead of %s\n'
674
                   % ( param, nvalue, value ) )
675
        return nvalue
676

    
677
    def cgroupGet( self, param, resource='cpu' ):
678
        "Return value of cgroup parameter"
679
        cmd = 'cgget -r %s.%s /%s' % (
680
            resource, param, self.name )
681
        return int( quietRun( cmd ).split()[ -1 ] )
682

    
683
    def cgroupDel( self ):
684
        "Clean up our cgroup"
685
        # info( '*** deleting cgroup', self.cgroup, '\n' )
686
        _out, _err, exitcode = errRun( 'cgdelete -r ' + self.cgroup )
687
        # Sometimes cgdelete returns a resource busy error but still
688
        # deletes the group; next attempt will give "no such file"
689
        return exitcode == 0  or ( 'no such file' in _err.lower() )
690

    
691
    def popen( self, *args, **kwargs ):
692
        """Return a Popen() object in node's namespace
693
           args: Popen() args, single list, or string
694
           kwargs: Popen() keyword args"""
695
        # Tell mnexec to execute command in our cgroup
696
        mncmd = [ 'mnexec', '-g', self.name,
697
                  '-da', str( self.pid ) ]
698
        # if our cgroup is not given any cpu time,
699
        # we cannot assign the RR Scheduler.
700
        if self.sched == 'rt':
701
            if int( self.cgroupGet( 'rt_runtime_us', 'cpu' ) ) <= 0:
702
                mncmd += [ '-r', str( self.rtprio ) ]
703
            else:
704
                debug( '*** error: not enough cpu time available for %s.' %
705
                       self.name, 'Using cfs scheduler for subprocess\n' )
706
        return Host.popen( self, *args, mncmd=mncmd, **kwargs )
707

    
708
    def cleanup( self ):
709
        "Clean up Node, then clean up our cgroup"
710
        super( CPULimitedHost, self ).cleanup()
711
        retry( retries=3, delaySecs=.1, fn=self.cgroupDel )
712

    
713
    _rtGroupSched = False   # internal class var: Is CONFIG_RT_GROUP_SCHED set?
714

    
715
    @classmethod
716
    def checkRtGroupSched( cls ):
717
        "Check (Ubuntu,Debian) kernel config for CONFIG_RT_GROUP_SCHED for RT"
718
        if not cls._rtGroupSched:
719
            release = quietRun( 'uname -r' ).strip('\r\n')
720
            output = quietRun( 'grep CONFIG_RT_GROUP_SCHED /boot/config-%s' %
721
                               release )
722
            if output == '# CONFIG_RT_GROUP_SCHED is not set\n':
723
                error( '\n*** error: please enable RT_GROUP_SCHED '
724
                       'in your kernel\n' )
725
                exit( 1 )
726
            cls._rtGroupSched = True
727

    
728
    def chrt( self ):
729
        "Set RT scheduling priority"
730
        quietRun( 'chrt -p %s %s' % ( self.rtprio, self.pid ) )
731
        result = quietRun( 'chrt -p %s' % self.pid )
732
        firstline = result.split( '\n' )[ 0 ]
733
        lastword = firstline.split( ' ' )[ -1 ]
734
        if lastword != 'SCHED_RR':
735
            error( '*** error: could not assign SCHED_RR to %s\n' % self.name )
736
        return lastword
737

    
738
    def rtInfo( self, f ):
739
        "Internal method: return parameters for RT bandwidth"
740
        pstr, qstr = 'rt_period_us', 'rt_runtime_us'
741
        # RT uses wall clock time for period and quota
742
        quota = int( self.period_us * f )
743
        return pstr, qstr, self.period_us, quota
744

    
745
    def cfsInfo( self, f ):
746
        "Internal method: return parameters for CFS bandwidth"
747
        pstr, qstr = 'cfs_period_us', 'cfs_quota_us'
748
        # CFS uses wall clock time for period and CPU time for quota.
749
        quota = int( self.period_us * f * numCores() )
750
        period = self.period_us
751
        if f > 0 and quota < 1000:
752
            debug( '(cfsInfo: increasing default period) ' )
753
            quota = 1000
754
            period = int( quota / f / numCores() )
755
        # Reset to unlimited on negative quota
756
        if quota < 0:
757
            quota = -1
758
        return pstr, qstr, period, quota
759

    
760
    # BL comment:
761
    # This may not be the right API,
762
    # since it doesn't specify CPU bandwidth in "absolute"
763
    # units the way link bandwidth is specified.
764
    # We should use MIPS or SPECINT or something instead.
765
    # Alternatively, we should change from system fraction
766
    # to CPU seconds per second, essentially assuming that
767
    # all CPUs are the same.
768

    
769
    def setCPUFrac( self, f, sched=None ):
770
        """Set overall CPU fraction for this host
771
           f: CPU bandwidth limit (positive fraction, or -1 for cfs unlimited)
772
           sched: 'rt' or 'cfs'
773
           Note 'cfs' requires CONFIG_CFS_BANDWIDTH,
774
           and 'rt' requires CONFIG_RT_GROUP_SCHED"""
775
        if not sched:
776
            sched = self.sched
777
        if sched == 'rt':
778
            if not f or f < 0:
779
                raise Exception( 'Please set a positive CPU fraction'
780
                                 ' for sched=rt\n' )
781
            pstr, qstr, period, quota = self.rtInfo( f )
782
        elif sched == 'cfs':
783
            pstr, qstr, period, quota = self.cfsInfo( f )
784
        else:
785
            return
786
        # Set cgroup's period and quota
787
        setPeriod = self.cgroupSet( pstr, period )
788
        setQuota = self.cgroupSet( qstr, quota )
789
        if sched == 'rt':
790
            # Set RT priority if necessary
791
            sched = self.chrt()
792
        info( '(%s %d/%dus) ' % ( sched, setQuota, setPeriod ) )
793

    
794
    def setCPUs( self, cores, mems=0 ):
795
        "Specify (real) cores that our cgroup can run on"
796
        if not cores:
797
            return
798
        if isinstance( cores, list ):
799
            cores = ','.join( [ str( c ) for c in cores ] )
800
        self.cgroupSet( resource='cpuset', param='cpus',
801
                        value=cores )
802
        # Memory placement is probably not relevant, but we
803
        # must specify it anyway
804
        self.cgroupSet( resource='cpuset', param='mems',
805
                        value=mems)
806
        # We have to do this here after we've specified
807
        # cpus and mems
808
        errFail( 'cgclassify -g cpuset:/%s %s' % (
809
                 self.name, self.pid ) )
810

    
811
    def config( self, cpu=-1, cores=None, **params ):
812
        """cpu: desired overall system CPU fraction
813
           cores: (real) core(s) this host can run on
814
           params: parameters for Node.config()"""
815
        r = Node.config( self, **params )
816
        # Was considering cpu={'cpu': cpu , 'sched': sched}, but
817
        # that seems redundant
818
        self.setParam( r, 'setCPUFrac', cpu=cpu )
819
        self.setParam( r, 'setCPUs', cores=cores )
820
        return r
821

    
822
    inited = False
823

    
824
    @classmethod
825
    def init( cls ):
826
        "Initialization for CPULimitedHost class"
827
        mountCgroups()
828
        cls.inited = True
829

    
830

    
831
# Some important things to note:
832
#
833
# The "IP" address which setIP() assigns to the switch is not
834
# an "IP address for the switch" in the sense of IP routing.
835
# Rather, it is the IP address for the control interface,
836
# on the control network, and it is only relevant to the
837
# controller. If you are running in the root namespace
838
# (which is the only way to run OVS at the moment), the
839
# control interface is the loopback interface, and you
840
# normally never want to change its IP address!
841
#
842
# In general, you NEVER want to attempt to use Linux's
843
# network stack (i.e. ifconfig) to "assign" an IP address or
844
# MAC address to a switch data port. Instead, you "assign"
845
# the IP and MAC addresses in the controller by specifying
846
# packets that you want to receive or send. The "MAC" address
847
# reported by ifconfig for a switch data port is essentially
848
# meaningless. It is important to understand this if you
849
# want to create a functional router using OpenFlow.
850

    
851
class Switch( Node ):
852
    """A Switch is a Node that is running (or has execed?)
853
       an OpenFlow switch."""
854

    
855
    portBase = 1  # Switches start with port 1 in OpenFlow
856
    dpidLen = 16  # digits in dpid passed to switch
857

    
858
    def __init__( self, name, dpid=None, opts='', listenPort=None, **params):
859
        """dpid: dpid hex string (or None to derive from name, e.g. s1 -> 1)
860
           opts: additional switch options
861
           listenPort: port to listen on for dpctl connections"""
862
        Node.__init__( self, name, **params )
863
        self.dpid = self.defaultDpid( dpid )
864
        self.opts = opts
865
        self.listenPort = listenPort
866
        if not self.inNamespace:
867
            self.controlIntf = Intf( 'lo', self, port=0 )
868

    
869
    def defaultDpid( self, dpid=None ):
870
        "Return correctly formatted dpid from dpid or switch name (s1 -> 1)"
871
        if dpid:
872
            # Remove any colons and make sure it's a good hex number
873
            dpid = dpid.translate( None, ':' )
874
            assert len( dpid ) <= self.dpidLen and int( dpid, 16 ) >= 0
875
        else:
876
            # Use hex of the first number in the switch name
877
            nums = re.findall( r'\d+', self.name )
878
            if nums:
879
                dpid = hex( int( nums[ 0 ] ) )[ 2: ]
880
            else:
881
                raise Exception( 'Unable to derive default datapath ID - '
882
                                 'please either specify a dpid or use a '
883
                                 'canonical switch name such as s23.' )
884
        return '0' * ( self.dpidLen - len( dpid ) ) + dpid
885

    
886
    def defaultIntf( self ):
887
        "Return control interface"
888
        if self.controlIntf:
889
            return self.controlIntf
890
        else:
891
            return Node.defaultIntf( self )
892

    
893
    def sendCmd( self, *cmd, **kwargs ):
894
        """Send command to Node.
895
           cmd: string"""
896
        kwargs.setdefault( 'printPid', False )
897
        if not self.execed:
898
            return Node.sendCmd( self, *cmd, **kwargs )
899
        else:
900
            error( '*** Error: %s has execed and cannot accept commands' %
901
                   self.name )
902

    
903
    def connected( self ):
904
        "Is the switch connected to a controller? (override this method)"
905
        # Assume that we are connected by default to whatever we need to
906
        # be connected to. This should be overridden by any OpenFlow
907
        # switch, but not by a standalone bridge.
908
        debug( 'Assuming', repr( self ), 'is connected to a controller\n' )
909
        return True
910

    
911
    def stop( self, deleteIntfs=True ):
912
        """Stop switch
913
           deleteIntfs: delete interfaces? (True)"""
914
        if deleteIntfs:
915
            self.deleteIntfs()
916

    
917
    def __repr__( self ):
918
        "More informative string representation"
919
        intfs = ( ','.join( [ '%s:%s' % ( i.name, i.IP() )
920
                              for i in self.intfList() ] ) )
921
        return '<%s %s: %s pid=%s> ' % (
922
            self.__class__.__name__, self.name, intfs, self.pid )
923

    
924

    
925
class UserSwitch( Switch ):
926
    "User-space switch."
927

    
928
    dpidLen = 12
929

    
930
    def __init__( self, name, dpopts='--no-slicing', **kwargs ):
931
        """Init.
932
           name: name for the switch
933
           dpopts: additional arguments to ofdatapath (--no-slicing)"""
934
        Switch.__init__( self, name, **kwargs )
935
        pathCheck( 'ofdatapath', 'ofprotocol',
936
                   moduleName='the OpenFlow reference user switch' +
937
                              '(openflow.org)' )
938
        if self.listenPort:
939
            self.opts += ' --listen=ptcp:%i ' % self.listenPort
940
        else:
941
            self.opts += ' --listen=punix:/tmp/%s.listen' % self.name
942
        self.dpopts = dpopts
943

    
944
    @classmethod
945
    def setup( cls ):
946
        "Ensure any dependencies are loaded; if not, try to load them."
947
        if not os.path.exists( '/dev/net/tun' ):
948
            moduleDeps( add=TUN )
949

    
950
    def dpctl( self, *args ):
951
        "Run dpctl command"
952
        listenAddr = None
953
        if not self.listenPort:
954
            listenAddr = 'unix:/tmp/%s.listen' % self.name
955
        else:
956
            listenAddr = 'tcp:127.0.0.1:%i' % self.listenPort
957
        return self.cmd( 'dpctl ' + ' '.join( args ) +
958
                         ' ' + listenAddr )
959

    
960
    def connected( self ):
961
        "Is the switch connected to a controller?"
962
        status = self.dpctl( 'status' )
963
        return ( 'remote.is-connected=true' in status and
964
                 'local.is-connected=true' in status )
965

    
966
    @staticmethod
967
    def TCReapply( intf ):
968
        """Unfortunately user switch and Mininet are fighting
969
           over tc queuing disciplines. To resolve the conflict,
970
           we re-create the user switch's configuration, but as a
971
           leaf of the TCIntf-created configuration."""
972
        if isinstance( intf, TCIntf ):
973
            ifspeed = 10000000000  # 10 Gbps
974
            minspeed = ifspeed * 0.001
975

    
976
            res = intf.config( **intf.params )
977

    
978
            if res is None:  # link may not have TC parameters
979
                return
980

    
981
            # Re-add qdisc, root, and default classes user switch created, but
982
            # with new parent, as setup by Mininet's TCIntf
983
            parent = res['parent']
984
            intf.tc( "%s qdisc add dev %s " + parent +
985
                     " handle 1: htb default 0xfffe" )
986
            intf.tc( "%s class add dev %s classid 1:0xffff parent 1: htb rate "
987
                     + str(ifspeed) )
988
            intf.tc( "%s class add dev %s classid 1:0xfffe parent 1:0xffff " +
989
                     "htb rate " + str(minspeed) + " ceil " + str(ifspeed) )
990

    
991
    def start( self, controllers ):
992
        """Start OpenFlow reference user datapath.
993
           Log to /tmp/sN-{ofd,ofp}.log.
994
           controllers: list of controller objects"""
995
        # Add controllers
996
        clist = ','.join( [ 'tcp:%s:%d' % ( c.IP(), c.port )
997
                            for c in controllers ] )
998
        ofdlog = '/tmp/' + self.name + '-ofd.log'
999
        ofplog = '/tmp/' + self.name + '-ofp.log'
1000
        intfs = [ str( i ) for i in self.intfList() if not i.IP() ]
1001
        self.cmd( 'ofdatapath -i ' + ','.join( intfs ) +
1002
                  ' punix:/tmp/' + self.name + ' -d %s ' % self.dpid +
1003
                  self.dpopts +
1004
                  ' 1> ' + ofdlog + ' 2> ' + ofdlog + ' &' )
1005
        self.cmd( 'ofprotocol unix:/tmp/' + self.name +
1006
                  ' ' + clist +
1007
                  ' --fail=closed ' + self.opts +
1008
                  ' 1> ' + ofplog + ' 2>' + ofplog + ' &' )
1009
        if "no-slicing" not in self.dpopts:
1010
            # Only TCReapply if slicing is enable
1011
            sleep(1)  # Allow ofdatapath to start before re-arranging qdisc's
1012
            for intf in self.intfList():
1013
                if not intf.IP():
1014
                    self.TCReapply( intf )
1015

    
1016
    def stop( self, deleteIntfs=True ):
1017
        """Stop OpenFlow reference user datapath.
1018
           deleteIntfs: delete interfaces? (True)"""
1019
        self.cmd( 'kill %ofdatapath' )
1020
        self.cmd( 'kill %ofprotocol' )
1021
        super( UserSwitch, self ).stop( deleteIntfs )
1022

    
1023

    
1024
class OVSSwitch( Switch ):
1025
    "Open vSwitch switch. Depends on ovs-vsctl."
1026

    
1027
    def __init__( self, name, failMode='secure', datapath='kernel',
1028
                  inband=False, protocols=None,
1029
                  reconnectms=1000, stp=False, batch=False, **params ):
1030
        """name: name for switch
1031
           failMode: controller loss behavior (secure|standalone)
1032
           datapath: userspace or kernel mode (kernel|user)
1033
           inband: use in-band control (False)
1034
           protocols: use specific OpenFlow version(s) (e.g. OpenFlow13)
1035
                      Unspecified (or old OVS version) uses OVS default
1036
           reconnectms: max reconnect timeout in ms (0/None for default)
1037
           stp: enable STP (False, requires failMode=standalone)
1038
           batch: enable batch startup (False)"""
1039
        Switch.__init__( self, name, **params )
1040
        self.failMode = failMode
1041
        self.datapath = datapath
1042
        self.inband = inband
1043
        self.protocols = protocols
1044
        self.reconnectms = reconnectms
1045
        self.stp = stp
1046
        self._uuids = []  # controller UUIDs
1047
        self.batch = batch
1048
        self.commands = []  # saved commands for batch startup
1049

    
1050
    @classmethod
1051
    def setup( cls ):
1052
        "Make sure Open vSwitch is installed and working"
1053
        pathCheck( 'ovs-vsctl',
1054
                   moduleName='Open vSwitch (openvswitch.org)')
1055
        # This should no longer be needed, and it breaks
1056
        # with OVS 1.7 which has renamed the kernel module:
1057
        #  moduleDeps( subtract=OF_KMOD, add=OVS_KMOD )
1058
        out, err, exitcode = errRun( 'ovs-vsctl -t 1 show' )
1059
        if exitcode:
1060
            error( out + err +
1061
                   'ovs-vsctl exited with code %d\n' % exitcode +
1062
                   '*** Error connecting to ovs-db with ovs-vsctl\n'
1063
                   'Make sure that Open vSwitch is installed, '
1064
                   'that ovsdb-server is running, and that\n'
1065
                   '"ovs-vsctl show" works correctly.\n'
1066
                   'You may wish to try '
1067
                   '"service openvswitch-switch start".\n' )
1068
            exit( 1 )
1069
        version = quietRun( 'ovs-vsctl --version' )
1070
        cls.OVSVersion = findall( r'\d+\.\d+', version )[ 0 ]
1071

    
1072
    @classmethod
1073
    def isOldOVS( cls ):
1074
        "Is OVS ersion < 1.10?"
1075
        return ( StrictVersion( cls.OVSVersion ) <
1076
                 StrictVersion( '1.10' ) )
1077

    
1078
    def dpctl( self, *args ):
1079
        "Run ovs-ofctl command"
1080
        return self.cmd( 'ovs-ofctl', args[ 0 ], self, *args[ 1: ] )
1081

    
1082
    def vsctl( self, *args, **kwargs ):
1083
        "Run ovs-vsctl command (or queue for later execution)"
1084
        if self.batch:
1085
            cmd = ' '.join( str( arg ).strip() for arg in args )
1086
            self.commands.append( cmd )
1087
        else:
1088
            return self.cmd( 'ovs-vsctl', *args, **kwargs )
1089

    
1090
    @staticmethod
1091
    def TCReapply( intf ):
1092
        """Unfortunately OVS and Mininet are fighting
1093
           over tc queuing disciplines. As a quick hack/
1094
           workaround, we clear OVS's and reapply our own."""
1095
        if isinstance( intf, TCIntf ):
1096
            intf.config( **intf.params )
1097

    
1098
    def attach( self, intf ):
1099
        "Connect a data port"
1100
        self.vsctl( 'add-port', self, intf )
1101
        self.cmd( 'ifconfig', intf, 'up' )
1102
        self.TCReapply( intf )
1103

    
1104
    def detach( self, intf ):
1105
        "Disconnect a data port"
1106
        self.vsctl( 'del-port', self, intf )
1107

    
1108
    def controllerUUIDs( self, update=False ):
1109
        """Return ovsdb UUIDs for our controllers
1110
           update: update cached value"""
1111
        if not self._uuids or update:
1112
            controllers = self.cmd( 'ovs-vsctl -- get Bridge', self,
1113
                                    'Controller' ).strip()
1114
            if controllers.startswith( '[' ) and controllers.endswith( ']' ):
1115
                controllers = controllers[ 1 : -1 ]
1116
                if controllers:
1117
                    self._uuids = [ c.strip()
1118
                                    for c in controllers.split( ',' ) ]
1119
        return self._uuids
1120

    
1121
    def connected( self ):
1122
        "Are we connected to at least one of our controllers?"
1123
        for uuid in self.controllerUUIDs():
1124
            if 'true' in self.vsctl( '-- get Controller',
1125
                                     uuid, 'is_connected' ):
1126
                return True
1127
        return self.failMode == 'standalone'
1128

    
1129
    def intfOpts( self, intf ):
1130
        "Return OVS interface options for intf"
1131
        opts = ''
1132
        if not self.isOldOVS():
1133
            # ofport_request is not supported on old OVS
1134
            opts += ' ofport_request=%s' % self.ports[ intf ]
1135
            # Patch ports don't work well with old OVS
1136
            if isinstance( intf, OVSIntf ):
1137
                intf1, intf2 = intf.link.intf1, intf.link.intf2
1138
                peer = intf1 if intf1 != intf else intf2
1139
                opts += ' type=patch options:peer=%s' % peer
1140
        return '' if not opts else ' -- set Interface %s' % intf + opts
1141

    
1142
    def bridgeOpts( self ):
1143
        "Return OVS bridge options"
1144
        opts = ( ' other_config:datapath-id=%s' % self.dpid +
1145
                 ' fail_mode=%s' % self.failMode )
1146
        if not self.inband:
1147
            opts += ' other-config:disable-in-band=true'
1148
        if self.datapath == 'user':
1149
            opts += ' datapath_type=netdev'
1150
        if self.protocols and not self.isOldOVS():
1151
            opts += ' protocols=%s' % self.protocols
1152
        if self.stp and self.failMode == 'standalone':
1153
            opts += ' stp_enable=true'
1154
        return opts
1155

    
1156
    def start( self, controllers ):
1157
        "Start up a new OVS OpenFlow switch using ovs-vsctl"
1158
        if self.inNamespace:
1159
            raise Exception(
1160
                'OVS kernel switch does not work in a namespace' )
1161
        int( self.dpid, 16 )  # DPID must be a hex string
1162
        # Command to add interfaces
1163
        intfs = ''.join( ' -- add-port %s %s' % ( self, intf ) +
1164
                         self.intfOpts( intf )
1165
                         for intf in self.intfList()
1166
                         if self.ports[ intf ] and not intf.IP() )
1167
        # Command to create controller entries
1168
        clist = [ ( self.name + c.name, '%s:%s:%d' %
1169
                  ( c.protocol, c.IP(), c.port ) )
1170
                  for c in controllers ]
1171
        if self.listenPort:
1172
            clist.append( ( self.name + '-listen',
1173
                            'ptcp:%s' % self.listenPort ) )
1174
        ccmd = '-- --id=@%s create Controller target=\\"%s\\"'
1175
        if self.reconnectms:
1176
            ccmd += ' max_backoff=%d' % self.reconnectms
1177
        cargs = ' '.join( ccmd % ( name, target )
1178
                          for name, target in clist )
1179
        # Controller ID list
1180
        cids = ','.join( '@%s' % name for name, _target in clist )
1181
        # Try to delete any existing bridges with the same name
1182
        if not self.isOldOVS():
1183
            cargs += ' -- --if-exists del-br %s' % self
1184
        # One ovs-vsctl command to rule them all!
1185
        self.vsctl( cargs +
1186
                    ' -- add-br %s' % self +
1187
                    ' -- set bridge %s controller=[%s]' % ( self, cids  ) +
1188
                    self.bridgeOpts() +
1189
                    intfs )
1190
        # If necessary, restore TC config overwritten by OVS
1191
        if not self.batch:
1192
            for intf in self.intfList():
1193
                self.TCReapply( intf )
1194

    
1195
    # This should be ~ int( quietRun( 'getconf ARG_MAX' ) ),
1196
    # but the real limit seems to be much lower
1197
    argmax = 128000
1198

    
1199
    @classmethod
1200
    def batchStartup( cls, switches, run=errRun ):
1201
        """Batch startup for OVS
1202
           switches: switches to start up
1203
           run: function to run commands (errRun)"""
1204
        info( '...' )
1205
        cmds = 'ovs-vsctl'
1206
        for switch in switches:
1207
            if switch.isOldOVS():
1208
                # Ideally we'd optimize this also
1209
                run( 'ovs-vsctl del-br %s' % switch )
1210
            for cmd in switch.commands:
1211
                cmd = cmd.strip()
1212
                # Don't exceed ARG_MAX
1213
                if len( cmds ) + len( cmd ) >= cls.argmax:
1214
                    run( cmds, shell=True )
1215
                    cmds = 'ovs-vsctl'
1216
                cmds += ' ' + cmd
1217
                switch.cmds = []
1218
                switch.batch = False
1219
        if cmds:
1220
            run( cmds, shell=True )
1221
        # Reapply link config if necessary...
1222
        for switch in switches:
1223
            for intf in switch.intfs.itervalues():
1224
                if isinstance( intf, TCIntf ):
1225
                    intf.config( **intf.params )
1226
        return switches
1227

    
1228
    def stop( self, deleteIntfs=True ):
1229
        """Terminate OVS switch.
1230
           deleteIntfs: delete interfaces? (True)"""
1231
        self.cmd( 'ovs-vsctl del-br', self )
1232
        if self.datapath == 'user':
1233
            self.cmd( 'ip link del', self )
1234
        super( OVSSwitch, self ).stop( deleteIntfs )
1235

    
1236
    @classmethod
1237
    def batchShutdown( cls, switches, run=errRun ):
1238
        "Shut down a list of OVS switches"
1239
        delcmd = 'del-br %s'
1240
        if switches and not switches[ 0 ].isOldOVS():
1241
            delcmd = '--if-exists ' + delcmd
1242
        # First, delete them all from ovsdb
1243
        run( 'ovs-vsctl ' +
1244
             ' -- '.join( delcmd % s for s in switches ) )
1245
        # Next, shut down all of the processes
1246
        pids = ' '.join( str( switch.pid ) for switch in switches )
1247
        run( 'kill -HUP ' + pids )
1248
        for switch in switches:
1249
            switch.shell = None
1250
        return switches
1251

    
1252

    
1253
OVSKernelSwitch = OVSSwitch
1254

    
1255

    
1256
class OVSBridge( OVSSwitch ):
1257
    "OVSBridge is an OVSSwitch in standalone/bridge mode"
1258

    
1259
    def __init__( self, *args, **kwargs ):
1260
        """stp: enable Spanning Tree Protocol (False)
1261
           see OVSSwitch for other options"""
1262
        kwargs.update( failMode='standalone' )
1263
        OVSSwitch.__init__( self, *args, **kwargs )
1264

    
1265
    def start( self, controllers ):
1266
        "Start bridge, ignoring controllers argument"
1267
        OVSSwitch.start( self, controllers=[] )
1268

    
1269
    def connected( self ):
1270
        "Are we forwarding yet?"
1271
        if self.stp:
1272
            status = self.dpctl( 'show' )
1273
            return 'STP_FORWARD' in status and not 'STP_LEARN' in status
1274
        else:
1275
            return True
1276

    
1277

    
1278
class IVSSwitch( Switch ):
1279
    "Indigo Virtual Switch"
1280

    
1281
    def __init__( self, name, verbose=False, **kwargs ):
1282
        Switch.__init__( self, name, **kwargs )
1283
        self.verbose = verbose
1284

    
1285
    @classmethod
1286
    def setup( cls ):
1287
        "Make sure IVS is installed"
1288
        pathCheck( 'ivs-ctl', 'ivs',
1289
                   moduleName="Indigo Virtual Switch (projectfloodlight.org)" )
1290
        out, err, exitcode = errRun( 'ivs-ctl show' )
1291
        if exitcode:
1292
            error( out + err +
1293
                   'ivs-ctl exited with code %d\n' % exitcode +
1294
                   '*** The openvswitch kernel module might '
1295
                   'not be loaded. Try modprobe openvswitch.\n' )
1296
            exit( 1 )
1297

    
1298
    @classmethod
1299
    def batchShutdown( cls, switches ):
1300
        "Kill each IVS switch, to be waited on later in stop()"
1301
        for switch in switches:
1302
            switch.cmd( 'kill %ivs' )
1303
        return switches
1304

    
1305
    def start( self, controllers ):
1306
        "Start up a new IVS switch"
1307
        args = ['ivs']
1308
        args.extend( ['--name', self.name] )
1309
        args.extend( ['--dpid', self.dpid] )
1310
        if self.verbose:
1311
            args.extend( ['--verbose'] )
1312
        for intf in self.intfs.values():
1313
            if not intf.IP():
1314
                args.extend( ['-i', intf.name] )
1315
        for c in controllers:
1316
            args.extend( ['-c', '%s:%d' % (c.IP(), c.port)] )
1317
        if self.listenPort:
1318
            args.extend( ['--listen', '127.0.0.1:%i' % self.listenPort] )
1319
        args.append( self.opts )
1320

    
1321
        logfile = '/tmp/ivs.%s.log' % self.name
1322

    
1323
        self.cmd( ' '.join(args) + ' >' + logfile + ' 2>&1 </dev/null &' )
1324

    
1325
    def stop( self, deleteIntfs=True ):
1326
        """Terminate IVS switch.
1327
           deleteIntfs: delete interfaces? (True)"""
1328
        self.cmd( 'kill %ivs' )
1329
        self.cmd( 'wait' )
1330
        super( IVSSwitch, self ).stop( deleteIntfs )
1331

    
1332
    def attach( self, intf ):
1333
        "Connect a data port"
1334
        self.cmd( 'ivs-ctl', 'add-port', '--datapath', self.name, intf )
1335

    
1336
    def detach( self, intf ):
1337
        "Disconnect a data port"
1338
        self.cmd( 'ivs-ctl', 'del-port', '--datapath', self.name, intf )
1339

    
1340
    def dpctl( self, *args ):
1341
        "Run dpctl command"
1342
        if not self.listenPort:
1343
            return "can't run dpctl without passive listening port"
1344
        return self.cmd( 'ovs-ofctl ' + ' '.join( args ) +
1345
                         ' tcp:127.0.0.1:%i' % self.listenPort )
1346

    
1347

    
1348
class Controller( Node ):
1349
    """A Controller is a Node that is running (or has execed?) an
1350
       OpenFlow controller."""
1351

    
1352
    def __init__( self, name, inNamespace=False, command='controller',
1353
                  cargs='-v ptcp:%d', cdir=None, ip="127.0.0.1",
1354
                  port=6653, protocol='tcp', **params ):
1355
        self.command = command
1356
        self.cargs = cargs
1357
        self.cdir = cdir
1358
        # Accept 'ip:port' syntax as shorthand
1359
        if ':' in ip:
1360
            ip, port = ip.split( ':' )
1361
            port = int( port )
1362
        self.ip = ip
1363
        self.port = port
1364
        self.protocol = protocol
1365
        Node.__init__( self, name, inNamespace=inNamespace,
1366
                       ip=ip, **params  )
1367
        self.checkListening()
1368

    
1369
    def checkListening( self ):
1370
        "Make sure no controllers are running on our port"
1371
        # Verify that Telnet is installed first:
1372
        out, _err, returnCode = errRun( "which telnet" )
1373
        if 'telnet' not in out or returnCode != 0:
1374
            raise Exception( "Error running telnet to check for listening "
1375
                             "controllers; please check that it is "
1376
                             "installed." )
1377
        listening = self.cmd( "echo A | telnet -e A %s %d" %
1378
                              ( self.ip, self.port ) )
1379
        if 'Connected' in listening:
1380
            servers = self.cmd( 'netstat -natp' ).split( '\n' )
1381
            pstr = ':%d ' % self.port
1382
            clist = servers[ 0:1 ] + [ s for s in servers if pstr in s ]
1383
            raise Exception( "Please shut down the controller which is"
1384
                             " running on port %d:\n" % self.port +
1385
                             '\n'.join( clist ) )
1386

    
1387
    def start( self ):
1388
        """Start <controller> <args> on controller.
1389
           Log to /tmp/cN.log"""
1390
        pathCheck( self.command )
1391
        cout = '/tmp/' + self.name + '.log'
1392
        if self.cdir is not None:
1393
            self.cmd( 'cd ' + self.cdir )
1394
        self.cmd( self.command + ' ' + self.cargs % self.port +
1395
                  ' 1>' + cout + ' 2>' + cout + ' &' )
1396
        self.execed = False
1397

    
1398
    def stop( self, *args, **kwargs ):
1399
        "Stop controller."
1400
        self.cmd( 'kill %' + self.command )
1401
        self.cmd( 'wait %' + self.command )
1402
        super( Controller, self ).stop( *args, **kwargs )
1403

    
1404
    def IP( self, intf=None ):
1405
        "Return IP address of the Controller"
1406
        if self.intfs:
1407
            ip = Node.IP( self, intf )
1408
        else:
1409
            ip = self.ip
1410
        return ip
1411

    
1412
    def __repr__( self ):
1413
        "More informative string representation"
1414
        return '<%s %s: %s:%s pid=%s> ' % (
1415
            self.__class__.__name__, self.name,
1416
            self.IP(), self.port, self.pid )
1417

    
1418
    @classmethod
1419
    def isAvailable( cls ):
1420
        "Is controller available?"
1421
        return quietRun( 'which controller' )
1422

    
1423

    
1424
class OVSController( Controller ):
1425
    "Open vSwitch controller"
1426
    def __init__( self, name, command='ovs-controller', **kwargs ):
1427
        if quietRun( 'which test-controller' ):
1428
            command = 'test-controller'
1429
        Controller.__init__( self, name, command=command, **kwargs )
1430

    
1431
    @classmethod
1432
    def isAvailable( cls ):
1433
        return ( quietRun( 'which ovs-controller' ) or
1434
                 quietRun( 'which test-controller' ) or
1435
                 quietRun( 'which ovs-testcontroller' ) )
1436

    
1437
class NOX( Controller ):
1438
    "Controller to run a NOX application."
1439

    
1440
    def __init__( self, name, *noxArgs, **kwargs ):
1441
        """Init.
1442
           name: name to give controller
1443
           noxArgs: arguments (strings) to pass to NOX"""
1444
        if not noxArgs:
1445
            warn( 'warning: no NOX modules specified; '
1446
                  'running packetdump only\n' )
1447
            noxArgs = [ 'packetdump' ]
1448
        elif type( noxArgs ) not in ( list, tuple ):
1449
            noxArgs = [ noxArgs ]
1450

    
1451
        if 'NOX_CORE_DIR' not in os.environ:
1452
            exit( 'exiting; please set missing NOX_CORE_DIR env var' )
1453
        noxCoreDir = os.environ[ 'NOX_CORE_DIR' ]
1454

    
1455
        Controller.__init__( self, name,
1456
                             command=noxCoreDir + '/nox_core',
1457
                             cargs='--libdir=/usr/local/lib -v -i ptcp:%s ' +
1458
                             ' '.join( noxArgs ),
1459
                             cdir=noxCoreDir,
1460
                             **kwargs )
1461

    
1462
class Ryu( Controller ):
1463
    "Controller to run Ryu application"
1464
    def __init__( self, name, *ryuArgs, **kwargs ):
1465
        """Init.
1466
        name: name to give controller.
1467
        ryuArgs: arguments and modules to pass to Ryu"""
1468
        homeDir = quietRun( 'printenv HOME' ).strip( '\r\n' )
1469
        ryuCoreDir = '%s/ryu/ryu/app/' % homeDir
1470
        if not ryuArgs:
1471
            warn( 'warning: no Ryu modules specified; '
1472
                  'running simple_switch only\n' )
1473
            ryuArgs = [ ryuCoreDir + 'simple_switch.py' ]
1474
        elif type( ryuArgs ) not in ( list, tuple ):
1475
            ryuArgs = [ ryuArgs ]
1476

    
1477
        Controller.__init__( self, name,
1478
                             command='ryu-manager',
1479
                             cargs='--ofp-tcp-listen-port %s ' +
1480
                             ' '.join( ryuArgs ),
1481
                             cdir=ryuCoreDir,
1482
                             **kwargs )
1483

    
1484

    
1485
class RemoteController( Controller ):
1486
    "Controller running outside of Mininet's control."
1487

    
1488
    def __init__( self, name, ip='127.0.0.1',
1489
                  port=None, **kwargs):
1490
        """Init.
1491
           name: name to give controller
1492
           ip: the IP address where the remote controller is
1493
           listening
1494
           port: the port where the remote controller is listening"""
1495
        Controller.__init__( self, name, ip=ip, port=port, **kwargs )
1496

    
1497
    def start( self ):
1498
        "Overridden to do nothing."
1499
        return
1500

    
1501
    def stop( self ):
1502
        "Overridden to do nothing."
1503
        return
1504

    
1505
    def checkListening( self ):
1506
        "Warn if remote controller is not accessible"
1507
        if self.port is not None:
1508
            self.isListening( self.ip, self.port )
1509
        else:
1510
            for port in 6653, 6633:
1511
                if self.isListening( self.ip, port ):
1512
                    self.port = port
1513
                    info( "Connecting to remote controller"
1514
                          " at %s:%d\n" % ( self.ip, self.port ))
1515
                    break
1516

    
1517
        if self.port is None:
1518
            self.port = 6653
1519
            warn( "Setting remote controller"
1520
                  " to %s:%d\n" % ( self.ip, self.port ))
1521

    
1522
    def isListening( self, ip, port ):
1523
        "Check if a remote controller is listening at a specific ip and port"
1524
        listening = self.cmd( "echo A | telnet -e A %s %d" % ( ip, port ) )
1525
        if 'Connected' not in listening:
1526
            warn( "Unable to contact the remote controller"
1527
                  " at %s:%d\n" % ( ip, port ) )
1528
            return False
1529
        else:
1530
            return True
1531

    
1532
DefaultControllers = ( Controller, OVSController )
1533

    
1534
def findController( controllers=DefaultControllers ):
1535
    "Return first available controller from list, if any"
1536
    for controller in controllers:
1537
        if controller.isAvailable():
1538
            return controller
1539

    
1540
def DefaultController( name, controllers=DefaultControllers, **kwargs ):
1541
    "Find a controller that is available and instantiate it"
1542
    controller = findController( controllers )
1543
    if not controller:
1544
        raise Exception( 'Could not find a default OpenFlow controller' )
1545
    return controller( name, **kwargs )
1546

    
1547
def NullController( *_args, **_kwargs ):
1548
    "Nonexistent controller - simply returns None"
1549
    return None