Revision f3ab0d6d

View differences:

src/TopologyManager/cyclon.c
34 34
  
35 35
  struct peer_cache *flying_cache;
36 36
  struct nodeID *dst;
37
  
38
  struct cyclon_proto_context *pc; 
37 39
};
38 40

  
39 41

  
......
110 112
    free(con);
111 113
    return NULL;
112 114
  }
113
  topo_proto_init(myID, metadata, metadata_size);
114
  return con;
115
}
116 115

  
117
static int cyclon_change_metadata(struct peersampler_context *context, void *metadata, int metadata_size)
118
{
119
  if (topo_proto_metadata_update(metadata, metadata_size) <= 0) {
120
    return -1;
116
  con->pc = cyclon_proto_init(myID, metadata, metadata_size);
117
  if (!con->pc){
118
    free(con->local_cache);
119
    free(con);
120
    return NULL;
121 121
  }
122 122

  
123
  return 1;
123
  return con;
124 124
}
125 125

  
126 126
static int cyclon_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, void *metadata, int metadata_size)
......
132 132
    return -1;
133 133
  }
134 134

  
135
  return cyclon_query(context->flying_cache, neighbour);
135
  return cyclon_query(context->pc, context->flying_cache, neighbour);
136 136
}
137 137

  
138 138
static int cyclon_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
......
154 154
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
155 155
    if (h->type == CYCLON_QUERY) {
156 156
      sent_cache = rand_cache(context->local_cache, context->sent_entries);
157
      cyclon_reply(remote_cache, sent_cache);
157
      cyclon_reply(context->pc, remote_cache, sent_cache);
158 158
      context->dst = NULL;
159 159
    }
160 160
    cache_check(context->local_cache);
......
186 186
    context->dst = nodeid_dup(context->dst);
187 187
    cache_del(context->local_cache, context->dst);
188 188
    context->flying_cache = rand_cache(context->local_cache, context->sent_entries - 1);
189
    cyclon_query(context->flying_cache, context->dst);
189
    cyclon_query(context->pc, context->flying_cache, context->dst);
190 190
  }
191 191
  cache_check(context->local_cache);
192 192

  
......
248 248
  return cache_del(context->local_cache, neighbour);
249 249
}
250 250

  
251
static int cyclon_change_metadata(struct peersampler_context *context, void *metadata, int metadata_size)
252
{
253
  return cyclon_proto_change_metadata(context->pc, metadata, metadata_size);
254
}
255

  
251 256
struct peersampler_iface cyclon = {
252 257
  .init = cyclon_init,
253 258
  .change_metadata = cyclon_change_metadata,
src/TopologyManager/cyclon_proto.c
15 15
#include "cyclon_proto.h"
16 16
#include "grapes_msg_types.h"
17 17

  
18
struct cyclon_proto_context{
19
  struct topo_context *context;
20
};
18 21

  
19
int cyclon_reply(const struct peer_cache *c, struct peer_cache *local_cache)
22
struct cyclon_proto_context* cyclon_proto_init(struct nodeID *s, void *meta, int meta_size){
23
  struct cyclon_proto_context *con;
24
  con = malloc(sizeof(struct cyclon_proto_context));
25

  
26
  if (!con) return NULL;
27

  
28
  con->context = topo_proto_init(s, meta, meta_size);
29
  if (!con->context){
30
    free(con);
31
    return NULL;
32
  }
33

  
34
  return con;
35
}    
36

  
37

  
38
int cyclon_reply(struct cyclon_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache)
20 39
{
21
  return topo_reply(c, local_cache, MSG_TYPE_TOPOLOGY, CYCLON_REPLY, 0, 0);
40
  return topo_reply(context->context, c, local_cache, MSG_TYPE_TOPOLOGY, CYCLON_REPLY, 0, 0);
22 41
}
23 42

  
24
int cyclon_query(struct peer_cache *sent_cache, struct nodeID *dst)
43
int cyclon_query(struct cyclon_proto_context *context, struct peer_cache *sent_cache, struct nodeID *dst)
25 44
{
26
  return topo_query_peer(sent_cache, dst, MSG_TYPE_TOPOLOGY, CYCLON_QUERY, 0);
45
  return topo_query_peer(context->context, sent_cache, dst, MSG_TYPE_TOPOLOGY, CYCLON_QUERY, 0);
46
}
47

  
48
int cyclon_proto_change_metadata(struct cyclon_proto_context *context, void *metadata, int metadata_size)
49
{
50
  if (topo_proto_metadata_update(context->context, metadata, metadata_size) <= 0) {
51
    return -1;
52
  }
53

  
54
  return 1;
27 55
}
src/TopologyManager/cyclon_proto.h
1 1
#ifndef CYCLON_PROTO
2 2
#define CYCLON_PROTO
3 3

  
4
int cyclon_reply(const struct peer_cache *c, struct peer_cache *local_cache);
5
int cyclon_query(struct peer_cache *local_cache, struct nodeID *dst);
4
struct cyclon_proto_context;
6 5

  
6
struct cyclon_proto_context* cyclon_proto_init(struct nodeID *s, void *meta, int meta_size);
7

  
8

  
9
int cyclon_reply(struct cyclon_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache);
10
int cyclon_query(struct cyclon_proto_context *context, struct peer_cache *local_cache, struct nodeID *dst);
11

  
12
int cyclon_proto_change_metadata(struct cyclon_proto_context *context, void *metadata, int metadata_size);
7 13
#endif	/* CYCLON_PROTO */
src/TopologyManager/ncast.c
15 15
#include "net_helper.h"
16 16
#include "peersampler_iface.h"
17 17
#include "topocache.h"
18
#include "topo_proto.h"
19 18
#include "ncast_proto.h"
20 19
#include "proto.h"
21 20
#include "config.h"
......
25 24
#define DEFAULT_CACHE_SIZE 10
26 25
#define DEFAULT_MAX_TIMESTAMP 5
27 26

  
28
static uint64_t currtime;
29
static int cache_size;
30
static struct peer_cache *local_cache;
31
static bool bootstrap = true;
32
static int bootstrap_period = 2000000;
33
static int period = 10000000;
34
static int counter;
35

  
36
//TODO: context support not introduced
37
struct peersampler_context{};
27
struct peersampler_context{
28
  uint64_t currtime;
29
  int cache_size;
30
  struct peer_cache *local_cache;
31
  bool bootstrap;
32
  int bootstrap_period;
33
  int period;
34
  int counter;
35
  struct ncast_proto_context *tc;
36
};
38 37

  
39 38
static uint64_t gettime(void)
40 39
{
......
50 49
  con = (struct peersampler_context*) calloc(1,sizeof(struct peersampler_context));
51 50

  
52 51
  //Initialize context with default values
53
  //TODO: implement initialization
52
  con->bootstrap = true;
53
  con->bootstrap_period = 2000000;
54
  con->period = 10000000;
55
  con->currtime = gettime();
54 56

  
55 57
  return con;
56 58
}
57 59

  
58
static int time_to_send(void)
60
static int time_to_send(struct peersampler_context *context)
59 61
{
60
  int p = bootstrap ? bootstrap_period : period;
61
  if (gettime() - currtime > p) {
62
    currtime += p;
62
  int p = context->bootstrap ? context->bootstrap_period : context->period;
63
  if (gettime() - context->currtime > p) {
64
    context->currtime += p;
63 65

  
64 66
    return 1;
65 67
  }
......
73 75
static struct peersampler_context* ncast_init(struct nodeID *myID, void *metadata, int metadata_size, const char *config)
74 76
{
75 77
  struct tag *cfg_tags;
76
  struct peersampler_context *con;
78
  struct peersampler_context *context;
77 79
  int res, max_timestamp;
78 80

  
79
  con = ncast_context_init();
80
  if (!con) return NULL;
81
  context = ncast_context_init();
82
  if (!context) return NULL;
81 83

  
82 84
  cfg_tags = config_parse(config);
83
  res = config_value_int(cfg_tags, "cache_size", &cache_size);
85
  res = config_value_int(cfg_tags, "cache_size", &context->cache_size);
84 86
  if (!res) {
85
    cache_size = DEFAULT_CACHE_SIZE;
87
    context->cache_size = DEFAULT_CACHE_SIZE;
86 88
  }
87 89
  res = config_value_int(cfg_tags, "max_timestamp", &max_timestamp);
88 90
  if (!res) {
......
90 92
  }
91 93
  free(cfg_tags);
92 94
  
93
  local_cache = cache_init(cache_size, metadata_size, max_timestamp);
94
  if (local_cache == NULL) {
95
    free(con);
95
  context->local_cache = cache_init(context->cache_size, metadata_size, max_timestamp);
96
  if (context->local_cache == NULL) {
97
    free(context);
96 98
    return NULL;
97 99
  }
98
  topo_proto_init(myID, metadata, metadata_size);
99
  currtime = gettime();
100
  bootstrap = true;
101 100

  
102
  return con;
101
  context->tc = ncast_proto_init(myID, metadata, metadata_size);
102
  if (!context->tc){
103
    free(context->local_cache);
104
    free(context);
105
    return NULL;
106
  }
107

  
108
  return context;
103 109
}
104 110

  
105 111
static int ncast_change_metadata(struct peersampler_context *context, void *metadata, int metadata_size)
106 112
{
107
  if (topo_proto_metadata_update(metadata, metadata_size) <= 0) {
113
  if (ncast_proto_metadata_update(context->tc, metadata, metadata_size) <= 0) {
108 114
    return -1;
109 115
  }
110 116

  
......
113 119

  
114 120
static int ncast_add_neighbour(struct peersampler_context *context, struct nodeID *neighbour, void *metadata, int metadata_size)
115 121
{
116
  if (cache_add(local_cache, neighbour, metadata, metadata_size) < 0) {
122
  if (cache_add(context->local_cache, neighbour, metadata, metadata_size) < 0) {
117 123
    return -1;
118 124
  }
119
  return ncast_query_peer(local_cache, neighbour);
125
  return ncast_query_peer(context->tc, context->local_cache, neighbour);
120 126
}
121 127

  
122 128
static int ncast_parse_data(struct peersampler_context *context, const uint8_t *buff, int len)
......
133 139
      return -1;
134 140
    }
135 141

  
136
    counter++;
137
    if (counter == BOOTSTRAP_CYCLES) bootstrap = false;
142
    context->counter++;
143
    if (context->counter == BOOTSTRAP_CYCLES) context->bootstrap = false;
138 144

  
139 145
    remote_cache = entries_undump(buff + sizeof(struct topo_header), len - sizeof(struct topo_header));
140 146
    if (h->type == NCAST_QUERY) {
141
      ncast_reply(remote_cache, local_cache);
147
      ncast_reply(context->tc, remote_cache, context->local_cache);
142 148
    }
143
    new = merge_caches(local_cache, remote_cache, cache_size, &dummy);
149
    new = merge_caches(context->local_cache, remote_cache, context->cache_size, &dummy);
144 150
    cache_free(remote_cache);
145 151
    if (new != NULL) {
146
      cache_free(local_cache);
147
      local_cache = new;
152
      cache_free(context->local_cache);
153
      context->local_cache = new;
148 154
    }
149 155
  }
150 156

  
151
  if (time_to_send()) {
152
    cache_update(local_cache);
153
    ncast_query(local_cache);
157
  if (time_to_send(context)) {
158
    cache_update(context->local_cache);
159
    ncast_query(context->tc, context->local_cache);
154 160
  }
155 161

  
156 162
  return 0;
......
160 166
{
161 167
  static struct nodeID **r;
162 168

  
163
  r = realloc(r, cache_size * sizeof(struct nodeID *));
169
  r = realloc(r, context->cache_size * sizeof(struct nodeID *));
164 170
  if (r == NULL) {
165 171
    return NULL;
166 172
  }
167 173

  
168
  for (*n = 0; nodeid(local_cache, *n) && (*n < cache_size); (*n)++) {
169
    r[*n] = nodeid(local_cache, *n);
174
  for (*n = 0; nodeid(context->local_cache, *n) && (*n < context->cache_size); (*n)++) {
175
    r[*n] = nodeid(context->local_cache, *n);
170 176
    //fprintf(stderr, "Checking table[%d]\n", *n);
171 177
  }
172 178

  
......
175 181

  
176 182
static const void *ncast_get_metadata(struct peersampler_context *context, int *metadata_size)
177 183
{
178
  return get_metadata(local_cache, metadata_size);
184
  return get_metadata(context->local_cache, metadata_size);
179 185
}
180 186

  
181 187
static int ncast_grow_neighbourhood(struct peersampler_context *context, int n)
182 188
{
183
  cache_size += n;
189
  context->cache_size += n;
184 190

  
185
  return cache_size;
191
  return context->cache_size;
186 192
}
187 193

  
188 194
static int ncast_shrink_neighbourhood(struct peersampler_context *context, int n)
189 195
{
190
  if (cache_size < n) {
196
  if (context->cache_size < n) {
191 197
    return -1;
192 198
  }
193
  cache_size -= n;
199
  context->cache_size -= n;
194 200

  
195
  return cache_size;
201
  return context->cache_size;
196 202
}
197 203

  
198 204
static int ncast_remove_neighbour(struct peersampler_context *context, struct nodeID *neighbour)
199 205
{
200
  return cache_del(local_cache, neighbour);
206
  return cache_del(context->local_cache, neighbour);
201 207
}
202 208

  
203 209
struct peersampler_iface ncast = {
src/TopologyManager/ncast_proto.c
15 15
#include "ncast_proto.h"
16 16
#include "grapes_msg_types.h"
17 17

  
18
int ncast_reply(const struct peer_cache *c, struct peer_cache *local_cache)
18
struct ncast_proto_context{
19
  struct topo_context *context;
20
};
21

  
22
struct ncast_proto_context* ncast_proto_init(struct nodeID *s, void *meta, int meta_size){
23
  struct ncast_proto_context *con;
24
  con = malloc(sizeof(struct ncast_proto_context));
25

  
26
  if (!con) return NULL;
27

  
28
  con->context = topo_proto_init(s, meta, meta_size);
29
  if (!con->context){
30
    free(con);
31
    return NULL;
32
  }
33

  
34
  return con;    
35
}
36

  
37
int ncast_reply(struct ncast_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache)
19 38
{
20
  return topo_reply(c, local_cache, MSG_TYPE_TOPOLOGY, NCAST_REPLY, 0, 1);
39
  return topo_reply(context->context, c, local_cache, MSG_TYPE_TOPOLOGY, NCAST_REPLY, 0, 1);
21 40
}
22 41

  
23
int ncast_query_peer(struct peer_cache *local_cache, struct nodeID *dst)
42
int ncast_query_peer(struct ncast_proto_context *context, struct peer_cache *local_cache, struct nodeID *dst)
24 43
{
25
  return topo_query_peer(local_cache, dst, MSG_TYPE_TOPOLOGY, NCAST_QUERY, 0);
44
  return topo_query_peer(context->context, local_cache, dst, MSG_TYPE_TOPOLOGY, NCAST_QUERY, 0);
26 45
}
27 46

  
28
int ncast_query(struct peer_cache *local_cache)
47
int ncast_query(struct ncast_proto_context *context, struct peer_cache *local_cache)
29 48
{
30 49
  struct nodeID *dst;
31 50

  
......
33 52
  if (dst == NULL) {
34 53
    return 0;
35 54
  }
36
  return topo_query_peer(local_cache, dst, MSG_TYPE_TOPOLOGY, NCAST_QUERY, 0);
55
  return topo_query_peer(context->context, local_cache, dst, MSG_TYPE_TOPOLOGY, NCAST_QUERY, 0);
56
}
57

  
58
int ncast_proto_metadata_update(struct ncast_proto_context *context, void *meta, int meta_size){
59
  return topo_proto_metadata_update(context->context, meta, meta_size);
37 60
}
src/TopologyManager/ncast_proto.h
1 1
#ifndef NCAST_PROTO
2 2
#define NCAST_PROTO
3 3

  
4
int ncast_reply(const struct peer_cache *c, struct peer_cache *local_cache);
5
int ncast_query(struct peer_cache *local_cache);
6
int ncast_query_peer(struct peer_cache *local_cache, struct nodeID *dst);
4
struct ncast_proto_context;
5

  
6
struct ncast_proto_context* ncast_proto_init(struct nodeID *s, void *meta, int meta_size);
7

  
8
int ncast_reply(struct ncast_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache);
9
int ncast_query(struct ncast_proto_context *context, struct peer_cache *local_cache);
10
int ncast_query_peer(struct ncast_proto_context *context, struct peer_cache *local_cache, struct nodeID *dst);
11
int ncast_proto_metadata_update(struct ncast_proto_context *context, void *meta, int meta_size);
7 12

  
8 13
#endif	/* NCAST_PROTO */
src/TopologyManager/proto.h
12 12
#define TMAN_REPLY 0x04
13 13
#define CYCLON_QUERY 0x05
14 14
#define CYCLON_REPLY 0x06
15
#define CLOUDCAST_QUERY 0x07
16
#define CLOUDCAST_REPLY 0x08
15 17

  
16 18
#endif	/* PROTO */
src/TopologyManager/topo_proto.c
15 15

  
16 16
#define MAX_MSG_SIZE 1500
17 17

  
18
static struct peer_cache *myEntry;
18
struct topo_context{
19
 struct peer_cache *myEntry;
20
};
19 21

  
20
static int topo_payload_fill(uint8_t *payload, int size, struct peer_cache *c, struct nodeID *snot, int max_peers, int include_me)
22
static int topo_payload_fill(struct topo_context *context, uint8_t *payload, int size, struct peer_cache *c, struct nodeID *snot, int max_peers, int include_me)
21 23
{
22 24
  int i;
23 25
  uint8_t *p = payload;
......
25 27
  if (!max_peers) max_peers = MAX_MSG_SIZE; // just to be sure to dump the whole cache...
26 28
  p += cache_header_dump(p, c, include_me);
27 29
  if (include_me) {
28
    p += entry_dump(p, myEntry, 0, size - (p - payload));
30
    p += entry_dump(p, context->myEntry, 0, size - (p - payload));
29 31
    max_peers--;
30 32
  }
31 33
  for (i = 0; nodeid(c, i) && max_peers; i++) {
......
44 46
  return p - payload;
45 47
}
46 48

  
47
int topo_reply(const struct peer_cache *c, struct peer_cache *local_cache, int protocol, int type, int max_peers, int include_me)
49
int topo_reply(struct topo_context *context, const struct peer_cache *c, struct peer_cache *local_cache, int protocol, int type, int max_peers, int include_me)
48 50
{
49 51
  uint8_t pkt[MAX_MSG_SIZE];
50 52
  struct topo_header *h = (struct topo_header *)pkt;
......
61 63
  dst = nodeid(c, 0);
62 64
  h->protocol = protocol;
63 65
  h->type = type;
64
  len = topo_payload_fill(pkt + sizeof(struct topo_header), MAX_MSG_SIZE - sizeof(struct topo_header), local_cache, dst, max_peers, include_me);
66
  len = topo_payload_fill(context, pkt + sizeof(struct topo_header), MAX_MSG_SIZE - sizeof(struct topo_header), local_cache, dst, max_peers, include_me);
65 67

  
66
  res = len > 0 ? send_to_peer(nodeid(myEntry, 0), dst, pkt, sizeof(struct topo_header) + len) : len;
68
  res = len > 0 ? send_to_peer(nodeid(context->myEntry, 0), dst, pkt, sizeof(struct topo_header) + len) : len;
67 69

  
68 70
  return res;
69 71
}
70 72

  
71
int topo_query_peer(struct peer_cache *local_cache, struct nodeID *dst, int protocol, int type, int max_peers)
73
int topo_query_peer(struct topo_context *context, struct peer_cache *local_cache, struct nodeID *dst, int protocol, int type, int max_peers)
72 74
{
73 75
  uint8_t pkt[MAX_MSG_SIZE];
74 76
  struct topo_header *h = (struct topo_header *)pkt;
......
76 78

  
77 79
  h->protocol = protocol;
78 80
  h->type = type;
79
  len = topo_payload_fill(pkt + sizeof(struct topo_header), MAX_MSG_SIZE - sizeof(struct topo_header), local_cache, dst, max_peers, 1);
80
  return len > 0  ? send_to_peer(nodeid(myEntry, 0), dst, pkt, sizeof(struct topo_header) + len) : len;
81
  len = topo_payload_fill(context, pkt + sizeof(struct topo_header), MAX_MSG_SIZE - sizeof(struct topo_header), local_cache, dst, max_peers, 1);
82
  return len > 0  ? send_to_peer(nodeid(context->myEntry, 0), dst, pkt, sizeof(struct topo_header) + len) : len;
81 83
}
82 84

  
83
int topo_proto_metadata_update(void *meta, int meta_size)
85
int topo_proto_metadata_update(struct topo_context *context, void *meta, int meta_size)
84 86
{
85
  if (cache_metadata_update(myEntry, nodeid(myEntry, 0), meta, meta_size) > 0) {
87
  if (cache_metadata_update(context->myEntry, nodeid(context->myEntry, 0), meta, meta_size) > 0) {
86 88
    return 1;
87 89
  }
88 90

  
89 91
  return -1;
90 92
}
91 93

  
92
int topo_proto_init(struct nodeID *s, void *meta, int meta_size)
94
struct topo_context* topo_proto_init(struct nodeID *s, void *meta, int meta_size)
93 95
{
94
  if (!myEntry) {
95
    myEntry = cache_init(1, meta_size, 0);
96
    cache_add(myEntry, s, meta, meta_size);
97
  }
96
  struct topo_context* con;
97

  
98
  con = malloc(sizeof(struct topo_context));
99
  if (!con) return NULL;
100

  
101
  con->myEntry = cache_init(1, meta_size, 0);
102
  cache_add(con->myEntry, s, meta, meta_size);
98 103

  
99
  return 0;
104
  return con;
100 105
}
src/TopologyManager/topo_proto.h
1 1
#ifndef TOPO_PROTO
2 2
#define TOPO_PROTO
3 3

  
4
int topo_reply(const struct peer_cache *c, struct peer_cache *local_cache, int protocol, int type, int max_peers, int include_me);
5
int topo_query_peer(struct peer_cache *local_cache, struct nodeID *dst, int protocol, int type, int max_peers);
4
struct topo_context;
6 5

  
7
int topo_proto_metadata_update(void *meta, int meta_size);
8
int topo_proto_init(struct nodeID *s, void *meta, int meta_size);
6
int topo_reply(struct topo_context *context, const struct peer_cache *c, struct peer_cache *local_cache, int protocol, int type, int max_peers, int include_me);
7
int topo_query_peer(struct topo_context *context, struct peer_cache *local_cache, struct nodeID *dst, int protocol, int type, int max_peers);
8

  
9
int topo_proto_metadata_update(struct topo_context *context, void *meta, int meta_size);
10
struct topo_context* topo_proto_init(struct nodeID *s, void *meta, int meta_size);
9 11

  
10 12
#endif	/* TOPO_PROTO */

Also available in: Unified diff