Statistics
| Branch: | Revision:

grapes / src / PeerSampler / cloudcast.c @ 176b8de8

History | View | Annotate | Download (14.3 KB)

1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 * Implementation of the Cloudcast peer sampling protocol.
7
 *
8
 * If thread are used calls to psample_init which result in calls to
9
 * cloudcast_init must be syncronized with calls to cloud_helper_init
10
 * and get_cloud_helper_for.
11
 */
12

    
13
#include <sys/time.h>
14
#include <time.h>
15
#include <stdlib.h>
16
#include <stdint.h>
17
#include <stdio.h>
18
#include <stdbool.h>
19
#include <string.h>
20
#include <assert.h>
21

    
22
#include "net_helper.h"
23
#include "peersampler_iface.h"
24
#include "../Cache/topocache.h"
25
#include "../Cache/cloudcast_proto.h"
26
#include "../Cache/proto.h"
27
#include "grapes_config.h"
28
#include "grapes_msg_types.h"
29

    
30
#define DEFAULT_CACHE_SIZE 20
31
#define DEFAULT_PARTIAL_VIEW_SIZE 5
32

    
33
#define CLOUDCAST_TRESHOLD 4
34
#define CLOUDCAST_PERIOD 10000000 // 10 seconds
35
#define CLOUDCAST_BOOTSTRAP_PERIOD 2000000 // 2 seconds
36

    
37
struct peersampler_context{
38
  uint64_t currtime;
39
  int cache_size;
40
  int sent_entries;
41
  struct peer_cache *local_cache;
42
  bool bootstrap;
43
  int bootstrap_period;
44
  int period;
45

    
46
  uint64_t last_cloud_contact_sec;
47
  int max_silence;
48
  double cloud_respawn_prob;
49
  int cloud_contact_treshold;
50
  struct nodeID *local_node;
51
  struct nodeID **cloud_nodes;
52

    
53
  int reserved_entries;
54
  struct peer_cache *flying_cache;
55
  struct nodeID *dst;
56

    
57
  struct cloudcast_proto_context *proto_context;
58
  struct nodeID **r;
59
};
60

    
61

    
62
/* return the time in microseconds */
63
static uint64_t gettime(void)
64
{
65
  struct timeval tv;
66

    
67
  gettimeofday(&tv, NULL);
68

    
69
  return tv.tv_usec + tv.tv_sec * 1000000ull;
70
}
71

    
72
static struct peersampler_context* cloudcast_context_init(void){
73
  struct peersampler_context* con;
74
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
75
  if (!con) {
76
    fprintf(stderr, "cloudcast: Error! could not create context. ENOMEM\n");
77
    return NULL;
78
  }
79
  memset(con, 0, sizeof(struct peersampler_context));
80

    
81
  //Initialize context with default values
82
  con->bootstrap = true;
83
  con->bootstrap_period = CLOUDCAST_BOOTSTRAP_PERIOD;
84
  con->period = CLOUDCAST_PERIOD;
85
  con->currtime = gettime();
86
  con->cloud_contact_treshold = CLOUDCAST_TRESHOLD;
87
  con->last_cloud_contact_sec = 0;
88
  con->max_silence = 0;
89
  con->cloud_respawn_prob = 0;
90

    
91
  con->r = NULL;
92

    
93
  return con;
94
}
95

    
96
static int time_to_send(struct peersampler_context* con)
97
{
98
  long time;
99
  int p = con->bootstrap ? con->bootstrap_period : con->period;
100

    
101
  time = gettime();
102
  if (time - con->currtime > p) {
103
    if (con->bootstrap) con->currtime = time;
104
    else con->currtime += p;
105
    return 1;
106
  }
107

    
108
  return 0;
109
}
110

    
111
/*
112
 * Public Functions!
113
 */
114
static struct peersampler_context* cloudcast_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
115
{
116
  struct tag *cfg_tags;
117
  struct peersampler_context *con;
118
  int res;
119

    
120
  con = cloudcast_context_init();
121
  if (!con) return NULL;
122
  con->local_node = myID;
123

    
124
  cfg_tags = grapes_config_parse(config);
125
  res = grapes_config_value_int(cfg_tags, "cache_size", &(con->cache_size));
126
  if (!res) {
127
    con->cache_size = DEFAULT_CACHE_SIZE;
128
  }
129
  res = grapes_config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
130
  if (!res) {
131
    con->sent_entries = DEFAULT_PARTIAL_VIEW_SIZE;
132
  }
133
  res = grapes_config_value_int(cfg_tags, "max_silence", &(con->max_silence));
134
  if (!res) {
135
    con->max_silence = 0;
136
  }
137

    
138
  res = grapes_config_value_double(cfg_tags, "cloud_respawn_prob", &(con->cloud_respawn_prob));
139
  if (!res) {
140
    con->max_silence = 0;
141
  }
142

    
143
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
144
  if (con->local_cache == NULL) {
145
    fprintf(stderr, "cloudcast: Error initializing local cache\n");
146
    free(con);
147
    return NULL;
148
  }
149

    
150
  con->proto_context = cloudcast_proto_init(myID, metadata, metadata_size);
151
  if (!con->proto_context){
152
    free(con->local_cache);
153
    free(con);
154
    return NULL;
155
  }
156
  con->cloud_nodes = cloudcast_get_cloud_nodes(con->proto_context, 2);
157

    
158
  return con;
159
}
160

    
161
static int cloudcast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
162
{
163
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
164
    return -1;
165
  } return 0;
166
}
167

    
168
static int cloudcast_query(struct peersampler_context *context,
169
                           struct peer_cache *remote_cache)
170
{
171
  struct peer_cache *sent_cache;
172

    
173
  /* Fill up reply and send it.
174
     NOTE: we don't know the remote node so we cannot prevent putting it into
175
     the reply. Remote node should check for entries of itself */
176
  sent_cache = rand_cache(context->local_cache, context->sent_entries - 1);
177
  cloudcast_reply_peer(context->proto_context, remote_cache, sent_cache,
178
                       context->last_cloud_contact_sec);
179

    
180
  /* Insert entries in view without using space reserved by the active thread */
181
  cache_fill_rand(context->local_cache, remote_cache,
182
                    context->cache_size - context->reserved_entries);
183

    
184

    
185
  cache_free(remote_cache);
186
  cache_free(sent_cache);
187
  return 0;
188
}
189

    
190
static int cloudcast_reply(struct peersampler_context *context,
191
                           struct peer_cache *remote_cache)
192
{
193
  /* Be sure not to insert local_node in cache!!! */
194
  cache_del(remote_cache, context->local_node);
195

    
196
  /* Insert remote entries in view */
197
  context->reserved_entries = 0;
198
  cache_fill_rand(context->local_cache, remote_cache, 0);
199

    
200
  cache_free(remote_cache);
201
  return 0;
202
}
203

    
204
static int cloudcast_cloud_dialogue(struct peersampler_context *context,
205
                                    struct peer_cache *cloud_cache)
206
{
207
  struct peer_cache * sent_cache = NULL;
208
  uint64_t cloud_tstamp;
209
  uint64_t current_time;
210
  int delta;
211

    
212
  /* Obtain the timestamp reported by the last query_cloud operation and
213
     compute delta */
214
  cloud_tstamp = cloudcast_timestamp_cloud(context->proto_context) * 1000000ull;
215
  current_time = gettime();
216
  delta = current_time - cloud_tstamp;
217

    
218
  /* Fill up the request with one spot free for local node */
219
  sent_cache = rand_cache_except(context->local_cache, context->sent_entries - 1,
220
                                 context->cloud_nodes, 2);
221

    
222

    
223
  if (cloud_cache == NULL) {
224
    /* Cloud is not initialized, just set the cloud cache to send_cache */
225
    cloudcast_reply_cloud(context->proto_context, sent_cache);
226
    cloudcast_cloud_default_reply(context->local_cache, context->cloud_nodes[0]);
227
    return 0;
228
  } else {
229
    struct peer_cache *remote_cache = NULL;
230
    int err = 0;
231
    int g = 0;
232
    int i = 0;
233

    
234

    
235
    /* If the cloud contact frequency *IS NOT* too high save an entry for the
236
       cloud  */
237
    if (delta > (context->period / context->cloud_contact_treshold)) {
238
      g++;
239
    }
240

    
241
    /* If the cloud contact frequency *IS* too low save another entry for the
242
       cloud */
243
    if (delta > (context->cloud_contact_treshold * context->period)) {
244
      g++;
245
    }
246

    
247
    /* Fill up the reply with g spots free for cloud entries */
248
    remote_cache = rand_cache_except(cloud_cache, context->sent_entries-g,
249
                                     &context->local_node, 1);
250

    
251
    /* Insert cloud entries in remote cache */
252
    cache_resize(remote_cache,  cache_current_size(remote_cache)+g);
253
    for (i=0; i<g; i++) {
254
      err = cache_add(remote_cache, context->cloud_nodes[i], NULL, 0);
255
      assert(err > 0);
256
    }
257

    
258
    /* Insert remote entries in local view */
259
    cache_fill_rand(context->local_cache, remote_cache, 0);
260

    
261
    /* Insert sent entries in cloud view */
262
    cache_del(cloud_cache, context->local_node);
263
    cache_resize(cloud_cache, context->cache_size);
264
    assert(cache_max_size(cloud_cache) == context->cache_size);
265
    cache_fill_rand(cloud_cache, sent_cache, 0);
266

    
267
    /* Write the new view in the cloud */
268
    cloudcast_reply_cloud(context->proto_context, cloud_cache);
269

    
270
    cache_free(remote_cache);
271
    cache_free(cloud_cache);
272
    cache_free(sent_cache);
273
    return 0;
274
  }
275
}
276

    
277
static int silence_treshold(struct peersampler_context *context)
278
{
279
  int threshold;
280
  int delta;
281
  if (!context->max_silence) return 0;
282
  if (context->last_cloud_contact_sec == 0) return 0;
283

    
284
  threshold = (context->max_silence * context->period) / 1000000ull;
285
  delta = (gettime() / 1000000ull) - context->last_cloud_contact_sec;
286

    
287
  return delta > threshold &&
288
    ((double) rand())/RAND_MAX < context->cloud_respawn_prob;
289
}
290

    
291
static int cloudcast_active_thread(struct peersampler_context *context)
292
{
293
  int err = 0;
294

    
295
  /* If space permits, re-insert old entries that have been sent in the previous
296
     cycle */
297
  if (context->flying_cache) {
298
    cache_fill_rand(context->local_cache, context->flying_cache, 0);
299
    cache_free(context->flying_cache);
300
    context->flying_cache = NULL;
301

    
302
    /* If we didn't received an answer since the last cycle, forget about it */
303
    context->reserved_entries = 0;
304
  }
305

    
306
  /* Increase age of all nodes in the view */
307
  cache_update(context->local_cache);
308

    
309
  /* Select oldest node in the view */
310
  context->dst = last_peer(context->local_cache);
311

    
312
  /* If we remain without neighbors, forcely add a cloud entry */
313
  if (context->dst == NULL) {
314
    context->dst = context->cloud_nodes[0];
315
  }
316

    
317
  context->dst = nodeid_dup(context->dst);
318
  cache_del(context->local_cache, context->dst);
319

    
320
  /* If enabled, readd a cloud node when too much time is passed from the
321
     last contact (computed globally) */
322
  if (!cloudcast_is_cloud_node(context->proto_context, context->dst) &&
323
      silence_treshold(context)) {
324
    cache_add(context->local_cache, context->cloud_nodes[0], NULL, 0);
325
  }
326

    
327
  if (cloudcast_is_cloud_node(context->proto_context, context->dst)) {
328
    context->last_cloud_contact_sec = gettime() / 1000000ull;
329

    
330
    /* Request cloud view */
331
    err = cloudcast_query_cloud(context->proto_context);
332
  } else {
333
    /* Fill up request keeping space for local descriptor */
334
    context->flying_cache = rand_cache(context->local_cache,
335
                                       context->sent_entries - 1);
336

    
337
    context->reserved_entries = cache_current_size(context->flying_cache);
338

    
339
    /* Send request to remote peer */
340
    err = cloudcast_query_peer(context->proto_context, context->flying_cache,
341
                               context->dst, context->last_cloud_contact_sec);
342
  }
343

    
344
  return err;
345
}
346

    
347
static int
348
cloudcast_parse_data(struct peersampler_context *context, const uint8_t *buff,
349
                     int len)
350
{
351
  int err = 0;
352
  cache_check(context->local_cache);
353

    
354
  /* If we got data, perform the appropriate passive thread operation */
355
  if (len) {
356
    const struct topo_header *th = NULL;
357
    const struct cloudcast_header *ch = NULL;
358
    struct peer_cache *remote_cache  = NULL;
359
    size_t shift = 0;
360

    
361

    
362
    th = (const struct topo_header *) buff;
363
    shift += sizeof(struct topo_header);
364

    
365
    ch = (const struct cloudcast_header *) (buff + shift);
366
    shift += sizeof(struct cloudcast_header);
367

    
368
    if (th->protocol != MSG_TYPE_TOPOLOGY) {
369
      fprintf(stderr, "Peer Sampler: Wrong protocol: %d!\n", th->protocol);
370
      return -1;
371
    }
372

    
373
    context->bootstrap = false;
374

    
375
    /* Update info on global cloud contact time */
376
    if (ch->last_cloud_contact_sec > context->last_cloud_contact_sec) {
377
      context->last_cloud_contact_sec = ch->last_cloud_contact_sec;
378
    }
379

    
380
    if (len - shift > 0)
381
      remote_cache = entries_undump(buff + (shift * sizeof(uint8_t)), len - shift);
382

    
383
    if (th->type == CLOUDCAST_QUERY) {
384
      /* We're being queried by a remote peer */
385
      err = cloudcast_query(context, remote_cache);
386
    } else if(th->type == CLOUDCAST_REPLY){
387
      /* We've received the response to our query from the remote peer */
388
      err = cloudcast_reply(context, remote_cache);
389
    } else if (th->type == CLOUDCAST_CLOUD) {
390
      /* We've received the response form the cloud. Simulate dialogue */
391
      err = cloudcast_cloud_dialogue(context, remote_cache);
392
    } else {
393
      fprintf(stderr, "Cloudcast: Wrong message type: %d\n", th->type);
394
      return -1;
395
    }
396
  }
397

    
398
  /* It it's time, perform the active thread */
399
  if (time_to_send(context)) {
400
    err = cloudcast_active_thread(context);
401
  }
402
  cache_check(context->local_cache);
403

    
404
  return err;
405
}
406

    
407
static const struct nodeID *const *cloudcast_get_neighbourhood(struct peersampler_context *context, int *n)
408
{
409
  context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *));
410
  if (context->r == NULL) {
411
    return NULL;
412
  }
413

    
414
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
415
    context->r[*n] = nodeid(context->local_cache, *n);
416
  }
417
  if (context->flying_cache) {
418
    int i,j,dup;
419

    
420
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
421
      dup = 0;
422
      for (j = 0; j<*n; j++){
423
        if (nodeid_equal(context->r[j], nodeid(context->flying_cache, i))){
424
          dup = 1;
425
          continue;
426
        }
427
      }
428
      if (dup) (*n)--;
429
      else context->r[*n] = nodeid(context->flying_cache, i);
430
    }
431
  }
432

    
433
  if (context->dst && (*n < context->cache_size)) {
434
    int j,dup;
435
    dup = 0;
436
    for (j = 0; j<*n; j++){
437
      if (nodeid_equal(context->r[j], context->dst)){
438
        dup = 1;
439
        continue;
440
      }
441
    }
442
    if (!dup){
443
      context->r[*n] = context->dst;
444
      (*n)++;
445
    }
446
  }
447

    
448
  return (const struct nodeID *const *)context->r;
449
}
450

    
451
static const void *cloudcast_get_metadata(struct peersampler_context *context, int *metadata_size)
452
{
453
  return get_metadata(context->local_cache, metadata_size);
454
}
455

    
456
static int cloudcast_grow_neighbourhood(struct peersampler_context *context, int n)
457
{
458
  context->cache_size += n;
459

    
460
  return context->cache_size;
461
}
462

    
463
static int cloudcast_shrink_neighbourhood(struct peersampler_context *context, int n)
464
{
465
  if (context->cache_size < n) {
466
    return -1;
467
  }
468
  context->cache_size -= n;
469

    
470
  return context->cache_size;
471
}
472

    
473
static int cloudcast_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour)
474
{
475
  return cache_del(context->local_cache, neighbour);
476
}
477

    
478
static int cloudcast_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size)
479
{
480
  return cloudcast_proto_change_metadata(context->proto_context, metadata, metadata_size);
481
}
482

    
483
struct peersampler_iface cloudcast = {
484
  .init = cloudcast_init,
485
  .change_metadata = cloudcast_change_metadata,
486
  .add_neighbour = cloudcast_add_neighbour,
487
  .parse_data = cloudcast_parse_data,
488
  .get_neighbourhood = cloudcast_get_neighbourhood,
489
  .get_metadata = cloudcast_get_metadata,
490
  .grow_neighbourhood = cloudcast_grow_neighbourhood,
491
  .shrink_neighbourhood = cloudcast_shrink_neighbourhood,
492
  .remove_neighbour = cloudcast_remove_neighbour,
493
};