Statistics
| Branch: | Tag: | Revision:

psng-pyserf / psng-pyserf.py @ ff14495b

History | View | Annotate | Download (17 KB)

1
import argparse
2
import argparse_actions
3
import serf
4
import sys
5
import base64
6
import bz2
7
from pyroute2 import IPRoute
8
import portalocker
9

    
10

    
11
class PsngSerfClient:
12

    
13
    def __init__(self, tag_name, rpc_address, rpc_port):
14
        self.tag_name = tag_name
15
        self.rpc_address = rpc_address
16
        self.rpc_port = rpc_port
17
        self.ch_dbfile = None
18
        self.client = None
19
        # Remember all tags parsed the last time
20
        self.last_channels_tags_list = []
21

    
22
    def encode_and_compress(self, s):
23
        ret_data = base64.b64encode(s)
24
        ret_data = bz2.compress(ret_data)
25

    
26
        return ret_data
27

    
28
    def decompress_and_decode(self, data):
29
        ret_str = bz2.decompress(data)
30
        ret_str = base64.b64decode(ret_str)
31

    
32
        return ret_str
33

    
34
    def encode(self, s):
35
        ret_data = base64.b64encode(s)
36

    
37
        return ret_data
38

    
39
    def decode(self, data):
40
        ret_str = base64.b64decode(data)
41

    
42
        return ret_str
43

    
44
    def get_local_ips(self):
45
        ip = IPRoute()
46
        local_if = ip.get_addr()
47
        local_ips = []
48
        for interf in local_if:
49
            local_ips.append(interf.get_attr('IFA_ADDRESS'))
50

    
51
        return local_ips
52

    
53
    def get_members(self):
54
        # Retrieve informations from all the serf memebrs.
55
        # This is done through the RPC members call.
56
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
57
        client.connect()
58
        client.members()
59
        resp = client.request(timeout=5)
60
        client.disconnect()
61

    
62
        return resp
63

    
64
    def set_tag(self, tag_dict):
65
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
66
        client.connect()
67
        client.tags(Tags=tag_dict)
68
        resp = client.request()
69
        client.disconnect()
70

    
71
        return resp
72

    
73
    def del_tag(self, tag_names_list):
74
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
75
        client.connect()
76
        client.tags(DeleteTags=tag_names_list)
77
        resp = client.request()
78
        client.disconnect()
79

    
80
        return resp
81

    
82
    def is_local_member(self, member, local_ips):
83
        if member["Addr"]:
84
            m_addr = ".".join([str(x) for x in member["Addr"]])
85
            if m_addr in local_ips:
86
                return True
87
            else:
88
                return False
89
        else:
90
            print "Warning: found a memeber without address\n"
91
            print member
92
            return False
93

    
94
    def write_db_file(self, file_name, channels_list):
95

    
96
        file_hdr = "# channel_name,source_addr,source_port,channel_params"
97

    
98
        db_file = open(file_name, 'w')
99
        portalocker.lock(db_file, portalocker.LOCK_EX)
100
        db_file.write("%s\n" % (file_hdr,))
101
        for c in channels_list:
102
            print "Add channel: %s" % (c,)
103
            db_file.write("%s\n" % (c,))
104

    
105
        db_file.close()
106

    
107
    def member_update_event_callback(self, resp):
108

    
109
        if resp.is_success:
110
            print "Received event: member-update"
111
            resp_body = resp.body
112
            members = resp_body["Members"]
113

    
114
            for m in members:
115
                # For now we consider only "alive" members
116
                if m["Status"] == "alive":
117
                    if m["Tags"]:
118
                        if m["Tags"].get(self.tag_name):
119
                            c_tag = m["Tags"].get(self.tag_name)
120
                            if c_tag not in self.last_channels_tags_list:
121
                                self.update_db_from_members()
122
                                return 0
123
        else:
124
            sys.stderr.write("Serf streamed event failed\n")
125
            sys.stderr.write("%s" % (resp.error,))
126

    
127
        return 0
128

    
129
    def listen_for_member_update_events(self, ch_dbfile):
130
        print "Database file: %s" % (ch_dbfile,)
131

    
132
        # Write the db file based on the current members channels tags
133
        self.client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port),
134
                                  auto_reconnect=True)
135
        self.client.connect()
136

    
137
        self.ch_dbfile = ch_dbfile
138
        self.update_db_from_members()
139

    
140
        try:
141
            # Register callback for memebr update events
142
            # Todo: handle sigint
143
            self.client.stream(Type="*").add_callback(
144
                               self.member_update_event_callback).request(
145
                               watch=True)
146
        except KeyboardInterrupt:
147
            print "Disconnection from RPC deamon"
148
            self.client.disconnect()
149

    
150
    def update_db_from_members(self):
151
        # WARNING: This method assumes the connection towards the RPC deamon
152
        # is already open and the client seved in self.client
153

    
154
        if not self.client:
155
            return
156

    
157
        # Retrieve serf members
158
        self.client.members()
159
        resp = self.client.request(timeout=5)
160

    
161
        if resp[0].is_success:
162
            resp_body = resp[0].body
163
            members = resp_body["Members"]
164

    
165
            # Retrieve channel tags
166
            self.last_channels_tags_list = []
167
            channel_tags_list = []
168

    
169
            for m in members:
170
                # For now we consider only "alive" members
171
                if m["Status"] == "alive":
172
                    if m["Tags"]:
173
                        if m["Tags"].get(self.tag_name):
174
                            c_tag = m["Tags"].get(self.tag_name)
175
                            channel_tags_list.append(c_tag)
176
                            self.last_channels_tags_list.append(c_tag)
177

    
178
            # Build channels list
179
            channels_list = []
180

    
181
            for t_comp in channel_tags_list:
182
                # Decode the channel tag
183
                t = self.decode(t_comp)
184

    
185
                # Each member can have more than one channel.
186
                # Channels are separated by the ";" character.
187
                channels_list += t.split(";")
188

    
189
            # Write database file
190
            self.write_db_file(self.ch_dbfile, channels_list)
191

    
192
        else:
193
            sys.stderr.write("Serf members command failed\n")
194
            sys.stderr.write("%s" % (resp[0].error,))
195
            return
196

    
197
    def delete_channel(self, ch_addr, ch_port):
198
        resp = self.get_members()
199

    
200
        if resp[0].is_success:
201
            resp_body = resp[0].body
202
            members = resp_body["Members"]
203

    
204
            # Used to save the channels tag of the local memebr
205
            local_node_channels = None
206
            channels_tag_exist = False
207
            new_channels_string = ""
208
            delete_channel = False
209

    
210
            # find all local IP addresses and save them in local_ips
211
            local_ips = self.get_local_ips()
212

    
213
            for m in members:
214
                # For now we consider only "alive" members
215
                if m["Status"] == "alive":
216
                    if self.is_local_member(m, local_ips):
217
                        if m["Tags"]:
218
                            if m["Tags"].get(self.tag_name):
219
                                channels_tag_exist = True
220
                            local_node_channels = m["Tags"].get(self.tag_name)
221

    
222
            if local_node_channels:
223
                # Decompress and decode the local channel and compare to the
224
                # channel we want to delete
225
                # The channel are compared only considering the
226
                # address and the port
227
                channels = self.decode(local_node_channels)
228

    
229
                # Each member can have more than one channel.
230
                # Channels are separated by the ";" character.
231
                channels_list = channels.split(";")
232

    
233
                for c in channels_list:
234
                    [_, caddr, cport, _] = c.split(",")
235

    
236
                    if (caddr != ch_addr or int(cport) != int(ch_port)):
237
                        # This is not the channel we want to delete.
238
                        # Add it to the new channels string
239
                        if new_channels_string:
240
                            new_channels_string += ";" + c
241
                        else:
242
                            new_channels_string = c
243
                    else:
244
                        delete_channel = True
245
                        print "Delete channel: %s" % (c,)
246

    
247
            if new_channels_string and delete_channel:
248
                # If new_channels_string is not empty we just need to update
249
                # the channels tag through the RPC tags -set call.
250

    
251
                print "Update channels: %s" % (new_channels_string,)
252

    
253
                # Encode the string
254
                ch_str_comp = self.encode(new_channels_string)
255

    
256
                # Update (or add) the channels tag.
257
                # This is done through the RCP tags call
258

    
259
                resp = self.set_tag({self.tag_name: ch_str_comp})
260

    
261
                if not resp[0].is_success:
262
                    sys.stderr.write("Serf tags set command failed\n")
263
                    sys.stderr.write("%s" % (resp[0].error,))
264

    
265
            elif not new_channels_string and channels_tag_exist:
266
                # If new_channels_string is empty but channels_tag_exist
267
                # is True we can delete che cahnnels tag through the RPC
268
                # tags -delete call
269
                print "Delete tag: %s" % (self.tag_name,)
270

    
271
                resp = self.del_tag((self.tag_name,))
272

    
273
                if not resp[0].is_success:
274
                    sys.stderr.write("Serf tags delete command failed\n")
275
                    sys.stderr.write("%s" % (resp[0].error,))
276

    
277
        else:
278
            sys.stderr.write("Serf members command failed\n")
279
            sys.stderr.write("%s" % (resp[0].error,))
280
            return
281

    
282
    def set_new_channel(self, ch_addr, ch_port, ch_name, ch_txt):
283
        # Build new channel string
284
        ch_str = '%s,%s,%d,%s' % (ch_name, ch_addr, ch_port, ch_txt)
285
        print "Add channel: %s\n" % (ch_str,)
286

    
287
        # Retrieve informations from all the serf memebrs.
288
        resp = self.get_members()
289

    
290
        if resp[0].is_success:
291
            resp_body = resp[0].body
292
            members = resp_body["Members"]
293

    
294
            # One encoded channels string for each "alive" member
295
            nodes_channels_list = []
296
            # Used to save the channels tag of the local memebr
297
            local_node_channels = None
298

    
299
            # find all local IP addresses and save them in local_ips
300
            local_ips = self.get_local_ips()
301

    
302
            for m in members:
303
                # For now we consider only "alive" members
304
                if m["Status"] == "alive":
305

    
306
                    # Check if this is the local member
307
                    local_member = self.is_local_member(m, local_ips)
308

    
309
                    # Save the channels tag
310
                    if m["Tags"]:
311
                        node_channels = m["Tags"].get(self.tag_name)
312

    
313
                        if node_channels:
314
                            nodes_channels_list.append(node_channels)
315
                            if local_member:
316
                                local_node_channels = node_channels
317

    
318
            # Don't add the new channel if it already exists
319
            for channels_comp in nodes_channels_list:
320
                # Dont' consider empty strings
321
                if not channels_comp:
322
                    continue
323

    
324
                # Decompress and decode each channel and compare to the new
325
                # channel we want to set
326
                # The channel are compared only considering the
327
                # address and the port
328
                channels = self.decode(channels_comp)
329

    
330
                # Each member can have more than one channel.
331
                # Channels are separated by the ";" character.
332
                channels_list = channels.split(";")
333

    
334
                for c in channels_list:
335
                    [_, caddr, cport, _] = c.split(",")
336

    
337
                    if (caddr == ch_addr and int(cport) == int(ch_port)):
338
                        print "Channel already exists"
339
                        return
340

    
341
            # If we arrive here this means that we are trying to add a new
342
            # channel.
343
            # Encode and compress data
344
            if local_node_channels:
345
                ch_str = ';'.join([self.decode(
346
                                  local_node_channels),
347
                                  ch_str])
348
            ch_str_comp = self.encode(ch_str)
349

    
350
            # Update (or add) the channels tag.
351
            # This is done through the RCP tags call
352
            resp = self.set_tag({self.tag_name: ch_str_comp})
353

    
354
            if not resp[0].is_success:
355
                sys.stderr.write("Serf tags command failed\n")
356
                sys.stderr.write("%s" % (resp[0].error,))
357

    
358
        else:
359
            sys.stderr.write("Serf members command failed\n")
360
            sys.stderr.write("%s" % (resp[0].error,))
361
            return
362

    
363
    def __str__(self):
364
        ret_str = "PSNG Serf RPC client: "
365
        ret_str += "{channels_tag_name: %s, " % (self.tag_name,)
366
        ret_str += "rpc_address: %s, " % (self.rpc_address,)
367
        ret_str += "rpc_port %d}\n" % (self.rpc_port,)
368
        return ret_str
369

    
370

    
371
def psng_serf_client_init():
372
    parser = argparse.ArgumentParser()
373

    
374
    # Optionals
375
    parser.add_argument("-t", "--tagname", type=str, default="psngc",
376
                        help="PeerStreamer Next-Generation source tag name",
377
                        dest="tagname")
378

    
379
    # Mandatory for all modes
380
    parser.add_argument("-a", "--rpcaddress", type=str, required=True,
381
                        help="IP address of the Serf RPC server",
382
                        dest="rpcaddress",
383
                        action=argparse_actions.ProperIpFormatAction)
384
    parser.add_argument("-p", "--rpcport", type=int, required=True,
385
                        help="TCP port of the Serf RPC server",
386
                        dest="rpcport", choices=range(0, 65536),
387
                        metavar="[0-65535]")
388

    
389
    subparsers = parser.add_subparsers(dest="command")
390
    # Set PeerStreamer Next-Generation source tag
391
    parser_set = subparsers.add_parser("set", help="Set and propagate the "
392
                                       "PeerStreamer Next-Generation "
393
                                       "source tag (Call RPC tags --set). "
394
                                       "If this node has already a "
395
                                       "PeerStreamer Next-Generation source "
396
                                       "tag associated, then the new channel "
397
                                       "is appended to the existing ones.")
398
    parser_set.add_argument("caddr", type=str,
399
                            help="Source channel IP address",
400
                            action=argparse_actions.ProperIpFormatAction)
401
    parser_set.add_argument("cport", type=int, choices=range(0, 65536),
402
                            help="Source channel port",
403
                            metavar="[0-65535]")
404
    parser_set.add_argument("cname", type=str,
405
                            help="Source channel name")
406
    parser_set.add_argument("ctxt", type=str,
407
                            help="Source channel additional parameters")
408

    
409
    # Delete PeerStreamer Next-Generation source tag
410
    parser_del = subparsers.add_parser("del", help="Delete a channel "
411
                                       "identified by a source address, a "
412
                                       "source port and a name. If the "
413
                                       "resulting PeerStreamer "
414
                                       "Next-Generation source tag is empty, "
415
                                       "then it will be deleted by calling "
416
                                       "the RPC procedure tags --delete.")
417
    parser_del.add_argument("caddr", type=str,
418
                            help="Source channel IP address",
419
                            action=argparse_actions.ProperIpFormatAction)
420
    parser_del.add_argument("cport", type=int, choices=range(0, 65536),
421
                            help="Source channel port",
422
                            metavar="[0-65535]")
423
    # parser_del.add_argument("cname", type=str,
424
    #                         help="Source channel name")
425

    
426
    parser_db = subparsers.add_parser("bg",
427
                                      help="Run in background and keep the "
428
                                      "database file updated by listening to "
429
                                      "member-update Serf events.")
430
    parser_db.add_argument("dbfile", type=str,
431
                           help="Channels database file")
432

    
433
    try:
434
        args = parser.parse_args()
435

    
436
        tag_name = args.tagname
437
        rpc_address = args.rpcaddress
438
        rpc_port = args.rpcport
439

    
440
        serf_client = PsngSerfClient(tag_name, rpc_address, rpc_port)
441
        print(serf_client)
442

    
443
        command = args.command
444
        if command == "bg":
445
            ch_dbfile = args.dbfile
446
            serf_client.listen_for_member_update_events(ch_dbfile)
447
        elif command == "set":
448
            ch_addr = args.caddr
449
            ch_port = args.cport
450
            ch_name = args.cname
451
            ch_txt = args.ctxt
452
            serf_client.set_new_channel(ch_addr, ch_port, ch_name, ch_txt)
453
        elif command == "del":
454
            ch_addr = args.caddr
455
            ch_port = args.cport
456
            serf_client.delete_channel(ch_addr, ch_port)
457
        else:
458
            print "Unknown mode"
459
            return -1
460

    
461
    except argparse_actions.InvalidIp as e:
462
        print "IP address is invalid: {0}".format(e.ip)
463

    
464

    
465
if __name__ == "__main__":
466
    psng_serf_client_init()