Statistics
| Branch: | Revision:

streamers / streaming.c @ 46c24e94

History | View | Annotate | Download (2.1 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdlib.h>
8
#include <stdio.h>
9
#include <stdint.h>
10
#include <stdbool.h>
11

    
12
#include <net_helper.h>
13
#include <chunk.h> 
14
#include <chunkbuffer.h> 
15
#include <trade_msg_la.h>
16
#include <trade_msg_ha.h>
17

    
18
#include "streaming.h"
19
#include "output.h"
20
#include "input.h"
21
#include "dbg.h"
22

    
23
static struct chunk_buffer *cb;
24
static struct input_desc *input;
25

    
26
void stream_init(int size, struct nodeID *myID)
27
{
28
  char conf[32];
29

    
30
  sprintf(conf, "size=%d", size);
31
  cb = cb_init(conf);
32
  chunkInit(myID);
33
}
34

    
35
int source_init(const char *fname, struct nodeID *myID, bool loop)
36
{
37
  input = input_open(fname, loop ? INPUT_LOOP : 0);
38
  if (input == NULL) {
39
    return -1;
40
  }
41

    
42
  stream_init(1, myID);
43
  return 0;
44
}
45

    
46
void received_chunk(const uint8_t *buff, int len)
47
{
48
  int res;
49
  static struct chunk c;
50

    
51
  res = decodeChunk(&c, buff + 1, len - 1);
52
  if (res > 0) {
53
    output_deliver(&c);
54
    res = cb_add_chunk(cb, &c);
55
    if (res < 0) {
56
      free(c.data);
57
      free(c.attributes);
58
    }
59
  }
60
}
61

    
62
int generated_chunk(suseconds_t *delta)
63
{
64
  int res;
65
  struct chunk c;
66

    
67
  *delta = input_get(input, &c);
68
  if (*delta < 0) {
69
    fprintf(stderr, "Error in input!\n");
70
    exit(-1);
71
  }
72
  if (c.data == NULL) {
73
    return 0;
74
  }
75
  res = cb_add_chunk(cb, &c);
76
  if (res < 0) {
77
    free(c.data);
78
    free(c.attributes);
79
  }
80

    
81
  return 1;
82
}
83

    
84
void send_chunk(const struct nodeID **neighbours, int n)
85
{
86
  struct chunk *buff;
87
  int target, c, size, res;
88

    
89
  dprintf("Send Chunk: %d neighbours\n", n);
90
  if (n == 0) return;
91
  buff = cb_get_chunks(cb, &size);
92
  dprintf("\t %d chunks in buffer...\n", size);
93
  if (size == 0) return;
94

    
95
  /************ STUPID DUMB SCHEDULING ****************/
96
  target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
97
  c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
98
  /************ /STUPID DUMB SCHEDULING ****************/
99
  dprintf("\t sending chunk[%d] (%d) to ", buff[c].id, c);
100
  dprintf("%s\n", node_addr(neighbours[target]));
101

    
102
  res = sendChunk(neighbours[target], buff + c);
103
  dprintf("Result: %d\n", res);
104
}