Statistics
| Branch: | Revision:

streamers / streaming.c @ 5bb51b2b

History | View | Annotate | Download (3.53 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10
#include <peerset.h>
11
#include <peer.h>
12
#include <chunkidset.h>
13

    
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17
#include "dbg.h"
18

    
19
#include "scheduler_la.h"
20

    
21
static struct chunk_buffer *cb;
22
static struct input_desc *input;
23

    
24
void stream_init(int size, struct nodeID *myID)
25
{
26
  char conf[32];
27

    
28
  sprintf(conf, "size=%d", size);
29
  cb = cb_init(conf);
30
  chunkInit(myID);
31
}
32

    
33
int source_init(const char *fname, struct nodeID *myID)
34
{
35
  input = input_open(fname);
36
  if (input == NULL) {
37
    return -1;
38
  }
39

    
40
  stream_init(1, myID);
41
  return 0;
42
}
43

    
44
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
45
{
46
  int res;
47
  static struct chunk c;
48
  struct peer *p;
49

    
50
  res = decodeChunk(&c, buff + 1, len - 1);
51
  if (res > 0) {
52
    output_deliver(&c);
53
    res = cb_add_chunk(cb, &c);
54
    if (res < 0) {
55
      free(c.data);
56
      free(c.attributes);
57
    }
58
    p = peerset_get_peer(pset,from);
59
    if (!p) {
60
      printf("warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
61
      peerset_add_peer(pset,from);
62
      p = peerset_get_peer(pset,from);
63
    }
64
    if (p) {        //now we have it almost sure
65
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
66
    }
67
  }
68
}
69

    
70
void generated_chunk(void)
71
{
72
  int res;
73
  struct chunk c;
74

    
75
  if (input_get(input, &c) <= 0) {
76
    fprintf(stderr, "Error in input!\n");
77
    exit(-1);
78
  }
79
  res = cb_add_chunk(cb, &c);
80
  if (res < 0) {
81
    free(c.data);
82
    free(c.attributes);
83
  }
84
}
85

    
86
/**
87
 *example function to filter chunks based on whether a given peer needs them. The real implementation
88
 * should look at buffermap information received about the given peer (or it should guess)
89
 */
90
int needs(struct peer *p, struct chunk *c){
91
  return (! p->bmap || chunkID_set_check(p->bmap,c->id) < 0);
92
}
93
double randomPeer(struct peer **p){
94
  return 1;
95
}
96
double getChunkTimestamp(struct chunk **c){
97
  return (double) (*c)->timestamp;
98
}
99

    
100

    
101
void send_chunk(const struct peerset *pset)
102
{
103
  struct chunk *buff;
104
  int size, res, i, n;
105
  const struct peer *neighbours;
106

    
107
  n = peerset_size(pset);
108
  neighbours = peerset_get_peers(pset);
109
  dprintf("Send Chunk: %d neighbours\n", n);
110
  if (n == 0) return;
111
  buff = cb_get_chunks(cb, &size);
112
  dprintf("\t %d chunks in buffer...\n", size);
113
  if (size == 0) return;
114

    
115
  /************ STUPID DUMB SCHEDULING ****************/
116
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
117
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
118
  /************ /STUPID DUMB SCHEDULING ****************/
119

    
120
  /************ USE SCHEDULER ****************/
121
  size_t selectedpairs_len = 1;
122
  struct chunk *chunkps[size];
123
  for (i = 0;i < size; i++) chunkps[i] = buff+i;
124
  struct peer *peerps[n];
125
  for (i = 0; i<n; i++) peerps[i] = neighbours+i;
126
  struct PeerChunk selectedpairs[1];
127
  schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
128
  /************ /USE SCHEDULER ****************/
129

    
130
  for (i=0; i<selectedpairs_len ; i++){
131
    struct peer *p = selectedpairs[i].peer;
132
    struct chunk *c = selectedpairs[i].chunk;
133
    dprintf("\t sending chunk[%d] to ", c->id);
134
    dprintf("%s\n", node_addr(p->id));
135

    
136
    res = sendChunk(p->id, c);
137
    dprintf("Result: %d\n", res);
138
    if (res>=0) {
139
      chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
140
    }
141
  }
142
}