Statistics
| Branch: | Revision:

streamers / streaming.c @ 22ebd96d

History | View | Annotate | Download (5.12 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24
static int cb_size;
25

    
26
void stream_init(int size, struct nodeID *myID)
27
{
28
  char conf[32];
29

    
30
  cb_size = size;
31

    
32
  sprintf(conf, "size=%d", cb_size);
33
  cb = cb_init(conf);
34
  chunkInit(myID);
35
}
36

    
37
int source_init(const char *fname, struct nodeID *myID)
38
{
39
  input = input_open(fname);
40
  if (input == NULL) {
41
    return -1;
42
  }
43

    
44
  stream_init(1, myID);
45
  return 0;
46
}
47

    
48
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
49
{
50
  struct chunk *chunks;
51
  int num_chunks, i;
52
  struct chunkID_set *my_bmap = chunkID_set_init(0);
53
  chunks = cb_get_chunks(chbuf, &num_chunks);
54

    
55
  for(i=0; i<num_chunks; i++) {
56
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
57
  }
58
  return my_bmap;
59
}
60

    
61
void send_bmap(struct peer *to)
62
{
63
  struct chunkID_set *my_bmap = cb_to_bmap(cb);
64
   sendMyBufferMap(to->id, my_bmap, cb_size, 0);
65

    
66
  chunkID_set_clear(my_bmap,0);
67
  free(my_bmap);
68
}
69

    
70
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
71
{
72
  int res;
73
  static struct chunk c;
74
  struct peer *p;
75

    
76
  res = decodeChunk(&c, buff + 1, len - 1);
77
  if (res > 0) {
78
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
79
    output_deliver(&c);
80
    res = cb_add_chunk(cb, &c);
81
    if (res < 0) {
82
      dprintf("\tchunk too old, buffer full with newer chunks\n");
83
      free(c.data);
84
      free(c.attributes);
85
    }
86
    p = peerset_get_peer(pset,from);
87
    if (!p) {
88
      fprintf(stderr,"warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
89
      peerset_add_peer(pset,from);
90
      p = peerset_get_peer(pset,from);
91
    }
92
    if (p) {        //now we have it almost sure
93
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
94
      send_bmap(p);        //send explicit ack
95
    }
96
  }
97
}
98

    
99
int generated_chunk(suseconds_t *delta)
100
{
101
  int res;
102
  struct chunk c;
103

    
104
  *delta = input_get(input, &c);
105
  if (*delta < 0) {
106
    fprintf(stderr, "Error in input!\n");
107
    exit(-1);
108
  }
109
  if (c.data == NULL) {
110
    return 0;
111
  }
112
  res = cb_add_chunk(cb, &c);
113
  if (res < 0) {
114
    free(c.data);
115
    free(c.attributes);
116
  }
117

    
118
  return 1;
119
}
120

    
121
/**
122
 *example function to filter chunks based on whether a given peer needs them.
123
 *
124
 * Looks at buffermap information received about the given peer.
125
 */
126
int needs(struct peer *p, struct chunk *c){
127
  dprintf("\t%s needs c%d ? :",node_addr(p->id),c->id);
128
  if (! p->bmap) {
129
    dprintf("no bmap\n");
130
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
131
  }
132

    
133
  if (chunkID_set_check(p->bmap,c->id) < 0) { //it might need the chunk
134
    int missing, min;
135
    //@TODO: add some bmap_timestamp based logic
136

    
137
    if (chunkID_set_size(p->bmap) == 0) {
138
      dprintf("bmap empty\n");
139
      return 1;        // if the bmap seems empty, it needs the chunk
140
    }
141
    missing = p->cb_size - chunkID_set_size(p->bmap);
142
    missing = missing < 0 ? 0 : missing;
143
    min = chunkID_set_get_chunk(p->bmap,0);
144
      dprintf("%s ... c->id(%d) >= min(%d) - missing(%d) ?\n",(c->id >= min - missing)?"YES":"NO",c->id, min, missing);
145
    return (c->id >= min - missing);
146
  }
147

    
148
  dprintf("has it\n");
149
  return 0;
150
}
151

    
152
double randomPeer(struct peer **p){
153
  return 1;
154
}
155
double getChunkTimestamp(struct chunk **c){
156
  return (double) (*c)->timestamp;
157
}
158

    
159

    
160
void send_chunk(const struct peerset *pset)
161
{
162
  struct chunk *buff;
163
  int size, res, i, n;
164
  struct peer *neighbours;
165

    
166
  n = peerset_size(pset);
167
  neighbours = peerset_get_peers(pset);
168
  dprintf("Send Chunk: %d neighbours\n", n);
169
  if (n == 0) return;
170
  buff = cb_get_chunks(cb, &size);
171
  dprintf("\t %d chunks in buffer...\n", size);
172
  if (size == 0) return;
173

    
174
  /************ STUPID DUMB SCHEDULING ****************/
175
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
176
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
177
  /************ /STUPID DUMB SCHEDULING ****************/
178

    
179
  /************ USE SCHEDULER ****************/
180
  {
181
    size_t selectedpairs_len = 1;
182
    struct chunk *chunkps[size];
183
    struct peer *peerps[n];
184
    struct PeerChunk selectedpairs[1];
185
  
186
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
187
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
188
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
189
  /************ /USE SCHEDULER ****************/
190

    
191
    for (i=0; i<selectedpairs_len ; i++){
192
      struct peer *p = selectedpairs[i].peer;
193
      struct chunk *c = selectedpairs[i].chunk;
194
      dprintf("\t sending chunk[%d] to ", c->id);
195
      dprintf("%s\n", node_addr(p->id));
196

    
197
      send_bmap(p);
198
      res = sendChunk(p->id, c);
199
      dprintf("\tResult: %d\n", res);
200
      if (res>=0) {
201
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
202
      }
203
    }
204
  }
205
}