Statistics
| Branch: | Revision:

grapes / src / PeerSampler / cloudcast.c @ 10ddaca7

History | View | Annotate | Download (14.6 KB)

1 36dc16e0 Andrea Zito
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5 c8adcc6c Andrea Zito
 *
6
 * Implementation of the Cloudcast peer sampling protocol.
7
 *
8
 * If thread are used calls to psample_init which result in calls to
9
 * cloudcast_init must be syncronized with calls to cloud_helper_init
10
 * and get_cloud_helper_for.
11 36dc16e0 Andrea Zito
 */
12
13
#include <sys/time.h>
14
#include <time.h>
15
#include <stdlib.h>
16
#include <stdint.h>
17
#include <stdio.h>
18
#include <stdbool.h>
19
#include <string.h>
20 e910cf6c Andrea Zito
#include <assert.h>
21 36dc16e0 Andrea Zito
22 a2f38935 Luca Abeni
#include "cloud_helper.h"
23 36dc16e0 Andrea Zito
#include "net_helper.h"
24
#include "peersampler_iface.h"
25 ee8db4ca Andrea Zito
#include "../Cache/topocache.h"
26
#include "../Cache/cloudcast_proto.h"
27
#include "../Cache/proto.h"
28 176b8de8 Luca Baldesi
#include "grapes_config.h"
29 36dc16e0 Andrea Zito
#include "grapes_msg_types.h"
30
31 e910cf6c Andrea Zito
#define DEFAULT_CACHE_SIZE 20
32
#define DEFAULT_PARTIAL_VIEW_SIZE 5
33 93202b19 Andrea Zito
34
#define CLOUDCAST_TRESHOLD 4
35
#define CLOUDCAST_PERIOD 10000000 // 10 seconds
36
#define CLOUDCAST_BOOTSTRAP_PERIOD 2000000 // 2 seconds
37 36dc16e0 Andrea Zito
38
struct peersampler_context{
39
  uint64_t currtime;
40
  int cache_size;
41
  int sent_entries;
42
  struct peer_cache *local_cache;
43
  bool bootstrap;
44
  int bootstrap_period;
45
  int period;
46
47 e910cf6c Andrea Zito
  uint64_t last_cloud_contact_sec;
48
  int max_silence;
49
  double cloud_respawn_prob;
50 36dc16e0 Andrea Zito
  int cloud_contact_treshold;
51
  struct nodeID *local_node;
52
  struct nodeID **cloud_nodes;
53
54 e910cf6c Andrea Zito
  int reserved_entries;
55 36dc16e0 Andrea Zito
  struct peer_cache *flying_cache;
56
  struct nodeID *dst;
57
58
  struct cloudcast_proto_context *proto_context;
59 672eb08e Andrea Zito
  struct nodeID **r;
60 36dc16e0 Andrea Zito
};
61
62
63 e910cf6c Andrea Zito
/* return the time in microseconds */
64 36dc16e0 Andrea Zito
static uint64_t gettime(void)
65
{
66
  struct timeval tv;
67
68
  gettimeofday(&tv, NULL);
69
70
  return tv.tv_usec + tv.tv_sec * 1000000ull;
71
}
72
73
static struct peersampler_context* cloudcast_context_init(void){
74
  struct peersampler_context* con;
75
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
76 28334070 Andrea Zito
  if (!con) {
77
    fprintf(stderr, "cloudcast: Error! could not create context. ENOMEM\n");
78
    return NULL;
79
  }
80 36dc16e0 Andrea Zito
  memset(con, 0, sizeof(struct peersampler_context));
81
82
  //Initialize context with default values
83
  con->bootstrap = true;
84 93202b19 Andrea Zito
  con->bootstrap_period = CLOUDCAST_BOOTSTRAP_PERIOD;
85
  con->period = CLOUDCAST_PERIOD;
86 36dc16e0 Andrea Zito
  con->currtime = gettime();
87 93202b19 Andrea Zito
  con->cloud_contact_treshold = CLOUDCAST_TRESHOLD;
88 e910cf6c Andrea Zito
  con->last_cloud_contact_sec = 0;
89
  con->max_silence = 0;
90
  con->cloud_respawn_prob = 0;
91 93202b19 Andrea Zito
92 672eb08e Andrea Zito
  con->r = NULL;
93 36dc16e0 Andrea Zito
94
  return con;
95
}
96
97
static int time_to_send(struct peersampler_context* con)
98
{
99 e910cf6c Andrea Zito
  long time;
100 36dc16e0 Andrea Zito
  int p = con->bootstrap ? con->bootstrap_period : con->period;
101
102 e910cf6c Andrea Zito
  time = gettime();
103
  if (time - con->currtime > p) {
104
    if (con->bootstrap) con->currtime = time;
105
    else con->currtime += p;
106 36dc16e0 Andrea Zito
    return 1;
107
  }
108
109
  return 0;
110
}
111
112
/*
113
 * Public Functions!
114
 */
115 ee8db4ca Andrea Zito
static struct peersampler_context* cloudcast_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
116 36dc16e0 Andrea Zito
{
117
  struct tag *cfg_tags;
118
  struct peersampler_context *con;
119
  int res;
120
121
  con = cloudcast_context_init();
122
  if (!con) return NULL;
123
  con->local_node = myID;
124
125 176b8de8 Luca Baldesi
  cfg_tags = grapes_config_parse(config);
126
  res = grapes_config_value_int(cfg_tags, "cache_size", &(con->cache_size));
127 36dc16e0 Andrea Zito
  if (!res) {
128
    con->cache_size = DEFAULT_CACHE_SIZE;
129
  }
130 176b8de8 Luca Baldesi
  res = grapes_config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
131 36dc16e0 Andrea Zito
  if (!res) {
132 e910cf6c Andrea Zito
    con->sent_entries = DEFAULT_PARTIAL_VIEW_SIZE;
133 36dc16e0 Andrea Zito
  }
134 176b8de8 Luca Baldesi
  res = grapes_config_value_int(cfg_tags, "max_silence", &(con->max_silence));
135 36dc16e0 Andrea Zito
  if (!res) {
136 e910cf6c Andrea Zito
    con->max_silence = 0;
137 36dc16e0 Andrea Zito
  }
138 e910cf6c Andrea Zito
139 176b8de8 Luca Baldesi
  res = grapes_config_value_double(cfg_tags, "cloud_respawn_prob", &(con->cloud_respawn_prob));
140 36dc16e0 Andrea Zito
  if (!res) {
141 e910cf6c Andrea Zito
    con->max_silence = 0;
142 36dc16e0 Andrea Zito
  }
143
144
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
145
  if (con->local_cache == NULL) {
146 28334070 Andrea Zito
    fprintf(stderr, "cloudcast: Error initializing local cache\n");
147 36dc16e0 Andrea Zito
    free(con);
148
    return NULL;
149
  }
150
151
  con->proto_context = cloudcast_proto_init(myID, metadata, metadata_size);
152
  if (!con->proto_context){
153
    free(con->local_cache);
154
    free(con);
155
    return NULL;
156
  }
157
  con->cloud_nodes = cloudcast_get_cloud_nodes(con->proto_context, 2);
158
159
  return con;
160
}
161
162 ee8db4ca Andrea Zito
static int cloudcast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
163 36dc16e0 Andrea Zito
{
164
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
165
    return -1;
166 e910cf6c Andrea Zito
  } return 0;
167
}
168
169
static int cloudcast_query(struct peersampler_context *context,
170
                           struct peer_cache *remote_cache)
171
{
172
  struct peer_cache *sent_cache;
173
174
  /* Fill up reply and send it.
175
     NOTE: we don't know the remote node so we cannot prevent putting it into
176
     the reply. Remote node should check for entries of itself */
177
  sent_cache = rand_cache(context->local_cache, context->sent_entries - 1);
178
  cloudcast_reply_peer(context->proto_context, remote_cache, sent_cache,
179
                       context->last_cloud_contact_sec);
180
181
  /* Insert entries in view without using space reserved by the active thread */
182
  cache_fill_rand(context->local_cache, remote_cache,
183
                    context->cache_size - context->reserved_entries);
184
185
186
  cache_free(remote_cache);
187
  cache_free(sent_cache);
188
  return 0;
189
}
190
191
static int cloudcast_reply(struct peersampler_context *context,
192
                           struct peer_cache *remote_cache)
193
{
194
  /* Be sure not to insert local_node in cache!!! */
195
  cache_del(remote_cache, context->local_node);
196
197
  /* Insert remote entries in view */
198
  context->reserved_entries = 0;
199
  cache_fill_rand(context->local_cache, remote_cache, 0);
200
201
  cache_free(remote_cache);
202
  return 0;
203
}
204
205
static int cloudcast_cloud_dialogue(struct peersampler_context *context,
206
                                    struct peer_cache *cloud_cache)
207
{
208
  struct peer_cache * sent_cache = NULL;
209
  uint64_t cloud_tstamp;
210
  uint64_t current_time;
211
  int delta;
212
213
  /* Obtain the timestamp reported by the last query_cloud operation and
214
     compute delta */
215
  cloud_tstamp = cloudcast_timestamp_cloud(context->proto_context) * 1000000ull;
216
  current_time = gettime();
217
  delta = current_time - cloud_tstamp;
218
219
  /* Fill up the request with one spot free for local node */
220
  sent_cache = rand_cache_except(context->local_cache, context->sent_entries - 1,
221
                                 context->cloud_nodes, 2);
222
223
224
  if (cloud_cache == NULL) {
225
    /* Cloud is not initialized, just set the cloud cache to send_cache */
226
    cloudcast_reply_cloud(context->proto_context, sent_cache);
227
    cloudcast_cloud_default_reply(context->local_cache, context->cloud_nodes[0]);
228
    return 0;
229
  } else {
230
    struct peer_cache *remote_cache = NULL;
231
    int err = 0;
232
    int g = 0;
233
    int i = 0;
234
235
236
    /* If the cloud contact frequency *IS NOT* too high save an entry for the
237
       cloud  */
238
    if (delta > (context->period / context->cloud_contact_treshold)) {
239
      g++;
240
    }
241
242
    /* If the cloud contact frequency *IS* too low save another entry for the
243
       cloud */
244
    if (delta > (context->cloud_contact_treshold * context->period)) {
245
      g++;
246
    }
247
248
    /* Fill up the reply with g spots free for cloud entries */
249
    remote_cache = rand_cache_except(cloud_cache, context->sent_entries-g,
250
                                     &context->local_node, 1);
251
252
    /* Insert cloud entries in remote cache */
253
    cache_resize(remote_cache,  cache_current_size(remote_cache)+g);
254
    for (i=0; i<g; i++) {
255
      err = cache_add(remote_cache, context->cloud_nodes[i], NULL, 0);
256
      assert(err > 0);
257
    }
258
259
    /* Insert remote entries in local view */
260
    cache_fill_rand(context->local_cache, remote_cache, 0);
261
262
    /* Insert sent entries in cloud view */
263
    cache_del(cloud_cache, context->local_node);
264
    cache_resize(cloud_cache, context->cache_size);
265
    assert(cache_max_size(cloud_cache) == context->cache_size);
266
    cache_fill_rand(cloud_cache, sent_cache, 0);
267
268
    /* Write the new view in the cloud */
269
    cloudcast_reply_cloud(context->proto_context, cloud_cache);
270
271
    cache_free(remote_cache);
272
    cache_free(cloud_cache);
273
    cache_free(sent_cache);
274
    return 0;
275 36dc16e0 Andrea Zito
  }
276 e910cf6c Andrea Zito
}
277
278
static int silence_treshold(struct peersampler_context *context)
279
{
280
  int threshold;
281
  int delta;
282
  if (!context->max_silence) return 0;
283
  if (context->last_cloud_contact_sec == 0) return 0;
284 36dc16e0 Andrea Zito
285 e910cf6c Andrea Zito
  threshold = (context->max_silence * context->period) / 1000000ull;
286
  delta = (gettime() / 1000000ull) - context->last_cloud_contact_sec;
287
288
  return delta > threshold &&
289
    ((double) rand())/RAND_MAX < context->cloud_respawn_prob;
290 36dc16e0 Andrea Zito
}
291
292 e910cf6c Andrea Zito
static int cloudcast_active_thread(struct peersampler_context *context)
293 36dc16e0 Andrea Zito
{
294 e910cf6c Andrea Zito
  int err = 0;
295
296
  /* If space permits, re-insert old entries that have been sent in the previous
297
     cycle */
298
  if (context->flying_cache) {
299
    cache_fill_rand(context->local_cache, context->flying_cache, 0);
300
    cache_free(context->flying_cache);
301
    context->flying_cache = NULL;
302
303
    /* If we didn't received an answer since the last cycle, forget about it */
304
    context->reserved_entries = 0;
305
  }
306
307
  /* Increase age of all nodes in the view */
308
  cache_update(context->local_cache);
309
310
  /* Select oldest node in the view */
311
  context->dst = last_peer(context->local_cache);
312
313
  /* If we remain without neighbors, forcely add a cloud entry */
314
  if (context->dst == NULL) {
315
    context->dst = context->cloud_nodes[0];
316
  }
317
318
  context->dst = nodeid_dup(context->dst);
319
  cache_del(context->local_cache, context->dst);
320
321
  /* If enabled, readd a cloud node when too much time is passed from the
322
     last contact (computed globally) */
323
  if (!cloudcast_is_cloud_node(context->proto_context, context->dst) &&
324
      silence_treshold(context)) {
325
    cache_add(context->local_cache, context->cloud_nodes[0], NULL, 0);
326
  }
327
328
  if (cloudcast_is_cloud_node(context->proto_context, context->dst)) {
329
    context->last_cloud_contact_sec = gettime() / 1000000ull;
330
331
    /* Request cloud view */
332
    err = cloudcast_query_cloud(context->proto_context);
333
  } else {
334
    /* Fill up request keeping space for local descriptor */
335
    context->flying_cache = rand_cache(context->local_cache,
336
                                       context->sent_entries - 1);
337
338
    context->reserved_entries = cache_current_size(context->flying_cache);
339
340
    /* Send request to remote peer */
341
    err = cloudcast_query_peer(context->proto_context, context->flying_cache,
342
                               context->dst, context->last_cloud_contact_sec);
343 36dc16e0 Andrea Zito
  }
344
345 e910cf6c Andrea Zito
  return err;
346 36dc16e0 Andrea Zito
}
347
348 e910cf6c Andrea Zito
static int
349
cloudcast_parse_data(struct peersampler_context *context, const uint8_t *buff,
350
                     int len)
351 36dc16e0 Andrea Zito
{
352 e910cf6c Andrea Zito
  int err = 0;
353 36dc16e0 Andrea Zito
  cache_check(context->local_cache);
354 e910cf6c Andrea Zito
355
  /* If we got data, perform the appropriate passive thread operation */
356 36dc16e0 Andrea Zito
  if (len) {
357 e910cf6c Andrea Zito
    const struct topo_header *th = NULL;
358
    const struct cloudcast_header *ch = NULL;
359 36dc16e0 Andrea Zito
    struct peer_cache *remote_cache  = NULL;
360 e910cf6c Andrea Zito
    size_t shift = 0;
361
362 36dc16e0 Andrea Zito
363 e910cf6c Andrea Zito
    th = (const struct topo_header *) buff;
364
    shift += sizeof(struct topo_header);
365 36dc16e0 Andrea Zito
366 e910cf6c Andrea Zito
    ch = (const struct cloudcast_header *) (buff + shift);
367
    shift += sizeof(struct cloudcast_header);
368
369
    if (th->protocol != MSG_TYPE_TOPOLOGY) {
370
      fprintf(stderr, "Peer Sampler: Wrong protocol: %d!\n", th->protocol);
371 36dc16e0 Andrea Zito
      return -1;
372
    }
373
374
    context->bootstrap = false;
375
376 e910cf6c Andrea Zito
    /* Update info on global cloud contact time */
377
    if (ch->last_cloud_contact_sec > context->last_cloud_contact_sec) {
378
      context->last_cloud_contact_sec = ch->last_cloud_contact_sec;
379 36dc16e0 Andrea Zito
    }
380
381 e910cf6c Andrea Zito
    if (len - shift > 0)
382
      remote_cache = entries_undump(buff + (shift * sizeof(uint8_t)), len - shift);
383
384
    if (th->type == CLOUDCAST_QUERY) {
385
      /* We're being queried by a remote peer */
386
      err = cloudcast_query(context, remote_cache);
387
    } else if(th->type == CLOUDCAST_REPLY){
388
      /* We've received the response to our query from the remote peer */
389
      err = cloudcast_reply(context, remote_cache);
390
    } else if (th->type == CLOUDCAST_CLOUD) {
391
      /* We've received the response form the cloud. Simulate dialogue */
392
      err = cloudcast_cloud_dialogue(context, remote_cache);
393 36dc16e0 Andrea Zito
    } else {
394 e910cf6c Andrea Zito
      fprintf(stderr, "Cloudcast: Wrong message type: %d\n", th->type);
395
      return -1;
396 36dc16e0 Andrea Zito
    }
397
  }
398
399 e910cf6c Andrea Zito
  /* It it's time, perform the active thread */
400 36dc16e0 Andrea Zito
  if (time_to_send(context)) {
401 e910cf6c Andrea Zito
    err = cloudcast_active_thread(context);
402 36dc16e0 Andrea Zito
  }
403
  cache_check(context->local_cache);
404
405 e910cf6c Andrea Zito
  return err;
406
}
407 36dc16e0 Andrea Zito
408 92358b75 Luca Abeni
static const struct nodeID *const *cloudcast_get_neighbourhood(struct peersampler_context *context, int *n)
409 36dc16e0 Andrea Zito
{
410 672eb08e Andrea Zito
  context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *));
411
  if (context->r == NULL) {
412 36dc16e0 Andrea Zito
    return NULL;
413
  }
414
415
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
416 672eb08e Andrea Zito
    context->r[*n] = nodeid(context->local_cache, *n);
417 36dc16e0 Andrea Zito
  }
418
  if (context->flying_cache) {
419
    int i,j,dup;
420
421
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
422
      dup = 0;
423
      for (j = 0; j<*n; j++){
424 672eb08e Andrea Zito
        if (nodeid_equal(context->r[j], nodeid(context->flying_cache, i))){
425 36dc16e0 Andrea Zito
          dup = 1;
426
          continue;
427
        }
428
      }
429
      if (dup) (*n)--;
430 672eb08e Andrea Zito
      else context->r[*n] = nodeid(context->flying_cache, i);
431 36dc16e0 Andrea Zito
    }
432
  }
433
434
  if (context->dst && (*n < context->cache_size)) {
435
    int j,dup;
436
    dup = 0;
437
    for (j = 0; j<*n; j++){
438 672eb08e Andrea Zito
      if (nodeid_equal(context->r[j], context->dst)){
439 36dc16e0 Andrea Zito
        dup = 1;
440
        continue;
441
      }
442
    }
443
    if (!dup){
444 672eb08e Andrea Zito
      context->r[*n] = context->dst;
445 36dc16e0 Andrea Zito
      (*n)++;
446
    }
447
  }
448
449 92358b75 Luca Abeni
  return (const struct nodeID *const *)context->r;
450 36dc16e0 Andrea Zito
}
451
452
static const void *cloudcast_get_metadata(struct peersampler_context *context, int *metadata_size)
453
{
454
  return get_metadata(context->local_cache, metadata_size);
455
}
456
457
static int cloudcast_grow_neighbourhood(struct peersampler_context *context, int n)
458
{
459
  context->cache_size += n;
460
461
  return context->cache_size;
462
}
463
464
static int cloudcast_shrink_neighbourhood(struct peersampler_context *context, int n)
465
{
466
  if (context->cache_size < n) {
467
    return -1;
468
  }
469
  context->cache_size -= n;
470
471
  return context->cache_size;
472
}
473
474 ee8db4ca Andrea Zito
static int cloudcast_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour)
475 36dc16e0 Andrea Zito
{
476
  return cache_del(context->local_cache, neighbour);
477
}
478
479 ee8db4ca Andrea Zito
static int cloudcast_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size)
480 36dc16e0 Andrea Zito
{
481
  return cloudcast_proto_change_metadata(context->proto_context, metadata, metadata_size);
482
}
483
484 10ddaca7 Luca Baldesi
void cloudcast_destroy(struct peersampler_context **context)
485
{
486
        if (context && *context)
487
        {
488
                if((*context)->r)
489
                        free((*context)->r);
490
                if((*context)->local_cache)
491
                        cache_free((*context)->local_cache);
492
                if((*context)->flying_cache)
493
                        cache_free((*context)->flying_cache);
494
                free(*context);
495
                *context = NULL;
496
        }
497
}
498
499 36dc16e0 Andrea Zito
struct peersampler_iface cloudcast = {
500
  .init = cloudcast_init,
501 10ddaca7 Luca Baldesi
  .destroy = cloudcast_destroy,
502 36dc16e0 Andrea Zito
  .change_metadata = cloudcast_change_metadata,
503
  .add_neighbour = cloudcast_add_neighbour,
504
  .parse_data = cloudcast_parse_data,
505
  .get_neighbourhood = cloudcast_get_neighbourhood,
506
  .get_metadata = cloudcast_get_metadata,
507
  .grow_neighbourhood = cloudcast_grow_neighbourhood,
508
  .shrink_neighbourhood = cloudcast_shrink_neighbourhood,
509
  .remove_neighbour = cloudcast_remove_neighbour,
510
};