Statistics
| Branch: | Revision:

grapes / src / TopologyManager / cyclon.c @ f3ab0d6d

History | View | Annotate | Download (6.8 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14

    
15
#include "net_helper.h"
16
#include "peersampler_iface.h"
17
#include "topocache.h"
18
#include "topo_proto.h"
19
#include "cyclon_proto.h"
20
#include "proto.h"
21
#include "config.h"
22
#include "grapes_msg_types.h"
23

    
24
#define DEFAULT_CACHE_SIZE 10
25

    
26
struct peersampler_context{
27
  uint64_t currtime;
28
  int cache_size;
29
  int sent_entries;
30
  struct peer_cache *local_cache;
31
  bool bootstrap;
32
  int bootstrap_period;
33
  int period;
34
  
35
  struct peer_cache *flying_cache;
36
  struct nodeID *dst;
37
  
38
  struct cyclon_proto_context *pc; 
39
};
40

    
41

    
42
static uint64_t gettime(void)
43
{
44
  struct timeval tv;
45

    
46
  gettimeofday(&tv, NULL);
47

    
48
  return tv.tv_usec + tv.tv_sec * 1000000ull;
49
}
50

    
51
static struct peersampler_context* cyclon_context_init(void){
52
  struct peersampler_context* con;
53
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
54

    
55
  //Initialize context with default values
56
  con->bootstrap = true;
57
  con->bootstrap_period = 2000000;
58
  con->period = 10000000;
59
  con->currtime = gettime();
60

    
61
  return con;
62
}
63

    
64
static int time_to_send(struct peersampler_context* con)
65
{
66
  int p = con->bootstrap ? con->bootstrap_period : con->period;
67
  if (gettime() - con->currtime > p) {
68
    con->currtime += p;
69

    
70
    return 1;
71
  }
72

    
73
  return 0;
74
}
75

    
76
static void cache_add_cache(struct peer_cache *dst, const struct peer_cache *add)
77
{
78
  int i, meta_size;
79
  const uint8_t *meta;
80

    
81
  meta = get_metadata(add, &meta_size);
82
  for (i = 0; nodeid(add, i); i++) {
83
    cache_add(dst,  nodeid(add, i), meta + (meta_size * i), meta_size);
84
  }
85
}
86

    
87

    
88
/*
89
 * Public Functions!
90
 */
91
static struct peersampler_context* cyclon_init(struct nodeID *myID, void *metadata, int metadata_size, const char *config)
92
{
93
  struct tag *cfg_tags;
94
  struct peersampler_context *con;
95
  int res;
96

    
97
  con = cyclon_context_init();
98
  if (!con) return NULL;
99

    
100
  cfg_tags = config_parse(config);
101
  res = config_value_int(cfg_tags, "cache_size", &(con->cache_size));
102
  if (!res) {
103
    con->cache_size = DEFAULT_CACHE_SIZE;
104
  }
105
  res = config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
106
  if (!res) {
107
    con->sent_entries = con->cache_size / 2;
108
  }
109

    
110
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
111
  if (con->local_cache == NULL) {
112
    free(con);
113
    return NULL;
114
  }
115

    
116
  con->pc = cyclon_proto_init(myID, metadata, metadata_size);
117
  if (!con->pc){
118
    free(con->local_cache);
119
    free(con);
120
    return NULL;
121
  }
122

    
123
  return con;
124
}
125

    
126
static int cyclon_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, void *metadata, int metadata_size)
127
{
128
  if (!context->flying_cache) {
129
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
130
  }
131
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
132
    return -1;
133
  }
134

    
135
  return cyclon_query(context->pc, context->flying_cache, neighbour);
136
}
137

    
138
static int cyclon_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
139
{
140
  cache_check(context->local_cache);
141
  if (len) {
142
    const struct topo_header *h = (const struct topo_header *)buff;
143
    struct peer_cache *remote_cache;
144
    struct peer_cache *sent_cache = NULL;
145

    
146
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
147
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
148

    
149
      return -1;
150
    }
151

    
152
    context->bootstrap = false;
153

    
154
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
155
    if (h->type == CYCLON_QUERY) {
156
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
157
      cyclon_reply(context->pc, remote_cache, sent_cache);
158
      context->dst = NULL;
159
    }
160
    cache_check(context->local_cache);
161
    cache_add_cache(context->local_cache, remote_cache);
162
    cache_free(remote_cache);
163
    if (sent_cache) {
164
      cache_add_cache(context->local_cache, sent_cache);
165
      cache_free(sent_cache);
166
    } else {
167
      if (context->flying_cache) {
168
        cache_add_cache(context->local_cache, context->flying_cache);
169
        cache_free(context->flying_cache);
170
        context->flying_cache = NULL;
171
      }
172
    }
173
  }
174

    
175
  if (time_to_send(context)) {
176
    if (context->flying_cache) {
177
      cache_add_cache(context->local_cache, context->flying_cache);
178
      cache_free(context->flying_cache);
179
      context->flying_cache = NULL;
180
    }
181
    cache_update(context->local_cache);
182
    context->dst = last_peer(context->local_cache);
183
    if (context->dst == NULL) {
184
      return 0;
185
    }
186
    context->dst = nodeid_dup(context->dst);
187
    cache_del(context->local_cache, context->dst);
188
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
189
    cyclon_query(context->pc, context->flying_cache, context->dst);
190
  }
191
  cache_check(context->local_cache);
192

    
193
  return 0;
194
}
195

    
196
static const struct nodeID **cyclon_get_neighbourhood(struct peersampler_context *context, int *n)
197
{
198
  static struct nodeID **r;
199

    
200
  r = realloc(r, context->cache_size * sizeof(struct nodeID *));
201
  if (r == NULL) {
202
    return NULL;
203
  }
204

    
205
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
206
    r[*n] = nodeid(context->local_cache, *n);
207
    //fprintf(stderr, "Checking table[%d]\n", *n);
208
  }
209
  if (context->flying_cache) {
210
    int i;
211

    
212
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
213
      r[*n] = nodeid(context->flying_cache, i);
214
    }
215
  }
216
  if (context->dst && (*n < context->cache_size)) {
217
    r[*n] = context->dst;
218
    (*n)++;
219
  }
220

    
221
  return (const struct nodeID **)r;
222
}
223

    
224
static const void *cyclon_get_metadata(struct peersampler_context *context, int *metadata_size)
225
{
226
  return get_metadata(context->local_cache, metadata_size);
227
}
228

    
229
static int cyclon_grow_neighbourhood(struct peersampler_context *context, int n)
230
{
231
  context->cache_size += n;
232

    
233
  return context->cache_size;
234
}
235

    
236
static int cyclon_shrink_neighbourhood(struct peersampler_context *context, int n)
237
{
238
  if (context->cache_size < n) {
239
    return -1;
240
  }
241
  context->cache_size -= n;
242

    
243
  return context->cache_size;
244
}
245

    
246
static int cyclon_remove_neighbour(struct peersampler_context *context, struct nodeID *neighbour)
247
{
248
  return cache_del(context->local_cache, neighbour);
249
}
250

    
251
static int cyclon_change_metadata(struct peersampler_context *context, void *metadata, int metadata_size)
252
{
253
  return cyclon_proto_change_metadata(context->pc, metadata, metadata_size);
254
}
255

    
256
struct peersampler_iface cyclon = {
257
  .init = cyclon_init,
258
  .change_metadata = cyclon_change_metadata,
259
  .add_neighbour = cyclon_add_neighbour,
260
  .parse_data = cyclon_parse_data,
261
  .get_neighbourhood = cyclon_get_neighbourhood,
262
  .get_metadata = cyclon_get_metadata,
263
  .grow_neighbourhood = cyclon_grow_neighbourhood,
264
  .shrink_neighbourhood = cyclon_shrink_neighbourhood,
265
  .remove_neighbour = cyclon_remove_neighbour,
266
};