grapes / src / PeerSampler / cloudcast.c @ e910cf6c
History | View | Annotate | Download (14.2 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Andrea Zito
|
3 |
*
|
4 |
* This is free software; see lgpl-2.1.txt
|
5 |
*
|
6 |
* Implementation of the Cloudcast peer sampling protocol.
|
7 |
*
|
8 |
* If thread are used calls to psample_init which result in calls to
|
9 |
* cloudcast_init must be syncronized with calls to cloud_helper_init
|
10 |
* and get_cloud_helper_for.
|
11 |
*/
|
12 |
|
13 |
#include <sys/time.h> |
14 |
#include <time.h> |
15 |
#include <stdlib.h> |
16 |
#include <stdint.h> |
17 |
#include <stdio.h> |
18 |
#include <stdbool.h> |
19 |
#include <string.h> |
20 |
#include <assert.h> |
21 |
|
22 |
#include "net_helper.h" |
23 |
#include "peersampler_iface.h" |
24 |
#include "../Cache/topocache.h" |
25 |
#include "../Cache/cloudcast_proto.h" |
26 |
#include "../Cache/proto.h" |
27 |
#include "config.h" |
28 |
#include "grapes_msg_types.h" |
29 |
|
30 |
#define DEFAULT_CACHE_SIZE 20 |
31 |
#define DEFAULT_PARTIAL_VIEW_SIZE 5 |
32 |
|
33 |
#define CLOUDCAST_TRESHOLD 4 |
34 |
#define CLOUDCAST_PERIOD 10000000 // 10 seconds |
35 |
#define CLOUDCAST_BOOTSTRAP_PERIOD 2000000 // 2 seconds |
36 |
|
37 |
struct peersampler_context{
|
38 |
uint64_t currtime; |
39 |
int cache_size;
|
40 |
int sent_entries;
|
41 |
struct peer_cache *local_cache;
|
42 |
bool bootstrap;
|
43 |
int bootstrap_period;
|
44 |
int period;
|
45 |
|
46 |
uint64_t last_cloud_contact_sec; |
47 |
int max_silence;
|
48 |
double cloud_respawn_prob;
|
49 |
int cloud_contact_treshold;
|
50 |
struct nodeID *local_node;
|
51 |
struct nodeID **cloud_nodes;
|
52 |
|
53 |
int reserved_entries;
|
54 |
struct peer_cache *flying_cache;
|
55 |
struct nodeID *dst;
|
56 |
|
57 |
struct cloudcast_proto_context *proto_context;
|
58 |
struct nodeID **r;
|
59 |
}; |
60 |
|
61 |
|
62 |
/* return the time in microseconds */
|
63 |
static uint64_t gettime(void) |
64 |
{ |
65 |
struct timeval tv;
|
66 |
|
67 |
gettimeofday(&tv, NULL);
|
68 |
|
69 |
return tv.tv_usec + tv.tv_sec * 1000000ull; |
70 |
} |
71 |
|
72 |
static struct peersampler_context* cloudcast_context_init(void){ |
73 |
struct peersampler_context* con;
|
74 |
con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context)); |
75 |
if (!con) {
|
76 |
fprintf(stderr, "cloudcast: Error! could not create context. ENOMEM\n");
|
77 |
return NULL; |
78 |
} |
79 |
memset(con, 0, sizeof(struct peersampler_context)); |
80 |
|
81 |
//Initialize context with default values
|
82 |
con->bootstrap = true;
|
83 |
con->bootstrap_period = CLOUDCAST_BOOTSTRAP_PERIOD; |
84 |
con->period = CLOUDCAST_PERIOD; |
85 |
con->currtime = gettime(); |
86 |
con->cloud_contact_treshold = CLOUDCAST_TRESHOLD; |
87 |
con->last_cloud_contact_sec = 0;
|
88 |
con->max_silence = 0;
|
89 |
con->cloud_respawn_prob = 0;
|
90 |
|
91 |
con->r = NULL;
|
92 |
|
93 |
return con;
|
94 |
} |
95 |
|
96 |
static int time_to_send(struct peersampler_context* con) |
97 |
{ |
98 |
long time;
|
99 |
int p = con->bootstrap ? con->bootstrap_period : con->period;
|
100 |
|
101 |
time = gettime(); |
102 |
if (time - con->currtime > p) {
|
103 |
if (con->bootstrap) con->currtime = time;
|
104 |
else con->currtime += p;
|
105 |
return 1; |
106 |
} |
107 |
|
108 |
return 0; |
109 |
} |
110 |
|
111 |
/*
|
112 |
* Public Functions!
|
113 |
*/
|
114 |
static struct peersampler_context* cloudcast_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config) |
115 |
{ |
116 |
struct tag *cfg_tags;
|
117 |
struct peersampler_context *con;
|
118 |
int res;
|
119 |
|
120 |
con = cloudcast_context_init(); |
121 |
if (!con) return NULL; |
122 |
con->local_node = myID; |
123 |
|
124 |
cfg_tags = config_parse(config); |
125 |
res = config_value_int(cfg_tags, "cache_size", &(con->cache_size));
|
126 |
if (!res) {
|
127 |
con->cache_size = DEFAULT_CACHE_SIZE; |
128 |
} |
129 |
res = config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
|
130 |
if (!res) {
|
131 |
con->sent_entries = DEFAULT_PARTIAL_VIEW_SIZE; |
132 |
} |
133 |
res = config_value_int(cfg_tags, "max_silence", &(con->max_silence));
|
134 |
if (!res) {
|
135 |
con->max_silence = 0;
|
136 |
} |
137 |
|
138 |
res = config_value_double(cfg_tags, "cloud_respawn_prob", &(con->cloud_respawn_prob));
|
139 |
if (!res) {
|
140 |
con->max_silence = 0;
|
141 |
} |
142 |
|
143 |
con->local_cache = cache_init(con->cache_size, metadata_size, 0);
|
144 |
if (con->local_cache == NULL) { |
145 |
fprintf(stderr, "cloudcast: Error initializing local cache\n");
|
146 |
free(con); |
147 |
return NULL; |
148 |
} |
149 |
|
150 |
con->proto_context = cloudcast_proto_init(myID, metadata, metadata_size); |
151 |
if (!con->proto_context){
|
152 |
free(con->local_cache); |
153 |
free(con); |
154 |
return NULL; |
155 |
} |
156 |
con->cloud_nodes = cloudcast_get_cloud_nodes(con->proto_context, 2);
|
157 |
|
158 |
return con;
|
159 |
} |
160 |
|
161 |
static int cloudcast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size) |
162 |
{ |
163 |
if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) { |
164 |
return -1; |
165 |
} return 0; |
166 |
} |
167 |
|
168 |
static int cloudcast_query(struct peersampler_context *context, |
169 |
struct peer_cache *remote_cache)
|
170 |
{ |
171 |
struct peer_cache *sent_cache;
|
172 |
|
173 |
/* Fill up reply and send it.
|
174 |
NOTE: we don't know the remote node so we cannot prevent putting it into
|
175 |
the reply. Remote node should check for entries of itself */
|
176 |
sent_cache = rand_cache(context->local_cache, context->sent_entries - 1);
|
177 |
cloudcast_reply_peer(context->proto_context, remote_cache, sent_cache, |
178 |
context->last_cloud_contact_sec); |
179 |
|
180 |
/* Insert entries in view without using space reserved by the active thread */
|
181 |
cache_fill_rand(context->local_cache, remote_cache, |
182 |
context->cache_size - context->reserved_entries); |
183 |
|
184 |
|
185 |
cache_free(remote_cache); |
186 |
cache_free(sent_cache); |
187 |
return 0; |
188 |
} |
189 |
|
190 |
static int cloudcast_reply(struct peersampler_context *context, |
191 |
struct peer_cache *remote_cache)
|
192 |
{ |
193 |
/* Be sure not to insert local_node in cache!!! */
|
194 |
cache_del(remote_cache, context->local_node); |
195 |
|
196 |
/* Insert remote entries in view */
|
197 |
context->reserved_entries = 0;
|
198 |
cache_fill_rand(context->local_cache, remote_cache, 0);
|
199 |
|
200 |
cache_free(remote_cache); |
201 |
return 0; |
202 |
} |
203 |
|
204 |
static int cloudcast_cloud_dialogue(struct peersampler_context *context, |
205 |
struct peer_cache *cloud_cache)
|
206 |
{ |
207 |
struct peer_cache * sent_cache = NULL; |
208 |
uint64_t cloud_tstamp; |
209 |
uint64_t current_time; |
210 |
int delta;
|
211 |
|
212 |
/* Obtain the timestamp reported by the last query_cloud operation and
|
213 |
compute delta */
|
214 |
cloud_tstamp = cloudcast_timestamp_cloud(context->proto_context) * 1000000ull;
|
215 |
current_time = gettime(); |
216 |
delta = current_time - cloud_tstamp; |
217 |
|
218 |
/* Fill up the request with one spot free for local node */
|
219 |
sent_cache = rand_cache_except(context->local_cache, context->sent_entries - 1,
|
220 |
context->cloud_nodes, 2);
|
221 |
|
222 |
|
223 |
if (cloud_cache == NULL) { |
224 |
/* Cloud is not initialized, just set the cloud cache to send_cache */
|
225 |
cloudcast_reply_cloud(context->proto_context, sent_cache); |
226 |
cloudcast_cloud_default_reply(context->local_cache, context->cloud_nodes[0]);
|
227 |
return 0; |
228 |
} else {
|
229 |
struct peer_cache *remote_cache = NULL; |
230 |
int err = 0; |
231 |
int g = 0; |
232 |
int i = 0; |
233 |
|
234 |
|
235 |
/* If the cloud contact frequency *IS NOT* too high save an entry for the
|
236 |
cloud */
|
237 |
if (delta > (context->period / context->cloud_contact_treshold)) {
|
238 |
g++; |
239 |
} |
240 |
|
241 |
/* If the cloud contact frequency *IS* too low save another entry for the
|
242 |
cloud */
|
243 |
if (delta > (context->cloud_contact_treshold * context->period)) {
|
244 |
g++; |
245 |
} |
246 |
|
247 |
/* Fill up the reply with g spots free for cloud entries */
|
248 |
remote_cache = rand_cache_except(cloud_cache, context->sent_entries-g, |
249 |
&context->local_node, 1);
|
250 |
|
251 |
/* Insert cloud entries in remote cache */
|
252 |
cache_resize(remote_cache, cache_current_size(remote_cache)+g); |
253 |
for (i=0; i<g; i++) { |
254 |
err = cache_add(remote_cache, context->cloud_nodes[i], NULL, 0); |
255 |
assert(err > 0);
|
256 |
} |
257 |
|
258 |
/* Insert remote entries in local view */
|
259 |
cache_fill_rand(context->local_cache, remote_cache, 0);
|
260 |
|
261 |
/* Insert sent entries in cloud view */
|
262 |
cache_del(cloud_cache, context->local_node); |
263 |
cache_resize(cloud_cache, context->cache_size); |
264 |
assert(cache_max_size(cloud_cache) == context->cache_size); |
265 |
cache_fill_rand(cloud_cache, sent_cache, 0);
|
266 |
|
267 |
/* Write the new view in the cloud */
|
268 |
cloudcast_reply_cloud(context->proto_context, cloud_cache); |
269 |
|
270 |
cache_free(remote_cache); |
271 |
cache_free(cloud_cache); |
272 |
cache_free(sent_cache); |
273 |
return 0; |
274 |
} |
275 |
} |
276 |
|
277 |
static int silence_treshold(struct peersampler_context *context) |
278 |
{ |
279 |
int threshold;
|
280 |
int delta;
|
281 |
if (!context->max_silence) return 0; |
282 |
if (context->last_cloud_contact_sec == 0) return 0; |
283 |
|
284 |
threshold = (context->max_silence * context->period) / 1000000ull;
|
285 |
delta = (gettime() / 1000000ull) - context->last_cloud_contact_sec;
|
286 |
|
287 |
return delta > threshold &&
|
288 |
((double) rand())/RAND_MAX < context->cloud_respawn_prob;
|
289 |
} |
290 |
|
291 |
static int cloudcast_active_thread(struct peersampler_context *context) |
292 |
{ |
293 |
int err = 0; |
294 |
|
295 |
/* If space permits, re-insert old entries that have been sent in the previous
|
296 |
cycle */
|
297 |
if (context->flying_cache) {
|
298 |
cache_fill_rand(context->local_cache, context->flying_cache, 0);
|
299 |
cache_free(context->flying_cache); |
300 |
context->flying_cache = NULL;
|
301 |
|
302 |
/* If we didn't received an answer since the last cycle, forget about it */
|
303 |
context->reserved_entries = 0;
|
304 |
} |
305 |
|
306 |
/* Increase age of all nodes in the view */
|
307 |
cache_update(context->local_cache); |
308 |
|
309 |
/* Select oldest node in the view */
|
310 |
context->dst = last_peer(context->local_cache); |
311 |
|
312 |
/* If we remain without neighbors, forcely add a cloud entry */
|
313 |
if (context->dst == NULL) { |
314 |
context->dst = context->cloud_nodes[0];
|
315 |
} |
316 |
|
317 |
context->dst = nodeid_dup(context->dst); |
318 |
cache_del(context->local_cache, context->dst); |
319 |
|
320 |
/* If enabled, readd a cloud node when too much time is passed from the
|
321 |
last contact (computed globally) */
|
322 |
if (!cloudcast_is_cloud_node(context->proto_context, context->dst) &&
|
323 |
silence_treshold(context)) { |
324 |
cache_add(context->local_cache, context->cloud_nodes[0], NULL, 0); |
325 |
} |
326 |
|
327 |
if (cloudcast_is_cloud_node(context->proto_context, context->dst)) {
|
328 |
context->last_cloud_contact_sec = gettime() / 1000000ull;
|
329 |
|
330 |
/* Request cloud view */
|
331 |
err = cloudcast_query_cloud(context->proto_context); |
332 |
} else {
|
333 |
/* Fill up request keeping space for local descriptor */
|
334 |
context->flying_cache = rand_cache(context->local_cache, |
335 |
context->sent_entries - 1);
|
336 |
|
337 |
context->reserved_entries = cache_current_size(context->flying_cache); |
338 |
|
339 |
/* Send request to remote peer */
|
340 |
err = cloudcast_query_peer(context->proto_context, context->flying_cache, |
341 |
context->dst, context->last_cloud_contact_sec); |
342 |
} |
343 |
|
344 |
return err;
|
345 |
} |
346 |
|
347 |
static int |
348 |
cloudcast_parse_data(struct peersampler_context *context, const uint8_t *buff, |
349 |
int len)
|
350 |
{ |
351 |
int err = 0; |
352 |
cache_check(context->local_cache); |
353 |
|
354 |
/* If we got data, perform the appropriate passive thread operation */
|
355 |
if (len) {
|
356 |
const struct topo_header *th = NULL; |
357 |
const struct cloudcast_header *ch = NULL; |
358 |
struct peer_cache *remote_cache = NULL; |
359 |
size_t shift = 0;
|
360 |
|
361 |
|
362 |
th = (const struct topo_header *) buff; |
363 |
shift += sizeof(struct topo_header); |
364 |
|
365 |
ch = (const struct cloudcast_header *) (buff + shift); |
366 |
shift += sizeof(struct cloudcast_header); |
367 |
|
368 |
if (th->protocol != MSG_TYPE_TOPOLOGY) {
|
369 |
fprintf(stderr, "Peer Sampler: Wrong protocol: %d!\n", th->protocol);
|
370 |
return -1; |
371 |
} |
372 |
|
373 |
context->bootstrap = false;
|
374 |
|
375 |
/* Update info on global cloud contact time */
|
376 |
if (ch->last_cloud_contact_sec > context->last_cloud_contact_sec) {
|
377 |
context->last_cloud_contact_sec = ch->last_cloud_contact_sec; |
378 |
} |
379 |
|
380 |
if (len - shift > 0) |
381 |
remote_cache = entries_undump(buff + (shift * sizeof(uint8_t)), len - shift);
|
382 |
|
383 |
if (th->type == CLOUDCAST_QUERY) {
|
384 |
/* We're being queried by a remote peer */
|
385 |
err = cloudcast_query(context, remote_cache); |
386 |
} else if(th->type == CLOUDCAST_REPLY){ |
387 |
/* We've received the response to our query from the remote peer */
|
388 |
err = cloudcast_reply(context, remote_cache); |
389 |
} else if (th->type == CLOUDCAST_CLOUD) { |
390 |
/* We've received the response form the cloud. Simulate dialogue */
|
391 |
err = cloudcast_cloud_dialogue(context, remote_cache); |
392 |
} else {
|
393 |
fprintf(stderr, "Cloudcast: Wrong message type: %d\n", th->type);
|
394 |
return -1; |
395 |
} |
396 |
} |
397 |
|
398 |
/* It it's time, perform the active thread */
|
399 |
if (time_to_send(context)) {
|
400 |
err = cloudcast_active_thread(context); |
401 |
} |
402 |
cache_check(context->local_cache); |
403 |
|
404 |
return err;
|
405 |
} |
406 |
|
407 |
static const struct nodeID **cloudcast_get_neighbourhood(struct peersampler_context *context, int *n) |
408 |
{ |
409 |
context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *)); |
410 |
if (context->r == NULL) { |
411 |
return NULL; |
412 |
} |
413 |
|
414 |
for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) { |
415 |
context->r[*n] = nodeid(context->local_cache, *n); |
416 |
} |
417 |
if (context->flying_cache) {
|
418 |
int i,j,dup;
|
419 |
|
420 |
for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) { |
421 |
dup = 0;
|
422 |
for (j = 0; j<*n; j++){ |
423 |
if (nodeid_equal(context->r[j], nodeid(context->flying_cache, i))){
|
424 |
dup = 1;
|
425 |
continue;
|
426 |
} |
427 |
} |
428 |
if (dup) (*n)--;
|
429 |
else context->r[*n] = nodeid(context->flying_cache, i);
|
430 |
} |
431 |
} |
432 |
|
433 |
if (context->dst && (*n < context->cache_size)) {
|
434 |
int j,dup;
|
435 |
dup = 0;
|
436 |
for (j = 0; j<*n; j++){ |
437 |
if (nodeid_equal(context->r[j], context->dst)){
|
438 |
dup = 1;
|
439 |
continue;
|
440 |
} |
441 |
} |
442 |
if (!dup){
|
443 |
context->r[*n] = context->dst; |
444 |
(*n)++; |
445 |
} |
446 |
} |
447 |
|
448 |
return (const struct nodeID **)context->r; |
449 |
} |
450 |
|
451 |
static const void *cloudcast_get_metadata(struct peersampler_context *context, int *metadata_size) |
452 |
{ |
453 |
return get_metadata(context->local_cache, metadata_size);
|
454 |
} |
455 |
|
456 |
static int cloudcast_grow_neighbourhood(struct peersampler_context *context, int n) |
457 |
{ |
458 |
context->cache_size += n; |
459 |
|
460 |
return context->cache_size;
|
461 |
} |
462 |
|
463 |
static int cloudcast_shrink_neighbourhood(struct peersampler_context *context, int n) |
464 |
{ |
465 |
if (context->cache_size < n) {
|
466 |
return -1; |
467 |
} |
468 |
context->cache_size -= n; |
469 |
|
470 |
return context->cache_size;
|
471 |
} |
472 |
|
473 |
static int cloudcast_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour) |
474 |
{ |
475 |
return cache_del(context->local_cache, neighbour);
|
476 |
} |
477 |
|
478 |
static int cloudcast_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size) |
479 |
{ |
480 |
return cloudcast_proto_change_metadata(context->proto_context, metadata, metadata_size);
|
481 |
} |
482 |
|
483 |
struct peersampler_iface cloudcast = {
|
484 |
.init = cloudcast_init, |
485 |
.change_metadata = cloudcast_change_metadata, |
486 |
.add_neighbour = cloudcast_add_neighbour, |
487 |
.parse_data = cloudcast_parse_data, |
488 |
.get_neighbourhood = cloudcast_get_neighbourhood, |
489 |
.get_metadata = cloudcast_get_metadata, |
490 |
.grow_neighbourhood = cloudcast_grow_neighbourhood, |
491 |
.shrink_neighbourhood = cloudcast_shrink_neighbourhood, |
492 |
.remove_neighbour = cloudcast_remove_neighbour, |
493 |
}; |