Statistics
| Branch: | Revision:

streamers / streaming.c @ 0c3f7376

History | View | Annotate | Download (4.09 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24

    
25
void stream_init(int size, struct nodeID *myID)
26
{
27
  char conf[32];
28

    
29
  sprintf(conf, "size=%d", size);
30
  cb = cb_init(conf);
31
  chunkInit(myID);
32
}
33

    
34
int source_init(const char *fname, struct nodeID *myID)
35
{
36
  input = input_open(fname);
37
  if (input == NULL) {
38
    return -1;
39
  }
40

    
41
  stream_init(1, myID);
42
  return 0;
43
}
44

    
45
void send_bmap(struct peer *to)
46
{
47
  struct chunk *chunks;
48
  int num_chunks, i;
49
  struct chunkID_set *my_bmap = chunkID_set_init(0);
50
  chunks = cb_get_chunks(cb, &num_chunks);
51

    
52
  for(i=0; i<num_chunks; i++) {
53
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
54
  }
55

    
56
  sendMyBufferMap(to->id, my_bmap, 0);
57

    
58
  chunkID_set_clear(my_bmap,0);
59
  free(my_bmap);
60
}
61

    
62
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
63
{
64
  int res;
65
  static struct chunk c;
66
  struct peer *p;
67

    
68
  res = decodeChunk(&c, buff + 1, len - 1);
69
  if (res > 0) {
70
    output_deliver(&c);
71
    res = cb_add_chunk(cb, &c);
72
    if (res < 0) {
73
      free(c.data);
74
      free(c.attributes);
75
    }
76
    p = peerset_get_peer(pset,from);
77
    if (!p) {
78
      fprintf(stderr,"warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
79
      peerset_add_peer(pset,from);
80
      p = peerset_get_peer(pset,from);
81
    }
82
    if (p) {        //now we have it almost sure
83
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
84
      send_bmap(p);        //send explicit ack
85
    }
86
  }
87
}
88

    
89
int generated_chunk(suseconds_t *delta)
90
{
91
  int res;
92
  struct chunk c;
93

    
94
  *delta = input_get(input, &c);
95
  if (*delta < 0) {
96
    fprintf(stderr, "Error in input!\n");
97
    exit(-1);
98
  }
99
  if (c.data == NULL) {
100
    return 0;
101
  }
102
  res = cb_add_chunk(cb, &c);
103
  if (res < 0) {
104
    free(c.data);
105
    free(c.attributes);
106
  }
107

    
108
  return 1;
109
}
110

    
111
/**
112
 *example function to filter chunks based on whether a given peer needs them. The real implementation
113
 * should look at buffermap information received about the given peer (or it should guess)
114
 */
115
int needs(struct peer *p, struct chunk *c){
116
  return (! p->bmap || chunkID_set_check(p->bmap,c->id) < 0);
117
}
118
double randomPeer(struct peer **p){
119
  return 1;
120
}
121
double getChunkTimestamp(struct chunk **c){
122
  return (double) (*c)->timestamp;
123
}
124

    
125

    
126
void send_chunk(const struct peerset *pset)
127
{
128
  struct chunk *buff;
129
  int size, res, i, n;
130
  struct peer *neighbours;
131

    
132
  n = peerset_size(pset);
133
  neighbours = peerset_get_peers(pset);
134
  dprintf("Send Chunk: %d neighbours\n", n);
135
  if (n == 0) return;
136
  buff = cb_get_chunks(cb, &size);
137
  dprintf("\t %d chunks in buffer...\n", size);
138
  if (size == 0) return;
139

    
140
  /************ STUPID DUMB SCHEDULING ****************/
141
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
142
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
143
  /************ /STUPID DUMB SCHEDULING ****************/
144

    
145
  /************ USE SCHEDULER ****************/
146
  {
147
    size_t selectedpairs_len = 1;
148
    struct chunk *chunkps[size];
149
    struct peer *peerps[n];
150
    struct PeerChunk selectedpairs[1];
151
  
152
    for (i = 0;i < size; i++) chunkps[i] = buff+i;
153
    for (i = 0; i<n; i++) peerps[i] = neighbours+i;
154
    schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
155
  /************ /USE SCHEDULER ****************/
156

    
157
    for (i=0; i<selectedpairs_len ; i++){
158
      struct peer *p = selectedpairs[i].peer;
159
      struct chunk *c = selectedpairs[i].chunk;
160
      dprintf("\t sending chunk[%d] to ", c->id);
161
      dprintf("%s\n", node_addr(p->id));
162

    
163
      res = sendChunk(p->id, c);
164
      dprintf("Result: %d\n", res);
165
      if (res>=0) {
166
        chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
167
      }
168
      send_bmap(p);
169
    }
170
  }
171
}