Statistics
| Branch: | Tag: | Revision:

psng-pyserf / psng-pyserf.py @ 434d5e23

History | View | Annotate | Download (18.5 KB)

1
import argparse
2
import argparse_actions
3
import serf
4
import sys
5
import base64
6
import bz2
7
from pyroute2 import IPRoute
8
import portalocker
9
import time
10

    
11

    
12
class PsngSerfClient:
13

    
14
    def __init__(self, tag_name, rpc_address, rpc_port):
15
        self.tag_name = tag_name
16
        self.rpc_address = rpc_address
17
        self.rpc_port = rpc_port
18
        self.ch_dbfile = None
19
        self.client = None
20
        # Remember all tags parsed the last time
21
        self.last_channels_tags_list = []
22

    
23
    def encode_and_compress(self, s):
24
        ret_data = base64.b64encode(s)
25
        ret_data = bz2.compress(ret_data)
26

    
27
        return ret_data
28

    
29
    def decompress_and_decode(self, data):
30
        ret_str = bz2.decompress(data)
31
        ret_str = base64.b64decode(ret_str)
32

    
33
        return ret_str
34

    
35
    def encode(self, s):
36
        ret_data = base64.b64encode(s)
37

    
38
        return ret_data
39

    
40
    def decode(self, data):
41
        ret_str = base64.b64decode(data)
42

    
43
        return ret_str
44

    
45
    def get_local_ips(self):
46
        ip = IPRoute()
47
        local_if = ip.get_addr()
48
        local_ips = []
49
        for interf in local_if:
50
            local_ips.append(interf.get_attr('IFA_ADDRESS'))
51

    
52
        return local_ips
53

    
54
    def get_members(self):
55
        # Retrieve informations from all the serf memebrs.
56
        # This is done through the RPC members call.
57

    
58
        resp = None
59

    
60
        try:
61
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
62
            client.connect()
63
            client.members()
64
            resp = client.request(timeout=5)
65
            client.disconnect()
66
        except serf._exceptions.ConnectionError:
67
            print "Connection error"
68

    
69
        return resp
70

    
71
    def set_tag(self, tag_dict):
72
        resp = None
73

    
74
        try:
75
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
76
            client.connect()
77
            client.tags(Tags=tag_dict)
78
            resp = client.request()
79
            client.disconnect()
80
        except serf._exceptions.ConnectionError:
81
            print "Connection error"
82

    
83
        return resp
84

    
85
    def del_tag(self, tag_names_list):
86
        resp = None
87

    
88
        try:
89
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
90
            client.connect()
91
            client.tags(DeleteTags=tag_names_list)
92
            resp = client.request()
93
            client.disconnect()
94
        except serf._exceptions.ConnectionError:
95
            print "Connection error"
96

    
97
        return resp
98

    
99
    def is_local_member(self, member, local_ips):
100
        if member["Addr"]:
101
            m_addr = ".".join([str(x) for x in member["Addr"]])
102
            if m_addr in local_ips:
103
                return True
104
            else:
105
                return False
106
        else:
107
            print "Warning: found a memeber without address\n"
108
            print member
109
            return False
110

    
111
    def write_db_file(self, file_name, channels_list):
112

    
113
        file_hdr = "# channel_name,source_addr,source_port,channel_params"
114

    
115
        db_file = open(file_name, 'w')
116
        portalocker.lock(db_file, portalocker.LOCK_EX)
117
        db_file.write("%s\n" % (file_hdr,))
118
        for c in channels_list:
119
            print "Add channel: %s" % (c,)
120
            db_file.write("%s\n" % (c,))
121

    
122
        db_file.close()
123

    
124
    def member_update_event_callback(self, resp):
125

    
126
        if resp.is_success:
127
            print "Received event: member-update"
128
            resp_body = resp.body
129
            members = resp_body["Members"]
130

    
131
            for m in members:
132
                # For now we consider only "alive" members
133
                if m["Status"] == "alive":
134
                    if m["Tags"]:
135
                        if m["Tags"].get(self.tag_name):
136
                            c_tag = m["Tags"].get(self.tag_name)
137
                            if c_tag not in self.last_channels_tags_list:
138
                                self.update_db_from_members()
139
                                return 0
140
        else:
141
            sys.stderr.write("Serf streamed event failed\n")
142
            sys.stderr.write("%s" % (resp.error,))
143

    
144
        return 0
145

    
146
    def listen_for_member_update_events(self, ch_dbfile):
147
        print "Database file: %s" % (ch_dbfile,)
148

    
149
        while True:
150

    
151
            # Write the db file based on the current members channels tags
152
            members_updated = False
153
            sleep_time = 5
154

    
155
            self.client = serf.Client("%s:%d" % (self.rpc_address,
156
                                      self.rpc_port),
157
                                      auto_reconnect=True)
158

    
159
            while not members_updated:
160
                try:
161
                    self.client.connect()
162

    
163
                    self.ch_dbfile = ch_dbfile
164
                    if self.update_db_from_members():
165
                        members_updated = True
166
                    else:
167
                        time.sleep(sleep_time)
168
                except serf._exceptions.ConnectionError:
169
                    print "Client connection error (sleep %d)" % (sleep_time,)
170
                    time.sleep(sleep_time)
171

    
172
            try:
173
                # Register callback for memebr update events
174
                # Todo: handle sigint
175
                self.client.stream(Type="member-update").add_callback(
176
                                   self.member_update_event_callback).request(
177
                                   timeout=120)
178
            except serf._exceptions.ConnectionError:
179
                print "Client connection error (sleep %d)" % (sleep_time,)
180
                time.sleep(sleep_time)
181
            except KeyboardInterrupt:
182
                print "Disconnection from RPC deamon"
183
                self.client.disconnect()
184
                return
185

    
186
    def update_db_from_members(self):
187
        # WARNING: This method assumes the connection towards the RPC deamon
188
        # is already open and the client seved in self.client
189

    
190
        if not self.client:
191
            return False
192

    
193
        # Retrieve serf members
194
        self.client.members()
195
        resp = self.client.request(timeout=5)
196

    
197
        if resp[0].is_success:
198
            resp_body = resp[0].body
199
            members = resp_body["Members"]
200

    
201
            # Retrieve channel tags
202
            self.last_channels_tags_list = []
203
            channel_tags_list = []
204

    
205
            for m in members:
206
                # For now we consider only "alive" members
207
                if m["Status"] == "alive":
208
                    if m["Tags"]:
209
                        if m["Tags"].get(self.tag_name):
210
                            c_tag = m["Tags"].get(self.tag_name)
211
                            channel_tags_list.append(c_tag)
212
                            self.last_channels_tags_list.append(c_tag)
213

    
214
            # Build channels list
215
            channels_list = []
216

    
217
            for t_comp in channel_tags_list:
218
                # Decode the channel tag
219
                t = self.decode(t_comp)
220

    
221
                # Each member can have more than one channel.
222
                # Channels are separated by the ";" character.
223
                channels_list += t.split(";")
224

    
225
            # Write database file
226
            self.write_db_file(self.ch_dbfile, channels_list)
227

    
228
            return True
229

    
230
        else:
231
            self.write_db_file(self.ch_dbfile, [])
232
            sys.stderr.write("Serf members command failed\n")
233
            sys.stderr.write("%s" % (resp[0].error,))
234
            return False
235

    
236
    def delete_channel(self, ch_addr, ch_port):
237
        resp = self.get_members()
238

    
239
        if not resp:
240
            return
241

    
242
        if resp[0].is_success:
243
            resp_body = resp[0].body
244
            members = resp_body["Members"]
245

    
246
            # Used to save the channels tag of the local memebr
247
            local_node_channels = None
248
            channels_tag_exist = False
249
            new_channels_string = ""
250
            delete_channel = False
251

    
252
            # find all local IP addresses and save them in local_ips
253
            local_ips = self.get_local_ips()
254

    
255
            for m in members:
256
                # For now we consider only "alive" members
257
                if m["Status"] == "alive":
258
                    if self.is_local_member(m, local_ips):
259
                        if m["Tags"]:
260
                            if m["Tags"].get(self.tag_name):
261
                                channels_tag_exist = True
262
                            local_node_channels = m["Tags"].get(self.tag_name)
263

    
264
            if local_node_channels:
265
                # Decompress and decode the local channel and compare to the
266
                # channel we want to delete
267
                # The channel are compared only considering the
268
                # address and the port
269
                channels = self.decode(local_node_channels)
270

    
271
                # Each member can have more than one channel.
272
                # Channels are separated by the ";" character.
273
                channels_list = channels.split(";")
274

    
275
                for c in channels_list:
276
                    [_, caddr, cport, _] = c.split(",")
277

    
278
                    if (caddr != ch_addr or int(cport) != int(ch_port)):
279
                        # This is not the channel we want to delete.
280
                        # Add it to the new channels string
281
                        if new_channels_string:
282
                            new_channels_string += ";" + c
283
                        else:
284
                            new_channels_string = c
285
                    else:
286
                        delete_channel = True
287
                        print "Delete channel: %s" % (c,)
288

    
289
            if new_channels_string and delete_channel:
290
                # If new_channels_string is not empty we just need to update
291
                # the channels tag through the RPC tags -set call.
292

    
293
                print "Update channels: %s" % (new_channels_string,)
294

    
295
                # Encode the string
296
                ch_str_comp = self.encode(new_channels_string)
297

    
298
                # Update (or add) the channels tag.
299
                # This is done through the RCP tags call
300

    
301
                resp = self.set_tag({self.tag_name: ch_str_comp})
302

    
303
                if not resp:
304
                    return
305

    
306
                if not resp[0].is_success:
307
                    sys.stderr.write("Serf tags set command failed\n")
308
                    sys.stderr.write("%s" % (resp[0].error,))
309

    
310
            elif not new_channels_string and channels_tag_exist:
311
                # If new_channels_string is empty but channels_tag_exist
312
                # is True we can delete che cahnnels tag through the RPC
313
                # tags -delete call
314
                print "Delete tag: %s" % (self.tag_name,)
315

    
316
                resp = self.del_tag((self.tag_name,))
317

    
318
                if not resp:
319
                    return
320

    
321
                if not resp[0].is_success:
322
                    sys.stderr.write("Serf tags delete command failed\n")
323
                    sys.stderr.write("%s" % (resp[0].error,))
324

    
325
        else:
326
            sys.stderr.write("Serf members command failed\n")
327
            sys.stderr.write("%s" % (resp[0].error,))
328
            return
329

    
330
    def set_new_channel(self, ch_addr, ch_port, ch_name, ch_txt):
331
        # Build new channel string
332
        ch_str = '%s,%s,%d,%s' % (ch_name, ch_addr, ch_port, ch_txt)
333
        print "Add channel: %s\n" % (ch_str,)
334

    
335
        # Retrieve informations from all the serf memebrs.
336
        resp = self.get_members()
337

    
338
        if not resp:
339
            return
340

    
341
        if resp[0].is_success:
342
            resp_body = resp[0].body
343
            members = resp_body["Members"]
344

    
345
            # One encoded channels string for each "alive" member
346
            nodes_channels_list = []
347
            # Used to save the channels tag of the local memebr
348
            local_node_channels = None
349

    
350
            # find all local IP addresses and save them in local_ips
351
            local_ips = self.get_local_ips()
352

    
353
            for m in members:
354
                # For now we consider only "alive" members
355
                if m["Status"] == "alive":
356

    
357
                    # Check if this is the local member
358
                    local_member = self.is_local_member(m, local_ips)
359

    
360
                    # Save the channels tag
361
                    if m["Tags"]:
362
                        node_channels = m["Tags"].get(self.tag_name)
363

    
364
                        if node_channels:
365
                            nodes_channels_list.append(node_channels)
366
                            if local_member:
367
                                local_node_channels = node_channels
368

    
369
            # Don't add the new channel if it already exists
370
            for channels_comp in nodes_channels_list:
371
                # Dont' consider empty strings
372
                if not channels_comp:
373
                    continue
374

    
375
                # Decompress and decode each channel and compare to the new
376
                # channel we want to set
377
                # The channel are compared only considering the
378
                # address and the port
379
                channels = self.decode(channels_comp)
380

    
381
                # Each member can have more than one channel.
382
                # Channels are separated by the ";" character.
383
                channels_list = channels.split(";")
384

    
385
                for c in channels_list:
386
                    [_, caddr, cport, _] = c.split(",")
387

    
388
                    if (caddr == ch_addr and int(cport) == int(ch_port)):
389
                        print "Channel already exists"
390
                        return
391

    
392
            # If we arrive here this means that we are trying to add a new
393
            # channel.
394
            # Encode and compress data
395
            if local_node_channels:
396
                ch_str = ';'.join([self.decode(
397
                                  local_node_channels),
398
                                  ch_str])
399
            ch_str_comp = self.encode(ch_str)
400

    
401
            # Update (or add) the channels tag.
402
            # This is done through the RCP tags call
403
            resp = self.set_tag({self.tag_name: ch_str_comp})
404

    
405
            if not resp:
406
                return
407

    
408
            if not resp[0].is_success:
409
                sys.stderr.write("Serf tags command failed\n")
410
                sys.stderr.write("%s" % (resp[0].error,))
411

    
412
        else:
413
            sys.stderr.write("Serf members command failed\n")
414
            sys.stderr.write("%s" % (resp[0].error,))
415
            return
416

    
417
    def __str__(self):
418
        ret_str = "PSNG Serf RPC client: "
419
        ret_str += "{channels_tag_name: %s, " % (self.tag_name,)
420
        ret_str += "rpc_address: %s, " % (self.rpc_address,)
421
        ret_str += "rpc_port %d}\n" % (self.rpc_port,)
422
        return ret_str
423

    
424

    
425
def psng_serf_client_init():
426
    parser = argparse.ArgumentParser()
427

    
428
    # Optionals
429
    parser.add_argument("-t", "--tagname", type=str, default="psngc",
430
                        help="PeerStreamer Next-Generation source tag name",
431
                        dest="tagname")
432

    
433
    # Mandatory for all modes
434
    parser.add_argument("-a", "--rpcaddress", type=str, required=True,
435
                        help="IP address of the Serf RPC server",
436
                        dest="rpcaddress",
437
                        action=argparse_actions.ProperIpFormatAction)
438
    parser.add_argument("-p", "--rpcport", type=int, required=True,
439
                        help="TCP port of the Serf RPC server",
440
                        dest="rpcport", choices=range(0, 65536),
441
                        metavar="[0-65535]")
442

    
443
    subparsers = parser.add_subparsers(dest="command")
444
    # Set PeerStreamer Next-Generation source tag
445
    parser_set = subparsers.add_parser("set", help="Set and propagate the "
446
                                       "PeerStreamer Next-Generation "
447
                                       "source tag (Call RPC tags --set). "
448
                                       "If this node has already a "
449
                                       "PeerStreamer Next-Generation source "
450
                                       "tag associated, then the new channel "
451
                                       "is appended to the existing ones.")
452
    parser_set.add_argument("caddr", type=str,
453
                            help="Source channel IP address",
454
                            action=argparse_actions.ProperIpFormatAction)
455
    parser_set.add_argument("cport", type=int, choices=range(0, 65536),
456
                            help="Source channel port",
457
                            metavar="[0-65535]")
458
    parser_set.add_argument("cname", type=str,
459
                            help="Source channel name")
460
    parser_set.add_argument("ctxt", type=str,
461
                            help="Source channel additional parameters")
462

    
463
    # Delete PeerStreamer Next-Generation source tag
464
    parser_del = subparsers.add_parser("del", help="Delete a channel "
465
                                       "identified by a source address, a "
466
                                       "source port and a name. If the "
467
                                       "resulting PeerStreamer "
468
                                       "Next-Generation source tag is empty, "
469
                                       "then it will be deleted by calling "
470
                                       "the RPC procedure tags --delete.")
471
    parser_del.add_argument("caddr", type=str,
472
                            help="Source channel IP address",
473
                            action=argparse_actions.ProperIpFormatAction)
474
    parser_del.add_argument("cport", type=int, choices=range(0, 65536),
475
                            help="Source channel port",
476
                            metavar="[0-65535]")
477
    # parser_del.add_argument("cname", type=str,
478
    #                         help="Source channel name")
479

    
480
    parser_db = subparsers.add_parser("bg",
481
                                      help="Run in background and keep the "
482
                                      "database file updated by listening to "
483
                                      "member-update Serf events.")
484
    parser_db.add_argument("dbfile", type=str,
485
                           help="Channels database file")
486

    
487
    try:
488
        args = parser.parse_args()
489

    
490
        tag_name = args.tagname
491
        rpc_address = args.rpcaddress
492
        rpc_port = args.rpcport
493

    
494
        serf_client = PsngSerfClient(tag_name, rpc_address, rpc_port)
495
        print(serf_client)
496

    
497
        command = args.command
498
        if command == "bg":
499
            ch_dbfile = args.dbfile
500
            serf_client.listen_for_member_update_events(ch_dbfile)
501
        elif command == "set":
502
            ch_addr = args.caddr
503
            ch_port = args.cport
504
            ch_name = args.cname
505
            ch_txt = args.ctxt
506
            serf_client.set_new_channel(ch_addr, ch_port, ch_name, ch_txt)
507
        elif command == "del":
508
            ch_addr = args.caddr
509
            ch_port = args.cport
510
            serf_client.delete_channel(ch_addr, ch_port)
511
        else:
512
            print "Unknown mode"
513
            return -1
514

    
515
    except argparse_actions.InvalidIp as e:
516
        print "IP address is invalid: {0}".format(e.ip)
517

    
518

    
519
if __name__ == "__main__":
520
    psng_serf_client_init()