Statistics
| Branch: | Revision:

grapes / som / TopologyManager / tman.c @ eb29e340

History | View | Annotate | Download (8.04 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "topmanager.h"
16
#include "topocache.h"
17
#include "topo_proto.h"
18
#include "proto.h"
19
#include "msg_types.h"
20
#include "tman.h"
21

    
22
#define TMAN_INIT_PEERS 10 // max # of neighbors in local cache (should be >= than the next)
23
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be <= than the previous)
24
#define TMAN_MAX_GOSSIPING_PEERS 20 // # size of the view to be sent to receiver peer (should be <= than the previous)
25
#define TMAN_IDLE_TIME 20 // # of iterations to wait before switching to inactive state
26
#define TMAN_STD_PERIOD 1000000
27
#define TMAN_INIT_PERIOD 1000000
28

    
29
static  int max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
30
static  int max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
31
static  int idle_time = TMAN_IDLE_TIME;
32

    
33
static uint64_t currtime;
34
static int cache_size = TMAN_INIT_PEERS;
35
static int current_size;
36
static struct peer_cache *local_cache;
37
static int period = TMAN_INIT_PERIOD;
38
static int active, countdown = TMAN_IDLE_TIME*2;
39
static int do_resize;
40
static void *mymeta;
41
static int mymeta_size;
42
static struct nodeID *restart_peer;
43
static uint8_t *zero;
44

    
45
static tmanRankingFunction userRankFunct;
46

    
47
static int tmanRankFunct (const void *target, const void *p1, const void *p2) {
48

    
49
        if (memcmp(target,zero,mymeta_size) == 0 || (memcmp(p1,zero,mymeta_size) == 0 && memcmp(p2,zero,mymeta_size) == 0))
50
                return 0;
51
        if (memcmp(p1,zero,mymeta_size) == 0)
52
                return 2;
53
        if (memcmp(p2,zero,mymeta_size) == 0)
54
                return 1;
55
        return userRankFunct(target, p1, p2);
56
}
57

    
58
static uint64_t gettime(void)
59
{
60
  struct timeval tv;
61

    
62
  gettimeofday(&tv, NULL);
63

    
64
  return tv.tv_usec + tv.tv_sec * 1000000ull;
65
}
66

    
67
int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, ranking_function rfun, int gossip_peers)
68
{
69
  userRankFunct = rfun;
70
  topo_proto_init(myID, metadata, metadata_size);
71
  mymeta = metadata;
72
  mymeta_size = metadata_size;
73
  zero = calloc(mymeta_size,1);
74
  
75
  local_cache = cache_init(cache_size, metadata_size);
76
  if (local_cache == NULL) {
77
    return -1;
78
  }
79
  idle_time = TMAN_IDLE_TIME;
80
  if (gossip_peers) {
81
    max_gossiping_peers = gossip_peers;
82
  }
83
  max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
84
  active = -1;
85
  currtime = gettime();
86

    
87
  return 0;
88
}
89

    
90
int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
91
{
92
        int metadata_size;
93
        const uint8_t *mdata;
94
        int i;
95

    
96
        mdata = get_metadata(local_cache, &metadata_size);
97
        for (i=0; nodeid(local_cache, i) && (i < n); i++) {
98
                        peers[i] = nodeid(local_cache,i);
99
                        if (metadata_size)
100
                                memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
101
        }
102

    
103
        return i;
104
}
105

    
106
int tmanGetNeighbourhoodSize(void)
107
{
108
  int i;
109

    
110
  for (i = 0; nodeid(local_cache, i); i++);
111

    
112
  return i;
113
}
114

    
115
static int time_to_send(void)
116
{
117
        if (gettime() - currtime > period) {
118
                if (--countdown == 0) {
119
                        countdown = idle_time*2;
120
                        if (active > 0) active = 0;
121
                }
122
                currtime += period;
123
                return 1;
124
        }
125

    
126
  return 0;
127
}
128

    
129
int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
130
{
131
        if (!metadata_size) {
132
                tman_query_peer(local_cache, neighbour);
133
                return -1;
134
        }
135
  if (cache_add_ranked(local_cache, neighbour, metadata, metadata_size, tmanRankFunct, mymeta) < 0) {
136
    return -1;
137
  }
138

    
139
  current_size++;
140
  return 1;
141
}
142

    
143

    
144
// not self metadata, but neighbors'.
145
const void *tmanGetMetadata(int *metadata_size)
146
{
147
  return get_metadata(local_cache, metadata_size);
148
}
149

    
150

    
151
int tmanChangeMetadata(void *metadata, int metadata_size)
152
{
153
  struct peer_cache *new = NULL;
154

    
155
  if (topo_proto_metadata_update(metadata, metadata_size) <= 0) {
156
    return -1;
157
  }
158
  mymeta = metadata;
159

    
160
  new = cache_rank(local_cache, tmanRankFunct, NULL, mymeta);
161
  if (new) {
162
        cache_free(local_cache);
163
        local_cache = new;
164
  }
165

    
166
  active = 0;
167
  countdown = idle_time*2;
168

    
169
  return 1;
170
}
171

    
172

    
173
int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
174
{
175
        int msize,s;
176
        const uint8_t *mdata;
177
        struct peer_cache *new = NULL, *temp;
178
        int source = 4; // init with value > 1, needed in bootstrap/restart phase...
179

    
180
        if (len && active >= 0) {
181
                const struct topo_header *h = (const struct topo_header *)buff;
182
                struct peer_cache *remote_cache;
183

    
184
            if (h->protocol != MSG_TYPE_TMAN) {
185
              fprintf(stderr, "TMAN: Wrong protocol!\n");
186
              return -1;
187
            }
188

    
189
                remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
190
                mdata = get_metadata(remote_cache,&msize);
191
                get_metadata(local_cache,&s);
192

    
193
                if (msize != s) {
194
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
195
                                s, msize);
196
                        return 1;
197
                }
198

    
199
                if (h->type == TMAN_QUERY) {
200
                        new = cache_rank(local_cache, tmanRankFunct, nodeid(remote_cache, 0), get_metadata(remote_cache, &msize));
201
                        if (new) {
202
                                tman_reply(remote_cache, new);
203
                                cache_free(new);
204
                                new = NULL;
205
                                // TODO: put sender in tabu list (check list size, etc.), if any...
206
                        }
207
                }
208

    
209
                if (restart_peer && nodeid_equal(restart_peer, nodeid(remote_cache,0))) { // restart phase : receiving new cache from chosen alive peer...
210
                        new = cache_rank(remote_cache,tmanRankFunct,NULL,mymeta);
211
                        if (new) {
212
                                cache_size = TMAN_INIT_PEERS;
213
                                cache_resize(new,cache_size);
214
                                countdown = idle_time*2;
215
                                fprintf(stderr,"RESTARTING TMAN!!!\n");
216
                        }
217
                        nodeid_free(restart_peer);
218
                        restart_peer = NULL;
219
                }
220
                else {        // normal phase
221
                        temp = cache_union(local_cache,remote_cache,&s);
222
                        if (temp) {
223
                                new = cache_rank(temp,tmanRankFunct,NULL,mymeta);
224
                                cache_size = ((s/2)*2.5) > cache_size ? ((s/2)*2.5) : cache_size;
225
                                cache_resize(new,cache_size);
226
                                cache_free(temp);
227
                        }
228
                }
229

    
230
                cache_free(remote_cache);
231
                if (new!=NULL) {
232
                  cache_free(local_cache);
233
                  local_cache = new;
234
                  current_size = tmanGetNeighbourhoodSize();
235
                  if (source > 1) { // cache is different than before
236
                          period = TMAN_INIT_PERIOD;
237
                          if(!restart_peer) active = idle_time;
238
                  }
239
                  else {
240
                          period = TMAN_STD_PERIOD;
241
                          if (active>0) active--;
242
                  }
243

    
244
                  do_resize = 0;
245
                }
246
        }
247

    
248
  if (time_to_send()) {
249
        uint8_t *meta;
250
        struct nodeID *chosen;
251

    
252
        cache_update(local_cache);
253

    
254
        if (active <= 0) {        // active < 0 -> bootstrap phase ; active = 0 -> restart phase
255
                struct peer_cache *ncache;
256
                int j,nsize;
257

    
258
                nsize = TMAN_INIT_PEERS > size ? TMAN_INIT_PEERS : size + 1;
259
                if (size) ncache = cache_init(nsize,metadata_size);
260
                else {return 1;}
261
                for (j=0;j<size;j++)
262
                        cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, tmanRankFunct, mymeta);
263
                if (nodeid(ncache, 0)) {
264
                        restart_peer = nodeid_dup(nodeid(ncache, 0));
265
                        mdata = get_metadata(ncache, &msize);
266
                        new = cache_rank(active < 0 ? ncache : local_cache, tmanRankFunct, restart_peer, mdata);
267
                        if (new) {
268
                                tman_query_peer(new, restart_peer);
269
                                cache_free(new);
270
                        }
271
                if (active < 0) { // bootstrap
272
                        fprintf(stderr,"BOOTSTRAPPING TMAN!!!\n");
273
                        cache_free(local_cache);
274
                        local_cache = ncache;
275
                        current_size = size;
276
                        cache_size = nsize;
277
                        active = 0;
278
                } else { // restart
279
                        cache_free(ncache);
280
                }
281
                }
282
                else {
283
                        cache_free(ncache);
284
                        fprintf(stderr, "TMAN: No peer available from peer sampler!\n");
285
                        return 1;
286
                }
287
        }
288
        else { // normal phase
289
        chosen = rand_peer(local_cache, (void **)&meta);                //MAX_PREFERRED_PEERS
290
        new = cache_rank(local_cache, tmanRankFunct, chosen, meta);
291
        if (new==NULL) {
292
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
293
                return 1;
294
        }
295
        tman_query_peer(new, chosen);
296
        cache_free(new);
297
        }
298
  }
299

    
300
  return 0;
301
}
302

    
303

    
304

    
305
// limit : at most it doubles the current cache size...
306
int tmanGrowNeighbourhood(int n)
307
{
308
        if (n<=0 || do_resize)
309
                return -1;
310
        n = n>cache_size?cache_size:n;
311
        cache_size += n;
312
        do_resize = 1;
313
        return cache_size;
314
}
315

    
316

    
317
int tmanShrinkNeighbourhood(int n)
318
{
319
        if (n<=0 || n>=cache_size || do_resize)
320
                return -1;
321
        cache_size -= n;
322
        do_resize = 1;
323
        return cache_size;
324
}
325