Revision 434d5e23

View differences:

psng-pyserf.py
6 6
import bz2
7 7
from pyroute2 import IPRoute
8 8
import portalocker
9
import time
9 10

  
10 11

  
11 12
class PsngSerfClient:
......
53 54
    def get_members(self):
54 55
        # Retrieve informations from all the serf memebrs.
55 56
        # This is done through the RPC members call.
56
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
57
        client.connect()
58
        client.members()
59
        resp = client.request(timeout=5)
60
        client.disconnect()
57

  
58
        resp = None
59

  
60
        try:
61
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
62
            client.connect()
63
            client.members()
64
            resp = client.request(timeout=5)
65
            client.disconnect()
66
        except serf._exceptions.ConnectionError:
67
            print "Connection error"
61 68

  
62 69
        return resp
63 70

  
64 71
    def set_tag(self, tag_dict):
65
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
66
        client.connect()
67
        client.tags(Tags=tag_dict)
68
        resp = client.request()
69
        client.disconnect()
72
        resp = None
73

  
74
        try:
75
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
76
            client.connect()
77
            client.tags(Tags=tag_dict)
78
            resp = client.request()
79
            client.disconnect()
80
        except serf._exceptions.ConnectionError:
81
            print "Connection error"
70 82

  
71 83
        return resp
72 84

  
73 85
    def del_tag(self, tag_names_list):
74
        client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
75
        client.connect()
76
        client.tags(DeleteTags=tag_names_list)
77
        resp = client.request()
78
        client.disconnect()
86
        resp = None
87

  
88
        try:
89
            client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port))
90
            client.connect()
91
            client.tags(DeleteTags=tag_names_list)
92
            resp = client.request()
93
            client.disconnect()
94
        except serf._exceptions.ConnectionError:
95
            print "Connection error"
79 96

  
80 97
        return resp
81 98

  
......
129 146
    def listen_for_member_update_events(self, ch_dbfile):
130 147
        print "Database file: %s" % (ch_dbfile,)
131 148

  
132
        # Write the db file based on the current members channels tags
133
        self.client = serf.Client("%s:%d" % (self.rpc_address, self.rpc_port),
134
                                  auto_reconnect=True)
135
        self.client.connect()
149
        while True:
136 150

  
137
        self.ch_dbfile = ch_dbfile
138
        self.update_db_from_members()
151
            # Write the db file based on the current members channels tags
152
            members_updated = False
153
            sleep_time = 5
139 154

  
140
        try:
141
            # Register callback for memebr update events
142
            # Todo: handle sigint
143
            self.client.stream(Type="*").add_callback(
144
                               self.member_update_event_callback).request(
145
                               watch=True)
146
        except KeyboardInterrupt:
147
            print "Disconnection from RPC deamon"
148
            self.client.disconnect()
155
            self.client = serf.Client("%s:%d" % (self.rpc_address,
156
                                      self.rpc_port),
157
                                      auto_reconnect=True)
158

  
159
            while not members_updated:
160
                try:
161
                    self.client.connect()
162

  
163
                    self.ch_dbfile = ch_dbfile
164
                    if self.update_db_from_members():
165
                        members_updated = True
166
                    else:
167
                        time.sleep(sleep_time)
168
                except serf._exceptions.ConnectionError:
169
                    print "Client connection error (sleep %d)" % (sleep_time,)
170
                    time.sleep(sleep_time)
171

  
172
            try:
173
                # Register callback for memebr update events
174
                # Todo: handle sigint
175
                self.client.stream(Type="member-update").add_callback(
176
                                   self.member_update_event_callback).request(
177
                                   timeout=120)
178
            except serf._exceptions.ConnectionError:
179
                print "Client connection error (sleep %d)" % (sleep_time,)
180
                time.sleep(sleep_time)
181
            except KeyboardInterrupt:
182
                print "Disconnection from RPC deamon"
183
                self.client.disconnect()
184
                return
149 185

  
150 186
    def update_db_from_members(self):
151 187
        # WARNING: This method assumes the connection towards the RPC deamon
152 188
        # is already open and the client seved in self.client
153 189

  
154 190
        if not self.client:
155
            return
191
            return False
156 192

  
157 193
        # Retrieve serf members
158 194
        self.client.members()
......
189 225
            # Write database file
190 226
            self.write_db_file(self.ch_dbfile, channels_list)
191 227

  
228
            return True
229

  
192 230
        else:
231
            self.write_db_file(self.ch_dbfile, [])
193 232
            sys.stderr.write("Serf members command failed\n")
194 233
            sys.stderr.write("%s" % (resp[0].error,))
195
            return
234
            return False
196 235

  
197 236
    def delete_channel(self, ch_addr, ch_port):
198 237
        resp = self.get_members()
199 238

  
239
        if not resp:
240
            return
241

  
200 242
        if resp[0].is_success:
201 243
            resp_body = resp[0].body
202 244
            members = resp_body["Members"]
......
258 300

  
259 301
                resp = self.set_tag({self.tag_name: ch_str_comp})
260 302

  
303
                if not resp:
304
                    return
305

  
261 306
                if not resp[0].is_success:
262 307
                    sys.stderr.write("Serf tags set command failed\n")
263 308
                    sys.stderr.write("%s" % (resp[0].error,))
......
270 315

  
271 316
                resp = self.del_tag((self.tag_name,))
272 317

  
318
                if not resp:
319
                    return
320

  
273 321
                if not resp[0].is_success:
274 322
                    sys.stderr.write("Serf tags delete command failed\n")
275 323
                    sys.stderr.write("%s" % (resp[0].error,))
......
287 335
        # Retrieve informations from all the serf memebrs.
288 336
        resp = self.get_members()
289 337

  
338
        if not resp:
339
            return
340

  
290 341
        if resp[0].is_success:
291 342
            resp_body = resp[0].body
292 343
            members = resp_body["Members"]
......
351 402
            # This is done through the RCP tags call
352 403
            resp = self.set_tag({self.tag_name: ch_str_comp})
353 404

  
405
            if not resp:
406
                return
407

  
354 408
            if not resp[0].is_success:
355 409
                sys.stderr.write("Serf tags command failed\n")
356 410
                sys.stderr.write("%s" % (resp[0].error,))

Also available in: Unified diff