Statistics
| Branch: | Revision:

grapes / src / TopologyManager / tman.c @ 5941d7a1

History | View | Annotate | Download (8.05 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "topmanager.h"
16
#include "topocache.h"
17
#include "topo_proto.h"
18
#include "proto.h"
19
#include "grapes_msg_types.h"
20
#include "tman.h"
21

    
22
#define TMAN_INIT_PEERS 10 // max # of neighbors in local cache (should be >= than the next)
23
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be <= than the previous)
24
#define TMAN_MAX_GOSSIPING_PEERS 20 // # size of the view to be sent to receiver peer (should be <= than the previous)
25
#define TMAN_IDLE_TIME 20 // # of iterations to wait before switching to inactive state
26
#define TMAN_STD_PERIOD 1000000
27
#define TMAN_INIT_PERIOD 1000000
28

    
29
static  int max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
30
static  int max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
31
static  int idle_time = TMAN_IDLE_TIME;
32

    
33
static uint64_t currtime;
34
static int cache_size = TMAN_INIT_PEERS;
35
static struct peer_cache *local_cache;
36
static int period = TMAN_INIT_PERIOD;
37
static int active, countdown = TMAN_IDLE_TIME*2;
38
static int do_resize;
39
static void *mymeta;
40
static int mymeta_size;
41
static struct nodeID *restart_peer;
42
static uint8_t *zero;
43

    
44
static tmanRankingFunction userRankFunct;
45

    
46
static int tmanRankFunct (const void *target, const void *p1, const void *p2) {
47

    
48
        if (memcmp(target,zero,mymeta_size) == 0 || (memcmp(p1,zero,mymeta_size) == 0 && memcmp(p2,zero,mymeta_size) == 0))
49
                return 0;
50
        if (memcmp(p1,zero,mymeta_size) == 0)
51
                return 2;
52
        if (memcmp(p2,zero,mymeta_size) == 0)
53
                return 1;
54
        return userRankFunct(target, p1, p2);
55
}
56

    
57
static uint64_t gettime(void)
58
{
59
  struct timeval tv;
60

    
61
  gettimeofday(&tv, NULL);
62

    
63
  return tv.tv_usec + tv.tv_sec * 1000000ull;
64
}
65

    
66
int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, ranking_function rfun, int gossip_peers)
67
{
68
  userRankFunct = rfun;
69
  topo_proto_init(myID, metadata, metadata_size);
70
  mymeta = metadata;
71
  mymeta_size = metadata_size;
72
  zero = calloc(mymeta_size,1);
73
  
74
  local_cache = cache_init(cache_size, metadata_size);
75
  if (local_cache == NULL) {
76
    return -1;
77
  }
78
  idle_time = TMAN_IDLE_TIME;
79
  if (gossip_peers) {
80
    max_gossiping_peers = gossip_peers;
81
  }
82
  max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
83
  active = -1;
84
  currtime = gettime();
85

    
86
  return 0;
87
}
88

    
89
int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
90
{
91
        int metadata_size;
92
        const uint8_t *mdata;
93
        int i;
94

    
95
        mdata = get_metadata(local_cache, &metadata_size);
96
        for (i=0; nodeid(local_cache, i) && (i < n); i++) {
97
                        peers[i] = nodeid(local_cache,i);
98
                        if (metadata_size)
99
                                memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
100
        }
101

    
102
        return i;
103
}
104

    
105
int tmanGetNeighbourhoodSize(void)
106
{
107
  int i;
108

    
109
  for (i = 0; nodeid(local_cache, i); i++);
110

    
111
  return i;
112
}
113

    
114
static int time_to_send(void)
115
{
116
        if (gettime() - currtime > period) {
117
                if (--countdown == 0) {
118
                        countdown = idle_time*2;
119
                        if (active > 0) active = 0;
120
                }
121
                currtime += period;
122
                return 1;
123
        }
124

    
125
  return 0;
126
}
127

    
128
int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
129
{
130
        if (!metadata_size) {
131
                tman_query_peer(local_cache, neighbour, max_gossiping_peers);
132
                return -1;
133
        }
134
  if (cache_add_ranked(local_cache, neighbour, metadata, metadata_size, tmanRankFunct, mymeta) < 0) {
135
    return -1;
136
  }
137

    
138
  return 1;
139
}
140

    
141

    
142
// not self metadata, but neighbors'.
143
const void *tmanGetMetadata(int *metadata_size)
144
{
145
  return get_metadata(local_cache, metadata_size);
146
}
147

    
148

    
149
int tmanChangeMetadata(void *metadata, int metadata_size)
150
{
151
  struct peer_cache *new = NULL;
152

    
153
  if (topo_proto_metadata_update(metadata, metadata_size) <= 0) {
154
    return -1;
155
  }
156
  mymeta = metadata;
157

    
158
  if (active >= 0) {
159
  new = cache_rank(local_cache, tmanRankFunct, NULL, mymeta);
160
  if (new) {
161
        cache_free(local_cache);
162
        local_cache = new;
163
  }
164
  }
165

    
166
  if (active > 0) active = 0;
167
  countdown = idle_time*2;
168

    
169
  return 1;
170
}
171

    
172

    
173
int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
174
{
175
        int msize,s;
176
        const uint8_t *mdata;
177
        struct peer_cache *new = NULL, *temp;
178
        int source = 4; // init with value > 1, needed in bootstrap/restart phase...
179

    
180
        if (len && active >= 0) {
181
                const struct topo_header *h = (const struct topo_header *)buff;
182
                struct peer_cache *remote_cache;
183

    
184
            if (h->protocol != MSG_TYPE_TMAN) {
185
              fprintf(stderr, "TMAN: Wrong protocol!\n");
186
              return -1;
187
            }
188

    
189
                remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
190
                mdata = get_metadata(remote_cache,&msize);
191
                get_metadata(local_cache,&s);
192

    
193
                if (msize != s) {
194
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
195
                                s, msize);
196
                        return 1;
197
                }
198

    
199
                if (h->type == TMAN_QUERY) {
200
                        new = cache_rank(local_cache, tmanRankFunct, nodeid(remote_cache, 0), get_metadata(remote_cache, &msize));
201
                        if (new) {
202
                                tman_reply(remote_cache, new, max_gossiping_peers);
203
                                cache_free(new);
204
                                new = NULL;
205
                                // TODO: put sender in tabu list (check list size, etc.), if any...
206
                        }
207
                }
208

    
209
                if (restart_peer && nodeid_equal(restart_peer, nodeid(remote_cache,0))) { // restart phase : receiving new cache from chosen alive peer...
210
                        new = cache_rank(remote_cache,tmanRankFunct,NULL,mymeta);
211
                        if (new) {
212
                                cache_size = TMAN_INIT_PEERS;
213
                                cache_resize(new,cache_size);
214
                                countdown = idle_time*2;
215
                                fprintf(stderr,"RESTARTING TMAN!!!\n");
216
                        }
217
                        nodeid_free(restart_peer);
218
                        restart_peer = NULL;
219
                }
220
                else {        // normal phase
221
                        temp = cache_union(local_cache,remote_cache,&s);
222
                        if (temp) {
223
                                new = cache_rank(temp,tmanRankFunct,NULL,mymeta);
224
                                cache_size = ((s/2)*2.5) > cache_size ? ((s/2)*2.5) : cache_size;
225
                                cache_resize(new,cache_size);
226
                                cache_free(temp);
227
                        }
228
                }
229

    
230
                cache_free(remote_cache);
231
                if (new!=NULL) {
232
                  cache_free(local_cache);
233
                  local_cache = new;
234
                  if (source > 1) { // cache is different than before
235
                          period = TMAN_INIT_PERIOD;
236
                          if(!restart_peer) active = idle_time;
237
                  }
238
                  else {
239
                          period = TMAN_STD_PERIOD;
240
                          if (active>0) active--;
241
                  }
242

    
243
                  do_resize = 0;
244
                }
245
        }
246

    
247
  if (time_to_send()) {
248
        uint8_t *meta;
249
        struct nodeID *chosen;
250

    
251
        cache_update(local_cache);
252

    
253
        if (active <= 0) {        // active < 0 -> bootstrap phase ; active = 0 -> restart phase
254
                struct peer_cache *ncache;
255
                int j,nsize;
256

    
257
                nsize = TMAN_INIT_PEERS > size ? TMAN_INIT_PEERS : size + 1;
258
                if (size) ncache = cache_init(nsize,metadata_size);
259
                else {return 1;}
260
                for (j=0;j<size;j++)
261
                        cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, tmanRankFunct, mymeta);
262
                if (nodeid(ncache, 0)) {
263
                        restart_peer = nodeid_dup(nodeid(ncache, 0));
264
                        mdata = get_metadata(ncache, &msize);
265
                        new = cache_rank(active < 0 ? ncache : local_cache, tmanRankFunct, restart_peer, mdata);
266
                        if (new) {
267
                                tman_query_peer(new, restart_peer, max_gossiping_peers);
268
                                cache_free(new);
269
                        }
270
                if (active < 0) { // bootstrap
271
                        fprintf(stderr,"BOOTSTRAPPING TMAN!!!\n");
272
                        cache_free(local_cache);
273
                        local_cache = ncache;
274
                        cache_size = nsize;
275
                        active = 0;
276
                } else { // restart
277
                        cache_free(ncache);
278
                }
279
                }
280
                else {
281
                        cache_free(ncache);
282
                        fprintf(stderr, "TMAN: No peer available from peer sampler!\n");
283
                        return 1;
284
                }
285
        }
286
        else { // normal phase
287
        chosen = rand_peer(local_cache, (void **)&meta);                //MAX_PREFERRED_PEERS
288
        new = cache_rank(local_cache, tmanRankFunct, chosen, meta);
289
        if (new==NULL) {
290
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
291
                return 1;
292
        }
293
        tman_query_peer(new, chosen, max_gossiping_peers);
294
        cache_free(new);
295
        }
296
  }
297

    
298
  return 0;
299
}
300

    
301

    
302

    
303
// limit : at most it doubles the current cache size...
304
int tmanGrowNeighbourhood(int n)
305
{
306
        if (n<=0 || do_resize)
307
                return -1;
308
        n = n>cache_size?cache_size:n;
309
        cache_size += n;
310
        do_resize = 1;
311
        return cache_size;
312
}
313

    
314

    
315
int tmanShrinkNeighbourhood(int n)
316
{
317
        if (n<=0 || n>=cache_size || do_resize)
318
                return -1;
319
        cache_size -= n;
320
        do_resize = 1;
321
        return cache_size;
322
}
323