Statistics
| Branch: | Revision:

streamers / streaming.c @ d447f71d

History | View | Annotate | Download (6.52 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24
static int cb_size;
25

    
26
int _needs(struct chunkID_set *cset, int cb_size, int cid);
27

    
28
void stream_init(int size, struct nodeID *myID)
29
{
30
  char conf[32];
31

    
32
  cb_size = size;
33

    
34
  sprintf(conf, "size=%d", cb_size);
35
  cb = cb_init(conf);
36
  chunkInit(myID);
37
}
38

    
39
int source_init(const char *fname, struct nodeID *myID)
40
{
41
  input = input_open(fname);
42
  if (input == NULL) {
43
    return -1;
44
  }
45

    
46
  stream_init(1, myID);
47
  return 0;
48
}
49

    
50
// a simple implementation that request everything that we miss ... up to max deliver
51
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver){
52
  struct chunkID_set *cset_acc = chunkID_set_init(0);
53
  int i, d, cset_off_size;
54

    
55
  cset_off_size = chunkID_set_size(cset_off);
56
  for (i = 0, d = 0; i < cset_off_size && d < max_deliver; i++) {
57
    int chunkid = chunkID_set_get_chunk(cset_off, i);
58
    if (! cb_get_chunk(cb, chunkid)) {
59
      chunkID_set_add_chunk(cset_acc, chunkid);
60
      d++;
61
    }
62
  }
63

    
64
  return cset_acc;
65
}
66

    
67
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
68
{
69
  struct chunk *chunks;
70
  int num_chunks, i;
71
  struct chunkID_set *my_bmap = chunkID_set_init(0);
72
  chunks = cb_get_chunks(chbuf, &num_chunks);
73

    
74
  for(i=0; i<num_chunks; i++) {
75
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
76
  }
77
  return my_bmap;
78
}
79

    
80
void send_bmap(struct peer *to)
81
{
82
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
83
   sendMyBufferMap(to->id, my_bmap, cb_size, 0);
84

    
85
  chunkID_set_clear(my_bmap,0);
86
  free(my_bmap);
87
}
88

    
89
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
90
{
91
  int res;
92
  static struct chunk c;
93
  struct peer *p;
94

    
95
  res = decodeChunk(&c, buff + 1, len - 1);
96
  if (res > 0) {
97
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
98
    output_deliver(&c);
99
    res = cb_add_chunk(cb, &c);
100
    if (res < 0) {
101
      dprintf("\tchunk too old, buffer full with newer chunks\n");
102
      free(c.data);
103
      free(c.attributes);
104
    }
105
    p = peerset_get_peer(pset,from);
106
    if (!p) {
107
      fprintf(stderr,"warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
108
      peerset_add_peer(pset,from);
109
      p = peerset_get_peer(pset,from);
110
    }
111
    if (p) {        //now we have it almost sure
112
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
113
      send_bmap(p);        //send explicit ack
114
    }
115
  }
116
}
117

    
118
int generated_chunk(suseconds_t *delta)
119
{
120
  int res;
121
  struct chunk c;
122

    
123
  *delta = input_get(input, &c);
124
  if (*delta < 0) {
125
    fprintf(stderr, "Error in input!\n");
126
    exit(-1);
127
  }
128
  if (c.data == NULL) {
129
    return 0;
130
  }
131
  res = cb_add_chunk(cb, &c);
132
  if (res < 0) {
133
    free(c.data);
134
    free(c.attributes);
135
  }
136

    
137
  return 1;
138
}
139

    
140
/**
141
 *example function to filter chunks based on whether a given peer needs them.
142
 *
143
 * Looks at buffermap information received about the given peer.
144
 */
145
int needs(struct peer *p, struct chunk *c){
146
  dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
147
  if (! p->bmap) {
148
    dprintf("no bmap\n");
149
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
150
  }
151
  return _needs(p->bmap, p->cb_size, c->id);
152
}
153

    
154
int _needs(struct chunkID_set *cset, int cb_size, int cid){
155
  if (chunkID_set_check(cset,cid) < 0) { //it might need the chunk
156
    int missing, min;
157
    //@TODO: add some bmap_timestamp based logic
158

    
159
    if (chunkID_set_size(cset) == 0) {
160
      dprintf("bmap empty\n");
161
      return 1;        // if the bmap seems empty, it needs the chunk
162
    }
163
    missing = cb_size - chunkID_set_size(cset);
164
    missing = missing < 0 ? 0 : missing;
165
    min = chunkID_set_get_chunk(cset,0);
166
      dprintf("%s ... cid(%d) >= min(%d) - missing(%d) ?\n",(cid >= min - missing)?"YES":"NO",cid, min, missing);
167
    return (cid >= min - missing);
168
  }
169

    
170
  dprintf("has it\n");
171
  return 0;
172
}
173

    
174
double randomPeer(struct peer **p){
175
  return 1;
176
}
177
double getChunkTimestamp(struct chunk **c){
178
  return (double) (*c)->timestamp;
179
}
180

    
181
void send_accepted_chunks(struct peer *to, struct chunkID_set *cset_acc, int max_deliver){
182
  int i, d, cset_acc_size;
183

    
184
  cset_acc_size = chunkID_set_size(cset_acc);
185
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
186
    struct chunk *c;
187
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
188
    c = cb_get_chunk(cb, chunkid);
189
    if (c && needs(to, c) ) {        // we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
190
      int res = sendChunk(to->id, c);
191
      if (res >= 0) {
192
        chunkID_set_add_chunk(to->bmap, c->id); //don't send twice ... assuming that it will actually arrive
193
        d++;
194
      }
195
    }
196
  }
197
}
198

    
199

    
200
void send_chunk(const struct peerset *pset)
201
{
202
  struct chunk *buff;
203
  int size, res, i, n;
204
  struct peer *neighbours;
205

    
206
  n = peerset_size(pset);
207
  neighbours = peerset_get_peers(pset);
208
  dprintf("Send Chunk: %d neighbours\n", n);
209
  if (n == 0) return;
210
  buff = cb_get_chunks(cb, &size);
211
  dprintf("\t %d chunks in buffer...\n", size);
212
  if (size == 0) return;
213

    
214
  /************ STUPID DUMB SCHEDULING ****************/
215
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
216
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
217
  /************ /STUPID DUMB SCHEDULING ****************/
218

    
219
  /************ USE SCHEDULER ****************/
220
  {
221
    size_t selectedpairs_len = 1;
222
    struct chunk *chunkps[size];
223
    struct peer *peerps[n];
224
    struct PeerChunk selectedpairs[1];
225
  
226
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
227
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
228
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
229
  /************ /USE SCHEDULER ****************/
230

    
231
    for (i=0; i<selectedpairs_len ; i++){
232
      struct peer *p = selectedpairs[i].peer;
233
      struct chunk *c = selectedpairs[i].chunk;
234
      dprintf("\t sending chunk[%d] to ", c->id);
235
      dprintf("%s\n", node_addr(p->id));
236

    
237
      send_bmap(p);
238
      res = sendChunk(p->id, c);
239
      dprintf("\tResult: %d\n", res);
240
      if (res>=0) {
241
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
242
      }
243
    }
244
  }
245
}