Statistics
| Branch: | Revision:

grapes / src / TopologyManager / tman.c @ 176b8de8

History | View | Annotate | Download (9.16 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "../Cache/blist_cache.h"
16
#include "../Cache/blist_proto.h"
17
#include "../Cache/proto.h"
18
#include "grapes_msg_types.h"
19
#include "grapes_config.h"
20
#include "topman_iface.h"
21

    
22
#define TMAN_INIT_PEERS 10 // max # of neighbors in local cache (should be >= than the next)
23
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be <= than the previous)
24
#define TMAN_MAX_GOSSIPING_PEERS 20 // # size of the view to be sent to receiver peer (should be <= than the previous)
25
#define TMAN_STD_PERIOD 5
26
#define TMAN_INIT_PERIOD 1000000
27
#define TMAN_RESTART_COUNT 20;
28

    
29
static  int max_preferred_peers;
30
static  int max_gossiping_peers;
31
static        int restart_countdown = TMAN_RESTART_COUNT;
32

    
33
static uint64_t currtime;
34
static int cache_size;
35
static struct peer_cache *local_cache;
36
static int default_period;
37
static int init_cache_size;
38
static int period = TMAN_INIT_PERIOD;
39
static int active;
40
static int do_resize;
41
static void *mymeta;
42
static int mymeta_size;
43
static struct nodeID *restart_peer;
44
static uint8_t *zero;
45

    
46
static rankingFunction userRankFunct;
47

    
48
static int tmanRankFunct (const void *target, const void *p1, const void *p2) {
49

    
50
        if (memcmp(target,zero,mymeta_size) == 0 || (memcmp(p1,zero,mymeta_size) == 0 && memcmp(p2,zero,mymeta_size) == 0))
51
                return 0;
52
        if (memcmp(p1,zero,mymeta_size) == 0)
53
                return 2;
54
        if (memcmp(p2,zero,mymeta_size) == 0)
55
                return 1;
56
        return userRankFunct(target, p1, p2);
57
}
58

    
59
static uint64_t gettime(void)
60
{
61
        struct timeval tv;
62

    
63
        gettimeofday(&tv, NULL);
64

    
65
        return tv.tv_usec + tv.tv_sec * 1000000ull;
66
}
67

    
68
static int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, rankingFunction rfun, const char *config)
69
{
70
        struct tag *cfg_tags;
71
        int res;
72

    
73
        cfg_tags = grapes_config_parse(config);
74
        res = grapes_config_value_int(cfg_tags, "cache_size", &init_cache_size);
75
        if (!res) {
76
                init_cache_size = TMAN_INIT_PEERS;
77
        }
78
        cache_size = init_cache_size;
79
        res = grapes_config_value_int(cfg_tags, "max_preferred_peers", &max_preferred_peers);
80
        if (!res) {
81
                max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
82
        }
83
        res = grapes_config_value_int(cfg_tags, "max_gossiping_peers", &max_gossiping_peers);
84
        if (!res) {
85
                max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
86
        }
87
        res = grapes_config_value_int(cfg_tags, "period", &default_period);
88
        if (!res) {
89
                default_period = TMAN_STD_PERIOD;
90
        }
91
        default_period *= 1000000;
92

    
93
        userRankFunct = rfun;
94
        blist_proto_init(myID, metadata, metadata_size);
95
        mymeta = metadata;
96
        mymeta_size = metadata_size;
97
        zero = calloc(mymeta_size,1);
98

    
99
        local_cache = blist_cache_init(cache_size, metadata_size, 0);
100
        if (local_cache == NULL) {
101
                return -1;
102
        }
103
        active = -1;
104
        currtime = gettime();
105

    
106
        return 0;
107
}
108

    
109
static int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
110
{
111
        int metadata_size;
112
        const uint8_t *mdata;
113
        int i;
114

    
115
        mdata = blist_get_metadata(local_cache, &metadata_size);
116
        for (i=0; blist_nodeid(local_cache, i) && (i < n); i++) {
117
                peers[i] = blist_nodeid(local_cache,i);
118
                if (metadata_size)
119
                        memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
120
        }
121

    
122
        return i;
123
}
124

    
125
static int tmanGetNeighbourhoodSize(void)
126
{
127
        int i;
128

    
129
        for (i = 0; blist_nodeid(local_cache, i); i++);
130

    
131
        return i;
132
}
133

    
134
static int time_to_send(void)
135
{
136
        if (gettime() - currtime > period) {
137
                currtime += period;
138
                return 1;
139
        }
140

    
141
        return 0;
142
}
143

    
144
static int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
145
{
146
        if (!metadata_size) {
147
                blist_tman_query_peer(local_cache, neighbour, max_gossiping_peers);
148
                return -1;
149
        }
150
        if (blist_cache_add_ranked(local_cache, neighbour, metadata, metadata_size, tmanRankFunct, mymeta) < 0) {
151
                return -1;
152
        }
153

    
154
        return 1;
155
}
156

    
157

    
158
// not self metadata, but neighbors'.
159
static const void *tmanGetMetadata(int *metadata_size)
160
{
161
        return blist_get_metadata(local_cache, metadata_size);
162
}
163

    
164

    
165
static int tmanChangeMetadata(void *metadata, int metadata_size)
166
{
167
        struct peer_cache *new = NULL;
168

    
169
        if (blist_proto_metadata_update(metadata, metadata_size) <= 0) {
170
                return -1;
171
        }
172
        mymeta = metadata;
173

    
174
        if (active >= 0) {
175
                new = blist_cache_rank(local_cache, tmanRankFunct, NULL, mymeta);
176
                if (new) {
177
                        blist_cache_free(local_cache);
178
                        local_cache = new;
179
                }
180
        }
181

    
182
        return 1;
183
}
184

    
185

    
186
static int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
187
{
188
        int msize,s;
189
        const uint8_t *mdata;
190
        struct peer_cache *new = NULL, *temp;
191

    
192
        if (len && active >= 0) {
193
                const struct topo_header *h = (const struct topo_header *)buff;
194
                struct peer_cache *remote_cache;
195

    
196
            if (h->protocol != MSG_TYPE_TMAN) {
197
              fprintf(stderr, "TMAN: Wrong protocol!\n");
198
              return -1;
199
            }
200

    
201
                remote_cache = blist_entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
202
                mdata = blist_get_metadata(remote_cache,&msize);
203
                blist_get_metadata(local_cache,&s);
204

    
205
                if (msize != s) {
206
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
207
                                s, msize);
208
                        return 1;
209
                }
210

    
211
                if (h->type == TMAN_QUERY) {
212
                        new = blist_cache_rank(local_cache, tmanRankFunct, blist_nodeid(remote_cache, 0), blist_get_metadata(remote_cache, &msize));
213
                        if (new) {
214
                                blist_tman_reply(remote_cache, new, max_gossiping_peers);
215
                                blist_cache_free(new);
216
                                new = NULL;
217
                                // TODO: put sender in tabu list (check list size, etc.), if any...
218
                        }
219
                }
220

    
221
                if (restart_peer && nodeid_equal(restart_peer, blist_nodeid(remote_cache,0))) { // restart phase : receiving new cache from chosen alive peer...
222
                        new = blist_cache_rank(remote_cache,tmanRankFunct,NULL,mymeta);
223
                        if (new) {
224
                                cache_size = init_cache_size;
225
                                blist_cache_resize(new,cache_size);
226
                                period = default_period;
227
                                fprintf(stderr,"RESTARTING TMAN!!!\n");
228
                        }
229
                        nodeid_free(restart_peer);
230
                        restart_peer = NULL;
231
                        active = 1;
232
                }
233
                else {        // normal phase
234
                        temp = blist_cache_union(local_cache,remote_cache,&s);
235
                        if (temp) {
236
                                new = blist_cache_rank(temp,tmanRankFunct,NULL,mymeta);
237
                                cache_size = ((s/2)*2.5) > cache_size ? ((s/2)*2.5) : cache_size;
238
                                blist_cache_resize(new,cache_size);
239
                                blist_cache_free(temp);
240
                        }
241
                        if (restart_peer) {
242
                                restart_countdown--;
243
                                if (restart_countdown <= 0) {
244
                                        nodeid_free(restart_peer);
245
                                        restart_peer = NULL;
246
                                }
247
                        }
248
                }
249

    
250
                blist_cache_free(remote_cache);
251
                if (new!=NULL) {
252
                  blist_cache_free(local_cache);
253
                  local_cache = new;
254
                  do_resize = 0;
255
                }
256
        }
257

    
258
  if (time_to_send()) {
259
        uint8_t *meta;
260
        struct nodeID *chosen;
261

    
262
        blist_cache_update(local_cache);
263

    
264
        if (active > 0 && tmanGetNeighbourhoodSize() < size && !restart_countdown) {
265
                fprintf(stderr, "TMAN: Too few peers in cache! Triggering a restart...\n");
266
                active = 0;
267
                period = TMAN_INIT_PERIOD;
268
        }
269

    
270
        if (active <= 0) {        // active < 0 -> bootstrap phase ; active = 0 -> restart phase
271
                struct peer_cache *ncache;
272
                int j,nsize;
273

    
274
                nsize = TMAN_INIT_PEERS > size ? TMAN_INIT_PEERS : size + 1;
275
                if (size) ncache = blist_cache_init(nsize, metadata_size, 0);
276
                else {return 1;}
277
                for (j=0;j<size;j++)
278
                        blist_cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, tmanRankFunct, mymeta);
279
                if (blist_nodeid(ncache, 0)) {
280
                        restart_peer = nodeid_dup(blist_nodeid(ncache, 0));
281
                        restart_countdown = TMAN_RESTART_COUNT;
282
                        mdata = blist_get_metadata(ncache, &msize);
283
                        new = blist_cache_rank(active < 0 ? ncache : local_cache, tmanRankFunct, restart_peer, mdata);
284
                        if (new) {
285
                                blist_tman_query_peer(new, restart_peer, max_gossiping_peers);
286
                                blist_cache_free(new);
287
                        }
288
                if (active < 0) { // bootstrap
289
                        fprintf(stderr,"BOOTSTRAPPING TMAN!!!\n");
290
                        blist_cache_free(local_cache);
291
                        local_cache = ncache;
292
                        cache_size = nsize;
293
                        active = 0;
294
                } else { // restart
295
                        blist_cache_free(ncache);
296
                }
297
                }
298
                else {
299
                        blist_cache_free(ncache);
300
                        fprintf(stderr, "TMAN: No peer available from peer sampler!\n");
301
                        return 1;
302
                }
303
        }
304
        else { // normal phase
305
        chosen = blist_rand_peer(local_cache, (void **)&meta, max_preferred_peers);
306
        new = blist_cache_rank(local_cache, tmanRankFunct, chosen, meta);
307
        if (new==NULL) {
308
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
309
                return 1;
310
        }
311
        blist_tman_query_peer(new, chosen, max_gossiping_peers);
312
        blist_cache_free(new);
313
        }
314
  }
315

    
316
  return 0;
317
}
318

    
319

    
320
// limit : at most it doubles the current cache size...
321
static int tmanGrowNeighbourhood(int n)
322
{
323
        if (n<=0 || do_resize)
324
                return -1;
325
        n = n>cache_size?cache_size:n;
326
        cache_size += n;
327
        do_resize = 1;
328
        return cache_size;
329
}
330

    
331

    
332
static int tmanShrinkNeighbourhood(int n)
333
{
334
        if (n<=0 || n>=cache_size || do_resize)
335
                return -1;
336
        cache_size -= n;
337
        do_resize = 1;
338
        return cache_size;
339
}
340

    
341

    
342
static int tmanRemoveNeighbour(struct nodeID *neighbour)
343
{
344
        return 0;
345
}
346

    
347

    
348
struct topman_iface tman = {
349
        .init = tmanInit,
350
        .changeMetadata = tmanChangeMetadata,
351
        .addNeighbour = tmanAddNeighbour,
352
        .parseData = tmanParseData,
353
        .givePeers = tmanGivePeers,
354
        .getMetadata = tmanGetMetadata,
355
        .growNeighbourhood = tmanGrowNeighbourhood,
356
        .shrinkNeighbourhood = tmanShrinkNeighbourhood,
357
        .removeNeighbour = tmanRemoveNeighbour,
358
        .getNeighbourhoodSize = tmanGetNeighbourhoodSize,
359
};