Revision ff14495b

View differences:

README
1
### Dependencies
2

  
3
This software requires the following libraries:
4

  
5
argparse_actions
6
iproute2
7
portalocker
8

  
9

  
10
### Usage examples
11

  
12
In all the examples that follow we assume we want to use "psngc" as the channels
13
tag name (specified with option -t) and that the RPC deamon responds at address
14
127.0.0.1 on port 7373 (specified with options -a and -p respectively).
15

  
16
## Add a new channel
17

  
18
Suppose you want to add a new channel named "Channel1" and whose source IP
19
address and port are 192.168.100.2 and 6000 respectively. Then you execute the
20
following command:
21

  
22
python psng-pyserf.py -t psngc -a 127.0.0.1 -p 7373 \
23
  set 192.168.100.2 6000 "Channel1" "Reserved"
24

  
25
The last string "Reserved" can be used to specify any additional parameters that
26
can be useful for whoever will receive this channel.
27

  
28
NOTE: It is not possible to add two different channel that use the same address
29
and port. This contraint is verified only on the local node, this means that is
30
responsibility of the local node to use the correct source channel IP address.
31

  
32
NOTE: Adding a new channel propagates a member-update event on the Serf network.
33
If the channel already exist on the list of channels of the local node, then the
34
member-update event is not generated.
35

  
36

  
37
## Delete a channel
38

  
39
Channel deletion is based exclusively on the source IP address and port of a
40
channel. The channel name and the "Reserved" string are ignored. Suppose you
41
want to delete the channel that was added in the previous example (address
42
192.168.100.2 and port 6000). The you execute the following command:
43

  
44
python psng-pyserf.py -t psngc -a 127.0.0.1 -p 7373 \
45
  del 192.168.100.2 6000
46

  
47
NOTE: Deleting a channel propagates a member-update event on the Serf network.
48
If the channel is not present in the list of channels that belong to the local
49
node, then the member-update event is not generated.
50

  
51

  
52
## Running the software in background
53

  
54
All the nodes that are interested in keepeing an updted list of channels
55
existing on the network should run the software in background mode with the
56
following command:
57

  
58
python psng-pyserf.py -t psngc -a 127.0.0.1 -p 7373 \
59
  bg /tmp/channels.db
60

  
61
the path specified at the end of the command is the channels database file that
62
the software will use to list the existing channels. This file is written for
63
the first time when the software starts and is updated everytime a channel is
64
added or deleted (this is known by listening to the member-update events). The
65
software use an exclusive file system file lock every time the file is updated.
66
The file is formatted by an header line (starting with the character "#") and by
67
a list of zero or more channels. Here an examples of the channels database file
68
containing two channels:
69

  
70
# channel_name,source_addr,source_port,channel_params
71
Channel1,192.168.100.2,6000,Reserved
72
Cool Channel,192.168.100.3,7000,720p h264
73

  
psng-pyserf.py
1
import argparse
2
import argparse_actions
3
import serf
4
import sys
5
import base64
6
import bz2
7
from pyroute2 import IPRoute
8
import portalocker
9

  
10

  
11
class PsngSerfClient:
12

  
13
    def __init__(self, tag_name, rpc_address, rpc_port):
14
        self.tag_name = tag_name
15
        self.rpc_address = rpc_address
16
        self.rpc_port = rpc_port
17
        self.ch_dbfile = None
18
        self.client = None
19
        # Remember all tags parsed the last time
20
        self.last_channels_tags_list = []
21

  
22
    def encode_and_compress(self, s):
23
        ret_data = base64.b64encode(s)
24
        ret_data = bz2.compress(ret_data)
25

  
26
        return ret_data
27

  
28
    def decompress_and_decode(self, data):
29
        ret_str = bz2.decompress(data)
30
        ret_str = base64.b64decode(ret_str)
31

  
32
        return ret_str
33

  
34
    def encode(self, s):
35
        ret_data = base64.b64encode(s)
36

  
37
        return ret_data
38

  
39
    def decode(self, data):
40
        ret_str = base64.b64decode(data)
41

  
42
        return ret_str
43

  
44
    def get_local_ips(self):
45
        ip = IPRoute()
46
        local_if = ip.get_addr()
47
        local_ips = []
48
        for interf in local_if:
49
            local_ips.append(interf.get_attr('IFA_ADDRESS'))
50

  
51
        return local_ips
52

  
53
    def get_members(self):
54
        # Retrieve informations from all the serf memebrs.
55
        # This is done through the RPC members call.
56
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
57
        client.connect()
58
        client.members()
59
        resp = client.request(timeout=5)
60
        client.disconnect()
61

  
62
        return resp
63

  
64
    def set_tag(self, tag_dict):
65
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
66
        client.connect()
67
        client.tags(Tags=tag_dict)
68
        resp = client.request()
69
        client.disconnect()
70

  
71
        return resp
72

  
73
    def del_tag(self, tag_names_list):
74
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
75
        client.connect()
76
        client.tags(DeleteTags=tag_names_list)
77
        resp = client.request()
78
        client.disconnect()
79

  
80
        return resp
81

  
82
    def is_local_member(self, member, local_ips):
83
        if member["Addr"]:
84
            m_addr = ".".join([str(x) for x in member["Addr"]])
85
            if m_addr in local_ips:
86
                return True
87
            else:
88
                return False
89
        else:
90
            print "Warning: found a memeber without address\n"
91
            print member
92
            return False
93

  
94
    def write_db_file(self, file_name, channels_list):
95

  
96
        file_hdr = "# channel_name,source_addr,source_port,channel_params"
97

  
98
        db_file = open(file_name, 'w')
99
        portalocker.lock(db_file, portalocker.LOCK_EX)
100
        db_file.write("%s\n" % (file_hdr,))
101
        for c in channels_list:
102
            print "Add channel: %s" % (c,)
103
            db_file.write("%s\n" % (c,))
104

  
105
        db_file.close()
106

  
107
    def member_update_event_callback(self, resp):
108

  
109
        if resp.is_success:
110
            print "Received event: member-update"
111
            resp_body = resp.body
112
            members = resp_body["Members"]
113

  
114
            for m in members:
115
                # For now we consider only "alive" members
116
                if m["Status"] == "alive":
117
                    if m["Tags"]:
118
                        if m["Tags"].get(self.tag_name):
119
                            c_tag = m["Tags"].get(self.tag_name)
120
                            if c_tag not in self.last_channels_tags_list:
121
                                self.update_db_from_members()
122
                                return 0
123
        else:
124
            sys.stderr.write("Serf streamed event failed\n")
125
            sys.stderr.write("%s" % (resp.error,))
126

  
127
        return 0
128

  
129
    def listen_for_member_update_events(self, ch_dbfile):
130
        print "Database file: %s" % (ch_dbfile,)
131

  
132
        # Write the db file based on the current members channels tags
133
        self.client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port),
134
                                  auto_reconnect=True)
135
        self.client.connect()
136

  
137
        self.ch_dbfile = ch_dbfile
138
        self.update_db_from_members()
139

  
140
        try:
141
            # Register callback for memebr update events
142
            # Todo: handle sigint
143
            self.client.stream(Type="*").add_callback(
144
                               self.member_update_event_callback).request(
145
                               watch=True)
146
        except KeyboardInterrupt:
147
            print "Disconnection from RPC deamon"
148
            self.client.disconnect()
149

  
150
    def update_db_from_members(self):
151
        # WARNING: This method assumes the connection towards the RPC deamon
152
        # is already open and the client seved in self.client
153

  
154
        if not self.client:
155
            return
156

  
157
        # Retrieve serf members
158
        self.client.members()
159
        resp = self.client.request(timeout=5)
160

  
161
        if resp[0].is_success:
162
            resp_body = resp[0].body
163
            members = resp_body["Members"]
164

  
165
            # Retrieve channel tags
166
            self.last_channels_tags_list = []
167
            channel_tags_list = []
168

  
169
            for m in members:
170
                # For now we consider only "alive" members
171
                if m["Status"] == "alive":
172
                    if m["Tags"]:
173
                        if m["Tags"].get(self.tag_name):
174
                            c_tag = m["Tags"].get(self.tag_name)
175
                            channel_tags_list.append(c_tag)
176
                            self.last_channels_tags_list.append(c_tag)
177

  
178
            # Build channels list
179
            channels_list = []
180

  
181
            for t_comp in channel_tags_list:
182
                # Decode the channel tag
183
                t = self.decode(t_comp)
184

  
185
                # Each member can have more than one channel.
186
                # Channels are separated by the ";" character.
187
                channels_list += t.split(";")
188

  
189
            # Write database file
190
            self.write_db_file(self.ch_dbfile, channels_list)
191

  
192
        else:
193
            sys.stderr.write("Serf members command failed\n")
194
            sys.stderr.write("%s" % (resp[0].error,))
195
            return
196

  
197
    def delete_channel(self, ch_addr, ch_port):
198
        resp = self.get_members()
199

  
200
        if resp[0].is_success:
201
            resp_body = resp[0].body
202
            members = resp_body["Members"]
203

  
204
            # Used to save the channels tag of the local memebr
205
            local_node_channels = None
206
            channels_tag_exist = False
207
            new_channels_string = ""
208
            delete_channel = False
209

  
210
            # find all local IP addresses and save them in local_ips
211
            local_ips = self.get_local_ips()
212

  
213
            for m in members:
214
                # For now we consider only "alive" members
215
                if m["Status"] == "alive":
216
                    if self.is_local_member(m, local_ips):
217
                        if m["Tags"]:
218
                            if m["Tags"].get(self.tag_name):
219
                                channels_tag_exist = True
220
                            local_node_channels = m["Tags"].get(self.tag_name)
221

  
222
            if local_node_channels:
223
                # Decompress and decode the local channel and compare to the
224
                # channel we want to delete
225
                # The channel are compared only considering the
226
                # address and the port
227
                channels = self.decode(local_node_channels)
228

  
229
                # Each member can have more than one channel.
230
                # Channels are separated by the ";" character.
231
                channels_list = channels.split(";")
232

  
233
                for c in channels_list:
234
                    [_, caddr, cport, _] = c.split(",")
235

  
236
                    if (caddr != ch_addr or int(cport) != int(ch_port)):
237
                        # This is not the channel we want to delete.
238
                        # Add it to the new channels string
239
                        if new_channels_string:
240
                            new_channels_string += ";" + c
241
                        else:
242
                            new_channels_string = c
243
                    else:
244
                        delete_channel = True
245
                        print "Delete channel: %s" % (c,)
246

  
247
            if new_channels_string and delete_channel:
248
                # If new_channels_string is not empty we just need to update
249
                # the channels tag through the RPC tags -set call.
250

  
251
                print "Update channels: %s" % (new_channels_string,)
252

  
253
                # Encode the string
254
                ch_str_comp = self.encode(new_channels_string)
255

  
256
                # Update (or add) the channels tag.
257
                # This is done through the RCP tags call
258

  
259
                resp = self.set_tag({self.tag_name: ch_str_comp})
260

  
261
                if not resp[0].is_success:
262
                    sys.stderr.write("Serf tags set command failed\n")
263
                    sys.stderr.write("%s" % (resp[0].error,))
264

  
265
            elif not new_channels_string and channels_tag_exist:
266
                # If new_channels_string is empty but channels_tag_exist
267
                # is True we can delete che cahnnels tag through the RPC
268
                # tags -delete call
269
                print "Delete tag: %s" % (self.tag_name,)
270

  
271
                resp = self.del_tag((self.tag_name,))
272

  
273
                if not resp[0].is_success:
274
                    sys.stderr.write("Serf tags delete command failed\n")
275
                    sys.stderr.write("%s" % (resp[0].error,))
276

  
277
        else:
278
            sys.stderr.write("Serf members command failed\n")
279
            sys.stderr.write("%s" % (resp[0].error,))
280
            return
281

  
282
    def set_new_channel(self, ch_addr, ch_port, ch_name, ch_txt):
283
        # Build new channel string
284
        ch_str = '%s,%s,%d,%s' % (ch_name, ch_addr, ch_port, ch_txt)
285
        print "Add channel: %s\n" % (ch_str,)
286

  
287
        # Retrieve informations from all the serf memebrs.
288
        resp = self.get_members()
289

  
290
        if resp[0].is_success:
291
            resp_body = resp[0].body
292
            members = resp_body["Members"]
293

  
294
            # One encoded channels string for each "alive" member
295
            nodes_channels_list = []
296
            # Used to save the channels tag of the local memebr
297
            local_node_channels = None
298

  
299
            # find all local IP addresses and save them in local_ips
300
            local_ips = self.get_local_ips()
301

  
302
            for m in members:
303
                # For now we consider only "alive" members
304
                if m["Status"] == "alive":
305

  
306
                    # Check if this is the local member
307
                    local_member = self.is_local_member(m, local_ips)
308

  
309
                    # Save the channels tag
310
                    if m["Tags"]:
311
                        node_channels = m["Tags"].get(self.tag_name)
312

  
313
                        if node_channels:
314
                            nodes_channels_list.append(node_channels)
315
                            if local_member:
316
                                local_node_channels = node_channels
317

  
318
            # Don't add the new channel if it already exists
319
            for channels_comp in nodes_channels_list:
320
                # Dont' consider empty strings
321
                if not channels_comp:
322
                    continue
323

  
324
                # Decompress and decode each channel and compare to the new
325
                # channel we want to set
326
                # The channel are compared only considering the
327
                # address and the port
328
                channels = self.decode(channels_comp)
329

  
330
                # Each member can have more than one channel.
331
                # Channels are separated by the ";" character.
332
                channels_list = channels.split(";")
333

  
334
                for c in channels_list:
335
                    [_, caddr, cport, _] = c.split(",")
336

  
337
                    if (caddr == ch_addr and int(cport) == int(ch_port)):
338
                        print "Channel already exists"
339
                        return
340

  
341
            # If we arrive here this means that we are trying to add a new
342
            # channel.
343
            # Encode and compress data
344
            if local_node_channels:
345
                ch_str = ';'.join([self.decode(
346
                                  local_node_channels),
347
                                  ch_str])
348
            ch_str_comp = self.encode(ch_str)
349

  
350
            # Update (or add) the channels tag.
351
            # This is done through the RCP tags call
352
            resp = self.set_tag({self.tag_name: ch_str_comp})
353

  
354
            if not resp[0].is_success:
355
                sys.stderr.write("Serf tags command failed\n")
356
                sys.stderr.write("%s" % (resp[0].error,))
357

  
358
        else:
359
            sys.stderr.write("Serf members command failed\n")
360
            sys.stderr.write("%s" % (resp[0].error,))
361
            return
362

  
363
    def __str__(self):
364
        ret_str = "PSNG Serf RPC client: "
365
        ret_str += "{channels_tag_name: %s, " % (self.tag_name,)
366
        ret_str += "rpc_address: %s, " % (self.rpc_address,)
367
        ret_str += "rpc_port %d}\n" % (self.rpc_port,)
368
        return ret_str
369

  
370

  
371
def psng_serf_client_init():
372
    parser = argparse.ArgumentParser()
373

  
374
    # Optionals
375
    parser.add_argument("-t", "--tagname", type=str, default="psngc",
376
                        help="PeerStreamer Next-Generation source tag name",
377
                        dest="tagname")
378

  
379
    # Mandatory for all modes
380
    parser.add_argument("-a", "--rpcaddress", type=str, required=True,
381
                        help="IP address of the Serf RPC server",
382
                        dest="rpcaddress",
383
                        action=argparse_actions.ProperIpFormatAction)
384
    parser.add_argument("-p", "--rpcport", type=int, required=True,
385
                        help="TCP port of the Serf RPC server",
386
                        dest="rpcport", choices=range(0, 65536),
387
                        metavar="[0-65535]")
388

  
389
    subparsers = parser.add_subparsers(dest="command")
390
    # Set PeerStreamer Next-Generation source tag
391
    parser_set = subparsers.add_parser("set", help="Set and propagate the "
392
                                       "PeerStreamer Next-Generation "
393
                                       "source tag (Call RPC tags --set). "
394
                                       "If this node has already a "
395
                                       "PeerStreamer Next-Generation source "
396
                                       "tag associated, then the new channel "
397
                                       "is appended to the existing ones.")
398
    parser_set.add_argument("caddr", type=str,
399
                            help="Source channel IP address",
400
                            action=argparse_actions.ProperIpFormatAction)
401
    parser_set.add_argument("cport", type=int, choices=range(0, 65536),
402
                            help="Source channel port",
403
                            metavar="[0-65535]")
404
    parser_set.add_argument("cname", type=str,
405
                            help="Source channel name")
406
    parser_set.add_argument("ctxt", type=str,
407
                            help="Source channel additional parameters")
408

  
409
    # Delete PeerStreamer Next-Generation source tag
410
    parser_del = subparsers.add_parser("del", help="Delete a channel "
411
                                       "identified by a source address, a "
412
                                       "source port and a name. If the "
413
                                       "resulting PeerStreamer "
414
                                       "Next-Generation source tag is empty, "
415
                                       "then it will be deleted by calling "
416
                                       "the RPC procedure tags --delete.")
417
    parser_del.add_argument("caddr", type=str,
418
                            help="Source channel IP address",
419
                            action=argparse_actions.ProperIpFormatAction)
420
    parser_del.add_argument("cport", type=int, choices=range(0, 65536),
421
                            help="Source channel port",
422
                            metavar="[0-65535]")
423
    # parser_del.add_argument("cname", type=str,
424
    #                         help="Source channel name")
425

  
426
    parser_db = subparsers.add_parser("bg",
427
                                      help="Run in background and keep the "
428
                                      "database file updated by listening to "
429
                                      "member-update Serf events.")
430
    parser_db.add_argument("dbfile", type=str,
431
                           help="Channels database file")
432

  
433
    try:
434
        args = parser.parse_args()
435

  
436
        tag_name = args.tagname
437
        rpc_address = args.rpcaddress
438
        rpc_port = args.rpcport
439

  
440
        serf_client = PsngSerfClient(tag_name, rpc_address, rpc_port)
441
        print(serf_client)
442

  
443
        command = args.command
444
        if command == "bg":
445
            ch_dbfile = args.dbfile
446
            serf_client.listen_for_member_update_events(ch_dbfile)
447
        elif command == "set":
448
            ch_addr = args.caddr
449
            ch_port = args.cport
450
            ch_name = args.cname
451
            ch_txt = args.ctxt
452
            serf_client.set_new_channel(ch_addr, ch_port, ch_name, ch_txt)
453
        elif command == "del":
454
            ch_addr = args.caddr
455
            ch_port = args.cport
456
            serf_client.delete_channel(ch_addr, ch_port)
457
        else:
458
            print "Unknown mode"
459
            return -1
460

  
461
    except argparse_actions.InvalidIp as e:
462
        print "IP address is invalid: {0}".format(e.ip)
463

  
464

  
465
if __name__ == "__main__":
466
    psng_serf_client_init()

Also available in: Unified diff