Statistics
| Branch: | Revision:

streamers / topology.c @ d27d49f4

History | View | Annotate | Download (43.3 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21
#include <int_coding.h>
22

    
23
#include "compatibility/timer.h"
24

    
25
#include "topology.h"
26
#include "scheduler_la.h"
27
#include "nodeid_set.h"
28
#include "streaming.h"
29
#include "dbg.h"
30
#include "measures.h"
31
#include "streamer.h"
32
#include "hrc.h"
33
#include "blacklist.h"
34

    
35
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
36
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
37

    
38
#define MODULE  2147483647
39
#define A       16807
40
#define LASTXN  127773
41
#define UPTOMOD -2836
42
#define RATIO   0.46566128e-9
43

    
44
double alpha_target = 0.5;
45
double topo_mem = 0;
46

    
47
bool topo_out = true; //peer selects out-neighbours
48
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
49

    
50
bool topo_keep_best = false;
51
bool topo_add_best = false;
52
bool topo_black_list = false;
53

    
54
const char* topo_keep_best_policy = "";
55
const char* topo_add_best_policy = "";
56
const char* topo_black_list_policy = "";
57

    
58
int subnet = 40;
59
int NEIGHBORHOOD_TARGET_SIZE = 30;
60
double UPDATE_PEERS_TIMEOUT = 5;
61
#define TMAN_MAX_IDLE 10
62
#define TMAN_LOG_EVERY 1000
63

    
64
#define STREAMER_TOPOLOGY_MSG_ADD 0
65
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
66

    
67
static struct peerset *pset_outgoing;
68
static struct peerset *pset_incoming;
69
static struct timeval tout_bmap = {20, 0};
70
static int counter = 0;
71
static int counter2 = 0;
72
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
73
static tmanRankingFunction rankFunct = simpleRanker;
74

    
75
struct metadata {
76
  uint16_t cb_size;
77
  uint16_t cps;
78
  uint16_t subnet;
79
  double capacity;
80
  double recv_delay;
81
} __attribute__((packed));
82

    
83
static struct metadata my_metadata;
84
static int cnt = 0;
85
static struct nodeID *me = NULL;
86
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
87
static struct nodeID ** neighbors;
88
static struct timeval last_time_updated_peers;
89

    
90

    
91
struct nodeID **newids;
92
int newids_size;
93
double *add_weights;
94
static long seed = 1;
95
struct metadata *metas;
96

    
97
double add_noise(double to_return, double alpha) {
98
        double uniform = rand()/(RAND_MAX + 1.0);
99
        return (double) (((1-alpha) + (2*alpha_target*uniform)) * to_return) ; 
100
        }
101

    
102
long rnd32(long gSeed)
103
{
104
  long times, rest, prod1, prod2;
105

    
106
  times = gSeed / LASTXN;
107
  rest = gSeed - times * LASTXN;
108
  prod1 = times * UPTOMOD;
109
  prod2 = rest * A;
110
  gSeed = prod1 + prod2;
111
  if (gSeed < 0)
112
    gSeed = gSeed + MODULE;
113
  return gSeed;
114
}
115

    
116
double negexp (double mean, long *gSeed)
117
{
118
  double u;
119
  *gSeed = rnd32 (*gSeed);
120
  u = (*gSeed) * RATIO;
121
  return (-mean * log (u));
122
}
123

    
124
static void update_metadata(void) {
125
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
126
        my_metadata.cps =  am_i_source() ? 15 : (uint16_t) get_running_offer_threads();
127
        my_metadata.subnet =  subnet;
128
        my_metadata.recv_delay = (double) get_receive_delay();
129
        my_metadata.capacity = (double) get_capacity();
130
}
131

    
132
static struct metadata get_metadata_for_peer(struct nodeID *id){
133
        int i = 0;
134
        struct metadata to_return;
135
        to_return.cb_size = (uint16_t) NAN;
136
        to_return.cps = (uint16_t) NAN;
137
        to_return.subnet = (uint16_t) NAN;
138
        to_return.recv_delay = NAN;
139
        to_return.capacity = NAN;
140

    
141
        if (newids_size > 0 && id != NULL) {
142
                for (i = 0 ; i < newids_size; i++) {
143
                        if (newids[i] != NULL) {
144
                                if (nodeid_equal(newids[i],id)){
145
                                        to_return.cb_size = metas[i].cb_size;
146
                                        to_return.cps = metas[i].cps;
147
                                        to_return.subnet =  metas[i].subnet;
148
                                        to_return.recv_delay = metas[i].recv_delay;
149
                                        to_return.capacity = metas[i].capacity;
150
                                        return to_return;
151
                                }
152
                        }
153
                }
154
        return to_return;
155
        }
156
}
157

    
158
static double get_exp_for_peer(struct nodeID *id){
159
        int i = 0;
160
        double to_return = NAN;
161

    
162
        if (newids_size > 0 && id != NULL) {
163
                for (i = 0 ; i < newids_size; i++) {
164
                        if (newids[i] != NULL) {
165
                                if (nodeid_equal(newids[i],id)){
166
                                        to_return = add_weights[i];
167
                                        return to_return;
168
                                }
169
                        }
170
                }
171
        }
172
        return to_return;
173
}
174

    
175
static void fill_add_weights(){
176
        int i = 0;
177
        if (newids_size > 0) {
178
                for (i = 0 ; i < newids_size; i++) {
179
                        add_weights[i] = negexp((1/((metas[i].capacity)/100000)), &seed);
180
                        dprintf("EXP: exp %f, capacity %f, seed %ld\n", add_weights[i], (metas[i].capacity), seed );
181
                }
182
        }
183
        return;
184
}
185

    
186

    
187
void check_my_state() {
188
  // Check if we are in adaptive mode
189
          double playout = get_chunk_playout();
190
        dprintf("CHECK: playout %f\n", playout);
191
        if (playout < 1) {
192
                UPDATE_PEERS_TIMEOUT = 2;
193
                topo_mem = 0.6;
194
        }
195
        else {
196
                UPDATE_PEERS_TIMEOUT = 10;
197
                topo_mem = 0.95;
198
        }
199
}
200

    
201
/**
202
  * @brief Prototype for function assigning a weigth to a nodeid
203
  * @return the weight associated to the peer
204
  */
205
typedef double (*nodeidEvaluateFunction)(struct nodeID **);
206

    
207
/**
208
  * Select best N of K peers with the given ordering method
209
  */
210
void nidset_select(SchedOrdering ordering, struct nodeID **peers, size_t peers_len, nodeidEvaluateFunction peerevaluate, struct nodeID **selected, size_t *selected_len ){
211
  selectWithOrdering(ordering, sizeof(peers[0]), (void*)peers, peers_len, (evaluateFunction)peerevaluate, (void*)selected, selected_len);
212
}
213

    
214

    
215
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
216

    
217
        double t,p1,p2;
218
        t = *((const double *)tin);
219
        p1 = *((const double *)p1in);
220
        p2 = *((const double *)p2in);
221

    
222
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
223
        else if (isnan(p1)) return 2;
224
        else if (isnan(p2)) return 1;
225
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
226

    
227
}
228

    
229
int topologyInit(struct nodeID *myID, const char *config)
230
{
231
        int i;
232
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
233
                bind_msg_type(mTypes[i]);
234
        update_metadata();
235
        me = myID;
236
        struct timeval tnow;
237
        gettimeofday(&tnow, NULL);
238
        gettimeofday(&last_time_updated_peers, NULL);
239
        last_time_updated_peers.tv_sec -= UPDATE_PEERS_TIMEOUT;
240
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
241
}
242

    
243
void topologyShutdown(void)
244
{
245
}
246

    
247
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
248
{
249
        // TODO: check this!! Just to use this function to bootstrap ncast...
250
        struct metadata m = {0};        //TODO: check what metadata option should mean
251

    
252
        if (counter < TMAN_MAX_IDLE)
253
                return topAddNeighbour(neighbour,&m,sizeof(m));
254
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
255
}
256

    
257
static int topoParseData(const uint8_t *buff, int len)
258
{
259
        int res = -1,ncs = 0,msize;
260
        const struct nodeID **n; const void *m;
261
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
262
                res = topParseData(buff,len);
263
//                 if (counter <= TMAN_MAX_IDLE)
264
//                         counter++;
265
        }
266
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
267
        {
268
                n = topGetNeighbourhood(&ncs);
269
                if (ncs) {
270
                m = topGetMetadata(&msize);
271
                res = tmanParseData(buff,len,n,ncs,m,msize);
272
                }
273
        }
274
  return res;
275
}
276

    
277
static const struct nodeID **topoGetNeighbourhood(int *n)
278
{
279
        int i; double d;
280
        if (counter > TMAN_MAX_IDLE) {
281
                uint8_t *mdata; int msize;
282
                *n = tmanGetNeighbourhoodSize();
283
                if (neighbors) free(neighbors);
284
                neighbors = calloc(*n,sizeof(struct nodeID *));
285
                tmanGetMetadata(&msize);
286
                mdata = calloc(*n,msize);
287
                tmanGivePeers(*n,neighbors,(void *)mdata);
288

    
289
                if (cnt % TMAN_LOG_EVERY == 0) {
290
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
291
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
292
                                d = *((double *)(mdata+i*msize));
293
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
294
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
295
                        }
296
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
297
                }
298

    
299
                free(mdata);
300
                return (const struct nodeID **)neighbors;
301
        }
302
        else
303
                return topGetNeighbourhood(n);
304
}
305

    
306
static void topoAddToBL (struct nodeID *id)
307
{
308
        if (counter >= TMAN_MAX_IDLE)
309
                tmanAddToBlackList(id);
310
        topAddToBlackList(id);
311
}
312

    
313
void encodeMetaData(uint8_t *buff, int buff_len)
314
{
315
//   if (buff_len < 2 + 2 + 2 + 4 + 4) {
316
//     /* Not enough space... */
317
//     return -1;
318
//   }
319
  int16_cpy(buff, my_metadata.cb_size);
320
  int16_cpy(buff + 2, my_metadata.cps);
321
  int16_cpy(buff + 4, my_metadata.subnet);
322
  int_cpy(buff + 8, ((int) my_metadata.capacity));
323
  int_cpy(buff + 12, ( (isnan(my_metadata.recv_delay)) ? (int) 1e6 : (int) (my_metadata.recv_delay*1e6)));
324
  dprintf("ENCODE: cb_size %d, cps %d, capacity %f, recv_delay %f\n", my_metadata.cb_size, my_metadata.cps, my_metadata.capacity, my_metadata.recv_delay );
325
  }
326

    
327
static int send_topo_msg(struct nodeID *dst, uint8_t type)
328
{
329
  int res = 0;
330
  int buff_len = 2 + sizeof(struct metadata);
331
  
332
  uint8_t *buff = malloc(buff_len);
333
  buff[0] = MSG_TYPE_STREAMER_TOPOLOGY;
334
  buff[1] = type;
335
  
336
  encodeMetaData(buff + 2, buff_len);
337
    
338
  res = send_to_peer(me, dst, buff, buff_len);
339
  free(buff);
340
  
341
  return res;
342
}
343

    
344
static void add_peer(const struct nodeID *id, const struct metadata *m, bool outgoing, bool incoming)
345
{
346
  if (outgoing) {
347
      dtprintf("Adding %s to outgoing neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
348
      peerset_add_peer(pset_outgoing, id);
349
      if (m) peerset_get_peer(pset_outgoing, id)->cb_size = m->cb_size;
350
      if (m) peerset_get_peer(pset_outgoing, id)->capacity = m->capacity;
351
      if (m) peerset_get_peer(pset_outgoing, id)->subnet = m->subnet;
352
      send_bmap(id);
353
  }
354
  if (incoming) {
355
      dtprintf("Topo: explicit add message sent!!! to %s (peers:%d)\n", node_addr(id));
356
      peerset_add_peer(pset_incoming, id);
357
      if (m) peerset_get_peer(pset_incoming, id)->cb_size = m->cb_size;
358
      if (m) peerset_get_peer(pset_incoming, id)->capacity = m->capacity;
359
      if (m) peerset_get_peer(pset_incoming, id)->subnet = m->subnet;
360
      /* add measures here */
361
      add_measures(id);
362
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
363
      send_bmap(id);
364
  }
365
}
366

    
367
static void remove_peer(const struct nodeID *id, bool outgoing, bool incoming)
368
{
369
  if (outgoing) {
370
      dtprintf("Removing %s from outgoing neighbourhood!\n", node_addr(id));
371
      peerset_remove_peer(pset_outgoing, id);
372
  }
373
  if (incoming) {
374
      dtprintf("Topo: explicit remove message sent!!! to %s (peers:%d)\n", node_addr(id));
375
      peerset_remove_peer(pset_incoming, id);
376
      /* delete measures here */
377
      delete_measures(id);
378
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
379
  }
380
}
381

    
382
//get the rtt. Currenly only MONL version is supported
383
static double get_rtt_of(const struct nodeID* n){
384
#ifdef MONL
385
  return get_rtt(n);
386
#else
387
  return NAN;
388
#endif
389
}
390

    
391
//get the declared capacity of a node
392
static double get_capacity_of(const struct nodeID* n){
393
  struct peer *p = peerset_get_peer(pset_incoming, n);
394
  if (p) {
395
    return p->capacity;
396
  } 
397
  p = peerset_get_peer(pset_outgoing, n);
398
  if (p) {
399
    return p->capacity;
400
  } 
401
  return NAN;
402
}
403

    
404
//get the declared subnet of a node
405
static int get_subnet_of(const struct nodeID* n){
406
  struct peer *p = peerset_get_peer(pset_incoming, n);
407
  if (p) {
408
    return p->subnet;
409
  } 
410
  p = peerset_get_peer(pset_outgoing, n);
411
  if (p) {
412
    return p->subnet;
413
  } 
414
  return NAN;
415
}
416

    
417
static double get_rx_bytes_chunks_of(const struct nodeID* n){
418
#ifdef MONL
419
  return get_rx_bytes_chunks(n);
420
#else
421
  return NAN;
422
#endif
423
}
424

    
425
static double get_rx_chunks_of(const struct nodeID* n){
426
#ifdef MONL
427
  return get_rx_chunks(n);
428
#else
429
  return NAN;
430
#endif
431
}
432

    
433
static double get_transmitter_lossrate_of(const struct nodeID* n){
434
#ifdef MONL
435
  return get_transmitter_lossrate_of(n);
436
#else
437
  return NAN;
438
#endif
439
}
440

    
441
// Check if peer is the blacklist
442
//returns: 1:yes 0:no
443
int desiredness_unbled(const struct nodeID* n) {
444
  bool bled = black_listed(node_addr(n));
445

    
446
  if (!bled) {
447
    return 1;
448
  }
449
  return 0;
450
}
451

    
452
//returns: 1:yes 0:no -1:unknown
453
int blacklistness_lossrate(const struct nodeID* n) {
454
  double loss_rate = get_transmitter_lossrate(n);
455
  
456
  dprintf("BLACKLIST: father loss %s rate is %f\n", node_addr(n), loss_rate);
457
  
458
  if (isnan(loss_rate)) {
459
    return -1;
460
  } else if (loss_rate > 0.10) {
461
    return 1;
462
  }
463

    
464
  return 0;
465
}
466

    
467
bool is_desired_unbled(const struct nodeID* n) {
468
  return (desiredness_unbled(n) == 1);
469
}
470

    
471
bool is_to_blacklist(const struct nodeID* n) {
472
  return (blacklistness_lossrate(n) == 1);
473
}
474

    
475
int cmp_rtt(const struct nodeID** a, const struct nodeID** b) {
476
  double ra, rb;
477
  ra = add_noise(get_rtt_of(*a), alpha_target);
478
  rb = add_noise(get_rtt_of(*b), alpha_target);
479
  
480
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
481
  else if (isnan(rb) || ra < rb) return -1;
482
  else return 1;
483
}
484

    
485
int vcmp_rtt(const void* a, const void* b) {
486
  return cmp_rtt((const struct nodeID**) a, (const struct nodeID**) b);
487
}
488

    
489
static double get_bw2(const struct nodeID** a){
490
  double ra = add_noise(get_metadata_for_peer(*a).capacity, alpha_target);
491
  dprintf( "bw: %s %f\n", node_addr(*a), ra);
492
  return ra;
493
}
494

    
495
static double get_add_subnet2(const struct nodeID** a){
496
  int his_subnet = get_metadata_for_peer(*a).subnet ;
497
  double ra = NAN;
498
  if (his_subnet == subnet) {
499
          ra = add_noise( 1/0.020, alpha_target);
500
  } else if ( ( subnet == 41 && his_subnet == 43 ) || ( subnet == 43 && his_subnet == 41 ) ) {
501
          ra = add_noise( 1/0.080, alpha_target);
502
  } else if ( ( subnet == 41 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 41 ) ) {
503
          ra = add_noise( 1/0.120, alpha_target);
504
  } else if ( ( subnet == 41 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 41 ) ) {
505
          ra = add_noise( 1/0.160, alpha_target);
506
  } else if ( ( subnet == 43 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 43 ) ) {
507
          ra = add_noise( 1/0.140, alpha_target);
508
  } else if ( ( subnet == 43 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 43 ) ) {
509
          ra = add_noise( 1/0.240, alpha_target);
510
  } else if ( ( subnet == 44 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 44 ) ) {
511
          ra = add_noise( 1/0.200, alpha_target);
512
  } else if (his_subnet == 40) {
513
           ra = add_noise( 1/0.020, alpha_target);
514
  }
515
  dprintf( "subnet rtt: %s %f\n", node_addr(*a), ra);
516
  return ra;
517
}
518

    
519
static double get_add_subnet_root2(const struct nodeID** a){
520
  int his_subnet = get_metadata_for_peer(*a).subnet ;
521
  double ra = NAN;
522
  if (his_subnet == subnet) {
523
          ra = add_noise( 1/sqrt(0.020), alpha_target);
524
  } else if ( ( subnet == 41 && his_subnet == 43 ) || ( subnet == 43 && his_subnet == 41 ) ) {
525
          ra = add_noise( 1/sqrt(0.080), alpha_target);
526
  } else if ( ( subnet == 41 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 41 ) ) {
527
          ra = add_noise( 1/sqrt(0.120), alpha_target);
528
  } else if ( ( subnet == 41 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 41 ) ) {
529
          ra = add_noise( 1/sqrt(0.160), alpha_target);
530
  } else if ( ( subnet == 43 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 43 ) ) {
531
          ra = add_noise( 1/sqrt(0.140), alpha_target);
532
  } else if ( ( subnet == 43 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 43 ) ) {
533
          ra = add_noise( 1/sqrt(0.240), alpha_target);
534
  } else if ( ( subnet == 44 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 44 ) ) {
535
          ra = add_noise( 1/sqrt(0.300), alpha_target);
536
  } else if (his_subnet == 40) {
537
           ra = add_noise( 1/sqrt(0.020), alpha_target);
538
  }
539
  dprintf( "subnet rtt: %s %f\n", node_addr(*a), ra);
540
  return ra;
541
}
542

    
543
static double get_keep_subnet2(const struct nodeID** a){
544
  int his_subnet = get_subnet_of(*a) ;
545
  double ra = NAN;
546
  if (his_subnet == subnet) {
547
          ra = add_noise( 1/0.020, alpha_target);
548
  } else if ( ( subnet == 41 && his_subnet == 43 ) || ( subnet == 43 && his_subnet == 41 ) ) {
549
          ra = add_noise( 1/0.080, alpha_target);
550
  } else if ( ( subnet == 41 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 41 ) ) {
551
          ra = add_noise( 1/0.120, alpha_target);
552
  } else if ( ( subnet == 41 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 41 ) ) {
553
          ra = add_noise( 1/0.160, alpha_target);
554
  } else if ( ( subnet == 43 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 43 ) ) {
555
          ra = add_noise( 1/0.140, alpha_target);
556
  } else if ( ( subnet == 43 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 43 ) ) {
557
          ra = add_noise( 1/0.240, alpha_target);
558
  } else if ( ( subnet == 44 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 44 ) ) {
559
          ra = add_noise( 1/0.300, alpha_target);
560
  } else if (his_subnet == 40) {
561
           ra = add_noise( 1/0.020, alpha_target);
562
  }
563
  dprintf( "subnet rtt: %s %f\n", node_addr(*a), ra);
564
  return ra;
565
}
566

    
567
static double get_keep_subnet_root2(const struct nodeID** a){
568
  int his_subnet = get_subnet_of(*a) ;
569
  double ra = NAN;
570
  if (his_subnet == subnet) {
571
          ra = add_noise( 1/sqrt(0.020), alpha_target);
572
  } else if ( ( subnet == 41 && his_subnet == 43 ) || ( subnet == 43 && his_subnet == 41 ) ) {
573
          ra = add_noise( 1/sqrt(0.080), alpha_target);
574
  } else if ( ( subnet == 41 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 41 ) ) {
575
          ra = add_noise( 1/sqrt(0.120), alpha_target);
576
  } else if ( ( subnet == 41 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 41 ) ) {
577
          ra = add_noise( 1/sqrt(0.160), alpha_target);
578
  } else if ( ( subnet == 43 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 43 ) ) {
579
          ra = add_noise( 1/sqrt(0.140), alpha_target);
580
  } else if ( ( subnet == 43 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 43 ) ) {
581
          ra = add_noise( 1/sqrt(0.240), alpha_target);
582
  } else if ( ( subnet == 44 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 44 ) ) {
583
          ra = add_noise( 1/sqrt(0.300), alpha_target);
584
  } else if (his_subnet == 40) {
585
           ra = add_noise( 1/sqrt(0.020), alpha_target);
586
  }
587
  dprintf( "subnet rtt: %s %f\n", node_addr(*a), ra);
588
  return ra;
589
}
590

    
591
static double get_keep_subnet_root3(const struct nodeID** a){
592
  int his_subnet = get_subnet_of(*a) ;
593
  double ra = NAN;
594
  if (his_subnet == subnet) {
595
          ra = add_noise( 1/cbrt(0.020), alpha_target);
596
  } else if ( ( subnet == 41 && his_subnet == 43 ) || ( subnet == 43 && his_subnet == 41 ) ) {
597
          ra = add_noise( 1/cbrt(0.080), alpha_target);
598
  } else if ( ( subnet == 41 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 41 ) ) {
599
          ra = add_noise( 1/cbrt(0.120), alpha_target);
600
  } else if ( ( subnet == 41 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 41 ) ) {
601
          ra = add_noise( 1/cbrt(0.160), alpha_target);
602
  } else if ( ( subnet == 43 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 43 ) ) {
603
          ra = add_noise( 1/cbrt(0.140), alpha_target);
604
  } else if ( ( subnet == 43 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 43 ) ) {
605
          ra = add_noise( 1/cbrt(0.240), alpha_target);
606
  } else if ( ( subnet == 44 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 44 ) ) {
607
          ra = add_noise( 1/cbrt(0.300), alpha_target);
608
  } else if (his_subnet == 40) {
609
           ra = add_noise( 1/cbrt(0.020), alpha_target);
610
  }
611
  dprintf( "subnet rtt: %s %f\n", node_addr(*a), ra);
612
  return ra;
613
}
614

    
615
static double get_keep_subnet_rx_chunks(const struct nodeID** a){
616
  int his_subnet = get_subnet_of(*a) ;
617
  double rx_chunks = get_rx_chunks_of(*a);
618
  double ra = NAN;
619
  if (his_subnet == subnet) {
620
          ra = add_noise( 1/0.020, alpha_target);
621
  } else if ( ( subnet == 41 && his_subnet == 43 ) || ( subnet == 43 && his_subnet == 41 ) ) {
622
          ra = add_noise( 1/0.080, alpha_target);
623
  } else if ( ( subnet == 41 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 41 ) ) {
624
          ra = add_noise( 1/0.120, alpha_target);
625
  } else if ( ( subnet == 41 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 41 ) ) {
626
          ra = add_noise( 1/0.160, alpha_target);
627
  } else if ( ( subnet == 43 && his_subnet == 44 ) || ( subnet == 44 && his_subnet == 43 ) ) {
628
          ra = add_noise( 1/0.140, alpha_target);
629
  } else if ( ( subnet == 43 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 43 ) ) {
630
          ra = add_noise( 1/0.240, alpha_target);
631
  } else if ( ( subnet == 44 && his_subnet == 45 ) || ( subnet == 45 && his_subnet == 44 ) ) {
632
          ra = add_noise( 1/0.300, alpha_target);
633
  } else if (his_subnet == 40) {
634
           ra = add_noise( 1/0.020, alpha_target);
635
  }
636
  dprintf( "subnet rtt: %s %f\n", node_addr(*a), isnan(rx_chunks)? NAN:(rx_chunks/ra));
637
  if isnan(rx_chunks)
638
        return ra;
639
  else 
640
        return (double) (rx_chunks/ra);
641
}
642

    
643

    
644
static double get_bw_local2(const struct nodeID** a){
645
  double ra = add_noise(get_capacity_of(*a), alpha_target);
646
  dprintf( "bw: %s %f\n", node_addr(*a), ra);
647
  return ra;
648
}
649

    
650
static double get_rtt2(const struct nodeID** a){
651
  double rtt = get_rtt_of(*a);
652
  double ra = (isnan(rtt) ? rtt : add_noise(rtt, alpha_target) );
653
  dprintf( "rtt2: %s %f\n", node_addr(*a), ra);
654
  return ra;
655
}
656

    
657
static double get_rtt_root2(const struct nodeID** a){
658
  double rtt = get_rtt_of(*a);
659
  double ra = (isnan(rtt) ? rtt : add_noise((1/sqrt(rtt)), alpha_target) );
660
  dprintf( "rtt_root2: %s %f\n", node_addr(*a), ra);
661
  return ra;
662
}
663

    
664
static double get_rtt_root3(const struct nodeID** a){
665
  double rtt = get_rtt_of(*a);
666
  double ra = (isnan(rtt) ? rtt : add_noise((1/cbrt(rtt)), alpha_target) );
667
  dprintf( "rtt_root3: %s %f\n", node_addr(*a), ra);
668
  return ra;
669
}
670

    
671
static double get_rx_chunks_rtt(const struct nodeID** a){
672
  double rtt = get_rtt_of(*a);
673
  double rx_chunks = get_rx_chunks_of(*a);
674
  double ra = ((isnan(rtt) || isnan(rx_chunks)) ? rx_chunks : add_noise(( rx_chunks / rtt ), alpha_target) );
675
  dprintf( "rx_chunks_rtt: %s %f\n", node_addr(*a), ra);
676
  return ra;
677
}
678

    
679
static double get_offer2(const struct nodeID** a){
680
  double ra = add_noise(get_metadata_for_peer(*a).cps, alpha_target);
681
  dprintf( "offers: %s %f\n", node_addr(*a), ra);
682
  return ra;
683
}
684

    
685
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
686
  double ra, rb;
687
  ra = -( add_noise(get_metadata_for_peer(*a).capacity, alpha_target));
688
  rb = -( add_noise(get_metadata_for_peer(*b).capacity, alpha_target));
689

    
690
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
691
  else if (isnan(rb) || ra < rb) return -1;
692
  else return 1;
693
}
694

    
695
int vcmp_bw(const void* a, const void* b) {
696
  return cmp_bw((const struct nodeID**)a, (const struct nodeID**)b);
697
}
698

    
699
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
700
  double ra, rb;
701
  ra = -( add_noise((get_metadata_for_peer(*a).capacity / (get_rtt_of(*a) / 1000)), alpha_target));
702
  rb = -( add_noise((get_metadata_for_peer(*b).capacity / (get_rtt_of(*b) / 1000)), alpha_target));
703

    
704
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
705
  else if (isnan(rb) || ra < rb) return -1;
706
  else return 1;
707
}
708

    
709
int vcmp_rtt_bw(const void* a, const void* b) {
710
  return cmp_rtt_bw((const struct nodeID**)a, (const struct nodeID**)b);
711
}
712

    
713
int cmp_offers(const struct nodeID** a, const struct nodeID** b) {
714
  double ra, rb;
715
  ra = -( add_noise(get_metadata_for_peer(*a).cps, alpha_target) );
716
  rb = -( add_noise(get_metadata_for_peer(*b).cps, alpha_target) );
717
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
718
  else if (isnan(rb) || ra < rb) return -1;
719
  else return 1;
720
}
721

    
722
int vcmp_offers(const void* a, const void* b) {
723
  return cmp_offers((const struct nodeID**)a, (const struct nodeID**)b);
724
}
725

    
726
int cmp_rx_bytes_chunks(const struct nodeID** a, const struct nodeID** b) {
727
  double ra, rb;
728
  ra = -(add_noise(get_rx_chunks_of(*a), alpha_target));
729
  rb = -(add_noise(get_rx_chunks_of(*b), alpha_target));
730
//   fprintf(stderr, "RX_BYTES_CHUNKS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
731
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
732
        return 0;
733
  else if (isnan(rb) || ra < rb) 
734
        return -1;
735
  else 
736
        return 1;
737
}
738

    
739
int vcmp_rx_bytes_chunks(const void* a, const void* b) {
740
  return cmp_rx_bytes_chunks((const struct nodeID**) a, (const struct nodeID**) b);
741
}
742

    
743
int cmp_packet_loss(const struct nodeID** a, const struct nodeID** b) {
744
  double ra, rb;
745
  ra = add_noise(get_transmitter_lossrate(*a), alpha_target);
746
  rb = add_noise(get_transmitter_lossrate(*b), alpha_target);
747
//   fprintf(stderr, "PACKET_LOSS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
748
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
749
        return 0;
750
  else if (isnan(rb) || ra < rb) 
751
        return -1;
752
  else 
753
        return 1;
754
}
755

    
756
int vcmp_packet_loss(const void* a, const void* b) {
757
  return cmp_packet_loss((const struct nodeID**) a, (const struct nodeID**) b);
758
}
759

    
760
int cmp_affinity(const struct nodeID** a, const struct nodeID** b) {
761
  double ra, rb, mine, differencea, differenceb;
762
  
763
  ra = add_noise(get_metadata_for_peer(*a).capacity, alpha_target);
764
  rb = add_noise(get_metadata_for_peer(*b).capacity, alpha_target);
765
  mine = get_capacity();
766
    
767
  if ((!isnan(ra)) && (!isnan(rb))) {
768
        differencea = ra - mine;
769
        differenceb = rb - mine;
770
        // Both greater than mine
771
        if (differenceb > 0 && differencea > 0) {
772
                if (differencea > differenceb)
773
                        return 1;
774
                else if (differencea < differenceb)
775
                        return -1;
776
                else if (differencea == differenceb)
777
                        return 0;
778
        } else if (differenceb > 0 && differencea < 0) {
779
                return 1;
780
        } else if (differenceb < 0 && differencea > 0) {
781
                return -1;
782
        } else if (differenceb < 0 && differencea < 0) {
783
                if (differencea > differenceb)
784
                        return -1;
785
                else if (differencea < differenceb)
786
                        return 1;
787
                else if (differencea == differenceb)
788
                        return 0;
789
        }
790
  } else if (isnan(ra) && (!isnan(rb))) {
791
          return -1;
792
  } else if (isnan(rb) && (!isnan(ra))) {
793
          return 1;
794
  }
795
}
796

    
797
int vcmp_affinity(const void* a, const void* b) {
798
  return cmp_affinity((const struct nodeID**) a, (const struct nodeID**) b);
799
}
800

    
801
int cmp_bw_weight(const struct nodeID** a, const struct nodeID** b) {
802
  double ra, rb;
803
  ra = get_exp_for_peer(*a);
804
  rb = get_exp_for_peer(*b);
805
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
806
        return 0;
807
  else if (isnan(ra) || ra < rb) 
808
        return -1;
809
  else 
810
        return 1;
811
}
812

    
813
int vcmp_bw_weight(const void* a, const void* b) {
814
  return cmp_bw_weight((const struct nodeID**) a, (const struct nodeID**) b);
815
}
816

    
817
// Use add best policy specified at command line
818
int vcmp_add_best(const void* a, const void* b) {
819
  if (strcmp(topo_add_best_policy, "OFFERS") == 0) {
820
          return vcmp_offers(a, b);
821
  } else if (strcmp(topo_add_best_policy, "BW") == 0) {
822
          return vcmp_bw(a, b);
823
  } else if (strcmp(topo_add_best_policy, "RTT") == 0) {
824
          return vcmp_rtt( a,  b);
825
  } else if (strcmp(topo_add_best_policy, "BW_RTT") == 0) {
826
          return vcmp_rtt_bw( a,  b);
827
  } else if (strcmp(topo_add_best_policy, "AFFINITY") == 0) {
828
          return vcmp_affinity( a,  b);
829
  } else if (strcmp(topo_add_best_policy, "BW_WEIGHT") == 0) {
830
          return vcmp_bw_weight( a,  b);
831
  } else if (strcmp(topo_add_best_policy, "") == 0){ // defualt behaviuor
832
          return vcmp_rtt_bw( a, b);
833
  } else {
834
    fprintf(stderr, "Error: unknown add best policy %s\n", topo_add_best_policy);
835
    return 1;
836
  }  
837
}
838

    
839
// Use keep best policy specified at command line
840
int vcmp_keep_best(const void* a, const void* b) {
841
  if (strcmp(topo_keep_best_policy, "RTT") == 0) {
842
          return vcmp_rtt(a, b);
843
  }  else if (strcmp(topo_keep_best_policy, "BW") == 0) {
844
          return vcmp_bw(a, b);
845
  }  else if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
846
          return vcmp_rx_bytes_chunks(a, b);
847
  } else if (strcmp(topo_keep_best_policy, "PACKET_LOSS") == 0) {
848
          return vcmp_packet_loss( a, b);
849
  } else if (strcmp(topo_keep_best_policy, "AFFINITY") == 0) {
850
          return vcmp_affinity( a,  b);
851
  } else if (strcmp(topo_keep_best_policy, "BW_WEIGHT") == 0) {
852
          return vcmp_bw_weight( a,  b);
853
  } else if (strcmp(topo_keep_best_policy, "") == 0) { // Default behaviuor
854
          return vcmp_rx_bytes_chunks( a, b);
855
  } else {
856
    fprintf(stderr, "Error: unknown keep best policy %s\n", topo_add_best_policy);
857
    return 1;
858
  }  
859
}
860

    
861
struct metadata decode_metadata(struct nodeID *from, const uint8_t *buff, int len) {
862
        struct metadata my_metadata;
863
        my_metadata.cb_size = int16_rcpy(buff + 2);
864
        my_metadata.cps = int16_rcpy(buff + 4);
865
        my_metadata.subnet = int16_rcpy(buff + 6);
866
        my_metadata.capacity = (double) int_rcpy(buff + 10);
867
        my_metadata.recv_delay = (double) (int_rcpy(buff + 14))*1e-6;
868
        dprintf("DECODE: cb_size %d, cps %d, capacity %f, subnet %d, recv_delay %f\n", my_metadata.cb_size, my_metadata.cps, my_metadata.capacity, my_metadata.subnet, my_metadata.recv_delay );
869
        return my_metadata;
870
}
871

    
872

    
873
// currently it just makes the peerset grow
874
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
875
{
876
  int n_ids, metasize, i, max_ids;
877
  
878
  struct peer *peers;
879
  struct timeval tnow, told;
880
  static const struct nodeID **savedids;
881
  static int savedids_size;
882
  
883
  if timerisset(&tout_bmap) {
884
    gettimeofday(&tnow, NULL);
885
    timersub(&tnow, &tout_bmap, &told);
886
    peers = peerset_get_peers(pset_incoming);
887
    // Prune incoming inactive peers
888
    for (i = 0; i < peerset_size(pset_incoming); i++) {
889
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
890
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
891
        ftprintf(stderr,"Topo: dropping incoming inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset_incoming));
892
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
893
        if (peers[i].cb_size > 0) { 
894
          topoAddToBL(peers[i].id);
895
          remove_peer(peers[i--].id, topo_out, topo_in);
896
        } else {  
897
                if (get_capacity() < 4000000.0) {
898
                        topoAddToBL(peers[i].id);
899
                        remove_peer(peers[i--].id, topo_out, topo_in);
900
                }
901
        }
902
        //}
903
      }
904
    }
905
  }
906

    
907
  //handle explicit add/remove messages
908
//   if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
909
    if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
910
    fprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
911
    struct metadata received_metadata = decode_metadata(from, buff, len);
912
    topAddNeighbour(from,&received_metadata,sizeof(received_metadata));
913
    if (len != 24) {
914
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
915
      return;
916
    }
917
    switch (buff[1]) {
918
      case STREAMER_TOPOLOGY_MSG_ADD:
919
        // I have to add this peer to my outgoing neighbourood
920
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
921
        if (!peerset_get_peer(pset_outgoing, from)) {
922
                counter2++;
923
                add_peer(from, &received_metadata, true, false);
924
                }
925
//         if (black_listed(node_addr(from)) && (peerset_get_peer(pset_outgoing, from)))
926
//                 remove_peer(from, topo_in, topo_out);
927
//         else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset_outgoing, from) == NULL))
928
//                 send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
929
        break;
930
      case STREAMER_TOPOLOGY_MSG_REMOVE:
931
        // Neighbour ask me to remove him from my outgoing neighbourhood
932
        if (peerset_get_peer(pset_outgoing, from) != NULL) {
933
                ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
934
                remove_peer(from, true, false);
935
                }
936
        break;
937
      default:
938
        fprintf(stderr, "Bad streamer topo message received!\n");
939
    }
940
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
941
    reg_incoming_neigh_size(peerset_size(pset_incoming));
942
    return;
943
  }
944

    
945
  int res_topo_parse_data = topoParseData(buff, len);
946

    
947
  // Exit if a gossiping message has been received
948
  if (res_topo_parse_data > 0) {
949
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
950
    reg_incoming_neigh_size(peerset_size(pset_incoming));
951
    return;
952
  }
953

    
954
  // Look for peers which can be removed from blacklist
955
  check_black_list_timeouts();
956
  update_metadata();
957
  topChangeMetadata(&my_metadata, sizeof(struct metadata));
958

    
959
  double time_difference = (tnow.tv_sec + tnow.tv_usec*1e-6) - (last_time_updated_peers.tv_sec + last_time_updated_peers.tv_usec*1e-6);
960

    
961
  // Check if it's time to update the neighbourhood
962
  if (!am_i_source() && add_noise(time_difference, 0.1) > UPDATE_PEERS_TIMEOUT) {
963
    
964
    // If I have to control my fathers (--topo_in)
965
    if (topo_in && !topo_out) {
966
        peers = peerset_get_peers(pset_incoming);
967
        n_ids = peerset_size(pset_incoming);
968
    } else if (!topo_in && topo_out) { // If I have to control my sons (--topo_out)
969
        peers = peerset_get_peers(pset_outgoing);
970
        n_ids = peerset_size(pset_outgoing);
971
    } else if (topo_in && topo_out) { // If topo_bidir, no distinction between sons and fathers, then incoming or outgoing doesn't matter
972
        peers = peerset_get_peers(pset_incoming);
973
        n_ids = peerset_size(pset_incoming);
974
    }
975

    
976
    newids = topGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
977
    metas = topGetMetadata(&metasize);
978
    add_weights = malloc(newids_size * sizeof(double));
979
    
980
    int index = 0;
981
    for (index = 0; index < newids_size; index++) {
982
            add_weights[index] = 0.0;
983
            dprintf("METAS node %s, capacity %f, cps %d, buffer %d\n", node_addr(newids[index]), metas[index].capacity, metas[index].cps, metas[index].cb_size);
984
    }
985
    dprintf("METAS my %f END\n", my_metadata.capacity);
986
    max_ids = n_ids + savedids_size + newids_size;
987
    ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
988

    
989
    int desired_part;
990
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
991
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
992
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
993

    
994
    for (i = 0, oldids_size = 0; i < n_ids; i++) {
995
      oldids[oldids_size++] = peers[i].id;
996
      fprintf(stderr," %s - RTT: %f, RX_BYTES_CHUNKS %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_rx_bytes_chunks_of(peers[i].id));
997
    }
998
    savedids_size = 0;
999

    
1000
    const struct nodeID *tmps[max_ids];
1001
    int tmps_size;
1002

    
1003
    
1004
    if ((topo_keep_best) && ((strcmp(topo_keep_best_policy, "W_BW") == 0) || (strcmp(topo_keep_best_policy, "W_RTT") == 0) || (strcmp(topo_keep_best_policy, "W_RTT2") == 0) || (strcmp(topo_keep_best_policy, "W_RTT3") == 0) || (strcmp(topo_keep_best_policy, "W_RX_CHUNKS_RTT") == 0) || (strcmp(topo_keep_best_policy, "W_RTT_TRICK") == 0) || (strcmp(topo_keep_best_policy, "W_RTT_TRICK2") == 0) || (strcmp(topo_keep_best_policy, "W_RTT_TRICK3") == 0) || (strcmp(topo_keep_best_policy, "W_RTT_TRICK_RX_CHUNKS") == 0) )) {
1005
      peerEvaluateFunction peerevaluate = NULL;
1006
      SchedOrdering ordering = SCHED_WEIGHTED;
1007
      if (strcmp(topo_keep_best_policy, "W_BW") == 0) {
1008
        peerevaluate = get_bw_local2;
1009
      } else if (strcmp(topo_keep_best_policy, "W_RTT") == 0) {
1010
        peerevaluate = get_rtt2;
1011
      } else if (strcmp(topo_keep_best_policy, "W_RTT2") == 0) {
1012
        peerevaluate = get_rtt_root2;
1013
      } else if (strcmp(topo_keep_best_policy, "W_RTT3") == 0) {
1014
        peerevaluate = get_rtt_root3;
1015
      } else if (strcmp(topo_keep_best_policy, "W_RX_CHUNKS_RTT") == 0) {
1016
        peerevaluate = get_rx_chunks_rtt;
1017
      } else if (strcmp(topo_keep_best_policy, "W_RTT_TRICK") == 0) {
1018
        peerevaluate = get_keep_subnet2;
1019
      } else if (strcmp(topo_keep_best_policy, "W_RTT_TRICK2") == 0) {
1020
        peerevaluate = get_keep_subnet_root2;
1021
      } else if (strcmp(topo_keep_best_policy, "W_RTT_TRICK3") == 0) {
1022
        peerevaluate = get_keep_subnet_root3;
1023
      } else if (strcmp(topo_keep_best_policy, "W_RTT_TRICK_RX_CHUNKS") == 0) {
1024
        peerevaluate = get_keep_subnet_rx_chunks;
1025
      }
1026

    
1027
      tmps_size = oldids_size;
1028
      nidset_select(ordering, oldids, oldids_size, peerevaluate, tmps, &tmps_size);
1029
    // select the topo_mem portion of peers to be kept (uniform random)
1030
    } else {  
1031
        if ((topo_keep_best) && (strcmp(topo_keep_best_policy, "RND") != 0)) {
1032
                qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_keep_best);
1033
        } else {
1034
                nidset_shuffle(oldids, oldids_size);
1035
        }
1036
        for (i=0; i< oldids_size; i++) {
1037
                tmps[i] = oldids[i];
1038
        }
1039
    }
1040
    for (i = 0; i < oldids_size; i++) {
1041
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, BW %f, SUBNET %d\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_capacity_of(oldids[i]) , get_subnet_of(oldids[i]));
1042
    }
1043
    dprintf("QSORT KEEP BEST END\n");
1044
    
1045
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
1046
    
1047
    for (i=0; i< selecteds_size; i++) {
1048
            selecteds[i] = tmps[i];
1049
    }
1050

    
1051
    // compose list of known nodeids
1052
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
1053
  
1054
    // compose list of candidate nodeids
1055
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
1056

    
1057
    // select the alpha_target portion of desired peers
1058
    desired_part = (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
1059
    
1060
    // Filter out blacklisted ones    
1061
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
1062
    
1063
    if ((topo_add_best) && ((strcmp(topo_add_best_policy, "W_BW") == 0) || (strcmp(topo_add_best_policy, "W_RTT") == 0) || (strcmp(topo_add_best_policy, "W_OFFERS") == 0) || (strcmp(topo_add_best_policy, "W_RTT_TRICK") == 0) || (strcmp(topo_add_best_policy, "W_RTT_TRICK2") == 0) )) {
1064
      const struct nodeID *tmps2[max_ids];
1065
      int tmps2_size;
1066
      peerEvaluateFunction peerevaluate = NULL;
1067
      SchedOrdering ordering = SCHED_WEIGHTED;
1068
      if (strcmp(topo_add_best_policy, "W_BW") == 0) {
1069
        peerevaluate = get_bw2;
1070
      } else if (strcmp(topo_add_best_policy, "W_RTT") == 0) {
1071
        peerevaluate = get_rtt2;
1072
      } else if (strcmp(topo_add_best_policy, "W_OFFERS") == 0) {
1073
        peerevaluate = get_offer2;
1074
      } else if (strcmp(topo_add_best_policy, "W_RTT_TRICK") == 0) {
1075
        peerevaluate = get_add_subnet2;
1076
      } else if (strcmp(topo_add_best_policy, "W_RTT_TRICK2") == 0) {
1077
        peerevaluate = get_add_subnet_root2;
1078
      }
1079
      tmps2_size = MIN(desireds_size,desired_part);
1080
      nidset_select(ordering, desireds, desireds_size, peerevaluate, tmps2, &tmps2_size);
1081
      nidset_add_i(selecteds, &selecteds_size, max_ids, tmps2, tmps2_size);
1082
    } else {
1083
      if ((topo_add_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
1084
        fill_add_weights();
1085
        qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
1086
      } else if (strcmp(topo_add_best_policy, "RND") == 0) {
1087
        nidset_shuffle(desireds, desireds_size);      
1088
      }
1089
    }
1090
    for (i = 0; i < desireds_size; i++) {
1091
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW/RTT %f, OFFERS %d, EXP %f\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), (get_metadata_for_peer(desireds[i])).capacity, (double) ((get_metadata_for_peer(desireds[i])).capacity/get_rtt_of(desireds[i])), (get_metadata_for_peer(desireds[i])).cps, get_exp_for_peer(desireds[i]));
1092
        }
1093
    dprintf("QSORT ADD BEST END\n");
1094
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
1095
    
1096

    
1097
    // random from the rest
1098
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
1099
    nidset_shuffle(others, others_size);
1100
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
1101
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
1102

    
1103
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
1104
        selecteds_size, nodeids_size,
1105
        keep_size, oldids_size,
1106
        MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
1107
        random_size, others_size);
1108
    // add new ones
1109
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
1110
    for (i = 0; i < toadds_size; i++) {
1111
      int j;
1112
      //searching for the metadata
1113
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
1114
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
1115
        counter2++;
1116
        add_peer(newids[j], &metas[j], topo_out, topo_in);
1117
      } else {
1118
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
1119
      }
1120
    }
1121

    
1122
    // finally, remove those not needed
1123
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
1124
        
1125
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
1126
    for (i = 0; i < toremoves_size; i++) {
1127
       // Check that it has not been removed already
1128
       bool removed_already = false;
1129
       if (topo_in && !topo_out) {
1130
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
1131
       } else if (!topo_in && topo_out) {
1132
               removed_already = (peerset_get_peer(pset_outgoing, toremoves[i])!=NULL) ? false : true;
1133
       } else if (topo_in && topo_out) {
1134
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
1135
       }
1136
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
1137
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
1138
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
1139
          add_to_blacklist(node_addr(toremoves[i]));
1140
          }
1141
       fprintf(stderr," removing %s, cb_size %d\n", node_addr(toremoves[i]), (nodeid_to_peer_incoming(toremoves[i], 0))->cb_size);
1142
       // Check we are not removing the source if our bw is large enough
1143
       if ((nodeid_to_peer_incoming(toremoves[i], 0))->cb_size == 0) {
1144
               if ((get_capacity() < (double) 4000000)) {
1145
                       if (!removed_already) {
1146
                          remove_peer(toremoves[i], topo_out, topo_in);
1147
                       }
1148
               }
1149
       } else {
1150
          if (!removed_already) {
1151
                remove_peer(toremoves[i], topo_out, topo_in);
1152
          }
1153
       }
1154
    }
1155
    fprintf(stderr,"Topo remove end\n");
1156

    
1157
    for (i = 0; i < oldids_size; i++) {
1158
      nodeid_free(oldids[i]);
1159
    }
1160
    
1161
    free(add_weights);
1162
    add_weights = NULL;
1163
    
1164
    gettimeofday(&last_time_updated_peers, NULL); 
1165
    // Check if my QoE is low. If yes, update neighbourood more frequently and change a larger
1166
    // portion of neighbours.
1167
//     check_my_state();
1168
  } 
1169
  reg_outgoing_neigh_size(peerset_size(pset_outgoing));
1170
  reg_incoming_neigh_size(peerset_size(pset_incoming));
1171
}
1172

    
1173
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
1174
{
1175
  struct peer *p = peerset_get_peer(pset_incoming, id);
1176
  if (!p) {
1177
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
1178
    if (reg) {
1179
      add_peer(id,NULL, false, true);
1180
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
1181
      p = peerset_get_peer(pset_incoming,id);
1182
    }
1183
  }
1184

    
1185
  return p;
1186
}
1187

    
1188
struct peer *nodeid_to_peer_incoming(const struct nodeID* id, int reg)
1189
{
1190
  struct peer *p = peerset_get_peer(pset_incoming, id);
1191
  if (!p) {
1192
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
1193
    if (reg) {
1194
      add_peer(id,NULL, false, true);
1195
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
1196
      p = peerset_get_peer(pset_incoming,id);
1197
    }
1198
  }
1199

    
1200
  return p;
1201
}
1202

    
1203
struct peer *nodeid_to_peer_outgoing(const struct nodeID* id, int reg)
1204
{
1205
  struct peer *p = peerset_get_peer(pset_outgoing, id);
1206
  if (!p) {
1207
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
1208
    if (reg) {
1209
      add_peer(id,NULL, false, true);
1210
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
1211
      p = peerset_get_peer(pset_incoming,id);
1212
    }
1213
  }
1214

    
1215
  return p;
1216
}
1217

    
1218
int peers_init(void)
1219
{
1220
  fprintf(stderr,"peers_init\n");
1221
  pset_incoming = peerset_init(0);
1222
  pset_outgoing = peerset_init(0);
1223
  return pset_incoming && pset_outgoing ? 1 : 0;
1224
}
1225

    
1226
struct peerset *get_peers(void)
1227
{
1228
  return pset_outgoing;
1229
}
1230

    
1231
struct peerset *get_outgoing_peers(void)
1232
{
1233
  return pset_outgoing;
1234
}
1235

    
1236
struct peerset *get_incoming_peers(void)
1237
{
1238
  return pset_incoming;
1239
}