Statistics
| Branch: | Revision:

streamers / topology.c @ 63a52683

History | View | Annotate | Download (22.1 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "nodeid_set.h"
26
#include "streaming.h"
27
#include "dbg.h"
28
#include "measures.h"
29
#include "streamer.h"
30
#include "hrc.h"
31
#include "blacklist.h"
32

    
33
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
34
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
35

    
36
#define UPDATE_PEERS_TIMEOUT 5
37

    
38
double desired_bw = 0;
39
double desired_rtt = 0.2;
40
double alpha_target = 0.5;
41
double topo_mem = 0;
42

    
43
bool topo_out = true; //peer selects out-neighbours
44
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
45

    
46
bool topo_keep_best = false;
47
bool topo_add_best = false;
48
bool topo_black_list = false;
49

    
50
const char* topo_keep_best_policy = "";
51
const char* topo_add_best_policy = "";
52
const char* topo_black_list_policy = "";
53

    
54
int NEIGHBORHOOD_TARGET_SIZE = 30;
55
#define TMAN_MAX_IDLE 10
56
#define TMAN_LOG_EVERY 1000
57

    
58
#define STREAMER_TOPOLOGY_MSG_ADD 0
59
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
60

    
61
static struct peerset *pset;
62
static struct timeval tout_bmap = {20, 0};
63
static int counter = 0;
64
static int counter2 = 0;
65
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
66
static tmanRankingFunction rankFunct = simpleRanker;
67

    
68
struct metadata {
69
  uint16_t cb_size;
70
  uint16_t cps;
71
  float capacity;
72
  float recv_delay;
73
} __attribute__((packed));
74

    
75
static struct metadata my_metadata;
76
static int cnt = 0;
77
static struct nodeID *me = NULL;
78
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
79
static struct nodeID ** neighbors;
80
static struct timeval last_time_updated_peers;
81

    
82
static void update_metadata(void) {
83
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
84
        my_metadata.recv_delay = get_receive_delay();
85
        my_metadata.cps =  get_running_offer_threads();
86
        my_metadata.capacity = get_capacity();
87
}
88

    
89
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
90

    
91
        double t,p1,p2;
92
        t = *((const double *)tin);
93
        p1 = *((const double *)p1in);
94
        p2 = *((const double *)p2in);
95

    
96
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
97
        else if (isnan(p1)) return 2;
98
        else if (isnan(p2)) return 1;
99
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
100

    
101
}
102

    
103
int topologyInit(struct nodeID *myID, const char *config)
104
{
105
        int i;
106
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
107
                bind_msg_type(mTypes[i]);
108
        update_metadata();
109
        me = myID;
110
        struct timeval tnow;
111
        gettimeofday(&tnow, NULL);
112
        gettimeofday(&last_time_updated_peers, NULL);
113
        last_time_updated_peers.tv_sec -= UPDATE_PEERS_TIMEOUT;
114
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
115
}
116

    
117
void topologyShutdown(void)
118
{
119
}
120

    
121
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
122
{
123
        // TODO: check this!! Just to use this function to bootstrap ncast...
124
        struct metadata m = {0};        //TODO: check what metadata option should mean
125

    
126
        if (counter < TMAN_MAX_IDLE)
127
                return topAddNeighbour(neighbour,&m,sizeof(m));
128
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
129
}
130

    
131
static int topoParseData(const uint8_t *buff, int len)
132
{
133
        int res = -1,ncs = 0,msize;
134
        const struct nodeID **n; const void *m;
135
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
136
                res = topParseData(buff,len);
137
//                 if (counter <= TMAN_MAX_IDLE)
138
//                         counter++;
139
        }
140
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
141
        {
142
                n = topGetNeighbourhood(&ncs);
143
                if (ncs) {
144
                m = topGetMetadata(&msize);
145
                res = tmanParseData(buff,len,n,ncs,m,msize);
146
                }
147
        }
148
  return res;
149
}
150

    
151
static const struct nodeID **topoGetNeighbourhood(int *n)
152
{
153
        int i; double d;
154
        if (counter > TMAN_MAX_IDLE) {
155
                uint8_t *mdata; int msize;
156
                *n = tmanGetNeighbourhoodSize();
157
                if (neighbors) free(neighbors);
158
                neighbors = calloc(*n,sizeof(struct nodeID *));
159
                tmanGetMetadata(&msize);
160
                mdata = calloc(*n,msize);
161
                tmanGivePeers(*n,neighbors,(void *)mdata);
162

    
163
                if (cnt % TMAN_LOG_EVERY == 0) {
164
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
165
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
166
                                d = *((double *)(mdata+i*msize));
167
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
168
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
169
                        }
170
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
171
                }
172

    
173
                free(mdata);
174
                return (const struct nodeID **)neighbors;
175
        }
176
        else
177
                return topGetNeighbourhood(n);
178
}
179

    
180
static void topoAddToBL (struct nodeID *id)
181
{
182
        if (counter >= TMAN_MAX_IDLE)
183
                tmanAddToBlackList(id);
184
        topAddToBlackList(id);
185
}
186

    
187
//TODO: send metadata as well
188
static int send_topo_msg(struct nodeID *dst, uint8_t type)
189
{
190
  char msg[2];
191
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
192
  msg[1] = type;
193
  return send_to_peer(me, dst, msg, 2);
194
}
195

    
196
static void add_peer(const struct nodeID *id, const struct metadata *m, bool local, bool remote)
197
{
198
  if (local) {
199
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
200
      peerset_add_peer(pset, id);
201
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
202
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
203
      /* add measures here */
204
      add_measures(id);
205
      send_bmap(id);
206
  }
207
  if (remote) {
208
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
209
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
210
  }
211
}
212

    
213
static void remove_peer(const struct nodeID *id, bool local, bool remote)
214
{
215
  if (local) {
216
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
217
      /* delete measures here */
218
      delete_measures(id);
219
      peerset_remove_peer(pset, id);
220
  }
221
  if (remote) {
222
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
223
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
224
  }
225
}
226

    
227
//get the rtt. Currenly only MONL version is supported
228
static double get_rtt_of(const struct nodeID* n){
229
#ifdef MONL
230
  return get_rtt(n);
231
#else
232
  return NAN;
233
#endif
234
}
235

    
236
//get the declared capacity of a node
237
static double get_capacity_of(const struct nodeID* n){
238
  struct peer *p = peerset_get_peer(pset, n);
239
  if (p) {
240
    return p->capacity;
241
  }
242

    
243
  return NAN;
244
}
245

    
246
// //get the declared capacity of a node
247
// static double get_capacity_of_newid(const struct nodeID* n){
248
//   int i = 0;
249
//   for (i = 0; i < newids_size; i++){
250
//           if (strcmp(node_addr(n), node_addr(newids[i]))) {
251
//                   return metas[i].capacity;
252
//           }
253
//   }
254
//   return NAN;
255
// }
256

    
257

    
258
// FF//get the declared capacity of a node
259
// static double get_offers_threads_of(const struct nodeID* n){
260
//   struct peer *p = peerset_get_peer(pset, n);
261
//   if (p) {
262
//     return p->offers_threads;
263
//   }
264
// 
265
//   return NAN;
266
// }
267

    
268

    
269
static double get_rx_bytes_chunks_of(const struct nodeID* n){
270
#ifdef MONL
271
  return get_rx_bytes_chunks(n);
272
#else
273
  return NAN;
274
#endif
275
}
276

    
277
static double get_transmitter_lossrate_of(const struct nodeID* n){
278
#ifdef MONL
279
  return get_transmitter_lossrate_of(n);
280
#else
281
  return NAN;
282
#endif
283
}
284

    
285
//returns: 1:yes 0:no -1:unknown
286
int desiredness(const struct nodeID* n) {
287
  double rtt = get_rtt_of(n);
288
  double bw =  get_capacity_of(n);
289

    
290
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
291
    return -1;
292
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
293
    return 1;
294
  }
295

    
296
  return 0;
297
}
298

    
299
// equal to desiredness, but considers blacklist
300
//returns: 1:yes 0:no -1:unknown
301
int desiredness_unbled(const struct nodeID* n) {
302
  double rtt = get_rtt_of(n);
303
  double bw =  get_capacity_of(n);
304
  bool bled = black_listed(node_addr(n));
305

    
306
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
307
    return -1;
308
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw) && (!bled)) {
309
    return 1;
310
  }
311

    
312
  return 0;
313
}
314

    
315
//returns: 1:yes 0:no -1:unknown
316
int blacklistness_lossrate(const struct nodeID* n) {
317
  double loss_rate = get_transmitter_lossrate(n);
318
  
319
  dprintf("BLACKLIST: father loss %s rate is %f\n", node_addr(n), loss_rate);
320
  
321
  if (isnan(loss_rate)) {
322
    return -1;
323
  } else if (loss_rate > 0.10) {
324
    return 1;
325
  }
326

    
327
  return 0;
328
}
329

    
330
bool is_desired(const struct nodeID* n) {
331
  return (desiredness(n) == 1);
332
}
333

    
334
bool is_desired_unbled(const struct nodeID* n) {
335
  return (desiredness_unbled(n) == 1);
336
}
337

    
338
bool is_to_blacklist(const struct nodeID* n) {
339
  return (blacklistness_lossrate(n) == 1);
340
}
341

    
342
int cmp_rtt(const struct nodeID** a, const struct nodeID** b) {
343
  double ra, rb;
344
  ra = get_rtt_of(*a);
345
  rb = get_rtt_of(*b);
346
  fprintf(stderr, "RTTS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
347
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
348
  else if (isnan(rb) || ra < rb) return -1;
349
  else return 1;
350
}
351

    
352
int vcmp_rtt(const void* a, const void* b) {
353
  return cmp_rtt((const struct nodeID**) a, (const struct nodeID**) b);
354
}
355

    
356
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
357
  double ra, rb;
358
  ra = -( get_capacity_of(*a) );
359
  rb = -( get_capacity_of(*b) );
360
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
361
  else if (isnan(rb) || ra < rb) return -1;
362
  else return 1;
363
}
364

    
365
int vcmp_bw(const void* a, const void* b) {
366
  return cmp_bw((const struct nodeID**)a, (const struct nodeID**)b);
367
}
368

    
369
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
370
  double ra, rb;
371
  ra = -( get_capacity_of(*a)*get_rtt_of(*a) );
372
  rb = -( get_capacity_of(*b)*get_rtt_of(*b) );
373
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
374
  else if (isnan(rb) || ra < rb) return -1;
375
  else return 1;
376
}
377

    
378
int vcmp_rtt_bw(const void* a, const void* b) {
379
  return cmp_rtt_bw((const struct nodeID**)a, (const struct nodeID**)b);
380
}
381

    
382
// int cmp_offerratio_hopcount_queuedelay(const struct nodeID* a, const struct nodeID* b) {
383
//   double ra, rb;
384
//   ra = get_capacity_of(a)*get_rtt_of(a);
385
//   rb = get_capacity_of(b)*get_rtt_of(b);
386
//   if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
387
//   else if (isnan(rb) || ra < rb) return -1;
388
//   else return 1;
389
// }
390

    
391
// int vcmp_offerratio_hopcount_queuedelay(const void* a, const void* b) {
392
//   return cmp_rtt((const struct nodeID*)a, (const struct nodeID*)b);
393
// }
394

    
395
int cmp_rx_bytes_chunks(const struct nodeID** a, const struct nodeID** b) {
396
  double ra, rb;
397
  ra = -(get_rx_bytes_chunks_of(*a));
398
  rb = -(get_rx_bytes_chunks_of(*b));
399
//   fprintf(stderr, "RX_BYTES_CHUNKS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
400
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
401
        return 0;
402
  else if (isnan(rb) || ra < rb) 
403
        return -1;
404
  else 
405
        return 1;
406
}
407

    
408
int vcmp_rx_bytes_chunks(const void* a, const void* b) {
409
  return cmp_rx_bytes_chunks((const struct nodeID**) a, (const struct nodeID**) b);
410
}
411

    
412
int cmp_packet_loss(const struct nodeID** a, const struct nodeID** b) {
413
  double ra, rb;
414
  ra = -( get_transmitter_lossrate(*a) );
415
  rb = -( get_transmitter_lossrate(*b) );
416
//   fprintf(stderr, "PACKET_LOSS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
417
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
418
        return 0;
419
  else if (isnan(rb) || ra < rb) 
420
        return -1;
421
  else 
422
        return 1;
423
}
424

    
425
int vcmp_packet_loss(const void* a, const void* b) {
426
  return cmp_packet_loss((const struct nodeID**) a, (const struct nodeID**) b);
427
}
428

    
429
// Use add best policy specified at command line
430
int vcmp_add_best(const void* a, const void* b) {
431
  if (strcmp(topo_add_best_policy, "BW") == 0) {
432
          return vcmp_bw(a, b);
433
  } else if (strcmp(topo_add_best_policy, "RTT") == 0) {
434
          return vcmp_rtt( a,  b);
435
  } else if (strcmp(topo_add_best_policy, "BW_RTT") == 0) {
436
          return vcmp_rtt_bw( a,  b);
437
  } else if (strcmp(topo_add_best_policy, "") == 0){ // defualt behaviuor
438
          return vcmp_rtt_bw( a, b);
439
  } else {
440
    fprintf(stderr, "Error: unknown add best policy %s\n", topo_add_best_policy);
441
    return 1;
442
  }  
443
}
444

    
445
// Use keep best policy specified at command line
446
int vcmp_keep_best(const void* a, const void* b) {
447
  if (strcmp(topo_keep_best_policy, "BW") == 0) {
448
          return vcmp_bw(a, b);
449
  }  else if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
450
          return vcmp_rx_bytes_chunks(a, b);
451
  } else if (strcmp(topo_keep_best_policy, "PACKET_LOSS") == 0) {
452
          return vcmp_packet_loss( a, b);
453
  } else if (strcmp(topo_keep_best_policy, "") == 0) { // Default behaviuor
454
          return vcmp_rx_bytes_chunks( a, b);
455
  } else {
456
    fprintf(stderr, "Error: unknown keep best policy %s\n", topo_add_best_policy);
457
    return 1;
458
  }  
459
}
460

    
461
// currently it just makes the peerset grow
462
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
463
{
464
  int n_ids, metasize, i, max_ids;
465
  struct nodeID **newids;
466
  int newids_size;
467
  struct metadata *metas;
468
  
469
  struct peer *peers;
470
  struct timeval tnow, told;
471
  static const struct nodeID **savedids;
472
  static int savedids_size;
473
  
474
  if timerisset(&tout_bmap) {
475
    gettimeofday(&tnow, NULL);
476
    timersub(&tnow, &tout_bmap, &told);
477
    peers = peerset_get_peers(pset);
478
    for (i = 0; i < peerset_size(pset); i++) {
479
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
480
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
481
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
482
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
483
        topoAddToBL(peers[i].id);
484
        remove_peer(peers[i--].id, true, true);
485
        //}
486
      }
487
    }
488
  }
489

    
490
  if (cnt++ % 100 == 0) {
491
        update_metadata();
492
    if (counter > TMAN_MAX_IDLE) {
493
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
494
    }
495
  }
496

    
497
  //handle explicit add/remove messages
498
  if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
499
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
500
    if (len != 2) {
501
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
502
      return;
503
    }
504
    switch (buff[1]) {
505
      case STREAMER_TOPOLOGY_MSG_ADD:
506
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
507
        if (!peerset_get_peer(pset, from)) {
508
                counter2++;
509
                add_peer(from, NULL, true, false);
510
                }
511
        if (black_listed(node_addr(from)) && (peerset_get_peer(pset, from)))
512
                remove_peer(from, topo_in, topo_out);
513
        else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset, from) == NULL))
514
                send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
515
        break;
516
      case STREAMER_TOPOLOGY_MSG_REMOVE:
517
        if (peerset_get_peer(pset, from) != NULL) {
518
                ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
519
                remove_peer(from, true, false);
520
                }
521
        break;
522
      default:
523
        fprintf(stderr, "Bad streamer topo message received!\n");
524
    }
525
    reg_neigh_size(peerset_size(pset));
526
    return;
527
  }
528

    
529
  int res_topo_parse_data = topoParseData(buff, len);
530

    
531
  // Exit if a gossiping message has been received
532
  if (res_topo_parse_data > 0) {
533
    reg_neigh_size(peerset_size(pset));
534
    return;
535
  }
536

    
537
  // Look for peers which can be removed from blacklist
538
  check_black_list_timeouts();
539

    
540
  double time_difference = (tnow.tv_sec + tnow.tv_usec*1e-6) - (last_time_updated_peers.tv_sec + last_time_updated_peers.tv_usec*1e-6);
541

    
542
  // Check if it's time to update the neighbourhood
543
  if (time_difference > UPDATE_PEERS_TIMEOUT) {
544
          
545
    peers = peerset_get_peers(pset);
546
    n_ids = peerset_size(pset);
547
    
548
    newids = topoGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
549
    metas = topGetMetadata(&metasize);
550
    int index = 0;
551
    for (index = 0; index < newids_size; index++) {
552
            dprintf("METAS node %s, capacity %f\n", node_addr(newids[index]), metas[index].capacity);
553
    }
554
    dprintf("METAS my %f END\n", my_metadata.capacity);
555
    max_ids = n_ids + savedids_size + newids_size;
556
    ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
557

    
558
    int desired_part;
559
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
560
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
561
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
562

    
563
    if (topo_out) {
564
      for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
565
        oldids[oldids_size++] = peers[i].id;
566
        fprintf(stderr," %s - RTT: %f, loss_rate %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_transmitter_lossrate(peers[i].id));
567
      }
568
    } else {
569
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
570
        oldids[oldids_size++] = savedids[i];
571
        fprintf(stderr," %s - RTT: %f, RX_CHUNKS: %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]));
572
      }
573
      savedids_size = 0;
574
      free(savedids);
575
    }
576

    
577
    // select the topo_mem portion of peers to be kept (uniform random)
578
    if ((topo_keep_best) && (strcmp(topo_keep_best_policy, "RND") != 0)) {
579
      qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_keep_best);
580
    } else {
581
      nidset_shuffle(oldids, oldids_size);
582
    }
583
    for (i = 0; i < oldids_size; i++) {
584
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f, BW %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]), metas[i].capacity);
585
      }
586
    dprintf("QSORT KEEP BEST END\n");
587
    
588
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
589
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
590

    
591
    // compose list of known nodeids
592
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
593
  
594
    // compose list of candidate nodeids
595
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
596

    
597
    // select the alpha_target portion of desired peers
598
    desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
599
    
600
    // Filter out blacklisted ones    
601
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
602
    
603
    if ((topo_keep_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
604
      qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
605
    } else {
606
      nidset_shuffle(desireds, desireds_size);
607
    }
608
    for (i = 0; i < desireds_size; i++) {
609
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW*RTT %f\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), metas[i].capacity, (double) (get_capacity_of(desireds[i])*get_rtt_of(desireds[i])) );
610
      }
611
    dprintf("QSORT ADD BEST END\n");
612
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
613

    
614
    // random from the rest
615
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
616
    nidset_shuffle(others, others_size);
617
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
618
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
619

    
620
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
621
        selecteds_size, nodeids_size,
622
        keep_size, oldids_size,
623
        MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
624
        random_size, others_size);
625
    // add new ones
626
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
627
    for (i = 0; i < toadds_size; i++) {
628
      int j;
629
      //searching for the metadata
630
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
631
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
632
        counter2++;
633
        add_peer(newids[j], &metas[j], topo_out, topo_in);
634
      } else {
635
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
636
      }
637
    }
638

    
639
    // finally, remove those not needed
640
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
641
        
642
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
643
    for (i = 0; i < toremoves_size; i++) {
644
       // Check that it has not been removed already
645
       if (peerset_get_peer(pset, toremoves[i]) != NULL ) {
646
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
647
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
648
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
649
          add_to_blacklist(node_addr(toremoves[i]));
650
          }
651
       fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
652
       if (peerset_get_peer(pset, toremoves[i]) != NULL) {
653
          remove_peer(toremoves[i], topo_out, topo_in);
654
          }
655
       }
656
    }
657
    fprintf(stderr,"Topo remove end\n");
658

    
659
    if (!topo_out) {
660
      savedids = malloc(selecteds_size * sizeof(savedids[0])); //TODO: handle errors
661
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
662
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
663
      }
664
      for (i = 0; i < oldids_size; i++) {
665
        nodeid_free(oldids[i]);
666
      }
667
    }
668
    gettimeofday(&last_time_updated_peers, NULL); 
669
  } 
670
  reg_neigh_size(peerset_size(pset));
671
}
672

    
673
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
674
{
675
  struct peer *p = peerset_get_peer(pset, id);
676
  if (!p) {
677
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
678
    if (reg) {
679
      add_peer(id,NULL, topo_out, false);
680
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
681
      p = peerset_get_peer(pset,id);
682
    }
683
  }
684

    
685
  return p;
686
}
687

    
688
int peers_init(void)
689
{
690
  fprintf(stderr,"peers_init\n");
691
  pset = peerset_init(0);
692
  return pset ? 1 : 0;
693
}
694

    
695
struct peerset *get_peers(void)
696
{
697
  return pset;
698
}