Statistics
| Branch: | Revision:

grapes / src / PeerSampler / ncast.c @ 10ddaca7

History | View | Annotate | Download (9.58 KB)

1 8ab58ec7 Luca Abeni
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6
7 480921a6 Luca Abeni
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12 badadecf CsabaKiraly
#include <stdbool.h>
13 480921a6 Luca Abeni
#include <string.h>
14 f08e1d8c Csaba Kiraly
#include <limits.h>
15 480921a6 Luca Abeni
16 155319cd Luca Abeni
#include "net_helper.h"
17 a515fd33 Luca
#include "peersampler_iface.h"
18 c7cc13c7 Luca Abeni
#include "../Cache/topocache.h"
19
#include "../Cache/ncast_proto.h"
20
#include "../Cache/proto.h"
21 176b8de8 Luca Baldesi
#include "grapes_config.h"
22 5941d7a1 Csaba Kiraly
#include "grapes_msg_types.h"
23 480921a6 Luca Abeni
24 a1333f46 Luca Abeni
#define DEFAULT_CACHE_SIZE 10
25 18d83f26 Luca Abeni
#define DEFAULT_MAX_TIMESTAMP 5
26 cc8a901b Csaba Kiraly
#define DEFAULT_BOOTSTRAP_CYCLES 5
27
#define DEFAULT_BOOTSTRAP_PERIOD 2*1000*1000
28
#define DEFAULT_PERIOD 10*1000*1000
29 480921a6 Luca Abeni
30 d83a94fa Andrea Zito
struct peersampler_context{
31
  uint64_t currtime;
32
  int cache_size;
33 1583fd78 Csaba Kiraly
  int cache_size_threshold;
34 d83a94fa Andrea Zito
  struct peer_cache *local_cache;
35
  bool bootstrap;
36 56419bdb Csaba Kiraly
  struct nodeID *bootstrap_node;
37 d83a94fa Andrea Zito
  int bootstrap_period;
38 cc8a901b Csaba Kiraly
  int bootstrap_cycles;
39 d83a94fa Andrea Zito
  int period;
40
  int counter;
41
  struct ncast_proto_context *tc;
42 46878222 Luca Abeni
  const struct nodeID **r;
43 f08e1d8c Csaba Kiraly
  int query_tokens;
44 42f6c1eb Csaba Kiraly
  int reply_tokens;
45 a4497a87 Csaba Kiraly
  int first_ts;
46 9d7935af Csaba Kiraly
  int adaptive;
47
  int restart;
48
  int randomize;
49
  int slowstart;
50 d83a94fa Andrea Zito
};
51 496b4f3c Andrea Zito
52 480921a6 Luca Abeni
static uint64_t gettime(void)
53
{
54
  struct timeval tv;
55
56
  gettimeofday(&tv, NULL);
57
58
  return tv.tv_usec + tv.tv_sec * 1000000ull;
59
}
60
61 babdcc03 Luca Abeni
static struct peersampler_context* ncast_context_init(void)
62
{
63 06113eb4 Andrea Zito
  struct peersampler_context* con;
64
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
65
66
  //Initialize context with default values
67 d83a94fa Andrea Zito
  con->bootstrap = true;
68 56419bdb Csaba Kiraly
  con->bootstrap_node = NULL;
69 d83a94fa Andrea Zito
  con->currtime = gettime();
70 46878222 Luca Abeni
  con->r = NULL;
71 06113eb4 Andrea Zito
72
  return con;
73
}
74
75 d83a94fa Andrea Zito
static int time_to_send(struct peersampler_context *context)
76 480921a6 Luca Abeni
{
77 d83a94fa Andrea Zito
  int p = context->bootstrap ? context->bootstrap_period : context->period;
78
  if (gettime() - context->currtime > p) {
79
    context->currtime += p;
80 480921a6 Luca Abeni
81
    return 1;
82
  }
83
84
  return 0;
85
}
86
87 1583fd78 Csaba Kiraly
static void cache_size_threshold_init(struct peersampler_context* context)
88
{
89
  context->cache_size_threshold = (context->cache_size - 1 / 2);
90
}
91
92 480921a6 Luca Abeni
/*
93 a515fd33 Luca
 * Exported Functions!
94 480921a6 Luca Abeni
 */
95 39f5f751 Csaba Kiraly
static struct peersampler_context* init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config, int plus_features)
96 480921a6 Luca Abeni
{
97 a1333f46 Luca Abeni
  struct tag *cfg_tags;
98 d83a94fa Andrea Zito
  struct peersampler_context *context;
99 cae80b1f Luca Abeni
  int max_timestamp;
100 a1333f46 Luca Abeni
101 d83a94fa Andrea Zito
  context = ncast_context_init();
102
  if (!context) return NULL;
103 06113eb4 Andrea Zito
104 176b8de8 Luca Baldesi
  cfg_tags = grapes_config_parse(config);
105 ad6e8e86 Luca Abeni
  grapes_config_value_int_default(cfg_tags, "cache_size", &context->cache_size, DEFAULT_CACHE_SIZE);
106
  grapes_config_value_int_default(cfg_tags, "max_timestamp", &max_timestamp, DEFAULT_MAX_TIMESTAMP);
107
  grapes_config_value_int_default(cfg_tags, "period", &context->period, DEFAULT_PERIOD);
108
  grapes_config_value_int_default(cfg_tags, "bootstrap_period", &context->bootstrap_period, DEFAULT_BOOTSTRAP_PERIOD);
109
  grapes_config_value_int_default(cfg_tags, "bootstrap_cycles", &context->bootstrap_cycles, DEFAULT_BOOTSTRAP_CYCLES);
110
  grapes_config_value_int_default(cfg_tags, "adaptive", &context->adaptive, plus_features);
111
  grapes_config_value_int_default(cfg_tags, "restart", &context->restart, plus_features);
112
  grapes_config_value_int_default(cfg_tags, "randomize", &context->randomize, plus_features);
113
  grapes_config_value_int_default(cfg_tags, "slowstart", &context->slowstart, plus_features);
114 a1333f46 Luca Abeni
  free(cfg_tags);
115 160131b3 Csaba Kiraly
116 d83a94fa Andrea Zito
  context->local_cache = cache_init(context->cache_size, metadata_size, max_timestamp);
117
  if (context->local_cache == NULL) {
118
    free(context);
119 06113eb4 Andrea Zito
    return NULL;
120 480921a6 Luca Abeni
  }
121
122 1583fd78 Csaba Kiraly
  cache_size_threshold_init(context);
123
124 d83a94fa Andrea Zito
  context->tc = ncast_proto_init(myID, metadata, metadata_size);
125
  if (!context->tc){
126
    free(context->local_cache);
127
    free(context);
128
    return NULL;
129
  }
130
131 f08e1d8c Csaba Kiraly
  context->query_tokens = 0;
132 42f6c1eb Csaba Kiraly
  context->reply_tokens = 0;
133 a4497a87 Csaba Kiraly
  context->first_ts = (max_timestamp + 1) / 2;
134
  // increase timestamp for initial message, since that is out of the normal cycle of the bootstrap peer
135
  ncast_proto_myentry_update(context->tc, NULL, context->first_ts, NULL, 0);
136 f08e1d8c Csaba Kiraly
137 d83a94fa Andrea Zito
  return context;
138 480921a6 Luca Abeni
}
139
140 15d5934d Luca Abeni
static int ncast_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size)
141 90cea048 Luca
{
142 d83a94fa Andrea Zito
  if (ncast_proto_metadata_update(context->tc, metadata, metadata_size) <= 0) {
143 90cea048 Luca
    return -1;
144
  }
145
146
  return 1;
147
}
148
149 39f5f751 Csaba Kiraly
static struct peersampler_context* ncast_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
150
{
151
    return init(myID, metadata, metadata_size, config, 0);
152
}
153
154
static struct peersampler_context* ncastplus_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
155
{
156
    return init(myID, metadata, metadata_size, config, 1);
157
}
158
159 15d5934d Luca Abeni
static int ncast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
160 480921a6 Luca Abeni
{
161 d83a94fa Andrea Zito
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
162 36b460c1 Csaba Kiraly
    return -1;
163
  }
164 56419bdb Csaba Kiraly
  if (!context->bootstrap_node) {        //save the first added nodeid as bootstrap nodeid
165
    context->bootstrap_node = nodeid_dup(neighbour);
166
  }
167 d83a94fa Andrea Zito
  return ncast_query_peer(context->tc, context->local_cache, neighbour);
168 480921a6 Luca Abeni
}
169
170 06113eb4 Andrea Zito
static int ncast_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
171 480921a6 Luca Abeni
{
172 661d190d Luca Abeni
  int dummy;
173
174 480921a6 Luca Abeni
  if (len) {
175 01873987 Luca Abeni
    const struct topo_header *h = (const struct topo_header *)buff;
176 11485577 Luca Abeni
    struct peer_cache *new, *remote_cache;
177 480921a6 Luca Abeni
178 026a7e5d Luca Abeni
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
179 480921a6 Luca Abeni
      fprintf(stderr, "NCAST: Wrong protocol!\n");
180
181
      return -1;
182
    }
183
184 d83a94fa Andrea Zito
    context->counter++;
185 a4497a87 Csaba Kiraly
    if (context->counter == context->bootstrap_cycles) {
186
      context->bootstrap = false;
187
      ncast_proto_myentry_update(context->tc, NULL , - context->first_ts, NULL, 0);  // reset the timestamp of our own ID, we are in normal cycle, we will not disturb the algorithm
188
    }
189 badadecf CsabaKiraly
190 418dafe6 Luca
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
191 480921a6 Luca Abeni
    if (h->type == NCAST_QUERY) {
192 42f6c1eb Csaba Kiraly
      context->reply_tokens--;        //sending a reply to someone who presumably receives it
193 d36aa822 Csaba Kiraly
      cache_randomize(context->local_cache);
194 d83a94fa Andrea Zito
      ncast_reply(context->tc, remote_cache, context->local_cache);
195 f08e1d8c Csaba Kiraly
    } else {
196
     context->query_tokens--;        //a query was successful
197 480921a6 Luca Abeni
    }
198 d36aa822 Csaba Kiraly
    cache_randomize(context->local_cache);
199
    cache_randomize(remote_cache);
200 d83a94fa Andrea Zito
    new = merge_caches(context->local_cache, remote_cache, context->cache_size, &dummy);
201 6ee18244 Luca
    cache_free(remote_cache);
202 480921a6 Luca Abeni
    if (new != NULL) {
203 d83a94fa Andrea Zito
      cache_free(context->local_cache);
204
      context->local_cache = new;
205 480921a6 Luca Abeni
    }
206
  }
207
208 d83a94fa Andrea Zito
  if (time_to_send(context)) {
209 8f446863 Luca Baldesi
    //fprintf(stderr,"[DEBUG] Time to send a TOPO message\n");
210 f08e1d8c Csaba Kiraly
    int ret = INT_MIN;
211
    int i;
212 8afbc263 Csaba Kiraly
    int entries = cache_entries(context->local_cache);
213 f08e1d8c Csaba Kiraly
214 1583fd78 Csaba Kiraly
    if (context->bootstrap_node &&
215 43e8c20d Csaba Kiraly
        (cache_entries(context->local_cache) <= context->cache_size_threshold) &&
216 1583fd78 Csaba Kiraly
        (cache_pos(context->local_cache, context->bootstrap_node) < 0)) {
217
      cache_add(context->local_cache, context->bootstrap_node, NULL, 0);
218 1e924dc9 Csaba Kiraly
    }
219 f08e1d8c Csaba Kiraly
    context->query_tokens++;
220 42f6c1eb Csaba Kiraly
    if (context->reply_tokens++ > 0) {//on average one reply is sent, if not, do something
221
      context->query_tokens += context->reply_tokens;
222
      context->reply_tokens = 0;
223
    }
224 8afbc263 Csaba Kiraly
    if (context->query_tokens > entries) context->query_tokens = entries;        //don't be too aggressive
225 f08e1d8c Csaba Kiraly
226 d83a94fa Andrea Zito
    cache_update(context->local_cache);
227 f08e1d8c Csaba Kiraly
    for (i = 0; i < context->query_tokens; i++) {
228
      int r;
229 480921a6 Luca Abeni
230 f08e1d8c Csaba Kiraly
      r = ncast_query(context->tc, context->local_cache);
231
      r = r > ret ? r : ret;
232
    }
233
  }
234 480921a6 Luca Abeni
  return 0;
235
}
236
237 92358b75 Luca Abeni
static const struct nodeID *const*ncast_get_neighbourhood(struct peersampler_context *context, int *n)
238 480921a6 Luca Abeni
{
239 46878222 Luca Abeni
  context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *));
240
  if (context->r == NULL) {
241 3c7f8a0c Luca Abeni
    return NULL;
242
  }
243
244 d83a94fa Andrea Zito
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
245 46878222 Luca Abeni
    context->r[*n] = nodeid(context->local_cache, *n);
246 480921a6 Luca Abeni
    //fprintf(stderr, "Checking table[%d]\n", *n);
247
  }
248 3c7f8a0c Luca Abeni
249 46878222 Luca Abeni
  return context->r;
250 480921a6 Luca Abeni
}
251
252 06113eb4 Andrea Zito
static const void *ncast_get_metadata(struct peersampler_context *context, int *metadata_size)
253 005954ae Luca Abeni
{
254 d83a94fa Andrea Zito
  return get_metadata(context->local_cache, metadata_size);
255 005954ae Luca Abeni
}
256
257 06113eb4 Andrea Zito
static int ncast_grow_neighbourhood(struct peersampler_context *context, int n)
258 480921a6 Luca Abeni
{
259 d83a94fa Andrea Zito
  context->cache_size += n;
260 1583fd78 Csaba Kiraly
  cache_size_threshold_init(context);
261 480921a6 Luca Abeni
262 d83a94fa Andrea Zito
  return context->cache_size;
263 480921a6 Luca Abeni
}
264
265 06113eb4 Andrea Zito
static int ncast_shrink_neighbourhood(struct peersampler_context *context, int n)
266 480921a6 Luca Abeni
{
267 d83a94fa Andrea Zito
  if (context->cache_size < n) {
268 480921a6 Luca Abeni
    return -1;
269
  }
270 d83a94fa Andrea Zito
  context->cache_size -= n;
271 1583fd78 Csaba Kiraly
  cache_size_threshold_init(context);
272 480921a6 Luca Abeni
273 d83a94fa Andrea Zito
  return context->cache_size;
274 480921a6 Luca Abeni
}
275
276 15d5934d Luca Abeni
static int ncast_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour)
277 480921a6 Luca Abeni
{
278 d83a94fa Andrea Zito
  return cache_del(context->local_cache, neighbour);
279 480921a6 Luca Abeni
}
280
281 10ddaca7 Luca Baldesi
void ncast_destroy(struct peersampler_context **context)
282
{
283
        if (context && *context)
284
        {
285
                if((*context)->r)
286
                        free((*context)->r);
287
                if((*context)->local_cache)
288
                        cache_free((*context)->local_cache);
289
                if((*context)->tc)
290
                        ncast_proto_destroy(&((*context)->tc));
291
                if((*context)->bootstrap_node)
292
                        nodeid_free(((*context)->bootstrap_node));
293
                free(*context);
294
                *context = NULL;
295
        }
296
}
297
298 a515fd33 Luca
struct peersampler_iface ncast = {
299
  .init = ncast_init,
300 10ddaca7 Luca Baldesi
  .destroy = ncast_destroy,
301 a515fd33 Luca
  .change_metadata = ncast_change_metadata,
302
  .add_neighbour = ncast_add_neighbour,
303
  .parse_data = ncast_parse_data,
304
  .get_neighbourhood = ncast_get_neighbourhood,
305
  .get_metadata = ncast_get_metadata,
306
  .grow_neighbourhood = ncast_grow_neighbourhood,
307
  .shrink_neighbourhood = ncast_shrink_neighbourhood,
308
  .remove_neighbour = ncast_remove_neighbour,
309
};
310 39f5f751 Csaba Kiraly
311
struct peersampler_iface ncastplus = {
312
  .init = ncastplus_init,
313 10ddaca7 Luca Baldesi
  .destroy = ncast_destroy,
314 39f5f751 Csaba Kiraly
  .change_metadata = ncast_change_metadata,
315
  .add_neighbour = ncast_add_neighbour,
316
  .parse_data = ncast_parse_data,
317
  .get_neighbourhood = ncast_get_neighbourhood,
318
  .get_metadata = ncast_get_metadata,
319
  .grow_neighbourhood = ncast_grow_neighbourhood,
320
  .shrink_neighbourhood = ncast_shrink_neighbourhood,
321
  .remove_neighbour = ncast_remove_neighbour,
322
};