Statistics
| Branch: | Revision:

grapes / src / TopologyManager / tman.c @ 6fd9945c

History | View | Annotate | Download (8.46 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "blist_cache.h"
16
#include "blist_proto.h"
17
#include "proto.h"
18
#include "grapes_msg_types.h"
19
#include "topman_iface.h"
20

    
21
#define TMAN_INIT_PEERS 10 // max # of neighbors in local cache (should be >= than the next)
22
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be <= than the previous)
23
#define TMAN_MAX_GOSSIPING_PEERS 20 // # size of the view to be sent to receiver peer (should be <= than the previous)
24
#define TMAN_STD_PERIOD 1000000
25
#define TMAN_INIT_PERIOD 1000000
26
#define TMAN_RESTART_COUNT 20;
27

    
28
static  int max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
29
static  int max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
30
static        int restart_countdown = TMAN_RESTART_COUNT;
31

    
32
static uint64_t currtime;
33
static int cache_size = TMAN_INIT_PEERS;
34
static struct peer_cache *local_cache;
35
static int period = TMAN_INIT_PERIOD;
36
static int active;
37
static int do_resize;
38
static void *mymeta;
39
static int mymeta_size;
40
static struct nodeID *restart_peer;
41
static uint8_t *zero;
42

    
43
static rankingFunction userRankFunct;
44

    
45
static int tmanRankFunct (const void *target, const void *p1, const void *p2) {
46

    
47
        if (memcmp(target,zero,mymeta_size) == 0 || (memcmp(p1,zero,mymeta_size) == 0 && memcmp(p2,zero,mymeta_size) == 0))
48
                return 0;
49
        if (memcmp(p1,zero,mymeta_size) == 0)
50
                return 2;
51
        if (memcmp(p2,zero,mymeta_size) == 0)
52
                return 1;
53
        return userRankFunct(target, p1, p2);
54
}
55

    
56
static uint64_t gettime(void)
57
{
58
        struct timeval tv;
59

    
60
        gettimeofday(&tv, NULL);
61

    
62
        return tv.tv_usec + tv.tv_sec * 1000000ull;
63
}
64

    
65
static int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, rankingFunction rfun, const char *config)
66
{
67
        userRankFunct = rfun;
68
        blist_proto_init(myID, metadata, metadata_size);
69
        mymeta = metadata;
70
        mymeta_size = metadata_size;
71
        zero = calloc(mymeta_size,1);
72

    
73
        local_cache = blist_cache_init(cache_size, metadata_size, 0);
74
        if (local_cache == NULL) {
75
                return -1;
76
        }
77
        active = -1;
78
        currtime = gettime();
79

    
80
        return 0;
81
}
82

    
83
static int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
84
{
85
        int metadata_size;
86
        const uint8_t *mdata;
87
        int i;
88

    
89
        mdata = blist_get_metadata(local_cache, &metadata_size);
90
        for (i=0; blist_nodeid(local_cache, i) && (i < n); i++) {
91
                peers[i] = blist_nodeid(local_cache,i);
92
                if (metadata_size)
93
                        memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
94
        }
95

    
96
        return i;
97
}
98

    
99
static int tmanGetNeighbourhoodSize(void)
100
{
101
        int i;
102

    
103
        for (i = 0; blist_nodeid(local_cache, i); i++);
104

    
105
        return i;
106
}
107

    
108
static int time_to_send(void)
109
{
110
        if (gettime() - currtime > period) {
111
                currtime += period;
112
                return 1;
113
        }
114

    
115
        return 0;
116
}
117

    
118
static int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
119
{
120
        if (!metadata_size) {
121
                blist_tman_query_peer(local_cache, neighbour, max_gossiping_peers);
122
                return -1;
123
        }
124
        if (blist_cache_add_ranked(local_cache, neighbour, metadata, metadata_size, tmanRankFunct, mymeta) < 0) {
125
                return -1;
126
        }
127

    
128
        return 1;
129
}
130

    
131

    
132
// not self metadata, but neighbors'.
133
static const void *tmanGetMetadata(int *metadata_size)
134
{
135
        return blist_get_metadata(local_cache, metadata_size);
136
}
137

    
138

    
139
static int tmanChangeMetadata(void *metadata, int metadata_size)
140
{
141
        struct peer_cache *new = NULL;
142

    
143
        if (blist_proto_metadata_update(metadata, metadata_size) <= 0) {
144
                return -1;
145
        }
146
        mymeta = metadata;
147

    
148
        if (active >= 0) {
149
                new = blist_cache_rank(local_cache, tmanRankFunct, NULL, mymeta);
150
                if (new) {
151
                        blist_cache_free(local_cache);
152
                        local_cache = new;
153
                }
154
        }
155

    
156
        return 1;
157
}
158

    
159

    
160
static int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
161
{
162
        int msize,s;
163
        const uint8_t *mdata;
164
        struct peer_cache *new = NULL, *temp;
165

    
166
        if (len && active >= 0) {
167
                const struct topo_header *h = (const struct topo_header *)buff;
168
                struct peer_cache *remote_cache;
169

    
170
            if (h->protocol != MSG_TYPE_TMAN) {
171
              fprintf(stderr, "TMAN: Wrong protocol!\n");
172
              return -1;
173
            }
174

    
175
                remote_cache = blist_entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
176
                mdata = blist_get_metadata(remote_cache,&msize);
177
                blist_get_metadata(local_cache,&s);
178

    
179
                if (msize != s) {
180
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
181
                                s, msize);
182
                        return 1;
183
                }
184

    
185
                if (h->type == TMAN_QUERY) {
186
                        new = blist_cache_rank(local_cache, tmanRankFunct, blist_nodeid(remote_cache, 0), blist_get_metadata(remote_cache, &msize));
187
                        if (new) {
188
                                blist_tman_reply(remote_cache, new, max_gossiping_peers);
189
                                blist_cache_free(new);
190
                                new = NULL;
191
                                // TODO: put sender in tabu list (check list size, etc.), if any...
192
                        }
193
                }
194

    
195
                if (restart_peer && nodeid_equal(restart_peer, blist_nodeid(remote_cache,0))) { // restart phase : receiving new cache from chosen alive peer...
196
                        new = blist_cache_rank(remote_cache,tmanRankFunct,NULL,mymeta);
197
                        if (new) {
198
                                cache_size = TMAN_INIT_PEERS;
199
                                blist_cache_resize(new,cache_size);
200
                                period = TMAN_STD_PERIOD;
201
                                fprintf(stderr,"RESTARTING TMAN!!!\n");
202
                        }
203
                        nodeid_free(restart_peer);
204
                        restart_peer = NULL;
205
                        active = 1;
206
                }
207
                else {        // normal phase
208
                        temp = blist_cache_union(local_cache,remote_cache,&s);
209
                        if (temp) {
210
                                new = blist_cache_rank(temp,tmanRankFunct,NULL,mymeta);
211
                                cache_size = ((s/2)*2.5) > cache_size ? ((s/2)*2.5) : cache_size;
212
                                blist_cache_resize(new,cache_size);
213
                                blist_cache_free(temp);
214
                        }
215
                        if (restart_peer) {
216
                                restart_countdown--;
217
                                if (restart_countdown <= 0) {
218
                                        nodeid_free(restart_peer);
219
                                        restart_peer = NULL;
220
                                }
221
                        }
222
                }
223

    
224
                blist_cache_free(remote_cache);
225
                if (new!=NULL) {
226
                  blist_cache_free(local_cache);
227
                  local_cache = new;
228
                  do_resize = 0;
229
                }
230
        }
231

    
232
  if (time_to_send()) {
233
        uint8_t *meta;
234
        struct nodeID *chosen;
235

    
236
        blist_cache_update(local_cache);
237

    
238
        if (active > 0 && tmanGetNeighbourhoodSize() < size && !restart_countdown) {
239
                fprintf(stderr, "TMAN: Too few peers in cache! Triggering a restart...\n");
240
                active = 0;
241
                period = TMAN_INIT_PERIOD;
242
        }
243

    
244
        if (active <= 0) {        // active < 0 -> bootstrap phase ; active = 0 -> restart phase
245
                struct peer_cache *ncache;
246
                int j,nsize;
247

    
248
                nsize = TMAN_INIT_PEERS > size ? TMAN_INIT_PEERS : size + 1;
249
                if (size) ncache = blist_cache_init(nsize, metadata_size, 0);
250
                else {return 1;}
251
                for (j=0;j<size;j++)
252
                        blist_cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, tmanRankFunct, mymeta);
253
                if (blist_nodeid(ncache, 0)) {
254
                        restart_peer = nodeid_dup(blist_nodeid(ncache, 0));
255
                        restart_countdown = TMAN_RESTART_COUNT;
256
                        mdata = blist_get_metadata(ncache, &msize);
257
                        new = blist_cache_rank(active < 0 ? ncache : local_cache, tmanRankFunct, restart_peer, mdata);
258
                        if (new) {
259
                                blist_tman_query_peer(new, restart_peer, max_gossiping_peers);
260
                                blist_cache_free(new);
261
                        }
262
                if (active < 0) { // bootstrap
263
                        fprintf(stderr,"BOOTSTRAPPING TMAN!!!\n");
264
                        blist_cache_free(local_cache);
265
                        local_cache = ncache;
266
                        cache_size = nsize;
267
                        active = 0;
268
                } else { // restart
269
                        blist_cache_free(ncache);
270
                }
271
                }
272
                else {
273
                        blist_cache_free(ncache);
274
                        fprintf(stderr, "TMAN: No peer available from peer sampler!\n");
275
                        return 1;
276
                }
277
        }
278
        else { // normal phase
279
        chosen = blist_rand_peer(local_cache, (void **)&meta, max_preferred_peers);
280
        new = blist_cache_rank(local_cache, tmanRankFunct, chosen, meta);
281
        if (new==NULL) {
282
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
283
                return 1;
284
        }
285
        blist_tman_query_peer(new, chosen, max_gossiping_peers);
286
        blist_cache_free(new);
287
        }
288
  }
289

    
290
  return 0;
291
}
292

    
293

    
294
// limit : at most it doubles the current cache size...
295
static int tmanGrowNeighbourhood(int n)
296
{
297
        if (n<=0 || do_resize)
298
                return -1;
299
        n = n>cache_size?cache_size:n;
300
        cache_size += n;
301
        do_resize = 1;
302
        return cache_size;
303
}
304

    
305

    
306
static int tmanShrinkNeighbourhood(int n)
307
{
308
        if (n<=0 || n>=cache_size || do_resize)
309
                return -1;
310
        cache_size -= n;
311
        do_resize = 1;
312
        return cache_size;
313
}
314

    
315

    
316
static int tmanRemoveNeighbour(struct nodeID *neighbour)
317
{
318
        return 0;
319
}
320

    
321

    
322
struct topman_iface tman = {
323
        .init = tmanInit,
324
        .changeMetadata = tmanChangeMetadata,
325
        .addNeighbour = tmanAddNeighbour,
326
        .parseData = tmanParseData,
327
        .givePeers = tmanGivePeers,
328
        .getMetadata = tmanGetMetadata,
329
        .growNeighbourhood = tmanGrowNeighbourhood,
330
        .shrinkNeighbourhood = tmanShrinkNeighbourhood,
331
        .removeNeighbour = tmanRemoveNeighbour,
332
        .getNeighbourhoodSize = tmanGetNeighbourhoodSize,
333
};