Statistics
| Branch: | Tag: | Revision:

psng-pyserf / psng-pyserf.py @ cdd8478b

History | View | Annotate | Download (19 KB)

1
#!/usr/bin/env python
2

    
3
import argparse
4
import argparse_actions
5
import serf
6
import sys
7
import base64
8
import bz2
9
from pyroute2 import IPRoute
10
import portalocker
11
import time
12

    
13

    
14
class PsngSerfClient:
15

    
16
    def __init__(self, tag_name, rpc_address, rpc_port):
17
        self.tag_name = tag_name
18
        self.rpc_address = rpc_address
19
        self.rpc_port = rpc_port
20
        self.ch_dbfile = None
21
        self.client = None
22
        # Remember all tags parsed the last time
23
        self.last_channels_tags_list = []
24

    
25
    def encode_and_compress(self, s):
26
        ret_data = base64.b64encode(s)
27
        ret_data = bz2.compress(ret_data)
28

    
29
        return ret_data
30

    
31
    def decompress_and_decode(self, data):
32
        ret_str = bz2.decompress(data)
33
        ret_str = base64.b64decode(ret_str)
34

    
35
        return ret_str
36

    
37
    def encode(self, s):
38
        ret_data = base64.b64encode(s)
39

    
40
        return ret_data
41

    
42
    def decode(self, data):
43
        ret_str = base64.b64decode(data)
44

    
45
        return ret_str
46

    
47
    def get_local_ips(self):
48
        ip = IPRoute()
49
        local_if = ip.get_addr()
50
        local_ips = []
51
        for interf in local_if:
52
            local_ips.append(interf.get_attr('IFA_ADDRESS'))
53

    
54
        return local_ips
55

    
56
    def get_members(self):
57
        # Retrieve informations from all the serf memebrs.
58
        # This is done through the RPC members call.
59

    
60
        resp = None
61

    
62
        try:
63
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
64
            client.connect()
65
            client.members()
66
            resp = client.request(timeout=5)
67
            client.disconnect()
68
        except serf._exceptions.ConnectionError:
69
            print "Connection error"
70

    
71
        return resp
72

    
73
    def set_tag(self, tag_dict):
74
        resp = None
75

    
76
        try:
77
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
78
            client.connect()
79
            client.tags(Tags=tag_dict)
80
            resp = client.request()
81
            client.disconnect()
82
        except serf._exceptions.ConnectionError:
83
            print "Connection error"
84

    
85
        return resp
86

    
87
    def del_tag(self, tag_names_list):
88
        resp = None
89

    
90
        try:
91
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
92
            client.connect()
93
            client.tags(DeleteTags=tag_names_list)
94
            resp = client.request()
95
            client.disconnect()
96
        except serf._exceptions.ConnectionError:
97
            print "Connection error"
98

    
99
        return resp
100

    
101
    def is_local_member(self, member, local_ips):
102
        if member["Addr"]:
103
            m_addr = ".".join([str(x) for x in member["Addr"]])
104
            if m_addr in local_ips:
105
                return True
106
            else:
107
                return False
108
        else:
109
            print "Warning: found a memeber without address\n"
110
            print member
111
            return False
112

    
113
    def write_db_file(self, file_name, channels_list):
114

    
115
        file_hdr = "# channel_name,source_addr,source_port," \
116
                   "channel_params,sdp_uri"
117

    
118
        db_file = open(file_name, 'w')
119
        portalocker.lock(db_file, portalocker.LOCK_EX)
120
        db_file.write("%s\n" % (file_hdr,))
121
        for c in channels_list:
122
            print "Add channel: %s" % (c,)
123
            db_file.write("%s\n" % (c,))
124

    
125
        db_file.close()
126

    
127
    def member_update_event_callback(self, resp):
128

    
129
        if resp.is_success:
130
            print "Received event: member-update"
131
            resp_body = resp.body
132
            members = resp_body["Members"]
133

    
134
            for m in members:
135
                # For now we consider only "alive" members
136
                if m["Status"] == "alive":
137
                    if m["Tags"]:
138
                        if m["Tags"].get(self.tag_name):
139
                            c_tag = m["Tags"].get(self.tag_name)
140
                            if c_tag not in self.last_channels_tags_list:
141
                                self.update_db_from_members()
142
                                return 0
143

    
144
            if self.last_channels_tags_list:
145
                self.update_db_from_members()
146
        else:
147
            sys.stderr.write("Serf streamed event failed\n")
148
            sys.stderr.write("%s" % (resp.error,))
149

    
150
        return 0
151

    
152
    def listen_for_member_update_events(self, ch_dbfile):
153
        print "Database file: %s" % (ch_dbfile,)
154

    
155
        members_updated = False
156
        while True:
157

    
158
            # Write the db file based on the current members channels tags
159
            sleep_time = 5
160

    
161
            self.client = serf.Client("%s:%d" % (self.rpc_address,
162
                                      self.rpc_port),
163
                                      auto_reconnect=True)
164

    
165
            while not members_updated:
166
                try:
167
                    self.client.connect()
168

    
169
                    self.ch_dbfile = ch_dbfile
170
                    if self.update_db_from_members():
171
                        members_updated = True
172
                    else:
173
                        time.sleep(sleep_time)
174
                except serf._exceptions.ConnectionError:
175
                    print "Client connection error (sleep %d)" % (sleep_time,)
176
                    time.sleep(sleep_time)
177

    
178
            try:
179
                # Register callback for memebr update events
180
                # Todo: handle sigint
181
                self.client.stream(Type="member-update").add_callback(
182
                                   self.member_update_event_callback).request(
183
                                   timeout=120)
184
                self.client.disconnect()
185
                members_updated = False
186
            except serf._exceptions.ConnectionError:
187
                print "Client connection error (sleep %d)" % (sleep_time,)
188
                time.sleep(sleep_time)
189
            except KeyboardInterrupt:
190
                print "Disconnection from RPC deamon"
191
                self.client.disconnect()
192
                return
193

    
194
    def update_db_from_members(self):
195
        # WARNING: This method assumes the connection towards the RPC deamon
196
        # is already open and the client seved in self.client
197

    
198
        if not self.client:
199
            return False
200

    
201
        # Retrieve serf members
202
        self.client.members()
203
        resp = self.client.request(timeout=5)
204

    
205
        if resp[0].is_success:
206
            resp_body = resp[0].body
207
            members = resp_body["Members"]
208

    
209
            # Retrieve channel tags
210
            self.last_channels_tags_list = []
211
            channel_tags_list = []
212

    
213
            for m in members:
214
                # For now we consider only "alive" members
215
                if m["Status"] == "alive":
216
                    if m["Tags"]:
217
                        if m["Tags"].get(self.tag_name):
218
                            c_tag = m["Tags"].get(self.tag_name)
219
                            channel_tags_list.append(c_tag)
220
                            self.last_channels_tags_list.append(c_tag)
221

    
222
            # Build channels list
223
            channels_list = []
224

    
225
            for t_comp in channel_tags_list:
226
                # Decode the channel tag
227
                t = self.decode(t_comp)
228

    
229
                # Each member can have more than one channel.
230
                # Channels are separated by the ";" character.
231
                channels_list += t.split(";")
232

    
233
            # Write database file
234
            self.write_db_file(self.ch_dbfile, channels_list)
235

    
236
            return True
237

    
238
        else:
239
            self.write_db_file(self.ch_dbfile, [])
240
            sys.stderr.write("Serf members command failed\n")
241
            sys.stderr.write("%s" % (resp[0].error,))
242
            return False
243

    
244
    def delete_channel(self, ch_addr, ch_port):
245
        resp = self.get_members()
246

    
247
        if not resp:
248
            return
249

    
250
        if resp[0].is_success:
251
            resp_body = resp[0].body
252
            members = resp_body["Members"]
253

    
254
            # Used to save the channels tag of the local memebr
255
            local_node_channels = None
256
            channels_tag_exist = False
257
            new_channels_string = ""
258
            delete_channel = False
259

    
260
            # find all local IP addresses and save them in local_ips
261
            local_ips = self.get_local_ips()
262

    
263
            for m in members:
264
                # For now we consider only "alive" members
265
                if m["Status"] == "alive":
266
                    if self.is_local_member(m, local_ips):
267
                        if m["Tags"]:
268
                            if m["Tags"].get(self.tag_name):
269
                                channels_tag_exist = True
270
                            local_node_channels = m["Tags"].get(self.tag_name)
271

    
272
            if local_node_channels:
273
                # Decompress and decode the local channel and compare to the
274
                # channel we want to delete
275
                # The channel are compared only considering the
276
                # address and the port
277
                channels = self.decode(local_node_channels)
278

    
279
                # Each member can have more than one channel.
280
                # Channels are separated by the ";" character.
281
                channels_list = channels.split(";")
282

    
283
                for c in channels_list:
284
                    [_, caddr, cport, _, _] = c.split(",")
285

    
286
                    if (caddr != ch_addr or int(cport) != int(ch_port)):
287
                        # This is not the channel we want to delete.
288
                        # Add it to the new channels string
289
                        if new_channels_string:
290
                            new_channels_string += ";" + c
291
                        else:
292
                            new_channels_string = c
293
                    else:
294
                        delete_channel = True
295
                        print "Delete channel: %s" % (c,)
296

    
297
            if new_channels_string and delete_channel:
298
                # If new_channels_string is not empty we just need to update
299
                # the channels tag through the RPC tags -set call.
300

    
301
                print "Update channels: %s" % (new_channels_string,)
302

    
303
                # Encode the string
304
                ch_str_comp = self.encode(new_channels_string)
305

    
306
                # Update (or add) the channels tag.
307
                # This is done through the RCP tags call
308

    
309
                resp = self.set_tag({self.tag_name: ch_str_comp})
310

    
311
                if not resp:
312
                    return
313

    
314
                if not resp[0].is_success:
315
                    sys.stderr.write("Serf tags set command failed\n")
316
                    sys.stderr.write("%s" % (resp[0].error,))
317

    
318
            elif not new_channels_string and channels_tag_exist:
319
                # If new_channels_string is empty but channels_tag_exist
320
                # is True we can delete che cahnnels tag through the RPC
321
                # tags -delete call
322
                print "Delete tag: %s" % (self.tag_name,)
323

    
324
                resp = self.del_tag((self.tag_name,))
325

    
326
                if not resp:
327
                    return
328

    
329
                if not resp[0].is_success:
330
                    sys.stderr.write("Serf tags delete command failed\n")
331
                    sys.stderr.write("%s" % (resp[0].error,))
332

    
333
        else:
334
            sys.stderr.write("Serf members command failed\n")
335
            sys.stderr.write("%s" % (resp[0].error,))
336
            return
337

    
338
    def set_new_channel(self, ch_addr, ch_port, ch_name, ch_txt, ch_sdpuri):
339
        # Build new channel string
340
        ch_str = '%s,%s,%d,%s,%s' % (ch_name, ch_addr, ch_port,
341
                                     ch_txt, ch_sdpuri)
342
        print "Add channel: %s\n" % (ch_str,)
343

    
344
        # Retrieve informations from all the serf memebrs.
345
        resp = self.get_members()
346

    
347
        if not resp:
348
            return
349

    
350
        if resp[0].is_success:
351
            resp_body = resp[0].body
352
            members = resp_body["Members"]
353

    
354
            # One encoded channels string for each "alive" member
355
            nodes_channels_list = []
356
            # Used to save the channels tag of the local memebr
357
            local_node_channels = None
358

    
359
            # find all local IP addresses and save them in local_ips
360
            local_ips = self.get_local_ips()
361

    
362
            for m in members:
363
                # For now we consider only "alive" members
364
                if m["Status"] == "alive":
365

    
366
                    # Check if this is the local member
367
                    local_member = self.is_local_member(m, local_ips)
368

    
369
                    # Save the channels tag
370
                    if m["Tags"]:
371
                        node_channels = m["Tags"].get(self.tag_name)
372

    
373
                        if node_channels:
374
                            nodes_channels_list.append(node_channels)
375
                            if local_member:
376
                                local_node_channels = node_channels
377

    
378
            # Don't add the new channel if it already exists
379
            for channels_comp in nodes_channels_list:
380
                # Dont' consider empty strings
381
                if not channels_comp:
382
                    continue
383

    
384
                # Decompress and decode each channel and compare to the new
385
                # channel we want to set
386
                # The channel are compared only considering the
387
                # address and the port
388
                channels = self.decode(channels_comp)
389

    
390
                # Each member can have more than one channel.
391
                # Channels are separated by the ";" character.
392
                channels_list = channels.split(";")
393

    
394
                for c in channels_list:
395
                    [_, caddr, cport, _, _] = c.split(",")
396

    
397
                    if (caddr == ch_addr and int(cport) == int(ch_port)):
398
                        print "Channel already exists"
399
                        return
400

    
401
            # If we arrive here this means that we are trying to add a new
402
            # channel.
403
            # Encode and compress data
404
            if local_node_channels:
405
                ch_str = ';'.join([self.decode(
406
                                  local_node_channels),
407
                                  ch_str])
408
            ch_str_comp = self.encode(ch_str)
409

    
410
            # Update (or add) the channels tag.
411
            # This is done through the RCP tags call
412
            resp = self.set_tag({self.tag_name: ch_str_comp})
413

    
414
            if not resp:
415
                return
416

    
417
            if not resp[0].is_success:
418
                sys.stderr.write("Serf tags command failed\n")
419
                sys.stderr.write("%s" % (resp[0].error,))
420

    
421
        else:
422
            sys.stderr.write("Serf members command failed\n")
423
            sys.stderr.write("%s" % (resp[0].error,))
424
            return
425

    
426
    def __str__(self):
427
        ret_str = "PSNG Serf RPC client: "
428
        ret_str += "{channels_tag_name: %s, " % (self.tag_name,)
429
        ret_str += "rpc_address: %s, " % (self.rpc_address,)
430
        ret_str += "rpc_port %d}\n" % (self.rpc_port,)
431
        return ret_str
432

    
433

    
434
def psng_serf_client_init():
435
    parser = argparse.ArgumentParser()
436

    
437
    # Optionals
438
    parser.add_argument("-t", "--tagname", type=str, default="psngc",
439
                        help="PeerStreamer Next-Generation source tag name",
440
                        dest="tagname")
441

    
442
    # Mandatory for all modes
443
    parser.add_argument("-a", "--rpcaddress", type=str, required=True,
444
                        help="IP address of the Serf RPC server",
445
                        dest="rpcaddress",
446
                        action=argparse_actions.ProperIpFormatAction)
447
    parser.add_argument("-p", "--rpcport", type=int, required=True,
448
                        help="TCP port of the Serf RPC server",
449
                        dest="rpcport", choices=range(0, 65536),
450
                        metavar="[0-65535]")
451

    
452
    subparsers = parser.add_subparsers(dest="command")
453
    # Set PeerStreamer Next-Generation source tag
454
    parser_set = subparsers.add_parser("set", help="Set and propagate the "
455
                                       "PeerStreamer Next-Generation "
456
                                       "source tag (Call RPC tags --set). "
457
                                       "If this node has already a "
458
                                       "PeerStreamer Next-Generation source "
459
                                       "tag associated, then the new channel "
460
                                       "is appended to the existing ones.")
461
    parser_set.add_argument("caddr", type=str,
462
                            help="Source channel IP address",
463
                            action=argparse_actions.ProperIpFormatAction)
464
    parser_set.add_argument("cport", type=int, choices=range(0, 65536),
465
                            help="Source channel port",
466
                            metavar="[0-65535]")
467
    parser_set.add_argument("cname", type=str,
468
                            help="Source channel name")
469
    parser_set.add_argument("ctxt", type=str,
470
                            help="Source channel additional parameters")
471
    parser_set.add_argument("csdpuri", type=str,
472
                            help="SDP URI of the channel")
473

    
474
    # Delete PeerStreamer Next-Generation source tag
475
    parser_del = subparsers.add_parser("del", help="Delete a channel "
476
                                       "identified by a source address, a "
477
                                       "source port and a name. If the "
478
                                       "resulting PeerStreamer "
479
                                       "Next-Generation source tag is empty, "
480
                                       "then it will be deleted by calling "
481
                                       "the RPC procedure tags --delete.")
482
    parser_del.add_argument("caddr", type=str,
483
                            help="Source channel IP address",
484
                            action=argparse_actions.ProperIpFormatAction)
485
    parser_del.add_argument("cport", type=int, choices=range(0, 65536),
486
                            help="Source channel port",
487
                            metavar="[0-65535]")
488
    # parser_del.add_argument("cname", type=str,
489
    #                         help="Source channel name")
490

    
491
    parser_db = subparsers.add_parser("bg",
492
                                      help="Run in background and keep the "
493
                                      "database file updated by listening to "
494
                                      "member-update Serf events.")
495
    parser_db.add_argument("dbfile", type=str,
496
                           help="Channels database file")
497

    
498
    try:
499
        args = parser.parse_args()
500

    
501
        tag_name = args.tagname
502
        rpc_address = args.rpcaddress
503
        rpc_port = args.rpcport
504

    
505
        serf_client = PsngSerfClient(tag_name, rpc_address, rpc_port)
506
        print(serf_client)
507

    
508
        command = args.command
509
        if command == "bg":
510
            ch_dbfile = args.dbfile
511
            serf_client.listen_for_member_update_events(ch_dbfile)
512
        elif command == "set":
513
            ch_addr = args.caddr
514
            ch_port = args.cport
515
            ch_name = args.cname
516
            ch_txt = args.ctxt
517
            ch_sdpuri = args.csdpuri
518
            serf_client.set_new_channel(ch_addr, ch_port, ch_name,
519
                                        ch_txt, ch_sdpuri)
520
        elif command == "del":
521
            ch_addr = args.caddr
522
            ch_port = args.cport
523
            serf_client.delete_channel(ch_addr, ch_port)
524
        else:
525
            print "Unknown mode"
526
            return -1
527

    
528
    except argparse_actions.InvalidIp as e:
529
        print "IP address is invalid: {0}".format(e.ip)
530

    
531

    
532
if __name__ == "__main__":
533
    psng_serf_client_init()