Statistics
| Branch: | Revision:

streamers / streaming.c @ 8c2cc74a

History | View | Annotate | Download (3.9 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18
#include "chunk_signaling.h"
19

    
20
#include "scheduler_la.h"
21

    
22
static struct chunk_buffer *cb;
23
static struct input_desc *input;
24

    
25
void stream_init(int size, struct nodeID *myID)
26
{
27
  char conf[32];
28

    
29
  sprintf(conf, "size=%d", size);
30
  cb = cb_init(conf);
31
  chunkInit(myID);
32
}
33

    
34
int source_init(const char *fname, struct nodeID *myID)
35
{
36
  input = input_open(fname);
37
  if (input == NULL) {
38
    return -1;
39
  }
40

    
41
  stream_init(1, myID);
42
  return 0;
43
}
44

    
45
void send_bmap(struct peer *to)
46
{
47
  struct chunk *chunks;
48
  int num_chunks, i;
49
  struct chunkID_set *my_bmap = chunkID_set_init(0);
50
  chunks = cb_get_chunks(cb, &num_chunks);
51

    
52
  for(i=0; i<num_chunks; i++) {
53
    chunkID_set_add_chunk(my_bmap, chunks[i].id);
54
  }
55

    
56
  sendMyBufferMap(to->id, my_bmap, chunkID_set_size(my_bmap), 0);
57
}
58

    
59
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
60
{
61
  int res;
62
  static struct chunk c;
63
  struct peer *p;
64

    
65
  res = decodeChunk(&c, buff + 1, len - 1);
66
  if (res > 0) {
67
    output_deliver(&c);
68
    res = cb_add_chunk(cb, &c);
69
    if (res < 0) {
70
      free(c.data);
71
      free(c.attributes);
72
    }
73
    p = peerset_get_peer(pset,from);
74
    if (!p) {
75
      printf("warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
76
      peerset_add_peer(pset,from);
77
      p = peerset_get_peer(pset,from);
78
    }
79
    if (p) {        //now we have it almost sure
80
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
81
    }
82
  }
83
}
84

    
85
void generated_chunk(void)
86
{
87
  int res;
88
  struct chunk c;
89

    
90
  if (input_get(input, &c) <= 0) {
91
    fprintf(stderr, "Error in input!\n");
92
    exit(-1);
93
  }
94
  res = cb_add_chunk(cb, &c);
95
  if (res < 0) {
96
    free(c.data);
97
    free(c.attributes);
98
  }
99
}
100

    
101
/**
102
 *example function to filter chunks based on whether a given peer needs them. The real implementation
103
 * should look at buffermap information received about the given peer (or it should guess)
104
 */
105
int needs(struct peer *p, struct chunk *c){
106
  return (! p->bmap || chunkID_set_check(p->bmap,c->id) < 0);
107
}
108
double randomPeer(struct peer **p){
109
  return 1;
110
}
111
double getChunkTimestamp(struct chunk **c){
112
  return (double) (*c)->timestamp;
113
}
114

    
115

    
116
void send_chunk(const struct peerset *pset)
117
{
118
  struct chunk *buff;
119
  int size, res, i, n;
120
  const struct peer *neighbours;
121

    
122
  n = peerset_size(pset);
123
  neighbours = peerset_get_peers(pset);
124
  dprintf("Send Chunk: %d neighbours\n", n);
125
  if (n == 0) return;
126
  buff = cb_get_chunks(cb, &size);
127
  dprintf("\t %d chunks in buffer...\n", size);
128
  if (size == 0) return;
129

    
130
  /************ STUPID DUMB SCHEDULING ****************/
131
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
132
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
133
  /************ /STUPID DUMB SCHEDULING ****************/
134

    
135
  /************ USE SCHEDULER ****************/
136
  size_t selectedpairs_len = 1;
137
  struct chunk *chunkps[size];
138
  for (i = 0;i < size; i++) chunkps[i] = buff+i;
139
  struct peer *peerps[n];
140
  for (i = 0; i<n; i++) peerps[i] = neighbours+i;
141
  struct PeerChunk selectedpairs[1];
142
  schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
143
  /************ /USE SCHEDULER ****************/
144

    
145
  for (i=0; i<selectedpairs_len ; i++){
146
    struct peer *p = selectedpairs[i].peer;
147
    struct chunk *c = selectedpairs[i].chunk;
148
    dprintf("\t sending chunk[%d] to ", c->id);
149
    dprintf("%s\n", node_addr(p->id));
150

    
151
    res = sendChunk(p->id, c);
152
    dprintf("Result: %d\n", res);
153
    if (res>=0) {
154
      chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
155
    }
156
    send_bmap(p);
157
  }
158
}