Revision dc99f5c4
src/measures.c | ||
---|---|---|
27 | 27 |
|
28 | 28 |
struct chunk_interval_estimate { |
29 | 29 |
uint32_t first_index; |
30 |
uint32_t last_index; |
|
30 | 31 |
uint64_t first_timestamp; |
31 | 32 |
suseconds_t chunk_interval; |
32 | 33 |
enum data_state state; |
... | ... | |
90 | 91 |
switch (m->cie.state) { |
91 | 92 |
case deinit: |
92 | 93 |
m->cie.first_index = cid; |
94 |
m->cie.last_index = cid; |
|
93 | 95 |
m->cie.first_timestamp = ctimestamp; |
94 | 96 |
m->cie.state = loading; |
95 | 97 |
break; |
96 | 98 |
case loading: |
97 | 99 |
case ready: |
98 |
if (cid > (int64_t)m->cie.first_index) |
|
100 |
if (cid > (int64_t)m->cie.last_index) |
|
101 |
{ |
|
99 | 102 |
m->cie.chunk_interval = ((ctimestamp - m->cie.first_timestamp))/(cid - m->cie.first_index); |
103 |
m->cie.last_index = cid; |
|
104 |
} |
|
100 | 105 |
m->cie.state = ready; |
101 | 106 |
break; |
102 | 107 |
} |
src/peer_metadata.c | ||
---|---|---|
1 |
/* |
|
2 |
* Copyright (c) 2017 Luca Baldesi |
|
3 |
* |
|
4 |
* This file is part of PeerStreamer. |
|
5 |
* |
|
6 |
* PeerStreamer is free software: you can redistribute it and/or |
|
7 |
* modify it under the terms of the GNU Affero General Public License as |
|
8 |
* published by the Free Software Foundation, either version 3 of the |
|
9 |
* License, or (at your option) any later version. |
|
10 |
* |
|
11 |
* PeerStreamer is distributed in the hope that it will be useful, |
|
12 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero |
|
14 |
* General Public License for more details. |
|
15 |
* |
|
16 |
* You should have received a copy of the GNU Affero General Public License |
|
17 |
* along with PeerStreamer. If not, see <http://www.gnu.org/licenses/>. |
|
18 |
* |
|
19 |
*/ |
|
20 |
|
|
1 | 21 |
#include<peer_metadata.h> |
2 | 22 |
#include<malloc.h> |
3 | 23 |
#include<string.h> |
src/peer_metadata.h | ||
---|---|---|
1 |
/* |
|
2 |
* Copyright (c) 2017 Luca Baldesi |
|
3 |
* |
|
4 |
* This file is part of PeerStreamer. |
|
5 |
* |
|
6 |
* PeerStreamer is free software: you can redistribute it and/or |
|
7 |
* modify it under the terms of the GNU Affero General Public License as |
|
8 |
* published by the Free Software Foundation, either version 3 of the |
|
9 |
* License, or (at your option) any later version. |
|
10 |
* |
|
11 |
* PeerStreamer is distributed in the hope that it will be useful, |
|
12 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero |
|
14 |
* General Public License for more details. |
|
15 |
* |
|
16 |
* You should have received a copy of the GNU Affero General Public License |
|
17 |
* along with PeerStreamer. If not, see <http://www.gnu.org/licenses/>. |
|
18 |
* |
|
19 |
*/ |
|
20 |
|
|
1 | 21 |
#ifndef __PEER_METADATA_H__ |
2 | 22 |
#define __PEER_METADATA_H__ |
3 | 23 |
|
src/psinstance.c | ||
---|---|---|
261 | 261 |
else |
262 | 262 |
received_chunk(ps->streaming, remote, buff, len); |
263 | 263 |
res = 2; |
264 |
ps->chunk_offer_interval = chunk_interval_measure(ps->measure); |
|
265 | 264 |
break; |
266 | 265 |
case MSG_TYPE_SIGNALLING: |
267 | 266 |
dtprintf("Sign message received:\n"); |
... | ... | |
272 | 271 |
fprintf(stderr, "Unknown Message Type %x\n", buff[0]); |
273 | 272 |
res = -2; |
274 | 273 |
} |
274 |
ps->chunk_offer_interval = streaming_offer_interval(ps->streaming); |
|
275 | 275 |
|
276 | 276 |
if (remote) |
277 | 277 |
nodeid_free(remote); |
src/streaming.c | ||
---|---|---|
57 | 57 |
#include "peer_metadata.h" |
58 | 58 |
|
59 | 59 |
#include "scheduler_la.h" |
60 |
#include "grapes_config.h" |
|
60 | 61 |
|
61 | 62 |
# define CB_SIZE_TIME_UNLIMITED 1e12 |
62 | 63 |
|
... | ... | |
68 | 69 |
uint16_t hopcount; |
69 | 70 |
} __attribute__((packed)); |
70 | 71 |
|
72 |
enum distribution_type {DIST_UNIFORM, DIST_TURBO}; |
|
73 |
|
|
71 | 74 |
struct streaming_context { |
72 | 75 |
struct chunk_buffer *cb; |
73 | 76 |
struct input_desc *input; |
... | ... | |
80 | 83 |
bool send_bmap_before_push; |
81 | 84 |
uint64_t CB_SIZE_TIME; |
82 | 85 |
uint32_t chunk_loss_interval; |
86 |
enum distribution_type dist_type; |
|
87 |
int offer_per_period; |
|
83 | 88 |
}; |
84 | 89 |
|
85 | 90 |
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config) |
86 | 91 |
{ |
87 | 92 |
struct streaming_context * stc; |
93 |
struct tag * tags; |
|
88 | 94 |
static char conf[80]; |
89 | 95 |
|
90 | 96 |
stc = malloc(sizeof(struct streaming_context)); |
97 |
stc->ps = ps; |
|
91 | 98 |
stc->bcast_after_receive_every = 0; |
92 | 99 |
stc->neigh_on_chunk_recv = false; |
93 | 100 |
stc->send_bmap_before_push = false; |
94 | 101 |
stc->transactions = NULL; |
95 | 102 |
stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED; //in millisec, defaults to unlimited |
96 | 103 |
stc->chunk_loss_interval = 0; // disable self-lossy feature (for experiments) |
104 |
stc->dist_type = DIST_UNIFORM; |
|
97 | 105 |
|
98 |
stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL; |
|
106 |
tags = grapes_config_parse(config); |
|
107 |
if (strcmp(grapes_config_value_str_default(tags, "dist_type", ""), "turbo") == 0) |
|
108 |
stc->dist_type = DIST_TURBO; |
|
109 |
grapes_config_value_int_default(tags, "offer_per_period", &(stc->offer_per_period), 1); |
|
110 |
free(tags); |
|
99 | 111 |
|
100 |
stc->ps = ps;
|
|
112 |
stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
|
|
101 | 113 |
stc->cb_size = psinstance_chunkbuffer_size(ps); |
102 | 114 |
sprintf(conf, "size=%d", stc->cb_size); |
103 | 115 |
stc->cb = cb_init(conf); |
116 |
stc->ch_locks = chunk_locks_create(); |
|
117 |
|
|
104 | 118 |
chunkDeliveryInit(psinstance_nodeid(ps)); |
105 | 119 |
chunkSignalingInit(psinstance_nodeid(ps)); |
106 |
stc->ch_locks = chunk_locks_create(); |
|
107 | 120 |
return stc; |
108 | 121 |
} |
109 | 122 |
|
... | ... | |
552 | 565 |
return 1; |
553 | 566 |
} |
554 | 567 |
|
568 |
double peerWeightNeigh(struct peer **n){ |
|
569 |
double res; |
|
570 |
res = (double)(((struct metadata *)(*n)->metadata)->neigh_size); |
|
571 |
if (res) |
|
572 |
return res; |
|
573 |
return 1; |
|
574 |
} |
|
575 |
|
|
576 |
double peerWeightInvNeigh(struct peer **n){ |
|
577 |
double res; |
|
578 |
res = peerWeightNeigh(n); |
|
579 |
return 1/res; |
|
580 |
} |
|
581 |
|
|
555 | 582 |
double peerWeightLoss(struct peer **n){ |
556 | 583 |
return 1; |
557 | 584 |
} |
... | ... | |
723 | 750 |
|
724 | 751 |
for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id; |
725 | 752 |
for (i = 0; i<n; i++) nodeids[i] = neighbours[i]; |
726 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER); |
|
753 |
if (stc->dist_type == DIST_TURBO) |
|
754 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightInvNeigh); |
|
755 |
else |
|
756 |
selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightUniform); |
|
727 | 757 |
|
728 | 758 |
for (i=0; i<selectedpeers_len ; i++){ |
729 | 759 |
int transid = transaction_create(stc->transactions, selectedpeers[i]->id); |
... | ... | |
865 | 895 |
|
866 | 896 |
//SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK); |
867 | 897 |
dst_peers = (struct peer **) malloc(sizeof(struct peer* ) * multiplicity); |
868 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation); |
|
898 |
if (stc->dist_type == DIST_TURBO) |
|
899 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peerWeightNeigh); |
|
900 |
else |
|
901 |
selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation); |
|
869 | 902 |
|
870 | 903 |
selectedpairs = (struct PeerChunk *) malloc(sizeof(struct PeerChunk) * selectedpairs_len); |
871 | 904 |
for ( i=0; i<selectedpairs_len; i++) |
... | ... | |
943 | 976 |
} |
944 | 977 |
} |
945 | 978 |
} |
979 |
|
|
980 |
suseconds_t streaming_offer_interval(const struct streaming_context *stc) |
|
981 |
{ |
|
982 |
struct peerset *pset; |
|
983 |
const struct peer * p; |
|
984 |
int i; |
|
985 |
double load = 0; |
|
986 |
suseconds_t offer_int; |
|
987 |
|
|
988 |
offer_int = chunk_interval_measure(psinstance_measures(stc->ps)); |
|
989 |
|
|
990 |
if (stc->dist_type == DIST_TURBO) { |
|
991 |
pset = topology_get_neighbours(psinstance_topology(stc->ps)); |
|
992 |
peerset_for_each(pset,p,i) |
|
993 |
load += peerWeightInvNeigh((struct peer **)&p); |
|
994 |
offer_int /= load; |
|
995 |
} |
|
996 |
|
|
997 |
return offer_int / stc->offer_per_period; |
|
998 |
} |
src/streaming.h | ||
---|---|---|
60 | 60 |
|
61 | 61 |
void streaming_destroy(struct streaming_context ** stc); |
62 | 62 |
|
63 |
suseconds_t streaming_offer_interval(const struct streaming_context *stc); |
|
64 |
|
|
63 | 65 |
#endif /* STREAMING_H */ |
src/topology.c | ||
---|---|---|
78 | 78 |
if(name) fprintf(stderr,"%s\n",name); |
79 | 79 |
if(pset) |
80 | 80 |
peerset_for_each(pset,p,i) |
81 |
fprintf(stderr, "\t%s\n", nodeid_static_str(p->id));
|
|
81 |
fprintf(stderr, "\t%s, cbsize: %d, neighsize: %d\n", nodeid_static_str(p->id), peer_cb_size(p), peer_neigh_size(p));
|
|
82 | 82 |
} |
83 | 83 |
|
84 | 84 |
void update_metadata(struct topology * t) |
... | ... | |
86 | 86 |
metadata_update(&(t->my_metadata), |
87 | 87 |
psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps), |
88 | 88 |
peerset_size(t->neighbourhood)); |
89 |
psample_change_metadata(t->tc, &(t->my_metadata), sizeof(struct metadata)); |
|
89 | 90 |
} |
90 | 91 |
|
91 | 92 |
struct peer * topology_get_peer(struct topology * t, const struct nodeID * id) |
... | ... | |
118 | 119 |
t->topo_in = true; //peer selects in-neighbours (combined means bidirectional) |
119 | 120 |
grapes_config_value_int_default(tags, "neighbourhood_size", &(t->neighbourhood_target_size), DEFAULT_PEER_NEIGH_SIZE); |
120 | 121 |
|
121 |
update_metadata(t); |
|
122 | 122 |
t->tc = psample_init(psinstance_nodeid(ps), &(t->my_metadata), sizeof(struct metadata), config); |
123 |
update_metadata(t); |
|
123 | 124 |
|
124 | 125 |
free(tags); |
125 | 126 |
return t->tc && t->neighbourhood && t->swarm_bucket ? 1 : 0; |
... | ... | |
240 | 241 |
peerset_add_peer(t->swarm_bucket,sample_nodes[i]); |
241 | 242 |
p = topology_get_peer(t, sample_nodes[i]); |
242 | 243 |
} |
243 |
else |
|
244 |
//fprintf(stderr,"[DEBUG] OLD PEER!\n"); |
|
245 | 244 |
peer_set_metadata(p,&(sample_metas[i])); |
245 |
fprintf(stderr,"[DEBUG] sampled node: %s, cbsize: %d, neighsize: %d\n",nodeid_static_str(sample_nodes[i]), sample_metas[i].cb_size, sample_metas[i].neigh_size); |
|
246 | 246 |
} |
247 | 247 |
} |
248 | 248 |
|
... | ... | |
262 | 262 |
if ( (!timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->creation_timestamp, &told, <) ) || |
263 | 263 |
( timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->bmap_timestamp, &told, <) ) ) { |
264 | 264 |
dprintf("Topo: dropping inactive %s (peersset_size: %d)\n", nodeid_static_str(peers[i]->id), peerset_size(t->neighbourhood)); |
265 |
//if (peerset_size(t->neighbourhood) > 1) { // avoid dropping our last link to the world
|
|
266 |
topology_blacklist_add(t, peers[i]->id); |
|
267 |
neighbourhood_remove_peer(t, peers[i]->id); |
|
268 |
//}
|
|
265 |
if (peerset_size(t->neighbourhood) > 1) { // avoid dropping our last link to the world |
|
266 |
topology_blacklist_add(t, peers[i]->id);
|
|
267 |
neighbourhood_remove_peer(t, peers[i]->id);
|
|
268 |
} |
|
269 | 269 |
} |
270 | 270 |
} |
271 | 271 |
|
... | ... | |
424 | 424 |
void topology_update(struct topology * t) |
425 | 425 |
{ |
426 | 426 |
struct peerset * old_neighs; |
427 |
const struct peer * p;
|
|
428 |
int i;
|
|
427 |
const struct peer * p;
|
|
428 |
int i;
|
|
429 | 429 |
|
430 |
psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages
|
|
430 |
psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages
|
|
431 | 431 |
|
432 | 432 |
update_metadata(t); |
433 | 433 |
topology_sample_peers(t); |
434 | 434 |
|
435 |
if timerisset(&(t->tout_bmap) )
|
|
435 |
if timerisset(&(t->tout_bmap) )
|
|
436 | 436 |
neighbourhood_drop_unactives(t, &(t->tout_bmap)); |
437 | 437 |
|
438 | 438 |
old_neighs = peerset_create_reference_copy(t->neighbourhood); |
439 | 439 |
|
440 |
topology_update_random(t);
|
|
440 |
topology_update_random(t);
|
|
441 | 441 |
|
442 |
topology_signal_change(t, old_neighs);
|
|
442 |
topology_signal_change(t, old_neighs);
|
|
443 | 443 |
peerset_destroy_reference_copy(&old_neighs); |
444 | 444 |
|
445 |
peerset_for_each(t->swarm_bucket,p,i)
|
|
446 |
peerset_pop_peer(t->locked_neighs,p->id);
|
|
447 |
peerset_clear(t->swarm_bucket,0); // we don't remember past peers
|
|
445 |
peerset_for_each(t->swarm_bucket,p,i)
|
|
446 |
peerset_pop_peer(t->locked_neighs,p->id);
|
|
447 |
peerset_clear(t->swarm_bucket,0); // we don't remember past peers
|
|
448 | 448 |
} |
449 | 449 |
|
450 | 450 |
void topology_destroy(struct topology **t) |
Also available in: Unified diff