Statistics
| Branch: | Revision:

streamers / topology.c @ ff3e49b8

History | View | Annotate | Download (30.1 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "scheduler_la.h"
26
#include "nodeid_set.h"
27
#include "streaming.h"
28
#include "dbg.h"
29
#include "measures.h"
30
#include "streamer.h"
31
#include "hrc.h"
32
#include "blacklist.h"
33

    
34
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
35
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
36

    
37
#define MODULE  2147483647
38
#define A       16807
39
#define LASTXN  127773
40
#define UPTOMOD -2836
41
#define RATIO   0.46566128e-9
42

    
43
double alpha_target = 0.5;
44
double topo_mem = 0;
45

    
46
bool topo_out = true; //peer selects out-neighbours
47
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
48

    
49
bool topo_keep_best = false;
50
bool topo_add_best = false;
51
bool topo_black_list = false;
52

    
53
const char* topo_keep_best_policy = "";
54
const char* topo_add_best_policy = "";
55
const char* topo_black_list_policy = "";
56

    
57
int NEIGHBORHOOD_TARGET_SIZE = 30;
58
double UPDATE_PEERS_TIMEOUT = 5;
59
#define TMAN_MAX_IDLE 10
60
#define TMAN_LOG_EVERY 1000
61

    
62
#define STREAMER_TOPOLOGY_MSG_ADD 0
63
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
64

    
65
static struct peerset *pset_outgoing;
66
static struct peerset *pset_incoming;
67
static struct timeval tout_bmap = {20, 0};
68
static int counter = 0;
69
static int counter2 = 0;
70
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
71
static tmanRankingFunction rankFunct = simpleRanker;
72

    
73
struct metadata {
74
  uint16_t cb_size;
75
  uint16_t cps;
76
  double capacity;
77
  double recv_delay;
78
} __attribute__((packed));
79

    
80
static struct metadata my_metadata;
81
static int cnt = 0;
82
static struct nodeID *me = NULL;
83
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
84
static struct nodeID ** neighbors;
85
static struct timeval last_time_updated_peers;
86

    
87

    
88
struct nodeID **newids;
89
int newids_size;
90
double *add_weights;
91
static long seed = 1;
92
struct metadata *metas;
93

    
94
double add_noise(double to_return, double alpha) {
95
        double uniform = rand()/(RAND_MAX + 1.0);
96
        return (double) (((1-alpha) + (2*alpha_target*uniform)) * to_return) ; 
97
        }
98

    
99
long rnd32(long gSeed)
100
{
101
  long times, rest, prod1, prod2;
102

    
103
  times = gSeed / LASTXN;
104
  rest = gSeed - times * LASTXN;
105
  prod1 = times * UPTOMOD;
106
  prod2 = rest * A;
107
  gSeed = prod1 + prod2;
108
  if (gSeed < 0)
109
    gSeed = gSeed + MODULE;
110
  return gSeed;
111
}
112

    
113
double negexp (double mean, long *gSeed)
114
{
115
  double u;
116
  *gSeed = rnd32 (*gSeed);
117
  u = (*gSeed) * RATIO;
118
  return (-mean * log (u));
119
}
120

    
121
static void update_metadata(void) {
122
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
123
        my_metadata.cps =  (uint16_t) get_running_offer_threads();
124
        my_metadata.recv_delay = (double) get_receive_delay();
125
        my_metadata.capacity = (double) get_capacity();
126
}
127

    
128
static struct metadata get_metadata_for_peer(struct nodeID *id){
129
        int i = 0;
130
        struct metadata to_return;
131
        to_return.cb_size = NAN;
132
        to_return.cps =  NAN;
133
        to_return.recv_delay = NAN;
134
        to_return.capacity = NAN;
135

    
136
        if (newids_size > 0 && id != NULL) {
137
                for (i = 0 ; i < newids_size; i++) {
138
                        if (newids[i] != NULL) {
139
                                if (nodeid_equal(newids[i],id)){
140
                                        to_return.cb_size = metas[i].cb_size;
141
                                        to_return.cps = metas[i].cps;
142
                                        to_return.recv_delay = metas[i].recv_delay;
143
                                        to_return.capacity = metas[i].capacity;
144
                                        return to_return;
145
                                }
146
                        }
147
                }
148
        return to_return;
149
        }
150
}
151

    
152
static double get_exp_for_peer(struct nodeID *id){
153
        int i = 0;
154
        double to_return = NAN;
155

    
156
        if (newids_size > 0 && id != NULL) {
157
                for (i = 0 ; i < newids_size; i++) {
158
                        if (newids[i] != NULL) {
159
                                if (nodeid_equal(newids[i],id)){
160
                                        to_return = add_weights[i];
161
                                        return to_return;
162
                                }
163
                        }
164
                }
165
        }
166
        return to_return;
167
}
168

    
169
static void fill_add_weights(){
170
        int i = 0;
171
        if (newids_size > 0) {
172
                for (i = 0 ; i < newids_size; i++) {
173
                        add_weights[i] = negexp((1/((metas[i].capacity)/100000)), &seed);
174
                        dprintf("EXP: exp %f, capacity %f, seed %ld\n", add_weights[i], (metas[i].capacity), seed );
175
                }
176
        }
177
        return;
178
}
179

    
180

    
181
void check_my_state() {
182
  // Check if we are in adaptive mode
183
          double playout = get_chunk_playout();
184
        dprintf("CHECK: playout %f\n", playout);
185
        if (playout < 1) {
186
                UPDATE_PEERS_TIMEOUT = 2;
187
                topo_mem = 0.6;
188
        }
189
        else {
190
                UPDATE_PEERS_TIMEOUT = 10;
191
                topo_mem = 0.95;
192
        }
193
}
194

    
195
/**
196
  * @brief Prototype for function assigning a weigth to a nodeid
197
  * @return the weight associated to the peer
198
  */
199
typedef double (*nodeidEvaluateFunction)(struct nodeID **);
200

    
201
/**
202
  * Select best N of K peers with the given ordering method
203
  */
204
void nidset_select(SchedOrdering ordering, struct nodeID **peers, size_t peers_len, nodeidEvaluateFunction peerevaluate, struct nodeID **selected, size_t *selected_len ){
205
  selectWithOrdering(ordering, sizeof(peers[0]), (void*)peers, peers_len, (evaluateFunction)peerevaluate, (void*)selected, selected_len);
206
}
207

    
208

    
209
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
210

    
211
        double t,p1,p2;
212
        t = *((const double *)tin);
213
        p1 = *((const double *)p1in);
214
        p2 = *((const double *)p2in);
215

    
216
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
217
        else if (isnan(p1)) return 2;
218
        else if (isnan(p2)) return 1;
219
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
220

    
221
}
222

    
223
int topologyInit(struct nodeID *myID, const char *config)
224
{
225
        int i;
226
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
227
                bind_msg_type(mTypes[i]);
228
        update_metadata();
229
        me = myID;
230
        struct timeval tnow;
231
        gettimeofday(&tnow, NULL);
232
        gettimeofday(&last_time_updated_peers, NULL);
233
        last_time_updated_peers.tv_sec -= UPDATE_PEERS_TIMEOUT;
234
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
235
}
236

    
237
void topologyShutdown(void)
238
{
239
}
240

    
241
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
242
{
243
        // TODO: check this!! Just to use this function to bootstrap ncast...
244
        struct metadata m = {0};        //TODO: check what metadata option should mean
245

    
246
        if (counter < TMAN_MAX_IDLE)
247
                return topAddNeighbour(neighbour,&m,sizeof(m));
248
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
249
}
250

    
251
static int topoParseData(const uint8_t *buff, int len)
252
{
253
        int res = -1,ncs = 0,msize;
254
        const struct nodeID **n; const void *m;
255
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
256
                res = topParseData(buff,len);
257
//                 if (counter <= TMAN_MAX_IDLE)
258
//                         counter++;
259
        }
260
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
261
        {
262
                n = topGetNeighbourhood(&ncs);
263
                if (ncs) {
264
                m = topGetMetadata(&msize);
265
                res = tmanParseData(buff,len,n,ncs,m,msize);
266
                }
267
        }
268
  return res;
269
}
270

    
271
static const struct nodeID **topoGetNeighbourhood(int *n)
272
{
273
        int i; double d;
274
        if (counter > TMAN_MAX_IDLE) {
275
                uint8_t *mdata; int msize;
276
                *n = tmanGetNeighbourhoodSize();
277
                if (neighbors) free(neighbors);
278
                neighbors = calloc(*n,sizeof(struct nodeID *));
279
                tmanGetMetadata(&msize);
280
                mdata = calloc(*n,msize);
281
                tmanGivePeers(*n,neighbors,(void *)mdata);
282

    
283
                if (cnt % TMAN_LOG_EVERY == 0) {
284
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
285
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
286
                                d = *((double *)(mdata+i*msize));
287
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
288
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
289
                        }
290
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
291
                }
292

    
293
                free(mdata);
294
                return (const struct nodeID **)neighbors;
295
        }
296
        else
297
                return topGetNeighbourhood(n);
298
}
299

    
300
static void topoAddToBL (struct nodeID *id)
301
{
302
        if (counter >= TMAN_MAX_IDLE)
303
                tmanAddToBlackList(id);
304
        topAddToBlackList(id);
305
}
306

    
307
//TODO: send metadata as well
308
static int send_topo_msg(struct nodeID *dst, uint8_t type)
309
{
310
  char msg[2];
311
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
312
  msg[1] = type;
313
  
314
  return send_to_peer(me, dst, msg, 2);
315
}
316

    
317
static void add_peer(const struct nodeID *id, const struct metadata *m, bool outgoing, bool incoming)
318
{
319
  if (outgoing) {
320
      dtprintf("Adding %s to outgoing neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
321
      peerset_add_peer(pset_outgoing, id);
322
      if (m) peerset_get_peer(pset_outgoing, id)->cb_size = m->cb_size;
323
      if (m) peerset_get_peer(pset_outgoing, id)->capacity = m->capacity;
324
      send_bmap(id);
325
  }
326
  if (incoming) {
327
      dtprintf("Topo: explicit add message sent!!! to %s (peers:%d)\n", node_addr(id));
328
      peerset_add_peer(pset_incoming, id);
329
      if (m) peerset_get_peer(pset_incoming, id)->cb_size = m->cb_size;
330
      if (m) peerset_get_peer(pset_incoming, id)->capacity = m->capacity;
331
      /* add measures here */
332
      add_measures(id);
333
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
334
  }
335
}
336

    
337
static void remove_peer(const struct nodeID *id, bool outgoing, bool incoming)
338
{
339
  if (outgoing) {
340
      dtprintf("Removing %s from outgoing neighbourhood!\n", node_addr(id));
341
      peerset_remove_peer(pset_outgoing, id);
342
  }
343
  if (incoming) {
344
      dtprintf("Topo: explicit remove message sent!!! to %s (peers:%d)\n", node_addr(id));
345
      peerset_remove_peer(pset_incoming, id);
346
      /* delete measures here */
347
      delete_measures(id);
348
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
349
  }
350
}
351

    
352
//get the rtt. Currenly only MONL version is supported
353
static double get_rtt_of(const struct nodeID* n){
354
#ifdef MONL
355
  return get_rtt(n);
356
#else
357
  return NAN;
358
#endif
359
}
360

    
361
//get the declared capacity of a node
362
static double get_capacity_of(const struct nodeID* n){
363
  struct peer *p = peerset_get_peer(pset_incoming, n);
364
  if (p) {
365
    return p->capacity;
366
  } 
367
  p = peerset_get_peer(pset_outgoing, n);
368
  if (p) {
369
    return p->capacity;
370
  } 
371
  return NAN;
372
}
373

    
374
static double get_rx_bytes_chunks_of(const struct nodeID* n){
375
#ifdef MONL
376
  return get_rx_bytes_chunks(n);
377
#else
378
  return NAN;
379
#endif
380
}
381

    
382
static double get_transmitter_lossrate_of(const struct nodeID* n){
383
#ifdef MONL
384
  return get_transmitter_lossrate_of(n);
385
#else
386
  return NAN;
387
#endif
388
}
389

    
390
// Check if peer is the blacklist
391
//returns: 1:yes 0:no
392
int desiredness_unbled(const struct nodeID* n) {
393
  bool bled = black_listed(node_addr(n));
394

    
395
  if (!bled) {
396
    return 1;
397
  }
398
  return 0;
399
}
400

    
401
//returns: 1:yes 0:no -1:unknown
402
int blacklistness_lossrate(const struct nodeID* n) {
403
  double loss_rate = get_transmitter_lossrate(n);
404
  
405
  dprintf("BLACKLIST: father loss %s rate is %f\n", node_addr(n), loss_rate);
406
  
407
  if (isnan(loss_rate)) {
408
    return -1;
409
  } else if (loss_rate > 0.10) {
410
    return 1;
411
  }
412

    
413
  return 0;
414
}
415

    
416
bool is_desired_unbled(const struct nodeID* n) {
417
  return (desiredness_unbled(n) == 1);
418
}
419

    
420
bool is_to_blacklist(const struct nodeID* n) {
421
  return (blacklistness_lossrate(n) == 1);
422
}
423

    
424
int cmp_rtt(const struct nodeID** a, const struct nodeID** b) {
425
  double ra, rb;
426
  ra = add_noise(get_rtt_of(*a), alpha_target);
427
  rb = add_noise(get_rtt_of(*b), alpha_target);
428
  
429
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
430
  else if (isnan(rb) || ra < rb) return -1;
431
  else return 1;
432
}
433

    
434
int vcmp_rtt(const void* a, const void* b) {
435
  return cmp_rtt((const struct nodeID**) a, (const struct nodeID**) b);
436
}
437

    
438
static double get_bw2(const struct nodeID** a){
439
  double ra = add_noise(get_metadata_for_peer(*a).capacity, alpha_target);
440
  fprintf(stderr, "bw: %s %f\n", node_addr(*a), ra);
441
  return ra;
442
}
443

    
444
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
445
  double ra, rb;
446
  ra = -( add_noise(get_metadata_for_peer(*a).capacity, alpha_target));
447
  rb = -( add_noise(get_metadata_for_peer(*b).capacity, alpha_target));
448

    
449
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
450
  else if (isnan(rb) || ra < rb) return -1;
451
  else return 1;
452
}
453

    
454
int vcmp_bw(const void* a, const void* b) {
455
  return cmp_bw((const struct nodeID**)a, (const struct nodeID**)b);
456
}
457

    
458
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
459
  double ra, rb;
460
  ra = -( add_noise((get_metadata_for_peer(*a).capacity / (get_rtt_of(*a) / 1000)), alpha_target));
461
  rb = -( add_noise((get_metadata_for_peer(*b).capacity / (get_rtt_of(*b) / 1000)), alpha_target));
462

    
463
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
464
  else if (isnan(rb) || ra < rb) return -1;
465
  else return 1;
466
}
467

    
468
int vcmp_rtt_bw(const void* a, const void* b) {
469
  return cmp_rtt_bw((const struct nodeID**)a, (const struct nodeID**)b);
470
}
471

    
472
int cmp_offers(const struct nodeID** a, const struct nodeID** b) {
473
  double ra, rb;
474
  ra = -( add_noise(get_metadata_for_peer(*a).cps, alpha_target) );
475
  rb = -( add_noise(get_metadata_for_peer(*b).cps, alpha_target) );
476
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
477
  else if (isnan(rb) || ra < rb) return -1;
478
  else return 1;
479
}
480

    
481
int vcmp_offers(const void* a, const void* b) {
482
  return cmp_offers((const struct nodeID**)a, (const struct nodeID**)b);
483
}
484

    
485
int cmp_rx_bytes_chunks(const struct nodeID** a, const struct nodeID** b) {
486
  double ra, rb;
487
  ra = -(add_noise(get_rx_bytes_chunks_of(*a), alpha_target));
488
  rb = -(add_noise(get_rx_bytes_chunks_of(*b), alpha_target));
489
//   fprintf(stderr, "RX_BYTES_CHUNKS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
490
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
491
        return 0;
492
  else if (isnan(rb) || ra < rb) 
493
        return -1;
494
  else 
495
        return 1;
496
}
497

    
498
int vcmp_rx_bytes_chunks(const void* a, const void* b) {
499
  return cmp_rx_bytes_chunks((const struct nodeID**) a, (const struct nodeID**) b);
500
}
501

    
502
int cmp_packet_loss(const struct nodeID** a, const struct nodeID** b) {
503
  double ra, rb;
504
  ra = add_noise(get_transmitter_lossrate(*a), alpha_target);
505
  rb = add_noise(get_transmitter_lossrate(*b), alpha_target);
506
//   fprintf(stderr, "PACKET_LOSS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
507
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
508
        return 0;
509
  else if (isnan(rb) || ra < rb) 
510
        return -1;
511
  else 
512
        return 1;
513
}
514

    
515
int vcmp_packet_loss(const void* a, const void* b) {
516
  return cmp_packet_loss((const struct nodeID**) a, (const struct nodeID**) b);
517
}
518

    
519
int cmp_affinity(const struct nodeID** a, const struct nodeID** b) {
520
  double ra, rb, mine, differencea, differenceb;
521
  
522
  ra = add_noise(get_metadata_for_peer(*a).capacity, alpha_target);
523
  rb = add_noise(get_metadata_for_peer(*b).capacity, alpha_target);
524
  mine = get_capacity();
525
    
526
  if ((!isnan(ra)) && (!isnan(rb))) {
527
        differencea = ra - mine;
528
        differenceb = rb - mine;
529
        // Both greater than mine
530
        if (differenceb > 0 && differencea > 0) {
531
                if (differencea > differenceb)
532
                        return 1;
533
                else if (differencea < differenceb)
534
                        return -1;
535
                else if (differencea == differenceb)
536
                        return 0;
537
        } else if (differenceb > 0 && differencea < 0) {
538
                return 1;
539
        } else if (differenceb < 0 && differencea > 0) {
540
                return -1;
541
        } else if (differenceb < 0 && differencea < 0) {
542
                if (differencea > differenceb)
543
                        return -1;
544
                else if (differencea < differenceb)
545
                        return 1;
546
                else if (differencea == differenceb)
547
                        return 0;
548
        }
549
  } else if (isnan(ra) && (!isnan(rb))) {
550
          return -1;
551
  } else if (isnan(rb) && (!isnan(ra))) {
552
          return 1;
553
  }
554
}
555

    
556
int vcmp_affinity(const void* a, const void* b) {
557
  return cmp_affinity((const struct nodeID**) a, (const struct nodeID**) b);
558
}
559

    
560
int cmp_bw_weight(const struct nodeID** a, const struct nodeID** b) {
561
  double ra, rb;
562
  ra = get_exp_for_peer(*a);
563
  rb = get_exp_for_peer(*b);
564
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
565
        return 0;
566
  else if (isnan(ra) || ra < rb) 
567
        return -1;
568
  else 
569
        return 1;
570
}
571

    
572
int vcmp_bw_weight(const void* a, const void* b) {
573
  return cmp_bw_weight((const struct nodeID**) a, (const struct nodeID**) b);
574
}
575

    
576
// Use add best policy specified at command line
577
int vcmp_add_best(const void* a, const void* b) {
578
  if (strcmp(topo_add_best_policy, "OFFERS") == 0) {
579
          return vcmp_offers(a, b);
580
  } else if (strcmp(topo_add_best_policy, "BW") == 0) {
581
          return vcmp_bw(a, b);
582
  } else if (strcmp(topo_add_best_policy, "RTT") == 0) {
583
          return vcmp_rtt( a,  b);
584
  } else if (strcmp(topo_add_best_policy, "BW_RTT") == 0) {
585
          return vcmp_rtt_bw( a,  b);
586
  } else if (strcmp(topo_add_best_policy, "AFFINITY") == 0) {
587
          return vcmp_affinity( a,  b);
588
  } else if (strcmp(topo_add_best_policy, "BW_WEIGHT") == 0) {
589
          return vcmp_bw_weight( a,  b);
590
  } else if (strcmp(topo_add_best_policy, "") == 0){ // defualt behaviuor
591
          return vcmp_rtt_bw( a, b);
592
  } else {
593
    fprintf(stderr, "Error: unknown add best policy %s\n", topo_add_best_policy);
594
    return 1;
595
  }  
596
}
597

    
598
// Use keep best policy specified at command line
599
int vcmp_keep_best(const void* a, const void* b) {
600
  if (strcmp(topo_keep_best_policy, "RTT") == 0) {
601
          return vcmp_rtt(a, b);
602
  }  else if (strcmp(topo_keep_best_policy, "BW") == 0) {
603
          return vcmp_bw(a, b);
604
  }  else if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
605
          return vcmp_rx_bytes_chunks(a, b);
606
  } else if (strcmp(topo_keep_best_policy, "PACKET_LOSS") == 0) {
607
          return vcmp_packet_loss( a, b);
608
  } else if (strcmp(topo_keep_best_policy, "AFFINITY") == 0) {
609
          return vcmp_affinity( a,  b);
610
  } else if (strcmp(topo_keep_best_policy, "BW_WEIGHT") == 0) {
611
          return vcmp_bw_weight( a,  b);
612
  } else if (strcmp(topo_keep_best_policy, "") == 0) { // Default behaviuor
613
          return vcmp_rx_bytes_chunks( a, b);
614
  } else {
615
    fprintf(stderr, "Error: unknown keep best policy %s\n", topo_add_best_policy);
616
    return 1;
617
  }  
618
}
619

    
620

    
621
// currently it just makes the peerset grow
622
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
623
{
624
  int n_ids, metasize, i, max_ids;
625
  
626
  struct peer *peers;
627
  struct timeval tnow, told;
628
  static const struct nodeID **savedids;
629
  static int savedids_size;
630
  
631
  if timerisset(&tout_bmap) {
632
    gettimeofday(&tnow, NULL);
633
    timersub(&tnow, &tout_bmap, &told);
634
    peers = peerset_get_peers(pset_incoming);
635
    // Prune incoming inactive peers
636
    for (i = 0; i < peerset_size(pset_incoming); i++) {
637
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
638
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
639
        ftprintf(stderr,"Topo: dropping incoming inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset_incoming));
640
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
641
        topoAddToBL(peers[i].id);
642
        remove_peer(peers[i--].id, topo_out, topo_in);
643
        //}
644
      }
645
    }
646
  }
647

    
648
  //handle explicit add/remove messages
649
//   if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
650
    if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
651
    fprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
652
    if (len != 2) {
653
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
654
      return;
655
    }
656
    switch (buff[1]) {
657
      case STREAMER_TOPOLOGY_MSG_ADD:
658
        // I have to add this peer to my outgoing neighbourood
659
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
660
        if (!peerset_get_peer(pset_outgoing, from)) {
661
                counter2++;
662
                add_peer(from, NULL, true, false);
663
                }
664
//         if (black_listed(node_addr(from)) && (peerset_get_peer(pset_outgoing, from)))
665
//                 remove_peer(from, topo_in, topo_out);
666
//         else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset_outgoing, from) == NULL))
667
//                 send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
668
        break;
669
      case STREAMER_TOPOLOGY_MSG_REMOVE:
670
        // Neighbour ask me to remove him from my outgoing neighbourhood
671
        if (peerset_get_peer(pset_outgoing, from) != NULL) {
672
                ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
673
                remove_peer(from, true, false);
674
                }
675
        break;
676
      default:
677
        fprintf(stderr, "Bad streamer topo message received!\n");
678
    }
679
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
680
    reg_incoming_neigh_size(peerset_size(pset_incoming));
681
    return;
682
  }
683

    
684
  int res_topo_parse_data = topoParseData(buff, len);
685

    
686
  // Exit if a gossiping message has been received
687
  if (res_topo_parse_data > 0) {
688
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
689
    reg_incoming_neigh_size(peerset_size(pset_incoming));
690
    return;
691
  }
692

    
693
  // Look for peers which can be removed from blacklist
694
  check_black_list_timeouts();
695
  update_metadata();
696
  topChangeMetadata(&my_metadata, sizeof(struct metadata));
697

    
698
  double time_difference = (tnow.tv_sec + tnow.tv_usec*1e-6) - (last_time_updated_peers.tv_sec + last_time_updated_peers.tv_usec*1e-6);
699

    
700
  // Check if it's time to update the neighbourhood
701
  if (add_noise(time_difference, 0.1) > UPDATE_PEERS_TIMEOUT) {
702
    
703
    // If I have to control my sons (--topo_in)
704
    if (topo_in && !topo_out) {
705
        peers = peerset_get_peers(pset_outgoing);
706
        n_ids = peerset_size(pset_outgoing);
707
    } else if (!topo_in && topo_out) { // If I have to control my fathers (--topo_out)
708
        peers = peerset_get_peers(pset_incoming);
709
        n_ids = peerset_size(pset_incoming);
710
    } else if (topo_in && topo_out) { // If topo_bidir, no distinction between sons and fathers, then incoming or outgoing doesn't matter
711
        peers = peerset_get_peers(pset_incoming);
712
        n_ids = peerset_size(pset_incoming);
713
    }
714

    
715
    newids = topGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
716
    metas = topGetMetadata(&metasize);
717
    add_weights = malloc(newids_size * sizeof(double));
718
    
719
    int index = 0;
720
    for (index = 0; index < newids_size; index++) {
721
            add_weights[index] = 0.0;
722
            dprintf("METAS node %s, capacity %f, cps %d, buffer %d\n", node_addr(newids[index]), metas[index].capacity, metas[index].cps, metas[index].cb_size);
723
    }
724
    dprintf("METAS my %f END\n", my_metadata.capacity);
725
    max_ids = n_ids + savedids_size + newids_size;
726
    ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
727

    
728
    int desired_part;
729
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
730
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
731
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
732

    
733
    if (topo_out) {
734
      for (i = 0, oldids_size = 0; i < peerset_size(pset_incoming); i++) {
735
        oldids[oldids_size++] = peers[i].id;
736
        fprintf(stderr," %s - RTT: %f, loss_rate %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_transmitter_lossrate(peers[i].id));
737
      }
738
    } else {
739
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
740
        oldids[oldids_size++] = savedids[i];
741
        fprintf(stderr," %s - RTT: %f, RX_CHUNKS: %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]));
742
      }
743
      savedids_size = 0;
744
      free(savedids);
745
    }
746

    
747
    // select the topo_mem portion of peers to be kept (uniform random)
748
    if ((topo_keep_best) && (strcmp(topo_keep_best_policy, "RND") != 0)) {
749
      qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_keep_best);
750
    } else {
751
      nidset_shuffle(oldids, oldids_size);
752
    }
753
    for (i = 0; i < oldids_size; i++) {
754
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f, BW %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]), get_capacity_of(oldids[i]));
755
      }
756
    dprintf("QSORT KEEP BEST END\n");
757
    
758
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
759
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
760

    
761
    // compose list of known nodeids
762
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
763
  
764
    // compose list of candidate nodeids
765
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
766

    
767
    // select the alpha_target portion of desired peers
768
    desired_part = (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
769
    
770
    
771
    // Filter out blacklisted ones    
772
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
773
    
774
    if ((topo_add_best) && (strcmp(topo_add_best_policy, "W_BW") == 0)) {
775
      const struct nodeID *tmps[max_ids];
776
      int tmps_size;
777

    
778
      SchedOrdering ordering = SCHED_WEIGHTED;
779
      peerEvaluateFunction peerevaluate = get_bw2;
780
      tmps_size = MIN(desireds_size,desired_part);
781
      nidset_select(ordering, desireds, desireds_size, peerevaluate, tmps, &tmps_size);
782
      nidset_add_i(selecteds, &selecteds_size, max_ids, tmps, tmps_size);
783
    } else {
784
      if ((topo_add_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
785
        fill_add_weights();
786
        qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
787
      } else {
788
        nidset_shuffle(desireds, desireds_size);      
789
      }
790
    }
791
    for (i = 0; i < desireds_size; i++) {
792
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW/RTT %f, OFFERS %d, EXP %f\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), (get_metadata_for_peer(desireds[i])).capacity, (double) ((get_metadata_for_peer(desireds[i])).capacity/get_rtt_of(desireds[i])), (get_metadata_for_peer(desireds[i])).cps, get_exp_for_peer(desireds[i]));
793
        }
794
    dprintf("QSORT ADD BEST END\n");
795
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
796
    
797

    
798
    // random from the rest
799
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
800
    nidset_shuffle(others, others_size);
801
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
802
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
803

    
804
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
805
        selecteds_size, nodeids_size,
806
        keep_size, oldids_size,
807
        MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
808
        random_size, others_size);
809
    // add new ones
810
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
811
    for (i = 0; i < toadds_size; i++) {
812
      int j;
813
      //searching for the metadata
814
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
815
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
816
        counter2++;
817
        add_peer(newids[j], &metas[j], topo_out, topo_in);
818
      } else {
819
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
820
      }
821
    }
822

    
823
    // finally, remove those not needed
824
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
825
        
826
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
827
    for (i = 0; i < toremoves_size; i++) {
828
       // Check that it has not been removed already
829
       bool removed_already = false;
830
       if (topo_in && !topo_out) {
831
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
832
       } else if (!topo_in && topo_out) {
833
               removed_already = (peerset_get_peer(pset_outgoing, toremoves[i])!=NULL) ? false : true;
834
       } else if (topo_in && topo_out) {
835
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
836
       }
837
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
838
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
839
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
840
          add_to_blacklist(node_addr(toremoves[i]));
841
          }
842
       fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
843
       if (!removed_already) {
844
          remove_peer(toremoves[i], topo_out, topo_in);
845
          }
846
    }
847
    fprintf(stderr,"Topo remove end\n");
848

    
849
    if (!topo_out) {
850
      savedids = malloc(selecteds_size * sizeof(savedids[0])); //TODO: handle errors
851
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
852
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
853
      }
854
      for (i = 0; i < oldids_size; i++) {
855
        nodeid_free(oldids[i]);
856
      }
857
    }
858
    
859
    free(add_weights);
860
    add_weights = NULL;
861
    
862
    gettimeofday(&last_time_updated_peers, NULL); 
863
    // Check if my QoE is low. If yes, update neighbourood more frequently and change a larger
864
    // portion of neighbours.
865
//     check_my_state();
866
  } 
867
  reg_outgoing_neigh_size(peerset_size(pset_outgoing));
868
  reg_incoming_neigh_size(peerset_size(pset_incoming));
869
}
870

    
871
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
872
{
873
  struct peer *p = peerset_get_peer(pset_incoming, id);
874
  if (!p) {
875
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
876
    if (reg) {
877
      add_peer(id,NULL, false, true);
878
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
879
      p = peerset_get_peer(pset_incoming,id);
880
    }
881
  }
882

    
883
  return p;
884
}
885

    
886
struct peer *nodeid_to_peer_incoming(const struct nodeID* id, int reg)
887
{
888
  struct peer *p = peerset_get_peer(pset_incoming, id);
889
  if (!p) {
890
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
891
    if (reg) {
892
      add_peer(id,NULL, false, true);
893
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
894
      p = peerset_get_peer(pset_incoming,id);
895
    }
896
  }
897

    
898
  return p;
899
}
900

    
901
struct peer *nodeid_to_peer_outgoing(const struct nodeID* id, int reg)
902
{
903
  struct peer *p = peerset_get_peer(pset_outgoing, id);
904
  if (!p) {
905
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
906
    if (reg) {
907
      add_peer(id,NULL, false, true);
908
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
909
      p = peerset_get_peer(pset_incoming,id);
910
    }
911
  }
912

    
913
  return p;
914
}
915

    
916
int peers_init(void)
917
{
918
  fprintf(stderr,"peers_init\n");
919
  pset_incoming = peerset_init(0);
920
  pset_outgoing = peerset_init(0);
921
  return pset_incoming && pset_outgoing ? 1 : 0;
922
}
923

    
924
struct peerset *get_peers(void)
925
{
926
  return pset_outgoing;
927
}
928

    
929
struct peerset *get_outgoing_peers(void)
930
{
931
  return pset_outgoing;
932
}
933

    
934
struct peerset *get_incoming_peers(void)
935
{
936
  return pset_incoming;
937
}