Statistics
| Branch: | Tag: | Revision:

psng-pyserf / psng-pyserf.py @ master

History | View | Annotate | Download (20.5 KB)

1 7df1781e Nicolo' Facchi
#!/usr/bin/env python
2
3 dc6db133 Nicolo' Facchi
# BSD 3-Clause License
4
#
5
# Copyright (c) 2017, netCommons H2020 project
6
# All rights reserved.
7
#
8
# Redistribution and use in source and binary forms, with or without
9
# modification, are permitted provided that the following conditions are met:
10
#
11
# * Redistributions of source code must retain the above copyright notice, this
12
# list of conditions and the following disclaimer.
13
#
14
# * Redistributions in binary form must reproduce the above copyright notice,
15
# this list of conditions and the following disclaimer in the documentation
16
# and/or other materials provided with the distribution.
17
#
18
# * Neither the name of the copyright holder nor the names of its contributors
19
# may be used to endorse or promote products derived from this software without
20
# specific prior written permission.
21
#
22
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
26
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
# POSSIBILITY OF SUCH DAMAGE.
33
34 ff14495b Nicolo' Facchi
import argparse
35
import argparse_actions
36
import serf
37
import sys
38
import base64
39
import bz2
40
from pyroute2 import IPRoute
41
import portalocker
42 434d5e23 Nicolo' Facchi
import time
43 ff14495b Nicolo' Facchi
44
45
class PsngSerfClient:
46
47
    def __init__(self, tag_name, rpc_address, rpc_port):
48
        self.tag_name = tag_name
49
        self.rpc_address = rpc_address
50
        self.rpc_port = rpc_port
51
        self.ch_dbfile = None
52
        self.client = None
53
        # Remember all tags parsed the last time
54
        self.last_channels_tags_list = []
55
56
    def encode_and_compress(self, s):
57
        ret_data = base64.b64encode(s)
58
        ret_data = bz2.compress(ret_data)
59
60
        return ret_data
61
62
    def decompress_and_decode(self, data):
63
        ret_str = bz2.decompress(data)
64
        ret_str = base64.b64decode(ret_str)
65
66
        return ret_str
67
68
    def encode(self, s):
69
        ret_data = base64.b64encode(s)
70
71
        return ret_data
72
73
    def decode(self, data):
74
        ret_str = base64.b64decode(data)
75
76
        return ret_str
77
78
    def get_local_ips(self):
79
        ip = IPRoute()
80
        local_if = ip.get_addr()
81
        local_ips = []
82
        for interf in local_if:
83
            local_ips.append(interf.get_attr('IFA_ADDRESS'))
84
85
        return local_ips
86
87
    def get_members(self):
88
        # Retrieve informations from all the serf memebrs.
89
        # This is done through the RPC members call.
90 434d5e23 Nicolo' Facchi
91
        resp = None
92
93
        try:
94
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
95
            client.connect()
96
            client.members()
97
            resp = client.request(timeout=5)
98
            client.disconnect()
99
        except serf._exceptions.ConnectionError:
100
            print "Connection error"
101 ff14495b Nicolo' Facchi
102
        return resp
103
104
    def set_tag(self, tag_dict):
105 434d5e23 Nicolo' Facchi
        resp = None
106
107
        try:
108
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
109
            client.connect()
110
            client.tags(Tags=tag_dict)
111
            resp = client.request()
112
            client.disconnect()
113
        except serf._exceptions.ConnectionError:
114
            print "Connection error"
115 ff14495b Nicolo' Facchi
116
        return resp
117
118
    def del_tag(self, tag_names_list):
119 434d5e23 Nicolo' Facchi
        resp = None
120
121
        try:
122
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
123
            client.connect()
124
            client.tags(DeleteTags=tag_names_list)
125
            resp = client.request()
126
            client.disconnect()
127
        except serf._exceptions.ConnectionError:
128
            print "Connection error"
129 ff14495b Nicolo' Facchi
130
        return resp
131
132
    def is_local_member(self, member, local_ips):
133
        if member["Addr"]:
134
            m_addr = ".".join([str(x) for x in member["Addr"]])
135
            if m_addr in local_ips:
136
                return True
137
            else:
138
                return False
139
        else:
140
            print "Warning: found a memeber without address\n"
141
            print member
142
            return False
143
144
    def write_db_file(self, file_name, channels_list):
145
146 7df1781e Nicolo' Facchi
        file_hdr = "# channel_name,source_addr,source_port," \
147
                   "channel_params,sdp_uri"
148 ff14495b Nicolo' Facchi
149
        db_file = open(file_name, 'w')
150
        portalocker.lock(db_file, portalocker.LOCK_EX)
151
        db_file.write("%s\n" % (file_hdr,))
152
        for c in channels_list:
153
            print "Add channel: %s" % (c,)
154
            db_file.write("%s\n" % (c,))
155
156
        db_file.close()
157
158
    def member_update_event_callback(self, resp):
159
160
        if resp.is_success:
161
            print "Received event: member-update"
162
            resp_body = resp.body
163
            members = resp_body["Members"]
164
165
            for m in members:
166
                # For now we consider only "alive" members
167
                if m["Status"] == "alive":
168
                    if m["Tags"]:
169
                        if m["Tags"].get(self.tag_name):
170
                            c_tag = m["Tags"].get(self.tag_name)
171
                            if c_tag not in self.last_channels_tags_list:
172
                                self.update_db_from_members()
173
                                return 0
174 cdd8478b Nicolo' Facchi
175
            if self.last_channels_tags_list:
176
                self.update_db_from_members()
177 ff14495b Nicolo' Facchi
        else:
178
            sys.stderr.write("Serf streamed event failed\n")
179
            sys.stderr.write("%s" % (resp.error,))
180
181
        return 0
182
183
    def listen_for_member_update_events(self, ch_dbfile):
184
        print "Database file: %s" % (ch_dbfile,)
185
186 cdd8478b Nicolo' Facchi
        members_updated = False
187 434d5e23 Nicolo' Facchi
        while True:
188 ff14495b Nicolo' Facchi
189 434d5e23 Nicolo' Facchi
            # Write the db file based on the current members channels tags
190
            sleep_time = 5
191 ff14495b Nicolo' Facchi
192 434d5e23 Nicolo' Facchi
            self.client = serf.Client("%s:%d" % (self.rpc_address,
193
                                      self.rpc_port),
194
                                      auto_reconnect=True)
195
196
            while not members_updated:
197
                try:
198
                    self.client.connect()
199
200
                    self.ch_dbfile = ch_dbfile
201
                    if self.update_db_from_members():
202
                        members_updated = True
203
                    else:
204
                        time.sleep(sleep_time)
205
                except serf._exceptions.ConnectionError:
206
                    print "Client connection error (sleep %d)" % (sleep_time,)
207
                    time.sleep(sleep_time)
208
209
            try:
210
                # Register callback for memebr update events
211
                # Todo: handle sigint
212
                self.client.stream(Type="member-update").add_callback(
213
                                   self.member_update_event_callback).request(
214
                                   timeout=120)
215 cdd8478b Nicolo' Facchi
                self.client.disconnect()
216
                members_updated = False
217 434d5e23 Nicolo' Facchi
            except serf._exceptions.ConnectionError:
218
                print "Client connection error (sleep %d)" % (sleep_time,)
219
                time.sleep(sleep_time)
220
            except KeyboardInterrupt:
221
                print "Disconnection from RPC deamon"
222
                self.client.disconnect()
223
                return
224 ff14495b Nicolo' Facchi
225
    def update_db_from_members(self):
226
        # WARNING: This method assumes the connection towards the RPC deamon
227
        # is already open and the client seved in self.client
228
229
        if not self.client:
230 434d5e23 Nicolo' Facchi
            return False
231 ff14495b Nicolo' Facchi
232
        # Retrieve serf members
233
        self.client.members()
234
        resp = self.client.request(timeout=5)
235
236
        if resp[0].is_success:
237
            resp_body = resp[0].body
238
            members = resp_body["Members"]
239
240
            # Retrieve channel tags
241
            self.last_channels_tags_list = []
242
            channel_tags_list = []
243
244
            for m in members:
245
                # For now we consider only "alive" members
246
                if m["Status"] == "alive":
247
                    if m["Tags"]:
248
                        if m["Tags"].get(self.tag_name):
249
                            c_tag = m["Tags"].get(self.tag_name)
250
                            channel_tags_list.append(c_tag)
251
                            self.last_channels_tags_list.append(c_tag)
252
253
            # Build channels list
254
            channels_list = []
255
256
            for t_comp in channel_tags_list:
257
                # Decode the channel tag
258
                t = self.decode(t_comp)
259
260
                # Each member can have more than one channel.
261
                # Channels are separated by the ";" character.
262
                channels_list += t.split(";")
263
264
            # Write database file
265
            self.write_db_file(self.ch_dbfile, channels_list)
266
267 434d5e23 Nicolo' Facchi
            return True
268
269 ff14495b Nicolo' Facchi
        else:
270 434d5e23 Nicolo' Facchi
            self.write_db_file(self.ch_dbfile, [])
271 ff14495b Nicolo' Facchi
            sys.stderr.write("Serf members command failed\n")
272
            sys.stderr.write("%s" % (resp[0].error,))
273 434d5e23 Nicolo' Facchi
            return False
274 ff14495b Nicolo' Facchi
275
    def delete_channel(self, ch_addr, ch_port):
276
        resp = self.get_members()
277
278 434d5e23 Nicolo' Facchi
        if not resp:
279
            return
280
281 ff14495b Nicolo' Facchi
        if resp[0].is_success:
282
            resp_body = resp[0].body
283
            members = resp_body["Members"]
284
285
            # Used to save the channels tag of the local memebr
286
            local_node_channels = None
287
            channels_tag_exist = False
288
            new_channels_string = ""
289
            delete_channel = False
290
291
            # find all local IP addresses and save them in local_ips
292
            local_ips = self.get_local_ips()
293
294
            for m in members:
295
                # For now we consider only "alive" members
296
                if m["Status"] == "alive":
297
                    if self.is_local_member(m, local_ips):
298
                        if m["Tags"]:
299
                            if m["Tags"].get(self.tag_name):
300
                                channels_tag_exist = True
301
                            local_node_channels = m["Tags"].get(self.tag_name)
302
303
            if local_node_channels:
304
                # Decompress and decode the local channel and compare to the
305
                # channel we want to delete
306
                # The channel are compared only considering the
307
                # address and the port
308
                channels = self.decode(local_node_channels)
309
310
                # Each member can have more than one channel.
311
                # Channels are separated by the ";" character.
312
                channels_list = channels.split(";")
313
314
                for c in channels_list:
315 7df1781e Nicolo' Facchi
                    [_, caddr, cport, _, _] = c.split(",")
316 ff14495b Nicolo' Facchi
317
                    if (caddr != ch_addr or int(cport) != int(ch_port)):
318
                        # This is not the channel we want to delete.
319
                        # Add it to the new channels string
320
                        if new_channels_string:
321
                            new_channels_string += ";" + c
322
                        else:
323
                            new_channels_string = c
324
                    else:
325
                        delete_channel = True
326
                        print "Delete channel: %s" % (c,)
327
328
            if new_channels_string and delete_channel:
329
                # If new_channels_string is not empty we just need to update
330
                # the channels tag through the RPC tags -set call.
331
332
                print "Update channels: %s" % (new_channels_string,)
333
334
                # Encode the string
335
                ch_str_comp = self.encode(new_channels_string)
336
337
                # Update (or add) the channels tag.
338
                # This is done through the RCP tags call
339
340
                resp = self.set_tag({self.tag_name: ch_str_comp})
341
342 434d5e23 Nicolo' Facchi
                if not resp:
343
                    return
344
345 ff14495b Nicolo' Facchi
                if not resp[0].is_success:
346
                    sys.stderr.write("Serf tags set command failed\n")
347
                    sys.stderr.write("%s" % (resp[0].error,))
348
349
            elif not new_channels_string and channels_tag_exist:
350
                # If new_channels_string is empty but channels_tag_exist
351
                # is True we can delete che cahnnels tag through the RPC
352
                # tags -delete call
353
                print "Delete tag: %s" % (self.tag_name,)
354
355
                resp = self.del_tag((self.tag_name,))
356
357 434d5e23 Nicolo' Facchi
                if not resp:
358
                    return
359
360 ff14495b Nicolo' Facchi
                if not resp[0].is_success:
361
                    sys.stderr.write("Serf tags delete command failed\n")
362
                    sys.stderr.write("%s" % (resp[0].error,))
363
364
        else:
365
            sys.stderr.write("Serf members command failed\n")
366
            sys.stderr.write("%s" % (resp[0].error,))
367
            return
368
369 7df1781e Nicolo' Facchi
    def set_new_channel(self, ch_addr, ch_port, ch_name, ch_txt, ch_sdpuri):
370 ff14495b Nicolo' Facchi
        # Build new channel string
371 7df1781e Nicolo' Facchi
        ch_str = '%s,%s,%d,%s,%s' % (ch_name, ch_addr, ch_port,
372
                                     ch_txt, ch_sdpuri)
373 ff14495b Nicolo' Facchi
        print "Add channel: %s\n" % (ch_str,)
374
375
        # Retrieve informations from all the serf memebrs.
376
        resp = self.get_members()
377
378 434d5e23 Nicolo' Facchi
        if not resp:
379
            return
380
381 ff14495b Nicolo' Facchi
        if resp[0].is_success:
382
            resp_body = resp[0].body
383
            members = resp_body["Members"]
384
385
            # One encoded channels string for each "alive" member
386
            nodes_channels_list = []
387
            # Used to save the channels tag of the local memebr
388
            local_node_channels = None
389
390
            # find all local IP addresses and save them in local_ips
391
            local_ips = self.get_local_ips()
392
393
            for m in members:
394
                # For now we consider only "alive" members
395
                if m["Status"] == "alive":
396
397
                    # Check if this is the local member
398
                    local_member = self.is_local_member(m, local_ips)
399
400
                    # Save the channels tag
401
                    if m["Tags"]:
402
                        node_channels = m["Tags"].get(self.tag_name)
403
404
                        if node_channels:
405
                            nodes_channels_list.append(node_channels)
406
                            if local_member:
407
                                local_node_channels = node_channels
408
409
            # Don't add the new channel if it already exists
410
            for channels_comp in nodes_channels_list:
411
                # Dont' consider empty strings
412
                if not channels_comp:
413
                    continue
414
415
                # Decompress and decode each channel and compare to the new
416
                # channel we want to set
417
                # The channel are compared only considering the
418
                # address and the port
419
                channels = self.decode(channels_comp)
420
421
                # Each member can have more than one channel.
422
                # Channels are separated by the ";" character.
423
                channels_list = channels.split(";")
424
425
                for c in channels_list:
426 7df1781e Nicolo' Facchi
                    [_, caddr, cport, _, _] = c.split(",")
427 ff14495b Nicolo' Facchi
428
                    if (caddr == ch_addr and int(cport) == int(ch_port)):
429
                        print "Channel already exists"
430
                        return
431
432
            # If we arrive here this means that we are trying to add a new
433
            # channel.
434
            # Encode and compress data
435
            if local_node_channels:
436
                ch_str = ';'.join([self.decode(
437
                                  local_node_channels),
438
                                  ch_str])
439
            ch_str_comp = self.encode(ch_str)
440
441
            # Update (or add) the channels tag.
442
            # This is done through the RCP tags call
443
            resp = self.set_tag({self.tag_name: ch_str_comp})
444
445 434d5e23 Nicolo' Facchi
            if not resp:
446
                return
447
448 ff14495b Nicolo' Facchi
            if not resp[0].is_success:
449
                sys.stderr.write("Serf tags command failed\n")
450
                sys.stderr.write("%s" % (resp[0].error,))
451
452
        else:
453
            sys.stderr.write("Serf members command failed\n")
454
            sys.stderr.write("%s" % (resp[0].error,))
455
            return
456
457
    def __str__(self):
458
        ret_str = "PSNG Serf RPC client: "
459
        ret_str += "{channels_tag_name: %s, " % (self.tag_name,)
460
        ret_str += "rpc_address: %s, " % (self.rpc_address,)
461
        ret_str += "rpc_port %d}\n" % (self.rpc_port,)
462
        return ret_str
463
464
465
def psng_serf_client_init():
466
    parser = argparse.ArgumentParser()
467
468
    # Optionals
469
    parser.add_argument("-t", "--tagname", type=str, default="psngc",
470
                        help="PeerStreamer Next-Generation source tag name",
471
                        dest="tagname")
472
473
    # Mandatory for all modes
474
    parser.add_argument("-a", "--rpcaddress", type=str, required=True,
475
                        help="IP address of the Serf RPC server",
476
                        dest="rpcaddress",
477
                        action=argparse_actions.ProperIpFormatAction)
478
    parser.add_argument("-p", "--rpcport", type=int, required=True,
479
                        help="TCP port of the Serf RPC server",
480
                        dest="rpcport", choices=range(0, 65536),
481
                        metavar="[0-65535]")
482
483
    subparsers = parser.add_subparsers(dest="command")
484
    # Set PeerStreamer Next-Generation source tag
485
    parser_set = subparsers.add_parser("set", help="Set and propagate the "
486
                                       "PeerStreamer Next-Generation "
487
                                       "source tag (Call RPC tags --set). "
488
                                       "If this node has already a "
489
                                       "PeerStreamer Next-Generation source "
490
                                       "tag associated, then the new channel "
491
                                       "is appended to the existing ones.")
492
    parser_set.add_argument("caddr", type=str,
493
                            help="Source channel IP address",
494
                            action=argparse_actions.ProperIpFormatAction)
495
    parser_set.add_argument("cport", type=int, choices=range(0, 65536),
496
                            help="Source channel port",
497
                            metavar="[0-65535]")
498
    parser_set.add_argument("cname", type=str,
499
                            help="Source channel name")
500
    parser_set.add_argument("ctxt", type=str,
501
                            help="Source channel additional parameters")
502 7df1781e Nicolo' Facchi
    parser_set.add_argument("csdpuri", type=str,
503
                            help="SDP URI of the channel")
504 ff14495b Nicolo' Facchi
505
    # Delete PeerStreamer Next-Generation source tag
506
    parser_del = subparsers.add_parser("del", help="Delete a channel "
507
                                       "identified by a source address, a "
508
                                       "source port and a name. If the "
509
                                       "resulting PeerStreamer "
510
                                       "Next-Generation source tag is empty, "
511
                                       "then it will be deleted by calling "
512
                                       "the RPC procedure tags --delete.")
513
    parser_del.add_argument("caddr", type=str,
514
                            help="Source channel IP address",
515
                            action=argparse_actions.ProperIpFormatAction)
516
    parser_del.add_argument("cport", type=int, choices=range(0, 65536),
517
                            help="Source channel port",
518
                            metavar="[0-65535]")
519
    # parser_del.add_argument("cname", type=str,
520
    #                         help="Source channel name")
521
522
    parser_db = subparsers.add_parser("bg",
523
                                      help="Run in background and keep the "
524
                                      "database file updated by listening to "
525
                                      "member-update Serf events.")
526
    parser_db.add_argument("dbfile", type=str,
527
                           help="Channels database file")
528
529
    try:
530
        args = parser.parse_args()
531
532
        tag_name = args.tagname
533
        rpc_address = args.rpcaddress
534
        rpc_port = args.rpcport
535
536
        serf_client = PsngSerfClient(tag_name, rpc_address, rpc_port)
537
        print(serf_client)
538
539
        command = args.command
540
        if command == "bg":
541
            ch_dbfile = args.dbfile
542
            serf_client.listen_for_member_update_events(ch_dbfile)
543
        elif command == "set":
544
            ch_addr = args.caddr
545
            ch_port = args.cport
546
            ch_name = args.cname
547
            ch_txt = args.ctxt
548 7df1781e Nicolo' Facchi
            ch_sdpuri = args.csdpuri
549
            serf_client.set_new_channel(ch_addr, ch_port, ch_name,
550
                                        ch_txt, ch_sdpuri)
551 ff14495b Nicolo' Facchi
        elif command == "del":
552
            ch_addr = args.caddr
553
            ch_port = args.cport
554
            serf_client.delete_channel(ch_addr, ch_port)
555
        else:
556
            print "Unknown mode"
557
            return -1
558
559
    except argparse_actions.InvalidIp as e:
560
        print "IP address is invalid: {0}".format(e.ip)
561
562
563
if __name__ == "__main__":
564
    psng_serf_client_init()