Statistics
| Branch: | Revision:

grapes / src / PeerSampler / cloudcast.c @ 93202b19

History | View | Annotate | Download (11.8 KB)

1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 * Implementation of the Cloudcast peer sampling protocol.
7
 *
8
 * If thread are used calls to psample_init which result in calls to
9
 * cloudcast_init must be syncronized with calls to cloud_helper_init
10
 * and get_cloud_helper_for.
11
 */
12

    
13
#include <sys/time.h>
14
#include <time.h>
15
#include <stdlib.h>
16
#include <stdint.h>
17
#include <stdio.h>
18
#include <stdbool.h>
19
#include <string.h>
20

    
21
#include "net_helper.h"
22
#include "peersampler_iface.h"
23
#include "../Cache/topocache.h"
24
#include "../Cache/cloudcast_proto.h"
25
#include "../Cache/proto.h"
26
#include "config.h"
27
#include "grapes_msg_types.h"
28

    
29
#define DEFAULT_CACHE_SIZE 10
30

    
31
#define CLOUDCAST_TRESHOLD 4
32
#define CLOUDCAST_PERIOD 10000000 // 10 seconds
33
#define CLOUDCAST_BOOTSTRAP_PERIOD 2000000 // 2 seconds
34

    
35
struct peersampler_context{
36
  uint64_t currtime;
37
  int cache_size;
38
  int sent_entries;
39
  int keep_cache_full;
40
  struct peer_cache *local_cache;
41
  bool bootstrap;
42
  int bootstrap_period;
43
  int period;
44

    
45
  int cloud_contact_treshold;
46
  struct nodeID *local_node;
47
  struct nodeID **cloud_nodes;
48

    
49
  struct peer_cache *flying_cache;
50
  struct nodeID *dst;
51

    
52
  struct cloudcast_proto_context *proto_context;
53
  struct nodeID **r;
54
};
55

    
56

    
57
static uint64_t gettime(void)
58
{
59
  struct timeval tv;
60

    
61
  gettimeofday(&tv, NULL);
62

    
63
  return tv.tv_usec + tv.tv_sec * 1000000ull;
64
}
65

    
66
static struct peersampler_context* cloudcast_context_init(void){
67
  struct peersampler_context* con;
68
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
69
  if (!con) {
70
    fprintf(stderr, "cloudcast: Error! could not create context. ENOMEM\n");
71
    return NULL;
72
  }
73
  memset(con, 0, sizeof(struct peersampler_context));
74

    
75
  //Initialize context with default values
76
  con->bootstrap = true;
77
  con->bootstrap_period = CLOUDCAST_BOOTSTRAP_PERIOD;
78
  con->period = CLOUDCAST_PERIOD;
79
  con->currtime = gettime();
80
  con->cloud_contact_treshold = CLOUDCAST_TRESHOLD;
81

    
82
  con->r = NULL;
83

    
84
  return con;
85
}
86

    
87
static int time_to_send(struct peersampler_context* con)
88
{
89
  int p = con->bootstrap ? con->bootstrap_period : con->period;
90
  if (gettime() - con->currtime > p) {
91
    con->currtime += p;
92

    
93
    return 1;
94
  }
95

    
96
  return 0;
97
}
98

    
99
static void cache_add_cache(struct peer_cache *dst, const struct peer_cache *add)
100
{
101
  int i, meta_size;
102
  const uint8_t *meta;
103

    
104
  meta = get_metadata(add, &meta_size);
105
  for (i = 0; nodeid(add, i); i++) {
106
    cache_add(dst,  nodeid(add, i), meta + (meta_size * i), meta_size);
107
  }
108
}
109

    
110
static int cache_add_resize(struct peer_cache *dst, struct nodeID *n, const int cache_max_size)
111
{
112
  int err;
113
  err = cache_add(dst, n, NULL, 0);
114
  if (err == -2){
115
    // maybe the cache is full... try resizing it to cache_max
116
    cache_resize(dst, cache_max_size);
117
    err = cache_add(dst, n, NULL, 0);
118
  }
119
  return (err > 0)? 0 : 1;
120
}
121

    
122
/*
123
 * Public Functions!
124
 */
125
static struct peersampler_context* cloudcast_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
126
{
127
  struct tag *cfg_tags;
128
  struct peersampler_context *con;
129
  int res;
130

    
131
  con = cloudcast_context_init();
132
  if (!con) return NULL;
133
  con->local_node = myID;
134

    
135
  cfg_tags = config_parse(config);
136
  res = config_value_int(cfg_tags, "cache_size", &(con->cache_size));
137
  if (!res) {
138
    con->cache_size = DEFAULT_CACHE_SIZE;
139
  }
140
  res = config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
141
  if (!res) {
142
    con->sent_entries = con->cache_size / 2;
143
  }
144
  res = config_value_int(cfg_tags, "keep_cache_full", &(con->keep_cache_full));
145
  if (!res) {
146
    con->keep_cache_full = 1;
147
  }
148

    
149
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
150
  if (con->local_cache == NULL) {
151
    fprintf(stderr, "cloudcast: Error initializing local cache\n");
152
    free(con);
153
    return NULL;
154
  }
155

    
156
  con->proto_context = cloudcast_proto_init(myID, metadata, metadata_size);
157
  if (!con->proto_context){
158
    free(con->local_cache);
159
    free(con);
160
    return NULL;
161
  }
162
  con->cloud_nodes = cloudcast_get_cloud_nodes(con->proto_context, 2);
163

    
164
  return con;
165
}
166

    
167
static int cloudcast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
168
{
169
  if (!context->flying_cache) {
170
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
171
  }
172
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
173
    return -1;
174
  }
175

    
176
  if (cloudcast_is_cloud_node(context->proto_context, neighbour) == 0)
177
    return cloudcast_query_peer(context->proto_context, context->flying_cache, neighbour);
178
  else
179
    return cloudcast_query_cloud(context->proto_context);
180
}
181

    
182
static int cloudcast_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
183
{
184
  cache_check(context->local_cache);
185
  if (len) {
186
    // Passive thread
187
    const struct topo_header *h = (const struct topo_header *)buff;
188
    struct peer_cache *remote_cache  = NULL;
189
    struct peer_cache *sent_cache = NULL;
190

    
191
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
192
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
193

    
194
      return -1;
195
    }
196

    
197
    context->bootstrap = false;
198

    
199
    if (len - sizeof(struct topo_header) > 0)
200
      remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
201

    
202

    
203
    if (h->type == CLOUDCAST_QUERY) {
204
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
205
      cloudcast_reply_peer(context->proto_context, remote_cache, sent_cache);
206
      context->dst = NULL;
207
    } else if (h->type == CLOUDCAST_CLOUD) {
208
      int g = 0;
209
      int i = 0;
210
      struct peer_cache *reply_cache = NULL;
211

    
212
      // Obtain the timestamp reported by the last query_cloud operation
213
      uint64_t cloud_time_stamp = cloudcast_timestamp_cloud(context->proto_context) * 1000000ull;
214
      uint64_t current_time = gettime();
215
      int delta = current_time - cloud_time_stamp;
216

    
217
      // local view with one spot free for the local nodeID
218
      sent_cache = rand_cache(context->local_cache, context->sent_entries - 1);
219
      for (i=0; i<2; i++) cache_del(sent_cache, context->cloud_nodes[i]);
220

    
221
      if (remote_cache == NULL) {
222
        // Cloud is not initialized, just set the cloud cache to send_cache
223
        cloudcast_reply_cloud(context->proto_context, sent_cache);
224
        remote_cache = cloudcast_cloud_default_reply(context->local_cache, context->cloud_nodes[0]);
225
      } else {
226
        // Locally perform exchange of cache entries
227
        int err;
228
        struct peer_cache *cloud_cache = NULL;
229

    
230
        // if the cloud contact frequency *IS NOT* too high save an entry for the cloud
231
        if (delta > (context->period / context->cloud_contact_treshold)) g++;
232

    
233
        // if the cloud contact frequency *IS* too low save another entry for the cloud
234
        if (delta > (context->cloud_contact_treshold * context->period)) g++;
235

    
236
        // Remove mentions of local node from the cache
237
        cache_del(remote_cache, context->local_node);
238

    
239
        // cloud view with g spot free for cloud nodeID
240
        reply_cache = rand_cache(remote_cache, context->sent_entries - g);
241

    
242
        // building new cloud view
243
        cloud_cache = merge_caches(remote_cache, sent_cache, context->cache_size, &err);
244
        free(remote_cache);
245
        cache_add_cache(cloud_cache, sent_cache);
246
        if (context->keep_cache_full){
247
          cache_add_cache(cloud_cache, reply_cache);
248
        }
249

    
250
        err = cloudcast_reply_cloud(context->proto_context, cloud_cache);
251
        free(cloud_cache);
252

    
253
        //Add the actual cloud nodes to the cache
254
        for (i = 0; i<g; i++){
255
          if (cache_add_resize(reply_cache, context->cloud_nodes[i], context->sent_entries) != 0){
256
            fprintf(stderr, "WTF!!! cannot add to cache\n");
257
            return 1;
258
          }
259
        }
260

    
261
        remote_cache = reply_cache;
262
        context->dst = NULL;
263
      }
264
    }
265
    cache_check(context->local_cache);
266

    
267
    if (remote_cache){
268
      // Remote cache may be NULL due to a non initialize cloud
269
      cache_add_cache(context->local_cache, remote_cache);
270
      cache_free(remote_cache);
271
    }
272
    if (sent_cache) {
273
      if (context->keep_cache_full)
274
        cache_add_cache(context->local_cache, sent_cache);
275
      cache_free(sent_cache);
276
    } else {
277
      if (context->flying_cache) {
278
        cache_add_cache(context->local_cache, context->flying_cache);
279
        cache_free(context->flying_cache);
280
        context->flying_cache = NULL;
281
      }
282
    }
283
  }
284

    
285
  // Active thread
286
  if (time_to_send(context)) {
287
    if (context->flying_cache) {
288
      cache_add_cache(context->local_cache, context->flying_cache);
289
      cache_free(context->flying_cache);
290
      context->flying_cache = NULL;
291
    }
292
    cache_update(context->local_cache);
293
    context->dst = last_peer(context->local_cache);
294
    if (context->dst == NULL) {
295
      // If we remain without neighbors, forcely add a cloud entry
296
      context->dst = cloudcast_get_cloud_node(context->proto_context);
297
      cache_add(context->local_cache, context->dst, NULL, 0);
298
    }
299
    context->dst = nodeid_dup(context->dst);
300
    cache_del(context->local_cache, context->dst);
301

    
302
    if (cloudcast_is_cloud_node(context->proto_context, context->dst)) {
303
      int err;
304
      err = cloudcast_query_cloud(context->proto_context);
305
    } else {
306
      context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
307
      cloudcast_query_peer(context->proto_context, context->flying_cache, context->dst);
308
    }
309
  }
310
  cache_check(context->local_cache);
311

    
312
  return 0;
313
  }
314

    
315
static const struct nodeID **cloudcast_get_neighbourhood(struct peersampler_context *context, int *n)
316
{
317
  context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *));
318
  if (context->r == NULL) {
319
    return NULL;
320
  }
321

    
322
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
323
    context->r[*n] = nodeid(context->local_cache, *n);
324
    //fprintf(stderr, "Checking table[%d]\n", *n);
325
  }
326
  if (context->flying_cache) {
327
    int i,j,dup;
328

    
329
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
330
      dup = 0;
331
      for (j = 0; j<*n; j++){
332
        if (nodeid_equal(context->r[j], nodeid(context->flying_cache, i))){
333
          dup = 1;
334
          continue;
335
        }
336
      }
337
      if (dup) (*n)--;
338
      else context->r[*n] = nodeid(context->flying_cache, i);
339
    }
340
  }
341

    
342
  if (context->dst && (*n < context->cache_size)) {
343
    int j,dup;
344
    dup = 0;
345
    for (j = 0; j<*n; j++){
346
      if (nodeid_equal(context->r[j], context->dst)){
347
        dup = 1;
348
        continue;
349
      }
350
    }
351
    if (!dup){
352
      context->r[*n] = context->dst;
353
      (*n)++;
354
    }
355
  }
356

    
357
  return (const struct nodeID **)context->r;
358
}
359

    
360
static const void *cloudcast_get_metadata(struct peersampler_context *context, int *metadata_size)
361
{
362
  return get_metadata(context->local_cache, metadata_size);
363
}
364

    
365
static int cloudcast_grow_neighbourhood(struct peersampler_context *context, int n)
366
{
367
  context->cache_size += n;
368

    
369
  return context->cache_size;
370
}
371

    
372
static int cloudcast_shrink_neighbourhood(struct peersampler_context *context, int n)
373
{
374
  if (context->cache_size < n) {
375
    return -1;
376
  }
377
  context->cache_size -= n;
378

    
379
  return context->cache_size;
380
}
381

    
382
static int cloudcast_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour)
383
{
384
  return cache_del(context->local_cache, neighbour);
385
}
386

    
387
static int cloudcast_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size)
388
{
389
  return cloudcast_proto_change_metadata(context->proto_context, metadata, metadata_size);
390
}
391

    
392
struct peersampler_iface cloudcast = {
393
  .init = cloudcast_init,
394
  .change_metadata = cloudcast_change_metadata,
395
  .add_neighbour = cloudcast_add_neighbour,
396
  .parse_data = cloudcast_parse_data,
397
  .get_neighbourhood = cloudcast_get_neighbourhood,
398
  .get_metadata = cloudcast_get_metadata,
399
  .grow_neighbourhood = cloudcast_grow_neighbourhood,
400
  .shrink_neighbourhood = cloudcast_shrink_neighbourhood,
401
  .remove_neighbour = cloudcast_remove_neighbour,
402
};