Statistics
| Branch: | Revision:

streamers / topology.c @ cfff6086

History | View | Annotate | Download (25.4 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "nodeid_set.h"
26
#include "streaming.h"
27
#include "dbg.h"
28
#include "measures.h"
29
#include "streamer.h"
30
#include "hrc.h"
31
#include "blacklist.h"
32

    
33
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
34
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
35

    
36
#define UPDATE_PEERS_TIMEOUT 5
37

    
38
double desired_bw = 0;
39
double desired_rtt = 0.2;
40
double alpha_target = 0.5;
41
double topo_mem = 0;
42

    
43
bool topo_out = true; //peer selects out-neighbours
44
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
45

    
46
bool topo_keep_best = false;
47
bool topo_add_best = false;
48
bool topo_black_list = false;
49

    
50
const char* topo_keep_best_policy = "";
51
const char* topo_add_best_policy = "";
52
const char* topo_black_list_policy = "";
53

    
54
int NEIGHBORHOOD_TARGET_SIZE = 30;
55
#define TMAN_MAX_IDLE 10
56
#define TMAN_LOG_EVERY 1000
57

    
58
#define STREAMER_TOPOLOGY_MSG_ADD 0
59
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
60

    
61
static struct peerset *pset_outgoing;
62
static struct peerset *pset_incoming;
63
static struct timeval tout_bmap = {20, 0};
64
static int counter = 0;
65
static int counter2 = 0;
66
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
67
static tmanRankingFunction rankFunct = simpleRanker;
68

    
69
struct metadata {
70
  uint16_t cb_size;
71
  uint16_t cps;
72
  double capacity;
73
  double recv_delay;
74
} __attribute__((packed));
75

    
76
static struct metadata my_metadata;
77
static int cnt = 0;
78
static struct nodeID *me = NULL;
79
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
80
static struct nodeID ** neighbors;
81
static struct timeval last_time_updated_peers;
82

    
83
struct nodeID **newids;
84
int newids_size;
85
struct metadata *metas;
86

    
87
static void update_metadata(void) {
88
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
89
        my_metadata.cps =  (uint16_t) get_running_offer_threads();
90
        my_metadata.recv_delay = (double) get_receive_delay();
91
        my_metadata.capacity = (double) get_capacity();
92
}
93

    
94
static struct metadata get_metadata_for_peer(struct nodeID *id){
95
        int i = 0;
96
        struct metadata to_return;
97
        to_return.cb_size = NAN;
98
        to_return.cps =  NAN;
99
        to_return.recv_delay = NAN;
100
        to_return.capacity = NAN;
101

    
102
        if (newids_size > 0 && id != NULL) {
103
                for (i = 0 ; i < newids_size; i++) {
104
                        if (newids[i] != NULL) {
105
                                if (nodeid_equal(newids[i],id)){
106
                                        to_return.cb_size = metas[i].cb_size;
107
                                        to_return.cps = metas[i].cps;
108
                                        to_return.recv_delay = metas[i].recv_delay;
109
                                        to_return.capacity = metas[i].capacity;
110
                                        return to_return;
111
                                }
112
                        }
113
                }
114
        return to_return;
115
        }
116
}
117

    
118
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
119

    
120
        double t,p1,p2;
121
        t = *((const double *)tin);
122
        p1 = *((const double *)p1in);
123
        p2 = *((const double *)p2in);
124

    
125
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
126
        else if (isnan(p1)) return 2;
127
        else if (isnan(p2)) return 1;
128
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
129

    
130
}
131

    
132
int topologyInit(struct nodeID *myID, const char *config)
133
{
134
        int i;
135
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
136
                bind_msg_type(mTypes[i]);
137
        update_metadata();
138
        me = myID;
139
        struct timeval tnow;
140
        gettimeofday(&tnow, NULL);
141
        gettimeofday(&last_time_updated_peers, NULL);
142
        last_time_updated_peers.tv_sec -= UPDATE_PEERS_TIMEOUT;
143
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
144
}
145

    
146
void topologyShutdown(void)
147
{
148
}
149

    
150
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
151
{
152
        // TODO: check this!! Just to use this function to bootstrap ncast...
153
        struct metadata m = {0};        //TODO: check what metadata option should mean
154

    
155
        if (counter < TMAN_MAX_IDLE)
156
                return topAddNeighbour(neighbour,&m,sizeof(m));
157
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
158
}
159

    
160
static int topoParseData(const uint8_t *buff, int len)
161
{
162
        int res = -1,ncs = 0,msize;
163
        const struct nodeID **n; const void *m;
164
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
165
                res = topParseData(buff,len);
166
//                 if (counter <= TMAN_MAX_IDLE)
167
//                         counter++;
168
        }
169
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
170
        {
171
                n = topGetNeighbourhood(&ncs);
172
                if (ncs) {
173
                m = topGetMetadata(&msize);
174
                res = tmanParseData(buff,len,n,ncs,m,msize);
175
                }
176
        }
177
  return res;
178
}
179

    
180
static const struct nodeID **topoGetNeighbourhood(int *n)
181
{
182
        int i; double d;
183
        if (counter > TMAN_MAX_IDLE) {
184
                uint8_t *mdata; int msize;
185
                *n = tmanGetNeighbourhoodSize();
186
                if (neighbors) free(neighbors);
187
                neighbors = calloc(*n,sizeof(struct nodeID *));
188
                tmanGetMetadata(&msize);
189
                mdata = calloc(*n,msize);
190
                tmanGivePeers(*n,neighbors,(void *)mdata);
191

    
192
                if (cnt % TMAN_LOG_EVERY == 0) {
193
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
194
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
195
                                d = *((double *)(mdata+i*msize));
196
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
197
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
198
                        }
199
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
200
                }
201

    
202
                free(mdata);
203
                return (const struct nodeID **)neighbors;
204
        }
205
        else
206
                return topGetNeighbourhood(n);
207
}
208

    
209
static void topoAddToBL (struct nodeID *id)
210
{
211
        if (counter >= TMAN_MAX_IDLE)
212
                tmanAddToBlackList(id);
213
        topAddToBlackList(id);
214
}
215

    
216
//TODO: send metadata as well
217
static int send_topo_msg(struct nodeID *dst, uint8_t type)
218
{
219
  char msg[2];
220
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
221
  msg[1] = type;
222
  
223
  return send_to_peer(me, dst, msg, 2);
224
}
225

    
226
static void add_peer(const struct nodeID *id, const struct metadata *m, bool outgoing, bool incoming)
227
{
228
  if (outgoing) {
229
      dtprintf("Adding %s to outgoing neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
230
      peerset_add_peer(pset_outgoing, id);
231
      if (m) peerset_get_peer(pset_outgoing, id)->cb_size = m->cb_size;
232
      if (m) peerset_get_peer(pset_outgoing, id)->capacity = m->capacity;
233
      /* add measures here */
234
      add_measures(id);
235
      send_bmap(id);
236
  }
237
  if (incoming) {
238
      dtprintf("Topo: explicit add message sent!!! to %s (peers:%d)\n", node_addr(id));
239
      peerset_add_peer(pset_incoming, id);
240
      if (m) peerset_get_peer(pset_incoming, id)->cb_size = m->cb_size;
241
      if (m) peerset_get_peer(pset_incoming, id)->capacity = m->capacity;
242
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
243
  }
244
}
245

    
246
static void remove_peer(const struct nodeID *id, bool outgoing, bool incoming)
247
{
248
  if (outgoing) {
249
      dtprintf("Removing %s from outgoing neighbourhood!\n", node_addr(id));
250
      /* delete measures here */
251
      delete_measures(id);
252
      peerset_remove_peer(pset_outgoing, id);
253
  }
254
  if (incoming) {
255
      dtprintf("Topo: explicit remove message sent!!! to %s (peers:%d)\n", node_addr(id));
256
      peerset_remove_peer(pset_incoming, id);
257
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
258
  }
259
}
260

    
261
//get the rtt. Currenly only MONL version is supported
262
static double get_rtt_of(const struct nodeID* n){
263
#ifdef MONL
264
  return get_rtt(n);
265
#else
266
  return NAN;
267
#endif
268
}
269

    
270
//get the declared capacity of a node
271
static double get_capacity_of(const struct nodeID* n){
272
  struct peer *p = peerset_get_peer(pset_incoming, n);
273
  if (p) {
274
    return p->capacity;
275
  } 
276
  p = peerset_get_peer(pset_outgoing, n);
277
  if (p) {
278
    return p->capacity;
279
  } 
280
  return NAN;
281
}
282

    
283
// //get the declared capacity of a node
284
// static double get_capacity_of_newid(const struct nodeID* n){
285
//   int i = 0;
286
//   for (i = 0; i < newids_size; i++){
287
//           if (strcmp(node_addr(n), node_addr(newids[i]))) {
288
//                   return metas[i].capacity;
289
//           }
290
//   }
291
//   return NAN;
292
// }
293

    
294

    
295
// FF//get the declared capacity of a node
296
// static double get_offers_threads_of(const struct nodeID* n){
297
//   struct peer *p = peerset_get_peer(pset, n);
298
//   if (p) {
299
//     return p->offers_threads;
300
//   }
301
// 
302
//   return NAN;
303
// }
304

    
305

    
306
static double get_rx_bytes_chunks_of(const struct nodeID* n){
307
#ifdef MONL
308
  return get_rx_bytes_chunks(n);
309
#else
310
  return NAN;
311
#endif
312
}
313

    
314
static double get_transmitter_lossrate_of(const struct nodeID* n){
315
#ifdef MONL
316
  return get_transmitter_lossrate_of(n);
317
#else
318
  return NAN;
319
#endif
320
}
321

    
322
//returns: 1:yes 0:no -1:unknown
323
int desiredness(const struct nodeID* n) {
324
  double rtt = get_rtt_of(n);
325
  double bw = 0.0;
326
  bw =  get_metadata_for_peer(n).capacity;
327
  
328
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
329
    return -1;
330
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
331
    return 1;
332
  }
333

    
334
  return 0;
335
}
336

    
337
// equal to desiredness, but considers blacklist
338
//returns: 1:yes 0:no -1:unknown
339
int desiredness_unbled(const struct nodeID* n) {
340
  double rtt = get_rtt_of(n);
341
  bool bled = black_listed(node_addr(n));
342
  double bw =  0.0;
343
  bw =  get_metadata_for_peer(n).capacity;
344

    
345
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
346
    return -1;
347
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw) && (!bled)) {
348
    return 1;
349
  }
350

    
351
  return 0;
352
}
353

    
354
//returns: 1:yes 0:no -1:unknown
355
int blacklistness_lossrate(const struct nodeID* n) {
356
  double loss_rate = get_transmitter_lossrate(n);
357
  
358
  dprintf("BLACKLIST: father loss %s rate is %f\n", node_addr(n), loss_rate);
359
  
360
  if (isnan(loss_rate)) {
361
    return -1;
362
  } else if (loss_rate > 0.10) {
363
    return 1;
364
  }
365

    
366
  return 0;
367
}
368

    
369
bool is_desired(const struct nodeID* n) {
370
  return (desiredness(n) == 1);
371
}
372

    
373
bool is_desired_unbled(const struct nodeID* n) {
374
  return (desiredness_unbled(n) == 1);
375
}
376

    
377
bool is_to_blacklist(const struct nodeID* n) {
378
  return (blacklistness_lossrate(n) == 1);
379
}
380

    
381
double add_alpha_noise(double to_return) {
382
        double uniform = rand()/(RAND_MAX + 1.0);
383
        return (double) (((1-alpha_target) + (2*alpha_target*uniform)) * to_return) ; 
384
        }
385

    
386
int cmp_rtt(const struct nodeID** a, const struct nodeID** b) {
387
  double ra, rb;
388
  ra = add_alpha_noise(get_rtt_of(*a));
389
  rb = add_alpha_noise(get_rtt_of(*b));
390
  
391
  fprintf(stderr, "RTTS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
392
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
393
  else if (isnan(rb) || ra < rb) return -1;
394
  else return 1;
395
}
396

    
397
int vcmp_rtt(const void* a, const void* b) {
398
  return cmp_rtt((const struct nodeID**) a, (const struct nodeID**) b);
399
}
400

    
401
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
402
  double ra, rb;
403
  ra = -( add_alpha_noise(get_metadata_for_peer(*a).capacity));
404
  rb = -( add_alpha_noise(get_metadata_for_peer(*b).capacity));
405

    
406
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
407
  else if (isnan(rb) || ra < rb) return -1;
408
  else return 1;
409
}
410

    
411
int vcmp_bw(const void* a, const void* b) {
412
  return cmp_bw((const struct nodeID**)a, (const struct nodeID**)b);
413
}
414

    
415
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
416
  double ra, rb;
417
  ra = -( add_alpha_noise(get_metadata_for_peer(*a).capacity * get_rtt_of(*a)));
418
  rb = -( add_alpha_noise(get_metadata_for_peer(*b).capacity * get_rtt_of(*b)));
419

    
420
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
421
  else if (isnan(rb) || ra < rb) return -1;
422
  else return 1;
423
}
424

    
425
int vcmp_rtt_bw(const void* a, const void* b) {
426
  return cmp_rtt_bw((const struct nodeID**)a, (const struct nodeID**)b);
427
}
428

    
429
int cmp_offers(const struct nodeID** a, const struct nodeID** b) {
430
  double ra, rb;
431
  ra = -( add_alpha_noise(get_metadata_for_peer(*a).cps) );
432
  rb = -( add_alpha_noise(get_metadata_for_peer(*b).cps) );
433
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
434
  else if (isnan(rb) || ra < rb) return -1;
435
  else return 1;
436
}
437

    
438
int vcmp_offers(const void* a, const void* b) {
439
  return cmp_offers((const struct nodeID**)a, (const struct nodeID**)b);
440
}
441

    
442
int cmp_rx_bytes_chunks(const struct nodeID** a, const struct nodeID** b) {
443
  double ra, rb;
444
  ra = -(add_alpha_noise(get_rx_bytes_chunks_of(*a)));
445
  rb = -(add_alpha_noise(get_rx_bytes_chunks_of(*b)));
446
//   fprintf(stderr, "RX_BYTES_CHUNKS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
447
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
448
        return 0;
449
  else if (isnan(rb) || ra < rb) 
450
        return -1;
451
  else 
452
        return 1;
453
}
454

    
455
int vcmp_rx_bytes_chunks(const void* a, const void* b) {
456
  return cmp_rx_bytes_chunks((const struct nodeID**) a, (const struct nodeID**) b);
457
}
458

    
459
int cmp_packet_loss(const struct nodeID** a, const struct nodeID** b) {
460
  double ra, rb;
461
  ra = add_alpha_noise(get_transmitter_lossrate(*a));
462
  rb = add_alpha_noise(get_transmitter_lossrate(*b));
463
//   fprintf(stderr, "PACKET_LOSS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
464
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
465
        return 0;
466
  else if (isnan(rb) || ra < rb) 
467
        return -1;
468
  else 
469
        return 1;
470
}
471

    
472
int vcmp_packet_loss(const void* a, const void* b) {
473
  return cmp_packet_loss((const struct nodeID**) a, (const struct nodeID**) b);
474
}
475

    
476
// Use add best policy specified at command line
477
int vcmp_add_best(const void* a, const void* b) {
478
  if (strcmp(topo_add_best_policy, "OFFERS") == 0) {
479
          return vcmp_offers(a, b);
480
  } else if (strcmp(topo_add_best_policy, "BW") == 0) {
481
          return vcmp_bw(a, b);
482
  } else if (strcmp(topo_add_best_policy, "RTT") == 0) {
483
          return vcmp_rtt( a,  b);
484
  } else if (strcmp(topo_add_best_policy, "BW_RTT") == 0) {
485
          return vcmp_rtt_bw( a,  b);
486
  } else if (strcmp(topo_add_best_policy, "") == 0){ // defualt behaviuor
487
          return vcmp_rtt_bw( a, b);
488
  } else {
489
    fprintf(stderr, "Error: unknown add best policy %s\n", topo_add_best_policy);
490
    return 1;
491
  }  
492
}
493

    
494
// Use keep best policy specified at command line
495
int vcmp_keep_best(const void* a, const void* b) {
496
  if (strcmp(topo_keep_best_policy, "RTT") == 0) {
497
          return vcmp_rtt(a, b);
498
  }  else if (strcmp(topo_keep_best_policy, "BW") == 0) {
499
          return vcmp_bw(a, b);
500
  }  else if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
501
          return vcmp_rx_bytes_chunks(a, b);
502
  } else if (strcmp(topo_keep_best_policy, "PACKET_LOSS") == 0) {
503
          return vcmp_packet_loss( a, b);
504
  } else if (strcmp(topo_keep_best_policy, "") == 0) { // Default behaviuor
505
          return vcmp_rx_bytes_chunks( a, b);
506
  } else {
507
    fprintf(stderr, "Error: unknown keep best policy %s\n", topo_add_best_policy);
508
    return 1;
509
  }  
510
}
511

    
512
// currently it just makes the peerset grow
513
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
514
{
515
  int n_ids, metasize, i, max_ids;
516
  
517
  struct peer *peers;
518
  struct timeval tnow, told;
519
  static const struct nodeID **savedids;
520
  static int savedids_size;
521
  
522
  if timerisset(&tout_bmap) {
523
    gettimeofday(&tnow, NULL);
524
    timersub(&tnow, &tout_bmap, &told);
525
    peers = peerset_get_peers(pset_incoming);
526
    // Prune incoming inactive peers
527
    for (i = 0; i < peerset_size(pset_incoming); i++) {
528
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
529
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
530
        ftprintf(stderr,"Topo: dropping incoming inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset_incoming));
531
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
532
        topoAddToBL(peers[i].id);
533
        remove_peer(peers[i--].id, topo_out, topo_in);
534
        //}
535
      }
536
    }
537
  }
538

    
539
  //handle explicit add/remove messages
540
//   if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
541
    if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
542
    fprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
543
    if (len != 2) {
544
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
545
      return;
546
    }
547
    switch (buff[1]) {
548
      case STREAMER_TOPOLOGY_MSG_ADD:
549
        // I have to add this peer to my outgoing neighbourood
550
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
551
        if (!peerset_get_peer(pset_outgoing, from)) {
552
                counter2++;
553
                add_peer(from, NULL, true, false);
554
                }
555
//         if (black_listed(node_addr(from)) && (peerset_get_peer(pset_outgoing, from)))
556
//                 remove_peer(from, topo_in, topo_out);
557
//         else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset_outgoing, from) == NULL))
558
//                 send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
559
        break;
560
      case STREAMER_TOPOLOGY_MSG_REMOVE:
561
        // Neighbour ask me to remove him from my outgoing neighbourhood
562
        if (peerset_get_peer(pset_outgoing, from) != NULL) {
563
                ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
564
                remove_peer(from, true, false);
565
                }
566
        break;
567
      default:
568
        fprintf(stderr, "Bad streamer topo message received!\n");
569
    }
570
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
571
    reg_incoming_neigh_size(peerset_size(pset_incoming));
572
    return;
573
  }
574

    
575
  int res_topo_parse_data = topoParseData(buff, len);
576

    
577
  // Exit if a gossiping message has been received
578
  if (res_topo_parse_data > 0) {
579
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
580
    reg_incoming_neigh_size(peerset_size(pset_incoming));
581
    return;
582
  }
583

    
584
  // Look for peers which can be removed from blacklist
585
  check_black_list_timeouts();
586
  update_metadata();
587
  topChangeMetadata(&my_metadata, sizeof(struct metadata));
588

    
589
  double time_difference = (tnow.tv_sec + tnow.tv_usec*1e-6) - (last_time_updated_peers.tv_sec + last_time_updated_peers.tv_usec*1e-6);
590

    
591
  // Check if it's time to update the neighbourhood
592
  if (time_difference > UPDATE_PEERS_TIMEOUT) {
593
    
594
    // If I have to control my sons (--topo_in)
595
    if (topo_in && !topo_out) {
596
        peers = peerset_get_peers(pset_outgoing);
597
        n_ids = peerset_size(pset_outgoing);
598
    } else if (!topo_in && topo_out) { // If I have to control my fathers (--topo_out)
599
        peers = peerset_get_peers(pset_incoming);
600
        n_ids = peerset_size(pset_incoming);
601
    } else if (topo_in && topo_out) { // If topo_bidir, no distinction between sons and fathers, then incoming or outgoing doesn't matter
602
        peers = peerset_get_peers(pset_incoming);
603
        n_ids = peerset_size(pset_incoming);
604
    }
605

    
606
    newids = topGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
607
    metas = topGetMetadata(&metasize);
608
    int index = 0;
609
    for (index = 0; index < newids_size; index++) {
610
            dprintf("METAS node %s, capacity %f, cps %d, buffer %d\n", node_addr(newids[index]), metas[index].capacity, metas[index].cps, metas[index].cb_size);
611
    }
612
    dprintf("METAS my %f END\n", my_metadata.capacity);
613
    max_ids = n_ids + savedids_size + newids_size;
614
    ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
615

    
616
    int desired_part;
617
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
618
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
619
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
620

    
621
    if (topo_out) {
622
      for (i = 0, oldids_size = 0; i < peerset_size(pset_incoming); i++) {
623
        oldids[oldids_size++] = peers[i].id;
624
        fprintf(stderr," %s - RTT: %f, loss_rate %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_transmitter_lossrate(peers[i].id));
625
      }
626
    } else {
627
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
628
        oldids[oldids_size++] = savedids[i];
629
        fprintf(stderr," %s - RTT: %f, RX_CHUNKS: %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]));
630
      }
631
      savedids_size = 0;
632
      free(savedids);
633
    }
634

    
635
    // select the topo_mem portion of peers to be kept (uniform random)
636
    if ((topo_keep_best) && (strcmp(topo_keep_best_policy, "RND") != 0)) {
637
      qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_keep_best);
638
    } else {
639
      nidset_shuffle(oldids, oldids_size);
640
    }
641
    for (i = 0; i < oldids_size; i++) {
642
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f, BW %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]), get_capacity_of(oldids[i]));
643
      }
644
    dprintf("QSORT KEEP BEST END\n");
645
    
646
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
647
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
648

    
649
    // compose list of known nodeids
650
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
651
  
652
    // compose list of candidate nodeids
653
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
654

    
655
    // select the alpha_target portion of desired peers
656
    desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
657
    
658
    // Filter out blacklisted ones    
659
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
660
    
661
    if ((topo_add_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
662
      qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
663
    } else {
664
      nidset_shuffle(desireds, desireds_size);
665
    }
666
    for (i = 0; i < desireds_size; i++) {
667
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW*RTT %f, OFFERS %d\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), (get_metadata_for_peer(desireds[i])).capacity, (double) ((get_metadata_for_peer(desireds[i])).capacity*get_rtt_of(desireds[i])), (get_metadata_for_peer(desireds[i])).cps);
668
      }
669
    dprintf("QSORT ADD BEST END\n");
670
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
671

    
672
    // random from the rest
673
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
674
    nidset_shuffle(others, others_size);
675
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
676
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
677

    
678
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
679
        selecteds_size, nodeids_size,
680
        keep_size, oldids_size,
681
        MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
682
        random_size, others_size);
683
    // add new ones
684
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
685
    for (i = 0; i < toadds_size; i++) {
686
      int j;
687
      //searching for the metadata
688
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
689
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
690
        counter2++;
691
        add_peer(newids[j], &metas[j], topo_out, topo_in);
692
      } else {
693
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
694
      }
695
    }
696

    
697
    // finally, remove those not needed
698
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
699
        
700
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
701
    for (i = 0; i < toremoves_size; i++) {
702
       // Check that it has not been removed already
703
       bool removed_already = false;
704
       if (topo_in && !topo_out) {
705
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
706
       } else if (!topo_in && topo_out) {
707
               removed_already = (peerset_get_peer(pset_outgoing, toremoves[i])!=NULL) ? false : true;
708
       } else if (topo_in && topo_out) {
709
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
710
       }
711
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
712
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
713
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
714
          add_to_blacklist(node_addr(toremoves[i]));
715
          }
716
       fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
717
       if (!removed_already) {
718
          remove_peer(toremoves[i], topo_out, topo_in);
719
          }
720
    }
721
    fprintf(stderr,"Topo remove end\n");
722

    
723
    if (!topo_out) {
724
      savedids = malloc(selecteds_size * sizeof(savedids[0])); //TODO: handle errors
725
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
726
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
727
      }
728
      for (i = 0; i < oldids_size; i++) {
729
        nodeid_free(oldids[i]);
730
      }
731
    }
732
    gettimeofday(&last_time_updated_peers, NULL); 
733
  } 
734
  reg_outgoing_neigh_size(peerset_size(pset_outgoing));
735
  reg_incoming_neigh_size(peerset_size(pset_incoming));
736
}
737

    
738
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
739
{
740
  struct peer *p = peerset_get_peer(pset_incoming, id);
741
  if (!p) {
742
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
743
    if (reg) {
744
      add_peer(id,NULL, false, true);
745
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
746
      p = peerset_get_peer(pset_incoming,id);
747
    }
748
  }
749

    
750
  return p;
751
}
752

    
753
int peers_init(void)
754
{
755
  fprintf(stderr,"peers_init\n");
756
  pset_incoming = peerset_init(0);
757
  pset_outgoing = peerset_init(0);
758
  return pset_incoming && pset_outgoing ? 1 : 0;
759
}
760

    
761
struct peerset *get_peers(void)
762
{
763
  return pset_outgoing;
764
}
765

    
766
struct peerset *get_outgoing_peers(void)
767
{
768
  return pset_outgoing;
769
}
770

    
771
struct peerset *get_incoming_peers(void)
772
{
773
  return pset_incoming;
774
}