Statistics
| Branch: | Revision:

streamers / topology.c @ 3a85536f

History | View | Annotate | Download (31.7 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21
#include <int_coding.h>
22

    
23
#include "compatibility/timer.h"
24

    
25
#include "topology.h"
26
#include "scheduler_la.h"
27
#include "nodeid_set.h"
28
#include "streaming.h"
29
#include "dbg.h"
30
#include "measures.h"
31
#include "streamer.h"
32
#include "hrc.h"
33
#include "blacklist.h"
34

    
35
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
36
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
37

    
38
#define MODULE  2147483647
39
#define A       16807
40
#define LASTXN  127773
41
#define UPTOMOD -2836
42
#define RATIO   0.46566128e-9
43

    
44
double alpha_target = 0.5;
45
double topo_mem = 0;
46

    
47
bool topo_out = true; //peer selects out-neighbours
48
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
49

    
50
bool topo_keep_best = false;
51
bool topo_add_best = false;
52
bool topo_black_list = false;
53

    
54
const char* topo_keep_best_policy = "";
55
const char* topo_add_best_policy = "";
56
const char* topo_black_list_policy = "";
57

    
58
int NEIGHBORHOOD_TARGET_SIZE = 30;
59
double UPDATE_PEERS_TIMEOUT = 5;
60
#define TMAN_MAX_IDLE 10
61
#define TMAN_LOG_EVERY 1000
62

    
63
#define STREAMER_TOPOLOGY_MSG_ADD 0
64
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
65

    
66
static struct peerset *pset_outgoing;
67
static struct peerset *pset_incoming;
68
static struct timeval tout_bmap = {20, 0};
69
static int counter = 0;
70
static int counter2 = 0;
71
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
72
static tmanRankingFunction rankFunct = simpleRanker;
73

    
74
struct metadata {
75
  uint16_t cb_size;
76
  uint16_t cps;
77
  double capacity;
78
  double recv_delay;
79
} __attribute__((packed));
80

    
81
static struct metadata my_metadata;
82
static int cnt = 0;
83
static struct nodeID *me = NULL;
84
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
85
static struct nodeID ** neighbors;
86
static struct timeval last_time_updated_peers;
87

    
88

    
89
struct nodeID **newids;
90
int newids_size;
91
double *add_weights;
92
static long seed = 1;
93
struct metadata *metas;
94

    
95
double add_noise(double to_return, double alpha) {
96
        double uniform = rand()/(RAND_MAX + 1.0);
97
        return (double) (((1-alpha) + (2*alpha_target*uniform)) * to_return) ; 
98
        }
99

    
100
long rnd32(long gSeed)
101
{
102
  long times, rest, prod1, prod2;
103

    
104
  times = gSeed / LASTXN;
105
  rest = gSeed - times * LASTXN;
106
  prod1 = times * UPTOMOD;
107
  prod2 = rest * A;
108
  gSeed = prod1 + prod2;
109
  if (gSeed < 0)
110
    gSeed = gSeed + MODULE;
111
  return gSeed;
112
}
113

    
114
double negexp (double mean, long *gSeed)
115
{
116
  double u;
117
  *gSeed = rnd32 (*gSeed);
118
  u = (*gSeed) * RATIO;
119
  return (-mean * log (u));
120
}
121

    
122
static void update_metadata(void) {
123
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
124
        my_metadata.cps =  (uint16_t) get_running_offer_threads();
125
        my_metadata.recv_delay = (double) get_receive_delay();
126
        my_metadata.capacity = (double) get_capacity();
127
}
128

    
129
static struct metadata get_metadata_for_peer(struct nodeID *id){
130
        int i = 0;
131
        struct metadata to_return;
132
        to_return.cb_size = NAN;
133
        to_return.cps =  NAN;
134
        to_return.recv_delay = NAN;
135
        to_return.capacity = NAN;
136

    
137
        if (newids_size > 0 && id != NULL) {
138
                for (i = 0 ; i < newids_size; i++) {
139
                        if (newids[i] != NULL) {
140
                                if (nodeid_equal(newids[i],id)){
141
                                        to_return.cb_size = metas[i].cb_size;
142
                                        to_return.cps = metas[i].cps;
143
                                        to_return.recv_delay = metas[i].recv_delay;
144
                                        to_return.capacity = metas[i].capacity;
145
                                        return to_return;
146
                                }
147
                        }
148
                }
149
        return to_return;
150
        }
151
}
152

    
153
static double get_exp_for_peer(struct nodeID *id){
154
        int i = 0;
155
        double to_return = NAN;
156

    
157
        if (newids_size > 0 && id != NULL) {
158
                for (i = 0 ; i < newids_size; i++) {
159
                        if (newids[i] != NULL) {
160
                                if (nodeid_equal(newids[i],id)){
161
                                        to_return = add_weights[i];
162
                                        return to_return;
163
                                }
164
                        }
165
                }
166
        }
167
        return to_return;
168
}
169

    
170
static void fill_add_weights(){
171
        int i = 0;
172
        if (newids_size > 0) {
173
                for (i = 0 ; i < newids_size; i++) {
174
                        add_weights[i] = negexp((1/((metas[i].capacity)/100000)), &seed);
175
                        dprintf("EXP: exp %f, capacity %f, seed %ld\n", add_weights[i], (metas[i].capacity), seed );
176
                }
177
        }
178
        return;
179
}
180

    
181

    
182
void check_my_state() {
183
  // Check if we are in adaptive mode
184
          double playout = get_chunk_playout();
185
        dprintf("CHECK: playout %f\n", playout);
186
        if (playout < 1) {
187
                UPDATE_PEERS_TIMEOUT = 2;
188
                topo_mem = 0.6;
189
        }
190
        else {
191
                UPDATE_PEERS_TIMEOUT = 10;
192
                topo_mem = 0.95;
193
        }
194
}
195

    
196
/**
197
  * @brief Prototype for function assigning a weigth to a nodeid
198
  * @return the weight associated to the peer
199
  */
200
typedef double (*nodeidEvaluateFunction)(struct nodeID **);
201

    
202
/**
203
  * Select best N of K peers with the given ordering method
204
  */
205
void nidset_select(SchedOrdering ordering, struct nodeID **peers, size_t peers_len, nodeidEvaluateFunction peerevaluate, struct nodeID **selected, size_t *selected_len ){
206
  selectWithOrdering(ordering, sizeof(peers[0]), (void*)peers, peers_len, (evaluateFunction)peerevaluate, (void*)selected, selected_len);
207
}
208

    
209

    
210
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
211

    
212
        double t,p1,p2;
213
        t = *((const double *)tin);
214
        p1 = *((const double *)p1in);
215
        p2 = *((const double *)p2in);
216

    
217
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
218
        else if (isnan(p1)) return 2;
219
        else if (isnan(p2)) return 1;
220
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
221

    
222
}
223

    
224
int topologyInit(struct nodeID *myID, const char *config)
225
{
226
        int i;
227
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
228
                bind_msg_type(mTypes[i]);
229
        update_metadata();
230
        me = myID;
231
        struct timeval tnow;
232
        gettimeofday(&tnow, NULL);
233
        gettimeofday(&last_time_updated_peers, NULL);
234
        last_time_updated_peers.tv_sec -= UPDATE_PEERS_TIMEOUT;
235
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
236
}
237

    
238
void topologyShutdown(void)
239
{
240
}
241

    
242
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
243
{
244
        // TODO: check this!! Just to use this function to bootstrap ncast...
245
        struct metadata m = {0};        //TODO: check what metadata option should mean
246

    
247
        if (counter < TMAN_MAX_IDLE)
248
                return topAddNeighbour(neighbour,&m,sizeof(m));
249
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
250
}
251

    
252
static int topoParseData(const uint8_t *buff, int len)
253
{
254
        int res = -1,ncs = 0,msize;
255
        const struct nodeID **n; const void *m;
256
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
257
                res = topParseData(buff,len);
258
//                 if (counter <= TMAN_MAX_IDLE)
259
//                         counter++;
260
        }
261
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
262
        {
263
                n = topGetNeighbourhood(&ncs);
264
                if (ncs) {
265
                m = topGetMetadata(&msize);
266
                res = tmanParseData(buff,len,n,ncs,m,msize);
267
                }
268
        }
269
  return res;
270
}
271

    
272
static const struct nodeID **topoGetNeighbourhood(int *n)
273
{
274
        int i; double d;
275
        if (counter > TMAN_MAX_IDLE) {
276
                uint8_t *mdata; int msize;
277
                *n = tmanGetNeighbourhoodSize();
278
                if (neighbors) free(neighbors);
279
                neighbors = calloc(*n,sizeof(struct nodeID *));
280
                tmanGetMetadata(&msize);
281
                mdata = calloc(*n,msize);
282
                tmanGivePeers(*n,neighbors,(void *)mdata);
283

    
284
                if (cnt % TMAN_LOG_EVERY == 0) {
285
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
286
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
287
                                d = *((double *)(mdata+i*msize));
288
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
289
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
290
                        }
291
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
292
                }
293

    
294
                free(mdata);
295
                return (const struct nodeID **)neighbors;
296
        }
297
        else
298
                return topGetNeighbourhood(n);
299
}
300

    
301
static void topoAddToBL (struct nodeID *id)
302
{
303
        if (counter >= TMAN_MAX_IDLE)
304
                tmanAddToBlackList(id);
305
        topAddToBlackList(id);
306
}
307

    
308
int encodeMetaData(uint8_t *buff, int buff_len)
309
{
310
  if (buff_len < 2 + 2 + 2 + 4 + 4) {
311
    /* Not enough space... */
312
    return -1;
313
  }
314
  int16_cpy(buff, my_metadata.cb_size);
315
  int16_cpy(buff + 2, my_metadata.cps);
316
  int_cpy(buff + 4, my_metadata.capacity);
317
  int_cpy(buff + 12, my_metadata.recv_delay);
318
  }
319

    
320
static int send_topo_msg(struct nodeID *dst, uint8_t type)
321
{
322
  int res = 0;
323
  int buff_len = 2 + sizeof(struct metadata);
324
  
325
  uint8_t *buff = malloc(buff_len);
326
  buff[0] = MSG_TYPE_STREAMER_TOPOLOGY;
327
  buff[1] = type;
328
  
329
  encodeMetaData(buff + 2, buff_len);
330
    
331
  res = send_to_peer(me, dst, buff, buff_len);
332
  free(buff);
333
  
334
  return res;
335
}
336

    
337
static void add_peer(const struct nodeID *id, const struct metadata *m, bool outgoing, bool incoming)
338
{
339
  if (outgoing) {
340
      dtprintf("Adding %s to outgoing neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
341
      peerset_add_peer(pset_outgoing, id);
342
      if (m) peerset_get_peer(pset_outgoing, id)->cb_size = m->cb_size;
343
      if (m) peerset_get_peer(pset_outgoing, id)->capacity = m->capacity;
344
      send_bmap(id);
345
  }
346
  if (incoming) {
347
      dtprintf("Topo: explicit add message sent!!! to %s (peers:%d)\n", node_addr(id));
348
      peerset_add_peer(pset_incoming, id);
349
      if (m) peerset_get_peer(pset_incoming, id)->cb_size = m->cb_size;
350
      if (m) peerset_get_peer(pset_incoming, id)->capacity = m->capacity;
351
      /* add measures here */
352
      add_measures(id);
353
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
354
  }
355
}
356

    
357
static void remove_peer(const struct nodeID *id, bool outgoing, bool incoming)
358
{
359
  if (outgoing) {
360
      dtprintf("Removing %s from outgoing neighbourhood!\n", node_addr(id));
361
      peerset_remove_peer(pset_outgoing, id);
362
  }
363
  if (incoming) {
364
      dtprintf("Topo: explicit remove message sent!!! to %s (peers:%d)\n", node_addr(id));
365
      peerset_remove_peer(pset_incoming, id);
366
      /* delete measures here */
367
      delete_measures(id);
368
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
369
  }
370
}
371

    
372
//get the rtt. Currenly only MONL version is supported
373
static double get_rtt_of(const struct nodeID* n){
374
#ifdef MONL
375
  return get_rtt(n);
376
#else
377
  return NAN;
378
#endif
379
}
380

    
381
//get the declared capacity of a node
382
static double get_capacity_of(const struct nodeID* n){
383
  struct peer *p = peerset_get_peer(pset_incoming, n);
384
  if (p) {
385
    return p->capacity;
386
  } 
387
  p = peerset_get_peer(pset_outgoing, n);
388
  if (p) {
389
    return p->capacity;
390
  } 
391
  return NAN;
392
}
393

    
394
static double get_rx_bytes_chunks_of(const struct nodeID* n){
395
#ifdef MONL
396
  return get_rx_bytes_chunks(n);
397
#else
398
  return NAN;
399
#endif
400
}
401

    
402
static double get_transmitter_lossrate_of(const struct nodeID* n){
403
#ifdef MONL
404
  return get_transmitter_lossrate_of(n);
405
#else
406
  return NAN;
407
#endif
408
}
409

    
410
// Check if peer is the blacklist
411
//returns: 1:yes 0:no
412
int desiredness_unbled(const struct nodeID* n) {
413
  bool bled = black_listed(node_addr(n));
414

    
415
  if (!bled) {
416
    return 1;
417
  }
418
  return 0;
419
}
420

    
421
//returns: 1:yes 0:no -1:unknown
422
int blacklistness_lossrate(const struct nodeID* n) {
423
  double loss_rate = get_transmitter_lossrate(n);
424
  
425
  dprintf("BLACKLIST: father loss %s rate is %f\n", node_addr(n), loss_rate);
426
  
427
  if (isnan(loss_rate)) {
428
    return -1;
429
  } else if (loss_rate > 0.10) {
430
    return 1;
431
  }
432

    
433
  return 0;
434
}
435

    
436
bool is_desired_unbled(const struct nodeID* n) {
437
  return (desiredness_unbled(n) == 1);
438
}
439

    
440
bool is_to_blacklist(const struct nodeID* n) {
441
  return (blacklistness_lossrate(n) == 1);
442
}
443

    
444
int cmp_rtt(const struct nodeID** a, const struct nodeID** b) {
445
  double ra, rb;
446
  ra = add_noise(get_rtt_of(*a), alpha_target);
447
  rb = add_noise(get_rtt_of(*b), alpha_target);
448
  
449
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
450
  else if (isnan(rb) || ra < rb) return -1;
451
  else return 1;
452
}
453

    
454
int vcmp_rtt(const void* a, const void* b) {
455
  return cmp_rtt((const struct nodeID**) a, (const struct nodeID**) b);
456
}
457

    
458
static double get_bw2(const struct nodeID** a){
459
  double ra = add_noise(get_metadata_for_peer(*a).capacity, alpha_target);
460
  dprintf( "bw: %s %f\n", node_addr(*a), ra);
461
  return ra;
462
}
463

    
464
static double get_bw_local2(const struct nodeID** a){
465
  double ra = add_noise(get_capacity_of(*a), alpha_target);
466
  dprintf( "bw: %s %f\n", node_addr(*a), ra);
467
  return ra;
468
}
469

    
470
static double get_rtt2(const struct nodeID** a){
471
  double ra = add_noise((1/get_rtt_of(*a)), alpha_target);
472
  dprintf( "rtt2: %s %f\n", node_addr(*a), ra);
473
  return ra;
474
}
475

    
476
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
477
  double ra, rb;
478
  ra = -( add_noise(get_metadata_for_peer(*a).capacity, alpha_target));
479
  rb = -( add_noise(get_metadata_for_peer(*b).capacity, alpha_target));
480

    
481
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
482
  else if (isnan(rb) || ra < rb) return -1;
483
  else return 1;
484
}
485

    
486
int vcmp_bw(const void* a, const void* b) {
487
  return cmp_bw((const struct nodeID**)a, (const struct nodeID**)b);
488
}
489

    
490
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
491
  double ra, rb;
492
  ra = -( add_noise((get_metadata_for_peer(*a).capacity / (get_rtt_of(*a) / 1000)), alpha_target));
493
  rb = -( add_noise((get_metadata_for_peer(*b).capacity / (get_rtt_of(*b) / 1000)), alpha_target));
494

    
495
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
496
  else if (isnan(rb) || ra < rb) return -1;
497
  else return 1;
498
}
499

    
500
int vcmp_rtt_bw(const void* a, const void* b) {
501
  return cmp_rtt_bw((const struct nodeID**)a, (const struct nodeID**)b);
502
}
503

    
504
int cmp_offers(const struct nodeID** a, const struct nodeID** b) {
505
  double ra, rb;
506
  ra = -( add_noise(get_metadata_for_peer(*a).cps, alpha_target) );
507
  rb = -( add_noise(get_metadata_for_peer(*b).cps, alpha_target) );
508
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
509
  else if (isnan(rb) || ra < rb) return -1;
510
  else return 1;
511
}
512

    
513
int vcmp_offers(const void* a, const void* b) {
514
  return cmp_offers((const struct nodeID**)a, (const struct nodeID**)b);
515
}
516

    
517
int cmp_rx_bytes_chunks(const struct nodeID** a, const struct nodeID** b) {
518
  double ra, rb;
519
  ra = -(add_noise(get_rx_bytes_chunks_of(*a), alpha_target));
520
  rb = -(add_noise(get_rx_bytes_chunks_of(*b), alpha_target));
521
//   fprintf(stderr, "RX_BYTES_CHUNKS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
522
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
523
        return 0;
524
  else if (isnan(rb) || ra < rb) 
525
        return -1;
526
  else 
527
        return 1;
528
}
529

    
530
int vcmp_rx_bytes_chunks(const void* a, const void* b) {
531
  return cmp_rx_bytes_chunks((const struct nodeID**) a, (const struct nodeID**) b);
532
}
533

    
534
int cmp_packet_loss(const struct nodeID** a, const struct nodeID** b) {
535
  double ra, rb;
536
  ra = add_noise(get_transmitter_lossrate(*a), alpha_target);
537
  rb = add_noise(get_transmitter_lossrate(*b), alpha_target);
538
//   fprintf(stderr, "PACKET_LOSS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
539
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
540
        return 0;
541
  else if (isnan(rb) || ra < rb) 
542
        return -1;
543
  else 
544
        return 1;
545
}
546

    
547
int vcmp_packet_loss(const void* a, const void* b) {
548
  return cmp_packet_loss((const struct nodeID**) a, (const struct nodeID**) b);
549
}
550

    
551
int cmp_affinity(const struct nodeID** a, const struct nodeID** b) {
552
  double ra, rb, mine, differencea, differenceb;
553
  
554
  ra = add_noise(get_metadata_for_peer(*a).capacity, alpha_target);
555
  rb = add_noise(get_metadata_for_peer(*b).capacity, alpha_target);
556
  mine = get_capacity();
557
    
558
  if ((!isnan(ra)) && (!isnan(rb))) {
559
        differencea = ra - mine;
560
        differenceb = rb - mine;
561
        // Both greater than mine
562
        if (differenceb > 0 && differencea > 0) {
563
                if (differencea > differenceb)
564
                        return 1;
565
                else if (differencea < differenceb)
566
                        return -1;
567
                else if (differencea == differenceb)
568
                        return 0;
569
        } else if (differenceb > 0 && differencea < 0) {
570
                return 1;
571
        } else if (differenceb < 0 && differencea > 0) {
572
                return -1;
573
        } else if (differenceb < 0 && differencea < 0) {
574
                if (differencea > differenceb)
575
                        return -1;
576
                else if (differencea < differenceb)
577
                        return 1;
578
                else if (differencea == differenceb)
579
                        return 0;
580
        }
581
  } else if (isnan(ra) && (!isnan(rb))) {
582
          return -1;
583
  } else if (isnan(rb) && (!isnan(ra))) {
584
          return 1;
585
  }
586
}
587

    
588
int vcmp_affinity(const void* a, const void* b) {
589
  return cmp_affinity((const struct nodeID**) a, (const struct nodeID**) b);
590
}
591

    
592
int cmp_bw_weight(const struct nodeID** a, const struct nodeID** b) {
593
  double ra, rb;
594
  ra = get_exp_for_peer(*a);
595
  rb = get_exp_for_peer(*b);
596
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
597
        return 0;
598
  else if (isnan(ra) || ra < rb) 
599
        return -1;
600
  else 
601
        return 1;
602
}
603

    
604
int vcmp_bw_weight(const void* a, const void* b) {
605
  return cmp_bw_weight((const struct nodeID**) a, (const struct nodeID**) b);
606
}
607

    
608
// Use add best policy specified at command line
609
int vcmp_add_best(const void* a, const void* b) {
610
  if (strcmp(topo_add_best_policy, "OFFERS") == 0) {
611
          return vcmp_offers(a, b);
612
  } else if (strcmp(topo_add_best_policy, "BW") == 0) {
613
          return vcmp_bw(a, b);
614
  } else if (strcmp(topo_add_best_policy, "RTT") == 0) {
615
          return vcmp_rtt( a,  b);
616
  } else if (strcmp(topo_add_best_policy, "BW_RTT") == 0) {
617
          return vcmp_rtt_bw( a,  b);
618
  } else if (strcmp(topo_add_best_policy, "AFFINITY") == 0) {
619
          return vcmp_affinity( a,  b);
620
  } else if (strcmp(topo_add_best_policy, "BW_WEIGHT") == 0) {
621
          return vcmp_bw_weight( a,  b);
622
  } else if (strcmp(topo_add_best_policy, "") == 0){ // defualt behaviuor
623
          return vcmp_rtt_bw( a, b);
624
  } else {
625
    fprintf(stderr, "Error: unknown add best policy %s\n", topo_add_best_policy);
626
    return 1;
627
  }  
628
}
629

    
630
// Use keep best policy specified at command line
631
int vcmp_keep_best(const void* a, const void* b) {
632
  if (strcmp(topo_keep_best_policy, "RTT") == 0) {
633
          return vcmp_rtt(a, b);
634
  }  else if (strcmp(topo_keep_best_policy, "BW") == 0) {
635
          return vcmp_bw(a, b);
636
  }  else if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
637
          return vcmp_rx_bytes_chunks(a, b);
638
  } else if (strcmp(topo_keep_best_policy, "PACKET_LOSS") == 0) {
639
          return vcmp_packet_loss( a, b);
640
  } else if (strcmp(topo_keep_best_policy, "AFFINITY") == 0) {
641
          return vcmp_affinity( a,  b);
642
  } else if (strcmp(topo_keep_best_policy, "BW_WEIGHT") == 0) {
643
          return vcmp_bw_weight( a,  b);
644
  } else if (strcmp(topo_keep_best_policy, "") == 0) { // Default behaviuor
645
          return vcmp_rx_bytes_chunks( a, b);
646
  } else {
647
    fprintf(stderr, "Error: unknown keep best policy %s\n", topo_add_best_policy);
648
    return 1;
649
  }  
650
}
651

    
652

    
653
// currently it just makes the peerset grow
654
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
655
{
656
  int n_ids, metasize, i, max_ids;
657
  
658
  struct peer *peers;
659
  struct timeval tnow, told;
660
  static const struct nodeID **savedids;
661
  static int savedids_size;
662
  
663
  if timerisset(&tout_bmap) {
664
    gettimeofday(&tnow, NULL);
665
    timersub(&tnow, &tout_bmap, &told);
666
    peers = peerset_get_peers(pset_incoming);
667
    // Prune incoming inactive peers
668
    for (i = 0; i < peerset_size(pset_incoming); i++) {
669
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
670
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
671
        ftprintf(stderr,"Topo: dropping incoming inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset_incoming));
672
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
673
        topoAddToBL(peers[i].id);
674
        remove_peer(peers[i--].id, topo_out, topo_in);
675
        //}
676
      }
677
    }
678
  }
679

    
680
  //handle explicit add/remove messages
681
//   if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
682
    if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
683
    fprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
684
    if (len != 2) {
685
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
686
      return;
687
    }
688
    switch (buff[1]) {
689
      case STREAMER_TOPOLOGY_MSG_ADD:
690
        // I have to add this peer to my outgoing neighbourood
691
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
692
        if (!peerset_get_peer(pset_outgoing, from)) {
693
                counter2++;
694
                add_peer(from, NULL, true, false);
695
                }
696
//         if (black_listed(node_addr(from)) && (peerset_get_peer(pset_outgoing, from)))
697
//                 remove_peer(from, topo_in, topo_out);
698
//         else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset_outgoing, from) == NULL))
699
//                 send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
700
        break;
701
      case STREAMER_TOPOLOGY_MSG_REMOVE:
702
        // Neighbour ask me to remove him from my outgoing neighbourhood
703
        if (peerset_get_peer(pset_outgoing, from) != NULL) {
704
                ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
705
                remove_peer(from, true, false);
706
                }
707
        break;
708
      default:
709
        fprintf(stderr, "Bad streamer topo message received!\n");
710
    }
711
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
712
    reg_incoming_neigh_size(peerset_size(pset_incoming));
713
    return;
714
  }
715

    
716
  int res_topo_parse_data = topoParseData(buff, len);
717

    
718
  // Exit if a gossiping message has been received
719
  if (res_topo_parse_data > 0) {
720
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
721
    reg_incoming_neigh_size(peerset_size(pset_incoming));
722
    return;
723
  }
724

    
725
  // Look for peers which can be removed from blacklist
726
  check_black_list_timeouts();
727
  update_metadata();
728
  topChangeMetadata(&my_metadata, sizeof(struct metadata));
729

    
730
  double time_difference = (tnow.tv_sec + tnow.tv_usec*1e-6) - (last_time_updated_peers.tv_sec + last_time_updated_peers.tv_usec*1e-6);
731

    
732
  // Check if it's time to update the neighbourhood
733
  if (add_noise(time_difference, 0.1) > UPDATE_PEERS_TIMEOUT) {
734
    
735
    // If I have to control my sons (--topo_in)
736
    if (topo_in && !topo_out) {
737
        peers = peerset_get_peers(pset_outgoing);
738
        n_ids = peerset_size(pset_outgoing);
739
    } else if (!topo_in && topo_out) { // If I have to control my fathers (--topo_out)
740
        peers = peerset_get_peers(pset_incoming);
741
        n_ids = peerset_size(pset_incoming);
742
    } else if (topo_in && topo_out) { // If topo_bidir, no distinction between sons and fathers, then incoming or outgoing doesn't matter
743
        peers = peerset_get_peers(pset_incoming);
744
        n_ids = peerset_size(pset_incoming);
745
    }
746

    
747
    newids = topGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
748
    metas = topGetMetadata(&metasize);
749
    add_weights = malloc(newids_size * sizeof(double));
750
    
751
    int index = 0;
752
    for (index = 0; index < newids_size; index++) {
753
            add_weights[index] = 0.0;
754
            dprintf("METAS node %s, capacity %f, cps %d, buffer %d\n", node_addr(newids[index]), metas[index].capacity, metas[index].cps, metas[index].cb_size);
755
    }
756
    dprintf("METAS my %f END\n", my_metadata.capacity);
757
    max_ids = n_ids + savedids_size + newids_size;
758
    ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
759

    
760
    int desired_part;
761
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
762
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
763
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
764

    
765
    if (topo_out) {
766
      for (i = 0, oldids_size = 0; i < peerset_size(pset_incoming); i++) {
767
        oldids[oldids_size++] = peers[i].id;
768
        fprintf(stderr," %s - RTT: %f, loss_rate %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_transmitter_lossrate(peers[i].id));
769
      }
770
    } else {
771
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
772
        oldids[oldids_size++] = savedids[i];
773
        fprintf(stderr," %s - RTT: %f, RX_CHUNKS: %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]));
774
      }
775
      savedids_size = 0;
776
      free(savedids);
777
    }
778
    
779
    if ((topo_keep_best) && ((strcmp(topo_keep_best_policy, "W_BW") == 0) || (strcmp(topo_keep_best_policy, "W_RTT") == 0) )) {
780
      const struct nodeID *tmps[max_ids];
781
      int tmps_size;
782
      peerEvaluateFunction peerevaluate = NULL;
783
      SchedOrdering ordering = SCHED_WEIGHTED;
784
      if (strcmp(topo_keep_best_policy, "W_BW") == 0) {
785
        peerevaluate = get_bw_local2;
786
      } else if (strcmp(topo_keep_best_policy, "W_RTT") == 0) {
787
        peerevaluate = get_rtt2;
788
      }
789
      tmps_size = oldids_size;
790
      nidset_select(ordering, oldids, oldids_size, peerevaluate, tmps, &tmps_size);
791
    // select the topo_mem portion of peers to be kept (uniform random)
792
    } else {  
793
        if ((topo_keep_best) && (strcmp(topo_keep_best_policy, "RND") != 0)) {
794
                qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_keep_best);
795
        } else {
796
                nidset_shuffle(oldids, oldids_size);
797
        }
798
    }
799
    for (i = 0; i < oldids_size; i++) {
800
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f, BW %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]), get_capacity_of(oldids[i]));
801
    }
802
    dprintf("QSORT KEEP BEST END\n");
803
    
804
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
805
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
806

    
807
    // compose list of known nodeids
808
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
809
  
810
    // compose list of candidate nodeids
811
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
812

    
813
    // select the alpha_target portion of desired peers
814
    desired_part = (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
815
    
816
    // Filter out blacklisted ones    
817
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
818
    
819
    if ((topo_add_best) && ((strcmp(topo_add_best_policy, "W_BW") == 0) || (strcmp(topo_add_best_policy, "W_RTT") == 0) )) {
820
      const struct nodeID *tmps[max_ids];
821
      int tmps_size;
822
      peerEvaluateFunction peerevaluate = NULL;
823
      SchedOrdering ordering = SCHED_WEIGHTED;
824
      if (strcmp(topo_add_best_policy, "W_BW") == 0) {
825
        peerevaluate = get_bw2;
826
      } else if (strcmp(topo_add_best_policy, "W_RTT") == 0) {
827
        peerevaluate = get_rtt2;
828
      }
829
      tmps_size = MIN(desireds_size,desired_part);
830
      nidset_select(ordering, desireds, desireds_size, peerevaluate, tmps, &tmps_size);
831
      nidset_add_i(selecteds, &selecteds_size, max_ids, tmps, tmps_size);
832
    } else {
833
      if ((topo_add_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
834
        fill_add_weights();
835
        qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
836
      } else if (strcmp(topo_add_best_policy, "RND") == 0) {
837
        nidset_shuffle(desireds, desireds_size);      
838
      }
839
    }
840
    for (i = 0; i < desireds_size; i++) {
841
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW/RTT %f, OFFERS %d, EXP %f\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), (get_metadata_for_peer(desireds[i])).capacity, (double) ((get_metadata_for_peer(desireds[i])).capacity/get_rtt_of(desireds[i])), (get_metadata_for_peer(desireds[i])).cps, get_exp_for_peer(desireds[i]));
842
        }
843
    dprintf("QSORT ADD BEST END\n");
844
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
845
    
846

    
847
    // random from the rest
848
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
849
    nidset_shuffle(others, others_size);
850
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
851
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
852

    
853
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
854
        selecteds_size, nodeids_size,
855
        keep_size, oldids_size,
856
        MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
857
        random_size, others_size);
858
    // add new ones
859
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
860
    for (i = 0; i < toadds_size; i++) {
861
      int j;
862
      //searching for the metadata
863
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
864
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
865
        counter2++;
866
        add_peer(newids[j], &metas[j], topo_out, topo_in);
867
      } else {
868
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
869
      }
870
    }
871

    
872
    // finally, remove those not needed
873
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
874
        
875
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
876
    for (i = 0; i < toremoves_size; i++) {
877
       // Check that it has not been removed already
878
       bool removed_already = false;
879
       if (topo_in && !topo_out) {
880
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
881
       } else if (!topo_in && topo_out) {
882
               removed_already = (peerset_get_peer(pset_outgoing, toremoves[i])!=NULL) ? false : true;
883
       } else if (topo_in && topo_out) {
884
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
885
       }
886
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
887
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
888
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
889
          add_to_blacklist(node_addr(toremoves[i]));
890
          }
891
       fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
892
       if (!removed_already) {
893
          remove_peer(toremoves[i], topo_out, topo_in);
894
          }
895
    }
896
    fprintf(stderr,"Topo remove end\n");
897

    
898
    if (!topo_out) {
899
      savedids = malloc(selecteds_size * sizeof(savedids[0])); //TODO: handle errors
900
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
901
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
902
      }
903
      for (i = 0; i < oldids_size; i++) {
904
        nodeid_free(oldids[i]);
905
      }
906
    }
907
    
908
    free(add_weights);
909
    add_weights = NULL;
910
    
911
    gettimeofday(&last_time_updated_peers, NULL); 
912
    // Check if my QoE is low. If yes, update neighbourood more frequently and change a larger
913
    // portion of neighbours.
914
//     check_my_state();
915
  } 
916
  reg_outgoing_neigh_size(peerset_size(pset_outgoing));
917
  reg_incoming_neigh_size(peerset_size(pset_incoming));
918
}
919

    
920
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
921
{
922
  struct peer *p = peerset_get_peer(pset_incoming, id);
923
  if (!p) {
924
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
925
    if (reg) {
926
      add_peer(id,NULL, false, true);
927
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
928
      p = peerset_get_peer(pset_incoming,id);
929
    }
930
  }
931

    
932
  return p;
933
}
934

    
935
struct peer *nodeid_to_peer_incoming(const struct nodeID* id, int reg)
936
{
937
  struct peer *p = peerset_get_peer(pset_incoming, id);
938
  if (!p) {
939
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
940
    if (reg) {
941
      add_peer(id,NULL, false, true);
942
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
943
      p = peerset_get_peer(pset_incoming,id);
944
    }
945
  }
946

    
947
  return p;
948
}
949

    
950
struct peer *nodeid_to_peer_outgoing(const struct nodeID* id, int reg)
951
{
952
  struct peer *p = peerset_get_peer(pset_outgoing, id);
953
  if (!p) {
954
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
955
    if (reg) {
956
      add_peer(id,NULL, false, true);
957
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
958
      p = peerset_get_peer(pset_incoming,id);
959
    }
960
  }
961

    
962
  return p;
963
}
964

    
965
int peers_init(void)
966
{
967
  fprintf(stderr,"peers_init\n");
968
  pset_incoming = peerset_init(0);
969
  pset_outgoing = peerset_init(0);
970
  return pset_incoming && pset_outgoing ? 1 : 0;
971
}
972

    
973
struct peerset *get_peers(void)
974
{
975
  return pset_outgoing;
976
}
977

    
978
struct peerset *get_outgoing_peers(void)
979
{
980
  return pset_outgoing;
981
}
982

    
983
struct peerset *get_incoming_peers(void)
984
{
985
  return pset_incoming;
986
}