Revision 56d5986f

View differences:

Makefile
6 6
pstreamer: pstreamer.c $(LIBPS) $(LIBGRAPES)
7 7
	cc pstreamer.c -o pstreamer -I $(GRAPES)/include -I include/ -l pstreamer -L src -l grapes -L $(GRAPES)/src
8 8

  
9
tests: $(LIBPS)
10
	GRAPES=$(GRAPES) $(MAKE) -C test/
11
	test/run_tests.sh
12

  
9 13
$(LIBPS): $(LIBPS_SRC)
10
	$(MAKE) -C src/
14
	GRAPES=$(GRAPES) $(MAKE) -C src/
11 15

  
12 16
$(LIBGRAPES):
13 17
	$(MAKE) -C $(GRAPES)
src/chunk_signaling.c
62 62
  // rc_reg_ack(trans_id);
63 63
}
64 64

  
65
void bmap_received(const struct psinstance * ps, struct nodeID *fromid, struct nodeID *ownerid, struct chunkID_set *c_set, int cb_size, uint16_t trans_id) {
65
void bmap_received(const struct psinstance * ps, struct nodeID *fromid, struct nodeID *ownerid, struct chunkID_set *c_set, int maxdeliver, uint16_t trans_id) {
66 66
  struct peer *owner;
67 67
  if (nodeid_equal(fromid, ownerid)) {
68 68
    owner = nodeid_to_peer(psinstance_topology(ps), ownerid, neigh_on_sign_recv);
......
75 75
  if (owner) {	//now we have it almost sure
76 76
    chunkID_set_clear(owner->bmap,0);	//TODO: some better solution might be needed to keep info about chunks we sent in flight.
77 77
    chunkID_set_union(owner->bmap,c_set);
78
    owner->cb_size = cb_size;
79 78
    gettimeofday(&owner->bmap_timestamp, NULL);
80 79
  }
81 80
}
......
149 148
    }
150 149
    switch (sig_type) {
151 150
        case sig_send_buffermap:
152
          bmap_received(ps, fromid, ownerid, c_set, max_deliver, trans_id); //FIXME: cb_size has gone from signaling
151
          bmap_received(ps, fromid, ownerid, c_set, max_deliver, trans_id); 
153 152
          break;
154 153
        case sig_offer:
155 154
          offer_received(ps, fromid, c_set, max_deliver, trans_id);
src/peer_metadata.c
1
#include<peer_metadata.h>
2
#include<malloc.h>
3
#include<string.h>
4

  
5
int8_t metadata_update(struct metadata *m, uint16_t cb_size, uint8_t neigh_size)
6
{
7
	if (m)
8
	{
9
		m->cb_size = cb_size;
10
		m->neigh_size = neigh_size;
11
		return 0;
12
	}
13
	return -1;
14
}
15

  
16
int8_t peer_set_metadata(struct  peer *p, const struct metadata *m)
17
{
18
	if (p && m)
19
	{
20
		if (!(p->metadata))
21
			p->metadata = malloc(sizeof(struct metadata));
22
		memmove(p->metadata, m, sizeof(struct metadata));
23
		return 0;
24
	}
25
	return -1;
26
}
27

  
28
uint16_t peer_cb_size(const struct peer *p)
29
{
30
	if (p && p->metadata)
31
		return ((struct metadata *)p->metadata)->cb_size;
32
	return DEFAULT_PEER_CBSIZE;
33
}
src/peer_metadata.h
1
#ifndef __PEER_METADATA_H__
2
#define __PEER_METADATA_H__
3

  
4
#include<stdint.h>
5
#include<peer.h>
6

  
7
#define DEFAULT_PEER_CBSIZE 50
8

  
9
struct metadata {
10
  uint16_t cb_size;
11
  uint8_t neigh_size;
12
} __attribute__((packed));
13

  
14

  
15
int8_t metadata_update(struct metadata *m, uint16_t cb_size, uint8_t neigh_size);
16

  
17
int8_t peer_set_metadata(struct  peer *p, const struct metadata *m);
18

  
19
uint16_t peer_cb_size(const struct peer *p);
20

  
21
#endif
src/streaming.c
54 54
#include "net_helpers.h"
55 55
#include "scheduling.h"
56 56
#include "transaction.h"
57
#include "peer_metadata.h"
57 58

  
58 59
#include "scheduler_la.h"
59 60

  
......
478 479
//	fprintf(stderr,"[DEBUG] Evaluating Peer %s, CB_SIZE: %d\n",node_addr_tr(n->id),p->cb_size); // DEBUG
479 480
//	print_chunkID_set(p->bmap);																					// DEBUG
480 481

  
481
  return _needs(stc, p->bmap, p->cb_size, cid);
482
  return _needs(stc, p->bmap, peer_cb_size(p), cid);
482 483
}
483 484

  
484 485
/**
......
525 526
{
526 527
	int min;
527 528

  
528
	if (p->cb_size == 0) // it does not have capacity
529
	if (peer_cb_size(p) == 0) // it does not have capacity
529 530
		return 0;
530 531
	if (chunkID_set_check(p->bmap,cid) < 0)  // not in bmap
531 532
	{
532
		if(p->cb_size > chunkID_set_size(p->bmap)) // it has room for chunks anyway
533
		if(peer_cb_size(p) > chunkID_set_size(p->bmap)) // it has room for chunks anyway
533 534
		{
534
			min = chunkID_set_get_earliest(p->bmap) - p->cb_size + chunkID_set_size(p->bmap);
535
			min = chunkID_set_get_earliest(p->bmap) - peer_cb_size(p) + chunkID_set_size(p->bmap);
535 536
			min = min < 0 ? 0 : min;
536 537
			if (cid >= min)
537 538
				return 1;
......
728 729
      int transid = transaction_create(stc->transactions, selectedpeers[i]->id);
729 730
      int max_deliver = offer_max_deliver(stc, selectedpeers[i]->id);
730 731
      struct chunkID_set *offer_cset = compose_offer_cset(stc, selectedpeers[i]);
731
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), selectedpeers[i]->cb_size);
732
      dprintf("\t sending offer(%d) to %s, cb_size: %d\n", transid, nodeid_static_str(selectedpeers[i]->id), peer_cb_size(selectedpeers[i]));
732 733
      offerChunks(selectedpeers[i]->id, offer_cset, max_deliver, transid++);
733 734
#ifdef LOG_SIGNAL
734 735
			log_signal(psinstance_nodeid(stc->ps),selectedpeers[i]->id,chunkID_set_size(offer_cset),transid,sig_offer,"SENT");
src/topology.c
30 30
#include <peersampler.h>
31 31
#include <peer.h>
32 32
#include <grapes_msg_types.h>
33
#include<grapes_config.h>
33 34
//
34 35
#include "compatibility/timer.h"
35 36
//
......
38 39
#include "dbg.h"
39 40
#include "measures.h"
40 41
#include "streaming.h"
42
#include "peer_metadata.h"
41 43

  
42 44
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
43 45
#define NEIGHBOURHOOD_ADD 0
44 46
#define NEIGHBOURHOOD_REMOVE 1
45
#define DEFAULT_PEER_CBSIZE 50
46 47

  
47 48
#ifndef NAN	//NAN is missing in some old math.h versions
48 49
#define NAN            (0.0/0.0)
49 50
#endif
50 51

  
51
struct metadata {
52
  uint16_t cb_size;
53
} __attribute__((packed));
54

  
55 52
enum peer_choice {PEER_CHOICE_RANDOM, PEER_CHOICE_BEST, PEER_CHOICE_WORST};
56 53

  
57 54
struct topology {
58
	double desired_bw;
59
	double desired_rtt;
60
	double alpha_target;
61 55
	double topo_mem;
62 56
	bool topo_out;
63 57
	bool topo_in;
64
	bool topo_keep_best;
65
	bool topo_add_best;
66 58
	int neighbourhood_target_size;
67 59
	struct timeval tout_bmap;
68 60
	struct metadata my_metadata;	
......
91 83

  
92 84
void update_metadata(struct topology * t)
93 85
{
94
	t->my_metadata.cb_size = psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps);
86
	metadata_update(&(t->my_metadata), 
87
			psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps),
88
			peerset_size(t->neighbourhood));
95 89
}
96 90

  
97 91
struct peer * topology_get_peer(struct topology * t, const struct nodeID * id)
......
105 99

  
106 100
int topology_init(struct topology * t, const struct psinstance * ps, const char *config)
107 101
{
102
	struct tag * tags;
103

  
104
	tags = grapes_config_parse(config);
105

  
108 106
	bind_msg_type(MSG_TYPE_NEIGHBOURHOOD);
109 107
	bind_msg_type(MSG_TYPE_TOPOLOGY);
110 108
	t->tout_bmap.tv_sec = 20;
......
115 113
	t->swarm_bucket = peerset_init(0);
116 114
	t->locked_neighs = peerset_init(0);
117 115

  
118
	t->desired_bw = 0;	//TODO: turn on capacity measurement and set meaningful default value
119
	t->desired_rtt = 0.2;
120
	t->alpha_target = 0.4;
121 116
	t->topo_mem = 0.7;
122 117
	t->topo_out = true; //peer selects out-neighbours
123 118
	t->topo_in = true; //peer selects in-neighbours (combined means bidirectional)
124
	t->topo_keep_best = false;
125
	t->topo_add_best = false;
126
	t->neighbourhood_target_size = 30;
119
	grapes_config_value_int_default(tags, "neighbourhood_size", &(t->neighbourhood_target_size), 30);
127 120

  
128 121
	update_metadata(t);
129 122
	t->tc = psample_init(psinstance_nodeid(ps), &(t->my_metadata), sizeof(struct metadata), config);
130 123
	
131
  //fprintf(stderr,"[DEBUG] done with topology init\n");
124
	free(tags);
132 125
	return t->tc && t->neighbourhood && t->swarm_bucket ? 1 : 0;
133 126
}
134 127

  
......
149 142
	return psample_add_peer(t->tc,id,&m,sizeof(m));
150 143
}
151 144

  
152
void topology_peer_set_metadata(struct  peer *p, const struct metadata *m)
153
{
154
	if (p)
155
	{
156
		if (m)
157
		{
158
			p->cb_size = m->cb_size;
159
		}
160
		else
161
		{
162
			p->cb_size = DEFAULT_PEER_CBSIZE;
163
		}
164

  
165
	}
166
}
167 145

  
168 146
struct peer * neighbourhood_add_peer(struct topology * t, const struct nodeID *id)
169 147
{
......
211 189
			if (len >= (sizeof(struct metadata) + 1))
212 190
			{
213 191
				memmove(&m,buff+1,sizeof(struct metadata));
214
				topology_peer_set_metadata(p,&m);
192
				peer_set_metadata(p,&m);
215 193
			}
216 194
			break;
217 195

  
......
264 242
		}
265 243
		else
266 244
			//fprintf(stderr,"[DEBUG] OLD PEER!\n");
267
		topology_peer_set_metadata(p,&(sample_metas[i]));	
245
		peer_set_metadata(p,&(sample_metas[i]));	
268 246
	}
269 247
}
270 248

  
......
311 289
}
312 290

  
313 291
double get_capacity_of(struct topology *t, const struct nodeID* n){
314
  struct peer *p = topology_get_peer(t, n);
315
  if (p) {
316
    return p->capacity;
317
  }
318

  
319 292
  return NAN;
320 293
}
321 294

  
......
388 361
		memmove(peers,const_peers,peers_num*sizeof(struct peer*));
389 362

  
390 363
	if (criterion != PEER_CHOICE_RANDOM && cmp_peer != NULL) {
391
    //fprintf(stderr,"[DEBUG] choosen the qsort\n");
392 364
		qsort(peers, peers_num, sizeof(struct peer*), cmp_peer);
393 365
	} else {
394 366
    array_shuffle(peers, peers_num, sizeof(struct peer *));
......
490 462
		free(*t);
491 463
	}
492 464
}
465

  
466
uint8_t topology_peer_cbsize(const struct topology *t, const struct peer * p)
467
{
468
}

Also available in: Unified diff