Statistics
| Branch: | Revision:

streamers / topology.c @ ad8a7c4a

History | View | Annotate | Download (14.1 KB)

1 7f591208 Csaba Kiraly
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 2f846ddd Csaba Kiraly
#include <stdint.h>
8 fcb5c29b Csaba Kiraly
#include <stdio.h>
9 2f846ddd Csaba Kiraly
#include <sys/time.h>
10
#include <time.h>
11 7e39164f MarcoBiazzini
#include <stdlib.h>
12 c2d8fe2d Csaba Kiraly
#include <string.h>
13 2f846ddd Csaba Kiraly
14 7e39164f MarcoBiazzini
#include <math.h>
15 74a5d4ae CsabaKiraly
#include <net_helper.h>
16 2f846ddd Csaba Kiraly
#include <peerset.h>
17
#include <peer.h>
18 33d23b91 MarcoBiazzini
#include <grapes_msg_types.h>
19 2f846ddd Csaba Kiraly
#include <topmanager.h>
20 7e39164f MarcoBiazzini
#include <tman.h>
21 2f846ddd Csaba Kiraly
22 63ebb93d CsabaKiraly
#include "compatibility/timer.h"
23
24 2f846ddd Csaba Kiraly
#include "topology.h"
25 16ae4927 Csaba Kiraly
#include "nodeid_set.h"
26 132f8b40 Csaba Kiraly
#include "streaming.h"
27 4729fe2d Csaba Kiraly
#include "dbg.h"
28 4bf91643 Csaba Kiraly
#include "measures.h"
29 0877a9f6 Csaba Kiraly
#include "streamer.h"
30 4bf91643 Csaba Kiraly
31 a4933dfb Csaba Kiraly
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
32 b4419dd1 Csaba Kiraly
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
33 31ba5789 Csaba Kiraly
34 14fe383b Csaba Kiraly
#ifndef NAN        //NAN is missing in some old math.h versions
35
#define NAN            (0.0/0.0)
36
#endif
37
38 0cc44d18 Csaba Kiraly
double desired_bw = 1000000;
39 970bd24a napawine
double desired_rtt = 0.2;
40 0cc44d18 Csaba Kiraly
double alpha_target = 0.4;
41
double topo_mem = 0.7;
42 acbc9155 Csaba Kiraly
43 483dec26 Csaba Kiraly
bool topo_out = true; //peer selects out-neighbours
44 3d73068a Csaba Kiraly
bool topo_in = true; //peer selects in-neighbours (combined means bidirectional)
45 483dec26 Csaba Kiraly
46 f6e9f400 Csaba Kiraly
bool topo_keep_best = false;
47
bool topo_add_best = false;
48
49 5f483e66 Csaba Kiraly
int NEIGHBORHOOD_TARGET_SIZE = 30;
50 01a25012 MarcoBiazzini
#define TMAN_MAX_IDLE 10
51 821cc939 CsabaKiraly
#define TMAN_LOG_EVERY 1000
52 d78e9c6a Csaba Kiraly
53 9176d3d1 Csaba Kiraly
#define STREAMER_TOPOLOGY_MSG_ADD 0
54
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
55
56 fcb5c29b Csaba Kiraly
static struct peerset *pset;
57 9b36d077 Csaba Kiraly
static struct timeval tout_bmap = {20, 0};
58 7e39164f MarcoBiazzini
static int counter = 0;
59
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
60
static tmanRankingFunction rankFunct = simpleRanker;
61 e4247dda Csaba Kiraly
62 cc0cd459 Csaba Kiraly
struct metadata {
63 d2514625 Csaba Kiraly
  uint16_t cb_size;
64 9d55b3b7 Csaba Kiraly
  uint16_t cps;
65 b1d41fdb Csaba Kiraly
  float capacity;
66 5be2e90d Csaba Kiraly
  float recv_delay;
67 e4247dda Csaba Kiraly
} __attribute__((packed));
68
69 cc0cd459 Csaba Kiraly
static struct metadata my_metadata;
70 7e39164f MarcoBiazzini
static int cnt = 0;
71
static struct nodeID *me = NULL;
72 9176d3d1 Csaba Kiraly
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
73 9ffd84bb MarcoBiazzini
static struct nodeID ** neighbors;
74 7e39164f MarcoBiazzini
75
static void update_metadata(void) {
76 b88c9f4a Csaba Kiraly
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
77 5be2e90d Csaba Kiraly
        my_metadata.recv_delay = get_receive_delay();
78 9d55b3b7 Csaba Kiraly
        my_metadata.cps = get_chunks_per_sec();
79 b1d41fdb Csaba Kiraly
        my_metadata.capacity = get_capacity();
80 7e39164f MarcoBiazzini
}
81
82
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
83
84
        double t,p1,p2;
85
        t = *((const double *)tin);
86
        p1 = *((const double *)p1in);
87
        p2 = *((const double *)p2in);
88
89
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
90
        else if (isnan(p1)) return 2;
91
        else if (isnan(p2)) return 1;
92
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
93
94
}
95
96
int topologyInit(struct nodeID *myID, const char *config)
97
{
98 33d23b91 MarcoBiazzini
        int i;
99 9f1da074 Csaba Kiraly
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
100 33d23b91 MarcoBiazzini
                bind_msg_type(mTypes[i]);
101 7e39164f MarcoBiazzini
        update_metadata();
102
        me = myID;
103
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
104
}
105
106
void topologyShutdown(void)
107
{
108
}
109
110
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
111
{
112
        // TODO: check this!! Just to use this function to bootstrap ncast...
113 cc0cd459 Csaba Kiraly
        struct metadata m = {0};        //TODO: check what metadata option should mean
114
115 7e39164f MarcoBiazzini
        if (counter < TMAN_MAX_IDLE)
116 cc0cd459 Csaba Kiraly
                return topAddNeighbour(neighbour,&m,sizeof(m));
117
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
118 7e39164f MarcoBiazzini
}
119
120
static int topoParseData(const uint8_t *buff, int len)
121
{
122
        int res = -1,ncs = 0,msize;
123
        const struct nodeID **n; const void *m;
124
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
125
                res = topParseData(buff,len);
126 c65ac153 Csaba Kiraly
//                if (counter <= TMAN_MAX_IDLE)
127
//                        counter++;
128 7e39164f MarcoBiazzini
        }
129
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
130
        {
131
                n = topGetNeighbourhood(&ncs);
132
                if (ncs) {
133
                m = topGetMetadata(&msize);
134
                res = tmanParseData(buff,len,n,ncs,m,msize);
135
                }
136
        }
137
  return res;
138
}
139
140
static const struct nodeID **topoGetNeighbourhood(int *n)
141
{
142
        int i; double d;
143
        if (counter > TMAN_MAX_IDLE) {
144 9ffd84bb MarcoBiazzini
                uint8_t *mdata; int msize;
145 7e39164f MarcoBiazzini
                *n = tmanGetNeighbourhoodSize();
146 9ffd84bb MarcoBiazzini
                if (neighbors) free(neighbors);
147 7e39164f MarcoBiazzini
                neighbors = calloc(*n,sizeof(struct nodeID *));
148
                tmanGetMetadata(&msize);
149
                mdata = calloc(*n,msize);
150
                tmanGivePeers(*n,neighbors,(void *)mdata);
151
152 0711317b CsabaKiraly
                if (cnt % TMAN_LOG_EVERY == 0) {
153 5be2e90d Csaba Kiraly
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
154 7e39164f MarcoBiazzini
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
155
                                d = *((double *)(mdata+i*msize));
156
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
157
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
158
                        }
159
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
160
                }
161
162
                free(mdata);
163
                return (const struct nodeID **)neighbors;
164
        }
165
        else
166
                return topGetNeighbourhood(n);
167
}
168 2f846ddd Csaba Kiraly
169 685c4dd0 MarcoBiazzini
static void topoAddToBL (struct nodeID *id)
170
{
171
        if (counter >= TMAN_MAX_IDLE)
172
                tmanAddToBlackList(id);
173
//        else
174
                topAddToBlackList(id);
175
}
176 fcb5c29b Csaba Kiraly
177 9176d3d1 Csaba Kiraly
//TODO: send metadata as well
178
static int send_topo_msg(struct nodeID *dst, uint8_t type)
179
{
180
  char msg[2];
181
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
182
  msg[1] = type;
183
  return send_to_peer(me, dst, msg, 2);
184
}
185
186 2dab7813 Csaba Kiraly
static void add_peer(const struct nodeID *id, const struct metadata *m, bool local, bool remote)
187 fcb5c29b Csaba Kiraly
{
188 2dab7813 Csaba Kiraly
  if (local) {
189 b88c9f4a Csaba Kiraly
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
190 fcb5c29b Csaba Kiraly
      peerset_add_peer(pset, id);
191 b88c9f4a Csaba Kiraly
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
192 b1d41fdb Csaba Kiraly
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
193 4bf91643 Csaba Kiraly
      /* add measures here */
194
      add_measures(id);
195 b0225995 Csaba Kiraly
      send_bmap(id);
196 2dab7813 Csaba Kiraly
  }
197
  if (remote) {
198 9176d3d1 Csaba Kiraly
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
199
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
200 2dab7813 Csaba Kiraly
  }
201 9176d3d1 Csaba Kiraly
}
202
203 2dab7813 Csaba Kiraly
static void remove_peer(const struct nodeID *id, bool local, bool remote)
204 fcb5c29b Csaba Kiraly
{
205 2dab7813 Csaba Kiraly
  if (local) {
206 fcb5c29b Csaba Kiraly
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
207 4bf91643 Csaba Kiraly
      /* add measures here */
208
      delete_measures(id);
209 fcb5c29b Csaba Kiraly
      peerset_remove_peer(pset, id);
210 2dab7813 Csaba Kiraly
  }
211
  if (remote) {
212 9176d3d1 Csaba Kiraly
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
213
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
214 2dab7813 Csaba Kiraly
  }
215 9176d3d1 Csaba Kiraly
}
216
217 a26a0456 Csaba Kiraly
//get the rtt. Currenly only MONL version is supported
218 89ccef52 Csaba Kiraly
static double get_rtt_of(const struct nodeID* n){
219 acbc9155 Csaba Kiraly
#ifdef MONL
220 896704e9 Csaba Kiraly
  return get_rtt(n);
221 acbc9155 Csaba Kiraly
#else
222 a26a0456 Csaba Kiraly
  return NAN;
223 acbc9155 Csaba Kiraly
#endif
224
}
225
226 9818ca86 Csaba Kiraly
//get the declared capacity of a node
227
static double get_capacity_of(const struct nodeID* n){
228
  struct peer *p = peerset_get_peer(pset, n);
229
  if (p) {
230
    return p->capacity;
231
  }
232
233
  return NAN;
234
}
235
236 a26a0456 Csaba Kiraly
//returns: 1:yes 0:no -1:unknown
237 89ccef52 Csaba Kiraly
int desiredness(const struct nodeID* n) {
238 a26a0456 Csaba Kiraly
  double rtt = get_rtt_of(n);
239 9818ca86 Csaba Kiraly
  double bw =  get_capacity_of(n);
240
241
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
242
    return -1;
243
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
244
    return 1;
245
  }
246 a26a0456 Csaba Kiraly
247 9818ca86 Csaba Kiraly
  return 0;
248 a26a0456 Csaba Kiraly
}
249
250 89ccef52 Csaba Kiraly
bool is_desired(const struct nodeID* n) {
251 a9e55854 Csaba Kiraly
  return (desiredness(n) == 1);
252
}
253
254 f6e9f400 Csaba Kiraly
int cmp_rtt(const struct nodeID* a, const struct nodeID* b) {
255
  double ra, rb;
256
  ra = get_rtt_of(a);
257 ad8a7c4a Csaba Kiraly
  rb = get_rtt_of(b);
258 f6e9f400 Csaba Kiraly
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
259
  else if (isnan(rb) || ra < rb) return -1;
260
  else return 1;
261
}
262
263
int vcmp_rtt(const void* a, const void* b) {
264
  return cmp_rtt((const struct nodeID*)a, (const struct nodeID*)b);
265
}
266
267 2f846ddd Csaba Kiraly
// currently it just makes the peerset grow
268 74a5d4ae CsabaKiraly
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
269 2f846ddd Csaba Kiraly
{
270 6b2924d4 Csaba Kiraly
  int n_ids, metasize, i, newids_size, max_ids;
271
  static const struct nodeID **newids;
272 cc0cd459 Csaba Kiraly
  static const struct metadata *metas;
273 2f846ddd Csaba Kiraly
  struct peer *peers;
274
  struct timeval tnow, told;
275 304607e2 Csaba Kiraly
  static const struct nodeID **savedids;
276
  static int savedids_size;
277 2f846ddd Csaba Kiraly
278 eac0d4ce Csaba Kiraly
  if timerisset(&tout_bmap) {
279
    gettimeofday(&tnow, NULL);
280
    timersub(&tnow, &tout_bmap, &told);
281
    peers = peerset_get_peers(pset);
282
    for (i = 0; i < peerset_size(pset); i++) {
283
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
284
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
285 d84d066d Csaba Kiraly
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
286 eac0d4ce Csaba Kiraly
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
287
        topoAddToBL(peers[i].id);
288 2dab7813 Csaba Kiraly
        remove_peer(peers[i--].id, true, true);
289 eac0d4ce Csaba Kiraly
        //}
290
      }
291
    }
292
  }
293
294 8a49328f CsabaKiraly
  if (cnt++ % 100 == 0) {
295 7e39164f MarcoBiazzini
        update_metadata();
296 59ad7d83 Csaba Kiraly
    if (counter > TMAN_MAX_IDLE) {
297 7e39164f MarcoBiazzini
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
298 59ad7d83 Csaba Kiraly
    }
299 7e39164f MarcoBiazzini
  }
300
301 9176d3d1 Csaba Kiraly
  //handle explicit add/remove messages
302 483dec26 Csaba Kiraly
  if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
303 9176d3d1 Csaba Kiraly
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
304
    if (len != 2) {
305
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
306
      return;
307
    }
308
    switch (buff[1]) {
309
      case STREAMER_TOPOLOGY_MSG_ADD:
310 65a62f30 Csaba Kiraly
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
311 4b40812a Csaba Kiraly
        if (!peerset_get_peer(pset, from))
312
                add_peer(from, NULL, true, false);
313 9176d3d1 Csaba Kiraly
        break;
314
      case STREAMER_TOPOLOGY_MSG_REMOVE:
315 65a62f30 Csaba Kiraly
        ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
316 2dab7813 Csaba Kiraly
        remove_peer(from, true, false);
317 9176d3d1 Csaba Kiraly
        break;
318
      default:
319
        fprintf(stderr, "Bad streamer topo message received!\n");
320
    }
321
    reg_neigh_size(peerset_size(pset));
322
    return;
323
  }
324
325 7e39164f MarcoBiazzini
  topoParseData(buff, len);
326 de351684 Csaba Kiraly
327 6828703c Csaba Kiraly
  if (!buff) {
328
    reg_neigh_size(peerset_size(pset));
329
    return;
330
  }
331 de351684 Csaba Kiraly
332 e0f05cb2 napawine
  peers = peerset_get_peers(pset);
333 acbc9155 Csaba Kiraly
  n_ids = peerset_size(pset);
334 6b2924d4 Csaba Kiraly
  newids = topoGetNeighbourhood(&newids_size);        //TODO handle both tman and topo
335
  metas = topGetMetadata(&metasize);        //TODO: check metasize
336 304607e2 Csaba Kiraly
  max_ids = n_ids + savedids_size + newids_size;
337 d84d066d Csaba Kiraly
  ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
338 d8a205cd Csaba Kiraly
  {
339
    int desired_part;
340 6f490e6a Csaba Kiraly
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
341 dd2e7bcf Csaba Kiraly
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
342 6f490e6a Csaba Kiraly
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
343 acbc9155 Csaba Kiraly
344 79c926d0 Csaba Kiraly
    if (topo_out) {
345
      for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
346
        oldids[oldids_size++] = peers[i].id;
347
        fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
348
      }
349
    } else {
350
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
351
        oldids[oldids_size++] = savedids[i];
352
        fprintf(stderr," %s - RTT: %f\n", node_addr(savedids[i]) , get_rtt_of(savedids[i]));
353
      }
354
      savedids_size = 0;
355
      free(savedids);
356 acbc9155 Csaba Kiraly
    }
357 d8a205cd Csaba Kiraly
358 6f490e6a Csaba Kiraly
    // select the topo_mem portion of peers to be kept (uniform random)
359 f6e9f400 Csaba Kiraly
    if (topo_keep_best) {
360
      qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_rtt);
361
    } else {
362
      nidset_shuffle(oldids, oldids_size);
363
    }
364 6f490e6a Csaba Kiraly
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
365
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
366 6b2924d4 Csaba Kiraly
367 6f490e6a Csaba Kiraly
    // compose list of known nodeids
368 6b2924d4 Csaba Kiraly
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
369
370 6f490e6a Csaba Kiraly
    // compose list of candidate nodeids
371
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
372
373 d8a205cd Csaba Kiraly
    // select the alpha_target portion of desired peers
374 6f490e6a Csaba Kiraly
    desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
375
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired);
376 f6e9f400 Csaba Kiraly
    if (topo_add_best) {
377
      qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_rtt);
378
    } else {
379
      nidset_shuffle(desireds, desireds_size);
380
    }
381 6f490e6a Csaba Kiraly
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
382 d8a205cd Csaba Kiraly
383
    // random from the rest
384 6f490e6a Csaba Kiraly
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
385 d8a205cd Csaba Kiraly
    nidset_shuffle(others, others_size);
386 69dfa8b1 Csaba Kiraly
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
387 3131d7d1 Csaba Kiraly
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
388
389 dd2e7bcf Csaba Kiraly
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
390
            selecteds_size, nodeids_size,
391
            keep_size, oldids_size,
392
            MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
393
            random_size, others_size);
394 6b2924d4 Csaba Kiraly
    // add new ones
395
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
396
    for (i = 0; i < toadds_size; i++) {
397 dd2e7bcf Csaba Kiraly
      int j;
398 6b2924d4 Csaba Kiraly
      //searching for the metadata
399
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
400
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
401 483dec26 Csaba Kiraly
        add_peer(newids[j], &metas[j], topo_out, topo_in);
402 6b2924d4 Csaba Kiraly
      } else {
403 48a8f955 Csaba Kiraly
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
404 6b2924d4 Csaba Kiraly
      }
405
    }
406
407 d8a205cd Csaba Kiraly
    // finally, remove those not needed
408
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
409 f6d28931 Csaba Kiraly
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
410 d8a205cd Csaba Kiraly
    for (i = 0; i < toremoves_size; i++) {
411
      fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
412 483dec26 Csaba Kiraly
      remove_peer(toremoves[i], topo_out, topo_in);
413 acbc9155 Csaba Kiraly
    }
414 d8a205cd Csaba Kiraly
    fprintf(stderr,"Topo remove end\n");
415 79c926d0 Csaba Kiraly
416
    if (!topo_out) {
417
      savedids = malloc(selecteds_size * sizeof(savedids[0]));        //TODO: handle errors
418
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
419
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
420
      }
421
      for (i = 0; i < oldids_size; i++) {
422
        nodeid_free(oldids[i]);
423
      }
424
    }
425 52c2cd41 Csaba Kiraly
  }
426
427 e4ef30e3 Csaba Kiraly
  reg_neigh_size(peerset_size(pset));
428 2f846ddd Csaba Kiraly
}
429 fcb5c29b Csaba Kiraly
430
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
431
{
432
  struct peer *p = peerset_get_peer(pset, id);
433
  if (!p) {
434 cb6be841 Csaba Kiraly
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
435 fcb5c29b Csaba Kiraly
    if (reg) {
436 483dec26 Csaba Kiraly
      add_peer(id,NULL, topo_out, false);
437 825df88c Csaba Kiraly
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
438 fcb5c29b Csaba Kiraly
      p = peerset_get_peer(pset,id);
439
    }
440
  }
441
442
  return p;
443
}
444
445 7e39164f MarcoBiazzini
int peers_init(void)
446 fcb5c29b Csaba Kiraly
{
447
  fprintf(stderr,"peers_init\n");
448
  pset = peerset_init(0);
449 0bb2425e Csaba Kiraly
  return pset ? 1 : 0;
450 fcb5c29b Csaba Kiraly
}
451
452 7e39164f MarcoBiazzini
struct peerset *get_peers(void)
453 fcb5c29b Csaba Kiraly
{
454
  return pset;
455
}