Revision 91363912

View differences:

topology.c
58 58
#define STREAMER_TOPOLOGY_MSG_ADD 0
59 59
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
60 60

  
61
static struct peerset *pset;
61
static struct peerset *pset_outgoing;
62
static struct peerset *pset_incoming;
62 63
static struct timeval tout_bmap = {20, 0};
63 64
static int counter = 0;
64 65
static int counter2 = 0;
......
79 80
static struct nodeID ** neighbors;
80 81
static struct timeval last_time_updated_peers;
81 82

  
83
struct nodeID **newids;
84
int newids_size;
85
struct metadata *metas;
86

  
82 87
static void update_metadata(void) {
83
	my_metadata.cb_size = (uint16_t) am_i_source() ? 0 : get_cb_size();
88
	my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
84 89
	my_metadata.cps =  (uint16_t) get_running_offer_threads();
85 90
	my_metadata.recv_delay = (double) get_receive_delay();
86 91
	my_metadata.capacity = (double) get_capacity();
87 92
}
88 93

  
94
static struct metadata get_metadata_for_peer(struct nodeID *id){
95
	int i = 0;
96
	struct metadata to_return;
97
	to_return.cb_size = NAN;
98
	to_return.cps =  NAN;
99
	to_return.recv_delay = NAN;
100
	to_return.capacity = NAN;
101

  
102
	if (newids_size > 0 && id != NULL) {
103
		for (i = 0 ; i < newids_size; i++) {
104
			if (newids[i] != NULL) {
105
				if (nodeid_equal(newids[i],id)){
106
					to_return.cb_size = metas[i].cb_size;
107
					to_return.cps = metas[i].cps;
108
					to_return.recv_delay = metas[i].recv_delay;
109
					to_return.capacity = metas[i].capacity;
110
					return to_return;
111
				}
112
			}
113
		}
114
	return to_return;
115
	}
116
}
117

  
89 118
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
90 119

  
91 120
	double t,p1,p2;
......
190 219
  char msg[2];
191 220
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
192 221
  msg[1] = type;
222
  
193 223
  return send_to_peer(me, dst, msg, 2);
194 224
}
195 225

  
196
static void add_peer(const struct nodeID *id, const struct metadata *m, bool local, bool remote)
226
static void add_peer(const struct nodeID *id, const struct metadata *m, bool outgoing, bool incoming)
197 227
{
198
  if (local) {
199
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
200
      peerset_add_peer(pset, id);
201
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
202
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
228
  if (outgoing) {
229
      dtprintf("Adding %s to outgoing neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
230
      peerset_add_peer(pset_outgoing, id);
231
      if (m) peerset_get_peer(pset_outgoing, id)->cb_size = m->cb_size;
232
      if (m) peerset_get_peer(pset_outgoing, id)->capacity = m->capacity;
203 233
      /* add measures here */
204 234
      add_measures(id);
205 235
      send_bmap(id);
206 236
  }
207
  if (remote) {
208
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
237
  if (incoming) {
238
      dtprintf("Topo: explicit add message sent!!! to %s (peers:%d)\n", node_addr(id));
239
      peerset_add_peer(pset_incoming, id);
240
      if (m) peerset_get_peer(pset_incoming, id)->cb_size = m->cb_size;
241
      if (m) peerset_get_peer(pset_incoming, id)->capacity = m->capacity;
209 242
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
210 243
  }
211 244
}
212 245

  
213
static void remove_peer(const struct nodeID *id, bool local, bool remote)
246
static void remove_peer(const struct nodeID *id, bool outgoing, bool incoming)
214 247
{
215
  if (local) {
216
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
248
  if (outgoing) {
249
      dtprintf("Removing %s from outgoing neighbourhood!\n", node_addr(id));
217 250
      /* delete measures here */
218 251
      delete_measures(id);
219
      peerset_remove_peer(pset, id);
252
      peerset_remove_peer(pset_outgoing, id);
220 253
  }
221
  if (remote) {
222
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
254
  if (incoming) {
255
      dtprintf("Topo: explicit remove message sent!!! to %s (peers:%d)\n", node_addr(id));
256
      peerset_remove_peer(pset_incoming, id);
223 257
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
224 258
  }
225 259
}
......
235 269

  
236 270
//get the declared capacity of a node
237 271
static double get_capacity_of(const struct nodeID* n){
238
  struct peer *p = peerset_get_peer(pset, n);
272
  struct peer *p = peerset_get_peer(pset_incoming, n);
239 273
  if (p) {
240 274
    return p->capacity;
241
  }
242

  
275
  } 
276
  p = peerset_get_peer(pset_outgoing, n);
277
  if (p) {
278
    return p->capacity;
279
  } 
243 280
  return NAN;
244 281
}
245 282

  
......
285 322
//returns: 1:yes 0:no -1:unknown
286 323
int desiredness(const struct nodeID* n) {
287 324
  double rtt = get_rtt_of(n);
288
  double bw =  get_capacity_of(n);
289

  
325
  double bw = 0.0;
326
  bw =  get_metadata_for_peer(n).capacity;
327
  
290 328
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
291 329
    return -1;
292 330
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
......
300 338
//returns: 1:yes 0:no -1:unknown
301 339
int desiredness_unbled(const struct nodeID* n) {
302 340
  double rtt = get_rtt_of(n);
303
  double bw =  get_capacity_of(n);
304 341
  bool bled = black_listed(node_addr(n));
342
  double bw =  0.0;
343
  bw =  get_metadata_for_peer(n).capacity;
305 344

  
306 345
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
307 346
    return -1;
......
355 394

  
356 395
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
357 396
  double ra, rb;
358
  ra = -( get_capacity_of(*a) );
359
  rb = -( get_capacity_of(*b) );
397
  ra = -( get_metadata_for_peer(*a).capacity);
398
  rb = -( get_metadata_for_peer(*b).capacity);
399

  
360 400
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
361 401
  else if (isnan(rb) || ra < rb) return -1;
362 402
  else return 1;
......
368 408

  
369 409
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
370 410
  double ra, rb;
371
  ra = -( get_capacity_of(*a)*get_rtt_of(*a) );
372
  rb = -( get_capacity_of(*b)*get_rtt_of(*b) );
411
  ra = -( get_metadata_for_peer(*a).capacity * get_rtt_of(*a));
412
  rb = -( get_metadata_for_peer(*b).capacity * get_rtt_of(*b));
413

  
373 414
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
374 415
  else if (isnan(rb) || ra < rb) return -1;
375 416
  else return 1;
......
444 485

  
445 486
// Use keep best policy specified at command line
446 487
int vcmp_keep_best(const void* a, const void* b) {
447
  if (strcmp(topo_keep_best_policy, "BW") == 0) {
488
  if (strcmp(topo_keep_best_policy, "RTT") == 0) {
489
	  return vcmp_rtt(a, b);
490
  }  else if (strcmp(topo_keep_best_policy, "BW") == 0) {
448 491
	  return vcmp_bw(a, b);
449 492
  }  else if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
450 493
	  return vcmp_rx_bytes_chunks(a, b);
......
462 505
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
463 506
{
464 507
  int n_ids, metasize, i, max_ids;
465
  struct nodeID **newids;
466
  int newids_size;
467
  struct metadata *metas;
468 508
  
469 509
  struct peer *peers;
470 510
  struct timeval tnow, told;
......
474 514
  if timerisset(&tout_bmap) {
475 515
    gettimeofday(&tnow, NULL);
476 516
    timersub(&tnow, &tout_bmap, &told);
477
    peers = peerset_get_peers(pset);
478
    for (i = 0; i < peerset_size(pset); i++) {
517
    peers = peerset_get_peers(pset_incoming);
518
    // Prune incoming inactive peers
519
    for (i = 0; i < peerset_size(pset_incoming); i++) {
479 520
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
480 521
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
481
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
522
        ftprintf(stderr,"Topo: dropping incoming inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset_incoming));
482 523
        //if (peerset_size(pset) > 1) {	// avoid dropping our last link to the world
483 524
        topoAddToBL(peers[i].id);
484
        remove_peer(peers[i--].id, true, true);
525
        remove_peer(peers[i--].id, topo_out, topo_in);
485 526
        //}
486 527
      }
487 528
    }
488 529
  }
489 530

  
490 531
  //handle explicit add/remove messages
491
  if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
492
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
532
//   if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
533
    if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
534
    fprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
493 535
    if (len != 2) {
494 536
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
495 537
      return;
496 538
    }
497 539
    switch (buff[1]) {
498 540
      case STREAMER_TOPOLOGY_MSG_ADD:
499
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
500
	if (!peerset_get_peer(pset, from)) {
541
	// I have to add this peer to my outgoing neighbourood
542
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
543
	if (!peerset_get_peer(pset_outgoing, from)) {
501 544
		counter2++;
502 545
        	add_peer(from, NULL, true, false);
503 546
		}
504
	if (black_listed(node_addr(from)) && (peerset_get_peer(pset, from)))
505
		remove_peer(from, topo_in, topo_out);
506
	else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset, from) == NULL))
507
		send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
547
// 	if (black_listed(node_addr(from)) && (peerset_get_peer(pset_outgoing, from)))
548
// 		remove_peer(from, topo_in, topo_out);
549
// 	else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset_outgoing, from) == NULL))
550
// 		send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
508 551
        break;
509 552
      case STREAMER_TOPOLOGY_MSG_REMOVE:
510
	if (peerset_get_peer(pset, from) != NULL) {
511
		ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
553
	if (peerset_get_peer(pset_outgoing, from) != NULL) {
554
		ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset_outgoing));
512 555
		remove_peer(from, true, false);
513 556
		}
514 557
        break;
515 558
      default:
516 559
        fprintf(stderr, "Bad streamer topo message received!\n");
517 560
    }
518
    reg_neigh_size(peerset_size(pset));
561
    reg_neigh_size(peerset_size(pset_outgoing));
519 562
    return;
520 563
  }
521 564

  
......
523 566

  
524 567
  // Exit if a gossiping message has been received
525 568
  if (res_topo_parse_data > 0) {
526
    reg_neigh_size(peerset_size(pset));
569
    reg_neigh_size(peerset_size(pset_outgoing));
527 570
    return;
528 571
  }
529 572

  
......
536 579

  
537 580
  // Check if it's time to update the neighbourhood
538 581
  if (time_difference > UPDATE_PEERS_TIMEOUT) {
539
	  
540
    peers = peerset_get_peers(pset);
541
    n_ids = peerset_size(pset);
542 582
    
583
    // If I have to control my sons (--topo_in)
584
    if (topo_in && !topo_out) {
585
	peers = peerset_get_peers(pset_outgoing);
586
	n_ids = peerset_size(pset_outgoing);
587
    } else if (!topo_in && topo_out) { // If I have to control my fathers (--topo_out)
588
	peers = peerset_get_peers(pset_incoming);
589
	n_ids = peerset_size(pset_incoming);
590
    } else if (topo_in && topo_out) { // If topo_bidir, no distinction between sons and fathers, then incoming or outgoing doesn't matter
591
	peers = peerset_get_peers(pset_incoming);
592
	n_ids = peerset_size(pset_incoming);
593
    }
594

  
543 595
    newids = topGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
544 596
    metas = topGetMetadata(&metasize);
545 597
    int index = 0;
......
556 608
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
557 609

  
558 610
    if (topo_out) {
559
      for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
611
      for (i = 0, oldids_size = 0; i < peerset_size(pset_incoming); i++) {
560 612
        oldids[oldids_size++] = peers[i].id;
561 613
        fprintf(stderr," %s - RTT: %f, loss_rate %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_transmitter_lossrate(peers[i].id));
562 614
      }
......
575 627
    } else {
576 628
      nidset_shuffle(oldids, oldids_size);
577 629
    }
578
    metas = topGetMetadata(&metasize);
579 630
    for (i = 0; i < oldids_size; i++) {
580
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f, BW %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]), metas[i].capacity);
631
        dprintf("QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f, BW %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]), get_capacity_of(oldids[i]));
581 632
      }
582 633
    dprintf("QSORT KEEP BEST END\n");
583 634
    
......
596 647
    // Filter out blacklisted ones    
597 648
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
598 649
    
599
    if ((topo_keep_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
650
    if ((topo_add_best) && (strcmp(topo_add_best_policy, "RND") != 0)) {
600 651
      qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
601 652
    } else {
602 653
      nidset_shuffle(desireds, desireds_size);
603 654
    }
604 655
    for (i = 0; i < desireds_size; i++) {
605
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW*RTT %f\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), metas[i].capacity, (double) (get_capacity_of(desireds[i])*get_rtt_of(desireds[i])) );
656
        dprintf("QSORT ADD BEST %s - RTT: %f, BW %f, BW*RTT %f\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), (get_metadata_for_peer(desireds[i])).capacity, (double) ((get_metadata_for_peer(desireds[i])).capacity*get_rtt_of(desireds[i])) );
606 657
      }
607 658
    dprintf("QSORT ADD BEST END\n");
608 659
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
......
638 689
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
639 690
    for (i = 0; i < toremoves_size; i++) {
640 691
       // Check that it has not been removed already
641
       if (peerset_get_peer(pset, toremoves[i]) != NULL ) {
692
       bool removed_already = false;
693
       if (topo_in && !topo_out) {
694
	       removed_already = (peerset_get_peer(pset_outgoing, toremoves[i])!=NULL) ? false : true;
695
       } else if (!topo_in && topo_out) {
696
	       removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
697
       } else if (topo_in && topo_out) {
698
	       removed_already = (peerset_get_peer(pset_incoming, toremoves[i])!=NULL) ? false : true;
699
       }
700
       if (!removed_already) {
642 701
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
643 702
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
644 703
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
645 704
          add_to_blacklist(node_addr(toremoves[i]));
646 705
          }
647 706
       fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
648
       if (peerset_get_peer(pset, toremoves[i]) != NULL) {
707
       if (!removed_already) {
649 708
          remove_peer(toremoves[i], topo_out, topo_in);
650 709
          }
651 710
       }
......
663 722
    }
664 723
    gettimeofday(&last_time_updated_peers, NULL); 
665 724
  } 
666
  reg_neigh_size(peerset_size(pset));
725
  reg_neigh_size(peerset_size(pset_outgoing));
667 726
}
668 727

  
669 728
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
670 729
{
671
  struct peer *p = peerset_get_peer(pset, id);
730
  struct peer *p = peerset_get_peer(pset_incoming, id);
672 731
  if (!p) {
673 732
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
674 733
    if (reg) {
675
      add_peer(id,NULL, topo_out, false);
676
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
677
      p = peerset_get_peer(pset,id);
734
      add_peer(id,NULL, false, true);
735
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset_incoming));
736
      p = peerset_get_peer(pset_incoming,id);
678 737
    }
679 738
  }
680 739

  
......
684 743
int peers_init(void)
685 744
{
686 745
  fprintf(stderr,"peers_init\n");
687
  pset = peerset_init(0);
688
  return pset ? 1 : 0;
746
  pset_incoming = peerset_init(0);
747
  pset_outgoing = peerset_init(0);
748
  return pset_incoming && pset_outgoing ? 1 : 0;
689 749
}
690 750

  
691 751
struct peerset *get_peers(void)
692 752
{
693
  return pset;
753
  return pset_outgoing;
694 754
}

Also available in: Unified diff