Statistics
| Branch: | Revision:

streamers / streaming.c @ 394bac21

History | View | Annotate | Download (8.46 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24
static int cb_size;
25
static int transid=0;
26
struct chunkID_set *lock_set;
27

    
28
int _needs(struct chunkID_set *cset, int cb_size, int cid);
29

    
30
void chunk_lock(int chunkid,struct peer *from){
31
  if (!lock_set) lock_set = chunkID_set_init(16);
32

    
33
  chunkID_set_add_chunk(lock_set, chunkid);
34
}
35

    
36
void chunk_unlock(int chunkid){
37
  if (!lock_set) return;
38
  chunkID_set_remove_chunk(lock_set, chunkid);
39
}
40

    
41
int chunk_islocked(int chunkid){
42
  int r;
43

    
44
  if (!lock_set) return 0;
45
  r = chunkID_set_check(lock_set, chunkid);
46
  return (r >= 0);
47
}
48

    
49
void stream_init(int size, struct nodeID *myID)
50
{
51
  char conf[32];
52

    
53
  cb_size = size;
54

    
55
  sprintf(conf, "size=%d", cb_size);
56
  cb = cb_init(conf);
57
  chunkInit(myID);
58
}
59

    
60
int source_init(const char *fname, struct nodeID *myID)
61
{
62
  input = input_open(fname);
63
  if (input == NULL) {
64
    return -1;
65
  }
66

    
67
  stream_init(1, myID);
68
  return 0;
69
}
70

    
71
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
72
{
73
  struct chunk *chunks;
74
  int num_chunks, i;
75
  struct chunkID_set *my_bmap = chunkID_set_init(0);
76
  chunks = cb_get_chunks(chbuf, &num_chunks);
77

    
78
  for(i=num_chunks-1; i>=0; i--) {
79
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
80
  }
81
  return my_bmap;
82
}
83

    
84
// a simple implementation that request everything that we miss ... up to max deliver
85
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver){
86
  struct chunkID_set *cset_acc, *my_bmap;
87
  int i, d, cset_off_size;
88

    
89
  my_bmap = cb_to_bmap(cb);
90
  cset_off_size = chunkID_set_size(cset_off);
91
  cset_acc = chunkID_set_init(0);
92
  for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
93
    int chunkid = chunkID_set_get_chunk(cset_off, i);
94
    dprintf("\tdo I need c%d ? :",chunkid);
95
    if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
96
      chunkID_set_add_chunk(cset_acc, chunkid);
97
      chunk_lock(chunkid,from);
98
      d++;
99
    }
100
  }
101

    
102
  chunkID_set_free(my_bmap);
103
  return cset_acc;
104
}
105

    
106
void send_bmap(struct peer *to)
107
{
108
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
109
   sendMyBufferMap(to->id, my_bmap, cb_size, 0);
110

    
111
  chunkID_set_free(my_bmap);
112
}
113

    
114
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
115
{
116
  int res;
117
  static struct chunk c;
118
  struct peer *p;
119

    
120
  res = decodeChunk(&c, buff + 1, len - 1);
121
  chunk_unlock(c.id);
122
  if (res > 0) {
123
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
124
    output_deliver(&c);
125
    res = cb_add_chunk(cb, &c);
126
    if (res < 0) {
127
      dprintf("\tchunk too old, buffer full with newer chunks\n");
128
      free(c.data);
129
      free(c.attributes);
130
    }
131
    p = peerset_get_peer(pset,from);
132
    if (!p) {
133
      fprintf(stderr,"\twarning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
134
      peerset_add_peer(pset,from);
135
      p = peerset_get_peer(pset,from);
136
    }
137
    if (p) {        //now we have it almost sure
138
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
139
      send_bmap(p);        //send explicit ack
140
    }
141
  }
142
}
143

    
144
int generated_chunk(suseconds_t *delta)
145
{
146
  int res;
147
  struct chunk c;
148

    
149
  *delta = input_get(input, &c);
150
  if (*delta < 0) {
151
    fprintf(stderr, "Error in input!\n");
152
    exit(-1);
153
  }
154
  if (c.data == NULL) {
155
    return 0;
156
  }
157
  dprintf("Generated chunk %d of %d bytes\n",c.id, c.size);
158
  res = cb_add_chunk(cb, &c);
159
  if (res < 0) {
160
    free(c.data);
161
    free(c.attributes);
162
  }
163

    
164
  return 1;
165
}
166

    
167
/**
168
 *example function to filter chunks based on whether a given peer needs them.
169
 *
170
 * Looks at buffermap information received about the given peer.
171
 */
172
int needs(struct peer *p, struct chunk *c){
173
  dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
174
  if (! p->bmap) {
175
    dprintf("no bmap\n");
176
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
177
  }
178
  return _needs(p->bmap, p->cb_size, c->id);
179
}
180

    
181
int _needs(struct chunkID_set *cset, int cb_size, int cid){
182
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
183
    int missing, min;
184
    //@TODO: add some bmap_timestamp based logic
185

    
186
    if (chunkID_set_size(cset) == 0) {
187
      dprintf("bmap empty\n");
188
      return 1;        // if the bmap seems empty, it needs the chunk
189
    }
190
    missing = cb_size - chunkID_set_size(cset);
191
    missing = missing < 0 ? 0 : missing;
192
    min = chunkID_set_get_chunk(cset, chunkID_set_size(cset)-1);
193
      dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
194
    return (cid >= min - missing);
195
  }
196

    
197
  dprintf("has it\n");
198
  return 0;
199
}
200

    
201
double randomPeer(struct peer **p){
202
  return timerisset(&(*p)->bmap_timestamp) ? 1 : 0.1;
203
}
204
double getChunkTimestamp(struct chunk **c){
205
  return (double) (*c)->timestamp;
206
}
207

    
208
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver){
209
  int i, d, cset_acc_size;
210

    
211
  cset_acc_size = chunkID_set_size(cset_acc);
212
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
213
    struct chunk *c;
214
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
215
    c = cb_get_chunk(cb, chunkid);
216
    if (c && needs(to, c) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
217
      int res = sendChunk(to->id, c);
218
      if (res >= 0) {
219
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
220
        d++;
221
      } else {
222
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
223
      }
224
    }
225
  }
226
}
227

    
228
void send_offer(const struct peerset *pset)
229
{
230
  struct chunk *buff;
231
  int size, res, i, n;
232
  struct peer *neighbours;
233

    
234
  n = peerset_size(pset);
235
  neighbours = peerset_get_peers(pset);
236
  dprintf("Send Offer: %d neighbours\n", n);
237
  if (n == 0) return;
238
  buff = cb_get_chunks(cb, &size);
239
  if (size == 0) return;
240

    
241
  {
242
    size_t selectedpeers_len = 1;
243
    struct chunk *chunkps[size];
244
    struct peer *peerps[n];
245
    struct peer *selectedpeers[selectedpeers_len];
246

    
247
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
248
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
249
    selectPeersForChunks(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpeers, &selectedpeers_len, needs, randomPeer);        //select a peer that needs at least one of our chunks
250

    
251
    for (i=0; i<selectedpeers_len ; i++){
252
      int max_deliver = 1;
253
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
254
      dprintf("\t sending offer(%d) to %s\n", transid, node_addr(selectedpeers[i]->id));
255
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
256
      chunkID_set_free(my_bmap);
257
    }
258
  }
259
}
260

    
261

    
262
void send_chunk(const struct peerset *pset)
263
{
264
  struct chunk *buff;
265
  int size, res, i, n;
266
  struct peer *neighbours;
267

    
268
  n = peerset_size(pset);
269
  neighbours = peerset_get_peers(pset);
270
  dprintf("Send Chunk: %d neighbours\n", n);
271
  if (n == 0) return;
272
  buff = cb_get_chunks(cb, &size);
273
  dprintf("\t %d chunks in buffer...\n", size);
274
  if (size == 0) return;
275

    
276
  /************ STUPID DUMB SCHEDULING ****************/
277
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
278
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
279
  /************ /STUPID DUMB SCHEDULING ****************/
280

    
281
  /************ USE SCHEDULER ****************/
282
  {
283
    size_t selectedpairs_len = 1;
284
    struct chunk *chunkps[size];
285
    struct peer *peerps[n];
286
    struct PeerChunk selectedpairs[1];
287
  
288
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
289
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
290
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
291
  /************ /USE SCHEDULER ****************/
292

    
293
    for (i=0; i<selectedpairs_len ; i++){
294
      struct peer *p = selectedpairs[i].peer;
295
      struct chunk *c = selectedpairs[i].chunk;
296
      dprintf("\t sending chunk[%d] to ", c->id);
297
      dprintf("%s\n", node_addr(p->id));
298

    
299
      send_bmap(p);
300
      res = sendChunk(p->id, c);
301
      dprintf("\tResult: %d\n", res);
302
      if (res>=0) {
303
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
304
      } else {
305
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
306
      }
307
    }
308
  }
309
}