Statistics
| Branch: | Revision:

streamers / topology.c @ 6405ff89

History | View | Annotate | Download (28 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "nodeid_set.h"
26
#include "streaming.h"
27
#include "dbg.h"
28
#include "measures.h"
29
#include "streamer.h"
30
#include "hrc.h"
31
#include "blacklist.h"
32

    
33
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
34
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
35

    
36
#define MODULE  2147483647
37
#define A       16807
38
#define LASTXN  127773
39
#define UPTOMOD -2836
40
#define RATIO   0.46566128e-9
41

    
42
double alpha_target = 0.5;
43
double topo_mem = 0;
44

    
45
bool topo_out = true; //peer selects out-neighbours
46
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
47

    
48
bool topo_keep_best = false;
49
bool topo_add_best = false;
50
bool topo_black_list = false;
51

    
52
const char* topo_keep_best_policy = "";
53
const char* topo_add_best_policy = "";
54
const char* topo_black_list_policy = "";
55

    
56
int NEIGHBORHOOD_TARGET_SIZE = 30;
57
int UPDATE_PEERS_TIMEOUT = 5;
58
#define TMAN_MAX_IDLE 10
59
#define TMAN_LOG_EVERY 1000
60

    
61
#define STREAMER_TOPOLOGY_MSG_ADD 0
62
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
63

    
64
static struct peerset *pset_outgoing;
65
static struct peerset *pset_incoming;
66
static struct timeval tout_bmap = {20, 0};
67
static int counter = 0;
68
static int counter2 = 0;
69
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
70
static tmanRankingFunction rankFunct = simpleRanker;
71

    
72
struct metadata {
73
  uint16_t cb_size;
74
  uint16_t cps;
75
  double capacity;
76
  double recv_delay;
77
} __attribute__((packed));
78

    
79
static struct metadata my_metadata;
80
static int cnt = 0;
81
static struct nodeID *me = NULL;
82
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
83
static struct nodeID ** neighbors;
84
static struct timeval last_time_updated_peers;
85

    
86
struct nodeID **newids;
87
int newids_size;
88
struct metadata *metas;
89

    
90
static void update_metadata(void) {
91
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
92
        my_metadata.cps =  (uint16_t) get_running_offer_threads();
93
        my_metadata.recv_delay = (double) get_receive_delay();
94
        my_metadata.capacity = (double) get_capacity();
95
}
96

    
97
static struct metadata get_metadata_for_peer(struct nodeID *id){
98
        int i = 0;
99
        struct metadata to_return;
100
        to_return.cb_size = NAN;
101
        to_return.cps =  NAN;
102
        to_return.recv_delay = NAN;
103
        to_return.capacity = NAN;
104

    
105
        if (newids_size > 0 && id != NULL) {
106
                for (i = 0 ; i < newids_size; i++) {
107
                        if (newids[i] != NULL) {
108
                                if (nodeid_equal(newids[i],id)){
109
                                        to_return.cb_size = metas[i].cb_size;
110
                                        to_return.cps = metas[i].cps;
111
                                        to_return.recv_delay = metas[i].recv_delay;
112
                                        to_return.capacity = metas[i].capacity;
113
                                        return to_return;
114
                                }
115
                        }
116
                }
117
        return to_return;
118
        }
119
}
120

    
121
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
122

    
123
        double t,p1,p2;
124
        t = *((const double *)tin);
125
        p1 = *((const double *)p1in);
126
        p2 = *((const double *)p2in);
127

    
128
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
129
        else if (isnan(p1)) return 2;
130
        else if (isnan(p2)) return 1;
131
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
132

    
133
}
134

    
135
int topologyInit(struct nodeID *myID, const char *config)
136
{
137
        int i;
138
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
139
                bind_msg_type(mTypes[i]);
140
        update_metadata();
141
        me = myID;
142
        struct timeval tnow;
143
        gettimeofday(&tnow, NULL);
144
        gettimeofday(&last_time_updated_peers, NULL);
145
        last_time_updated_peers.tv_sec -= UPDATE_PEERS_TIMEOUT;
146
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
147
}
148

    
149
void topologyShutdown(void)
150
{
151
}
152

    
153
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
154
{
155
        // TODO: check this!! Just to use this function to bootstrap ncast...
156
        struct metadata m = {0};        //TODO: check what metadata option should mean
157

    
158
        if (counter < TMAN_MAX_IDLE)
159
                return topAddNeighbour(neighbour,&m,sizeof(m));
160
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
161
}
162

    
163
static int topoParseData(const uint8_t *buff, int len)
164
{
165
        int res = -1,ncs = 0,msize;
166
        const struct nodeID **n; const void *m;
167
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
168
                res = topParseData(buff,len);
169
//                 if (counter <= TMAN_MAX_IDLE)
170
//                         counter++;
171
        }
172
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
173
        {
174
                n = topGetNeighbourhood(&ncs);
175
                if (ncs) {
176
                m = topGetMetadata(&msize);
177
                res = tmanParseData(buff,len,n,ncs,m,msize);
178
                }
179
        }
180
  return res;
181
}
182

    
183
static const struct nodeID **topoGetNeighbourhood(int *n)
184
{
185
        int i; double d;
186
        if (counter > TMAN_MAX_IDLE) {
187
                uint8_t *mdata; int msize;
188
                *n = tmanGetNeighbourhoodSize();
189
                if (neighbors) free(neighbors);
190
                neighbors = calloc(*n,sizeof(struct nodeID *));
191
                tmanGetMetadata(&msize);
192
                mdata = calloc(*n,msize);
193
                tmanGivePeers(*n,neighbors,(void *)mdata);
194

    
195
                if (cnt % TMAN_LOG_EVERY == 0) {
196
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
197
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
198
                                d = *((double *)(mdata+i*msize));
199
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
200
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
201
                        }
202
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
203
                }
204

    
205
                free(mdata);
206
                return (const struct nodeID **)neighbors;
207
        }
208
        else
209
                return topGetNeighbourhood(n);
210
}
211

    
212
static void topoAddToBL (struct nodeID *id)
213
{
214
        if (counter >= TMAN_MAX_IDLE)
215
                tmanAddToBlackList(id);
216
        topAddToBlackList(id);
217
}
218

    
219
//TODO: send metadata as well
220
static int send_topo_msg(struct nodeID *dst, uint8_t type)
221
{
222
  char msg[2];
223
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
224
  msg[1] = type;
225
  
226
  return send_to_peer(me, dst, msg, 2);
227
}
228

    
229
static void add_peer(const struct nodeID *id, const struct metadata *m, bool outgoing, bool incoming)
230
{
231
  if (outgoing) {
232
      dtprintf("Adding %s to outgoing neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
233
      peerset_add_peer(pset_outgoing, id);
234
      if (m) peerset_get_peer(pset_outgoing, id)->cb_size = m->cb_size;
235
      if (m) peerset_get_peer(pset_outgoing, id)->capacity = m->capacity;
236
      /* add measures here */
237
      add_measures(id);
238
      send_bmap(id);
239
  }
240
  if (incoming) {
241
      dtprintf("Topo: explicit add message sent!!! to %s (peers:%d)\n", node_addr(id));
242
      peerset_add_peer(pset_incoming, id);
243
      if (m) peerset_get_peer(pset_incoming, id)->cb_size = m->cb_size;
244
      if (m) peerset_get_peer(pset_incoming, id)->capacity = m->capacity;
245
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
246
  }
247
}
248

    
249
static void remove_peer(const struct nodeID *id, bool outgoing, bool incoming)
250
{
251
  if (outgoing) {
252
      dtprintf("Removing %s from outgoing neighbourhood!\n", node_addr(id));
253
      /* delete measures here */
254
      delete_measures(id);
255
      peerset_remove_peer(pset_outgoing, id);
256
  }
257
  if (incoming) {
258
      dtprintf("Topo: explicit remove message sent!!! to %s (peers:%d)\n", node_addr(id));
259
      peerset_remove_peer(pset_incoming, id);
260
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
261
  }
262
}
263

    
264
//get the rtt. Currenly only MONL version is supported
265
static double get_rtt_of(const struct nodeID* n){
266
#ifdef MONL
267
  return get_rtt(n);
268
#else
269
  return NAN;
270
#endif
271
}
272

    
273
//get the declared capacity of a node
274
static double get_capacity_of(const struct nodeID* n){
275
  struct peer *p = peerset_get_peer(pset_incoming, n);
276
  if (p) {
277
    return p->capacity;
278
  } 
279
  p = peerset_get_peer(pset_outgoing, n);
280
  if (p) {
281
    return p->capacity;
282
  } 
283
  return NAN;
284
}
285

    
286
static double get_rx_bytes_chunks_of(const struct nodeID* n){
287
#ifdef MONL
288
  return get_rx_bytes_chunks(n);
289
#else
290
  return NAN;
291
#endif
292
}
293

    
294
static double get_transmitter_lossrate_of(const struct nodeID* n){
295
#ifdef MONL
296
  return get_transmitter_lossrate_of(n);
297
#else
298
  return NAN;
299
#endif
300
}
301

    
302
// Check if peer is the blacklist
303
//returns: 1:yes 0:no
304
int desiredness_unbled(const struct nodeID* n) {
305
  bool bled = black_listed(node_addr(n));
306

    
307
  if (!bled) {
308
    return 1;
309
  }
310
  return 0;
311
}
312

    
313
//returns: 1:yes 0:no -1:unknown
314
int blacklistness_lossrate(const struct nodeID* n) {
315
  double loss_rate = get_transmitter_lossrate(n);
316
  
317
  dprintf("BLACKLIST: father loss %s rate is %f\n", node_addr(n), loss_rate);
318
  
319
  if (isnan(loss_rate)) {
320
    return -1;
321
  } else if (loss_rate > 0.10) {
322
    return 1;
323
  }
324

    
325
  return 0;
326
}
327

    
328
bool is_desired_unbled(const struct nodeID* n) {
329
  return (desiredness_unbled(n) == 1);
330
}
331

    
332
bool is_to_blacklist(const struct nodeID* n) {
333
  return (blacklistness_lossrate(n) == 1);
334
}
335

    
336
double add_noise(double to_return, double alpha) {
337
        double uniform = rand()/(RAND_MAX + 1.0);
338
        return (double) (((1-alpha) + (2*alpha_target*uniform)) * to_return) ; 
339
        }
340

    
341
long rnd32(long gSeed)
342
{
343
  long times, rest, prod1, prod2;
344

    
345
  times = gSeed / LASTXN;
346
  rest = gSeed - times * LASTXN;
347
  prod1 = times * UPTOMOD;
348
  prod2 = rest * A;
349
  gSeed = prod1 + prod2;
350
  if (gSeed < 0)
351
    gSeed = gSeed + MODULE;
352
  return gSeed;
353
}
354

    
355
double negexp (double mean, long *gSeed)
356
{
357
  double u;
358
  *gSeed = rnd32 (*gSeed);
359
  u = (*gSeed) * RATIO;
360
  return (-mean * log (u));
361
}
362

    
363
int cmp_rtt(const struct nodeID** a, const struct nodeID** b) {
364
  double ra, rb;
365
  ra = add_noise(get_rtt_of(*a), alpha_target);
366
  rb = add_noise(get_rtt_of(*b), alpha_target);
367
  
368
  fprintf(stderr, "RTTS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
369
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
370
  else if (isnan(rb) || ra < rb) return -1;
371
  else return 1;
372
}
373

    
374
int vcmp_rtt(const void* a, const void* b) {
375
  return cmp_rtt((const struct nodeID**) a, (const struct nodeID**) b);
376
}
377

    
378
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
379
  double ra, rb;
380
  ra = -( add_noise(get_metadata_for_peer(*a).capacity, alpha_target));
381
  rb = -( add_noise(get_metadata_for_peer(*b).capacity, alpha_target));
382

    
383
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
384
  else if (isnan(rb) || ra < rb) return -1;
385
  else return 1;
386
}
387

    
388
int vcmp_bw(const void* a, const void* b) {
389
  return cmp_bw((const struct nodeID**)a, (const struct nodeID**)b);
390
}
391

    
392
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
393
  double ra, rb;
394
  ra = -( add_noise((get_metadata_for_peer(*a).capacity / (get_rtt_of(*a) / 1000)), alpha_target));
395
  rb = -( add_noise((get_metadata_for_peer(*b).capacity / (get_rtt_of(*b) / 1000)), alpha_target));
396

    
397
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
398
  else if (isnan(rb) || ra < rb) return -1;
399
  else return 1;
400
}
401

    
402
int vcmp_rtt_bw(const void* a, const void* b) {
403
  return cmp_rtt_bw((const struct nodeID**)a, (const struct nodeID**)b);
404
}
405

    
406
int cmp_offers(const struct nodeID** a, const struct nodeID** b) {
407
  double ra, rb;
408
  ra = -( add_noise(get_metadata_for_peer(*a).cps, alpha_target) );
409
  rb = -( add_noise(get_metadata_for_peer(*b).cps, alpha_target) );
410
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
411
  else if (isnan(rb) || ra < rb) return -1;
412
  else return 1;
413
}
414

    
415
int vcmp_offers(const void* a, const void* b) {
416
  return cmp_offers((const struct nodeID**)a, (const struct nodeID**)b);
417
}
418

    
419
int cmp_rx_bytes_chunks(const struct nodeID** a, const struct nodeID** b) {
420
  double ra, rb;
421
  ra = -(add_noise(get_rx_bytes_chunks_of(*a), alpha_target));
422
  rb = -(add_noise(get_rx_bytes_chunks_of(*b), alpha_target));
423
//   fprintf(stderr, "RX_BYTES_CHUNKS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
424
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
425
        return 0;
426
  else if (isnan(rb) || ra < rb) 
427
        return -1;
428
  else 
429
        return 1;
430
}
431

    
432
int vcmp_rx_bytes_chunks(const void* a, const void* b) {
433
  return cmp_rx_bytes_chunks((const struct nodeID**) a, (const struct nodeID**) b);
434
}
435

    
436
int cmp_packet_loss(const struct nodeID** a, const struct nodeID** b) {
437
  double ra, rb;
438
  ra = add_noise(get_transmitter_lossrate(*a), alpha_target);
439
  rb = add_noise(get_transmitter_lossrate(*b), alpha_target);
440
//   fprintf(stderr, "PACKET_LOSS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
441
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
442
        return 0;
443
  else if (isnan(rb) || ra < rb) 
444
        return -1;
445
  else 
446
        return 1;
447
}
448

    
449
int vcmp_packet_loss(const void* a, const void* b) {
450
  return cmp_packet_loss((const struct nodeID**) a, (const struct nodeID**) b);
451
}
452

    
453
int cmp_affinity(const struct nodeID** a, const struct nodeID** b) {
454
  double ra, rb, mine, differencea, differenceb;
455
  
456
  ra = add_noise(get_metadata_for_peer(*a).capacity, alpha_target);
457
  rb = add_noise(get_metadata_for_peer(*b).capacity, alpha_target);
458
  mine = get_capacity();
459
    
460
  if ((!isnan(ra)) && (!isnan(rb))) {
461
        differencea = ra - mine;
462
        differenceb = rb - mine;
463
        // Both greater than mine
464
        if (differenceb > 0 && differencea > 0) {
465
                if (differencea > differenceb)
466
                        return 1;
467
                else if (differencea < differenceb)
468
                        return -1;
469
                else if (differencea == differenceb)
470
                        return 0;
471
        } else if (differenceb > 0 && differencea < 0) {
472
                return 1;
473
        } else if (differenceb < 0 && differencea > 0) {
474
                return -1;
475
        } else if (differenceb < 0 && differencea < 0) {
476
                if (differencea > differenceb)
477
                        return -1;
478
                else if (differencea < differenceb)
479
                        return 1;
480
                else if (differencea == differenceb)
481
                        return 0;
482
        }
483
  } else if (isnan(ra) && (!isnan(rb))) {
484
          return -1;
485
  } else if (isnan(rb) && (!isnan(ra))) {
486
          return 1;
487
  }
488
}
489

    
490
int vcmp_affinity(const void* a, const void* b) {
491
  return cmp_affinity((const struct nodeID**) a, (const struct nodeID**) b);
492
}
493

    
494
int cmp_bw_weight(const struct nodeID** a, const struct nodeID** b) {
495
  double bwa, bwb, ra, rb, vca, vcb, seeda, seedb;
496
  bwa = get_metadata_for_peer(*a).capacity;
497
  bwb = get_metadata_for_peer(*b).capacity;
498
  if (!isnan(ra)) {
499
        ra = 1/(bwa/1000000);
500
        seeda = rand()/(RAND_MAX + 1.0);
501
        vca = negexp(ra, &seeda);
502
  }
503
  else { 
504
        vca = NAN;
505
  }
506
  if (!isnan(rb)) {
507
        rb = 1/(bwb/1000000);
508
        seedb = rand()/(RAND_MAX + 1.0);
509
        vcb = negexp(rb, &seedb);
510
  }
511
  else {
512
        vcb = NAN;
513
  }
514
  
515
  if ((isnan(vca) && isnan(vcb)) || vca == vcb) 
516
        return 0;
517
  else if (isnan(vcb) || vca < vcb) 
518
        return -1;
519
  else 
520
        return 1;
521
}
522

    
523
int vcmp_bw_weight(const void* a, const void* b) {
524
  return cmp_bw_weight((const struct nodeID**) a, (const struct nodeID**) b);
525
}
526

    
527
// Use add best policy specified at command line
528
int vcmp_add_best(const void* a, const void* b) {
529
  if (strcmp(topo_add_best_policy, "OFFERS") == 0) {
530
          return vcmp_offers(a, b);
531
  } else if (strcmp(topo_add_best_policy, "BW") == 0) {
532
          return vcmp_bw(a, b);
533
  } else if (strcmp(topo_add_best_policy, "RTT") == 0) {
534
          return vcmp_rtt( a,  b);
535
  } else if (strcmp(topo_add_best_policy, "BW_RTT") == 0) {
536
          return vcmp_rtt_bw( a,  b);
537
  } else if (strcmp(topo_add_best_policy, "AFFINITY") == 0) {
538
          return vcmp_affinity( a,  b);
539
  } else if (strcmp(topo_add_best_policy, "BW_WEIGHT") == 0) {
540
          return vcmp_bw_weight( a,  b);
541
  } else if (strcmp(topo_add_best_policy, "") == 0){ // defualt behaviuor
542
          return vcmp_rtt_bw( a, b);
543
  } else {
544
    fprintf(stderr, "Error: unknown add best policy %s\n", topo_add_best_policy);
545
    return 1;
546
  }  
547
}
548

    
549
// Use keep best policy specified at command line
550
int vcmp_keep_best(const void* a, const void* b) {
551
  if (strcmp(topo_keep_best_policy, "RTT") == 0) {
552
          return vcmp_rtt(a, b);
553
  }  else if (strcmp(topo_keep_best_policy, "BW") == 0) {
554
          return vcmp_bw(a, b);
555
  }  else if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
556
          return vcmp_rx_bytes_chunks(a, b);
557
  } else if (strcmp(topo_keep_best_policy, "PACKET_LOSS") == 0) {
558
          return vcmp_packet_loss( a, b);
559
  } else if (strcmp(topo_keep_best_policy, "AFFINITY") == 0) {
560
          return vcmp_affinity( a,  b);
561
  } else if (strcmp(topo_keep_best_policy, "BW_WEIGHT") == 0) {
562
          return vcmp_bw_weight( a,  b);
563
  } else if (strcmp(topo_keep_best_policy, "") == 0) { // Default behaviuor
564
          return vcmp_rx_bytes_chunks( a, b);
565
  } else {
566
    fprintf(stderr, "Error: unknown keep best policy %s\n", topo_add_best_policy);
567
    return 1;
568
  }  
569
}
570

    
571

    
572
// currently it just makes the peerset grow
573
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
574
{
575
  int n_ids, metasize, i, max_ids;
576
  
577
  struct peer *peers;
578
  struct timeval tnow, told;
579
  static const struct nodeID **savedids;
580
  static int savedids_size;
581
  
582
  if timerisset(&tout_bmap) {
583
    gettimeofday(&tnow, NULL);
584
    timersub(&tnow, &tout_bmap, &told);
585
    peers = peerset_get_peers(pset_incoming);
586
    // Prune incoming inactive peers
587
    for (i = 0; i < peerset_size(pset_incoming); i++) {
588
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
589
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
590
        ftprintf(stderr,"Topo: dropping incoming inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset_incoming));
591
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
592
        topoAddToBL(peers[i].id);
593
        remove_peer(peers[i--].id, topo_out, topo_in);
594
        //}
595
      }
596
    }
597
  }
598

    
599
  //handle explicit add/remove messages
600
//   if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
601
    if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
602
    fprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
603
    if (len != 2) {
604
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
605
      return;
606
    }
607
    switch (buff[1]) {
608
      case STREAMER_TOPOLOGY_MSG_ADD:
609
        // I have to add this peer to my outgoing neighbourood
610
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
611
        if (!peerset_get_peer(pset_outgoing, from)) {
612
                counter2++;
613
                add_peer(from, NULL, true, false);
614
                }
615
//         if (black_listed(node_addr(from)) && (peerset_get_peer(pset_outgoing, from)))
616
//                 remove_peer(from, topo_in, topo_out);
617
//         else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset_outgoing, from) == NULL))
618
//                 send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
619
        break;
620
      case STREAMER_TOPOLOGY_MSG_REMOVE:
621
        // Neighbour ask me to remove him from my outgoing neighbourhood
622
        if (peerset_get_peer(pset_outgoing, from) != NULL) {
623
                ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
624
                remove_peer(from, true, false);
625
                }
626
        break;
627
      default:
628
        fprintf(stderr, "Bad streamer topo message received!\n");
629
    }
630
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
631
    reg_incoming_neigh_size(peerset_size(pset_incoming));
632
    return;
633
  }
634

    
635
  int res_topo_parse_data = topoParseData(buff, len);
636

    
637
  // Exit if a gossiping message has been received
638
  if (res_topo_parse_data > 0) {
639
    reg_outgoing_neigh_size(peerset_size(pset_outgoing));
640
    reg_incoming_neigh_size(peerset_size(pset_incoming));
641
    return;
642
  }
643

    
644
  // Look for peers which can be removed from blacklist
645
  check_black_list_timeouts();
646
  update_metadata();
647
  topChangeMetadata(&my_metadata, sizeof(struct metadata));
648

    
649
  double time_difference = (tnow.tv_sec + tnow.tv_usec*1e-6) - (last_time_updated_peers.tv_sec + last_time_updated_peers.tv_usec*1e-6);
650

    
651
  // Check if it's time to update the neighbourhood
652
  if (add_noise(time_difference, 0.1) > UPDATE_PEERS_TIMEOUT) {
653
    
654
    // If I have to control my sons (--topo_in)
655
    if (topo_in && !topo_out) {
656
        peers = peerset_get_peers(pset_outgoing);
657
        n_ids = peerset_size(pset_outgoing);
658
    } else if (!topo_in && topo_out) { // If I have to control my fathers (--topo_out)
659
        peers = peerset_get_peers(pset_incoming);
660
        n_ids = peerset_size(pset_incoming);
661
    } else if (topo_in && topo_out) { // If topo_bidir, no distinction between sons and fathers, then incoming or outgoing doesn't matter
662
        peers = peerset_get_peers(pset_incoming);
663
        n_ids = peerset_size(pset_incoming);
664
    }
665

    
666
    newids = topGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
667
    metas = topGetMetadata(&metasize);
668
    int index = 0;
669
    for (index = 0; index < newids_size; index++) {
670
            dprintf("METAS node %s, capacity %f, cps %d, buffer %d\n", node_addr(newids[index]), metas[index].capacity, metas[index].cps, metas[index].cb_size);
671
    }
672
    dprintf("METAS my %f END\n", my_metadata.capacity);
673
    max_ids = n_ids + savedids_size + newids_size;
674
    ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
675

    
676
    int desired_part;
677
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
678
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
679
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
680

    
681
    if (topo_out) {
682
      for (i = 0, oldids_size = 0; i < peerset_size(pset_incoming); i++) {
683
        oldids[oldids_size++] = peers[i].id;
684
        fprintf(stderr," %s - RTT: %f, loss_rate %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_transmitter_lossrate(peers[i].id));
685
      }
686
    } else {
687
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
688
        oldids[oldids_size++] = savedids[i];
689
        fprintf(stderr," %s - RTT: %f, RX_CHUNKS: %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]));
690
      }
691
      savedids_size = 0;
692
      free(savedids);
693
    }
694

    
695
    // select the topo_mem portion of peers to be kept (uniform random)
696
    if ((topo_keep_best) && (strcmp(topo_keep_best_policy, "RND") != 0)) {
697
      qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_keep_best);
698
    } else {
699
      nidset_shuffle(oldids, oldids_size);
700
    }
701
    for (i = 0; i < oldids_size; i++) {
702
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f, BW %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]), get_capacity_of(oldids[i]));
703
      }
704
    dprintf("QSORT KEEP BEST END\n");
705
    
706
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
707
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
708

    
709
    // compose list of known nodeids
710
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
711
  
712
    // compose list of candidate nodeids
713
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
714

    
715
    // select the alpha_target portion of desired peers
716
    desired_part = (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
717
    
718
    
719
    // Filter out blacklisted ones    
720
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
721
    
722
    if ((topo_add_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
723
      qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
724
    } else {
725
      nidset_shuffle(desireds, desireds_size);
726
    }
727
    for (i = 0; i < desireds_size; i++) {
728
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW/RTT %f, OFFERS %d\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), (get_metadata_for_peer(desireds[i])).capacity, (double) ((get_metadata_for_peer(desireds[i])).capacity/get_rtt_of(desireds[i])), (get_metadata_for_peer(desireds[i])).cps);
729
      }
730
    dprintf("QSORT ADD BEST END\n");
731
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
732

    
733
    // random from the rest
734
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
735
    nidset_shuffle(others, others_size);
736
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
737
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
738

    
739
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
740
        selecteds_size, nodeids_size,
741
        keep_size, oldids_size,
742
        MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
743
        random_size, others_size);
744
    // add new ones
745
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
746
    for (i = 0; i < toadds_size; i++) {
747
      int j;
748
      //searching for the metadata
749
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
750
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
751
        counter2++;
752
        add_peer(newids[j], &metas[j], topo_out, topo_in);
753
      } else {
754
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
755
      }
756
    }
757

    
758
    // finally, remove those not needed
759
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
760
        
761
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
762
    for (i = 0; i < toremoves_size; i++) {
763
       // Check that it has not been removed already
764
       bool removed_already = false;
765
       if (topo_in && !topo_out) {
766
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
767
       } else if (!topo_in && topo_out) {
768
               removed_already = (peerset_get_peer(pset_outgoing, toremoves[i])!=NULL) ? false : true;
769
       } else if (topo_in && topo_out) {
770
               removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
771
       }
772
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
773
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
774
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
775
          add_to_blacklist(node_addr(toremoves[i]));
776
          }
777
       fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
778
       if (!removed_already) {
779
          remove_peer(toremoves[i], topo_out, topo_in);
780
          }
781
    }
782
    fprintf(stderr,"Topo remove end\n");
783

    
784
    if (!topo_out) {
785
      savedids = malloc(selecteds_size * sizeof(savedids[0])); //TODO: handle errors
786
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
787
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
788
      }
789
      for (i = 0; i < oldids_size; i++) {
790
        nodeid_free(oldids[i]);
791
      }
792
    }
793
    gettimeofday(&last_time_updated_peers, NULL); 
794
  } 
795
  reg_outgoing_neigh_size(peerset_size(pset_outgoing));
796
  reg_incoming_neigh_size(peerset_size(pset_incoming));
797
}
798

    
799
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
800
{
801
  struct peer *p = peerset_get_peer(pset_incoming, id);
802
  if (!p) {
803
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
804
    if (reg) {
805
      add_peer(id,NULL, false, true);
806
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
807
      p = peerset_get_peer(pset_incoming,id);
808
    }
809
  }
810

    
811
  return p;
812
}
813

    
814
struct peer *nodeid_to_peer_incoming(const struct nodeID* id, int reg)
815
{
816
  struct peer *p = peerset_get_peer(pset_incoming, id);
817
  if (!p) {
818
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
819
    if (reg) {
820
      add_peer(id,NULL, false, true);
821
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
822
      p = peerset_get_peer(pset_incoming,id);
823
    }
824
  }
825

    
826
  return p;
827
}
828

    
829
struct peer *nodeid_to_peer_outgoing(const struct nodeID* id, int reg)
830
{
831
  struct peer *p = peerset_get_peer(pset_outgoing, id);
832
  if (!p) {
833
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
834
    if (reg) {
835
      add_peer(id,NULL, false, true);
836
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
837
      p = peerset_get_peer(pset_incoming,id);
838
    }
839
  }
840

    
841
  return p;
842
}
843

    
844
int peers_init(void)
845
{
846
  fprintf(stderr,"peers_init\n");
847
  pset_incoming = peerset_init(0);
848
  pset_outgoing = peerset_init(0);
849
  return pset_incoming && pset_outgoing ? 1 : 0;
850
}
851

    
852
struct peerset *get_peers(void)
853
{
854
  return pset_outgoing;
855
}
856

    
857
struct peerset *get_outgoing_peers(void)
858
{
859
  return pset_outgoing;
860
}
861

    
862
struct peerset *get_incoming_peers(void)
863
{
864
  return pset_incoming;
865
}