Statistics
| Branch: | Revision:

streamers / streaming.c @ e31aecb0

History | View | Annotate | Download (9.2 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdlib.h>
8
#include <stdio.h>
9
#include <stdint.h>
10
#include <stdbool.h>
11
#include <math.h>
12

    
13
#include <net_helper.h>
14
#include <chunk.h> 
15
#include <chunkbuffer.h> 
16
#include "chunkbuffer_helper.h"
17
#include <trade_msg_la.h>
18
#include <trade_msg_ha.h>
19
#include <peerset.h>
20
#include <peer.h>
21
#include <chunkidset.h>
22

    
23
#include "streaming.h"
24
#include "output.h"
25
#include "input.h"
26
#include "dbg.h"
27
#include "chunk_signaling.h"
28
#include "chunklock.h"
29
#include "topology.h"
30
#include "measures.h"
31

    
32
#include "scheduler_la.h"
33

    
34
static struct chunk_buffer *cb;
35
static struct input_desc *input;
36
static int cb_size;
37
static int transid=0;
38

    
39
int _needs(struct chunkID_set *cset, int cb_size, int cid);
40

    
41
void cb_print()
42
{
43
#ifdef DEBUG
44
  struct chunk *chunks;
45
  int num_chunks, i, id;
46
  chunks = cb_get_chunks(cb, &num_chunks);
47

    
48
  dprintf("\tchbuf :");
49
  i = 0;
50
  if(num_chunks) {
51
    id = chunks[0].id;
52
    dprintf(" %d-> ",id);
53
    while (i < num_chunks) {
54
      if (id == chunks[i].id) {
55
        dprintf("%d",id % 10);
56
        i++;
57
      } else {
58
        dprintf(".");
59
      }
60
      id++;
61
    }
62
  }
63
  dprintf("\n");
64
#endif
65
}
66

    
67
void stream_init(int size, struct nodeID *myID)
68
{
69
  char conf[32];
70

    
71
  cb_size = size;
72

    
73
  sprintf(conf, "size=%d", cb_size);
74
  cb = cb_init(conf);
75
  chunkDeliveryInit(myID);
76
}
77

    
78
int source_init(const char *fname, struct nodeID *myID, bool loop)
79
{
80
  input = input_open(fname, loop ? INPUT_LOOP : 0);
81
  if (input == NULL) {
82
    return -1;
83
  }
84

    
85
  stream_init(1, myID);
86
  return 0;
87
}
88

    
89
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
90
{
91
  struct chunk *chunks;
92
  int num_chunks, i;
93
  struct chunkID_set *my_bmap = chunkID_set_init(0);
94
  chunks = cb_get_chunks(chbuf, &num_chunks);
95

    
96
  for(i=num_chunks-1; i>=0; i--) {
97
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
98
  }
99
  return my_bmap;
100
}
101

    
102
// a simple implementation that request everything that we miss ... up to max deliver
103
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver){
104
  struct chunkID_set *cset_acc, *my_bmap;
105
  int i, d, cset_off_size;
106
  double lossrate;
107

    
108
  cset_acc = chunkID_set_init(0);
109

    
110
  //reduce load a little bit if there are losses on the path from this guy
111
  lossrate = get_lossrate(from->id);
112
  lossrate = finite(lossrate) ? lossrate : 0;        //start agressively, assuming 0 loss
113
  if (rand()/((double)RAND_MAX + 1) >= 10 * lossrate ) {
114
    my_bmap = cb_to_bmap(cb);
115
    cset_off_size = chunkID_set_size(cset_off);
116
    for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
117
      int chunkid = chunkID_set_get_chunk(cset_off, i);
118
      dprintf("\tdo I need c%d ? :",chunkid);
119
      if (!chunk_islocked(chunkid) && _needs(my_bmap, cb_size, chunkid)) {
120
        chunkID_set_add_chunk(cset_acc, chunkid);
121
        chunk_lock(chunkid,from);
122
        d++;
123
      }
124
    }
125
    chunkID_set_free(my_bmap);
126
  }
127

    
128
  return cset_acc;
129
}
130

    
131
void send_bmap(struct peer *to)
132
{
133
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
134
   sendMyBufferMap(to->id, my_bmap, cb_size, 0);
135

    
136
  chunkID_set_free(my_bmap);
137
}
138

    
139
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
140
{
141
  int res;
142
  static struct chunk c;
143
  struct peer *p;
144

    
145
  res = decodeChunk(&c, buff + 1, len - 1);
146
  chunk_unlock(c.id);
147
  if (res > 0) {
148
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
149
    output_deliver(&c);
150
    res = cb_add_chunk(cb, &c);
151
    cb_print();
152
    if (res < 0) {
153
      dprintf("\tchunk too old, buffer full with newer chunks\n");
154
      free(c.data);
155
      free(c.attributes);
156
    }
157
    p = nodeid_to_peer(from,1);
158
    if (p) {        //now we have it almost sure
159
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
160
      send_bmap(p);        //send explicit ack
161
    }
162
  } else {
163
    fprintf(stderr,"\tError: can't decode chunk!\n");
164
  }
165
}
166

    
167
int generated_chunk(suseconds_t *delta)
168
{
169
  int res;
170
  struct chunk c;
171

    
172
  *delta = input_get(input, &c);
173
  if (*delta < 0) {
174
    fprintf(stderr, "Error in input!\n");
175
    exit(-1);
176
  }
177
  if (c.data == NULL) {
178
    return 0;
179
  }
180
  dprintf("Generated chunk %d of %d bytes\n",c.id, c.size);
181
  res = cb_add_chunk(cb, &c);
182
  if (res < 0) {
183
    free(c.data);
184
    free(c.attributes);
185
  }
186

    
187
  return 1;
188
}
189

    
190
/**
191
 *example function to filter chunks based on whether a given peer needs them.
192
 *
193
 * Looks at buffermap information received about the given peer.
194
 */
195
int needs(struct peer *p, struct chunk *c){
196
  dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
197
  if (! p->bmap) {
198
    dprintf("no bmap\n");
199
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
200
  }
201
  return _needs(p->bmap, p->cb_size, c->id);
202
}
203

    
204
int _needs(struct chunkID_set *cset, int cb_size, int cid){
205
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
206
    int missing, min;
207
    //@TODO: add some bmap_timestamp based logic
208

    
209
    if (chunkID_set_size(cset) == 0) {
210
      dprintf("bmap empty\n");
211
      return 1;        // if the bmap seems empty, it needs the chunk
212
    }
213
    missing = cb_size - chunkID_set_size(cset);
214
    missing = missing < 0 ? 0 : missing;
215
    min = chunkID_set_get_chunk(cset, chunkID_set_size(cset)-1);
216
      dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
217
    return (cid >= min - missing);
218
  }
219

    
220
  dprintf("has it\n");
221
  return 0;
222
}
223

    
224
double peerWeightReceivedfrom(struct peer **p){
225
  return timerisset(&(*p)->bmap_timestamp) ? 1 : 0.1;
226
}
227

    
228
double peerWeightUniform(struct peer **p){
229
  return 1;
230
}
231

    
232
double peerWeightRtt(struct peer **p){
233
  double rtt = get_rtt((*p)->id);
234
  dprintf("RTT to %s: %f\n", node_addr((*p)->id), rtt);
235
  return finite(rtt) ? 1 / (rtt + 0.005) : 1 / 1;
236
}
237

    
238
double getChunkTimestamp(struct chunk **c){
239
  return (double) (*c)->timestamp;
240
}
241

    
242
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver){
243
  int i, d, cset_acc_size;
244

    
245
  cset_acc_size = chunkID_set_size(cset_acc);
246
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
247
    struct chunk *c;
248
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
249
    c = cb_get_chunk(cb, chunkid);
250
    if (c && needs(to, c) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
251
      int res = sendChunk(to->id, c);
252
      if (res >= 0) {
253
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
254
        d++;
255
      } else {
256
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
257
      }
258
    }
259
  }
260
}
261

    
262
void send_offer()
263
{
264
  struct chunk *buff;
265
  int size, res, i, n;
266
  struct peer *neighbours;
267
  struct peerset *pset;
268

    
269
  pset = get_peers();
270
  n = peerset_size(pset);
271
  neighbours = peerset_get_peers(pset);
272
  dprintf("Send Offer: %d neighbours\n", n);
273
  if (n == 0) return;
274
  buff = cb_get_chunks(cb, &size);
275
  if (size == 0) return;
276

    
277
  {
278
    size_t selectedpeers_len = 1;
279
    struct chunk *chunkps[size];
280
    struct peer *peerps[n];
281
    struct peer *selectedpeers[selectedpeers_len];
282

    
283
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
284
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
285
    selectPeersForChunks(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpeers, &selectedpeers_len, needs, peerWeightReceivedfrom);        //select a peer that needs at least one of our chunks
286

    
287
    for (i=0; i<selectedpeers_len ; i++){
288
      int max_deliver = 1;
289
      struct chunkID_set *my_bmap = cb_to_bmap(cb);
290
      dprintf("\t sending offer(%d) to %s\n", transid, node_addr(selectedpeers[i]->id));
291
      res = offerChunks(selectedpeers[i]->id, my_bmap, max_deliver, transid++);
292
      chunkID_set_free(my_bmap);
293
    }
294
  }
295
}
296

    
297

    
298
void send_chunk()
299
{
300
  struct chunk *buff;
301
  int size, res, i, n;
302
  struct peer *neighbours;
303
  struct peerset *pset;
304

    
305
  pset = get_peers();
306
  n = peerset_size(pset);
307
  neighbours = peerset_get_peers(pset);
308
  dprintf("Send Chunk: %d neighbours\n", n);
309
  if (n == 0) return;
310
  buff = cb_get_chunks(cb, &size);
311
  dprintf("\t %d chunks in buffer...\n", size);
312
  if (size == 0) return;
313

    
314
  /************ STUPID DUMB SCHEDULING ****************/
315
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
316
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
317
  /************ /STUPID DUMB SCHEDULING ****************/
318

    
319
  /************ USE SCHEDULER ****************/
320
  {
321
    size_t selectedpairs_len = 1;
322
    struct chunk *chunkps[size];
323
    struct peer *peerps[n];
324
    struct PeerChunk selectedpairs[1];
325
  
326
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
327
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
328
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, peerWeightReceivedfrom, getChunkTimestamp);
329
  /************ /USE SCHEDULER ****************/
330

    
331
    for (i=0; i<selectedpairs_len ; i++){
332
      struct peer *p = selectedpairs[i].peer;
333
      struct chunk *c = selectedpairs[i].chunk;
334
      dprintf("\t sending chunk[%d] to ", c->id);
335
      dprintf("%s\n", node_addr(p->id));
336

    
337
      send_bmap(p);
338
      res = sendChunk(p->id, c);
339
      dprintf("\tResult: %d\n", res);
340
      if (res>=0) {
341
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
342
      } else {
343
        fprintf(stderr,"ERROR sending chunk %d\n",c->id);
344
      }
345
    }
346
  }
347
}