Statistics
| Branch: | Revision:

streamers / streaming.c @ c8c4c779

History | View | Annotate | Download (7.59 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24
static int cb_size;
25

    
26
int _needs(struct chunkID_set *cset, int cb_size, int cid);
27

    
28
void stream_init(int size, struct nodeID *myID)
29
{
30
  char conf[32];
31

    
32
  cb_size = size;
33

    
34
  sprintf(conf, "size=%d", cb_size);
35
  cb = cb_init(conf);
36
  chunkInit(myID);
37
}
38

    
39
int source_init(const char *fname, struct nodeID *myID)
40
{
41
  input = input_open(fname);
42
  if (input == NULL) {
43
    return -1;
44
  }
45

    
46
  stream_init(1, myID);
47
  return 0;
48
}
49

    
50
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
51
{
52
  struct chunk *chunks;
53
  int num_chunks, i;
54
  struct chunkID_set *my_bmap = chunkID_set_init(0);
55
  chunks = cb_get_chunks(chbuf, &num_chunks);
56

    
57
  for(i=0; i<num_chunks; i++) {
58
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
59
  }
60
  return my_bmap;
61
}
62

    
63
// a simple implementation that request everything that we miss ... up to max deliver
64
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver){
65
  struct chunkID_set *cset_acc, *my_bmap;
66
  int i, d, cset_off_size;
67

    
68
  my_bmap = cb_to_bmap(cb);
69
  cset_off_size = chunkID_set_size(cset_off);
70
  cset_acc = chunkID_set_init(0);
71
  for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
72
    int chunkid = chunkID_set_get_chunk(cset_off, i);
73
    if (_needs(my_bmap, cb_size, chunkid)) {
74
      chunkID_set_add_chunk(cset_acc, chunkid);
75
      d++;
76
    }
77
  }
78

    
79
  chunkID_set_free(my_bmap);
80
  return cset_acc;
81
}
82

    
83
void send_bmap(struct peer *to)
84
{
85
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
86
   sendMyBufferMap(to->id, my_bmap, cb_size, 0);
87

    
88
  chunkID_set_free(my_bmap);
89
}
90

    
91
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
92
{
93
  int res;
94
  static struct chunk c;
95
  struct peer *p;
96

    
97
  res = decodeChunk(&c, buff + 1, len - 1);
98
  if (res > 0) {
99
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
100
    output_deliver(&c);
101
    res = cb_add_chunk(cb, &c);
102
    if (res < 0) {
103
      dprintf("\tchunk too old, buffer full with newer chunks\n");
104
      free(c.data);
105
      free(c.attributes);
106
    }
107
    p = peerset_get_peer(pset,from);
108
    if (!p) {
109
      fprintf(stderr,"warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
110
      peerset_add_peer(pset,from);
111
      p = peerset_get_peer(pset,from);
112
    }
113
    if (p) {        //now we have it almost sure
114
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
115
      send_bmap(p);        //send explicit ack
116
    }
117
  }
118
}
119

    
120
int generated_chunk(suseconds_t *delta)
121
{
122
  int res;
123
  struct chunk c;
124

    
125
  *delta = input_get(input, &c);
126
  if (*delta < 0) {
127
    fprintf(stderr, "Error in input!\n");
128
    exit(-1);
129
  }
130
  if (c.data == NULL) {
131
    return 0;
132
  }
133
  res = cb_add_chunk(cb, &c);
134
  if (res < 0) {
135
    free(c.data);
136
    free(c.attributes);
137
  }
138

    
139
  return 1;
140
}
141

    
142
/**
143
 *example function to filter chunks based on whether a given peer needs them.
144
 *
145
 * Looks at buffermap information received about the given peer.
146
 */
147
int needs(struct peer *p, struct chunk *c){
148
  dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
149
  if (! p->bmap) {
150
    dprintf("no bmap\n");
151
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
152
  }
153
  return _needs(p->bmap, p->cb_size, c->id);
154
}
155

    
156
int _needs(struct chunkID_set *cset, int cb_size, int cid){
157
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
158
    int missing, min;
159
    //@TODO: add some bmap_timestamp based logic
160

    
161
    if (chunkID_set_size(cset) == 0) {
162
      dprintf("bmap empty\n");
163
      return 1;        // if the bmap seems empty, it needs the chunk
164
    }
165
    missing = cb_size - chunkID_set_size(cset);
166
    missing = missing < 0 ? 0 : missing;
167
    min = chunkID_set_get_chunk(cset,0);
168
      dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
169
    return (cid >= min - missing);
170
  }
171

    
172
  dprintf("has it\n");
173
  return 0;
174
}
175

    
176
double randomPeer(struct peer **p){
177
  return 1;
178
}
179
double getChunkTimestamp(struct chunk **c){
180
  return (double) (*c)->timestamp;
181
}
182

    
183
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver){
184
  int i, d, cset_acc_size;
185

    
186
  cset_acc_size = chunkID_set_size(cset_acc);
187
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
188
    struct chunk *c;
189
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
190
    c = cb_get_chunk(cb, chunkid);
191
    if (c && needs(to, c) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
192
      int res = sendChunk(to->id, c);
193
      if (res >= 0) {
194
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
195
        d++;
196
      }
197
    }
198
  }
199
}
200

    
201
void send_offer(const struct peerset *pset)
202
{
203
  struct chunk *buff;
204
  int size, res, i, n;
205
  struct peer *neighbours;
206

    
207
  n = peerset_size(pset);
208
  neighbours = peerset_get_peers(pset);
209
  dprintf("Send Offer: %d neighbours\n", n);
210
  if (n == 0) return;
211
  buff = cb_get_chunks(cb, &size);
212
  if (size == 0) return;
213

    
214
  {
215
    size_t selectedpeers_len = 1;
216
    struct chunk *chunkps[size];
217
    struct peer *peerps[n];
218
    struct peer *selectedpeers[selectedpeers_len];
219

    
220
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
221
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
222
    selectPeersForChunks(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpeers, &selectedpeers_len, needs, randomPeer);        //select a peer that needs at least one of our chunks
223

    
224
    for (i=0; i<selectedpeers_len ; i++){
225
      int max_deliver = 1;
226
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
227
      dprintf("\t sending offer to %s\n", node_addr(selectedpeers[i]->id));
228
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, 0);
229
    }
230
  }
231
}
232

    
233

    
234
void send_chunk(const struct peerset *pset)
235
{
236
  struct chunk *buff;
237
  int size, res, i, n;
238
  struct peer *neighbours;
239

    
240
  n = peerset_size(pset);
241
  neighbours = peerset_get_peers(pset);
242
  dprintf("Send Chunk: %d neighbours\n", n);
243
  if (n == 0) return;
244
  buff = cb_get_chunks(cb, &size);
245
  dprintf("\t %d chunks in buffer...\n", size);
246
  if (size == 0) return;
247

    
248
  /************ STUPID DUMB SCHEDULING ****************/
249
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
250
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
251
  /************ /STUPID DUMB SCHEDULING ****************/
252

    
253
  /************ USE SCHEDULER ****************/
254
  {
255
    size_t selectedpairs_len = 1;
256
    struct chunk *chunkps[size];
257
    struct peer *peerps[n];
258
    struct PeerChunk selectedpairs[1];
259
  
260
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
261
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
262
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
263
  /************ /USE SCHEDULER ****************/
264

    
265
    for (i=0; i<selectedpairs_len ; i++){
266
      struct peer *p = selectedpairs[i].peer;
267
      struct chunk *c = selectedpairs[i].chunk;
268
      dprintf("\t sending chunk[%d] to ", c->id);
269
      dprintf("%s\n", node_addr(p->id));
270

    
271
      send_bmap(p);
272
      res = sendChunk(p->id, c);
273
      dprintf("\tResult: %d\n", res);
274
      if (res>=0) {
275
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
276
      }
277
    }
278
  }
279
}