Statistics
| Branch: | Revision:

grapes / som / TopologyManager / tman.c @ 18d83f26

History | View | Annotate | Download (6.56 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "topmanager.h"
16
#include "topocache.h"
17
#include "topo_proto.h"
18
#include "proto.h"
19
#include "msg_types.h"
20
#include "tman.h"
21

    
22
#define TMAN_INIT_PEERS 20 // max # of neighbors in local cache (should be > than the next)
23
#define TMAN_MAX_PREFERRED_PEERS 10 // # of peers to choose a receiver among (should be < than the previous)
24
#define TMAN_MAX_GOSSIPING_PEERS 10 // # size of the view to be sent to receiver peer (should be <= than the previous)
25
#define TMAN_IDLE_TIME 10 // # of iterations to wait before switching to inactive state
26
#define TMAN_STD_PERIOD 3000000
27
#define TMAN_INIT_PERIOD 1000000
28

    
29
static  int max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
30
static  int max_gossiping_peers = TMAN_MAX_GOSSIPING_PEERS;
31
static  int idle_time = TMAN_IDLE_TIME;
32

    
33
static uint64_t currtime;
34
static int cache_size = TMAN_INIT_PEERS;
35
static struct peer_cache *local_cache = NULL;
36
static int period = TMAN_INIT_PERIOD;
37
static int active = 0;
38
static int do_resize = 0;
39
static void *mymeta;
40

    
41
static tmanRankingFunction rankFunct;
42

    
43

    
44
// TODO: first parameter may be discarded, because it is always called with local_cache...
45
static struct peer_cache *rank_cache (const struct peer_cache *c, const struct nodeID *target, const void *target_meta)
46
{
47
        struct peer_cache *res;
48
        int i, msize;
49
        const uint8_t *mdata;
50

    
51
        mdata = get_metadata(c,&msize);
52
        res = cache_init(cache_size, msize, 0);
53
        if (res == NULL) {
54
          return res;
55
        }
56

    
57
        for (i=0; nodeid(c,i); i++) {
58
                if (!nodeid_equal(nodeid(c,i),target))
59
                        cache_add_ranked(res,nodeid(c,i),mdata+i*msize,msize, rankFunct, target_meta);
60
        }
61

    
62
        return res;
63
}
64

    
65

    
66
static uint64_t gettime(void)
67
{
68
  struct timeval tv;
69

    
70
  gettimeofday(&tv, NULL);
71

    
72
  return tv.tv_usec + tv.tv_sec * 1000000ull;
73
}
74

    
75
int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, ranking_function rfun, int gossip_peers)
76
{
77
  rankFunct = rfun;
78
  topo_proto_init(myID, metadata, metadata_size);
79
  mymeta = metadata;
80
  
81
  local_cache = cache_init(cache_size, metadata_size, 0);
82
  if (local_cache == NULL) {
83
    return -1;
84
  }
85
  idle_time = TMAN_IDLE_TIME;
86
  if (gossip_peers) {
87
    max_gossiping_peers = gossip_peers;
88
  }
89
  max_preferred_peers = TMAN_MAX_PREFERRED_PEERS;
90
  active = 0;
91
  currtime = gettime();
92

    
93
  return 0;
94
}
95

    
96
int tmanGivePeers (int n, struct nodeID **peers, void *metadata)
97
{
98
        int metadata_size;
99
        const uint8_t *mdata;
100
        int i;
101

    
102
        mdata = get_metadata(local_cache, &metadata_size);
103
        for (i=0; nodeid(local_cache, i) && (i < n); i++) {
104
                        peers[i] = nodeid(local_cache,i);
105
                        if (metadata_size)
106
                                memcpy((uint8_t *)metadata + i * metadata_size, mdata + i * metadata_size, metadata_size);
107
        }
108
        if (i != n) {
109
                active = 0;
110
        }
111

    
112
        return i;
113
}
114

    
115
int tmanGetNeighbourhoodSize(void)
116
{
117
  int i;
118

    
119
  for (i = 0; nodeid(local_cache, i); i++);
120

    
121
  return i;
122
}
123

    
124
static int time_to_send(void)
125
{
126
        if (gettime() - currtime > period) {
127
                currtime += period;
128
                return 1;
129
        }
130

    
131
  return 0;
132
}
133

    
134
int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
135
{
136
  if (cache_add_ranked(local_cache, neighbour, metadata, metadata_size, rankFunct, mymeta) < 0) {
137
    return -1;
138
  }
139

    
140
  return 1;
141
}
142

    
143

    
144
// not self metadata, but neighbors'.
145
const void *tmanGetMetadata(int *metadata_size)
146
{
147
  return get_metadata(local_cache, metadata_size);
148
}
149

    
150

    
151
int tmanChangeMetadata(struct nodeID *peer, void *metadata, int metadata_size)
152
{
153
  if (topo_proto_metadata_update(peer, metadata, metadata_size) <= 0) {
154
    return -1;
155
  }
156
  mymeta = metadata;
157

    
158
  return 1;
159
}
160

    
161

    
162
int tmanParseData(const uint8_t *buff, int len, struct nodeID **peers, int size, const void *metadata, int metadata_size)
163
{
164
        int msize,s;
165
        const uint8_t *mdata;
166
        struct peer_cache *new;
167
        int source;
168

    
169
        if (len) {
170
                const struct topo_header *h = (const struct topo_header *)buff;
171
                struct peer_cache *remote_cache;
172

    
173
            if (h->protocol != MSG_TYPE_TMAN) {
174
              fprintf(stderr, "TMAN: Wrong protocol!\n");
175
              return -1;
176
            }
177

    
178
                remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
179
                mdata = get_metadata(remote_cache,&msize);
180
                get_metadata(local_cache,&s);
181

    
182
                if (msize != s) {
183
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
184
                                s, msize);
185
                        return 1;
186
                }
187

    
188
                if (h->type == TMAN_QUERY) {
189
                        new = rank_cache(local_cache, nodeid(remote_cache, 0), get_metadata(remote_cache, &msize));
190
                        if (new) {
191
                                tman_reply(remote_cache, new);
192
                                cache_free(new);
193
                                // TODO: put sender in tabu list (check list size, etc.), if any...
194
                        }
195
                }
196
                cache_add_ranked(local_cache, nodeid(remote_cache,0), mdata, msize, rankFunct, mymeta);
197
                new = merge_caches_ranked(local_cache, remote_cache, cache_size, &source, rankFunct, mymeta);
198

    
199
                cache_free(remote_cache);
200
                if (new!=NULL) {
201
                  cache_free(local_cache);
202
                  local_cache = new;
203
                  if (source > 1) {
204
                          active = idle_time;
205
                  }
206
                  else {
207
                          period = TMAN_STD_PERIOD;
208
                          if (active>0) active--;
209
                  }
210

    
211
                  do_resize = 0;
212
                }
213
        }
214

    
215
  if (time_to_send()) {
216
        uint8_t *meta;
217
        struct nodeID *chosen;
218

    
219
        cache_update(local_cache);
220
        if (active == 0) {
221
                struct peer_cache *ncache;
222
                int j;
223

    
224
                if (size) ncache = cache_init(size, metadata_size, 0);
225
                else {return 1;}
226
                for (j=0;j<size;j++)
227
                        cache_add_ranked(ncache, peers[j],(const uint8_t *)metadata + j * metadata_size, metadata_size, rankFunct, mymeta);
228
                if (nodeid(ncache, 0)) {
229
                        new = merge_caches_ranked(local_cache, ncache, cache_size, &source, rankFunct, mymeta);
230
                        if (new) {
231
                                cache_free(local_cache);
232
                                local_cache = new;
233
                        if (source > 1) {
234
                                active = idle_time;
235
                        }
236
                        do_resize = 0;
237
                }
238
                }
239
                cache_free(ncache);
240
        }
241
                
242
        mdata = get_metadata(local_cache,&msize);
243
        chosen = rand_peer(local_cache, (void **)&meta);                //MAX_PREFERRED_PEERS
244
        new = rank_cache(local_cache, chosen, meta);
245
        if (new==NULL) {
246
                fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
247
                return 1;
248
        }
249
        tman_query_peer(new, chosen);
250
        cache_free(new);
251
  }
252

    
253
  return 0;
254
}
255

    
256

    
257

    
258
// limit : at most it doubles the current cache size...
259
int tmanGrowNeighbourhood(int n)
260
{
261
        if (n<=0 || do_resize)
262
                return -1;
263
        n = n>cache_size?cache_size:n;
264
        cache_size += n;
265
        do_resize = 1;
266
        return cache_size;
267
}
268

    
269

    
270
int tmanShrinkNeighbourhood(int n)
271
{
272
        if (n<=0 || n>=cache_size || do_resize)
273
                return -1;
274
        cache_size -= n;
275
        do_resize = 1;
276
        return cache_size;
277
}
278