Statistics
| Branch: | Revision:

streamers / streaming.c @ c851d69c

History | View | Annotate | Download (1.46 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10

    
11
#include "streaming.h"
12
#include "output.h"
13
#include "input.h"
14

    
15
static struct chunk_buffer *cb;
16

    
17
void stream_init(int size)
18
{
19
  char conf[32];
20

    
21
  sprintf(conf, "size=%d", size);
22
  cb = cb_init(conf);
23
}
24

    
25
void received_chunk(const uint8_t *buff, int len)
26
{
27
  int res;
28
  struct chunk c;
29

    
30
  res = decodeChunk(&c, buff + 1, len - 1);
31
  if (res > 0) {
32
    output_deliver(&c);
33
    res = cb_add_chunk(cb, &c);
34
    if (res < 0) {
35
      free(c.data);
36
      free(c.attributes);
37
    }
38
  }
39
}
40

    
41
void generated_chunk(void)
42
{
43
  int res;
44
  struct chunk c;
45

    
46
  input_get(&c);
47
  if (res > 0) {
48
    res = cb_add_chunk(cb, &c);
49
    if (res < 0) {
50
      free(c.data);
51
      free(c.attributes);
52
    }
53
  }
54
}
55

    
56
void send_chunk(const struct nodeID **neighbours, int n)
57
{
58
  struct chunk *buff;
59
  int target, c, size;
60

    
61
  fprintf(stderr, "Send Chunk: %d neighbours\n", n);
62
  if (n == 0) return;
63
  buff = cb_get_chunks(cb, &size);
64
  fprintf(stderr, "\t %d chunks in buffer...\n", size);
65
  if (size == 0) return;
66

    
67
  /************ STUPID DUMB SCHEDULING ****************/
68
  target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
69
  c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
70
  /************ /STUPID DUMB SCHEDULING ****************/
71
  fprintf(stderr, "\t sending chunk[%d]\n", buff[c].id);
72

    
73
  sendChunk(neighbours[target], buff + c);
74
}