Statistics
| Branch: | Revision:

streamers / streaming.c @ abd2ef3b

History | View | Annotate | Download (8.08 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19
#include "chunklock.h"
20

    
21
#include "scheduler_la.h"
22

    
23
static struct chunk_buffer *cb;
24
static struct input_desc *input;
25
static int cb_size;
26
static int transid=0;
27

    
28
int _needs(struct chunkID_set *cset, int cb_size, int cid);
29

    
30
void stream_init(int size, struct nodeID *myID)
31
{
32
  char conf[32];
33

    
34
  cb_size = size;
35

    
36
  sprintf(conf, "size=%d", cb_size);
37
  cb = cb_init(conf);
38
  chunkInit(myID);
39
}
40

    
41
int source_init(const char *fname, struct nodeID *myID)
42
{
43
  input = input_open(fname);
44
  if (input == NULL) {
45
    return -1;
46
  }
47

    
48
  stream_init(1, myID);
49
  return 0;
50
}
51

    
52
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
53
{
54
  struct chunk *chunks;
55
  int num_chunks, i;
56
  struct chunkID_set *my_bmap = chunkID_set_init(0);
57
  chunks = cb_get_chunks(chbuf, &num_chunks);
58

    
59
  for(i=num_chunks-1; i>=0; i--) {
60
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
61
  }
62
  return my_bmap;
63
}
64

    
65
// a simple implementation that request everything that we miss ... up to max deliver
66
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver){
67
  struct chunkID_set *cset_acc, *my_bmap;
68
  int i, d, cset_off_size;
69

    
70
  my_bmap = cb_to_bmap(cb);
71
  cset_off_size = chunkID_set_size(cset_off);
72
  cset_acc = chunkID_set_init(0);
73
  for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
74
    int chunkid = chunkID_set_get_chunk(cset_off, i);
75
    dprintf("\tdo I need c%d ? :",chunkid);
76
    if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
77
      chunkID_set_add_chunk(cset_acc, chunkid);
78
      chunk_lock(chunkid,from);
79
      d++;
80
    }
81
  }
82

    
83
  chunkID_set_free(my_bmap);
84
  return cset_acc;
85
}
86

    
87
void send_bmap(struct peer *to)
88
{
89
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
90
   sendMyBufferMap(to->id, my_bmap, cb_size, 0);
91

    
92
  chunkID_set_free(my_bmap);
93
}
94

    
95
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
96
{
97
  int res;
98
  static struct chunk c;
99
  struct peer *p;
100

    
101
  res = decodeChunk(&c, buff + 1, len - 1);
102
  chunk_unlock(c.id);
103
  if (res > 0) {
104
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
105
    output_deliver(&c);
106
    res = cb_add_chunk(cb, &c);
107
    if (res < 0) {
108
      dprintf("\tchunk too old, buffer full with newer chunks\n");
109
      free(c.data);
110
      free(c.attributes);
111
    }
112
    p = peerset_get_peer(pset,from);
113
    if (!p) {
114
      fprintf(stderr,"\twarning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
115
      peerset_add_peer(pset,from);
116
      p = peerset_get_peer(pset,from);
117
    }
118
    if (p) {        //now we have it almost sure
119
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
120
      send_bmap(p);        //send explicit ack
121
    }
122
  }
123
}
124

    
125
int generated_chunk(suseconds_t *delta)
126
{
127
  int res;
128
  struct chunk c;
129

    
130
  *delta = input_get(input, &c);
131
  if (*delta < 0) {
132
    fprintf(stderr, "Error in input!\n");
133
    exit(-1);
134
  }
135
  if (c.data == NULL) {
136
    return 0;
137
  }
138
  dprintf("Generated chunk %d of %d bytes\n",c.id, c.size);
139
  res = cb_add_chunk(cb, &c);
140
  if (res < 0) {
141
    free(c.data);
142
    free(c.attributes);
143
  }
144

    
145
  return 1;
146
}
147

    
148
/**
149
 *example function to filter chunks based on whether a given peer needs them.
150
 *
151
 * Looks at buffermap information received about the given peer.
152
 */
153
int needs(struct peer *p, struct chunk *c){
154
  dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
155
  if (! p->bmap) {
156
    dprintf("no bmap\n");
157
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
158
  }
159
  return _needs(p->bmap, p->cb_size, c->id);
160
}
161

    
162
int _needs(struct chunkID_set *cset, int cb_size, int cid){
163
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
164
    int missing, min;
165
    //@TODO: add some bmap_timestamp based logic
166

    
167
    if (chunkID_set_size(cset) == 0) {
168
      dprintf("bmap empty\n");
169
      return 1;        // if the bmap seems empty, it needs the chunk
170
    }
171
    missing = cb_size - chunkID_set_size(cset);
172
    missing = missing < 0 ? 0 : missing;
173
    min = chunkID_set_get_chunk(cset, chunkID_set_size(cset)-1);
174
      dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
175
    return (cid >= min - missing);
176
  }
177

    
178
  dprintf("has it\n");
179
  return 0;
180
}
181

    
182
double randomPeer(struct peer **p){
183
  return timerisset(&(*p)->bmap_timestamp) ? 1 : 0.1;
184
}
185
double getChunkTimestamp(struct chunk **c){
186
  return (double) (*c)->timestamp;
187
}
188

    
189
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver){
190
  int i, d, cset_acc_size;
191

    
192
  cset_acc_size = chunkID_set_size(cset_acc);
193
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
194
    struct chunk *c;
195
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
196
    c = cb_get_chunk(cb, chunkid);
197
    if (c && needs(to, c) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
198
      int res = sendChunk(to->id, c);
199
      if (res >= 0) {
200
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
201
        d++;
202
      } else {
203
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
204
      }
205
    }
206
  }
207
}
208

    
209
void send_offer(const struct peerset *pset)
210
{
211
  struct chunk *buff;
212
  int size, res, i, n;
213
  struct peer *neighbours;
214

    
215
  n = peerset_size(pset);
216
  neighbours = peerset_get_peers(pset);
217
  dprintf("Send Offer: %d neighbours\n", n);
218
  if (n == 0) return;
219
  buff = cb_get_chunks(cb, &size);
220
  if (size == 0) return;
221

    
222
  {
223
    size_t selectedpeers_len = 1;
224
    struct chunk *chunkps[size];
225
    struct peer *peerps[n];
226
    struct peer *selectedpeers[selectedpeers_len];
227

    
228
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
229
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
230
    selectPeersForChunks(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpeers, &selectedpeers_len, needs, randomPeer);        //select a peer that needs at least one of our chunks
231

    
232
    for (i=0; i<selectedpeers_len ; i++){
233
      int max_deliver = 1;
234
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
235
      dprintf("\t sending offer(%d) to %s\n", transid, node_addr(selectedpeers[i]->id));
236
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
237
      chunkID_set_free(my_bmap);
238
    }
239
  }
240
}
241

    
242

    
243
void send_chunk(const struct peerset *pset)
244
{
245
  struct chunk *buff;
246
  int size, res, i, n;
247
  struct peer *neighbours;
248

    
249
  n = peerset_size(pset);
250
  neighbours = peerset_get_peers(pset);
251
  dprintf("Send Chunk: %d neighbours\n", n);
252
  if (n == 0) return;
253
  buff = cb_get_chunks(cb, &size);
254
  dprintf("\t %d chunks in buffer...\n", size);
255
  if (size == 0) return;
256

    
257
  /************ STUPID DUMB SCHEDULING ****************/
258
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
259
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
260
  /************ /STUPID DUMB SCHEDULING ****************/
261

    
262
  /************ USE SCHEDULER ****************/
263
  {
264
    size_t selectedpairs_len = 1;
265
    struct chunk *chunkps[size];
266
    struct peer *peerps[n];
267
    struct PeerChunk selectedpairs[1];
268
  
269
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
270
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
271
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
272
  /************ /USE SCHEDULER ****************/
273

    
274
    for (i=0; i<selectedpairs_len ; i++){
275
      struct peer *p = selectedpairs[i].peer;
276
      struct chunk *c = selectedpairs[i].chunk;
277
      dprintf("\t sending chunk[%d] to ", c->id);
278
      dprintf("%s\n", node_addr(p->id));
279

    
280
      send_bmap(p);
281
      res = sendChunk(p->id, c);
282
      dprintf("\tResult: %d\n", res);
283
      if (res>=0) {
284
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
285
      } else {
286
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
287
      }
288
    }
289
  }
290
}