Statistics
| Branch: | Revision:

streamers / streaming.c @ d74bc79c

History | View | Annotate | Download (5.02 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24
static int cb_size;
25

    
26
void stream_init(int size, struct nodeID *myID)
27
{
28
  char conf[32];
29

    
30
  cb_size = size;
31

    
32
  sprintf(conf, "size=%d", cb_size);
33
  cb = cb_init(conf);
34
  chunkInit(myID);
35
}
36

    
37
int source_init(const char *fname, struct nodeID *myID)
38
{
39
  input = input_open(fname);
40
  if (input == NULL) {
41
    return -1;
42
  }
43

    
44
  stream_init(1, myID);
45
  return 0;
46
}
47

    
48
void send_bmap(struct peer *to)
49
{
50
  struct chunk *chunks;
51
  int num_chunks, i;
52
  struct chunkID_set *my_bmap = chunkID_set_init(0);
53
  chunks = cb_get_chunks(cb, &num_chunks);
54

    
55
  for(i=0; i<num_chunks; i++) {
56
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
57
  }
58

    
59
  sendMyBufferMap(to->id, my_bmap, cb_size, 0);
60

    
61
  chunkID_set_clear(my_bmap,0);
62
  free(my_bmap);
63
}
64

    
65
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
66
{
67
  int res;
68
  static struct chunk c;
69
  struct peer *p;
70

    
71
  res = decodeChunk(&c, buff + 1, len - 1);
72
  if (res > 0) {
73
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
74
    output_deliver(&c);
75
    res = cb_add_chunk(cb, &c);
76
    if (res < 0) {
77
      dprintf("\tchunk too old, buffer full with newer chunks\n");
78
      free(c.data);
79
      free(c.attributes);
80
    }
81
    p = peerset_get_peer(pset,from);
82
    if (!p) {
83
      fprintf(stderr,"warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
84
      peerset_add_peer(pset,from);
85
      p = peerset_get_peer(pset,from);
86
    }
87
    if (p) {        //now we have it almost sure
88
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
89
      send_bmap(p);        //send explicit ack
90
    }
91
  }
92
}
93

    
94
int generated_chunk(suseconds_t *delta)
95
{
96
  int res;
97
  struct chunk c;
98

    
99
  *delta = input_get(input, &c);
100
  if (*delta < 0) {
101
    fprintf(stderr, "Error in input!\n");
102
    exit(-1);
103
  }
104
  if (c.data == NULL) {
105
    return 0;
106
  }
107
  res = cb_add_chunk(cb, &c);
108
  if (res < 0) {
109
    free(c.data);
110
    free(c.attributes);
111
  }
112

    
113
  return 1;
114
}
115

    
116
/**
117
 *example function to filter chunks based on whether a given peer needs them.
118
 *
119
 * Looks at buffermap information received about the given peer.
120
 */
121
int needs(struct peer *p, struct chunk *c){
122
  fprintf(stderr,"%s needs c%d ? :",node_addr(p->id),c->id);
123
  if (! p->bmap) {
124
    fprintf(stderr,"no bmap\n");
125
    return 1;        // if we have no bmap information, we assume it needs the chunk (aggressive behaviour!)
126
  }
127

    
128
  if (chunkID_set_check(p->bmap,c->id) < 0) { //it might need the chunk
129
    int missing, min;
130
    //@TODO: add some bmap_timestamp based logic
131

    
132
    if (chunkID_set_size(p->bmap) == 0) {
133
      fprintf(stderr,"bmap empty\n");
134
      return 1;        // if the bmap seems empty, it needs the chunk
135
    }
136
    missing = p->cb_size - chunkID_set_size(p->bmap);
137
    missing = missing < 0 ? 0 : missing;
138
    min = chunkID_set_get_chunk(p->bmap,0);
139
      fprintf(stderr,"%s ... c->id(%d) >= min(%d) - missing(%d) ?\n",(c->id >= min - missing)?"YES":"NO",c->id, min, missing);
140
    return (c->id >= min - missing);
141
  }
142

    
143
  fprintf(stderr,"has it\n");
144
  return 0;
145
}
146

    
147
double randomPeer(struct peer **p){
148
  return 1;
149
}
150
double getChunkTimestamp(struct chunk **c){
151
  return (double) (*c)->timestamp;
152
}
153

    
154

    
155
void send_chunk(const struct peerset *pset)
156
{
157
  struct chunk *buff;
158
  int size, res, i, n;
159
  struct peer *neighbours;
160

    
161
  n = peerset_size(pset);
162
  neighbours = peerset_get_peers(pset);
163
  dprintf("Send Chunk: %d neighbours\n", n);
164
  if (n == 0) return;
165
  buff = cb_get_chunks(cb, &size);
166
  dprintf("\t %d chunks in buffer...\n", size);
167
  if (size == 0) return;
168

    
169
  /************ STUPID DUMB SCHEDULING ****************/
170
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
171
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
172
  /************ /STUPID DUMB SCHEDULING ****************/
173

    
174
  /************ USE SCHEDULER ****************/
175
  {
176
    size_t selectedpairs_len = 1;
177
    struct chunk *chunkps[size];
178
    struct peer *peerps[n];
179
    struct PeerChunk selectedpairs[1];
180
  
181
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
182
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
183
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
184
  /************ /USE SCHEDULER ****************/
185

    
186
    for (i=0; i<selectedpairs_len ; i++){
187
      struct peer *p = selectedpairs[i].peer;
188
      struct chunk *c = selectedpairs[i].chunk;
189
      dprintf("\t sending chunk[%d] to ", c->id);
190
      dprintf("%s\n", node_addr(p->id));
191

    
192
      send_bmap(p);
193
      res = sendChunk(p->id, c);
194
      dprintf("\tResult: %d\n", res);
195
      if (res>=0) {
196
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
197
      }
198
    }
199
  }
200
}