Statistics
| Branch: | Revision:

streamers / streaming.c @ 4bb789ed

History | View | Annotate | Download (1.8 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10

    
11
#include "streaming.h"
12
#include "output.h"
13
#include "input.h"
14
#include "dbg.h"
15

    
16
static struct chunk_buffer *cb;
17
static struct input_desc *input;
18

    
19
void stream_init(int size, struct nodeID *myID)
20
{
21
  char conf[32];
22

    
23
  sprintf(conf, "size=%d", size);
24
  cb = cb_init(conf);
25
  chunkInit(myID);
26
}
27

    
28
int source_init(const char *fname, struct nodeID *myID)
29
{
30
  input = input_open(fname);
31
  if (input == NULL) {
32
    return -1;
33
  }
34

    
35
  stream_init(1, myID);
36
  return 0;
37
}
38

    
39
void received_chunk(const uint8_t *buff, int len)
40
{
41
  int res;
42
  static struct chunk c;
43

    
44
  res = decodeChunk(&c, buff + 1, len - 1);
45
  if (res > 0) {
46
    output_deliver(&c);
47
    res = cb_add_chunk(cb, &c);
48
    if (res < 0) {
49
      free(c.data);
50
      free(c.attributes);
51
    }
52
  }
53
}
54

    
55
void generated_chunk(void)
56
{
57
  int res;
58
  struct chunk c;
59

    
60
  if (input_get(input, &c) <= 0) {
61
    fprintf(stderr, "Error in input!\n");
62
    exit(-1);
63
  }
64
  res = cb_add_chunk(cb, &c);
65
  if (res < 0) {
66
    free(c.data);
67
    free(c.attributes);
68
  }
69
}
70

    
71
void send_chunk(const struct nodeID **neighbours, int n)
72
{
73
  struct chunk *buff;
74
  int target, c, size;
75

    
76
  dprintf("Send Chunk: %d neighbours\n", n);
77
  if (n == 0) return;
78
  buff = cb_get_chunks(cb, &size);
79
  dprintf("\t %d chunks in buffer...\n", size);
80
  if (size == 0) return;
81

    
82
  /************ STUPID DUMB SCHEDULING ****************/
83
  target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
84
  c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
85
  /************ /STUPID DUMB SCHEDULING ****************/
86
  dprintf("\t sending chunk[%d] (%d) to ", buff[c].id, c);
87
  dprintf("%s\n", node_addr(neighbours[target]));
88

    
89
  sendChunk(neighbours[target], buff + c);
90
}