Statistics
| Branch: | Revision:

streamers / output.c @ 19d6b4ca

History | View | Annotate | Download (3.65 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14

    
15
#include "measures.h"
16
#include "out-stream.h"
17
#include "dbg.h"
18

    
19
static int next_chunk = -1;
20
static int buff_size;
21

    
22
struct outbuf {
23
  void *data;
24
  int size;
25
  int id;
26
  uint64_t timestamp;
27
};
28
static struct outbuf *buff;
29

    
30
void output_init(int bufsize)
31
{
32
  if (!buff) {
33
    int i;
34

    
35
    buff_size = bufsize;
36
    buff = malloc(sizeof(struct outbuf) * buff_size);
37
    if (!buff) {
38
     fprintf(stderr, "Error: can't allocate output buffer\n");
39
     exit(1);
40
    }
41
    for (i = 0; i < buff_size; i++) {
42
      buff[i].data = NULL;
43
    }
44
  } else {
45
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
46
   exit(1);
47
  }
48
}
49

    
50
void buffer_print()
51
{
52
#ifdef DEBUG
53
  int i;
54

    
55
  if (next_chunk < 0) {
56
    return;
57
  }
58

    
59
  dprintf("\toutbuf: %d-> ",next_chunk);
60
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
61
    if (buff[i % buff_size].data) {
62
      dprintf("%d",i % 10);
63
    } else {
64
      dprintf(".");
65
    }
66
  }
67
  dprintf("\n");
68
#endif
69
}
70

    
71
void buffer_free(int i)
72
{
73
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].data);
74
  chunk_write(buff[i].id, buff[i].data, buff[i].size);
75
  free(buff[i].data);
76
  buff[i].data = NULL;
77
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].id + 1);
78
#ifdef MONL
79
  reg_chunk_playout(buff[i].id, true, buff[i].timestamp);
80
#endif
81
  next_chunk = buff[i].id + 1;
82
}
83

    
84
void buffer_flush(int id)
85
{
86
  int i = id % buff_size;
87

    
88
  while(buff[i].data) {
89
    buffer_free(i);
90
    i = (i + 1) % buff_size;
91
    if (i == id % buff_size) {
92
      break;
93
    }
94
  }
95
}
96

    
97
void output_deliver(const struct chunk *c)
98
{
99
  if (!buff) {
100
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
101
    output_init(8);
102
  }
103

    
104
  dprintf("Chunk %d delivered\n", c->id);
105
  buffer_print();
106
  if (c->id < next_chunk) {
107
    return;
108
  }
109

    
110
  /* Initialize buffer with first chunk */
111
  if (next_chunk == -1) {
112
    next_chunk = (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0;
113
  }
114

    
115
  if (c->id >= next_chunk + buff_size) {
116
    int i;
117

    
118
    /* We might need some space for storing this chunk,
119
     * or the stored chunks are too old
120
     */
121
    for (i = next_chunk; i <= c->id - buff_size; i++) {
122
      if (buff[i % buff_size].data) {
123
        buffer_free(i % buff_size);
124
      } else {
125
#ifdef MONL
126
        reg_chunk_playout(c->id, false, c->timestamp);
127
#endif
128
        next_chunk++;
129
      }
130
    }
131
    buffer_flush(next_chunk);
132
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
133
  }
134

    
135
  dprintf("%d == %d?\n", c->id, next_chunk);
136
  if (c->id == next_chunk) {
137
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
138
    chunk_write(c->id, c->data, c->size);
139
#ifdef MONL
140
    reg_chunk_playout(c->id, true, c->timestamp);
141
#endif
142
    next_chunk++;
143
    buffer_flush(next_chunk);
144
  } else {
145
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
146
    if (buff[c->id % buff_size].data) {
147
      if (buff[c->id % buff_size].id == c->id) {
148
        /* Duplicate of a stored chunk */
149
        dprintf("\tDuplicate!\n");
150
#ifdef MONL
151
        reg_chunk_duplicate();
152
#endif
153
        return;
154
      }
155
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].id);
156
      exit(-1);
157
    }
158
    /* We previously flushed, so we know that c->id is free */
159
    buff[c->id % buff_size].data = malloc(c->size);
160
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
161
    buff[c->id % buff_size].size = c->size;
162
    buff[c->id % buff_size].id = c->id;
163
    buff[c->id % buff_size].timestamp = c->timestamp;
164
  }
165
}