Statistics
| Branch: | Revision:

grapes / src / PeerSampler / ncast.c @ 1583fd78

History | View | Annotate | Download (6.72 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14

    
15
#include "net_helper.h"
16
#include "peersampler_iface.h"
17
#include "../Cache/topocache.h"
18
#include "../Cache/ncast_proto.h"
19
#include "../Cache/proto.h"
20
#include "config.h"
21
#include "grapes_msg_types.h"
22

    
23
#define DEFAULT_CACHE_SIZE 10
24
#define DEFAULT_MAX_TIMESTAMP 5
25
#define DEFAULT_BOOTSTRAP_CYCLES 5
26
#define DEFAULT_BOOTSTRAP_PERIOD 2*1000*1000
27
#define DEFAULT_PERIOD 10*1000*1000
28

    
29
struct peersampler_context{
30
  uint64_t currtime;
31
  int cache_size;
32
  int cache_size_threshold;
33
  struct peer_cache *local_cache;
34
  bool bootstrap;
35
  struct nodeID *bootstrap_node;
36
  int bootstrap_period;
37
  int bootstrap_cycles;
38
  int period;
39
  int counter;
40
  struct ncast_proto_context *tc;
41
  const struct nodeID **r;
42
};
43

    
44
static uint64_t gettime(void)
45
{
46
  struct timeval tv;
47

    
48
  gettimeofday(&tv, NULL);
49

    
50
  return tv.tv_usec + tv.tv_sec * 1000000ull;
51
}
52

    
53
static struct peersampler_context* ncast_context_init(void)
54
{
55
  struct peersampler_context* con;
56
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
57

    
58
  //Initialize context with default values
59
  con->bootstrap = true;
60
  con->bootstrap_node = NULL;
61
  con->currtime = gettime();
62
  con->r = NULL;
63

    
64
  return con;
65
}
66

    
67
static int time_to_send(struct peersampler_context *context)
68
{
69
  int p = context->bootstrap ? context->bootstrap_period : context->period;
70
  if (gettime() - context->currtime > p) {
71
    context->currtime += p;
72

    
73
    return 1;
74
  }
75

    
76
  return 0;
77
}
78

    
79
static void cache_size_threshold_init(struct peersampler_context* context)
80
{
81
  context->cache_size_threshold = (context->cache_size - 1 / 2);
82
}
83

    
84
/*
85
 * Exported Functions!
86
 */
87
static struct peersampler_context* ncast_init(struct nodeID *myID, const void *metadata, int metadata_size, const char *config)
88
{
89
  struct tag *cfg_tags;
90
  struct peersampler_context *context;
91
  int res, max_timestamp;
92

    
93
  context = ncast_context_init();
94
  if (!context) return NULL;
95

    
96
  cfg_tags = config_parse(config);
97
  res = config_value_int(cfg_tags, "cache_size", &context->cache_size);
98
  if (!res) {
99
    context->cache_size = DEFAULT_CACHE_SIZE;
100
  }
101
  res = config_value_int(cfg_tags, "max_timestamp", &max_timestamp);
102
  if (!res) {
103
    max_timestamp = DEFAULT_MAX_TIMESTAMP;
104
  }
105
  res = config_value_int(cfg_tags, "period", &context->period);
106
  if (!res) {
107
    context->period = DEFAULT_PERIOD;
108
  }
109
  res = config_value_int(cfg_tags, "bootstrap_period", &context->bootstrap_period);
110
  if (!res) {
111
    context->bootstrap_period = DEFAULT_BOOTSTRAP_PERIOD;
112
  }
113
  res = config_value_int(cfg_tags, "bootstrap_cycles", &context->bootstrap_cycles);
114
  if (!res) {
115
    context->bootstrap_cycles = DEFAULT_BOOTSTRAP_CYCLES;
116
  }
117
  free(cfg_tags);
118
  
119
  context->local_cache = cache_init(context->cache_size, metadata_size, max_timestamp);
120
  if (context->local_cache == NULL) {
121
    free(context);
122
    return NULL;
123
  }
124

    
125
  cache_size_threshold_init(context);
126

    
127
  context->tc = ncast_proto_init(myID, metadata, metadata_size);
128
  if (!context->tc){
129
    free(context->local_cache);
130
    free(context);
131
    return NULL;
132
  }
133

    
134
  return context;
135
}
136

    
137
static int ncast_change_metadata(struct peersampler_context *context, const void *metadata, int metadata_size)
138
{
139
  if (ncast_proto_metadata_update(context->tc, metadata, metadata_size) <= 0) {
140
    return -1;
141
  }
142

    
143
  return 1;
144
}
145

    
146
static int ncast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, const void *metadata, int metadata_size)
147
{
148
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
149
    return -1;
150
  }
151
  if (!context->bootstrap_node) {        //save the first added nodeid as bootstrap nodeid
152
    context->bootstrap_node = nodeid_dup(neighbour);
153
  }
154
  return ncast_query_peer(context->tc, context->local_cache, neighbour);
155
}
156

    
157
static int ncast_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
158
{
159
  int dummy;
160

    
161
  if (len) {
162
    const struct topo_header *h = (const struct topo_header *)buff;
163
    struct peer_cache *new, *remote_cache;
164

    
165
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
166
      fprintf(stderr, "NCAST: Wrong protocol!\n");
167

    
168
      return -1;
169
    }
170

    
171
    context->counter++;
172
    if (context->counter == context->bootstrap_cycles) context->bootstrap = false;
173

    
174
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
175
    if (h->type == NCAST_QUERY) {
176
      ncast_reply(context->tc, remote_cache, context->local_cache);
177
    }
178
    new = merge_caches(context->local_cache, remote_cache, context->cache_size, &dummy);
179
    cache_free(remote_cache);
180
    if (new != NULL) {
181
      cache_free(context->local_cache);
182
      context->local_cache = new;
183
    }
184
  }
185

    
186
  if (time_to_send(context)) {
187
    if (context->bootstrap_node &&
188
        (cache_entries(context->local_cache) <= context->cache_size_threshold) &&
189
        (cache_pos(context->local_cache, context->bootstrap_node) < 0)) {
190
      cache_add(context->local_cache, context->bootstrap_node, NULL, 0);
191
    }
192
    cache_update(context->local_cache);
193
    return ncast_query(context->tc, context->local_cache);
194
  }
195

    
196
  return 0;
197
}
198

    
199
static const struct nodeID *const*ncast_get_neighbourhood(struct peersampler_context *context, int *n)
200
{
201
  context->r = realloc(context->r, context->cache_size * sizeof(struct nodeID *));
202
  if (context->r == NULL) {
203
    return NULL;
204
  }
205

    
206
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
207
    context->r[*n] = nodeid(context->local_cache, *n);
208
    //fprintf(stderr, "Checking table[%d]\n", *n);
209
  }
210

    
211
  return context->r;
212
}
213

    
214
static const void *ncast_get_metadata(struct peersampler_context *context, int *metadata_size)
215
{
216
  return get_metadata(context->local_cache, metadata_size);
217
}
218

    
219
static int ncast_grow_neighbourhood(struct peersampler_context *context, int n)
220
{
221
  context->cache_size += n;
222
  cache_size_threshold_init(context);
223

    
224
  return context->cache_size;
225
}
226

    
227
static int ncast_shrink_neighbourhood(struct peersampler_context *context, int n)
228
{
229
  if (context->cache_size < n) {
230
    return -1;
231
  }
232
  context->cache_size -= n;
233
  cache_size_threshold_init(context);
234

    
235
  return context->cache_size;
236
}
237

    
238
static int ncast_remove_neighbour(struct peersampler_context *context, const struct nodeID *neighbour)
239
{
240
  return cache_del(context->local_cache, neighbour);
241
}
242

    
243
struct peersampler_iface ncast = {
244
  .init = ncast_init,
245
  .change_metadata = ncast_change_metadata,
246
  .add_neighbour = ncast_add_neighbour,
247
  .parse_data = ncast_parse_data,
248
  .get_neighbourhood = ncast_get_neighbourhood,
249
  .get_metadata = ncast_get_metadata,
250
  .grow_neighbourhood = ncast_grow_neighbourhood,
251
  .shrink_neighbourhood = ncast_shrink_neighbourhood,
252
  .remove_neighbour = ncast_remove_neighbour,
253
};