Statistics
| Branch: | Revision:

grapes / src / TopologyManager / cyclon.c @ 5033613a

History | View | Annotate | Download (6.8 KB)

1 194ec532 Luca Abeni
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14
15
#include "net_helper.h"
16
#include "peersampler_iface.h"
17
#include "topocache.h"
18
#include "topo_proto.h"
19
#include "cyclon_proto.h"
20
#include "proto.h"
21
#include "config.h"
22
#include "grapes_msg_types.h"
23
24 b501da37 Luca Abeni
#define DEFAULT_CACHE_SIZE 10
25 194ec532 Luca Abeni
26 06113eb4 Andrea Zito
struct peersampler_context{
27 32fa0db9 Andrea Zito
  uint64_t currtime;
28
  int cache_size;
29
  int sent_entries;
30
  struct peer_cache *local_cache;
31
  bool bootstrap;
32
  int bootstrap_period;
33
  int period;
34
  
35
  struct peer_cache *flying_cache;
36
  struct nodeID *dst;
37 d83a94fa Andrea Zito
38 de42b756 Andrea Zito
39
 struct cyclon_proto_context *pc;
40 7474452a Andrea Zito
};
41 32fa0db9 Andrea Zito
42 194ec532 Luca Abeni
43
static uint64_t gettime(void)
44
{
45
  struct timeval tv;
46
47
  gettimeofday(&tv, NULL);
48
49
  return tv.tv_usec + tv.tv_sec * 1000000ull;
50
}
51
52 06113eb4 Andrea Zito
static struct peersampler_context* cyclon_context_init(void){
53
  struct peersampler_context* con;
54
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
55
56
  //Initialize context with default values
57
  con->bootstrap = true;
58
  con->bootstrap_period = 2000000;
59
  con->period = 10000000;
60
  con->currtime = gettime();
61
62
  return con;
63
}
64
65
static int time_to_send(struct peersampler_context* con)
66 194ec532 Luca Abeni
{
67 32fa0db9 Andrea Zito
  int p = con->bootstrap ? con->bootstrap_period : con->period;
68
  if (gettime() - con->currtime > p) {
69
    con->currtime += p;
70 194ec532 Luca Abeni
71
    return 1;
72
  }
73
74
  return 0;
75
}
76
77 db6bff6c Luca Abeni
static void cache_add_cache(struct peer_cache *dst, const struct peer_cache *add)
78
{
79
  int i, meta_size;
80
  const uint8_t *meta;
81
82
  meta = get_metadata(add, &meta_size);
83
  for (i = 0; nodeid(add, i); i++) {
84
    cache_add(dst,  nodeid(add, i), meta + (meta_size * i), meta_size);
85
  }
86
}
87
88
89 194ec532 Luca Abeni
/*
90
 * Public Functions!
91
 */
92 06113eb4 Andrea Zito
static struct peersampler_context* cyclon_init(struct nodeID *myID, void *metadata, int metadata_size, const char *config)
93 194ec532 Luca Abeni
{
94 b501da37 Luca Abeni
  struct tag *cfg_tags;
95 06113eb4 Andrea Zito
  struct peersampler_context *con;
96 b501da37 Luca Abeni
  int res;
97
98 32fa0db9 Andrea Zito
  con = cyclon_context_init();
99 06113eb4 Andrea Zito
  if (!con) return NULL;
100 32fa0db9 Andrea Zito
101 b501da37 Luca Abeni
  cfg_tags = config_parse(config);
102 32fa0db9 Andrea Zito
  res = config_value_int(cfg_tags, "cache_size", &(con->cache_size));
103 b501da37 Luca Abeni
  if (!res) {
104 32fa0db9 Andrea Zito
    con->cache_size = DEFAULT_CACHE_SIZE;
105 b501da37 Luca Abeni
  }
106 32fa0db9 Andrea Zito
  res = config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
107 b501da37 Luca Abeni
  if (!res) {
108 32fa0db9 Andrea Zito
    con->sent_entries = con->cache_size / 2;
109 b501da37 Luca Abeni
  }
110
111 32fa0db9 Andrea Zito
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
112
  if (con->local_cache == NULL) {
113 06113eb4 Andrea Zito
    free(con);
114
    return NULL;
115 194ec532 Luca Abeni
  }
116
117 f3ab0d6d Andrea Zito
  con->pc = cyclon_proto_init(myID, metadata, metadata_size);
118
  if (!con->pc){
119
    free(con->local_cache);
120
    free(con);
121
    return NULL;
122 194ec532 Luca Abeni
  }
123
124 f3ab0d6d Andrea Zito
  return con;
125 194ec532 Luca Abeni
}
126
127 06113eb4 Andrea Zito
static int cyclon_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, void *metadata, int metadata_size)
128 194ec532 Luca Abeni
{
129 06113eb4 Andrea Zito
  if (!context->flying_cache) {
130
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
131 194ec532 Luca Abeni
  }
132 06113eb4 Andrea Zito
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
133 194ec532 Luca Abeni
    return -1;
134
  }
135
136 f3ab0d6d Andrea Zito
  return cyclon_query(context->pc, context->flying_cache, neighbour);
137 194ec532 Luca Abeni
}
138
139 06113eb4 Andrea Zito
static int cyclon_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
140 194ec532 Luca Abeni
{
141 06113eb4 Andrea Zito
  cache_check(context->local_cache);
142 194ec532 Luca Abeni
  if (len) {
143
    const struct topo_header *h = (const struct topo_header *)buff;
144 db6bff6c Luca Abeni
    struct peer_cache *remote_cache;
145 194ec532 Luca Abeni
    struct peer_cache *sent_cache = NULL;
146
147
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
148
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
149
150
      return -1;
151
    }
152
153 06113eb4 Andrea Zito
    context->bootstrap = false;
154 194ec532 Luca Abeni
155
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
156
    if (h->type == CYCLON_QUERY) {
157 06113eb4 Andrea Zito
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
158 f3ab0d6d Andrea Zito
      cyclon_reply(context->pc, remote_cache, sent_cache);
159 06113eb4 Andrea Zito
      context->dst = NULL;
160 194ec532 Luca Abeni
    }
161 06113eb4 Andrea Zito
    cache_check(context->local_cache);
162
    cache_add_cache(context->local_cache, remote_cache);
163 db6bff6c Luca Abeni
    cache_free(remote_cache);
164 194ec532 Luca Abeni
    if (sent_cache) {
165 06113eb4 Andrea Zito
      cache_add_cache(context->local_cache, sent_cache);
166 194ec532 Luca Abeni
      cache_free(sent_cache);
167
    } else {
168 06113eb4 Andrea Zito
      if (context->flying_cache) {
169
        cache_add_cache(context->local_cache, context->flying_cache);
170
        cache_free(context->flying_cache);
171
        context->flying_cache = NULL;
172 194ec532 Luca Abeni
      }
173
    }
174
  }
175
176 06113eb4 Andrea Zito
  if (time_to_send(context)) {
177
    if (context->flying_cache) {
178
      cache_add_cache(context->local_cache, context->flying_cache);
179
      cache_free(context->flying_cache);
180
      context->flying_cache = NULL;
181 194ec532 Luca Abeni
    }
182 06113eb4 Andrea Zito
    cache_update(context->local_cache);
183
    context->dst = last_peer(context->local_cache);
184
    if (context->dst == NULL) {
185 194ec532 Luca Abeni
      return 0;
186
    }
187 06113eb4 Andrea Zito
    context->dst = nodeid_dup(context->dst);
188
    cache_del(context->local_cache, context->dst);
189
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
190 f3ab0d6d Andrea Zito
    cyclon_query(context->pc, context->flying_cache, context->dst);
191 194ec532 Luca Abeni
  }
192 06113eb4 Andrea Zito
  cache_check(context->local_cache);
193 194ec532 Luca Abeni
194
  return 0;
195
}
196
197 06113eb4 Andrea Zito
static const struct nodeID **cyclon_get_neighbourhood(struct peersampler_context *context, int *n)
198 194ec532 Luca Abeni
{
199
  static struct nodeID **r;
200
201 06113eb4 Andrea Zito
  r = realloc(r, context->cache_size * sizeof(struct nodeID *));
202 194ec532 Luca Abeni
  if (r == NULL) {
203
    return NULL;
204
  }
205
206 06113eb4 Andrea Zito
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
207
    r[*n] = nodeid(context->local_cache, *n);
208 194ec532 Luca Abeni
    //fprintf(stderr, "Checking table[%d]\n", *n);
209
  }
210 06113eb4 Andrea Zito
  if (context->flying_cache) {
211 194ec532 Luca Abeni
    int i;
212
213 06113eb4 Andrea Zito
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
214
      r[*n] = nodeid(context->flying_cache, i);
215 194ec532 Luca Abeni
    }
216
  }
217 06113eb4 Andrea Zito
  if (context->dst && (*n < context->cache_size)) {
218
    r[*n] = context->dst;
219 194ec532 Luca Abeni
    (*n)++;
220
  }
221
222
  return (const struct nodeID **)r;
223
}
224
225 06113eb4 Andrea Zito
static const void *cyclon_get_metadata(struct peersampler_context *context, int *metadata_size)
226 194ec532 Luca Abeni
{
227 06113eb4 Andrea Zito
  return get_metadata(context->local_cache, metadata_size);
228 194ec532 Luca Abeni
}
229
230 06113eb4 Andrea Zito
static int cyclon_grow_neighbourhood(struct peersampler_context *context, int n)
231 194ec532 Luca Abeni
{
232 06113eb4 Andrea Zito
  context->cache_size += n;
233 194ec532 Luca Abeni
234 06113eb4 Andrea Zito
  return context->cache_size;
235 194ec532 Luca Abeni
}
236
237 06113eb4 Andrea Zito
static int cyclon_shrink_neighbourhood(struct peersampler_context *context, int n)
238 194ec532 Luca Abeni
{
239 06113eb4 Andrea Zito
  if (context->cache_size < n) {
240 194ec532 Luca Abeni
    return -1;
241
  }
242 06113eb4 Andrea Zito
  context->cache_size -= n;
243 194ec532 Luca Abeni
244 06113eb4 Andrea Zito
  return context->cache_size;
245 194ec532 Luca Abeni
}
246
247 06113eb4 Andrea Zito
static int cyclon_remove_neighbour(struct peersampler_context *context, struct nodeID *neighbour)
248 194ec532 Luca Abeni
{
249 06113eb4 Andrea Zito
  return cache_del(context->local_cache, neighbour);
250 194ec532 Luca Abeni
}
251
252 f3ab0d6d Andrea Zito
static int cyclon_change_metadata(struct peersampler_context *context, void *metadata, int metadata_size)
253
{
254
  return cyclon_proto_change_metadata(context->pc, metadata, metadata_size);
255
}
256
257 194ec532 Luca Abeni
struct peersampler_iface cyclon = {
258
  .init = cyclon_init,
259
  .change_metadata = cyclon_change_metadata,
260
  .add_neighbour = cyclon_add_neighbour,
261
  .parse_data = cyclon_parse_data,
262
  .get_neighbourhood = cyclon_get_neighbourhood,
263
  .get_metadata = cyclon_get_metadata,
264
  .grow_neighbourhood = cyclon_grow_neighbourhood,
265
  .shrink_neighbourhood = cyclon_shrink_neighbourhood,
266
  .remove_neighbour = cyclon_remove_neighbour,
267
};