Revision 36dc16e0

View differences:

src/TopologyManager/Makefile
5 5
endif
6 6
CFGDIR ?= ..
7 7

  
8
OBJS = peersampler.o topman.o ncast.o ncast_proto.o dummy.o cyclon.o cyclon_proto.o topo_proto.o topocache.o blist_cache.o blist_proto.o tman.o dumbTopman.o cloud_helper.o cloud_helper_utils.o cloud_helper_delegate.o
8
OBJS = peersampler.o topman.o ncast.o ncast_proto.o dummy.o cyclon.o cyclon_proto.o topo_proto.o topocache.o blist_cache.o blist_proto.o tman.o dumbTopman.o cloud_helper.o cloud_helper_utils.o cloud_helper_delegate.o cloudcast.o cloudcast_proto.o
9 9

  
10 10
all: libtopman.a
11 11

  
src/TopologyManager/cloudcast.c
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14

  
15
#include "net_helper.h"
16
#include "peersampler_iface.h"
17
#include "topocache.h"
18
#include "cloudcast_proto.h"
19
#include "proto.h"
20
#include "config.h"
21
#include "grapes_msg_types.h"
22

  
23
#define DEFAULT_CACHE_SIZE 10
24
#define DEFAULT_CLOUD_CONTACT_TRESHOLD 4000000
25

  
26
struct peersampler_context{
27
  uint64_t currtime;
28
  int cache_size;
29
  int sent_entries;
30
  int keep_cache_full;
31
  struct peer_cache *local_cache;
32
  bool bootstrap;
33
  int bootstrap_period;
34
  int period;
35

  
36
  int cloud_contact_treshold;
37
  struct nodeID *local_node;
38
  struct nodeID **cloud_nodes;
39

  
40
  struct peer_cache *flying_cache;
41
  struct nodeID *dst;
42

  
43
  struct cloudcast_proto_context *proto_context;
44
};
45

  
46

  
47
static uint64_t gettime(void)
48
{
49
  struct timeval tv;
50

  
51
  gettimeofday(&tv, NULL);
52

  
53
  return tv.tv_usec + tv.tv_sec * 1000000ull;
54
}
55

  
56
static struct peersampler_context* cloudcast_context_init(void){
57
  struct peersampler_context* con;
58
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
59
  memset(con, 0, sizeof(struct peersampler_context));
60

  
61
  //Initialize context with default values
62
  con->bootstrap = true;
63
  con->bootstrap_period = 2000000;
64
  con->period = 10000000;
65
  con->currtime = gettime();
66

  
67
  con->cloud_contact_treshold = DEFAULT_CLOUD_CONTACT_TRESHOLD;
68

  
69
  return con;
70
}
71

  
72
static int time_to_send(struct peersampler_context* con)
73
{
74
  int p = con->bootstrap ? con->bootstrap_period : con->period;
75
  if (gettime() - con->currtime > p) {
76
    con->currtime += p;
77

  
78
    return 1;
79
  }
80

  
81
  return 0;
82
}
83

  
84
static void cache_add_cache(struct peer_cache *dst, const struct peer_cache *add)
85
{
86
  int i, meta_size;
87
  const uint8_t *meta;
88

  
89
  meta = get_metadata(add, &meta_size);
90
  for (i = 0; nodeid(add, i); i++) {
91
    cache_add(dst,  nodeid(add, i), meta + (meta_size * i), meta_size);
92
  }
93
}
94

  
95
static int cache_add_resize(struct peer_cache *dst, struct nodeID *n, const int cache_max_size)
96
{
97
  int err;
98
  err = cache_add(dst, n, NULL, 0);
99
  if (err == -2){
100
    // maybe the cache is full... try resizing it to cache_max
101
    cache_resize(dst, cache_max_size);
102
    err = cache_add(dst, n, NULL, 0);
103
  }
104
  return (err > 0)? 0 : 1;
105
}
106

  
107
/*
108
 * Public Functions!
109
 */
110
static struct peersampler_context* cloudcast_init(struct nodeID *myID, void *metadata, int metadata_size, const char *config)
111
{
112
  struct tag *cfg_tags;
113
  struct peersampler_context *con;
114
  int res;
115

  
116
  con = cloudcast_context_init();
117
  if (!con) return NULL;
118
  con->local_node = myID;
119

  
120
  cfg_tags = config_parse(config);
121
  res = config_value_int(cfg_tags, "cache_size", &(con->cache_size));
122
  if (!res) {
123
    con->cache_size = DEFAULT_CACHE_SIZE;
124
  }
125
  res = config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
126
  if (!res) {
127
    con->sent_entries = con->cache_size / 2;
128
  }
129
  res = config_value_int(cfg_tags, "keep_cache_full", &(con->keep_cache_full));
130
  if (!res) {
131
    con->keep_cache_full = 1;
132
  }
133

  
134
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
135
  if (con->local_cache == NULL) {
136
    free(con);
137
    return NULL;
138
  }
139

  
140
  con->proto_context = cloudcast_proto_init(myID, metadata, metadata_size);
141
  if (!con->proto_context){
142
    free(con->local_cache);
143
    free(con);
144
    return NULL;
145
  }
146
  con->cloud_nodes = cloudcast_get_cloud_nodes(con->proto_context, 2);
147

  
148
  return con;
149
}
150

  
151
static int cloudcast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, void *metadata, int metadata_size)
152
{
153
  if (!context->flying_cache) {
154
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
155
  }
156
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
157
    return -1;
158
  }
159

  
160
  if (cloudcast_is_cloud_node(context->proto_context, neighbour) == 0)
161
    return cloudcast_query_peer(context->proto_context, context->flying_cache, neighbour);
162
  else
163
    return cloudcast_query_cloud(context->proto_context);
164
}
165

  
166
static int cloudcast_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
167
{
168
  cache_check(context->local_cache);
169
  if (len) {
170
    // Passive thread
171
    const struct topo_header *h = (const struct topo_header *)buff;
172
    struct peer_cache *remote_cache  = NULL;
173
    struct peer_cache *sent_cache = NULL;
174

  
175
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
176
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
177

  
178
      return -1;
179
    }
180

  
181
    context->bootstrap = false;
182

  
183
    if (len - sizeof(struct topo_header) > 0)
184
      remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
185

  
186

  
187
    if (h->type == CLOUDCAST_QUERY) {
188
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
189
      cloudcast_reply_peer(context->proto_context, remote_cache, sent_cache);
190
      context->dst = NULL;
191
    } else if (h->type == CLOUDCAST_CLOUD) {
192
      int g = 0;
193
      int i = 0;
194
      struct peer_cache *reply_cache = NULL;
195

  
196
      // Obtain the timestamp reported by the last query_cloud operation
197
      uint64_t cloud_time_stamp = cloudcast_timestamp_cloud(context->proto_context) * 1000000ull;
198
      uint64_t current_time = gettime();
199
      int delta = current_time - cloud_time_stamp;
200

  
201
      // local view with one spot free for the local nodeID
202
      sent_cache = rand_cache(context->local_cache, context->sent_entries - 1);
203
      for (i=0; i<2; i++) cache_del(sent_cache, context->cloud_nodes[i]);
204

  
205
      if (remote_cache == NULL) {
206
        // Cloud is not initialized, just set the cloud cache to send_cache
207
        cloudcast_reply_cloud(context->proto_context, sent_cache);
208
        remote_cache = cloudcast_cloud_default_reply(context->local_cache, context->cloud_nodes[0]);
209
      } else {
210
        // Locally perform exchange of cache entries
211
        int err;
212
        struct peer_cache *cloud_cache = NULL;
213

  
214
        // if the cloud contact frequency *IS NOT* too high save an entry for the cloud
215
        if (delta > (context->period / context->cloud_contact_treshold)) g++;
216

  
217
        // if the cloud contact frequency *IS* too low save another entry for the cloud
218
        if (delta > (context->cloud_contact_treshold * context->period)) g++;
219

  
220
        // Remove mentions of local node from the cache
221
        cache_del(remote_cache, context->local_node);
222

  
223
        // cloud view with g spot free for cloud nodeID
224
        reply_cache = rand_cache(remote_cache, context->sent_entries - g);
225

  
226
        // building new cloud view
227
        cloud_cache = merge_caches(remote_cache, sent_cache, context->cache_size, &err);
228
        free(remote_cache);
229
        cache_add_cache(cloud_cache, sent_cache);
230
        if (context->keep_cache_full){
231
          cache_add_cache(cloud_cache, reply_cache);
232
        }
233

  
234
        err = cloudcast_reply_cloud(context->proto_context, cloud_cache);
235
        free(cloud_cache);
236

  
237
        //Add the actual cloud nodes to the cache
238
        for (i = 0; i<g; i++){
239
          if (cache_add_resize(reply_cache, context->cloud_nodes[i], context->sent_entries) != 0){
240
            fprintf(stderr, "WTF!!! cannot add to cache\n");
241
            return 1;
242
          }
243
        }
244

  
245
        remote_cache = reply_cache;
246
        context->dst = NULL;
247
      }
248
    }
249
    cache_check(context->local_cache);
250

  
251
    if (remote_cache){
252
      // Remote cache may be NULL due to a non initialize cloud
253
      cache_add_cache(context->local_cache, remote_cache);
254
      cache_free(remote_cache);
255
    }
256
    if (sent_cache) {
257
      if (context->keep_cache_full)
258
        cache_add_cache(context->local_cache, sent_cache);
259
      cache_free(sent_cache);
260
    } else {
261
      if (context->flying_cache) {
262
        cache_add_cache(context->local_cache, context->flying_cache);
263
        cache_free(context->flying_cache);
264
        context->flying_cache = NULL;
265
      }
266
    }
267
  }
268

  
269
  // Active thread
270
  if (time_to_send(context)) {
271
    if (context->flying_cache) {
272
      cache_add_cache(context->local_cache, context->flying_cache);
273
      cache_free(context->flying_cache);
274
      context->flying_cache = NULL;
275
    }
276
    cache_update(context->local_cache);
277
    context->dst = last_peer(context->local_cache);
278
    if (context->dst == NULL) {
279
      // If we remain without neighbors, forcely add a cloud entry
280
      context->dst = cloudcast_get_cloud_node(context->proto_context);
281
      cache_add(context->local_cache, context->dst, NULL, 0);
282
    }
283
    context->dst = nodeid_dup(context->dst);
284
    cache_del(context->local_cache, context->dst);
285

  
286
    if (cloudcast_is_cloud_node(context->proto_context, context->dst)) {
287
      int err;
288
      err = cloudcast_query_cloud(context->proto_context);
289
    } else {
290
      context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
291
      cloudcast_query_peer(context->proto_context, context->flying_cache, context->dst);
292
    }
293
  }
294
  cache_check(context->local_cache);
295

  
296
  return 0;
297
  }
298

  
299
static const struct nodeID **cloudcast_get_neighbourhood(struct peersampler_context *context, int *n)
300
{
301
  static struct nodeID **r;
302

  
303
  r = realloc(r, context->cache_size * sizeof(struct nodeID *));
304
  if (r == NULL) {
305
    return NULL;
306
  }
307

  
308
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
309
    r[*n] = nodeid(context->local_cache, *n);
310
    //fprintf(stderr, "Checking table[%d]\n", *n);
311
  }
312
  if (context->flying_cache) {
313
    int i,j,dup;
314

  
315
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
316
      dup = 0;
317
      for (j = 0; j<*n; j++){
318
        if (nodeid_equal(r[j], nodeid(context->flying_cache, i))){
319
          dup = 1;
320
          continue;
321
        }
322
      }
323
      if (dup) (*n)--;
324
      else r[*n] = nodeid(context->flying_cache, i);
325
    }
326
  }
327

  
328
  if (context->dst && (*n < context->cache_size)) {
329
    int j,dup;
330
    dup = 0;
331
    for (j = 0; j<*n; j++){
332
      if (nodeid_equal(r[j], context->dst)){
333
        dup = 1;
334
        continue;
335
      }
336
    }
337
    if (!dup){
338
      r[*n] = context->dst;
339
      (*n)++;
340
    }
341
  }
342

  
343
  return (const struct nodeID **)r;
344
}
345

  
346
static const void *cloudcast_get_metadata(struct peersampler_context *context, int *metadata_size)
347
{
348
  return get_metadata(context->local_cache, metadata_size);
349
}
350

  
351
static int cloudcast_grow_neighbourhood(struct peersampler_context *context, int n)
352
{
353
  context->cache_size += n;
354

  
355
  return context->cache_size;
356
}
357

  
358
static int cloudcast_shrink_neighbourhood(struct peersampler_context *context, int n)
359
{
360
  if (context->cache_size < n) {
361
    return -1;
362
  }
363
  context->cache_size -= n;
364

  
365
  return context->cache_size;
366
}
367

  
368
static int cloudcast_remove_neighbour(struct peersampler_context *context, struct nodeID *neighbour)
369
{
370
  return cache_del(context->local_cache, neighbour);
371
}
372

  
373
static int cloudcast_change_metadata(struct peersampler_context *context, void *metadata, int metadata_size)
374
{
375
  return cloudcast_proto_change_metadata(context->proto_context, metadata, metadata_size);
376
}
377

  
378
struct peersampler_iface cloudcast = {
379
  .init = cloudcast_init,
380
  .change_metadata = cloudcast_change_metadata,
381
  .add_neighbour = cloudcast_add_neighbour,
382
  .parse_data = cloudcast_parse_data,
383
  .get_neighbourhood = cloudcast_get_neighbourhood,
384
  .get_metadata = cloudcast_get_metadata,
385
  .grow_neighbourhood = cloudcast_grow_neighbourhood,
386
  .shrink_neighbourhood = cloudcast_shrink_neighbourhood,
387
  .remove_neighbour = cloudcast_remove_neighbour,
388
};
src/TopologyManager/cloudcast_proto.c
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdint.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10

  
11
#include "net_helper.h"
12
#include "topocache.h"
13
#include "proto.h"
14
#include "topo_proto.h"
15
#include "cloudcast_proto.h"
16
#include "grapes_msg_types.h"
17

  
18
// TODO: This should not be duplicated here. should inherit from topo_proto.c
19
#define MAX_MSG_SIZE 1500
20

  
21
#define CLOUD_VIEW_KEY "view"
22

  
23
struct topo_header cloud_header = {MSG_TYPE_TOPOLOGY, CLOUDCAST_CLOUD};
24

  
25
struct cloudcast_proto_context {
26
  struct peer_cache* myEntry;
27
  struct topo_context *topo_context;
28
  struct cloud_helper_context *cloud_context;
29
};
30

  
31
struct cloudcast_proto_context* cloudcast_proto_init(struct nodeID *s, void *meta, int meta_size)
32
{
33
  struct cloudcast_proto_context *con;
34
  con = malloc(sizeof(struct cloudcast_proto_context));
35

  
36
  if (!con) return NULL;
37

  
38
  con->topo_context = topo_proto_init(s, meta, meta_size);
39
  if (!con->topo_context){
40
    free(con);
41
    return NULL;
42
  }
43

  
44
  con->cloud_context = get_cloud_helper_for(s);
45
  if (!con->cloud_context) {
46
    free(con);
47
    return NULL;
48
  }
49

  
50
  con->myEntry = cache_init(1, meta_size, 0);
51
  cache_add(con->myEntry, s, meta, meta_size);
52

  
53
  return con;
54
}
55

  
56
struct nodeID* cloudcast_get_cloud_node(struct cloudcast_proto_context *con)
57
{
58
  return get_cloud_node(con->cloud_context, 0);
59
}
60

  
61
struct nodeID** cloudcast_get_cloud_nodes(struct cloudcast_proto_context *con, uint8_t number)
62
{
63
  int i;
64
  struct nodeID* *cloud_nodes;
65

  
66
  if (number == 0) return NULL;
67
  cloud_nodes = calloc(number, sizeof(struct nodeID*));
68

  
69
  for (i=0; i< number; i++)
70
    cloud_nodes[i] = get_cloud_node(con->cloud_context, i);
71
  return cloud_nodes;
72
}
73

  
74
int cloudcast_is_cloud_node(struct cloudcast_proto_context *con, struct nodeID* node)
75
{
76
  return is_cloud_node(con->cloud_context, node);
77
}
78

  
79
int cloudcast_reply_peer(struct cloudcast_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache)
80
{
81
  return topo_reply(context->topo_context, c, local_cache, MSG_TYPE_TOPOLOGY, CLOUDCAST_REPLY, 0, 0);
82
}
83

  
84
int cloudcast_query_peer(struct cloudcast_proto_context *context, struct peer_cache *sent_cache, struct nodeID *dst)
85
{
86
  return topo_query_peer(context->topo_context, sent_cache, dst, MSG_TYPE_TOPOLOGY, CLOUDCAST_QUERY, 0);
87
}
88

  
89

  
90
int cloudcast_payload_fill(struct cloudcast_proto_context *context, uint8_t *payload, int size, struct peer_cache *c, int max_peers, int include_me)
91
{
92
  int i;
93
  uint8_t *p = payload;
94

  
95
  if (!max_peers) max_peers = MAX_MSG_SIZE; // just to be sure to dump the whole cache...
96
  p += cache_header_dump(p, c, include_me);
97
  if (include_me) {
98
    p += entry_dump(p, context->myEntry, 0, size - (p - payload));
99
    max_peers--;
100
  }
101
  for (i = 0; nodeid(c, i) && max_peers; i++) {
102
    if (!is_cloud_node(context->cloud_context, nodeid(c, i))) {
103
      int res;
104
      res = entry_dump(p, c, i, size - (p - payload));
105
      if (res < 0) {
106
        fprintf(stderr, "too many entries!\n");
107
        return -1;
108
      }
109
      p += res;
110
      --max_peers;
111
    }
112
  }
113

  
114
  return p - payload;
115
}
116

  
117

  
118
int cloudcast_reply_cloud(struct cloudcast_proto_context *context, struct peer_cache *cloud_cache)
119
{
120
  uint8_t headerless_pkt[MAX_MSG_SIZE - sizeof(cloud_header)];
121
  int len, res;
122

  
123
  len = cloudcast_payload_fill(context, headerless_pkt, MAX_MSG_SIZE - sizeof(cloud_header), cloud_cache, 0, 1);
124

  
125
  if (len > 0)
126
    res = put_on_cloud(context->cloud_context, CLOUD_VIEW_KEY, headerless_pkt, len);
127
  else
128
    res = 0;
129
  return res;
130
}
131

  
132
int cloudcast_query_cloud(struct cloudcast_proto_context *context)
133
{
134
  return get_from_cloud(context->cloud_context, CLOUD_VIEW_KEY, (uint8_t *)&cloud_header, sizeof(cloud_header));
135
}
136

  
137
struct peer_cache * cloudcast_cloud_default_reply(struct peer_cache *template, struct nodeID *cloud_entry)
138
{
139
  int size;
140
  struct peer_cache *cloud_reply;
141
  get_metadata(template, &size);
142
  cloud_reply = cache_init(1, size, 0);
143
  cache_add(cloud_reply, cloud_entry, NULL, 0);
144
  return cloud_reply;
145
}
146

  
147
time_t cloudcast_timestamp_cloud(struct cloudcast_proto_context *context)
148
{
149
  return timestamp_cloud(context->cloud_context);
150
}
151

  
152
int cloudcast_proto_change_metadata(struct cloudcast_proto_context *context, void *metadata, int metadata_size)
153
{
154
  if (topo_proto_metadata_update(context->topo_context, metadata, metadata_size) <= 0) {
155
    return -1;
156
  }
157

  
158
  return 1;
159
}
src/TopologyManager/cloudcast_proto.h
1
#ifndef CLOUDCAST_PROTO
2
#define CLOUDCAST_PROTO
3

  
4
#include <time.h>
5
#include "cloud_helper.h"
6

  
7
struct cloudcast_proto_context;
8

  
9
struct cloudcast_proto_context* cloudcast_proto_init(struct nodeID *s, void *meta, int meta_size);
10

  
11

  
12
int cloudcast_reply_peer(struct cloudcast_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache);
13
int cloudcast_query_peer(struct cloudcast_proto_context *context, struct peer_cache *local_cache, struct nodeID *dst);
14

  
15
int cloudcast_reply_cloud(struct cloudcast_proto_context *context, struct peer_cache *cloud_cache);
16

  
17
struct peer_cache * cloudcast_cloud_default_reply(struct peer_cache *template, struct nodeID *cloud_entry);
18

  
19
int cloudcast_query_cloud(struct cloudcast_proto_context *context);
20
time_t cloudcast_timestamp_cloud(struct cloudcast_proto_context *context);
21

  
22
int cloudcast_proto_change_metadata(struct cloudcast_proto_context *context, void *metadata, int metadata_size);
23

  
24
struct nodeID** cloudcast_get_cloud_nodes(struct cloudcast_proto_context *context, uint8_t number);
25

  
26
struct nodeID* cloudcast_get_cloud_node(struct cloudcast_proto_context *context);
27

  
28
int cloudcast_is_cloud_node(struct cloudcast_proto_context *context, struct nodeID* node);
29
#endif	/* CLOUDCAST_PROTO */
src/TopologyManager/peersampler.c
10 10

  
11 11
extern struct peersampler_iface ncast;
12 12
extern struct peersampler_iface cyclon;
13
extern struct peersampler_iface cloudcast;
13 14
extern struct peersampler_iface dummy;
14 15

  
15 16
struct psample_context{
......
33 34
  if (proto) {
34 35
    if (strcmp(proto, "cyclon") == 0) {
35 36
      tc->ps = &cyclon;
36
    }
37
    if (strcmp(proto, "dummy") == 0) {
37
    } else if (strcmp(proto, "cloudcast") == 0) {
38
      tc->ps = &cloudcast;
39
    }else if (strcmp(proto, "dummy") == 0) {
38 40
      tc->ps = &dummy;
41
    } else {
42
      free(tc);
43
      return NULL;
39 44
    }
40 45
  }
41 46
  
src/TopologyManager/proto.h
14 14
#define CYCLON_REPLY 0x06
15 15
#define CLOUDCAST_QUERY 0x07
16 16
#define CLOUDCAST_REPLY 0x08
17
#define CLOUDCAST_CLOUD 0x09
17 18

  
18 19
#endif	/* PROTO */

Also available in: Unified diff