Statistics
| Branch: | Revision:

iof-tools / networkxMiCe / networkx-master / networkx / utils / decorators.py @ 5cef0f13

History | View | Annotate | Download (14.4 KB)

1
import sys
2
from warnings import warn
3

    
4
from collections import defaultdict
5
from os.path import splitext
6
from contextlib import contextmanager
7
try:
8
    from pathlib import Path
9
except ImportError:
10
    # Use Path to indicate if pathlib exists (like numpy does)
11
    Path = None
12

    
13
import networkx as nx
14
from decorator import decorator
15
from networkx.utils import is_string_like, create_random_state, \
16
                           create_py_random_state
17

    
18
__all__ = [
19
    'not_implemented_for',
20
    'open_file',
21
    'nodes_or_number',
22
    'preserve_random_state',
23
    'random_state',
24
    'np_random_state',
25
    'py_random_state',
26
]
27

    
28

    
29
def not_implemented_for(*graph_types):
30
    """Decorator to mark algorithms as not implemented
31

32
    Parameters
33
    ----------
34
    graph_types : container of strings
35
        Entries must be one of 'directed','undirected', 'multigraph', 'graph'.
36

37
    Returns
38
    -------
39
    _require : function
40
        The decorated function.
41

42
    Raises
43
    ------
44
    NetworkXNotImplemented
45
    If any of the packages cannot be imported
46

47
    Notes
48
    -----
49
    Multiple types are joined logically with "and".
50
    For "or" use multiple @not_implemented_for() lines.
51

52
    Examples
53
    --------
54
    Decorate functions like this::
55

56
       @not_implemnted_for('directed')
57
       def sp_function(G):
58
           pass
59

60
       @not_implemnted_for('directed','multigraph')
61
       def sp_np_function(G):
62
           pass
63
    """
64
    @decorator
65
    def _not_implemented_for(not_implement_for_func, *args, **kwargs):
66
        graph = args[0]
67
        terms = {'directed': graph.is_directed(),
68
                 'undirected': not graph.is_directed(),
69
                 'multigraph': graph.is_multigraph(),
70
                 'graph': not graph.is_multigraph()}
71
        match = True
72
        try:
73
            for t in graph_types:
74
                match = match and terms[t]
75
        except KeyError:
76
            raise KeyError('use one or more of ',
77
                           'directed, undirected, multigraph, graph')
78
        if match:
79
            msg = 'not implemented for %s type' % ' '.join(graph_types)
80
            raise nx.NetworkXNotImplemented(msg)
81
        else:
82
            return not_implement_for_func(*args, **kwargs)
83
    return _not_implemented_for
84

    
85

    
86
def _open_gz(path, mode):
87
    import gzip
88
    return gzip.open(path, mode=mode)
89

    
90

    
91
def _open_bz2(path, mode):
92
    import bz2
93
    return bz2.BZ2File(path, mode=mode)
94

    
95

    
96
# To handle new extensions, define a function accepting a `path` and `mode`.
97
# Then add the extension to _dispatch_dict.
98
_dispatch_dict = defaultdict(lambda: open)
99
_dispatch_dict['.gz'] = _open_gz
100
_dispatch_dict['.bz2'] = _open_bz2
101
_dispatch_dict['.gzip'] = _open_gz
102

    
103

    
104
def open_file(path_arg, mode='r'):
105
    """Decorator to ensure clean opening and closing of files.
106

107
    Parameters
108
    ----------
109
    path_arg : int
110
        Location of the path argument in args.  Even if the argument is a
111
        named positional argument (with a default value), you must specify its
112
        index as a positional argument.
113
    mode : str
114
        String for opening mode.
115

116
    Returns
117
    -------
118
    _open_file : function
119
        Function which cleanly executes the io.
120

121
    Examples
122
    --------
123
    Decorate functions like this::
124

125
       @open_file(0,'r')
126
       def read_function(pathname):
127
           pass
128

129
       @open_file(1,'w')
130
       def write_function(G,pathname):
131
           pass
132

133
       @open_file(1,'w')
134
       def write_function(G, pathname='graph.dot')
135
           pass
136

137
       @open_file('path', 'w+')
138
       def another_function(arg, **kwargs):
139
           path = kwargs['path']
140
           pass
141
    """
142
    # Note that this decorator solves the problem when a path argument is
143
    # specified as a string, but it does not handle the situation when the
144
    # function wants to accept a default of None (and then handle it).
145
    # Here is an example:
146
    #
147
    # @open_file('path')
148
    # def some_function(arg1, arg2, path=None):
149
    #    if path is None:
150
    #        fobj = tempfile.NamedTemporaryFile(delete=False)
151
    #        close_fobj = True
152
    #    else:
153
    #        # `path` could have been a string or file object or something
154
    #        # similar. In any event, the decorator has given us a file object
155
    #        # and it will close it for us, if it should.
156
    #        fobj = path
157
    #        close_fobj = False
158
    #
159
    #    try:
160
    #        fobj.write('blah')
161
    #    finally:
162
    #        if close_fobj:
163
    #            fobj.close()
164
    #
165
    # Normally, we'd want to use "with" to ensure that fobj gets closed.
166
    # However, recall that the decorator will make `path` a file object for
167
    # us, and using "with" would undesirably close that file object. Instead,
168
    # you use a try block, as shown above. When we exit the function, fobj will
169
    # be closed, if it should be, by the decorator.
170

    
171
    @decorator
172
    def _open_file(func_to_be_decorated, *args, **kwargs):
173

    
174
        # Note that since we have used @decorator, *args, and **kwargs have
175
        # already been resolved to match the function signature of func. This
176
        # means default values have been propagated. For example,  the function
177
        # func(x, y, a=1, b=2, **kwargs) if called as func(0,1,b=5,c=10) would
178
        # have args=(0,1,1,5) and kwargs={'c':10}.
179

    
180
        # First we parse the arguments of the decorator. The path_arg could
181
        # be an positional argument or a keyword argument.  Even if it is
182
        try:
183
            # path_arg is a required positional argument
184
            # This works precisely because we are using @decorator
185
            path = args[path_arg]
186
        except TypeError:
187
            # path_arg is a keyword argument. It is "required" in the sense
188
            # that it must exist, according to the decorator specification,
189
            # It can exist in `kwargs` by a developer specified default value
190
            # or it could have been explicitly set by the user.
191
            try:
192
                path = kwargs[path_arg]
193
            except KeyError:
194
                # Could not find the keyword. Thus, no default was specified
195
                # in the function signature and the user did not provide it.
196
                msg = 'Missing required keyword argument: {0}'
197
                raise nx.NetworkXError(msg.format(path_arg))
198
            else:
199
                is_kwarg = True
200
        except IndexError:
201
            # A "required" argument was missing. This can only happen if
202
            # the decorator of the function was incorrectly specified.
203
            # So this probably is not a user error, but a developer error.
204
            msg = "path_arg of open_file decorator is incorrect"
205
            raise nx.NetworkXError(msg)
206
        else:
207
            is_kwarg = False
208

    
209
        # Now we have the path_arg. There are two types of input to consider:
210
        #   1) string representing a path that should be opened
211
        #   2) an already opened file object
212
        if is_string_like(path):
213
            ext = splitext(path)[1]
214
            fobj = _dispatch_dict[ext](path, mode=mode)
215
            close_fobj = True
216
        elif hasattr(path, 'read'):
217
            # path is already a file-like object
218
            fobj = path
219
            close_fobj = False
220
        elif Path is not None and isinstance(path, Path):
221
            # path is a pathlib reference to a filename
222
            fobj = _dispatch_dict[path.suffix](str(path), mode=mode)
223
            close_fobj = True
224
        else:
225
            # could be None, in which case the algorithm will deal with it
226
            fobj = path
227
            close_fobj = False
228

    
229
        # Insert file object into args or kwargs.
230
        if is_kwarg:
231
            new_args = args
232
            kwargs[path_arg] = fobj
233
        else:
234
            # args is a tuple, so we must convert to list before modifying it.
235
            new_args = list(args)
236
            new_args[path_arg] = fobj
237

    
238
        # Finally, we call the original function, making sure to close the fobj
239
        try:
240
            result = func_to_be_decorated(*new_args, **kwargs)
241
        finally:
242
            if close_fobj:
243
                fobj.close()
244

    
245
        return result
246

    
247
    return _open_file
248

    
249

    
250
def nodes_or_number(which_args):
251
    """Decorator to allow number of nodes or container of nodes.
252

253
    Parameters
254
    ----------
255
    which_args : int or sequence of ints
256
        Location of the node arguments in args. Even if the argument is a
257
        named positional argument (with a default value), you must specify its
258
        index as a positional argument.
259
        If more than one node argument is allowed, can be a list of locations.
260

261
    Returns
262
    -------
263
    _nodes_or_numbers : function
264
        Function which replaces int args with ranges.
265

266
    Examples
267
    --------
268
    Decorate functions like this::
269

270
       @nodes_or_number(0)
271
       def empty_graph(nodes):
272
           pass
273

274
       @nodes_or_number([0,1])
275
       def grid_2d_graph(m1, m2, periodic=False):
276
           pass
277

278
       @nodes_or_number(1)
279
       def full_rary_tree(r, n)
280
           # r is a number. n can be a number of a list of nodes
281
           pass
282
    """
283
    @decorator
284
    def _nodes_or_number(func_to_be_decorated, *args, **kw):
285
        # form tuple of arg positions to be converted.
286
        try:
287
            iter_wa = iter(which_args)
288
        except TypeError:
289
            iter_wa = (which_args,)
290
        # change each argument in turn
291
        new_args = list(args)
292
        for i in iter_wa:
293
            n = args[i]
294
            try:
295
                nodes = list(range(n))
296
            except TypeError:
297
                nodes = tuple(n)
298
            else:
299
                if n < 0:
300
                    msg = "Negative number of nodes not valid: %i" % n
301
                    raise nx.NetworkXError(msg)
302
            new_args[i] = (n, nodes)
303
        return func_to_be_decorated(*new_args, **kw)
304
    return _nodes_or_number
305

    
306

    
307
def preserve_random_state(func):
308
    """ Decorator to preserve the numpy.random state during a function.
309

310
    Parameters
311
    ----------
312
    func : function
313
        function around which to preserve the random state.
314

315
    Returns
316
    -------
317
    wrapper : function
318
        Function which wraps the input function by saving the state before
319
        calling the function and restoring the function afterward.
320

321
    Examples
322
    --------
323
    Decorate functions like this::
324

325
        @preserve_random_state
326
        def do_random_stuff(x, y):
327
            return x + y * numpy.random.random()
328

329
    Notes
330
    -----
331
    If numpy.random is not importable, the state is not saved or restored.
332
    """
333
    try:
334
        from numpy.random import get_state, seed, set_state
335

    
336
        @contextmanager
337
        def save_random_state():
338
            state = get_state()
339
            try:
340
                yield
341
            finally:
342
                set_state(state)
343

    
344
        def wrapper(*args, **kwargs):
345
            with save_random_state():
346
                seed(1234567890)
347
                return func(*args, **kwargs)
348
        wrapper.__name__ = func.__name__
349
        return wrapper
350
    except ImportError:
351
        return func
352

    
353

    
354
def random_state(random_state_index):
355
    """Decorator to generate a numpy.random.RandomState instance.
356

357
    Argument position `random_state_index` is processed by create_random_state.
358
    The result is a numpy.random.RandomState instance.
359

360
    Parameters
361
    ----------
362
    random_state_index : int
363
        Location of the random_state argument in args that is to be used to
364
        generate the numpy.random.RandomState instance. Even if the argument is
365
        a named positional argument (with a default value), you must specify
366
        its index as a positional argument.
367

368
    Returns
369
    -------
370
    _random_state : function
371
        Function whose random_state keyword argument is a RandomState instance.
372

373
    Examples
374
    --------
375
    Decorate functions like this::
376

377
       @np_random_state(0)
378
       def random_float(random_state=None):
379
           return random_state.rand()
380

381
       @np_random_state(1)
382
       def random_array(dims, random_state=1):
383
           return random_state.rand(*dims)
384

385
    See Also
386
    --------
387
    py_random_state
388
    """
389
    @decorator
390
    def _random_state(func, *args, **kwargs):
391
        # Parse the decorator arguments.
392
        try:
393
            random_state_arg = args[random_state_index]
394
        except TypeError:
395
            raise nx.NetworkXError("random_state_index must be an integer")
396
        except IndexError:
397
            raise nx.NetworkXError("random_state_index is incorrect")
398

    
399
        # Create a numpy.random.RandomState instance
400
        random_state = create_random_state(random_state_arg)
401

    
402
        # args is a tuple, so we must convert to list before modifying it.
403
        new_args = list(args)
404
        new_args[random_state_index] = random_state
405
        return func(*new_args, **kwargs)
406
    return _random_state
407

    
408

    
409
np_random_state = random_state
410

    
411

    
412
def py_random_state(random_state_index):
413
    """Decorator to generate a random.Random instance (or equiv).
414

415
    Argument position `random_state_index` processed by create_py_random_state.
416
    The result is either a random.Random instance, or numpy.random.RandomState
417
    instance with additional attributes to mimic basic methods of Random.
418

419
    Parameters
420
    ----------
421
    random_state_index : int
422
        Location of the random_state argument in args that is to be used to
423
        generate the numpy.random.RandomState instance. Even if the argument is
424
        a named positional argument (with a default value), you must specify
425
        its index as a positional argument.
426

427
    Returns
428
    -------
429
    _random_state : function
430
        Function whose random_state keyword argument is a RandomState instance.
431

432
    Examples
433
    --------
434
    Decorate functions like this::
435

436
       @py_random_state(0)
437
       def random_float(random_state=None):
438
           return random_state.rand()
439

440
       @py_random_state(1)
441
       def random_array(dims, random_state=1):
442
           return random_state.rand(*dims)
443

444
    See Also
445
    --------
446
    np_random_state
447
    """
448
    @decorator
449
    def _random_state(func, *args, **kwargs):
450
        # Parse the decorator arguments.
451
        try:
452
            random_state_arg = args[random_state_index]
453
        except TypeError:
454
            raise nx.NetworkXError("random_state_index must be an integer")
455
        except IndexError:
456
            raise nx.NetworkXError("random_state_index is incorrect")
457

    
458
        # Create a numpy.random.RandomState instance
459
        random_state = create_py_random_state(random_state_arg)
460

    
461
        # args is a tuple, so we must convert to list before modifying it.
462
        new_args = list(args)
463
        new_args[random_state_index] = random_state
464
        return func(*new_args, **kwargs)
465
    return _random_state