Revision dc99f5c4 src/streaming.c
src/streaming.c | ||
---|---|---|
57 | 57 |
#include "peer_metadata.h" |
58 | 58 |
|
59 | 59 |
#include "scheduler_la.h" |
60 |
#include "grapes_config.h" |
|
60 | 61 |
|
61 | 62 |
# define CB_SIZE_TIME_UNLIMITED 1e12 |
62 | 63 |
|
... | ... | |
68 | 69 |
uint16_t hopcount; |
69 | 70 |
} __attribute__((packed)); |
70 | 71 |
|
72 |
enum distribution_type {DIST_UNIFORM, DIST_TURBO}; |
|
73 |
|
|
71 | 74 |
struct streaming_context { |
72 | 75 |
struct chunk_buffer *cb; |
73 | 76 |
struct input_desc *input; |
... | ... | |
80 | 83 |
bool send_bmap_before_push; |
81 | 84 |
uint64_t CB_SIZE_TIME; |
82 | 85 |
uint32_t chunk_loss_interval; |
86 |
enum distribution_type dist_type; |
|
87 |
int offer_per_period; |
|
83 | 88 |
}; |
84 | 89 |
|
85 | 90 |
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config) |
86 | 91 |
{ |
87 | 92 |
struct streaming_context * stc; |
93 |
struct tag * tags; |
|
88 | 94 |
static char conf[80]; |
89 | 95 |
|
90 | 96 |
stc = malloc(sizeof(struct streaming_context)); |
97 |
stc->ps = ps; |
|
91 | 98 |
stc->bcast_after_receive_every = 0; |
92 | 99 |
stc->neigh_on_chunk_recv = false; |
93 | 100 |
stc->send_bmap_before_push = false; |
94 | 101 |
stc->transactions = NULL; |
95 | 102 |
stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED; //in millisec, defaults to unlimited |
96 | 103 |
stc->chunk_loss_interval = 0; // disable self-lossy feature (for experiments) |
104 |
stc->dist_type = DIST_UNIFORM; |
|
97 | 105 |
|
98 |
stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL; |
|
106 |
tags = grapes_config_parse(config); |
|
107 |
if (strcmp(grapes_config_value_str_default(tags, "dist_type", ""), "turbo") == 0) |
|
108 |
stc->dist_type = DIST_TURBO; |
|
109 |
grapes_config_value_int_default(tags, "offer_per_period", &(stc->offer_per_period), 1); |
|
110 |
free(tags); |
|
99 | 111 |
|
100 |
stc->ps = ps;
|
|
112 |
stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
|
|
101 | 113 |
stc->cb_size = psinstance_chunkbuffer_size(ps); |
102 | 114 |
sprintf(conf, "size=%d", stc->cb_size); |
103 | 115 |
stc->cb = cb_init(conf); |
116 |
stc->ch_locks = chunk_locks_create(); |
|
117 |
|
|
104 | 118 |
chunkDeliveryInit(psinstance_nodeid(ps)); |
105 | 119 |
chunkSignalingInit(psinstance_nodeid(ps)); |
106 |
stc->ch_locks = chunk_locks_create(); |
|
107 | 120 |
return stc; |
108 | 121 |
} |
109 | 122 |
|
... | ... | |
552 | 565 |
return 1; |
553 | 566 |
} |
554 | 567 |
|
568 |
double peerWeightNeigh(struct peer **n){ |
|
569 |
double res; |
|
570 |
res = (double)(((struct metadata *)(*n)->metadata)->neigh_size); |
|
571 |
if (res) |
|
572 |
return res; |
|
573 |
return 1; |
|
574 |
} |
|
575 |
|
|
576 |
double peerWeightInvNeigh(struct peer **n){ |
|
577 |
double res; |
|
578 |
res = peerWeightNeigh(n); |
|
579 |
return 1/res; |
|
580 |
} |
|
581 |
|
|
555 | 582 |
double peerWeightLoss(struct peer **n){ |
556 | 583 |
return 1; |
557 | 584 |
} |
... | ... | |
723 | 750 |
|
724 | 751 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
725 | 752 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i]; |
726 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER); |
|
753 |
if (stc->dist_type == DIST_TURBO) |
|
754 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightInvNeigh); |
|
755 |
else |
|
756 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightUniform); |
|
727 | 757 |
|
728 | 758 |
for (i=0; i<selectedpeers_len ; i++){ |
729 | 759 |
int transid = transaction_create(stc->transactions, selectedpeers[i]->id); |
... | ... | |
865 | 895 |
|
866 | 896 |
//SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK); |
867 | 897 |
dst_peers = (struct peer **) malloc(sizeof(struct peer* ) * multiplicity); |
868 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation); |
|
898 |
if (stc->dist_type == DIST_TURBO) |
|
899 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peerWeightNeigh); |
|
900 |
else |
|
901 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation); |
|
869 | 902 |
|
870 | 903 |
selectedpairs = (struct PeerChunk *) malloc(sizeof(struct PeerChunk) * selectedpairs_len); |
871 | 904 |
for ( i=0; i<selectedpairs_len; i++) |
... | ... | |
943 | 976 |
} |
944 | 977 |
} |
945 | 978 |
} |
979 |
|
|
980 |
suseconds_t streaming_offer_interval(const struct streaming_context *stc) |
|
981 |
{ |
|
982 |
struct peerset *pset; |
|
983 |
const struct peer * p; |
|
984 |
int i; |
|
985 |
double load = 0; |
|
986 |
suseconds_t offer_int; |
|
987 |
|
|
988 |
offer_int = chunk_interval_measure(psinstance_measures(stc->ps)); |
|
989 |
|
|
990 |
if (stc->dist_type == DIST_TURBO) { |
|
991 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
|
992 |
peerset_for_each(pset,p,i) |
|
993 |
load += peerWeightInvNeigh((struct peer **)&p); |
|
994 |
offer_int /= load; |
|
995 |
} |
|
996 |
|
|
997 |
return offer_int / stc->offer_per_period; |
|
998 |
} |
Also available in: Unified diff