Statistics
| Branch: | Revision:

streamers / streaming.c @ 6920fdab

History | View | Annotate | Download (1.49 KB)

1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

    
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10

    
11
#include "streaming.h"
12
#include "output.h"
13
#include "input.h"
14
#include "dbg.h"
15

    
16
static struct chunk_buffer *cb;
17

    
18
void stream_init(int size, struct nodeID *myID)
19
{
20
  char conf[32];
21

    
22
  sprintf(conf, "size=%d", size);
23
  cb = cb_init(conf);
24
  chunkInit(myID);
25
}
26

    
27
void received_chunk(const uint8_t *buff, int len)
28
{
29
  int res;
30
  struct chunk c;
31

    
32
  res = decodeChunk(&c, buff + 1, len - 1);
33
  if (res > 0) {
34
    output_deliver(&c);
35
    res = cb_add_chunk(cb, &c);
36
    if (res < 0) {
37
      free(c.data);
38
      free(c.attributes);
39
    }
40
  }
41
}
42

    
43
void generated_chunk(void)
44
{
45
  int res;
46
  struct chunk c;
47

    
48
  input_get(&c);
49
  if (res > 0) {
50
    res = cb_add_chunk(cb, &c);
51
    if (res < 0) {
52
      free(c.data);
53
      free(c.attributes);
54
    }
55
  }
56
}
57

    
58
void send_chunk(const struct nodeID **neighbours, int n)
59
{
60
  struct chunk *buff;
61
  int target, c, size;
62

    
63
  dprintf("Send Chunk: %d neighbours\n", n);
64
  if (n == 0) return;
65
  buff = cb_get_chunks(cb, &size);
66
  dprintf("\t %d chunks in buffer...\n", size);
67
  if (size == 0) return;
68

    
69
  /************ STUPID DUMB SCHEDULING ****************/
70
  target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
71
  c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
72
  /************ /STUPID DUMB SCHEDULING ****************/
73
  dprintf("\t sending chunk[%d]\n", buff[c].id);
74

    
75
  sendChunk(neighbours[target], buff + c);
76
}