Statistics
| Branch: | Tag: | Revision:

psng-pyserf / psng-pyserf.py @ master

History | View | Annotate | Download (20.5 KB)

1
#!/usr/bin/env python
2

    
3
# BSD 3-Clause License
4
#
5
# Copyright (c) 2017, netCommons H2020 project
6
# All rights reserved.
7
#
8
# Redistribution and use in source and binary forms, with or without
9
# modification, are permitted provided that the following conditions are met:
10
#
11
# * Redistributions of source code must retain the above copyright notice, this
12
# list of conditions and the following disclaimer.
13
#
14
# * Redistributions in binary form must reproduce the above copyright notice,
15
# this list of conditions and the following disclaimer in the documentation
16
# and/or other materials provided with the distribution.
17
#
18
# * Neither the name of the copyright holder nor the names of its contributors
19
# may be used to endorse or promote products derived from this software without
20
# specific prior written permission.
21
#
22
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
26
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
# POSSIBILITY OF SUCH DAMAGE.
33

    
34
import argparse
35
import argparse_actions
36
import serf
37
import sys
38
import base64
39
import bz2
40
from pyroute2 import IPRoute
41
import portalocker
42
import time
43

    
44

    
45
class PsngSerfClient:
46

    
47
    def __init__(self, tag_name, rpc_address, rpc_port):
48
        self.tag_name = tag_name
49
        self.rpc_address = rpc_address
50
        self.rpc_port = rpc_port
51
        self.ch_dbfile = None
52
        self.client = None
53
        # Remember all tags parsed the last time
54
        self.last_channels_tags_list = []
55

    
56
    def encode_and_compress(self, s):
57
        ret_data = base64.b64encode(s)
58
        ret_data = bz2.compress(ret_data)
59

    
60
        return ret_data
61

    
62
    def decompress_and_decode(self, data):
63
        ret_str = bz2.decompress(data)
64
        ret_str = base64.b64decode(ret_str)
65

    
66
        return ret_str
67

    
68
    def encode(self, s):
69
        ret_data = base64.b64encode(s)
70

    
71
        return ret_data
72

    
73
    def decode(self, data):
74
        ret_str = base64.b64decode(data)
75

    
76
        return ret_str
77

    
78
    def get_local_ips(self):
79
        ip = IPRoute()
80
        local_if = ip.get_addr()
81
        local_ips = []
82
        for interf in local_if:
83
            local_ips.append(interf.get_attr('IFA_ADDRESS'))
84

    
85
        return local_ips
86

    
87
    def get_members(self):
88
        # Retrieve informations from all the serf memebrs.
89
        # This is done through the RPC members call.
90

    
91
        resp = None
92

    
93
        try:
94
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
95
            client.connect()
96
            client.members()
97
            resp = client.request(timeout=5)
98
            client.disconnect()
99
        except serf._exceptions.ConnectionError:
100
            print "Connection error"
101

    
102
        return resp
103

    
104
    def set_tag(self, tag_dict):
105
        resp = None
106

    
107
        try:
108
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
109
            client.connect()
110
            client.tags(Tags=tag_dict)
111
            resp = client.request()
112
            client.disconnect()
113
        except serf._exceptions.ConnectionError:
114
            print "Connection error"
115

    
116
        return resp
117

    
118
    def del_tag(self, tag_names_list):
119
        resp = None
120

    
121
        try:
122
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
123
            client.connect()
124
            client.tags(DeleteTags=tag_names_list)
125
            resp = client.request()
126
            client.disconnect()
127
        except serf._exceptions.ConnectionError:
128
            print "Connection error"
129

    
130
        return resp
131

    
132
    def is_local_member(self, member, local_ips):
133
        if member["Addr"]:
134
            m_addr = ".".join([str(x) for x in member["Addr"]])
135
            if m_addr in local_ips:
136
                return True
137
            else:
138
                return False
139
        else:
140
            print "Warning: found a memeber without address\n"
141
            print member
142
            return False
143

    
144
    def write_db_file(self, file_name, channels_list):
145

    
146
        file_hdr = "# channel_name,source_addr,source_port," \
147
                   "channel_params,sdp_uri"
148

    
149
        db_file = open(file_name, 'w')
150
        portalocker.lock(db_file, portalocker.LOCK_EX)
151
        db_file.write("%s\n" % (file_hdr,))
152
        for c in channels_list:
153
            print "Add channel: %s" % (c,)
154
            db_file.write("%s\n" % (c,))
155

    
156
        db_file.close()
157

    
158
    def member_update_event_callback(self, resp):
159

    
160
        if resp.is_success:
161
            print "Received event: member-update"
162
            resp_body = resp.body
163
            members = resp_body["Members"]
164

    
165
            for m in members:
166
                # For now we consider only "alive" members
167
                if m["Status"] == "alive":
168
                    if m["Tags"]:
169
                        if m["Tags"].get(self.tag_name):
170
                            c_tag = m["Tags"].get(self.tag_name)
171
                            if c_tag not in self.last_channels_tags_list:
172
                                self.update_db_from_members()
173
                                return 0
174

    
175
            if self.last_channels_tags_list:
176
                self.update_db_from_members()
177
        else:
178
            sys.stderr.write("Serf streamed event failed\n")
179
            sys.stderr.write("%s" % (resp.error,))
180

    
181
        return 0
182

    
183
    def listen_for_member_update_events(self, ch_dbfile):
184
        print "Database file: %s" % (ch_dbfile,)
185

    
186
        members_updated = False
187
        while True:
188

    
189
            # Write the db file based on the current members channels tags
190
            sleep_time = 5
191

    
192
            self.client = serf.Client("%s:%d" % (self.rpc_address,
193
                                      self.rpc_port),
194
                                      auto_reconnect=True)
195

    
196
            while not members_updated:
197
                try:
198
                    self.client.connect()
199

    
200
                    self.ch_dbfile = ch_dbfile
201
                    if self.update_db_from_members():
202
                        members_updated = True
203
                    else:
204
                        time.sleep(sleep_time)
205
                except serf._exceptions.ConnectionError:
206
                    print "Client connection error (sleep %d)" % (sleep_time,)
207
                    time.sleep(sleep_time)
208

    
209
            try:
210
                # Register callback for memebr update events
211
                # Todo: handle sigint
212
                self.client.stream(Type="member-update").add_callback(
213
                                   self.member_update_event_callback).request(
214
                                   timeout=120)
215
                self.client.disconnect()
216
                members_updated = False
217
            except serf._exceptions.ConnectionError:
218
                print "Client connection error (sleep %d)" % (sleep_time,)
219
                time.sleep(sleep_time)
220
            except KeyboardInterrupt:
221
                print "Disconnection from RPC deamon"
222
                self.client.disconnect()
223
                return
224

    
225
    def update_db_from_members(self):
226
        # WARNING: This method assumes the connection towards the RPC deamon
227
        # is already open and the client seved in self.client
228

    
229
        if not self.client:
230
            return False
231

    
232
        # Retrieve serf members
233
        self.client.members()
234
        resp = self.client.request(timeout=5)
235

    
236
        if resp[0].is_success:
237
            resp_body = resp[0].body
238
            members = resp_body["Members"]
239

    
240
            # Retrieve channel tags
241
            self.last_channels_tags_list = []
242
            channel_tags_list = []
243

    
244
            for m in members:
245
                # For now we consider only "alive" members
246
                if m["Status"] == "alive":
247
                    if m["Tags"]:
248
                        if m["Tags"].get(self.tag_name):
249
                            c_tag = m["Tags"].get(self.tag_name)
250
                            channel_tags_list.append(c_tag)
251
                            self.last_channels_tags_list.append(c_tag)
252

    
253
            # Build channels list
254
            channels_list = []
255

    
256
            for t_comp in channel_tags_list:
257
                # Decode the channel tag
258
                t = self.decode(t_comp)
259

    
260
                # Each member can have more than one channel.
261
                # Channels are separated by the ";" character.
262
                channels_list += t.split(";")
263

    
264
            # Write database file
265
            self.write_db_file(self.ch_dbfile, channels_list)
266

    
267
            return True
268

    
269
        else:
270
            self.write_db_file(self.ch_dbfile, [])
271
            sys.stderr.write("Serf members command failed\n")
272
            sys.stderr.write("%s" % (resp[0].error,))
273
            return False
274

    
275
    def delete_channel(self, ch_addr, ch_port):
276
        resp = self.get_members()
277

    
278
        if not resp:
279
            return
280

    
281
        if resp[0].is_success:
282
            resp_body = resp[0].body
283
            members = resp_body["Members"]
284

    
285
            # Used to save the channels tag of the local memebr
286
            local_node_channels = None
287
            channels_tag_exist = False
288
            new_channels_string = ""
289
            delete_channel = False
290

    
291
            # find all local IP addresses and save them in local_ips
292
            local_ips = self.get_local_ips()
293

    
294
            for m in members:
295
                # For now we consider only "alive" members
296
                if m["Status"] == "alive":
297
                    if self.is_local_member(m, local_ips):
298
                        if m["Tags"]:
299
                            if m["Tags"].get(self.tag_name):
300
                                channels_tag_exist = True
301
                            local_node_channels = m["Tags"].get(self.tag_name)
302

    
303
            if local_node_channels:
304
                # Decompress and decode the local channel and compare to the
305
                # channel we want to delete
306
                # The channel are compared only considering the
307
                # address and the port
308
                channels = self.decode(local_node_channels)
309

    
310
                # Each member can have more than one channel.
311
                # Channels are separated by the ";" character.
312
                channels_list = channels.split(";")
313

    
314
                for c in channels_list:
315
                    [_, caddr, cport, _, _] = c.split(",")
316

    
317
                    if (caddr != ch_addr or int(cport) != int(ch_port)):
318
                        # This is not the channel we want to delete.
319
                        # Add it to the new channels string
320
                        if new_channels_string:
321
                            new_channels_string += ";" + c
322
                        else:
323
                            new_channels_string = c
324
                    else:
325
                        delete_channel = True
326
                        print "Delete channel: %s" % (c,)
327

    
328
            if new_channels_string and delete_channel:
329
                # If new_channels_string is not empty we just need to update
330
                # the channels tag through the RPC tags -set call.
331

    
332
                print "Update channels: %s" % (new_channels_string,)
333

    
334
                # Encode the string
335
                ch_str_comp = self.encode(new_channels_string)
336

    
337
                # Update (or add) the channels tag.
338
                # This is done through the RCP tags call
339

    
340
                resp = self.set_tag({self.tag_name: ch_str_comp})
341

    
342
                if not resp:
343
                    return
344

    
345
                if not resp[0].is_success:
346
                    sys.stderr.write("Serf tags set command failed\n")
347
                    sys.stderr.write("%s" % (resp[0].error,))
348

    
349
            elif not new_channels_string and channels_tag_exist:
350
                # If new_channels_string is empty but channels_tag_exist
351
                # is True we can delete che cahnnels tag through the RPC
352
                # tags -delete call
353
                print "Delete tag: %s" % (self.tag_name,)
354

    
355
                resp = self.del_tag((self.tag_name,))
356

    
357
                if not resp:
358
                    return
359

    
360
                if not resp[0].is_success:
361
                    sys.stderr.write("Serf tags delete command failed\n")
362
                    sys.stderr.write("%s" % (resp[0].error,))
363

    
364
        else:
365
            sys.stderr.write("Serf members command failed\n")
366
            sys.stderr.write("%s" % (resp[0].error,))
367
            return
368

    
369
    def set_new_channel(self, ch_addr, ch_port, ch_name, ch_txt, ch_sdpuri):
370
        # Build new channel string
371
        ch_str = '%s,%s,%d,%s,%s' % (ch_name, ch_addr, ch_port,
372
                                     ch_txt, ch_sdpuri)
373
        print "Add channel: %s\n" % (ch_str,)
374

    
375
        # Retrieve informations from all the serf memebrs.
376
        resp = self.get_members()
377

    
378
        if not resp:
379
            return
380

    
381
        if resp[0].is_success:
382
            resp_body = resp[0].body
383
            members = resp_body["Members"]
384

    
385
            # One encoded channels string for each "alive" member
386
            nodes_channels_list = []
387
            # Used to save the channels tag of the local memebr
388
            local_node_channels = None
389

    
390
            # find all local IP addresses and save them in local_ips
391
            local_ips = self.get_local_ips()
392

    
393
            for m in members:
394
                # For now we consider only "alive" members
395
                if m["Status"] == "alive":
396

    
397
                    # Check if this is the local member
398
                    local_member = self.is_local_member(m, local_ips)
399

    
400
                    # Save the channels tag
401
                    if m["Tags"]:
402
                        node_channels = m["Tags"].get(self.tag_name)
403

    
404
                        if node_channels:
405
                            nodes_channels_list.append(node_channels)
406
                            if local_member:
407
                                local_node_channels = node_channels
408

    
409
            # Don't add the new channel if it already exists
410
            for channels_comp in nodes_channels_list:
411
                # Dont' consider empty strings
412
                if not channels_comp:
413
                    continue
414

    
415
                # Decompress and decode each channel and compare to the new
416
                # channel we want to set
417
                # The channel are compared only considering the
418
                # address and the port
419
                channels = self.decode(channels_comp)
420

    
421
                # Each member can have more than one channel.
422
                # Channels are separated by the ";" character.
423
                channels_list = channels.split(";")
424

    
425
                for c in channels_list:
426
                    [_, caddr, cport, _, _] = c.split(",")
427

    
428
                    if (caddr == ch_addr and int(cport) == int(ch_port)):
429
                        print "Channel already exists"
430
                        return
431

    
432
            # If we arrive here this means that we are trying to add a new
433
            # channel.
434
            # Encode and compress data
435
            if local_node_channels:
436
                ch_str = ';'.join([self.decode(
437
                                  local_node_channels),
438
                                  ch_str])
439
            ch_str_comp = self.encode(ch_str)
440

    
441
            # Update (or add) the channels tag.
442
            # This is done through the RCP tags call
443
            resp = self.set_tag({self.tag_name: ch_str_comp})
444

    
445
            if not resp:
446
                return
447

    
448
            if not resp[0].is_success:
449
                sys.stderr.write("Serf tags command failed\n")
450
                sys.stderr.write("%s" % (resp[0].error,))
451

    
452
        else:
453
            sys.stderr.write("Serf members command failed\n")
454
            sys.stderr.write("%s" % (resp[0].error,))
455
            return
456

    
457
    def __str__(self):
458
        ret_str = "PSNG Serf RPC client: "
459
        ret_str += "{channels_tag_name: %s, " % (self.tag_name,)
460
        ret_str += "rpc_address: %s, " % (self.rpc_address,)
461
        ret_str += "rpc_port %d}\n" % (self.rpc_port,)
462
        return ret_str
463

    
464

    
465
def psng_serf_client_init():
466
    parser = argparse.ArgumentParser()
467

    
468
    # Optionals
469
    parser.add_argument("-t", "--tagname", type=str, default="psngc",
470
                        help="PeerStreamer Next-Generation source tag name",
471
                        dest="tagname")
472

    
473
    # Mandatory for all modes
474
    parser.add_argument("-a", "--rpcaddress", type=str, required=True,
475
                        help="IP address of the Serf RPC server",
476
                        dest="rpcaddress",
477
                        action=argparse_actions.ProperIpFormatAction)
478
    parser.add_argument("-p", "--rpcport", type=int, required=True,
479
                        help="TCP port of the Serf RPC server",
480
                        dest="rpcport", choices=range(0, 65536),
481
                        metavar="[0-65535]")
482

    
483
    subparsers = parser.add_subparsers(dest="command")
484
    # Set PeerStreamer Next-Generation source tag
485
    parser_set = subparsers.add_parser("set", help="Set and propagate the "
486
                                       "PeerStreamer Next-Generation "
487
                                       "source tag (Call RPC tags --set). "
488
                                       "If this node has already a "
489
                                       "PeerStreamer Next-Generation source "
490
                                       "tag associated, then the new channel "
491
                                       "is appended to the existing ones.")
492
    parser_set.add_argument("caddr", type=str,
493
                            help="Source channel IP address",
494
                            action=argparse_actions.ProperIpFormatAction)
495
    parser_set.add_argument("cport", type=int, choices=range(0, 65536),
496
                            help="Source channel port",
497
                            metavar="[0-65535]")
498
    parser_set.add_argument("cname", type=str,
499
                            help="Source channel name")
500
    parser_set.add_argument("ctxt", type=str,
501
                            help="Source channel additional parameters")
502
    parser_set.add_argument("csdpuri", type=str,
503
                            help="SDP URI of the channel")
504

    
505
    # Delete PeerStreamer Next-Generation source tag
506
    parser_del = subparsers.add_parser("del", help="Delete a channel "
507
                                       "identified by a source address, a "
508
                                       "source port and a name. If the "
509
                                       "resulting PeerStreamer "
510
                                       "Next-Generation source tag is empty, "
511
                                       "then it will be deleted by calling "
512
                                       "the RPC procedure tags --delete.")
513
    parser_del.add_argument("caddr", type=str,
514
                            help="Source channel IP address",
515
                            action=argparse_actions.ProperIpFormatAction)
516
    parser_del.add_argument("cport", type=int, choices=range(0, 65536),
517
                            help="Source channel port",
518
                            metavar="[0-65535]")
519
    # parser_del.add_argument("cname", type=str,
520
    #                         help="Source channel name")
521

    
522
    parser_db = subparsers.add_parser("bg",
523
                                      help="Run in background and keep the "
524
                                      "database file updated by listening to "
525
                                      "member-update Serf events.")
526
    parser_db.add_argument("dbfile", type=str,
527
                           help="Channels database file")
528

    
529
    try:
530
        args = parser.parse_args()
531

    
532
        tag_name = args.tagname
533
        rpc_address = args.rpcaddress
534
        rpc_port = args.rpcport
535

    
536
        serf_client = PsngSerfClient(tag_name, rpc_address, rpc_port)
537
        print(serf_client)
538

    
539
        command = args.command
540
        if command == "bg":
541
            ch_dbfile = args.dbfile
542
            serf_client.listen_for_member_update_events(ch_dbfile)
543
        elif command == "set":
544
            ch_addr = args.caddr
545
            ch_port = args.cport
546
            ch_name = args.cname
547
            ch_txt = args.ctxt
548
            ch_sdpuri = args.csdpuri
549
            serf_client.set_new_channel(ch_addr, ch_port, ch_name,
550
                                        ch_txt, ch_sdpuri)
551
        elif command == "del":
552
            ch_addr = args.caddr
553
            ch_port = args.cport
554
            serf_client.delete_channel(ch_addr, ch_port)
555
        else:
556
            print "Unknown mode"
557
            return -1
558

    
559
    except argparse_actions.InvalidIp as e:
560
        print "IP address is invalid: {0}".format(e.ip)
561

    
562

    
563
if __name__ == "__main__":
564
    psng_serf_client_init()