Statistics
| Branch: | Revision:

streamers / streaming.c @ e64fc7e5

History | View | Annotate | Download (1.45 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10

    
11
#include "streaming.h"
12
#include "output.h"
13
#include "input.h"
14
#include "dbg.h"
15

    
16
static struct chunk_buffer *cb;
17

    
18
void stream_init(int size)
19
{
20
  char conf[32];
21

    
22
  sprintf(conf, "size=%d", size);
23
  cb = cb_init(conf);
24
}
25

    
26
void received_chunk(const uint8_t *buff, int len)
27
{
28
  int res;
29
  struct chunk c;
30

    
31
  res = decodeChunk(&c, buff + 1, len - 1);
32
  if (res > 0) {
33
    output_deliver(&c);
34
    res = cb_add_chunk(cb, &c);
35
    if (res < 0) {
36
      free(c.data);
37
      free(c.attributes);
38
    }
39
  }
40
}
41

    
42
void generated_chunk(void)
43
{
44
  int res;
45
  struct chunk c;
46

    
47
  input_get(&c);
48
  if (res > 0) {
49
    res = cb_add_chunk(cb, &c);
50
    if (res < 0) {
51
      free(c.data);
52
      free(c.attributes);
53
    }
54
  }
55
}
56

    
57
void send_chunk(const struct nodeID **neighbours, int n)
58
{
59
  struct chunk *buff;
60
  int target, c, size;
61

    
62
  dprintf("Send Chunk: %d neighbours\n", n);
63
  if (n == 0) return;
64
  buff = cb_get_chunks(cb, &size);
65
  dprintf("\t %d chunks in buffer...\n", size);
66
  if (size == 0) return;
67

    
68
  /************ STUPID DUMB SCHEDULING ****************/
69
  target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
70
  c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
71
  /************ /STUPID DUMB SCHEDULING ****************/
72
  dprintf("\t sending chunk[%d]\n", buff[c].id);
73

    
74
  sendChunk(neighbours[target], buff + c);
75
}