Statistics
| Branch: | Revision:

grapes / src / PeerSampler / cyclon.c @ 10ddaca7

History | View | Annotate | Download (7.65 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14

    
15
#include "net_helper.h"
16
#include "peersampler_iface.h"
17
#include "../Cache/topocache.h"
18
#include "../Cache/cyclon_proto.h"
19
#include "../Cache/proto.h"
20
#include "grapes_config.h"
21
#include "grapes_msg_types.h"
22

    
23
#define DEFAULT_CACHE_SIZE 10
24
#define DEFAULT_BOOTSTRAP_CYCLES 5
25
#define DEFAULT_BOOTSTRAP_PERIOD 2*1000*1000
26
#define DEFAULT_PERIOD 10*1000*1000
27

    
28
struct peersampler_context{
29
  uint64_t currtime;
30
  int cache_size;
31
  int sent_entries;
32
  struct peer_cache *local_cache;
33
  bool bootstrap;
34
  int bootstrap_period;
35
  int bootstrap_cycles;
36
  int period;
37
  
38
  struct peer_cache *flying_cache;
39
  struct nodeID *dst;
40

    
41
  struct cyclon_proto_context *pc;
42
  const struct nodeID **r;
43
};
44

    
45

    
46
static uint64_t gettime(void)
47
{
48
  struct timeval tv;
49

    
50
  gettimeofday(&tv, NULL);
51

    
52
  return tv.tv_usec + tv.tv_sec * 1000000ull;
53
}
54

    
55
static struct peersampler_context* cyclon_context_init(void)
56
{
57
  struct peersampler_context* con;
58
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
59

    
60
  //Initialize context with default values
61
  con->bootstrap = true;
62
  con->currtime = gettime();
63

    
64
  return con;
65
}
66

    
67
static int time_to_send(struct peersampler_context* con)
68
{
69
  int p = con->bootstrap ? con->bootstrap_period : con->period;
70
  if (gettime() - con->currtime > p) {
71
    con->currtime += p;
72

    
73
    return 1;
74
  }
75

    
76
  return 0;
77
}
78

    
79
/*
80
 * Public Functions!
81
 */
82
static struct peersampler_context* cyclon_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
83
{
84
  struct tag *cfg_tags;
85
  struct peersampler_context *con;
86
  int res;
87

    
88
  con = cyclon_context_init();
89
  if (!con) return NULL;
90

    
91
  cfg_tags = grapes_config_parse(config);
92
  res = grapes_config_value_int(cfg_tags, "cache_size", &(con->cache_size));
93
  if (!res) {
94
    con->cache_size = DEFAULT_CACHE_SIZE;
95
  }
96
  res = grapes_config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
97
  if (!res) {
98
    con->sent_entries = con->cache_size / 2;
99
  }
100
  res = grapes_config_value_int(cfg_tags, "period", &con->period);
101
  if (!res) {
102
    con->period = DEFAULT_PERIOD;
103
  }
104
  res = grapes_config_value_int(cfg_tags, "bootstrap_period", &con->bootstrap_period);
105
  if (!res) {
106
    con->bootstrap_period = DEFAULT_BOOTSTRAP_PERIOD;
107
  }
108
  res = grapes_config_value_int(cfg_tags, "bootstrap_cycles", &con->bootstrap_cycles);
109
  if (!res) {
110
    con->bootstrap_cycles = DEFAULT_BOOTSTRAP_CYCLES;
111
  }
112
  free(cfg_tags);
113

    
114
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
115
  if (con->local_cache == NULL) {
116
    free(con);
117
    return NULL;
118
  }
119

    
120
  con->pc = cyclon_proto_init(myID, metadata, metadata_size);
121
  if (!con->pc){
122
    free(con->local_cache);
123
    free(con);
124
    return NULL;
125
  }
126

    
127
  return con;
128
}
129

    
130
static int cyclon_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
131
{
132
  if (!context->flying_cache) {
133
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
134
  }
135
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
136
    return -1;
137
  }
138

    
139
  return cyclon_query(context->pc, context->flying_cache, neighbour);
140
}
141

    
142
static int cyclon_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
143
{
144
  cache_check(context->local_cache);
145
  if (len) {
146
    const struct topo_header *h = (const struct topo_header *)buff;
147
    struct peer_cache *remote_cache;
148
    struct peer_cache *sent_cache = NULL;
149

    
150
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
151
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
152

    
153
      return -1;
154
    }
155

    
156
    if (context->bootstrap_cycles) {
157
      if (--context->bootstrap_cycles == 0) {
158
        context->bootstrap = false;
159
      }
160
    }
161

    
162
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
163
    if (h->type == CYCLON_QUERY) {
164
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
165
      cyclon_reply(context->pc, remote_cache, sent_cache);
166
      nodeid_free(context->dst);
167
      context->dst = NULL;
168
    }
169
    cache_check(context->local_cache);
170
    cache_add_cache(context->local_cache, remote_cache);
171
    cache_free(remote_cache);
172
    if (sent_cache) {
173
      cache_add_cache(context->local_cache, sent_cache);
174
      cache_free(sent_cache);
175
    } else {
176
      if (context->flying_cache) {
177
        cache_add_cache(context->local_cache, context->flying_cache);
178
        cache_free(context->flying_cache);
179
        context->flying_cache = NULL;
180
      }
181
    }
182
  }
183

    
184
  if (time_to_send(context)) {
185
    if (context->flying_cache) {
186
      cache_add_cache(context->local_cache, context->flying_cache);
187
      cache_free(context->flying_cache);
188
      context->flying_cache = NULL;
189
    }
190
    cache_update(context->local_cache);
191
    nodeid_free(context->dst);
192
    context->dst = last_peer(context->local_cache);
193
    if (context->dst == NULL) {
194
      return 0;
195
    }
196
    context->dst = nodeid_dup(context->dst);
197
    cache_del(context->local_cache, context->dst);
198
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
199
    return cyclon_query(context->pc, context->flying_cache, context->dst);
200
  }
201
  cache_check(context->local_cache);
202

    
203
  return 0;
204
}
205

    
206
static const struct nodeID *const *cyclon_get_neighbourhood(struct peersampler_context *context, int *n)
207
{
208
  context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *));
209
  if (context->r == NULL) {
210
    return NULL;
211
  }
212

    
213
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
214
    context->r[*n] = nodeid(context->local_cache, *n);
215
    //fprintf(stderr, "Checking table[%d]\n", *n);
216
  }
217
  if (context->flying_cache) {
218
    int i;
219

    
220
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
221
      context->r[*n] = nodeid(context->flying_cache, i);
222
    }
223
  }
224
  if (context->dst && (*n < context->cache_size)) {
225
    context->r[*n] = context->dst;
226
    (*n)++;
227
  }
228

    
229
  return context->r;
230
}
231

    
232
static const void *cyclon_get_metadata(struct peersampler_context *context, int *metadata_size)
233
{
234
  return get_metadata(context->local_cache, metadata_size);
235
}
236

    
237
static int cyclon_grow_neighbourhood(struct peersampler_context *context, int n)
238
{
239
  context->cache_size += n;
240

    
241
  return context->cache_size;
242
}
243

    
244
static int cyclon_shrink_neighbourhood(struct peersampler_context *context, int n)
245
{
246
  if (context->cache_size < n) {
247
    return -1;
248
  }
249
  context->cache_size -= n;
250

    
251
  return context->cache_size;
252
}
253

    
254
static int cyclon_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour)
255
{
256
  return cache_del(context->local_cache, neighbour);
257
}
258

    
259
static int cyclon_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size)
260
{
261
  return cyclon_proto_change_metadata(context->pc, metadata, metadata_size);
262
}
263

    
264
void cyclon_destroy(struct peersampler_context **context)
265
{
266
        if (context && *context)
267
        {
268
                if((*context)->r)
269
                        free((*context)->r);
270
                if((*context)->local_cache)
271
                        cache_free((*context)->local_cache);
272
                if((*context)->flying_cache)
273
                        cache_free((*context)->flying_cache);
274
                free(*context);
275
                *context = NULL;
276
        }
277
}
278

    
279
struct peersampler_iface cyclon = {
280
  .init = cyclon_init,
281
  .destroy = cyclon_destroy,
282
  .change_metadata = cyclon_change_metadata,
283
  .add_neighbour = cyclon_add_neighbour,
284
  .parse_data = cyclon_parse_data,
285
  .get_neighbourhood = cyclon_get_neighbourhood,
286
  .get_metadata = cyclon_get_metadata,
287
  .grow_neighbourhood = cyclon_grow_neighbourhood,
288
  .shrink_neighbourhood = cyclon_shrink_neighbourhood,
289
  .remove_neighbour = cyclon_remove_neighbour,
290
};