Statistics
| Branch: | Revision:

grapes / src / TopologyManager / tman.c @ 8bf781e5

History | View | Annotate | Download (8.03 KB)

1 df98cd3a Luca Abeni
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13
14
#include "net_helper.h"
15 33ea04ee Marco Biazzini
#include "blist_cache.h"
16
#include "blist_proto.h"
17 df98cd3a Luca Abeni
#include "proto.h"
18 5941d7a1 Csaba Kiraly
#include "grapes_msg_types.h"
19 8a0aa150 Luca Abeni
#include "tman.h"
20 df98cd3a Luca Abeni
21 50fea9dc Marco Biazzini
#define TMAN_INIT_PEERS 10 // max # of neighbors in local cache (should be >= than the next)
22
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be <= than the previous)
23 a16f5e1d Marco Biazzini
#define TMAN_MAX_GOSSIPING_PEERS 20 // # size of the view to be sent to receiver peer (should be <= than the previous)
24 8171d18c Marco Biazzini
#define TMAN_STD_PERIOD 1000000
25 8a0aa150 Luca Abeni
#define TMAN_INIT_PERIOD 1000000
26 fff6ceed Marco Biazzini
#define TMAN_RESTART_COUNT 20;
27 df98cd3a Luca Abeni
28 4070e769 Marco Biazzini
static  int max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
29
static  int max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
30 fff6ceed Marco Biazzini
static        int restart_countdown = TMAN_RESTART_COUNT;
31 df98cd3a Luca Abeni
32
static uint64_t currtime;
33 8a0aa150 Luca Abeni
static int cache_size = TMAN_INIT_PEERS;
34 1d8e0f3d Luca Abeni
static struct peer_cache *local_cache;
35 8a0aa150 Luca Abeni
static int period = TMAN_INIT_PERIOD;
36 fff6ceed Marco Biazzini
static int active;
37 1d8e0f3d Luca Abeni
static int do_resize;
38 df98cd3a Luca Abeni
static void *mymeta;
39 f1d1a122 Marco Biazzini
static int mymeta_size;
40 1d8e0f3d Luca Abeni
static struct nodeID *restart_peer;
41 f1d1a122 Marco Biazzini
static uint8_t *zero;
42 df98cd3a Luca Abeni
43 f1d1a122 Marco Biazzini
static tmanRankingFunction userRankFunct;
44 8a0aa150 Luca Abeni
45 f1d1a122 Marco Biazzini
static int tmanRankFunct (const void *target, const void *p1, const void *p2) {
46
47
        if (memcmp(target,zero,mymeta_size) == 0 || (memcmp(p1,zero,mymeta_size) == 0 && memcmp(p2,zero,mymeta_size) == 0))
48
                return 0;
49
        if (memcmp(p1,zero,mymeta_size) == 0)
50
                return 2;
51
        if (memcmp(p2,zero,mymeta_size) == 0)
52
                return 1;
53
        return userRankFunct(target, p1, p2);
54
}
55 8a0aa150 Luca Abeni
56 df98cd3a Luca Abeni
static uint64_t gettime(void)
57
{
58 0a5b3237 Marco Biazzini
        struct timeval tv;
59 df98cd3a Luca Abeni
60 0a5b3237 Marco Biazzini
        gettimeofday(&tv, NULL);
61 df98cd3a Luca Abeni
62 0a5b3237 Marco Biazzini
        return tv.tv_usec + tv.tv_sec * 1000000ull;
63 df98cd3a Luca Abeni
}
64
65 8a0aa150 Luca Abeni
int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, ranking_function rfun, int gossip_peers)
66 df98cd3a Luca Abeni
{
67 0a5b3237 Marco Biazzini
        userRankFunct = rfun;
68
        blist_proto_init(myID, metadata, metadata_size);
69
        mymeta = metadata;
70
        mymeta_size = metadata_size;
71
        zero = calloc(mymeta_size,1);
72
73
        local_cache = blist_cache_init(cache_size, metadata_size, 0);
74
        if (local_cache == NULL) {
75
                return -1;
76
        }
77
        if (gossip_peers) {
78
                max_gossiping_peers = gossip_peers;
79
        }
80
        max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
81
        active = -1;
82
        currtime = gettime();
83 df98cd3a Luca Abeni
84 0a5b3237 Marco Biazzini
        return 0;
85 df98cd3a Luca Abeni
}
86
87 8a0aa150 Luca Abeni
int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
88 df98cd3a Luca Abeni
{
89 8a0aa150 Luca Abeni
        int metadata_size;
90
        const uint8_t *mdata;
91
        int i;
92
93 0a5b3237 Marco Biazzini
        mdata = blist_get_metadata(local_cache, &metadata_size);
94 33ea04ee Marco Biazzini
        for (i=0; blist_nodeid(local_cache, i) && (i < n); i++) {
95 0a5b3237 Marco Biazzini
                peers[i] = blist_nodeid(local_cache,i);
96
                if (metadata_size)
97
                        memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
98 9eab8f0b Marco Biazzini
        }
99 8a0aa150 Luca Abeni
100
        return i;
101 df98cd3a Luca Abeni
}
102
103 bb14413e Luca Abeni
int tmanGetNeighbourhoodSize(void)
104
{
105 0a5b3237 Marco Biazzini
        int i;
106 bb14413e Luca Abeni
107 0a5b3237 Marco Biazzini
        for (i = 0; blist_nodeid(local_cache, i); i++);
108 bb14413e Luca Abeni
109 0a5b3237 Marco Biazzini
        return i;
110 bb14413e Luca Abeni
}
111
112 8a0aa150 Luca Abeni
static int time_to_send(void)
113 df98cd3a Luca Abeni
{
114
        if (gettime() - currtime > period) {
115
                currtime += period;
116 ba6b5d5d Marco Biazzini
                return 1;
117 df98cd3a Luca Abeni
        }
118
119 0a5b3237 Marco Biazzini
        return 0;
120 df98cd3a Luca Abeni
}
121
122 8a0aa150 Luca Abeni
int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
123
{
124 1f0684b3 Marco Biazzini
        if (!metadata_size) {
125 33ea04ee Marco Biazzini
                blist_tman_query_peer(local_cache, neighbour, max_gossiping_peers);
126 1f0684b3 Marco Biazzini
                return -1;
127
        }
128 0a5b3237 Marco Biazzini
        if (blist_cache_add_ranked(local_cache, neighbour, metadata, metadata_size, tmanRankFunct, mymeta) < 0) {
129
                return -1;
130
        }
131 8a0aa150 Luca Abeni
132 0a5b3237 Marco Biazzini
        return 1;
133 8a0aa150 Luca Abeni
}
134
135
136
// not self metadata, but neighbors'.
137
const void *tmanGetMetadata(int *metadata_size)
138
{
139 0a5b3237 Marco Biazzini
        return blist_get_metadata(local_cache, metadata_size);
140 8a0aa150 Luca Abeni
}
141
142
143 483ff9b6 Marco Biazzini
int tmanChangeMetadata(void *metadata, int metadata_size)
144 df98cd3a Luca Abeni
{
145 0a5b3237 Marco Biazzini
        struct peer_cache *new = NULL;
146 6f6ebef2 Marco Biazzini
147 0a5b3237 Marco Biazzini
        if (blist_proto_metadata_update(metadata, metadata_size) <= 0) {
148
                return -1;
149
        }
150
        mymeta = metadata;
151 df98cd3a Luca Abeni
152 0a5b3237 Marco Biazzini
        if (active >= 0) {
153
                new = blist_cache_rank(local_cache, tmanRankFunct, NULL, mymeta);
154
                if (new) {
155
                        blist_cache_free(local_cache);
156
                        local_cache = new;
157
                }
158
        }
159 6f6ebef2 Marco Biazzini
160 0a5b3237 Marco Biazzini
        return 1;
161 df98cd3a Luca Abeni
}
162
163
164 d74d9d89 Luca Abeni
int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
165 df98cd3a Luca Abeni
{
166 0a5b3237 Marco Biazzini
        int msize,s;
167
        const uint8_t *mdata;
168 41022ab4 Marco Biazzini
        struct peer_cache *new = NULL, *temp;
169 df98cd3a Luca Abeni
170 eb29e340 Luca
        if (len && active >= 0) {
171 df98cd3a Luca Abeni
                const struct topo_header *h = (const struct topo_header *)buff;
172 9589827a Marco Biazzini
                struct peer_cache *remote_cache;
173 df98cd3a Luca Abeni
174 04119bba Marco Biazzini
            if (h->protocol != MSG_TYPE_TMAN) {
175 df98cd3a Luca Abeni
              fprintf(stderr, "TMAN: Wrong protocol!\n");
176
              return -1;
177
            }
178
179 33ea04ee Marco Biazzini
                remote_cache = blist_entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
180
                mdata = blist_get_metadata(remote_cache,&msize);
181
                blist_get_metadata(local_cache,&s);
182 df98cd3a Luca Abeni
183
                if (msize != s) {
184
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
185
                                s, msize);
186
                        return 1;
187
                }
188
189 eb29e340 Luca
                if (h->type == TMAN_QUERY) {
190 33ea04ee Marco Biazzini
                        new = blist_cache_rank(local_cache, tmanRankFunct, blist_nodeid(remote_cache, 0), blist_get_metadata(remote_cache, &msize));
191 df98cd3a Luca Abeni
                        if (new) {
192 33ea04ee Marco Biazzini
                                blist_tman_reply(remote_cache, new, max_gossiping_peers);
193
                                blist_cache_free(new);
194 0c10c07b Marco Biazzini
                                new = NULL;
195 df98cd3a Luca Abeni
                                // TODO: put sender in tabu list (check list size, etc.), if any...
196
                        }
197
                }
198 0c10c07b Marco Biazzini
199 33ea04ee Marco Biazzini
                if (restart_peer && nodeid_equal(restart_peer, blist_nodeid(remote_cache,0))) { // restart phase : receiving new cache from chosen alive peer...
200
                        new = blist_cache_rank(remote_cache,tmanRankFunct,NULL,mymeta);
201 78253c6a Marco Biazzini
                        if (new) {
202 eb29e340 Luca
                                cache_size = TMAN_INIT_PEERS;
203 33ea04ee Marco Biazzini
                                blist_cache_resize(new,cache_size);
204 fff6ceed Marco Biazzini
                                period = TMAN_STD_PERIOD;
205 eb29e340 Luca
                                fprintf(stderr,"RESTARTING TMAN!!!\n");
206 0c10c07b Marco Biazzini
                        }
207
                        nodeid_free(restart_peer);
208
                        restart_peer = NULL;
209 fff6ceed Marco Biazzini
                        active = 1;
210 0c10c07b Marco Biazzini
                }
211 41022ab4 Marco Biazzini
                else {        // normal phase
212 33ea04ee Marco Biazzini
                        temp = blist_cache_union(local_cache,remote_cache,&s);
213 78253c6a Marco Biazzini
                        if (temp) {
214 33ea04ee Marco Biazzini
                                new = blist_cache_rank(temp,tmanRankFunct,NULL,mymeta);
215 eb29e340 Luca
                                cache_size = ((s/2)*2.5) > cache_size ? ((s/2)*2.5) : cache_size;
216 33ea04ee Marco Biazzini
                                blist_cache_resize(new,cache_size);
217
                                blist_cache_free(temp);
218 78253c6a Marco Biazzini
                        }
219 fff6ceed Marco Biazzini
                        if (restart_peer) {
220
                                restart_countdown--;
221
                                if (restart_countdown <= 0) {
222
                                        nodeid_free(restart_peer);
223
                                        restart_peer = NULL;
224
                                }
225
                        }
226 0c10c07b Marco Biazzini
                }
227 df98cd3a Luca Abeni
228 33ea04ee Marco Biazzini
                blist_cache_free(remote_cache);
229 df98cd3a Luca Abeni
                if (new!=NULL) {
230 33ea04ee Marco Biazzini
                  blist_cache_free(local_cache);
231 df98cd3a Luca Abeni
                  local_cache = new;
232 8a0aa150 Luca Abeni
                  do_resize = 0;
233 df98cd3a Luca Abeni
                }
234 8a0aa150 Luca Abeni
        }
235 df98cd3a Luca Abeni
236
  if (time_to_send()) {
237 81108160 Luca Abeni
        uint8_t *meta;
238 df98cd3a Luca Abeni
        struct nodeID *chosen;
239
240 33ea04ee Marco Biazzini
        blist_cache_update(local_cache);
241 50fea9dc Marco Biazzini
242 fff6ceed Marco Biazzini
        if (active > 0 && tmanGetNeighbourhoodSize() < size && !restart_countdown) {
243
                fprintf(stderr, "TMAN: Too few peers in cache! Triggering a restart...\n");
244 3c88391a Marco Biazzini
                active = 0;
245 fff6ceed Marco Biazzini
                period = TMAN_INIT_PERIOD;
246 3c88391a Marco Biazzini
        }
247
248 41022ab4 Marco Biazzini
        if (active <= 0) {        // active < 0 -> bootstrap phase ; active = 0 -> restart phase
249 8a0aa150 Luca Abeni
                struct peer_cache *ncache;
250 41022ab4 Marco Biazzini
                int j,nsize;
251 8a0aa150 Luca Abeni
252 41022ab4 Marco Biazzini
                nsize = TMAN_INIT_PEERS > size ? TMAN_INIT_PEERS : size + 1;
253 33ea04ee Marco Biazzini
                if (size) ncache = blist_cache_init(nsize, metadata_size, 0);
254 ba6b5d5d Marco Biazzini
                else {return 1;}
255 9589827a Marco Biazzini
                for (j=0;j<size;j++)
256 33ea04ee Marco Biazzini
                        blist_cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, tmanRankFunct, mymeta);
257
                if (blist_nodeid(ncache, 0)) {
258
                        restart_peer = nodeid_dup(blist_nodeid(ncache, 0));
259 fff6ceed Marco Biazzini
                        restart_countdown = TMAN_RESTART_COUNT;
260 33ea04ee Marco Biazzini
                        mdata = blist_get_metadata(ncache, &msize);
261
                        new = blist_cache_rank(active < 0 ? ncache : local_cache, tmanRankFunct, restart_peer, mdata);
262 0c10c07b Marco Biazzini
                        if (new) {
263 33ea04ee Marco Biazzini
                                blist_tman_query_peer(new, restart_peer, max_gossiping_peers);
264
                                blist_cache_free(new);
265 8a0aa150 Luca Abeni
                        }
266 41022ab4 Marco Biazzini
                if (active < 0) { // bootstrap
267 eb29e340 Luca
                        fprintf(stderr,"BOOTSTRAPPING TMAN!!!\n");
268 33ea04ee Marco Biazzini
                        blist_cache_free(local_cache);
269 50fea9dc Marco Biazzini
                        local_cache = ncache;
270 41022ab4 Marco Biazzini
                        cache_size = nsize;
271 50fea9dc Marco Biazzini
                        active = 0;
272 41022ab4 Marco Biazzini
                } else { // restart
273 33ea04ee Marco Biazzini
                        blist_cache_free(ncache);
274 50fea9dc Marco Biazzini
                }
275 db8fa933 Marco Biazzini
                }
276
                else {
277 33ea04ee Marco Biazzini
                        blist_cache_free(ncache);
278 db8fa933 Marco Biazzini
                        fprintf(stderr, "TMAN: No peer available from peer sampler!\n");
279
                        return 1;
280
                }
281 8a0aa150 Luca Abeni
        }
282 41022ab4 Marco Biazzini
        else { // normal phase
283 33ea04ee Marco Biazzini
        chosen = blist_rand_peer(local_cache, (void **)&meta, max_preferred_peers);
284
        new = blist_cache_rank(local_cache, tmanRankFunct, chosen, meta);
285 8a0aa150 Luca Abeni
        if (new==NULL) {
286
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
287
                return 1;
288 df98cd3a Luca Abeni
        }
289 33ea04ee Marco Biazzini
        blist_tman_query_peer(new, chosen, max_gossiping_peers);
290
        blist_cache_free(new);
291 0c10c07b Marco Biazzini
        }
292 df98cd3a Luca Abeni
  }
293
294
  return 0;
295
}
296
297
298 8a0aa150 Luca Abeni
// limit : at most it doubles the current cache size...
299
int tmanGrowNeighbourhood(int n)
300 df98cd3a Luca Abeni
{
301 8a0aa150 Luca Abeni
        if (n<=0 || do_resize)
302
                return -1;
303
        n = n>cache_size?cache_size:n;
304
        cache_size += n;
305
        do_resize = 1;
306
        return cache_size;
307 df98cd3a Luca Abeni
}
308
309
310 8a0aa150 Luca Abeni
int tmanShrinkNeighbourhood(int n)
311
{
312
        if (n<=0 || n>=cache_size || do_resize)
313
                return -1;
314
        cache_size -= n;
315
        do_resize = 1;
316
        return cache_size;
317 df98cd3a Luca Abeni
}