Revision dc99f5c4

View differences:

src/measures.c
27 27

  
28 28
struct chunk_interval_estimate {
29 29
	uint32_t first_index;
30
	uint32_t last_index;
30 31
	uint64_t first_timestamp;
31 32
	suseconds_t chunk_interval;
32 33
	enum data_state state;
......
90 91
		switch (m->cie.state) {
91 92
			case deinit:
92 93
				m->cie.first_index = cid;
94
				m->cie.last_index = cid;
93 95
				m->cie.first_timestamp = ctimestamp;
94 96
				m->cie.state = loading;
95 97
				break;
96 98
			case loading:
97 99
			case ready:
98
				if (cid > (int64_t)m->cie.first_index)
100
				if (cid > (int64_t)m->cie.last_index)
101
				{
99 102
					m->cie.chunk_interval = ((ctimestamp - m->cie.first_timestamp))/(cid - m->cie.first_index);
103
					m->cie.last_index = cid;
104
				}
100 105
				m->cie.state = ready;
101 106
				break;
102 107
		}
src/peer_metadata.c
1
/*
2
 * Copyright (c) 2017 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20

  
1 21
#include<peer_metadata.h>
2 22
#include<malloc.h>
3 23
#include<string.h>
src/peer_metadata.h
1
/*
2
 * Copyright (c) 2017 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20

  
1 21
#ifndef __PEER_METADATA_H__
2 22
#define __PEER_METADATA_H__
3 23

  
src/psinstance.c
261 261
				else
262 262
					received_chunk(ps->streaming, remote, buff, len);
263 263
				res = 2;
264
				ps->chunk_offer_interval = chunk_interval_measure(ps->measure);
265 264
				break;
266 265
			case MSG_TYPE_SIGNALLING:
267 266
				dtprintf("Sign message received:\n");
......
272 271
				fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
273 272
				res = -2;
274 273
		}
274
		ps->chunk_offer_interval = streaming_offer_interval(ps->streaming);
275 275

  
276 276
	if (remote)
277 277
		nodeid_free(remote);
src/streaming.c
57 57
#include "peer_metadata.h"
58 58

  
59 59
#include "scheduler_la.h"
60
#include "grapes_config.h"
60 61

  
61 62
# define CB_SIZE_TIME_UNLIMITED 1e12
62 63

  
......
68 69
  uint16_t hopcount;
69 70
} __attribute__((packed));
70 71

  
72
enum distribution_type {DIST_UNIFORM, DIST_TURBO};
73

  
71 74
struct streaming_context {
72 75
	struct chunk_buffer *cb;
73 76
	struct input_desc *input;
......
80 83
	bool send_bmap_before_push;
81 84
	uint64_t CB_SIZE_TIME;
82 85
	uint32_t chunk_loss_interval;
86
	enum distribution_type dist_type;
87
	int offer_per_period;
83 88
};
84 89

  
85 90
struct streaming_context * streaming_create(const struct psinstance * ps, struct input_context * inc, const char * config)
86 91
{
87 92
	struct streaming_context * stc;
93
	struct tag * tags;
88 94
	static char conf[80];
89 95

  
90 96
	stc = malloc(sizeof(struct streaming_context));
97
	stc->ps = ps;
91 98
	stc->bcast_after_receive_every = 0;
92 99
	stc->neigh_on_chunk_recv = false;
93 100
	stc->send_bmap_before_push = false;
94 101
	stc->transactions = NULL;
95 102
	stc->CB_SIZE_TIME = CB_SIZE_TIME_UNLIMITED;	//in millisec, defaults to unlimited
96 103
	stc->chunk_loss_interval = 0;  // disable self-lossy feature (for experiments)
104
	stc->dist_type = DIST_UNIFORM;
97 105

  
98
	stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
106
	tags = grapes_config_parse(config);
107
	if (strcmp(grapes_config_value_str_default(tags, "dist_type", ""), "turbo") == 0)
108
		stc->dist_type = DIST_TURBO;
109
	grapes_config_value_int_default(tags, "offer_per_period", &(stc->offer_per_period), 1);
110
	free(tags);
99 111

  
100
	stc->ps = ps;
112
	stc->input = inc ? input_open(inc->filename, inc->fds, inc->fds_size, config) : NULL;
101 113
	stc->cb_size = psinstance_chunkbuffer_size(ps);
102 114
	sprintf(conf, "size=%d", stc->cb_size);
103 115
	stc->cb = cb_init(conf);
116
	stc->ch_locks = chunk_locks_create();
117

  
104 118
	chunkDeliveryInit(psinstance_nodeid(ps));
105 119
	chunkSignalingInit(psinstance_nodeid(ps));
106
	stc->ch_locks = chunk_locks_create();
107 120
	return stc;
108 121
}
109 122

  
......
552 565
  return 1;
553 566
}
554 567

  
568
double peerWeightNeigh(struct peer **n){
569
	double res;
570
	res = (double)(((struct metadata *)(*n)->metadata)->neigh_size);
571
	if (res)
572
		return res;
573
	return 1;
574
}
575

  
576
double peerWeightInvNeigh(struct peer **n){
577
	double res;
578
	res = peerWeightNeigh(n);
579
	return 1/res;
580
}
581

  
555 582
double peerWeightLoss(struct peer **n){
556 583
  return 1;
557 584
}
......
723 750

  
724 751
    for (i = 0;i < size; i++) chunkids[size - 1 - i] = (buff+i)->id;
725 752
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i];
726
    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, SCHED_PEER);
753
    if (stc->dist_type == DIST_TURBO)
754
	    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightInvNeigh);
755
    else
756
	    selectPeersForChunks(SCHED_WEIGHTING, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, SCHED_NEEDS, peerWeightUniform);
727 757

  
728 758
    for (i=0; i<selectedpeers_len ; i++){
729 759
      int transid = transaction_create(stc->transactions, selectedpeers[i]->id);
......
865 895

  
866 896
  //SCHED_TYPE(SCHED_WEIGHTING, peers, peers_num, &(target_chunk->id), 1, selectedpairs, &selectedpairs_len, SCHED_NEEDS, peer_evaluation, SCHED_CHUNK);
867 897
 	dst_peers = (struct peer **) malloc(sizeof(struct peer* ) *  multiplicity);
868
 	selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
898
	if (stc->dist_type == DIST_TURBO)
899
		selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peerWeightNeigh);
900
	else
901
		selectPeersForChunks(SCHED_WEIGHTING, peers, peers_num, (int *)&(target_chunk->id) , 1, dst_peers, &selectedpairs_len, NULL, peer_evaluation);
869 902

  
870 903
	selectedpairs = (struct PeerChunk *)  malloc(sizeof(struct PeerChunk) * selectedpairs_len);
871 904
	for ( i=0; i<selectedpairs_len; i++)
......
943 976
    }
944 977
  }
945 978
}
979

  
980
suseconds_t streaming_offer_interval(const struct streaming_context *stc)
981
{
982
	struct peerset *pset;
983
	const struct peer * p;
984
	int i;
985
	double load = 0;
986
	suseconds_t offer_int;
987

  
988
	offer_int = chunk_interval_measure(psinstance_measures(stc->ps));
989
	
990
	if (stc->dist_type == DIST_TURBO) {
991
		pset = topology_get_neighbours(psinstance_topology(stc->ps));
992
		peerset_for_each(pset,p,i)
993
			load += peerWeightInvNeigh((struct peer **)&p);
994
		offer_int /= load;
995
	}
996

  
997
	return offer_int / stc->offer_per_period;
998
}
src/streaming.h
60 60

  
61 61
void streaming_destroy(struct streaming_context ** stc);
62 62

  
63
suseconds_t streaming_offer_interval(const struct streaming_context *stc);
64

  
63 65
#endif	/* STREAMING_H */
src/topology.c
78 78
	if(name) fprintf(stderr,"%s\n",name);
79 79
	if(pset)
80 80
		peerset_for_each(pset,p,i)
81
			fprintf(stderr, "\t%s\n", nodeid_static_str(p->id));
81
			fprintf(stderr, "\t%s, cbsize: %d, neighsize: %d\n", nodeid_static_str(p->id), peer_cb_size(p), peer_neigh_size(p));
82 82
}
83 83

  
84 84
void update_metadata(struct topology * t)
......
86 86
	metadata_update(&(t->my_metadata), 
87 87
			psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps),
88 88
			peerset_size(t->neighbourhood));
89
	psample_change_metadata(t->tc, &(t->my_metadata), sizeof(struct metadata));
89 90
}
90 91

  
91 92
struct peer * topology_get_peer(struct topology * t, const struct nodeID * id)
......
118 119
	t->topo_in = true; //peer selects in-neighbours (combined means bidirectional)
119 120
	grapes_config_value_int_default(tags, "neighbourhood_size", &(t->neighbourhood_target_size), DEFAULT_PEER_NEIGH_SIZE);
120 121

  
121
	update_metadata(t);
122 122
	t->tc = psample_init(psinstance_nodeid(ps), &(t->my_metadata), sizeof(struct metadata), config);
123
	update_metadata(t);
123 124
	
124 125
	free(tags);
125 126
	return t->tc && t->neighbourhood && t->swarm_bucket ? 1 : 0;
......
240 241
			peerset_add_peer(t->swarm_bucket,sample_nodes[i]);
241 242
			p = topology_get_peer(t, sample_nodes[i]);
242 243
		}
243
		else
244
			//fprintf(stderr,"[DEBUG] OLD PEER!\n");
245 244
		peer_set_metadata(p,&(sample_metas[i]));	
245
		fprintf(stderr,"[DEBUG] sampled node: %s, cbsize: %d, neighsize: %d\n",nodeid_static_str(sample_nodes[i]), sample_metas[i].cb_size, sample_metas[i].neigh_size);
246 246
	}
247 247
}
248 248

  
......
262 262
    if ( (!timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->creation_timestamp, &told, <) ) ||
263 263
         ( timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->bmap_timestamp, &told, <)     )   ) {
264 264
      dprintf("Topo: dropping inactive %s (peersset_size: %d)\n", nodeid_static_str(peers[i]->id), peerset_size(t->neighbourhood));
265
      //if (peerset_size(t->neighbourhood) > 1) {	// avoid dropping our last link to the world
266
      topology_blacklist_add(t, peers[i]->id);
267
      neighbourhood_remove_peer(t, peers[i]->id);
268
      //}
265
      if (peerset_size(t->neighbourhood) > 1) {	// avoid dropping our last link to the world
266
	      topology_blacklist_add(t, peers[i]->id);
267
	      neighbourhood_remove_peer(t, peers[i]->id);
268
      }
269 269
    }
270 270
  }
271 271
	
......
424 424
void topology_update(struct topology * t)
425 425
{
426 426
	struct peerset * old_neighs;
427
  const struct peer * p;
428
  int i;
427
	const struct peer * p;
428
	int i;
429 429

  
430
  psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages
430
	psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages
431 431

  
432 432
	update_metadata(t);
433 433
	topology_sample_peers(t);
434 434

  
435
  if timerisset(&(t->tout_bmap) )
435
	if timerisset(&(t->tout_bmap) )
436 436
		neighbourhood_drop_unactives(t, &(t->tout_bmap));
437 437

  
438 438
	old_neighs = peerset_create_reference_copy(t->neighbourhood);
439 439

  
440
    topology_update_random(t);
440
	topology_update_random(t);
441 441

  
442
  topology_signal_change(t, old_neighs);
442
	topology_signal_change(t, old_neighs);
443 443
	peerset_destroy_reference_copy(&old_neighs);
444 444

  
445
    peerset_for_each(t->swarm_bucket,p,i)
446
      peerset_pop_peer(t->locked_neighs,p->id);
447
    peerset_clear(t->swarm_bucket,0);  // we don't remember past peers
445
	peerset_for_each(t->swarm_bucket,p,i)
446
		peerset_pop_peer(t->locked_neighs,p->id);
447
	peerset_clear(t->swarm_bucket,0);  // we don't remember past peers
448 448
}
449 449

  
450 450
void topology_destroy(struct topology **t)

Also available in: Unified diff