Statistics
| Branch: | Revision:

streamers / streaming.c @ 4367dafd

History | View | Annotate | Download (2.77 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10

    
11
#include "streaming.h"
12
#include "output.h"
13
#include "input.h"
14
#include "dbg.h"
15

    
16
#include "scheduler_la.h"
17

    
18
static struct chunk_buffer *cb;
19
static struct input_desc *input;
20

    
21
void stream_init(int size, struct nodeID *myID)
22
{
23
  char conf[32];
24

    
25
  sprintf(conf, "size=%d", size);
26
  cb = cb_init(conf);
27
  chunkInit(myID);
28
}
29

    
30
int source_init(const char *fname, struct nodeID *myID)
31
{
32
  input = input_open(fname);
33
  if (input == NULL) {
34
    return -1;
35
  }
36

    
37
  stream_init(1, myID);
38
  return 0;
39
}
40

    
41
void received_chunk(const uint8_t *buff, int len)
42
{
43
  int res;
44
  static struct chunk c;
45

    
46
  res = decodeChunk(&c, buff + 1, len - 1);
47
  if (res > 0) {
48
    output_deliver(&c);
49
    res = cb_add_chunk(cb, &c);
50
    if (res < 0) {
51
      free(c.data);
52
      free(c.attributes);
53
    }
54
  }
55
}
56

    
57
void generated_chunk(void)
58
{
59
  int res;
60
  struct chunk c;
61

    
62
  if (input_get(input, &c) <= 0) {
63
    fprintf(stderr, "Error in input!\n");
64
    exit(-1);
65
  }
66
  res = cb_add_chunk(cb, &c);
67
  if (res < 0) {
68
    free(c.data);
69
    free(c.attributes);
70
  }
71
}
72

    
73
/**
74
 *example function to filter chunks based on whether a given peer needs them. The real implementation
75
 * should look at buffermap information received about the given peer (or it should guess)
76
 */
77
int needs(const struct peer *p, const struct chunk *c){
78
  return 1;        //TODO: avoid at least sending to the source :)
79
}
80
double randomPeer(const struct peer **p){
81
  return 1;
82
}
83
double getChunkTimestamp(const struct chunk **c){
84
  return (double) (*c)->timestamp;
85
}
86

    
87

    
88
void send_chunk(const struct nodeID **neighbours, int n)
89
{
90
  struct chunk *buff;
91
  int target, c, size, res, i;
92

    
93
  dprintf("Send Chunk: %d neighbours\n", n);
94
  if (n == 0) return;
95
  buff = cb_get_chunks(cb, &size);
96
  dprintf("\t %d chunks in buffer...\n", size);
97
  if (size == 0) return;
98

    
99
  /************ STUPID DUMB SCHEDULING ****************/
100
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
101
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
102
  /************ /STUPID DUMB SCHEDULING ****************/
103

    
104
  /************ USE SCHEDULER ****************/
105
  size_t selectedpairs_len = 1;
106
  struct chunk *pbuff[size];
107
  for (i=0;i<size;i++) pbuff[i]=buff+i;
108
  struct PeerChunk selectedpairs[1];
109
  schedSelectPeerFirst(SCHED_BEST, neighbours, n, pbuff, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
110
  /************ /USE SCHEDULER ****************/
111

    
112
  for (i=0; i<selectedpairs_len ; i++){
113
    dprintf("\t sending chunk[%d] to ", selectedpairs[i].chunk->id);
114
    dprintf("%s\n", node_addr(selectedpairs[i].peer));
115

    
116
    res = sendChunk(selectedpairs[i].peer, selectedpairs[i].chunk);
117
    dprintf("Result: %d\n", res);
118
  }
119
}