streamers / streaming.c @ 22ebd96d
History | View | Annotate | Download (5.12 KB)
1 |
#include <stdlib.h> |
---|---|
2 |
#include <stdio.h> |
3 |
#include <stdint.h> |
4 |
|
5 |
#include <net_helper.h> |
6 |
#include <chunk.h> |
7 |
#include <chunkbuffer.h> |
8 |
#include <trade_msg_la.h> |
9 |
#include <trade_msg_ha.h> |
10 |
#include <peerset.h> |
11 |
#include <peer.h> |
12 |
#include <chunkidset.h> |
13 |
|
14 |
#include "streaming.h" |
15 |
#include "output.h" |
16 |
#include "input.h" |
17 |
#include "dbg.h" |
18 |
#include "chunk_signaling.h" |
19 |
|
20 |
#include "scheduler_la.h" |
21 |
|
22 |
static struct chunk_buffer *cb; |
23 |
static struct input_desc *input; |
24 |
static int cb_size; |
25 |
|
26 |
void stream_init(int size, struct nodeID *myID) |
27 |
{ |
28 |
char conf[32]; |
29 |
|
30 |
cb_size = size; |
31 |
|
32 |
sprintf(conf, "size=%d", cb_size);
|
33 |
cb = cb_init(conf); |
34 |
chunkInit(myID); |
35 |
} |
36 |
|
37 |
int source_init(const char *fname, struct nodeID *myID) |
38 |
{ |
39 |
input = input_open(fname); |
40 |
if (input == NULL) { |
41 |
return -1; |
42 |
} |
43 |
|
44 |
stream_init(1, myID);
|
45 |
return 0; |
46 |
} |
47 |
|
48 |
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf) |
49 |
{ |
50 |
struct chunk *chunks;
|
51 |
int num_chunks, i;
|
52 |
struct chunkID_set *my_bmap = chunkID_set_init(0); |
53 |
chunks = cb_get_chunks(chbuf, &num_chunks); |
54 |
|
55 |
for(i=0; i<num_chunks; i++) { |
56 |
chunkID_set_add_chunk(my_bmap, chunks[i].id); |
57 |
} |
58 |
return my_bmap;
|
59 |
} |
60 |
|
61 |
void send_bmap(struct peer *to) |
62 |
{ |
63 |
struct chunkID_set *my_bmap = cb_to_bmap(cb);
|
64 |
sendMyBufferMap(to->id, my_bmap, cb_size, 0);
|
65 |
|
66 |
chunkID_set_clear(my_bmap,0);
|
67 |
free(my_bmap); |
68 |
} |
69 |
|
70 |
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len) |
71 |
{ |
72 |
int res;
|
73 |
static struct chunk c; |
74 |
struct peer *p;
|
75 |
|
76 |
res = decodeChunk(&c, buff + 1, len - 1); |
77 |
if (res > 0) { |
78 |
dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
|
79 |
output_deliver(&c); |
80 |
res = cb_add_chunk(cb, &c); |
81 |
if (res < 0) { |
82 |
dprintf("\tchunk too old, buffer full with newer chunks\n");
|
83 |
free(c.data); |
84 |
free(c.attributes); |
85 |
} |
86 |
p = peerset_get_peer(pset,from); |
87 |
if (!p) {
|
88 |
fprintf(stderr,"warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
|
89 |
peerset_add_peer(pset,from); |
90 |
p = peerset_get_peer(pset,from); |
91 |
} |
92 |
if (p) { //now we have it almost sure |
93 |
chunkID_set_add_chunk(p->bmap,c.id); //don't send it back
|
94 |
send_bmap(p); //send explicit ack
|
95 |
} |
96 |
} |
97 |
} |
98 |
|
99 |
int generated_chunk(suseconds_t *delta)
|
100 |
{ |
101 |
int res;
|
102 |
struct chunk c;
|
103 |
|
104 |
*delta = input_get(input, &c); |
105 |
if (*delta < 0) { |
106 |
fprintf(stderr, "Error in input!\n");
|
107 |
exit(-1);
|
108 |
} |
109 |
if (c.data == NULL) { |
110 |
return 0; |
111 |
} |
112 |
res = cb_add_chunk(cb, &c); |
113 |
if (res < 0) { |
114 |
free(c.data); |
115 |
free(c.attributes); |
116 |
} |
117 |
|
118 |
return 1; |
119 |
} |
120 |
|
121 |
/**
|
122 |
*example function to filter chunks based on whether a given peer needs them.
|
123 |
*
|
124 |
* Looks at buffermap information received about the given peer.
|
125 |
*/
|
126 |
int needs(struct peer *p, struct chunk *c){ |
127 |
dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
|
128 |
if (! p->bmap) {
|
129 |
dprintf("no bmap\n");
|
130 |
return 1; // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!) |
131 |
} |
132 |
|
133 |
if (chunkID_set_check(p->bmap,c->id) < 0) { //it might need the chunk |
134 |
int missing, min;
|
135 |
//@TODO: add some bmap_timestamp based logic
|
136 |
|
137 |
if (chunkID_set_size(p->bmap) == 0) { |
138 |
dprintf("bmap empty\n");
|
139 |
return 1; // if the bmap seems empty, it needs the chunk |
140 |
} |
141 |
missing = p->cb_size - chunkID_set_size(p->bmap); |
142 |
missing = missing < 0 ? 0 : missing; |
143 |
min = chunkID_set_get_chunk(p->bmap,0);
|
144 |
dprintf("%s ... c->id(%d) >= min(%d) - missing(%d) ?\n",(c->id >= min - missing)?"YES":"NO",c->id, min, missing); |
145 |
return (c->id >= min - missing);
|
146 |
} |
147 |
|
148 |
dprintf("has it\n");
|
149 |
return 0; |
150 |
} |
151 |
|
152 |
double randomPeer(struct peer **p){ |
153 |
return 1; |
154 |
} |
155 |
double getChunkTimestamp(struct chunk **c){ |
156 |
return (double) (*c)->timestamp; |
157 |
} |
158 |
|
159 |
|
160 |
void send_chunk(const struct peerset *pset) |
161 |
{ |
162 |
struct chunk *buff;
|
163 |
int size, res, i, n;
|
164 |
struct peer *neighbours;
|
165 |
|
166 |
n = peerset_size(pset); |
167 |
neighbours = peerset_get_peers(pset); |
168 |
dprintf("Send Chunk: %d neighbours\n", n);
|
169 |
if (n == 0) return; |
170 |
buff = cb_get_chunks(cb, &size); |
171 |
dprintf("\t %d chunks in buffer...\n", size);
|
172 |
if (size == 0) return; |
173 |
|
174 |
/************ STUPID DUMB SCHEDULING ****************/
|
175 |
//target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
|
176 |
//c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
|
177 |
/************ /STUPID DUMB SCHEDULING ****************/
|
178 |
|
179 |
/************ USE SCHEDULER ****************/
|
180 |
{ |
181 |
size_t selectedpairs_len = 1;
|
182 |
struct chunk *chunkps[size];
|
183 |
struct peer *peerps[n];
|
184 |
struct PeerChunk selectedpairs[1]; |
185 |
|
186 |
for (i = 0;i < size; i++) chunkps[i] = buff+i; |
187 |
for (i = 0; i<n; i++) peerps[i] = neighbours+i; |
188 |
schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp); |
189 |
/************ /USE SCHEDULER ****************/
|
190 |
|
191 |
for (i=0; i<selectedpairs_len ; i++){ |
192 |
struct peer *p = selectedpairs[i].peer;
|
193 |
struct chunk *c = selectedpairs[i].chunk;
|
194 |
dprintf("\t sending chunk[%d] to ", c->id);
|
195 |
dprintf("%s\n", node_addr(p->id));
|
196 |
|
197 |
send_bmap(p); |
198 |
res = sendChunk(p->id, c); |
199 |
dprintf("\tResult: %d\n", res);
|
200 |
if (res>=0) { |
201 |
chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
|
202 |
} |
203 |
} |
204 |
} |
205 |
} |