Revision dc99f5c4 src/streaming.c

View differences:

src/streaming.c
57 57
#include "peer_metadata.h"
58 58

  
59 59
#include "scheduler_la.h"
60
#include "grapes_config.h"
60 61

  
61 62
# define CB_SIZE_TIME_UNLIMITED 1e12
62 63

  
......
68 69
  uint16_t hopcount;
69 70
} __attribute__((packed));
70 71

  
72
enum distribution_type {DIST_UNIFORM, DIST_TURBO};
73

  
71 74
struct streaming_context {
72 75
	struct chunk_buffer *cb;
73 76
	struct input_desc *input;
......
80 83
	bool send_bmap_before_push;
81 84
	uint64_t CB_SIZE_TIME;
82 85
	uint32_t chunk_loss_interval;
86
	enum distribution_type dist_type;
87
	int offer_per_period;
83 88
};
84 89

  
85 90
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config)
86 91
{
87 92
	struct streaming_context * stc;
93
	struct tag * tags;
88 94
	static char conf[80];
89 95

  
90 96
	stc = malloc(sizeof(struct streaming_context));
97
	stc->ps = ps;
91 98
	stc->bcast_after_receive_every = 0;
92 99
	stc->neigh_on_chunk_recv = false;
93 100
	stc->send_bmap_before_push = false;
94 101
	stc->transactions = NULL;
95 102
	stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;	//in millisec, defaults to unlimited
96 103
	stc->chunk_loss_interval = 0;  // disable self-lossy feature (for experiments)
104
	stc->dist_type = DIST_UNIFORM;
97 105

  
98
	stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
106
	tags = grapes_config_parse(config);
107
	if (strcmp(grapes_config_value_str_default(tags, "dist_type", ""), "turbo") == 0)
108
		stc->dist_type = DIST_TURBO;
109
	grapes_config_value_int_default(tags, "offer_per_period", &(stc->offer_per_period), 1);
110
	free(tags);
99 111

  
100
	stc->ps = ps;
112
	stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
101 113
	stc->cb_size = psinstance_chunkbuffer_size(ps);
102 114
	sprintf(conf, "size=%d", stc->cb_size);
103 115
	stc->cb = cb_init(conf);
116
	stc->ch_locks = chunk_locks_create();
117

  
104 118
	chunkDeliveryInit(psinstance_nodeid(ps));
105 119
	chunkSignalingInit(psinstance_nodeid(ps));
106
	stc->ch_locks = chunk_locks_create();
107 120
	return stc;
108 121
}
109 122

  
......
552 565
  return 1;
553 566
}
554 567

  
568
double peerWeightNeigh(struct peer **n){
569
	double res;
570
	res = (double)(((struct metadata *)(*n)->metadata)->neigh_size);
571
	if (res)
572
		return res;
573
	return 1;
574
}
575

  
576
double peerWeightInvNeigh(struct peer **n){
577
	double res;
578
	res = peerWeightNeigh(n);
579
	return 1/res;
580
}
581

  
555 582
double peerWeightLoss(struct peer **n){
556 583
  return 1;
557 584
}
......
723 750

  
724 751
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
725 752
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
726
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
753
    if (stc->dist_type == DIST_TURBO)
754
	    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightInvNeigh);
755
    else
756
	    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightUniform);
727 757

  
728 758
    for (i=0; i<selectedpeers_len ; i++){
729 759
      int transid = transaction_create(stc->transactions, selectedpeers[i]->id);
......
865 895

  
866 896
  //SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
867 897
 	dst_peers = (struct peer **) malloc(sizeof(struct peer* ) *  multiplicity);
868
 	selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
898
	if (stc->dist_type == DIST_TURBO)
899
		selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peerWeightNeigh);
900
	else
901
		selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
869 902

  
870 903
	selectedpairs = (struct PeerChunk *)  malloc(sizeof(struct PeerChunk) * selectedpairs_len);
871 904
	for ( i=0; i<selectedpairs_len; i++)
......
943 976
    }
944 977
  }
945 978
}
979

  
980
suseconds_t streaming_offer_interval(const struct streaming_context *stc)
981
{
982
	struct peerset *pset;
983
	const struct peer * p;
984
	int i;
985
	double load = 0;
986
	suseconds_t offer_int;
987

  
988
	offer_int = chunk_interval_measure(psinstance_measures(stc->ps));
989
	
990
	if (stc->dist_type == DIST_TURBO) {
991
		pset = topology_get_neighbours(psinstance_topology(stc->ps));
992
		peerset_for_each(pset,p,i)
993
			load += peerWeightInvNeigh((struct peer **)&p);
994
		offer_int /= load;
995
	}
996

  
997
	return offer_int / stc->offer_per_period;
998
}

Also available in: Unified diff