Statistics
| Branch: | Revision:

grapes / som / TopologyManager / tman.c @ 50fea9dc

History | View | Annotate | Download (7.23 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "topmanager.h"
16
#include "topocache.h"
17
#include "topo_proto.h"
18
#include "proto.h"
19
#include "msg_types.h"
20
#include "tman.h"
21

    
22
#define TMAN_INIT_PEERS 10 // max # of neighbors in local cache (should be >= than the next)
23
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be <= than the previous)
24
#define TMAN_MAX_GOSSIPING_PEERS 10 // # size of the view to be sent to receiver peer (should be <= than the previous)
25
#define TMAN_IDLE_TIME 10 // # of iterations to wait before switching to inactive state
26
#define TMAN_STD_PERIOD 10000000
27
#define TMAN_INIT_PERIOD 1000000
28

    
29
static  int max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
30
static  int max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
31
static  int idle_time = TMAN_IDLE_TIME;
32

    
33
static uint64_t currtime;
34
static int cache_size = TMAN_INIT_PEERS;
35
static struct peer_cache *local_cache;
36
static int period = TMAN_INIT_PERIOD;
37
static int active;
38
static int do_resize;
39
static void *mymeta;
40
static struct nodeID *restart_peer;
41

    
42
static tmanRankingFunction rankFunct;
43

    
44

    
45
// TODO: first parameter may be discarded, because it is always called with local_cache...
46
static struct peer_cache *rank_cache (const struct peer_cache *c, const struct nodeID *target, const void *target_meta)
47
{
48
        struct peer_cache *res;
49
        int i, msize;
50
        const uint8_t *mdata;
51

    
52
        mdata = get_metadata(c,&msize);
53
        res = cache_init(cache_size,msize);
54
        if (res == NULL) {
55
          return res;
56
        }
57

    
58
        for (i=0; nodeid(c,i); i++) {
59
                if (!nodeid_equal(nodeid(c,i),target))
60
                        cache_add_ranked(res,nodeid(c,i),mdata+i*msize,msize, rankFunct, target_meta);
61
        }
62

    
63
        return res;
64
}
65

    
66

    
67
static uint64_t gettime(void)
68
{
69
  struct timeval tv;
70

    
71
  gettimeofday(&tv, NULL);
72

    
73
  return tv.tv_usec + tv.tv_sec * 1000000ull;
74
}
75

    
76
int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, ranking_function rfun, int gossip_peers)
77
{
78
  rankFunct = rfun;
79
  topo_proto_init(myID, metadata, metadata_size);
80
  mymeta = metadata;
81
  
82
  local_cache = cache_init(cache_size, metadata_size);
83
  if (local_cache == NULL) {
84
    return -1;
85
  }
86
  idle_time = TMAN_IDLE_TIME;
87
  if (gossip_peers) {
88
    max_gossiping_peers = gossip_peers;
89
  }
90
  max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
91
  active = -1;
92
  currtime = gettime();
93

    
94
  return 0;
95
}
96

    
97
int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
98
{
99
        int metadata_size;
100
        const uint8_t *mdata;
101
        int i;
102

    
103
        mdata = get_metadata(local_cache, &metadata_size);
104
        for (i=0; nodeid(local_cache, i) && (i < n); i++) {
105
                        peers[i] = nodeid(local_cache,i);
106
                        if (metadata_size)
107
                                memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
108
        }
109
        if (i != n) {
110
                active = 0;
111
        }
112

    
113
        return i;
114
}
115

    
116
int tmanGetNeighbourhoodSize(void)
117
{
118
  int i;
119

    
120
  for (i = 0; nodeid(local_cache, i); i++);
121

    
122
  return i;
123
}
124

    
125
static int time_to_send(void)
126
{
127
        if (gettime() - currtime > period) {
128
                currtime += period;
129
                return 1;
130
        }
131

    
132
  return 0;
133
}
134

    
135
int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
136
{
137
  if (cache_add_ranked(local_cache, neighbour, metadata, metadata_size, rankFunct, mymeta) < 0) {
138
    return -1;
139
  }
140

    
141
  return 1;
142
}
143

    
144

    
145
// not self metadata, but neighbors'.
146
const void *tmanGetMetadata(int *metadata_size)
147
{
148
  return get_metadata(local_cache, metadata_size);
149
}
150

    
151

    
152
int tmanChangeMetadata(struct nodeID *peer, void *metadata, int metadata_size)
153
{
154
  if (topo_proto_metadata_update(peer, metadata, metadata_size) <= 0) {
155
    return -1;
156
  }
157
  mymeta = metadata;
158

    
159
  return 1;
160
}
161

    
162

    
163
int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
164
{
165
        int msize,s;
166
        const uint8_t *mdata;
167
        struct peer_cache *new = NULL;
168
        int source;
169

    
170
        if (len) {
171
                const struct topo_header *h = (const struct topo_header *)buff;
172
                struct peer_cache *remote_cache;
173

    
174
            if (h->protocol != MSG_TYPE_TMAN) {
175
              fprintf(stderr, "TMAN: Wrong protocol!\n");
176
              return -1;
177
            }
178

    
179
                remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
180
                mdata = get_metadata(remote_cache,&msize);
181
                get_metadata(local_cache,&s);
182

    
183
                if (msize != s) {
184
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
185
                                s, msize);
186
                        return 1;
187
                }
188

    
189
                if (h->type == TMAN_QUERY && !restart_peer) {
190
                        new = rank_cache(local_cache, nodeid(remote_cache, 0), get_metadata(remote_cache, &msize));
191
                        if (new) {
192
                                tman_reply(remote_cache, new);
193
                                cache_free(new);
194
                                new = NULL;
195
                                // TODO: put sender in tabu list (check list size, etc.), if any...
196
                        }
197
                }
198

    
199
                if (restart_peer && nodeid_equal(restart_peer, nodeid(remote_cache,0))) { // restart phase : receiving new cache from chosen alive peer...
200
                        struct peer_cache *temp = cache_init(cache_size,s);
201
                        if (temp) {
202
                                cache_add_ranked(temp, nodeid(remote_cache,0), mdata, msize, rankFunct, mymeta);
203
                                cache_del(remote_cache,nodeid(remote_cache,0));
204
                                new = merge_caches_ranked(temp, remote_cache, cache_size, &source, rankFunct, mymeta);
205
                        }
206
                        nodeid_free(restart_peer);
207
                        restart_peer = NULL;
208
                }
209
                else {
210
                cache_add_ranked(local_cache, nodeid(remote_cache,0), mdata, msize, rankFunct, mymeta);
211
                cache_del(remote_cache,nodeid(remote_cache,0));
212
                new = merge_caches_ranked(local_cache, remote_cache, cache_size, &source, rankFunct, mymeta);
213
                }
214

    
215
                cache_free(remote_cache);
216
                if (new!=NULL) {
217
                  cache_free(local_cache);
218
                  local_cache = new;
219
                  if (source > 1) {
220
                          period = TMAN_INIT_PERIOD;
221
                          active = idle_time;
222
                  }
223
                  else {
224
                          period = TMAN_STD_PERIOD;
225
                          if (active>0) active--;
226
                  }
227

    
228
                  do_resize = 0;
229
                }
230
        }
231

    
232
  if (time_to_send()) {
233
        uint8_t *meta;
234
        struct nodeID *chosen;
235

    
236
        cache_update(local_cache);
237

    
238
        if (active <= 0) {
239
                struct peer_cache *ncache;
240
                int j;
241

    
242
                if (size) ncache = cache_init(size,metadata_size);
243
                else {return 1;}
244
                for (j=0;j<size;j++)
245
                        cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, rankFunct, mymeta);
246
                if (nodeid(ncache, 0)) {
247
                        restart_peer = nodeid_dup(nodeid(ncache, 0));
248
                        mdata = get_metadata(ncache, &msize);
249
                        new = rank_cache(active<0?ncache:local_cache, restart_peer, mdata);
250
                        if (new) {
251
                                tman_query_peer(new, restart_peer);
252
                                cache_free(new);
253
                        }
254
                }
255
                if (active < 0) {
256
                        local_cache = ncache;
257
                        active = 0;
258
                } else {
259
                        cache_free(ncache);
260
                }
261
        }
262
        else {
263
        chosen = rand_peer(local_cache, (void **)&meta);                //MAX_PREFERRED_PEERS
264
        new = rank_cache(local_cache, chosen, meta);
265
        if (new==NULL) {
266
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
267
                return 1;
268
        }
269
        tman_query_peer(new, chosen);
270
        cache_free(new);
271
        }
272
  }
273

    
274
  return 0;
275
}
276

    
277

    
278

    
279
// limit : at most it doubles the current cache size...
280
int tmanGrowNeighbourhood(int n)
281
{
282
        if (n<=0 || do_resize)
283
                return -1;
284
        n = n>cache_size?cache_size:n;
285
        cache_size += n;
286
        do_resize = 1;
287
        return cache_size;
288
}
289

    
290

    
291
int tmanShrinkNeighbourhood(int n)
292
{
293
        if (n<=0 || n>=cache_size || do_resize)
294
                return -1;
295
        cache_size -= n;
296
        do_resize = 1;
297
        return cache_size;
298
}
299