Statistics
| Branch: | Revision:

grapes / som / TopologyManager / tman.c @ df98cd3a

History | View | Annotate | Download (7.09 KB)

1
/*
2
 *  Copyright (c) 2010 Marco Biazzini
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <string.h>
13

    
14
#include "net_helper.h"
15
#include "topmanager.h"
16
#include "topocache.h"
17
#include "topo_proto.h"
18
#include "proto.h"
19
#include "msg_types.h"
20

    
21
#define TMAN_MAX_PEERS 5 // max # of neighbors in local cache (should be > than the next)
22
#define TMAN_MAX_PREFERRED_PEERS 5 // # of peers to choose a receiver among (should be < than the previous)
23
#define TMAN_MAX_GOSSIPING_PEERS 5 // # size of the view to be sent to receiver peer (should be <= than the previous)
24
#define TMAN_IDLE_TIME 10 // # of iterations to wait before switching to inactive state
25
#define TMAN_STD_PERIOD 1000000
26

    
27
static const int TMAN_FLAGS = 2;
28
static const int TMAN_BLACK = 0;
29
static const int TMAN_STICKY = 1;
30

    
31
//static const int MAX_PEERS = TMAN_MAX_PEERS;
32
static  int MAX_PREFERRED_PEERS = TMAN_MAX_PREFERRED_PEERS;
33
static  int MAX_GOSSIPING_PEERS = TMAN_MAX_GOSSIPING_PEERS;
34
static  int IDLE_TIME = TMAN_IDLE_TIME;
35

    
36
static uint64_t currtime;
37
static int cache_size = TMAN_MAX_PEERS;
38
static struct peer_cache *local_cache = NULL;
39
static int period = TMAN_STD_PERIOD;
40
static int active = 0;
41
static ranking_function rfun;
42
static void *mymeta;
43

    
44
static struct peer_cache *rank_cache (const struct peer_cache *c, const struct nodeID *target, const void *target_meta)
45
{
46
        struct peer_cache *res;
47
        int i,msize,max = MAX_GOSSIPING_PEERS;
48
        const uint8_t *mdata;
49

    
50
        mdata = get_metadata(c,&msize);
51
        res = cache_init(cache_size,msize);
52
        if (res == NULL) {
53
          return res;
54
        }
55

    
56
        for (i=0;nodeid(c,i) && i<max;i++) {
57
                                if (!nodeid_equal(nodeid(c,i),target))
58
                                        cache_add_ranked(res,nodeid(c,i),mdata+i*msize,msize, rfun, target_meta);
59
                                else
60
                                        max++;
61
        }
62

    
63
        return res;
64
}
65

    
66
static uint64_t gettime(void)
67
{
68
  struct timeval tv;
69

    
70
  gettimeofday(&tv, NULL);
71

    
72
  return tv.tv_usec + tv.tv_sec * 1000000ull;
73
}
74

    
75
int tmanInit(struct nodeID *myID, void *metadata, int metadata_size, ranking_function f) //, int max_peers, int max_idle, int periodicity
76
{
77
  rfun = f;
78
  topo_proto_init(myID, metadata, metadata_size);
79
  mymeta = metadata;
80
//  if (max_peers) {
81
//          cache_size = max_peers;
82
//  }
83
//  else
84
          cache_size = TMAN_MAX_PEERS;
85
  local_cache = cache_init(cache_size, metadata_size);
86
  if (local_cache == NULL) {
87
    return -1;
88
  }
89
//  if (max_idle)
90
//          IDLE_TIME = max_idle;
91
//  else
92
          IDLE_TIME = TMAN_IDLE_TIME;
93
//  if (periodicity>=1000000)
94
//          period = periodicity;
95
//  else
96
          period = TMAN_STD_PERIOD;
97

    
98
  currtime = gettime();
99

    
100
  return 0;
101
}
102

    
103

    
104
int tmanStart(struct nodeID **peers, int size, const void *metadata, int metadata_size,
105
        int best_peers, int gossip_peers)
106
{
107
        int j,res=0;
108

    
109
        if (best_peers)
110
                MAX_PREFERRED_PEERS = best_peers;
111
        else MAX_PREFERRED_PEERS = TMAN_MAX_PREFERRED_PEERS;
112
        if (gossip_peers)
113
                MAX_GOSSIPING_PEERS = gossip_peers;
114
        else
115
                MAX_GOSSIPING_PEERS = TMAN_MAX_GOSSIPING_PEERS;
116
        // TODO: empty tabu list, if any...
117
        for (j=0;j<size && res!=-3;j++) // TODO: conditions to keep some peers if app needs...
118
                res = cache_add_ranked(local_cache,peers[j],(const uint8_t *)metadata+j*metadata_size,metadata_size,rfun, mymeta);
119
        if (res == -3)
120
                return -1;
121
        active = TMAN_IDLE_TIME;
122
        return 0;
123
}
124

    
125
static int time_to_send()
126
{
127
        if (gettime() - currtime > period) {
128
                currtime += period;
129
                if (active > 0)
130
                        return 1;
131
        }
132

    
133
  return 0;
134
}
135

    
136

    
137
int tmanChangeMetadata(struct nodeID *peer, void *metadata, int metadata_size)
138
{
139
  if (topo_proto_metadata_update(peer, metadata, metadata_size) <= 0) {
140
    return -1;
141
  }
142
  mymeta = metadata;
143

    
144
  return 1;
145
}
146

    
147
int tmanAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
148
{
149
  // TODO: change this for flagging...
150
        if (cache_add_ranked(local_cache, neighbour, metadata, metadata_size, rfun, mymeta) < 0) {
151
    return -1;
152
  }
153
  return 1;//topo_query_peer(local_cache, neighbour, TMAN_QUERY);
154
}
155

    
156
int tmanParseData(const uint8_t *buff, int len)
157
{
158
        int msize,s;
159
        const uint8_t *mdata;
160
        struct peer_cache *new;
161

    
162
        new = NULL;
163
          // TODO: change this for flagging...
164
        if (local_cache == NULL) return 1;
165
        if (len) {
166
                const struct topo_header *h = (const struct topo_header *)buff;
167
                struct peer_cache *remote_cache; int idx;
168
                int from;
169

    
170
            if (h->protocol != MSG_TYPE_TOPOLOGY) {
171
              fprintf(stderr, "TMAN: Wrong protocol!\n");
172
              return -1;
173
            }
174

    
175
            if (h->type != TMAN_QUERY && h->type != TMAN_REPLY) {
176
              return -1;
177
            }
178
                remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
179
                mdata = get_metadata(remote_cache,&msize);
180
                get_metadata(local_cache,&s);
181

    
182
                if (msize != s) {
183
                        fprintf(stderr, "TMAN: Metadata size mismatch! -> local (%d) != received (%d)\n",
184
                                s, msize);
185
                        return 1;
186
                }
187

    
188
                if (h->type == TMAN_QUERY) {
189
//                        fprintf(stderr, "\tTman: Parsing received msg to reply...\n");
190
                        new = rank_cache(local_cache, nodeid(remote_cache, 0), get_metadata(remote_cache, &msize));
191
                        if (new) {
192
                                tman_reply(remote_cache, new);
193
                                cache_free(new);
194
                                // TODO: put sender in tabu list (check list size, etc.), if any...
195
                        }
196
                }
197
                idx = cache_add_ranked(local_cache, nodeid(remote_cache,0), mdata, msize,rfun, mymeta);
198
//                fprintf(stderr, "\tidx = %d\n",idx);
199
                new = merge_caches_ranked(local_cache, remote_cache, cache_size, &from, rfun, mymeta);
200

    
201
//                  newsize = new_cache->current_size>c1->cache_size?new_cache->current_size:c1->cache_size;
202
//                  newsize = cache_shrink(new_cache,(new_cache->cache_size-newsize));
203
//                  // TODO: check newsize??
204
//                  return new_cache;
205

    
206
                cache_free(remote_cache);
207
                if (new!=NULL) {
208
                  cache_free(local_cache);
209
                  local_cache = new;
210
                }
211
        
212
                if (from == 1) {
213
                             if (active > 0) active = TMAN_IDLE_TIME;
214
                }        else if (active > 0){
215
                             active--;
216
                }
217
  }
218

    
219
  if (time_to_send()) {
220
        void *meta;
221
        struct nodeID *chosen;
222

    
223
        cache_update(local_cache);
224
        mdata = get_metadata(local_cache,&msize);
225
        chosen = rand_peer(local_cache, &meta);                //MAX_PREFERRED_PEERS
226
                 // TODO: check for chosen peer against tabu list, if any...
227
                new = rank_cache(local_cache, chosen, meta);
228
                if (new==NULL) {
229
                        fprintf(stderr, "TMAN: No cache could be sent to remote peer!\n");
230
                        return 1;
231
        }
232
        if (new) {
233
                tman_query_peer(new, chosen);
234
                cache_free(new);
235
        }
236
  }
237

    
238
  return 0;
239
}
240

    
241
const struct nodeID **tmanGetNeighbourhood(int *n)
242
{
243
  static struct nodeID *r[2000 /*FIXME!*/];
244
  for (*n = 0; nodeid(local_cache, *n); (*n)++) {
245
    r[*n] = nodeid(local_cache, *n);
246
    //fprintf(stderr, "Checking table[%d]\n", *n);
247
  }
248
  fprintf(stderr, "\tTman : active = %d\n",active);
249
  return (const struct nodeID **)r;
250
}
251

    
252
// not self metadata, but neighbors'.
253
const void *tmanGetMetadata(int *metadata_size)
254
{
255
  return get_metadata(local_cache, metadata_size);
256
}
257

    
258
//assuming peers are always ordered
259
int tmanGivePeers (int n, struct nodeID **peers, uint8_t *metadata) {
260

    
261
//        void *m[n];
262
        int msize;
263
        const uint8_t *mdata = get_metadata(local_cache,&msize);
264
        int i;
265
        for (i=0; nodeid(local_cache, i) && (i < n); i++)
266
                        peers[i] = nodeid(local_cache,i);
267
                        if (msize)
268
                                memcpy(metadata+i*msize,mdata+i*msize,msize);
269
        return i;
270
}