Statistics
| Branch: | Tag: | Revision:

mininet / mininet / node.py @ 574d634f

History | View | Annotate | Download (59.2 KB)

1
"""
2
Node objects for Mininet.
3

4
Nodes provide a simple abstraction for interacting with hosts, switches
5
and controllers. Local nodes are simply one or more processes on the local
6
machine.
7

8
Node: superclass for all (primarily local) network nodes.
9

10
Host: a virtual host. By default, a host is simply a shell; commands
11
    may be sent using Cmd (which waits for output), or using sendCmd(),
12
    which returns immediately, allowing subsequent monitoring using
13
    monitor(). Examples of how to run experiments using this
14
    functionality are provided in the examples/ directory. By default,
15
    hosts share the root file system, but they may also specify private
16
    directories.
17

18
CPULimitedHost: a virtual host whose CPU bandwidth is limited by
19
    RT or CFS bandwidth limiting.
20

21
Switch: superclass for switch nodes.
22

23
UserSwitch: a switch using the user-space switch from the OpenFlow
24
    reference implementation.
25

26
KernelSwitch: a switch using the kernel switch from the OpenFlow reference
27
    implementation.
28

29
OVSSwitch: a switch using the OpenVSwitch OpenFlow-compatible switch
30
    implementation (openvswitch.org).
31

32
Controller: superclass for OpenFlow controllers. The default controller
33
    is controller(8) from the reference implementation.
34

35
NOXController: a controller node using NOX (noxrepo.org).
36

37
RemoteController: a remote controller node, which may use any
38
    arbitrary OpenFlow-compatible controller, and which is not
39
    created or managed by mininet.
40

41
Future enhancements:
42

43
- Possibly make Node, Switch and Controller more abstract so that
44
  they can be used for both local and remote nodes
45

46
- Create proxy objects for remote nodes (Mininet: Cluster Edition)
47
"""
48

    
49
import os
50
import pty
51
import re
52
import signal
53
import select
54
from subprocess import Popen, PIPE
55
from time import sleep
56

    
57
from mininet.log import info, error, warn, debug
58
from mininet.util import ( quietRun, errRun, errFail, moveIntf, isShellBuiltin,
59
                           numCores, retry, mountCgroups )
60
from mininet.moduledeps import moduleDeps, pathCheck, OVS_KMOD, OF_KMOD, TUN
61
from mininet.link import Link, Intf, TCIntf, OVSIntf
62
from re import findall
63
from distutils.version import StrictVersion
64

    
65
class Node( object ):
66
    """A virtual network node is simply a shell in a network namespace.
67
       We communicate with it using pipes."""
68

    
69
    portBase = 0  # Nodes always start with eth0/port0, even in OF 1.0
70

    
71
    def __init__( self, name, inNamespace=True, **params ):
72
        """name: name of node
73
           inNamespace: in network namespace?
74
           privateDirs: list of private directory strings or tuples
75
           params: Node parameters (see config() for details)"""
76

    
77
        # Make sure class actually works
78
        self.checkSetup()
79

    
80
        self.name = params.get( 'name', name )
81
        self.privateDirs = params.get( 'privateDirs', [] )
82
        self.inNamespace = params.get( 'inNamespace', inNamespace )
83

    
84
        # Stash configuration parameters for future reference
85
        self.params = params
86

    
87
        self.intfs = {}  # dict of port numbers to interfaces
88
        self.ports = {}  # dict of interfaces to port numbers
89
                         # replace with Port objects, eventually ?
90
        self.nameToIntf = {}  # dict of interface names to Intfs
91

    
92
        # Make pylint happy
93
        ( self.shell, self.execed, self.pid, self.stdin, self.stdout,
94
            self.lastPid, self.lastCmd, self.pollOut ) = (
95
                None, None, None, None, None, None, None, None )
96
        self.waiting = False
97
        self.readbuf = ''
98

    
99
        # Start command interpreter shell
100
        self.startShell()
101
        self.mountPrivateDirs()
102

    
103
    # File descriptor to node mapping support
104
    # Class variables and methods
105

    
106
    inToNode = {}  # mapping of input fds to nodes
107
    outToNode = {}  # mapping of output fds to nodes
108

    
109
    @classmethod
110
    def fdToNode( cls, fd ):
111
        """Return node corresponding to given file descriptor.
112
           fd: file descriptor
113
           returns: node"""
114
        node = cls.outToNode.get( fd )
115
        return node or cls.inToNode.get( fd )
116

    
117
    # Command support via shell process in namespace
118
    def startShell( self, mnopts=None ):
119
        "Start a shell process for running commands"
120
        if self.shell:
121
            error( "%s: shell is already running\n" % self.name )
122
            return
123
        # mnexec: (c)lose descriptors, (d)etach from tty,
124
        # (p)rint pid, and run in (n)amespace
125
        opts = '-cd' if mnopts is None else mnopts
126
        if self.inNamespace:
127
            opts += 'n'
128
        # bash -m: enable job control, i: force interactive
129
        # -s: pass $* to shell, and make process easy to find in ps
130
        # prompt is set to sentinel chr( 127 )
131
        cmd = [ 'mnexec', opts, 'env', 'PS1=' + chr( 127 ),
132
                'bash', '--norc', '-mis', 'mininet:' + self.name ]
133
        # Spawn a shell subprocess in a pseudo-tty, to disable buffering
134
        # in the subprocess and insulate it from signals (e.g. SIGINT)
135
        # received by the parent
136
        master, slave = pty.openpty()
137
        self.shell = self._popen( cmd, stdin=slave, stdout=slave, stderr=slave,
138
                                  close_fds=False )
139
        self.stdin = os.fdopen( master, 'rw' )
140
        self.stdout = self.stdin
141
        self.pid = self.shell.pid
142
        self.pollOut = select.poll()
143
        self.pollOut.register( self.stdout )
144
        # Maintain mapping between file descriptors and nodes
145
        # This is useful for monitoring multiple nodes
146
        # using select.poll()
147
        self.outToNode[ self.stdout.fileno() ] = self
148
        self.inToNode[ self.stdin.fileno() ] = self
149
        self.execed = False
150
        self.lastCmd = None
151
        self.lastPid = None
152
        self.readbuf = ''
153
        # Wait for prompt
154
        while True:
155
            data = self.read( 1024 )
156
            if data[ -1 ] == chr( 127 ):
157
                break
158
            self.pollOut.poll()
159
        self.waiting = False
160
        self.cmd( 'stty -echo' )
161
        self.cmd( 'set +m' )
162

    
163
    def mountPrivateDirs( self ):
164
        "mount private directories"
165
        for directory in self.privateDirs:
166
            if isinstance( directory, tuple ):
167
                # mount given private directory
168
                privateDir = directory[ 1 ] % self.__dict__
169
                mountPoint = directory[ 0 ]
170
                self.cmd( 'mkdir -p %s' % privateDir )
171
                self.cmd( 'mkdir -p %s' % mountPoint )
172
                self.cmd( 'mount --bind %s %s' %
173
                               ( privateDir, mountPoint ) )
174
            else:
175
                # mount temporary filesystem on directory
176
                self.cmd( 'mkdir -p %s' % directory )
177
                self.cmd( 'mount -n -t tmpfs tmpfs %s' % directory )
178

    
179
    def unmountPrivateDirs( self ):
180
        "mount private directories"
181
        for directory in self.privateDirs:
182
            if isinstance( directory, tuple ):
183
                self.cmd( 'umount ', directory[ 0 ] )
184
            else:
185
                self.cmd( 'umount ', directory )
186

    
187
    def _popen( self, cmd, **params ):
188
        """Internal method: spawn and return a process
189
            cmd: command to run (list)
190
            params: parameters to Popen()"""
191
        # Leave this is as an instance method for now
192
        assert self
193
        return Popen( cmd, **params )
194

    
195
    def cleanup( self ):
196
        "Help python collect its garbage."
197
        # We used to do this, but it slows us down:
198
        # Intfs may end up in root NS
199
        # for intfName in self.intfNames():
200
        # if self.name in intfName:
201
        # quietRun( 'ip link del ' + intfName )
202
        self.shell = None
203

    
204
    # Subshell I/O, commands and control
205

    
206
    def read( self, maxbytes=1024 ):
207
        """Buffered read from node, non-blocking.
208
           maxbytes: maximum number of bytes to return"""
209
        count = len( self.readbuf )
210
        if count < maxbytes:
211
            data = os.read( self.stdout.fileno(), maxbytes - count )
212
            self.readbuf += data
213
        if maxbytes >= len( self.readbuf ):
214
            result = self.readbuf
215
            self.readbuf = ''
216
        else:
217
            result = self.readbuf[ :maxbytes ]
218
            self.readbuf = self.readbuf[ maxbytes: ]
219
        return result
220

    
221
    def readline( self ):
222
        """Buffered readline from node, non-blocking.
223
           returns: line (minus newline) or None"""
224
        self.readbuf += self.read( 1024 )
225
        if '\n' not in self.readbuf:
226
            return None
227
        pos = self.readbuf.find( '\n' )
228
        line = self.readbuf[ 0: pos ]
229
        self.readbuf = self.readbuf[ pos + 1: ]
230
        return line
231

    
232
    def write( self, data ):
233
        """Write data to node.
234
           data: string"""
235
        os.write( self.stdin.fileno(), data )
236

    
237
    def terminate( self ):
238
        "Send kill signal to Node and clean up after it."
239
        self.unmountPrivateDirs()
240
        if self.shell:
241
            if self.shell.poll() is None:
242
                os.killpg( self.shell.pid, signal.SIGHUP )
243
        self.cleanup()
244

    
245
    def stop( self, deleteIntfs=False ):
246
        """Stop node.
247
           deleteIntfs: delete interfaces? (False)"""
248
        if deleteIntfs:
249
            self.deleteIntfs()
250
        self.terminate()
251

    
252
    def waitReadable( self, timeoutms=None ):
253
        """Wait until node's output is readable.
254
           timeoutms: timeout in ms or None to wait indefinitely."""
255
        if len( self.readbuf ) == 0:
256
            self.pollOut.poll( timeoutms )
257

    
258
    def sendCmd( self, *args, **kwargs ):
259
        """Send a command, followed by a command to echo a sentinel,
260
           and return without waiting for the command to complete.
261
           args: command and arguments, or string
262
           printPid: print command's PID?"""
263
        assert self.shell and not self.waiting
264
        printPid = kwargs.get( 'printPid', True )
265
        # Allow sendCmd( [ list ] )
266
        if len( args ) == 1 and isinstance( args[ 0 ], list ):
267
            cmd = args[ 0 ]
268
        # Allow sendCmd( cmd, arg1, arg2... )
269
        elif len( args ) > 0:
270
            cmd = args
271
        # Convert to string
272
        if not isinstance( cmd, str ):
273
            cmd = ' '.join( [ str( c ) for c in cmd ] )
274
        if not re.search( r'\w', cmd ):
275
            # Replace empty commands with something harmless
276
            cmd = 'echo -n'
277
        self.lastCmd = cmd
278
        # if a builtin command is backgrounded, it still yields a PID
279
        if len( cmd ) > 0 and cmd[ -1 ] == '&':
280
            # print ^A{pid}\n so monitor() can set lastPid
281
            cmd += ' printf "\\001%d\\012" $! '
282
        elif printPid and not isShellBuiltin( cmd ):
283
            cmd = 'mnexec -p ' + cmd
284
        self.write( cmd + '\n' )
285
        self.lastPid = None
286
        self.waiting = True
287

    
288
    def sendInt( self, intr=chr( 3 ) ):
289
        "Interrupt running command."
290
        debug( 'sendInt: writing chr(%d)\n' % ord( intr ) )
291
        self.write( intr )
292

    
293
    def monitor( self, timeoutms=None, findPid=True ):
294
        """Monitor and return the output of a command.
295
           Set self.waiting to False if command has completed.
296
           timeoutms: timeout in ms or None to wait indefinitely
297
           findPid: look for PID from mnexec -p"""
298
        self.waitReadable( timeoutms )
299
        data = self.read( 1024 )
300
        pidre = r'\[\d+\] \d+\r\n'
301
        # Look for PID
302
        marker = chr( 1 ) + r'\d+\r\n'
303
        if findPid and chr( 1 ) in data:
304
            # suppress the job and PID of a backgrounded command
305
            if re.findall( pidre, data ):
306
                data = re.sub( pidre, '', data )
307
            # Marker can be read in chunks; continue until all of it is read
308
            while not re.findall( marker, data ):
309
                data += self.read( 1024 )
310
            markers = re.findall( marker, data )
311
            if markers:
312
                self.lastPid = int( markers[ 0 ][ 1: ] )
313
                data = re.sub( marker, '', data )
314
        # Look for sentinel/EOF
315
        if len( data ) > 0 and data[ -1 ] == chr( 127 ):
316
            self.waiting = False
317
            data = data[ :-1 ]
318
        elif chr( 127 ) in data:
319
            self.waiting = False
320
            data = data.replace( chr( 127 ), '' )
321
        return data
322

    
323
    def waitOutput( self, verbose=False, findPid=True ):
324
        """Wait for a command to complete.
325
           Completion is signaled by a sentinel character, ASCII(127)
326
           appearing in the output stream.  Wait for the sentinel and return
327
           the output, including trailing newline.
328
           verbose: print output interactively"""
329
        log = info if verbose else debug
330
        output = ''
331
        while self.waiting:
332
            data = self.monitor( findPid=findPid )
333
            output += data
334
            log( data )
335
        return output
336

    
337
    def cmd( self, *args, **kwargs ):
338
        """Send a command, wait for output, and return it.
339
           cmd: string"""
340
        verbose = kwargs.get( 'verbose', False )
341
        log = info if verbose else debug
342
        log( '*** %s : %s\n' % ( self.name, args ) )
343
        if self.shell:
344
            self.sendCmd( *args, **kwargs )
345
            return self.waitOutput( verbose )
346
        else:
347
            warn( '(%s exited - ignoring cmd%s)\n' % ( self, args ) )
348

    
349
    def cmdPrint( self, *args):
350
        """Call cmd and printing its output
351
           cmd: string"""
352
        return self.cmd( *args, **{ 'verbose': True } )
353

    
354
    def popen( self, *args, **kwargs ):
355
        """Return a Popen() object in our namespace
356
           args: Popen() args, single list, or string
357
           kwargs: Popen() keyword args"""
358
        defaults = { 'stdout': PIPE, 'stderr': PIPE,
359
                     'mncmd':
360
                     [ 'mnexec', '-da', str( self.pid ) ] }
361
        defaults.update( kwargs )
362
        if len( args ) == 1:
363
            if isinstance( args[ 0 ], list ):
364
                # popen([cmd, arg1, arg2...])
365
                cmd = args[ 0 ]
366
            elif isinstance( args[ 0 ], basestring ):
367
                # popen("cmd arg1 arg2...")
368
                cmd = args[ 0 ].split()
369
            else:
370
                raise Exception( 'popen() requires a string or list' )
371
        elif len( args ) > 0:
372
            # popen( cmd, arg1, arg2... )
373
            cmd = list( args )
374
        # Attach to our namespace  using mnexec -a
375
        cmd = defaults.pop( 'mncmd' ) + cmd
376
        # Shell requires a string, not a list!
377
        if defaults.get( 'shell', False ):
378
            cmd = ' '.join( cmd )
379
        popen = self._popen( cmd, **defaults )
380
        return popen
381

    
382
    def pexec( self, *args, **kwargs ):
383
        """Execute a command using popen
384
           returns: out, err, exitcode"""
385
        popen = self.popen( *args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
386
                            **kwargs )
387
        # Warning: this can fail with large numbers of fds!
388
        out, err = popen.communicate()
389
        exitcode = popen.wait()
390
        return out, err, exitcode
391

    
392
    # Interface management, configuration, and routing
393

    
394
    # BL notes: This might be a bit redundant or over-complicated.
395
    # However, it does allow a bit of specialization, including
396
    # changing the canonical interface names. It's also tricky since
397
    # the real interfaces are created as veth pairs, so we can't
398
    # make a single interface at a time.
399

    
400
    def newPort( self ):
401
        "Return the next port number to allocate."
402
        if len( self.ports ) > 0:
403
            return max( self.ports.values() ) + 1
404
        return self.portBase
405

    
406
    def addIntf( self, intf, port=None, moveIntfFn=moveIntf ):
407
        """Add an interface.
408
           intf: interface
409
           port: port number (optional, typically OpenFlow port number)
410
           moveIntfFn: function to move interface (optional)"""
411
        if port is None:
412
            port = self.newPort()
413
        self.intfs[ port ] = intf
414
        self.ports[ intf ] = port
415
        self.nameToIntf[ intf.name ] = intf
416
        debug( '\n' )
417
        debug( 'added intf %s (%d) to node %s\n' % (
418
                intf, port, self.name ) )
419
        if self.inNamespace:
420
            debug( 'moving', intf, 'into namespace for', self.name, '\n' )
421
            moveIntfFn( intf.name, self  )
422

    
423
    def defaultIntf( self ):
424
        "Return interface for lowest port"
425
        ports = self.intfs.keys()
426
        if ports:
427
            return self.intfs[ min( ports ) ]
428
        else:
429
            warn( '*** defaultIntf: warning:', self.name,
430
                  'has no interfaces\n' )
431

    
432
    def intf( self, intf=None ):
433
        """Return our interface object with given string name,
434
           default intf if name is falsy (None, empty string, etc).
435
           or the input intf arg.
436

437
        Having this fcn return its arg for Intf objects makes it
438
        easier to construct functions with flexible input args for
439
        interfaces (those that accept both string names and Intf objects).
440
        """
441
        if not intf:
442
            return self.defaultIntf()
443
        elif isinstance( intf, basestring):
444
            return self.nameToIntf[ intf ]
445
        else:
446
            return intf
447

    
448
    def connectionsTo( self, node):
449
        "Return [ intf1, intf2... ] for all intfs that connect self to node."
450
        # We could optimize this if it is important
451
        connections = []
452
        for intf in self.intfList():
453
            link = intf.link
454
            if link:
455
                node1, node2 = link.intf1.node, link.intf2.node
456
                if node1 == self and node2 == node:
457
                    connections += [ ( intf, link.intf2 ) ]
458
                elif node1 == node and node2 == self:
459
                    connections += [ ( intf, link.intf1 ) ]
460
        return connections
461

    
462
    def deleteIntfs( self, checkName=True ):
463
        """Delete all of our interfaces.
464
           checkName: only delete interfaces that contain our name"""
465
        # In theory the interfaces should go away after we shut down.
466
        # However, this takes time, so we're better off removing them
467
        # explicitly so that we won't get errors if we run before they
468
        # have been removed by the kernel. Unfortunately this is very slow,
469
        # at least with Linux kernels before 2.6.33
470
        for intf in self.intfs.values():
471
            # Protect against deleting hardware interfaces
472
            if ( self.name in intf.name ) or ( not checkName ):
473
                intf.delete()
474
                info( '.' )
475

    
476
    # Routing support
477

    
478
    def setARP( self, ip, mac ):
479
        """Add an ARP entry.
480
           ip: IP address as string
481
           mac: MAC address as string"""
482
        result = self.cmd( 'arp', '-s', ip, mac )
483
        return result
484

    
485
    def setHostRoute( self, ip, intf ):
486
        """Add route to host.
487
           ip: IP address as dotted decimal
488
           intf: string, interface name"""
489
        return self.cmd( 'route add -host', ip, 'dev', intf )
490

    
491
    def setDefaultRoute( self, intf=None ):
492
        """Set the default route to go through intf.
493
           intf: Intf or {dev <intfname> via <gw-ip> ...}"""
494
        # Note setParam won't call us if intf is none
495
        if isinstance( intf, basestring ) and ' ' in intf:
496
            params = intf
497
        else:
498
            params = 'dev %s' % intf
499
        # Do this in one line in case we're messing with the root namespace
500
        self.cmd( 'ip route del default; ip route add default', params )
501

    
502
    # Convenience and configuration methods
503

    
504
    def setMAC( self, mac, intf=None ):
505
        """Set the MAC address for an interface.
506
           intf: intf or intf name
507
           mac: MAC address as string"""
508
        return self.intf( intf ).setMAC( mac )
509

    
510
    def setIP( self, ip, prefixLen=8, intf=None ):
511
        """Set the IP address for an interface.
512
           intf: intf or intf name
513
           ip: IP address as a string
514
           prefixLen: prefix length, e.g. 8 for /8 or 16M addrs"""
515
        # This should probably be rethought
516
        if '/' not in ip:
517
            ip = '%s/%s' % ( ip, prefixLen )
518
        return self.intf( intf ).setIP( ip )
519

    
520
    def IP( self, intf=None ):
521
        "Return IP address of a node or specific interface."
522
        return self.intf( intf ).IP()
523

    
524
    def MAC( self, intf=None ):
525
        "Return MAC address of a node or specific interface."
526
        return self.intf( intf ).MAC()
527

    
528
    def intfIsUp( self, intf=None ):
529
        "Check if an interface is up."
530
        return self.intf( intf ).isUp()
531

    
532
    # The reason why we configure things in this way is so
533
    # That the parameters can be listed and documented in
534
    # the config method.
535
    # Dealing with subclasses and superclasses is slightly
536
    # annoying, but at least the information is there!
537

    
538
    def setParam( self, results, method, **param ):
539
        """Internal method: configure a *single* parameter
540
           results: dict of results to update
541
           method: config method name
542
           param: arg=value (ignore if value=None)
543
           value may also be list or dict"""
544
        name, value = param.items()[ 0 ]
545
        if value is None:
546
            return
547
        f = getattr( self, method, None )
548
        if not f:
549
            return
550
        if isinstance( value, list ):
551
            result = f( *value )
552
        elif isinstance( value, dict ):
553
            result = f( **value )
554
        else:
555
            result = f( value )
556
        results[ name ] = result
557
        return result
558

    
559
    def config( self, mac=None, ip=None,
560
                defaultRoute=None, lo='up', **_params ):
561
        """Configure Node according to (optional) parameters:
562
           mac: MAC address for default interface
563
           ip: IP address for default interface
564
           ifconfig: arbitrary interface configuration
565
           Subclasses should override this method and call
566
           the parent class's config(**params)"""
567
        # If we were overriding this method, we would call
568
        # the superclass config method here as follows:
569
        # r = Parent.config( **_params )
570
        r = {}
571
        self.setParam( r, 'setMAC', mac=mac )
572
        self.setParam( r, 'setIP', ip=ip )
573
        self.setParam( r, 'setDefaultRoute', defaultRoute=defaultRoute )
574
        # This should be examined
575
        self.cmd( 'ifconfig lo ' + lo )
576
        return r
577

    
578
    def configDefault( self, **moreParams ):
579
        "Configure with default parameters"
580
        self.params.update( moreParams )
581
        self.config( **self.params )
582

    
583
    # This is here for backward compatibility
584
    def linkTo( self, node, link=Link ):
585
        """(Deprecated) Link to another node
586
           replace with Link( node1, node2)"""
587
        return link( self, node )
588

    
589
    # Other methods
590

    
591
    def intfList( self ):
592
        "List of our interfaces sorted by port number"
593
        return [ self.intfs[ p ] for p in sorted( self.intfs.iterkeys() ) ]
594

    
595
    def intfNames( self ):
596
        "The names of our interfaces sorted by port number"
597
        return [ str( i ) for i in self.intfList() ]
598

    
599
    def __repr__( self ):
600
        "More informative string representation"
601
        intfs = ( ','.join( [ '%s:%s' % ( i.name, i.IP() )
602
                              for i in self.intfList() ] ) )
603
        return '<%s %s: %s pid=%s> ' % (
604
            self.__class__.__name__, self.name, intfs, self.pid )
605

    
606
    def __str__( self ):
607
        "Abbreviated string representation"
608
        return self.name
609

    
610
    # Automatic class setup support
611

    
612
    isSetup = False
613

    
614
    @classmethod
615
    def checkSetup( cls ):
616
        "Make sure our class and superclasses are set up"
617
        while cls and not getattr( cls, 'isSetup', True ):
618
            cls.setup()
619
            cls.isSetup = True
620
            # Make pylint happy
621
            cls = getattr( type( cls ), '__base__', None )
622

    
623
    @classmethod
624
    def setup( cls ):
625
        "Make sure our class dependencies are available"
626
        pathCheck( 'mnexec', 'ifconfig', moduleName='Mininet')
627

    
628
class Host( Node ):
629
    "A host is simply a Node"
630
    pass
631

    
632
class CPULimitedHost( Host ):
633

    
634
    "CPU limited host"
635

    
636
    def __init__( self, name, sched='cfs', **kwargs ):
637
        Host.__init__( self, name, **kwargs )
638
        # Initialize class if necessary
639
        if not CPULimitedHost.inited:
640
            CPULimitedHost.init()
641
        # Create a cgroup and move shell into it
642
        self.cgroup = 'cpu,cpuacct,cpuset:/' + self.name
643
        errFail( 'cgcreate -g ' + self.cgroup )
644
        # We don't add ourselves to a cpuset because you must
645
        # specify the cpu and memory placement first
646
        errFail( 'cgclassify -g cpu,cpuacct:/%s %s' % ( self.name, self.pid ) )
647
        # BL: Setting the correct period/quota is tricky, particularly
648
        # for RT. RT allows very small quotas, but the overhead
649
        # seems to be high. CFS has a mininimum quota of 1 ms, but
650
        # still does better with larger period values.
651
        self.period_us = kwargs.get( 'period_us', 100000 )
652
        self.sched = sched
653
        if sched == 'rt':
654
            self.checkRtGroupSched()
655
            self.rtprio = 20
656

    
657
    def cgroupSet( self, param, value, resource='cpu' ):
658
        "Set a cgroup parameter and return its value"
659
        cmd = 'cgset -r %s.%s=%s /%s' % (
660
            resource, param, value, self.name )
661
        quietRun( cmd )
662
        nvalue = int( self.cgroupGet( param, resource ) )
663
        if nvalue != value:
664
            error( '*** error: cgroupSet: %s set to %s instead of %s\n'
665
                   % ( param, nvalue, value ) )
666
        return nvalue
667

    
668
    def cgroupGet( self, param, resource='cpu' ):
669
        "Return value of cgroup parameter"
670
        cmd = 'cgget -r %s.%s /%s' % (
671
            resource, param, self.name )
672
        return int( quietRun( cmd ).split()[ -1 ] )
673

    
674
    def cgroupDel( self ):
675
        "Clean up our cgroup"
676
        # info( '*** deleting cgroup', self.cgroup, '\n' )
677
        _out, _err, exitcode = errRun( 'cgdelete -r ' + self.cgroup )
678
        return exitcode == 0  # success condition
679

    
680
    def popen( self, *args, **kwargs ):
681
        """Return a Popen() object in node's namespace
682
           args: Popen() args, single list, or string
683
           kwargs: Popen() keyword args"""
684
        # Tell mnexec to execute command in our cgroup
685
        mncmd = [ 'mnexec', '-g', self.name,
686
                  '-da', str( self.pid ) ]
687
        # if our cgroup is not given any cpu time,
688
        # we cannot assign the RR Scheduler.
689
        if self.sched == 'rt':
690
            if int( self.cgroupGet( 'rt_runtime_us', 'cpu' ) ) <= 0:
691
                mncmd += [ '-r', str( self.rtprio ) ]
692
            else:
693
                debug( '*** error: not enough cpu time available for %s.' %
694
                       self.name, 'Using cfs scheduler for subprocess\n' )
695
        return Host.popen( self, *args, mncmd=mncmd, **kwargs )
696

    
697
    def cleanup( self ):
698
        "Clean up Node, then clean up our cgroup"
699
        super( CPULimitedHost, self ).cleanup()
700
        retry( retries=3, delaySecs=1, fn=self.cgroupDel )
701

    
702
    _rtGroupSched = False   # internal class var: Is CONFIG_RT_GROUP_SCHED set?
703

    
704
    @classmethod
705
    def checkRtGroupSched( cls ):
706
        "Check (Ubuntu,Debian) kernel config for CONFIG_RT_GROUP_SCHED for RT"
707
        if not cls._rtGroupSched:
708
            release = quietRun( 'uname -r' ).strip('\r\n')
709
            output = quietRun( 'grep CONFIG_RT_GROUP_SCHED /boot/config-%s' %
710
                               release )
711
            if output == '# CONFIG_RT_GROUP_SCHED is not set\n':
712
                error( '\n*** error: please enable RT_GROUP_SCHED '
713
                       'in your kernel\n' )
714
                exit( 1 )
715
            cls._rtGroupSched = True
716

    
717
    def chrt( self ):
718
        "Set RT scheduling priority"
719
        quietRun( 'chrt -p %s %s' % ( self.rtprio, self.pid ) )
720
        result = quietRun( 'chrt -p %s' % self.pid )
721
        firstline = result.split( '\n' )[ 0 ]
722
        lastword = firstline.split( ' ' )[ -1 ]
723
        if lastword != 'SCHED_RR':
724
            error( '*** error: could not assign SCHED_RR to %s\n' % self.name )
725
        return lastword
726

    
727
    def rtInfo( self, f ):
728
        "Internal method: return parameters for RT bandwidth"
729
        pstr, qstr = 'rt_period_us', 'rt_runtime_us'
730
        # RT uses wall clock time for period and quota
731
        quota = int( self.period_us * f )
732
        return pstr, qstr, self.period_us, quota
733

    
734
    def cfsInfo( self, f ):
735
        "Internal method: return parameters for CFS bandwidth"
736
        pstr, qstr = 'cfs_period_us', 'cfs_quota_us'
737
        # CFS uses wall clock time for period and CPU time for quota.
738
        quota = int( self.period_us * f * numCores() )
739
        period = self.period_us
740
        if f > 0 and quota < 1000:
741
            debug( '(cfsInfo: increasing default period) ' )
742
            quota = 1000
743
            period = int( quota / f / numCores() )
744
        # Reset to unlimited on negative quota
745
        if quota < 0:
746
            quota = -1
747
        return pstr, qstr, period, quota
748

    
749
    # BL comment:
750
    # This may not be the right API,
751
    # since it doesn't specify CPU bandwidth in "absolute"
752
    # units the way link bandwidth is specified.
753
    # We should use MIPS or SPECINT or something instead.
754
    # Alternatively, we should change from system fraction
755
    # to CPU seconds per second, essentially assuming that
756
    # all CPUs are the same.
757

    
758
    def setCPUFrac( self, f, sched=None ):
759
        """Set overall CPU fraction for this host
760
           f: CPU bandwidth limit (positive fraction, or -1 for cfs unlimited)
761
           sched: 'rt' or 'cfs'
762
           Note 'cfs' requires CONFIG_CFS_BANDWIDTH,
763
           and 'rt' requires CONFIG_RT_GROUP_SCHED"""
764
        if not sched:
765
            sched = self.sched
766
        if sched == 'rt':
767
            if not f or f < 0:
768
                raise Exception( 'Please set a positive CPU fraction'
769
                                 ' for sched=rt\n' )
770
            pstr, qstr, period, quota = self.rtInfo( f )
771
        elif sched == 'cfs':
772
            pstr, qstr, period, quota = self.cfsInfo( f )
773
        else:
774
            return
775
        # Set cgroup's period and quota
776
        setPeriod = self.cgroupSet( pstr, period )
777
        setQuota = self.cgroupSet( qstr, quota )
778
        if sched == 'rt':
779
            # Set RT priority if necessary
780
            sched = self.chrt()
781
        info( '(%s %d/%dus) ' % ( sched, setQuota, setPeriod ) )
782

    
783
    def setCPUs( self, cores, mems=0 ):
784
        "Specify (real) cores that our cgroup can run on"
785
        if not cores:
786
            return
787
        if isinstance( cores, list ):
788
            cores = ','.join( [ str( c ) for c in cores ] )
789
        self.cgroupSet( resource='cpuset', param='cpus',
790
                        value=cores )
791
        # Memory placement is probably not relevant, but we
792
        # must specify it anyway
793
        self.cgroupSet( resource='cpuset', param='mems',
794
                        value=mems)
795
        # We have to do this here after we've specified
796
        # cpus and mems
797
        errFail( 'cgclassify -g cpuset:/%s %s' % (
798
                 self.name, self.pid ) )
799

    
800
    def config( self, cpu=-1, cores=None, **params ):
801
        """cpu: desired overall system CPU fraction
802
           cores: (real) core(s) this host can run on
803
           params: parameters for Node.config()"""
804
        r = Node.config( self, **params )
805
        # Was considering cpu={'cpu': cpu , 'sched': sched}, but
806
        # that seems redundant
807
        self.setParam( r, 'setCPUFrac', cpu=cpu )
808
        self.setParam( r, 'setCPUs', cores=cores )
809
        return r
810

    
811
    inited = False
812

    
813
    @classmethod
814
    def init( cls ):
815
        "Initialization for CPULimitedHost class"
816
        mountCgroups()
817
        cls.inited = True
818

    
819

    
820
# Some important things to note:
821
#
822
# The "IP" address which setIP() assigns to the switch is not
823
# an "IP address for the switch" in the sense of IP routing.
824
# Rather, it is the IP address for the control interface,
825
# on the control network, and it is only relevant to the
826
# controller. If you are running in the root namespace
827
# (which is the only way to run OVS at the moment), the
828
# control interface is the loopback interface, and you
829
# normally never want to change its IP address!
830
#
831
# In general, you NEVER want to attempt to use Linux's
832
# network stack (i.e. ifconfig) to "assign" an IP address or
833
# MAC address to a switch data port. Instead, you "assign"
834
# the IP and MAC addresses in the controller by specifying
835
# packets that you want to receive or send. The "MAC" address
836
# reported by ifconfig for a switch data port is essentially
837
# meaningless. It is important to understand this if you
838
# want to create a functional router using OpenFlow.
839

    
840
class Switch( Node ):
841
    """A Switch is a Node that is running (or has execed?)
842
       an OpenFlow switch."""
843

    
844
    portBase = 1  # Switches start with port 1 in OpenFlow
845
    dpidLen = 16  # digits in dpid passed to switch
846

    
847
    def __init__( self, name, dpid=None, opts='', listenPort=None, **params):
848
        """dpid: dpid hex string (or None to derive from name, e.g. s1 -> 1)
849
           opts: additional switch options
850
           listenPort: port to listen on for dpctl connections"""
851
        Node.__init__( self, name, **params )
852
        self.dpid = self.defaultDpid( dpid )
853
        self.opts = opts
854
        self.listenPort = listenPort
855
        if not self.inNamespace:
856
            self.controlIntf = Intf( 'lo', self, port=0 )
857

    
858
    def defaultDpid( self, dpid=None ):
859
        "Return correctly formatted dpid from dpid or switch name (s1 -> 1)"
860
        if dpid:
861
            # Remove any colons and make sure it's a good hex number
862
            dpid = dpid.translate( None, ':' )
863
            assert len( dpid ) <= self.dpidLen and int( dpid, 16 ) >= 0
864
        else:
865
            # Use hex of the first number in the switch name
866
            nums = re.findall( r'\d+', self.name )
867
            if nums:
868
                dpid = hex( int( nums[ 0 ] ) )[ 2: ]
869
            else:
870
                raise Exception( 'Unable to derive default datapath ID - '
871
                                 'please either specify a dpid or use a '
872
                                 'canonical switch name such as s23.' )
873
        return '0' * ( self.dpidLen - len( dpid ) ) + dpid
874

    
875
    def defaultIntf( self ):
876
        "Return control interface"
877
        if self.controlIntf:
878
            return self.controlIntf
879
        else:
880
            return Node.defaultIntf( self )
881

    
882
    def sendCmd( self, *cmd, **kwargs ):
883
        """Send command to Node.
884
           cmd: string"""
885
        kwargs.setdefault( 'printPid', False )
886
        if not self.execed:
887
            return Node.sendCmd( self, *cmd, **kwargs )
888
        else:
889
            error( '*** Error: %s has execed and cannot accept commands' %
890
                   self.name )
891

    
892
    def connected( self ):
893
        "Is the switch connected to a controller? (override this method)"
894
        # Assume that we are connected by default to whatever we need to
895
        # be connected to. This should be overridden by any OpenFlow
896
        # switch, but not by a standalone bridge.
897
        debug( 'Assuming', repr( self ), 'is connected to a controller\n' )
898
        return True
899

    
900
    def __repr__( self ):
901
        "More informative string representation"
902
        intfs = ( ','.join( [ '%s:%s' % ( i.name, i.IP() )
903
                              for i in self.intfList() ] ) )
904
        return '<%s %s: %s pid=%s> ' % (
905
            self.__class__.__name__, self.name, intfs, self.pid )
906

    
907

    
908
class UserSwitch( Switch ):
909
    "User-space switch."
910

    
911
    dpidLen = 12
912

    
913
    def __init__( self, name, dpopts='--no-slicing', **kwargs ):
914
        """Init.
915
           name: name for the switch
916
           dpopts: additional arguments to ofdatapath (--no-slicing)"""
917
        Switch.__init__( self, name, **kwargs )
918
        pathCheck( 'ofdatapath', 'ofprotocol',
919
                   moduleName='the OpenFlow reference user switch' +
920
                              '(openflow.org)' )
921
        if self.listenPort:
922
            self.opts += ' --listen=ptcp:%i ' % self.listenPort
923
        else:
924
            self.opts += ' --listen=punix:/tmp/%s.listen' % self.name
925
        self.dpopts = dpopts
926

    
927
    @classmethod
928
    def setup( cls ):
929
        "Ensure any dependencies are loaded; if not, try to load them."
930
        if not os.path.exists( '/dev/net/tun' ):
931
            moduleDeps( add=TUN )
932

    
933
    def dpctl( self, *args ):
934
        "Run dpctl command"
935
        listenAddr = None
936
        if not self.listenPort:
937
            listenAddr = 'unix:/tmp/%s.listen' % self.name
938
        else:
939
            listenAddr = 'tcp:127.0.0.1:%i' % self.listenPort
940
        return self.cmd( 'dpctl ' + ' '.join( args ) +
941
                         ' ' + listenAddr )
942

    
943
    def connected( self ):
944
        "Is the switch connected to a controller?"
945
        status = self.dpctl( 'status' )
946
        return ( 'remote.is-connected=true' in status and
947
                 'local.is-connected=true' in status )
948

    
949
    @staticmethod
950
    def TCReapply( intf ):
951
        """Unfortunately user switch and Mininet are fighting
952
           over tc queuing disciplines. To resolve the conflict,
953
           we re-create the user switch's configuration, but as a
954
           leaf of the TCIntf-created configuration."""
955
        if isinstance( intf, TCIntf ):
956
            ifspeed = 10000000000  # 10 Gbps
957
            minspeed = ifspeed * 0.001
958

    
959
            res = intf.config( **intf.params )
960

    
961
            if res is None:  # link may not have TC parameters
962
                return
963

    
964
            # Re-add qdisc, root, and default classes user switch created, but
965
            # with new parent, as setup by Mininet's TCIntf
966
            parent = res['parent']
967
            intf.tc( "%s qdisc add dev %s " + parent +
968
                     " handle 1: htb default 0xfffe" )
969
            intf.tc( "%s class add dev %s classid 1:0xffff parent 1: htb rate "
970
                     + str(ifspeed) )
971
            intf.tc( "%s class add dev %s classid 1:0xfffe parent 1:0xffff " +
972
                     "htb rate " + str(minspeed) + " ceil " + str(ifspeed) )
973

    
974
    def start( self, controllers ):
975
        """Start OpenFlow reference user datapath.
976
           Log to /tmp/sN-{ofd,ofp}.log.
977
           controllers: list of controller objects"""
978
        # Add controllers
979
        clist = ','.join( [ 'tcp:%s:%d' % ( c.IP(), c.port )
980
                            for c in controllers ] )
981
        ofdlog = '/tmp/' + self.name + '-ofd.log'
982
        ofplog = '/tmp/' + self.name + '-ofp.log'
983
        intfs = [ str( i ) for i in self.intfList() if not i.IP() ]
984
        self.cmd( 'ofdatapath -i ' + ','.join( intfs ) +
985
                  ' punix:/tmp/' + self.name + ' -d %s ' % self.dpid +
986
                  self.dpopts +
987
                  ' 1> ' + ofdlog + ' 2> ' + ofdlog + ' &' )
988
        self.cmd( 'ofprotocol unix:/tmp/' + self.name +
989
                  ' ' + clist +
990
                  ' --fail=closed ' + self.opts +
991
                  ' 1> ' + ofplog + ' 2>' + ofplog + ' &' )
992
        if "no-slicing" not in self.dpopts:
993
            # Only TCReapply if slicing is enable
994
            sleep(1)  # Allow ofdatapath to start before re-arranging qdisc's
995
            for intf in self.intfList():
996
                if not intf.IP():
997
                    self.TCReapply( intf )
998

    
999
    def stop( self, deleteIntfs=True ):
1000
        """Stop OpenFlow reference user datapath.
1001
           deleteIntfs: delete interfaces? (True)"""
1002
        self.cmd( 'kill %ofdatapath' )
1003
        self.cmd( 'kill %ofprotocol' )
1004
        super( UserSwitch, self ).stop( deleteIntfs )
1005

    
1006
class OVSLegacyKernelSwitch( Switch ):
1007
    """Open VSwitch legacy kernel-space switch using ovs-openflowd.
1008
       Currently only works in the root namespace."""
1009

    
1010
    def __init__( self, name, dp=None, **kwargs ):
1011
        """Init.
1012
           name: name for switch
1013
           dp: netlink id (0, 1, 2, ...)
1014
           defaultMAC: default MAC as unsigned int; random value if None"""
1015
        Switch.__init__( self, name, **kwargs )
1016
        self.dp = dp if dp else self.name
1017
        self.intf = self.dp
1018
        if self.inNamespace:
1019
            error( "OVSKernelSwitch currently only works"
1020
                   " in the root namespace.\n" )
1021
            exit( 1 )
1022

    
1023
    @classmethod
1024
    def setup( cls ):
1025
        "Ensure any dependencies are loaded; if not, try to load them."
1026
        pathCheck( 'ovs-dpctl', 'ovs-openflowd',
1027
                   moduleName='Open vSwitch (openvswitch.org)')
1028
        moduleDeps( subtract=OF_KMOD, add=OVS_KMOD )
1029

    
1030
    def start( self, controllers ):
1031
        "Start up kernel datapath."
1032
        ofplog = '/tmp/' + self.name + '-ofp.log'
1033
        # Delete local datapath if it exists;
1034
        # then create a new one monitoring the given interfaces
1035
        self.cmd( 'ovs-dpctl del-dp ' + self.dp )
1036
        self.cmd( 'ovs-dpctl add-dp ' + self.dp )
1037
        intfs = [ str( i ) for i in self.intfList() if not i.IP() ]
1038
        self.cmd( 'ovs-dpctl', 'add-if', self.dp, ' '.join( intfs ) )
1039
        # Run protocol daemon
1040
        clist = ','.join( [ 'tcp:%s:%d' % ( c.IP(), c.port )
1041
                            for c in controllers ] )
1042
        self.cmd( 'ovs-openflowd ' + self.dp +
1043
                  ' ' + clist +
1044
                  ' --fail=secure ' + self.opts +
1045
                  ' --datapath-id=' + self.dpid +
1046
                  ' 1>' + ofplog + ' 2>' + ofplog + '&' )
1047
        self.execed = False
1048

    
1049
    def stop( self, deleteIntfs=True ):
1050
        """Terminate kernel datapath."
1051
           deleteIntfs: delete interfaces? (True)"""
1052
        quietRun( 'ovs-dpctl del-dp ' + self.dp )
1053
        self.cmd( 'kill %ovs-openflowd' )
1054
        super( OVSLegacyKernelSwitch, self ).stop( deleteIntfs )
1055

    
1056

    
1057
class OVSSwitch( Switch ):
1058
    "Open vSwitch switch. Depends on ovs-vsctl."
1059

    
1060
    def __init__( self, name, failMode='secure', datapath='kernel',
1061
                  inband=False, protocols=None,
1062
                  reconnectms=1000, stp=False, **params ):
1063
        """name: name for switch
1064
           failMode: controller loss behavior (secure|open)
1065
           datapath: userspace or kernel mode (kernel|user)
1066
           inband: use in-band control (False)
1067
           protocols: use specific OpenFlow version(s) (e.g. OpenFlow13)
1068
                      Unspecified (or old OVS version) uses OVS default
1069
           reconnectms: max reconnect timeout in ms (0/None for default)
1070
           stp: enable STP (False, requires failMode=standalone)"""
1071
        Switch.__init__( self, name, **params )
1072
        self.failMode = failMode
1073
        self.datapath = datapath
1074
        self.inband = inband
1075
        self.protocols = protocols
1076
        self.reconnectms = reconnectms
1077
        self.stp = stp
1078
        self._uuids = []  # controller UUIDs
1079

    
1080
    @classmethod
1081
    def setup( cls ):
1082
        "Make sure Open vSwitch is installed and working"
1083
        pathCheck( 'ovs-vsctl',
1084
                   moduleName='Open vSwitch (openvswitch.org)')
1085
        # This should no longer be needed, and it breaks
1086
        # with OVS 1.7 which has renamed the kernel module:
1087
        #  moduleDeps( subtract=OF_KMOD, add=OVS_KMOD )
1088
        out, err, exitcode = errRun( 'ovs-vsctl -t 1 show' )
1089
        if exitcode:
1090
            error( out + err +
1091
                   'ovs-vsctl exited with code %d\n' % exitcode +
1092
                   '*** Error connecting to ovs-db with ovs-vsctl\n'
1093
                   'Make sure that Open vSwitch is installed, '
1094
                   'that ovsdb-server is running, and that\n'
1095
                   '"ovs-vsctl show" works correctly.\n'
1096
                   'You may wish to try '
1097
                   '"service openvswitch-switch start".\n' )
1098
            exit( 1 )
1099
        version = quietRun( 'ovs-vsctl --version' )
1100
        cls.OVSVersion = findall( r'\d+\.\d+', version )[ 0 ]
1101

    
1102
    @classmethod
1103
    def isOldOVS( cls ):
1104
        "Is OVS ersion < 1.10?"
1105
        return ( StrictVersion( cls.OVSVersion ) <
1106
                 StrictVersion( '1.10' ) )
1107

    
1108
    @classmethod
1109
    def batchShutdown( cls, switches ):
1110
        "Shut down a list of OVS switches"
1111
        delcmd = 'del-br %s'
1112
        if not cls.isOldOVS():
1113
            delcmd = '--if-exists ' + delcmd
1114
        # First, delete them all from ovsdb
1115
        quietRun( 'ovs-vsctl ' +
1116
                  ' -- '.join( delcmd % s for s in switches ) )
1117
        # Next, shut down all of the processes
1118
        pids = ' '.join( str( switch.pid ) for switch in switches )
1119
        quietRun( 'kill -HUP ' + pids )
1120
        for switch in switches:
1121
            switch.shell = None
1122
        return True
1123

    
1124
    def dpctl( self, *args ):
1125
        "Run ovs-ofctl command"
1126
        return self.cmd( 'ovs-ofctl', args[ 0 ], self, *args[ 1: ] )
1127

    
1128
    def vsctl( self, *args, **kwargs ):
1129
        "Run ovs-vsctl command"
1130
        return self.cmd( 'ovs-vsctl', *args, **kwargs )
1131

    
1132
    @staticmethod
1133
    def TCReapply( intf ):
1134
        """Unfortunately OVS and Mininet are fighting
1135
           over tc queuing disciplines. As a quick hack/
1136
           workaround, we clear OVS's and reapply our own."""
1137
        if isinstance( intf, TCIntf ):
1138
            intf.config( **intf.params )
1139

    
1140
    def attach( self, intf ):
1141
        "Connect a data port"
1142
        self.vsctl( 'add-port', self, intf )
1143
        self.cmd( 'ifconfig', intf, 'up' )
1144
        self.TCReapply( intf )
1145

    
1146
    def detach( self, intf ):
1147
        "Disconnect a data port"
1148
        self.vsctl( 'del-port', self, intf )
1149

    
1150
    def controllerUUIDs( self, update=False ):
1151
        """Return ovsdb UUIDs for our controllers
1152
           update: update cached value"""
1153
        if not self._uuids or update:
1154
            controllers = self.cmd( 'ovs-vsctl -- get Bridge', self,
1155
                                    'Controller' ).strip()
1156
            if controllers.startswith( '[' ) and controllers.endswith( ']' ):
1157
                controllers = controllers[ 1 : -1 ]
1158
                if controllers:
1159
                    self._uuids = [ c.strip()
1160
                                    for c in controllers.split( ',' ) ]
1161
        return self._uuids
1162

    
1163
    def connected( self ):
1164
        "Are we connected to at least one of our controllers?"
1165
        for uuid in self.controllerUUIDs():
1166
            if 'true' in self.vsctl( '-- get Controller',
1167
                                     uuid, 'is_connected' ):
1168
                return True
1169
        return self.failMode == 'standalone'
1170

    
1171
    def intfOpts( self, intf ):
1172
        "Return OVS interface options for intf"
1173
        opts = ''
1174
        if not self.isOldOVS():
1175
            # ofport_request is not supported on old OVS
1176
            opts += ' ofport_request=%s' % self.ports[ intf ]
1177
            # Patch ports don't work well with old OVS
1178
            if isinstance( intf, OVSIntf ):
1179
                intf1, intf2 = intf.link.intf1, intf.link.intf2
1180
                peer = intf1 if intf1 != intf else intf2
1181
                opts += ' type=patch options:peer=%s' % peer
1182
        return '' if not opts else ' -- set Interface %s' % intf + opts
1183

    
1184
    def bridgeOpts( self ):
1185
        "Return OVS bridge options"
1186
        opts = ( ' other_config:datapath-id=%s' % self.dpid +
1187
                 ' fail_mode=%s' % self.failMode )
1188
        if not self.inband:
1189
            opts += ' other-config:disable-in-band=true'
1190
        if self.datapath == 'user':
1191
            opts += ' datapath_type=netdev' % self
1192
        if self.protocols and not self.isOldOVS():
1193
            opts += ' protocols=%s' % ( self, self.protocols )
1194
        if self.stp and self.failMode == 'standalone':
1195
            opts += ' stp_enable=true' % self
1196
        return opts
1197

    
1198
    def start( self, controllers ):
1199
        "Start up a new OVS OpenFlow switch using ovs-vsctl"
1200
        if self.inNamespace:
1201
            raise Exception(
1202
                'OVS kernel switch does not work in a namespace' )
1203
        int( self.dpid, 16 )  # DPID must be a hex string
1204
        # Command to add interfaces
1205
        intfs = ''.join( ' -- add-port %s %s' % ( self, intf ) +
1206
                         self.intfOpts( intf )
1207
                         for intf in self.intfList()
1208
                         if self.ports[ intf ] and not intf.IP() )
1209
        # Command to create controller entries
1210
        clist = [ ( self.name + c.name, '%s:%s:%d' %
1211
                  ( c.protocol, c.IP(), c.port ) )
1212
                  for c in controllers ]
1213
        if self.listenPort:
1214
            clist.append( ( self.name + '-listen',
1215
                            'ptcp:%s' % self.listenPort ) )
1216
        ccmd = '-- --id=@%s create Controller target=\\"%s\\"'
1217
        if self.reconnectms:
1218
            ccmd += ' max_backoff=%d' % self.reconnectms
1219
        cargs = ' '.join( ccmd % ( name, target )
1220
                         for name, target in clist )
1221
        # Controller ID list
1222
        cids = ','.join( '@%s' % name for name, _target in clist )
1223
        # Try to delete any existing bridges with the same name
1224
        if not self.isOldOVS():
1225
            cargs += ' -- --if-exists del-br %s' % self
1226
        # One ovs-vsctl command to rule them all!
1227
        self.vsctl( cargs +
1228
                    ' -- add-br %s' % self +
1229
                    ' -- set bridge %s controller=[%s]' % ( self, cids  ) +
1230
                    self.bridgeOpts() +
1231
                    intfs )
1232
        # XXX BROKEN - need to fix this!!
1233
        # If necessary, restore TC config overwritten by OVS
1234
        # for intf in self.intfList():
1235
        # self.TCReapply( intf )
1236

    
1237
    def stop( self, deleteIntfs=True ):
1238
        """Terminate OVS switch.
1239
           deleteIntfs: delete interfaces? (True)"""
1240
        self.cmd( 'ovs-vsctl del-br', self )
1241
        if self.datapath == 'user':
1242
            self.cmd( 'ip link del', self )
1243
        super( OVSSwitch, self ).stop( deleteIntfs )
1244

    
1245

    
1246
OVSKernelSwitch = OVSSwitch
1247

    
1248

    
1249
class OVSBridge( OVSSwitch ):
1250
    "OVSBridge is an OVSSwitch in standalone/bridge mode"
1251

    
1252
    def __init__( self, args, **kwargs ):
1253
        kwargs.update( failMode='standalone' )
1254
        OVSSwitch.__init__( self, args, **kwargs )
1255

    
1256
    def start( self, controllers ):
1257
        OVSSwitch.start( self, controllers=[] )
1258

    
1259
    def connected( self ):
1260
        "Are we forwarding yet?"
1261
        if self.stp:
1262
            status = self.dpctl( 'show' )
1263
            return 'STP_FORWARD' in status and not 'STP_LEARN' in status
1264
        else:
1265
            return True
1266

    
1267

    
1268
class OVSBatch( OVSSwitch ):
1269
    "Experiment: batch startup of OVS switches"
1270

    
1271
    # This should be ~ int( quietRun( 'getconf ARG_MAX' ) ),
1272
    # but the real limit seems to be much lower
1273
    argmax = 128000
1274
    
1275
    def __init__( self, *args, **kwargs ):
1276
        self.commands = []
1277
        self.started = False
1278
        super( OVSBatch, self ).__init__( *args, **kwargs )
1279

    
1280
    @classmethod
1281
    def batchStartup( cls, switches ):
1282
        "Batch startup for OVS"
1283
        info( '...' )
1284
        cmds = 'ovs-vsctl'
1285
        for switch in switches:
1286
            if cls.isOldOVS():
1287
                quietRun( 'ovs-vsctl del-br %s' % switch )
1288
            for cmd in switch.commands:
1289
                cmd = cmd.strip()
1290
                # Don't exceed ARG_MAX
1291
                if len( cmds ) + len( cmd ) >= cls.argmax:
1292
                    errRun( cmds, shell=True )
1293
                    cmds = 'ovs-vsctl'
1294
                cmds += ' ' + cmd
1295
                switch.started = True
1296
        if cmds:
1297
            errRun( cmds, shell=True )
1298
        return True
1299

    
1300
    def vsctl( self, *args, **kwargs ):
1301
        "Append ovs-vsctl command to list for later execution"
1302
        if self.started:
1303
            return super( OVSBatch, self).vsctl( *args, **kwargs )
1304
        cmd = ' '.join( str( arg ).strip() for arg in args )
1305
        self.commands.append( cmd )
1306

    
1307
    def start( self, *args, **kwargs ):
1308
        super( OVSBatch, self ).start( *args, **kwargs )
1309
        self.started = True
1310
 
1311
    def stop( self, *args, **kwargs ):
1312
        super( OVSBatch, self ).stop( *args, **kwargs )
1313
        self.started = False
1314
         
1315

    
1316
class IVSSwitch( Switch ):
1317
    "Indigo Virtual Switch"
1318

    
1319
    def __init__( self, name, verbose=False, **kwargs ):
1320
        Switch.__init__( self, name, **kwargs )
1321
        self.verbose = verbose
1322

    
1323
    @classmethod
1324
    def setup( cls ):
1325
        "Make sure IVS is installed"
1326
        pathCheck( 'ivs-ctl', 'ivs',
1327
                   moduleName="Indigo Virtual Switch (projectfloodlight.org)" )
1328
        out, err, exitcode = errRun( 'ivs-ctl show' )
1329
        if exitcode:
1330
            error( out + err +
1331
                   'ivs-ctl exited with code %d\n' % exitcode +
1332
                   '*** The openvswitch kernel module might '
1333
                   'not be loaded. Try modprobe openvswitch.\n' )
1334
            exit( 1 )
1335

    
1336
    @classmethod
1337
    def batchShutdown( cls, switches ):
1338
        "Kill each IVS switch, to be waited on later in stop()"
1339
        for switch in switches:
1340
            switch.cmd( 'kill %ivs' )
1341

    
1342
    def start( self, controllers ):
1343
        "Start up a new IVS switch"
1344
        args = ['ivs']
1345
        args.extend( ['--name', self.name] )
1346
        args.extend( ['--dpid', self.dpid] )
1347
        if self.verbose:
1348
            args.extend( ['--verbose'] )
1349
        for intf in self.intfs.values():
1350
            if not intf.IP():
1351
                args.extend( ['-i', intf.name] )
1352
        for c in controllers:
1353
            args.extend( ['-c', '%s:%d' % (c.IP(), c.port)] )
1354
        if self.listenPort:
1355
            args.extend( ['--listen', '127.0.0.1:%i' % self.listenPort] )
1356
        args.append( self.opts )
1357

    
1358
        logfile = '/tmp/ivs.%s.log' % self.name
1359

    
1360
        self.cmd( ' '.join(args) + ' >' + logfile + ' 2>&1 </dev/null &' )
1361

    
1362
    def stop( self, deleteIntfs=True ):
1363
        """Terminate IVS switch.
1364
           deleteIntfs: delete interfaces? (True)"""
1365
        self.cmd( 'kill %ivs' )
1366
        self.cmd( 'wait' )
1367
        super( IVSSwitch, self ).stop( deleteIntfs )
1368

    
1369
    def attach( self, intf ):
1370
        "Connect a data port"
1371
        self.cmd( 'ivs-ctl', 'add-port', '--datapath', self.name, intf )
1372

    
1373
    def detach( self, intf ):
1374
        "Disconnect a data port"
1375
        self.cmd( 'ivs-ctl', 'del-port', '--datapath', self.name, intf )
1376

    
1377
    def dpctl( self, *args ):
1378
        "Run dpctl command"
1379
        if not self.listenPort:
1380
            return "can't run dpctl without passive listening port"
1381
        return self.cmd( 'ovs-ofctl ' + ' '.join( args ) +
1382
                         ' tcp:127.0.0.1:%i' % self.listenPort )
1383

    
1384

    
1385
class Controller( Node ):
1386
    """A Controller is a Node that is running (or has execed?) an
1387
       OpenFlow controller."""
1388

    
1389
    def __init__( self, name, inNamespace=False, command='controller',
1390
                  cargs='-v ptcp:%d', cdir=None, ip="127.0.0.1",
1391
                  port=6633, protocol='tcp', **params ):
1392
        self.command = command
1393
        self.cargs = cargs
1394
        self.cdir = cdir
1395
        self.ip = ip
1396
        self.port = port
1397
        self.protocol = protocol
1398
        Node.__init__( self, name, inNamespace=inNamespace,
1399
                       ip=ip, **params  )
1400
        self.checkListening()
1401

    
1402
    def checkListening( self ):
1403
        "Make sure no controllers are running on our port"
1404
        # Verify that Telnet is installed first:
1405
        out, _err, returnCode = errRun( "which telnet" )
1406
        if 'telnet' not in out or returnCode != 0:
1407
            raise Exception( "Error running telnet to check for listening "
1408
                             "controllers; please check that it is "
1409
                             "installed." )
1410
        listening = self.cmd( "echo A | telnet -e A %s %d" %
1411
                              ( self.ip, self.port ) )
1412
        if 'Connected' in listening:
1413
            servers = self.cmd( 'netstat -natp' ).split( '\n' )
1414
            pstr = ':%d ' % self.port
1415
            clist = servers[ 0:1 ] + [ s for s in servers if pstr in s ]
1416
            raise Exception( "Please shut down the controller which is"
1417
                             " running on port %d:\n" % self.port +
1418
                             '\n'.join( clist ) )
1419

    
1420
    def start( self ):
1421
        """Start <controller> <args> on controller.
1422
           Log to /tmp/cN.log"""
1423
        pathCheck( self.command )
1424
        cout = '/tmp/' + self.name + '.log'
1425
        if self.cdir is not None:
1426
            self.cmd( 'cd ' + self.cdir )
1427
        self.cmd( self.command + ' ' + self.cargs % self.port +
1428
                  ' 1>' + cout + ' 2>' + cout + ' &' )
1429
        self.execed = False
1430

    
1431
    def stop( self, *args, **kwargs ):
1432
        "Stop controller."
1433
        self.cmd( 'kill %' + self.command )
1434
        self.cmd( 'wait %' + self.command )
1435
        kwargs.update( deleteIntfs=False )
1436
        super( Controller, self ).stop( *args, **kwargs )
1437

    
1438
    def IP( self, intf=None ):
1439
        "Return IP address of the Controller"
1440
        if self.intfs:
1441
            ip = Node.IP( self, intf )
1442
        else:
1443
            ip = self.ip
1444
        return ip
1445

    
1446
    def __repr__( self ):
1447
        "More informative string representation"
1448
        return '<%s %s: %s:%s pid=%s> ' % (
1449
            self.__class__.__name__, self.name,
1450
            self.IP(), self.port, self.pid )
1451

    
1452
    @classmethod
1453
    def isAvailable( cls ):
1454
        "Is controller available?"
1455
        return quietRun( 'which controller' )
1456

    
1457

    
1458
class OVSController( Controller ):
1459
    "Open vSwitch controller"
1460
    def __init__( self, name, command='ovs-controller', **kwargs ):
1461
        if quietRun( 'which test-controller' ):
1462
            command = 'test-controller'
1463
        Controller.__init__( self, name, command=command, **kwargs )
1464

    
1465
    @classmethod
1466
    def isAvailable( cls ):
1467
        return ( quietRun( 'which ovs-controller' ) or
1468
                 quietRun( 'which test-controller' ) )
1469

    
1470
class NOX( Controller ):
1471
    "Controller to run a NOX application."
1472

    
1473
    def __init__( self, name, *noxArgs, **kwargs ):
1474
        """Init.
1475
           name: name to give controller
1476
           noxArgs: arguments (strings) to pass to NOX"""
1477
        if not noxArgs:
1478
            warn( 'warning: no NOX modules specified; '
1479
                  'running packetdump only\n' )
1480
            noxArgs = [ 'packetdump' ]
1481
        elif type( noxArgs ) not in ( list, tuple ):
1482
            noxArgs = [ noxArgs ]
1483

    
1484
        if 'NOX_CORE_DIR' not in os.environ:
1485
            exit( 'exiting; please set missing NOX_CORE_DIR env var' )
1486
        noxCoreDir = os.environ[ 'NOX_CORE_DIR' ]
1487

    
1488
        Controller.__init__( self, name,
1489
                             command=noxCoreDir + '/nox_core',
1490
                             cargs='--libdir=/usr/local/lib -v -i ptcp:%s ' +
1491
                             ' '.join( noxArgs ),
1492
                             cdir=noxCoreDir,
1493
                             **kwargs )
1494

    
1495
class RYU( Controller ):
1496
    "Controller to run Ryu application"
1497
    def __init__( self, name, *ryuArgs, **kwargs ):
1498
        """Init.
1499
        name: name to give controller.
1500
        ryuArgs: arguments and modules to pass to Ryu"""
1501
        homeDir = quietRun( 'printenv HOME' ).strip( '\r\n' )
1502
        ryuCoreDir = '%s/ryu/ryu/app/' % homeDir
1503
        if not ryuArgs:
1504
            warn( 'warning: no Ryu modules specified; '
1505
                  'running simple_switch only\n' )
1506
            ryuArgs = [ ryuCoreDir + 'simple_switch.py' ]
1507
        elif type( ryuArgs ) not in ( list, tuple ):
1508
            ryuArgs = [ ryuArgs ]
1509

    
1510
        Controller.__init__( self, name,
1511
                             command='ryu-manager',
1512
                             cargs='--ofp-tcp-listen-port %s ' +
1513
                             ' '.join( ryuArgs ),
1514
                             cdir=ryuCoreDir,
1515
                             **kwargs )
1516

    
1517
class RemoteController( Controller ):
1518
    "Controller running outside of Mininet's control."
1519

    
1520
    def __init__( self, name, ip='127.0.0.1',
1521
                  port=6633, **kwargs):
1522
        """Init.
1523
           name: name to give controller
1524
           ip: the IP address where the remote controller is
1525
           listening
1526
           port: the port where the remote controller is listening"""
1527
        Controller.__init__( self, name, ip=ip, port=port, **kwargs )
1528

    
1529
    def start( self ):
1530
        "Overridden to do nothing."
1531
        return
1532

    
1533
    def stop( self ):
1534
        "Overridden to do nothing."
1535
        return
1536

    
1537
    def checkListening( self ):
1538
        "Warn if remote controller is not accessible"
1539
        listening = self.cmd( "echo A | telnet -e A %s %d" %
1540
                              ( self.ip, self.port ) )
1541
        if 'Connected' not in listening:
1542
            warn( "Unable to contact the remote controller"
1543
                  " at %s:%d\n" % ( self.ip, self.port ) )
1544

    
1545

    
1546
DefaultControllers = ( Controller, OVSController )
1547

    
1548
def findController( controllers=DefaultControllers ):
1549
    "Return first available controller from list, if any"
1550
    for controller in controllers:
1551
        if controller.isAvailable():
1552
            return controller
1553

    
1554
def DefaultController( name, controllers=DefaultControllers, **kwargs ):
1555
    "Find a controller that is available and instantiate it"
1556
    controller = findController( controllers )
1557
    if not controller:
1558
        raise Exception( 'Could not find a default OpenFlow controller' )
1559
    return controller( name, **kwargs )