Statistics
| Branch: | Revision:

grapes / src / PeerSampler / cyclon.c @ 21a6d161

History | View | Annotate | Download (6.67 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14

    
15
#include "net_helper.h"
16
#include "peersampler_iface.h"
17
#include "../Cache/topocache.h"
18
#include "../Cache/cyclon_proto.h"
19
#include "../Cache/proto.h"
20
#include "config.h"
21
#include "grapes_msg_types.h"
22

    
23
#define DEFAULT_CACHE_SIZE 10
24

    
25
struct peersampler_context{
26
  uint64_t currtime;
27
  int cache_size;
28
  int sent_entries;
29
  struct peer_cache *local_cache;
30
  bool bootstrap;
31
  int bootstrap_period;
32
  int period;
33
  
34
  struct peer_cache *flying_cache;
35
  struct nodeID *dst;
36

    
37
  struct cyclon_proto_context *pc;
38
  const struct nodeID **r;
39
};
40

    
41

    
42
static uint64_t gettime(void)
43
{
44
  struct timeval tv;
45

    
46
  gettimeofday(&tv, NULL);
47

    
48
  return tv.tv_usec + tv.tv_sec * 1000000ull;
49
}
50

    
51
static struct peersampler_context* cyclon_context_init(void)
52
{
53
  struct peersampler_context* con;
54
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
55

    
56
  //Initialize context with default values
57
  con->bootstrap = true;
58
  con->bootstrap_period = 2000000;
59
  con->period = 10000000;
60
  con->currtime = gettime();
61

    
62
  return con;
63
}
64

    
65
static int time_to_send(struct peersampler_context* con)
66
{
67
  int p = con->bootstrap ? con->bootstrap_period : con->period;
68
  if (gettime() - con->currtime > p) {
69
    con->currtime += p;
70

    
71
    return 1;
72
  }
73

    
74
  return 0;
75
}
76

    
77
/*
78
 * Public Functions!
79
 */
80
static struct peersampler_context* cyclon_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
81
{
82
  struct tag *cfg_tags;
83
  struct peersampler_context *con;
84
  int res;
85

    
86
  con = cyclon_context_init();
87
  if (!con) return NULL;
88

    
89
  cfg_tags = config_parse(config);
90
  res = config_value_int(cfg_tags, "cache_size", &(con->cache_size));
91
  if (!res) {
92
    con->cache_size = DEFAULT_CACHE_SIZE;
93
  }
94
  res = config_value_int(cfg_tags, "sent_entries", &(con->sent_entries));
95
  if (!res) {
96
    con->sent_entries = con->cache_size / 2;
97
  }
98
  free(cfg_tags);
99

    
100
  con->local_cache = cache_init(con->cache_size, metadata_size, 0);
101
  if (con->local_cache == NULL) {
102
    free(con);
103
    return NULL;
104
  }
105

    
106
  con->pc = cyclon_proto_init(myID, metadata, metadata_size);
107
  if (!con->pc){
108
    free(con->local_cache);
109
    free(con);
110
    return NULL;
111
  }
112

    
113
  return con;
114
}
115

    
116
static int cyclon_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
117
{
118
  if (!context->flying_cache) {
119
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
120
  }
121
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
122
    return -1;
123
  }
124

    
125
  return cyclon_query(context->pc, context->flying_cache, neighbour);
126
}
127

    
128
static int cyclon_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
129
{
130
  cache_check(context->local_cache);
131
  if (len) {
132
    const struct topo_header *h = (const struct topo_header *)buff;
133
    struct peer_cache *remote_cache;
134
    struct peer_cache *sent_cache = NULL;
135

    
136
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
137
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
138

    
139
      return -1;
140
    }
141

    
142
    context->bootstrap = false;
143

    
144
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
145
    if (h->type == CYCLON_QUERY) {
146
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
147
      cyclon_reply(context->pc, remote_cache, sent_cache);
148
      nodeid_free(context->dst);
149
      context->dst = NULL;
150
    }
151
    cache_check(context->local_cache);
152
    cache_add_cache(context->local_cache, remote_cache);
153
    cache_free(remote_cache);
154
    if (sent_cache) {
155
      cache_add_cache(context->local_cache, sent_cache);
156
      cache_free(sent_cache);
157
    } else {
158
      if (context->flying_cache) {
159
        cache_add_cache(context->local_cache, context->flying_cache);
160
        cache_free(context->flying_cache);
161
        context->flying_cache = NULL;
162
      }
163
    }
164
  }
165

    
166
  if (time_to_send(context)) {
167
    if (context->flying_cache) {
168
      cache_add_cache(context->local_cache, context->flying_cache);
169
      cache_free(context->flying_cache);
170
      context->flying_cache = NULL;
171
    }
172
    cache_update(context->local_cache);
173
    nodeid_free(context->dst);
174
    context->dst = last_peer(context->local_cache);
175
    if (context->dst == NULL) {
176
      return 0;
177
    }
178
    context->dst = nodeid_dup(context->dst);
179
    cache_del(context->local_cache, context->dst);
180
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
181
    return cyclon_query(context->pc, context->flying_cache, context->dst);
182
  }
183
  cache_check(context->local_cache);
184

    
185
  return 0;
186
}
187

    
188
static const struct nodeID **cyclon_get_neighbourhood(struct peersampler_context *context, int *n)
189
{
190
  context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *));
191
  if (context->r == NULL) {
192
    return NULL;
193
  }
194

    
195
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
196
    context->r[*n] = nodeid(context->local_cache, *n);
197
    //fprintf(stderr, "Checking table[%d]\n", *n);
198
  }
199
  if (context->flying_cache) {
200
    int i;
201

    
202
    for (i = 0; nodeid(context->flying_cache, i) && (*n < context->cache_size); (*n)++, i++) {
203
      context->r[*n] = nodeid(context->flying_cache, i);
204
    }
205
  }
206
  if (context->dst && (*n < context->cache_size)) {
207
    context->r[*n] = context->dst;
208
    (*n)++;
209
  }
210

    
211
  return context->r;
212
}
213

    
214
static const void *cyclon_get_metadata(struct peersampler_context *context, int *metadata_size)
215
{
216
  return get_metadata(context->local_cache, metadata_size);
217
}
218

    
219
static int cyclon_grow_neighbourhood(struct peersampler_context *context, int n)
220
{
221
  context->cache_size += n;
222

    
223
  return context->cache_size;
224
}
225

    
226
static int cyclon_shrink_neighbourhood(struct peersampler_context *context, int n)
227
{
228
  if (context->cache_size < n) {
229
    return -1;
230
  }
231
  context->cache_size -= n;
232

    
233
  return context->cache_size;
234
}
235

    
236
static int cyclon_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour)
237
{
238
  return cache_del(context->local_cache, neighbour);
239
}
240

    
241
static int cyclon_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size)
242
{
243
  return cyclon_proto_change_metadata(context->pc, metadata, metadata_size);
244
}
245

    
246
struct peersampler_iface cyclon = {
247
  .init = cyclon_init,
248
  .change_metadata = cyclon_change_metadata,
249
  .add_neighbour = cyclon_add_neighbour,
250
  .parse_data = cyclon_parse_data,
251
  .get_neighbourhood = cyclon_get_neighbourhood,
252
  .get_metadata = cyclon_get_metadata,
253
  .grow_neighbourhood = cyclon_grow_neighbourhood,
254
  .shrink_neighbourhood = cyclon_shrink_neighbourhood,
255
  .remove_neighbour = cyclon_remove_neighbour,
256
};