Revision e910cf6c src/PeerSampler/cloudcast.c

View differences:

src/PeerSampler/cloudcast.c
17 17
#include <stdio.h>
18 18
#include <stdbool.h>
19 19
#include <string.h>
20
#include <assert.h>
20 21

  
21 22
#include "net_helper.h"
22 23
#include "peersampler_iface.h"
......
26 27
#include "config.h"
27 28
#include "grapes_msg_types.h"
28 29

  
29
#define DEFAULT_CACHE_SIZE 10
30
#define DEFAULT_CACHE_SIZE 20
31
#define DEFAULT_PARTIAL_VIEW_SIZE 5
30 32

  
31 33
#define CLOUDCAST_TRESHOLD 4
32 34
#define CLOUDCAST_PERIOD 10000000 // 10 seconds
......
36 38
  uint64_t currtime;
37 39
  int cache_size;
38 40
  int sent_entries;
39
  int keep_cache_full;
40 41
  struct peer_cache *local_cache;
41 42
  bool bootstrap;
42 43
  int bootstrap_period;
43 44
  int period;
44 45

  
46
  uint64_t last_cloud_contact_sec;
47
  int max_silence;
48
  double cloud_respawn_prob;
45 49
  int cloud_contact_treshold;
46 50
  struct nodeID *local_node;
47 51
  struct nodeID **cloud_nodes;
48 52

  
53
  int reserved_entries;
49 54
  struct peer_cache *flying_cache;
50 55
  struct nodeID *dst;
51 56

  
......
54 59
};
55 60

  
56 61

  
62
/* return the time in microseconds */
57 63
static uint64_t gettime(void)
58 64
{
59 65
  struct timeval tv;
......
78 84
  con->period = CLOUDCAST_PERIOD;
79 85
  con->currtime = gettime();
80 86
  con->cloud_contact_treshold = CLOUDCAST_TRESHOLD;
87
  con->last_cloud_contact_sec = 0;
88
  con->max_silence = 0;
89
  con->cloud_respawn_prob = 0;
81 90

  
82 91
  con->r = NULL;
83 92

  
......
86 95

  
87 96
static int time_to_send(struct peersampler_context* con)
88 97
{
98
  long time;
89 99
  int p = con->bootstrap ? con->bootstrap_period : con->period;
90
  if (gettime() - con->currtime > p) {
91
    con->currtime += p;
92 100

  
101
  time = gettime();
102
  if (time - con->currtime > p) {
103
    if (con->bootstrap) con->currtime = time;
104
    else con->currtime += p;
93 105
    return 1;
94 106
  }
95 107

  
96 108
  return 0;
97 109
}
98 110

  
99
static void cache_add_cache(struct peer_cache *dst, const struct peer_cache *add)
100
{
101
  int i, meta_size;
102
  const uint8_t *meta;
103

  
104
  meta = get_metadata(add, &meta_size);
105
  for (i = 0; nodeid(add, i); i++) {
106
    cache_add(dst,  nodeid(add, i), meta + (meta_size * i), meta_size);
107
  }
108
}
109

  
110
static int cache_add_resize(struct peer_cache *dst, struct nodeID *n, const int cache_max_size)
111
{
112
  int err;
113
  err = cache_add(dst, n, NULL, 0);
114
  if (err == -2){
115
    // maybe the cache is full... try resizing it to cache_max
116
    cache_resize(dst, cache_max_size);
117
    err = cache_add(dst, n, NULL, 0);
118
  }
119
  return (err > 0)? 0 : 1;
120
}
121

  
122 111
/*
123 112
 * Public Functions!
124 113
 */
......
139 128
  }
140 129
  res = config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
141 130
  if (!res) {
142
    con->sent_entries = con->cache_size / 2;
131
    con->sent_entries = DEFAULT_PARTIAL_VIEW_SIZE;
143 132
  }
144
  res = config_value_int(cfg_tags, "keep_cache_full", &(con->keep_cache_full));
133
  res = config_value_int(cfg_tags, "max_silence", &(con->max_silence));
145 134
  if (!res) {
146
    con->keep_cache_full = 1;
135
    con->max_silence = 0;
136
  }
137

  
138
  res = config_value_double(cfg_tags, "cloud_respawn_prob", &(con->cloud_respawn_prob));
139
  if (!res) {
140
    con->max_silence = 0;
147 141
  }
148 142

  
149 143
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
......
166 160

  
167 161
static int cloudcast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
168 162
{
169
  if (!context->flying_cache) {
170
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
171
  }
172 163
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
173 164
    return -1;
165
  } return 0;
166
}
167

  
168
static int cloudcast_query(struct peersampler_context *context,
169
                           struct peer_cache *remote_cache)
170
{
171
  struct peer_cache *sent_cache;
172

  
173
  /* Fill up reply and send it.
174
     NOTE: we don't know the remote node so we cannot prevent putting it into
175
     the reply. Remote node should check for entries of itself */
176
  sent_cache = rand_cache(context->local_cache, context->sent_entries - 1);
177
  cloudcast_reply_peer(context->proto_context, remote_cache, sent_cache,
178
                       context->last_cloud_contact_sec);
179

  
180
  /* Insert entries in view without using space reserved by the active thread */
181
  cache_fill_rand(context->local_cache, remote_cache,
182
                    context->cache_size - context->reserved_entries);
183

  
184

  
185
  cache_free(remote_cache);
186
  cache_free(sent_cache);
187
  return 0;
188
}
189

  
190
static int cloudcast_reply(struct peersampler_context *context,
191
                           struct peer_cache *remote_cache)
192
{
193
  /* Be sure not to insert local_node in cache!!! */
194
  cache_del(remote_cache, context->local_node);
195

  
196
  /* Insert remote entries in view */
197
  context->reserved_entries = 0;
198
  cache_fill_rand(context->local_cache, remote_cache, 0);
199

  
200
  cache_free(remote_cache);
201
  return 0;
202
}
203

  
204
static int cloudcast_cloud_dialogue(struct peersampler_context *context,
205
                                    struct peer_cache *cloud_cache)
206
{
207
  struct peer_cache * sent_cache = NULL;
208
  uint64_t cloud_tstamp;
209
  uint64_t current_time;
210
  int delta;
211

  
212
  /* Obtain the timestamp reported by the last query_cloud operation and
213
     compute delta */
214
  cloud_tstamp = cloudcast_timestamp_cloud(context->proto_context) * 1000000ull;
215
  current_time = gettime();
216
  delta = current_time - cloud_tstamp;
217

  
218
  /* Fill up the request with one spot free for local node */
219
  sent_cache = rand_cache_except(context->local_cache, context->sent_entries - 1,
220
                                 context->cloud_nodes, 2);
221

  
222

  
223
  if (cloud_cache == NULL) {
224
    /* Cloud is not initialized, just set the cloud cache to send_cache */
225
    cloudcast_reply_cloud(context->proto_context, sent_cache);
226
    cloudcast_cloud_default_reply(context->local_cache, context->cloud_nodes[0]);
227
    return 0;
228
  } else {
229
    struct peer_cache *remote_cache = NULL;
230
    int err = 0;
231
    int g = 0;
232
    int i = 0;
233

  
234

  
235
    /* If the cloud contact frequency *IS NOT* too high save an entry for the
236
       cloud  */
237
    if (delta > (context->period / context->cloud_contact_treshold)) {
238
      g++;
239
    }
240

  
241
    /* If the cloud contact frequency *IS* too low save another entry for the
242
       cloud */
243
    if (delta > (context->cloud_contact_treshold * context->period)) {
244
      g++;
245
    }
246

  
247
    /* Fill up the reply with g spots free for cloud entries */
248
    remote_cache = rand_cache_except(cloud_cache, context->sent_entries-g,
249
                                     &context->local_node, 1);
250

  
251
    /* Insert cloud entries in remote cache */
252
    cache_resize(remote_cache,  cache_current_size(remote_cache)+g);
253
    for (i=0; i<g; i++) {
254
      err = cache_add(remote_cache, context->cloud_nodes[i], NULL, 0);
255
      assert(err > 0);
256
    }
257

  
258
    /* Insert remote entries in local view */
259
    cache_fill_rand(context->local_cache, remote_cache, 0);
260

  
261
    /* Insert sent entries in cloud view */
262
    cache_del(cloud_cache, context->local_node);
263
    cache_resize(cloud_cache, context->cache_size);
264
    assert(cache_max_size(cloud_cache) == context->cache_size);
265
    cache_fill_rand(cloud_cache, sent_cache, 0);
266

  
267
    /* Write the new view in the cloud */
268
    cloudcast_reply_cloud(context->proto_context, cloud_cache);
269

  
270
    cache_free(remote_cache);
271
    cache_free(cloud_cache);
272
    cache_free(sent_cache);
273
    return 0;
274
  }
275
}
276

  
277
static int silence_treshold(struct peersampler_context *context)
278
{
279
  int threshold;
280
  int delta;
281
  if (!context->max_silence) return 0;
282
  if (context->last_cloud_contact_sec == 0) return 0;
283

  
284
  threshold = (context->max_silence * context->period) / 1000000ull;
285
  delta = (gettime() / 1000000ull) - context->last_cloud_contact_sec;
286

  
287
  return delta > threshold &&
288
    ((double) rand())/RAND_MAX < context->cloud_respawn_prob;
289
}
290

  
291
static int cloudcast_active_thread(struct peersampler_context *context)
292
{
293
  int err = 0;
294

  
295
  /* If space permits, re-insert old entries that have been sent in the previous
296
     cycle */
297
  if (context->flying_cache) {
298
    cache_fill_rand(context->local_cache, context->flying_cache, 0);
299
    cache_free(context->flying_cache);
300
    context->flying_cache = NULL;
301

  
302
    /* If we didn't received an answer since the last cycle, forget about it */
303
    context->reserved_entries = 0;
304
  }
305

  
306
  /* Increase age of all nodes in the view */
307
  cache_update(context->local_cache);
308

  
309
  /* Select oldest node in the view */
310
  context->dst = last_peer(context->local_cache);
311

  
312
  /* If we remain without neighbors, forcely add a cloud entry */
313
  if (context->dst == NULL) {
314
    context->dst = context->cloud_nodes[0];
315
  }
316

  
317
  context->dst = nodeid_dup(context->dst);
318
  cache_del(context->local_cache, context->dst);
319

  
320
  /* If enabled, readd a cloud node when too much time is passed from the
321
     last contact (computed globally) */
322
  if (!cloudcast_is_cloud_node(context->proto_context, context->dst) &&
323
      silence_treshold(context)) {
324
    cache_add(context->local_cache, context->cloud_nodes[0], NULL, 0);
325
  }
326

  
327
  if (cloudcast_is_cloud_node(context->proto_context, context->dst)) {
328
    context->last_cloud_contact_sec = gettime() / 1000000ull;
329

  
330
    /* Request cloud view */
331
    err = cloudcast_query_cloud(context->proto_context);
332
  } else {
333
    /* Fill up request keeping space for local descriptor */
334
    context->flying_cache = rand_cache(context->local_cache,
335
                                       context->sent_entries - 1);
336

  
337
    context->reserved_entries = cache_current_size(context->flying_cache);
338

  
339
    /* Send request to remote peer */
340
    err = cloudcast_query_peer(context->proto_context, context->flying_cache,
341
                               context->dst, context->last_cloud_contact_sec);
174 342
  }
175 343

  
176
  if (cloudcast_is_cloud_node(context->proto_context, neighbour) == 0)
177
    return cloudcast_query_peer(context->proto_context, context->flying_cache, neighbour);
178
  else
179
    return cloudcast_query_cloud(context->proto_context);
344
  return err;
180 345
}
181 346

  
182
static int cloudcast_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
347
static int
348
cloudcast_parse_data(struct peersampler_context *context, const uint8_t *buff,
349
                     int len)
183 350
{
351
  int err = 0;
184 352
  cache_check(context->local_cache);
353

  
354
  /* If we got data, perform the appropriate passive thread operation */
185 355
  if (len) {
186
    // Passive thread
187
    const struct topo_header *h = (const struct topo_header *)buff;
356
    const struct topo_header *th = NULL;
357
    const struct cloudcast_header *ch = NULL;
188 358
    struct peer_cache *remote_cache  = NULL;
189
    struct peer_cache *sent_cache = NULL;
359
    size_t shift = 0;
360

  
190 361

  
191
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
192
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
362
    th = (const struct topo_header *) buff;
363
    shift += sizeof(struct topo_header);
193 364

  
365
    ch = (const struct cloudcast_header *) (buff + shift);
366
    shift += sizeof(struct cloudcast_header);
367

  
368
    if (th->protocol != MSG_TYPE_TOPOLOGY) {
369
      fprintf(stderr, "Peer Sampler: Wrong protocol: %d!\n", th->protocol);
194 370
      return -1;
195 371
    }
196 372

  
197 373
    context->bootstrap = false;
198 374

  
199
    if (len - sizeof(struct topo_header) > 0)
200
      remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
201

  
202

  
203
    if (h->type == CLOUDCAST_QUERY) {
204
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
205
      cloudcast_reply_peer(context->proto_context, remote_cache, sent_cache);
206
      context->dst = NULL;
207
    } else if (h->type == CLOUDCAST_CLOUD) {
208
      int g = 0;
209
      int i = 0;
210
      struct peer_cache *reply_cache = NULL;
211

  
212
      // Obtain the timestamp reported by the last query_cloud operation
213
      uint64_t cloud_time_stamp = cloudcast_timestamp_cloud(context->proto_context) * 1000000ull;
214
      uint64_t current_time = gettime();
215
      int delta = current_time - cloud_time_stamp;
216

  
217
      // local view with one spot free for the local nodeID
218
      sent_cache = rand_cache(context->local_cache, context->sent_entries - 1);
219
      for (i=0; i<2; i++) cache_del(sent_cache, context->cloud_nodes[i]);
220

  
221
      if (remote_cache == NULL) {
222
        // Cloud is not initialized, just set the cloud cache to send_cache
223
        cloudcast_reply_cloud(context->proto_context, sent_cache);
224
        remote_cache = cloudcast_cloud_default_reply(context->local_cache, context->cloud_nodes[0]);
225
      } else {
226
        // Locally perform exchange of cache entries
227
        int err;
228
        struct peer_cache *cloud_cache = NULL;
229

  
230
        // if the cloud contact frequency *IS NOT* too high save an entry for the cloud
231
        if (delta > (context->period / context->cloud_contact_treshold)) g++;
232

  
233
        // if the cloud contact frequency *IS* too low save another entry for the cloud
234
        if (delta > (context->cloud_contact_treshold * context->period)) g++;
235

  
236
        // Remove mentions of local node from the cache
237
        cache_del(remote_cache, context->local_node);
238

  
239
        // cloud view with g spot free for cloud nodeID
240
        reply_cache = rand_cache(remote_cache, context->sent_entries - g);
241

  
242
        // building new cloud view
243
        cloud_cache = merge_caches(remote_cache, sent_cache, context->cache_size, &err);
244
        free(remote_cache);
245
        cache_add_cache(cloud_cache, sent_cache);
246
        if (context->keep_cache_full){
247
          cache_add_cache(cloud_cache, reply_cache);
248
        }
249

  
250
        err = cloudcast_reply_cloud(context->proto_context, cloud_cache);
251
        free(cloud_cache);
252

  
253
        //Add the actual cloud nodes to the cache
254
        for (i = 0; i<g; i++){
255
          if (cache_add_resize(reply_cache, context->cloud_nodes[i], context->sent_entries) != 0){
256
            fprintf(stderr, "WTF!!! cannot add to cache\n");
257
            return 1;
258
          }
259
        }
260

  
261
        remote_cache = reply_cache;
262
        context->dst = NULL;
263
      }
375
    /* Update info on global cloud contact time */
376
    if (ch->last_cloud_contact_sec > context->last_cloud_contact_sec) {
377
      context->last_cloud_contact_sec = ch->last_cloud_contact_sec;
264 378
    }
265
    cache_check(context->local_cache);
266 379

  
267
    if (remote_cache){
268
      // Remote cache may be NULL due to a non initialize cloud
269
      cache_add_cache(context->local_cache, remote_cache);
270
      cache_free(remote_cache);
271
    }
272
    if (sent_cache) {
273
      if (context->keep_cache_full)
274
        cache_add_cache(context->local_cache, sent_cache);
275
      cache_free(sent_cache);
380
    if (len - shift > 0)
381
      remote_cache = entries_undump(buff + (shift * sizeof(uint8_t)), len - shift);
382

  
383
    if (th->type == CLOUDCAST_QUERY) {
384
      /* We're being queried by a remote peer */
385
      err = cloudcast_query(context, remote_cache);
386
    } else if(th->type == CLOUDCAST_REPLY){
387
      /* We've received the response to our query from the remote peer */
388
      err = cloudcast_reply(context, remote_cache);
389
    } else if (th->type == CLOUDCAST_CLOUD) {
390
      /* We've received the response form the cloud. Simulate dialogue */
391
      err = cloudcast_cloud_dialogue(context, remote_cache);
276 392
    } else {
277
      if (context->flying_cache) {
278
        cache_add_cache(context->local_cache, context->flying_cache);
279
        cache_free(context->flying_cache);
280
        context->flying_cache = NULL;
281
      }
393
      fprintf(stderr, "Cloudcast: Wrong message type: %d\n", th->type);
394
      return -1;
282 395
    }
283 396
  }
284 397

  
285
  // Active thread
398
  /* It it's time, perform the active thread */
286 399
  if (time_to_send(context)) {
287
    if (context->flying_cache) {
288
      cache_add_cache(context->local_cache, context->flying_cache);
289
      cache_free(context->flying_cache);
290
      context->flying_cache = NULL;
291
    }
292
    cache_update(context->local_cache);
293
    context->dst = last_peer(context->local_cache);
294
    if (context->dst == NULL) {
295
      // If we remain without neighbors, forcely add a cloud entry
296
      context->dst = cloudcast_get_cloud_node(context->proto_context);
297
      cache_add(context->local_cache, context->dst, NULL, 0);
298
    }
299
    context->dst = nodeid_dup(context->dst);
300
    cache_del(context->local_cache, context->dst);
301

  
302
    if (cloudcast_is_cloud_node(context->proto_context, context->dst)) {
303
      int err;
304
      err = cloudcast_query_cloud(context->proto_context);
305
    } else {
306
      context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
307
      cloudcast_query_peer(context->proto_context, context->flying_cache, context->dst);
308
    }
400
    err = cloudcast_active_thread(context);
309 401
  }
310 402
  cache_check(context->local_cache);
311 403

  
312
  return 0;
313
  }
404
  return err;
405
}
314 406

  
315 407
static const struct nodeID **cloudcast_get_neighbourhood(struct peersampler_context *context, int *n)
316 408
{
......
321 413

  
322 414
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
323 415
    context->r[*n] = nodeid(context->local_cache, *n);
324
    //fprintf(stderr, "Checking table[%d]\n", *n);
325 416
  }
326 417
  if (context->flying_cache) {
327 418
    int i,j,dup;

Also available in: Unified diff