Statistics
| Branch: | Revision:

streamers / streaming.c @ 449e156e

History | View | Annotate | Download (4.23 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24

    
25
void stream_init(int size, struct nodeID *myID)
26
{
27
  char conf[32];
28

    
29
  sprintf(conf, "size=%d", size);
30
  cb = cb_init(conf);
31
  chunkInit(myID);
32
}
33

    
34
int source_init(const char *fname, struct nodeID *myID)
35
{
36
  input = input_open(fname);
37
  if (input == NULL) {
38
    return -1;
39
  }
40

    
41
  stream_init(1, myID);
42
  return 0;
43
}
44

    
45
void send_bmap(struct peer *to)
46
{
47
  struct chunk *chunks;
48
  int num_chunks, i;
49
  struct chunkID_set *my_bmap = chunkID_set_init(0);
50
  chunks = cb_get_chunks(cb, &num_chunks);
51

    
52
  for(i=0; i<num_chunks; i++) {
53
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
54
  }
55

    
56
  sendMyBufferMap(to->id, my_bmap, 0);
57

    
58
  chunkID_set_clear(my_bmap,0);
59
  free(my_bmap);
60
}
61

    
62
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
63
{
64
  int res;
65
  static struct chunk c;
66
  struct peer *p;
67

    
68
  res = decodeChunk(&c, buff + 1, len - 1);
69
  if (res > 0) {
70
    dprintf("Received chunk %d from peer: %s\n", c.id, node_addr(from));
71
    output_deliver(&c);
72
    res = cb_add_chunk(cb, &c);
73
    if (res < 0) {
74
      dprintf("\tchunk too old, buffer full with newer chunks\n");
75
      free(c.data);
76
      free(c.attributes);
77
    }
78
    p = peerset_get_peer(pset,from);
79
    if (!p) {
80
      fprintf(stderr,"warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
81
      peerset_add_peer(pset,from);
82
      p = peerset_get_peer(pset,from);
83
    }
84
    if (p) {        //now we have it almost sure
85
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
86
      send_bmap(p);        //send explicit ack
87
    }
88
  }
89
}
90

    
91
int generated_chunk(suseconds_t *delta)
92
{
93
  int res;
94
  struct chunk c;
95

    
96
  *delta = input_get(input, &c);
97
  if (*delta < 0) {
98
    fprintf(stderr, "Error in input!\n");
99
    exit(-1);
100
  }
101
  if (c.data == NULL) {
102
    return 0;
103
  }
104
  res = cb_add_chunk(cb, &c);
105
  if (res < 0) {
106
    free(c.data);
107
    free(c.attributes);
108
  }
109

    
110
  return 1;
111
}
112

    
113
/**
114
 *example function to filter chunks based on whether a given peer needs them. The real implementation
115
 * should look at buffermap information received about the given peer (or it should guess)
116
 */
117
int needs(struct peer *p, struct chunk *c){
118
  return (! p->bmap || chunkID_set_check(p->bmap,c->id) < 0);
119
}
120
double randomPeer(struct peer **p){
121
  return 1;
122
}
123
double getChunkTimestamp(struct chunk **c){
124
  return (double) (*c)->timestamp;
125
}
126

    
127

    
128
void send_chunk(const struct peerset *pset)
129
{
130
  struct chunk *buff;
131
  int size, res, i, n;
132
  struct peer *neighbours;
133

    
134
  n = peerset_size(pset);
135
  neighbours = peerset_get_peers(pset);
136
  dprintf("Send Chunk: %d neighbours\n", n);
137
  if (n == 0) return;
138
  buff = cb_get_chunks(cb, &size);
139
  dprintf("\t %d chunks in buffer...\n", size);
140
  if (size == 0) return;
141

    
142
  /************ STUPID DUMB SCHEDULING ****************/
143
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
144
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
145
  /************ /STUPID DUMB SCHEDULING ****************/
146

    
147
  /************ USE SCHEDULER ****************/
148
  {
149
    size_t selectedpairs_len = 1;
150
    struct chunk *chunkps[size];
151
    struct peer *peerps[n];
152
    struct PeerChunk selectedpairs[1];
153
  
154
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
155
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
156
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
157
  /************ /USE SCHEDULER ****************/
158

    
159
    for (i=0; i<selectedpairs_len ; i++){
160
      struct peer *p = selectedpairs[i].peer;
161
      struct chunk *c = selectedpairs[i].chunk;
162
      dprintf("\t sending chunk[%d] to ", c->id);
163
      dprintf("%s\n", node_addr(p->id));
164

    
165
      send_bmap(p);
166
      res = sendChunk(p->id, c);
167
      dprintf("\tResult: %d\n", res);
168
      if (res>=0) {
169
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
170
      }
171
    }
172
  }
173
}