Statistics
| Branch: | Revision:

grapes / som / TopologyManager / tman.c @ 483ff9b6

History | View | Annotate | Download (7.96 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "topmanager.h"
16
#include "topocache.h"
17
#include "topo_proto.h"
18
#include "proto.h"
19
#include "msg_types.h"
20
#include "tman.h"
21

    
22
#define TMAN_INIT_PEERS 10 // max # of neighbors in local cache (should be >= than the next)
23
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be <= than the previous)
24
#define TMAN_MAX_GOSSIPING_PEERS 10 // # size of the view to be sent to receiver peer (should be <= than the previous)
25
#define TMAN_IDLE_TIME 10 // # of iterations to wait before switching to inactive state
26
#define TMAN_STD_PERIOD 1000000
27
#define TMAN_INIT_PERIOD 1000000
28

    
29
static  int max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
30
static  int max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
31
static  int idle_time = TMAN_IDLE_TIME;
32

    
33
static uint64_t currtime;
34
static int cache_size = TMAN_INIT_PEERS;
35
static int current_size;
36
static struct peer_cache *local_cache;
37
static int period = TMAN_INIT_PERIOD;
38
static int active, countdown = TMAN_IDLE_TIME*2;
39
static int do_resize;
40
static void *mymeta;
41
static struct nodeID *restart_peer;
42

    
43
static tmanRankingFunction rankFunct;
44

    
45

    
46
// TODO: first parameter may be discarded, because it is always called with local_cache...
47
static struct peer_cache *rank_cache (const struct peer_cache *c, const struct nodeID *target, const void *target_meta)
48
{
49
        struct peer_cache *res;
50
        int i, msize;
51
        const uint8_t *mdata;
52

    
53
        mdata = get_metadata(c,&msize);
54
        res = cache_init(cache_size,msize);
55
        if (res == NULL) {
56
          return res;
57
        }
58

    
59
        for (i=0; nodeid(c,i); i++) {
60
                if (!nodeid_equal(nodeid(c,i),target))
61
                        cache_add_ranked(res,nodeid(c,i),mdata+i*msize,msize, rankFunct, target_meta);
62
        }
63

    
64
        return res;
65
}
66

    
67

    
68
static uint64_t gettime(void)
69
{
70
  struct timeval tv;
71

    
72
  gettimeofday(&tv, NULL);
73

    
74
  return tv.tv_usec + tv.tv_sec * 1000000ull;
75
}
76

    
77
int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, ranking_function rfun, int gossip_peers)
78
{
79
  rankFunct = rfun;
80
  topo_proto_init(myID, metadata, metadata_size);
81
  mymeta = metadata;
82
  
83
  local_cache = cache_init(cache_size, metadata_size);
84
  if (local_cache == NULL) {
85
    return -1;
86
  }
87
  idle_time = TMAN_IDLE_TIME;
88
  if (gossip_peers) {
89
    max_gossiping_peers = gossip_peers;
90
  }
91
  max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
92
  active = -1;
93
  currtime = gettime();
94

    
95
  return 0;
96
}
97

    
98
int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
99
{
100
        int metadata_size;
101
        const uint8_t *mdata;
102
        int i;
103

    
104
        mdata = get_metadata(local_cache, &metadata_size);
105
        for (i=0; nodeid(local_cache, i) && (i < n); i++) {
106
                        peers[i] = nodeid(local_cache,i);
107
                        if (metadata_size)
108
                                memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
109
        }
110
        if (i != n) {
111
                active = 0;
112
        }
113

    
114
        return i;
115
}
116

    
117
int tmanGetNeighbourhoodSize(void)
118
{
119
  int i;
120

    
121
  for (i = 0; nodeid(local_cache, i); i++);
122

    
123
  return i;
124
}
125

    
126
static int time_to_send(void)
127
{
128
        if (gettime() - currtime > period) {
129
                if (--countdown == 0) {
130
                        countdown = idle_time*2;
131
                        if (active > 0) active = 0;
132
                }
133
                currtime += period;
134
                return 1;
135
        }
136

    
137
  return 0;
138
}
139

    
140
int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
141
{
142
        if (!metadata_size) {
143
                tman_query_peer(local_cache, neighbour);
144
                return -1;
145
        }
146
  if (cache_add_ranked(local_cache, neighbour, metadata, metadata_size, rankFunct, mymeta) < 0) {
147
    return -1;
148
  }
149

    
150
  current_size++;
151
  return 1;
152
}
153

    
154

    
155
// not self metadata, but neighbors'.
156
const void *tmanGetMetadata(int *metadata_size)
157
{
158
  return get_metadata(local_cache, metadata_size);
159
}
160

    
161

    
162
int tmanChangeMetadata(void *metadata, int metadata_size)
163
{
164
  if (topo_proto_metadata_update(metadata, metadata_size) <= 0) {
165
    return -1;
166
  }
167
  mymeta = metadata;
168

    
169
  return 1;
170
}
171

    
172

    
173
int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
174
{
175
        int msize,s;
176
        const uint8_t *mdata;
177
        struct peer_cache *new = NULL, *temp;
178
        int source;
179

    
180
        if (len) {
181
                const struct topo_header *h = (const struct topo_header *)buff;
182
                struct peer_cache *remote_cache;
183

    
184
            if (h->protocol != MSG_TYPE_TMAN) {
185
              fprintf(stderr, "TMAN: Wrong protocol!\n");
186
              return -1;
187
            }
188

    
189
                remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
190
                mdata = get_metadata(remote_cache,&msize);
191
                get_metadata(local_cache,&s);
192

    
193
                if (msize != s) {
194
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
195
                                s, msize);
196
                        return 1;
197
                }
198

    
199
                if (h->type == TMAN_QUERY && active >= 0) {
200
                        new = rank_cache(local_cache, nodeid(remote_cache, 0), get_metadata(remote_cache, &msize));
201
                        if (new) {
202
                                tman_reply(remote_cache, new);
203
                                cache_free(new);
204
                                new = NULL;
205
                                // TODO: put sender in tabu list (check list size, etc.), if any...
206
                        }
207
                }
208

    
209
                if (restart_peer && nodeid_equal(restart_peer, nodeid(remote_cache,0))) { // restart phase : receiving new cache from chosen alive peer...
210
                                cache_size = TMAN_INIT_PEERS;
211
                                temp = cache_init(cache_size,s);
212
                                if (temp) { // flush away old entries and store the newcomers
213
                                cache_add_ranked(temp, nodeid(remote_cache,0), mdata, msize, rankFunct, mymeta);
214
                                cache_del(remote_cache,nodeid(remote_cache,0));
215
                                new = merge_caches_ranked(temp, remote_cache, cache_size, &source, rankFunct, mymeta);
216
                                if (new) {
217
                                        countdown = idle_time*2;
218
                                }
219
                        }
220
                        nodeid_free(restart_peer);
221
                        restart_peer = NULL;
222
                }
223
                else {        // normal phase
224
                cache_add_ranked(local_cache, nodeid(remote_cache,0), mdata, msize, rankFunct, mymeta);
225
                cache_del(remote_cache,nodeid(remote_cache,0));
226
                cache_size = ((current_size/2)*3) > cache_size ? current_size*2 : cache_size;
227
                new = merge_caches_ranked(local_cache, remote_cache, cache_size, &source, rankFunct, mymeta);
228
                }
229

    
230
                cache_free(remote_cache);
231
                if (new!=NULL) {
232
                  cache_free(local_cache);
233
                  local_cache = new;
234
                  current_size = tmanGetNeighbourhoodSize();
235
                  if (source > 1) { // cache is different than before
236
                          period = TMAN_INIT_PERIOD;
237
                          if(!restart_peer) active = idle_time;
238
                  }
239
                  else {
240
                          period = TMAN_STD_PERIOD;
241
                          if (active>0) active--;
242
                  }
243

    
244
                  do_resize = 0;
245
                }
246
        }
247

    
248
  if (time_to_send()) {
249
        uint8_t *meta;
250
        struct nodeID *chosen;
251

    
252
        cache_update(local_cache);
253

    
254
        if (active <= 0) {        // active < 0 -> bootstrap phase ; active = 0 -> restart phase
255
                struct peer_cache *ncache;
256
                int j,nsize;
257

    
258
                nsize = TMAN_INIT_PEERS > size ? TMAN_INIT_PEERS : size + 1;
259
                if (size) ncache = cache_init(nsize,metadata_size);
260
                else {return 1;}
261
                for (j=0;j<size;j++)
262
                        cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, rankFunct, mymeta);
263
                if (nodeid(ncache, 0)) {
264
                        restart_peer = nodeid_dup(nodeid(ncache, 0));
265
                        mdata = get_metadata(ncache, &msize);
266
                        new = rank_cache(active < 0 ? ncache : local_cache, restart_peer, mdata);
267
                        if (new) {
268
                                tman_query_peer(new, restart_peer);
269
                                cache_free(new);
270
                        }
271
                }
272
                if (active < 0) { // bootstrap
273
                        local_cache = ncache;
274
                        current_size = size;
275
                        cache_size = nsize;
276
                        active = 0;
277
                } else { // restart
278
                        cache_free(ncache);
279
                }
280
        }
281
        else { // normal phase
282
        chosen = rand_peer(local_cache, (void **)&meta);                //MAX_PREFERRED_PEERS
283
        new = rank_cache(local_cache, chosen, meta);
284
        if (new==NULL) {
285
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
286
                return 1;
287
        }
288
        tman_query_peer(new, chosen);
289
        cache_free(new);
290
        }
291
  }
292

    
293
  return 0;
294
}
295

    
296

    
297

    
298
// limit : at most it doubles the current cache size...
299
int tmanGrowNeighbourhood(int n)
300
{
301
        if (n<=0 || do_resize)
302
                return -1;
303
        n = n>cache_size?cache_size:n;
304
        cache_size += n;
305
        do_resize = 1;
306
        return cache_size;
307
}
308

    
309

    
310
int tmanShrinkNeighbourhood(int n)
311
{
312
        if (n<=0 || n>=cache_size || do_resize)
313
                return -1;
314
        cache_size -= n;
315
        do_resize = 1;
316
        return cache_size;
317
}
318