Statistics
| Branch: | Revision:

grapes / src / TopologyManager / cyclon.c @ 8bf781e5

History | View | Annotate | Download (5.15 KB)

1 194ec532 Luca Abeni
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6
7
#include <sys/time.h>
8
#include <time.h>
9
#include <stdlib.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdbool.h>
13
#include <string.h>
14
15
#include "net_helper.h"
16
#include "peersampler_iface.h"
17
#include "topocache.h"
18
#include "topo_proto.h"
19
#include "cyclon_proto.h"
20
#include "proto.h"
21
#include "config.h"
22
#include "grapes_msg_types.h"
23
24
#define MAX_PEERS 20
25
26
static uint64_t currtime;
27
static int cache_size = MAX_PEERS;
28
static int sent_entries = MAX_PEERS / 4;
29
static struct peer_cache *local_cache;
30
static bool bootstrap = true;
31
static int bootstrap_period = 2000000;
32
static int period = 10000000;
33
34
static struct peer_cache *flying_cache;
35
static struct nodeID *dst;
36
37
static uint64_t gettime(void)
38
{
39
  struct timeval tv;
40
41
  gettimeofday(&tv, NULL);
42
43
  return tv.tv_usec + tv.tv_sec * 1000000ull;
44
}
45
46
static int time_to_send(void)
47
{
48
  int p = bootstrap ? bootstrap_period : period;
49
  if (gettime() - currtime > p) {
50
    currtime += p;
51
52
    return 1;
53
  }
54
55
  return 0;
56
}
57
58
/*
59
 * Public Functions!
60
 */
61
static int cyclon_init(struct nodeID *myID, void *metadata, int metadata_size, const char *config)
62
{
63
  local_cache = cache_init(cache_size, metadata_size, 0);
64
  if (local_cache == NULL) {
65
    return -1;
66
  }
67
  topo_proto_init(myID, metadata, metadata_size);
68
  currtime = gettime();
69
  bootstrap = true;
70
71
  return 1;
72
}
73
74
static int cyclon_change_metadata(void *metadata, int metadata_size)
75
{
76
  if (topo_proto_metadata_update(metadata, metadata_size) <= 0) {
77
    return -1;
78
  }
79
80
  return 1;
81
}
82
83
static int cyclon_add_neighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
84
{
85
  if (!flying_cache) {
86
    flying_cache = rand_cache(local_cache, sent_entries - 1);
87
  }
88
  if (cache_add(local_cache, neighbour, metadata, metadata_size) < 0) {
89
    return -1;
90
  }
91
92
  return cyclon_query(flying_cache, neighbour);
93
}
94
95
static int cyclon_parse_data(const uint8_t *buff, int len)
96
{
97
  cache_check(local_cache);
98
  if (len) {
99
    const struct topo_header *h = (const struct topo_header *)buff;
100
    struct peer_cache *new, *remote_cache;
101
    struct peer_cache *sent_cache = NULL;
102
    int dummy;
103
104
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
105
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
106
107
      return -1;
108
    }
109
110
    bootstrap = false;
111
112
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
113
    if (h->type == CYCLON_QUERY) {
114
      sent_cache = rand_cache(local_cache, sent_entries);
115
      cyclon_reply(remote_cache, sent_cache);
116
      dst = NULL;
117
    }
118
    cache_check(local_cache);
119
    new = merge_caches(local_cache, remote_cache, cache_size, &dummy);
120
    cache_free(remote_cache);
121
    if (new != NULL) {
122
      cache_free(local_cache);
123
      local_cache = new;
124
    }
125
    if (sent_cache) {
126
      int dummy;
127
128
      new = cache_union(local_cache, sent_cache, &dummy);
129 380f01e0 Luca
      if (new != NULL) {
130
        cache_free(local_cache);
131
        local_cache = new;
132
      }
133 194ec532 Luca Abeni
      cache_free(sent_cache);
134
    } else {
135
      if (flying_cache) {
136
        cache_free(flying_cache);
137
        flying_cache = NULL;
138
      }
139
    }
140
  }
141
142
  if (time_to_send()) {
143
    if (flying_cache) {
144
      struct peer_cache *new;
145
      int dummy;
146 380f01e0 Luca
147 194ec532 Luca Abeni
      new = cache_union(local_cache, flying_cache, &dummy);
148 380f01e0 Luca
      if (new != NULL) {
149
        cache_free(local_cache);
150
        local_cache = new;
151
      }
152 194ec532 Luca Abeni
      cache_free(flying_cache);
153
      flying_cache = NULL;
154
    }
155
    cache_update(local_cache);
156
    dst = last_peer(local_cache);
157
    if (dst == NULL) {
158
      return 0;
159
    }
160
    dst = nodeid_dup(dst);
161
    cache_del(local_cache, dst);
162
    flying_cache = rand_cache(local_cache, sent_entries - 1);
163
    cyclon_query(flying_cache, dst);
164
  }
165
  cache_check(local_cache);
166
167
  return 0;
168
}
169
170
static const struct nodeID **cyclon_get_neighbourhood(int *n)
171
{
172
  static struct nodeID **r;
173
174
  r = realloc(r, cache_size * sizeof(struct nodeID *));
175
  if (r == NULL) {
176
    return NULL;
177
  }
178
179
  for (*n = 0; nodeid(local_cache, *n) && (*n < MAX_PEERS); (*n)++) {
180
    r[*n] = nodeid(local_cache, *n);
181
    //fprintf(stderr, "Checking table[%d]\n", *n);
182
  }
183
  if (flying_cache) {
184
    int i;
185
186
    for (i = 0; nodeid(flying_cache, i) && (*n < MAX_PEERS); (*n)++, i++) {
187
      r[*n] = nodeid(flying_cache, i);
188
    }
189
  }
190
  if (dst && (*n < MAX_PEERS)) {
191
    r[*n] = dst;
192
    (*n)++;
193
  }
194
195
  return (const struct nodeID **)r;
196
}
197
198
static const void *cyclon_get_metadata(int *metadata_size)
199
{
200
  return get_metadata(local_cache, metadata_size);
201
}
202
203
static int cyclon_grow_neighbourhood(int n)
204
{
205
  cache_size += n;
206
207
  return cache_size;
208
}
209
210
static int cyclon_shrink_neighbourhood(int n)
211
{
212
  if (cache_size < n) {
213
    return -1;
214
  }
215
  cache_size -= n;
216
217
  return cache_size;
218
}
219
220
static int cyclon_remove_neighbour(struct nodeID *neighbour)
221
{
222
  return cache_del(local_cache, neighbour);
223
}
224
225
struct peersampler_iface cyclon = {
226
  .init = cyclon_init,
227
  .change_metadata = cyclon_change_metadata,
228
  .add_neighbour = cyclon_add_neighbour,
229
  .parse_data = cyclon_parse_data,
230
  .get_neighbourhood = cyclon_get_neighbourhood,
231
  .get_metadata = cyclon_get_metadata,
232
  .grow_neighbourhood = cyclon_grow_neighbourhood,
233
  .shrink_neighbourhood = cyclon_shrink_neighbourhood,
234
  .remove_neighbour = cyclon_remove_neighbour,
235
};