Statistics
| Branch: | Revision:

grapes / src / TopologyManager / cyclon.c @ 380f01e0

History | View | Annotate | Download (5.15 KB)

1

    
2
/*
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdlib.h>
11
#include <stdint.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include "net_helper.h"
17
#include "peersampler_iface.h"
18
#include "topocache.h"
19
#include "topo_proto.h"
20
#include "cyclon_proto.h"
21
#include "proto.h"
22
#include "config.h"
23
#include "grapes_msg_types.h"
24

    
25
#define MAX_PEERS 20
26

    
27
static uint64_t currtime;
28
static int cache_size = MAX_PEERS;
29
static int sent_entries = MAX_PEERS / 4;
30
static struct peer_cache *local_cache;
31
static bool bootstrap = true;
32
static int bootstrap_period = 2000000;
33
static int period = 10000000;
34

    
35
static struct peer_cache *flying_cache;
36
static struct nodeID *dst;
37

    
38
static uint64_t gettime(void)
39
{
40
  struct timeval tv;
41

    
42
  gettimeofday(&tv, NULL);
43

    
44
  return tv.tv_usec + tv.tv_sec * 1000000ull;
45
}
46

    
47
static int time_to_send(void)
48
{
49
  int p = bootstrap ? bootstrap_period : period;
50
  if (gettime() - currtime > p) {
51
    currtime += p;
52

    
53
    return 1;
54
  }
55

    
56
  return 0;
57
}
58

    
59
/*
60
 * Public Functions!
61
 */
62
static int cyclon_init(struct nodeID *myID, void *metadata, int metadata_size, const char *config)
63
{
64
  local_cache = cache_init(cache_size, metadata_size, 0);
65
  if (local_cache == NULL) {
66
    return -1;
67
  }
68
  topo_proto_init(myID, metadata, metadata_size);
69
  currtime = gettime();
70
  bootstrap = true;
71

    
72
  return 1;
73
}
74

    
75
static int cyclon_change_metadata(void *metadata, int metadata_size)
76
{
77
  if (topo_proto_metadata_update(metadata, metadata_size) <= 0) {
78
    return -1;
79
  }
80

    
81
  return 1;
82
}
83

    
84
static int cyclon_add_neighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
85
{
86
  if (!flying_cache) {
87
    flying_cache = rand_cache(local_cache, sent_entries - 1);
88
  }
89
  if (cache_add(local_cache, neighbour, metadata, metadata_size) < 0) {
90
    return -1;
91
  }
92

    
93
  return cyclon_query(flying_cache, neighbour);
94
}
95

    
96
static int cyclon_parse_data(const uint8_t *buff, int len)
97
{
98
  cache_check(local_cache);
99
  if (len) {
100
    const struct topo_header *h = (const struct topo_header *)buff;
101
    struct peer_cache *new, *remote_cache;
102
    struct peer_cache *sent_cache = NULL;
103
    int dummy;
104

    
105
    if (h->protocol != MSG_TYPE_TOPOLOGY) {
106
      fprintf(stderr, "Peer Sampler: Wrong protocol!\n");
107

    
108
      return -1;
109
    }
110

    
111
    bootstrap = false;
112

    
113
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
114
    if (h->type == CYCLON_QUERY) {
115
      sent_cache = rand_cache(local_cache, sent_entries);
116
      cyclon_reply(remote_cache, sent_cache);
117
      dst = NULL;
118
    }
119
    cache_check(local_cache);
120
    new = merge_caches(local_cache, remote_cache, cache_size, &dummy);
121
    cache_free(remote_cache);
122
    if (new != NULL) {
123
      cache_free(local_cache);
124
      local_cache = new;
125
    }
126
    if (sent_cache) {
127
      int dummy;
128

    
129
      new = cache_union(local_cache, sent_cache, &dummy);
130
      if (new != NULL) {
131
        cache_free(local_cache);
132
        local_cache = new;
133
      }
134
      cache_free(sent_cache);
135
    } else {
136
      if (flying_cache) {
137
        cache_free(flying_cache);
138
        flying_cache = NULL;
139
      }
140
    }
141
  }
142

    
143
  if (time_to_send()) {
144
    if (flying_cache) {
145
      struct peer_cache *new;
146
      int dummy;
147

    
148
      new = cache_union(local_cache, flying_cache, &dummy);
149
      if (new != NULL) {
150
        cache_free(local_cache);
151
        local_cache = new;
152
      }
153
      cache_free(flying_cache);
154
      flying_cache = NULL;
155
    }
156
    cache_update(local_cache);
157
    dst = last_peer(local_cache);
158
    if (dst == NULL) {
159
      return 0;
160
    }
161
    dst = nodeid_dup(dst);
162
    cache_del(local_cache, dst);
163
    flying_cache = rand_cache(local_cache, sent_entries - 1);
164
    cyclon_query(flying_cache, dst);
165
  }
166
  cache_check(local_cache);
167

    
168
  return 0;
169
}
170

    
171
static const struct nodeID **cyclon_get_neighbourhood(int *n)
172
{
173
  static struct nodeID **r;
174

    
175
  r = realloc(r, cache_size * sizeof(struct nodeID *));
176
  if (r == NULL) {
177
    return NULL;
178
  }
179

    
180
  for (*n = 0; nodeid(local_cache, *n) && (*n < MAX_PEERS); (*n)++) {
181
    r[*n] = nodeid(local_cache, *n);
182
    //fprintf(stderr, "Checking table[%d]\n", *n);
183
  }
184
  if (flying_cache) {
185
    int i;
186

    
187
    for (i = 0; nodeid(flying_cache, i) && (*n < MAX_PEERS); (*n)++, i++) {
188
      r[*n] = nodeid(flying_cache, i);
189
    }
190
  }
191
  if (dst && (*n < MAX_PEERS)) {
192
    r[*n] = dst;
193
    (*n)++;
194
  }
195

    
196
  return (const struct nodeID **)r;
197
}
198

    
199
static const void *cyclon_get_metadata(int *metadata_size)
200
{
201
  return get_metadata(local_cache, metadata_size);
202
}
203

    
204
static int cyclon_grow_neighbourhood(int n)
205
{
206
  cache_size += n;
207

    
208
  return cache_size;
209
}
210

    
211
static int cyclon_shrink_neighbourhood(int n)
212
{
213
  if (cache_size < n) {
214
    return -1;
215
  }
216
  cache_size -= n;
217

    
218
  return cache_size;
219
}
220

    
221
static int cyclon_remove_neighbour(struct nodeID *neighbour)
222
{
223
  return cache_del(local_cache, neighbour);
224
}
225

    
226
struct peersampler_iface cyclon = {
227
  .init = cyclon_init,
228
  .change_metadata = cyclon_change_metadata,
229
  .add_neighbour = cyclon_add_neighbour,
230
  .parse_data = cyclon_parse_data,
231
  .get_neighbourhood = cyclon_get_neighbourhood,
232
  .get_metadata = cyclon_get_metadata,
233
  .grow_neighbourhood = cyclon_grow_neighbourhood,
234
  .shrink_neighbourhood = cyclon_shrink_neighbourhood,
235
  .remove_neighbour = cyclon_remove_neighbour,
236
};