Revision 690d5bac

View differences:

out-stream.h
1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
int out_stream_init(const char *config);
7
void chunk_write(int id, const uint8_t *data, int size);
8

  
output.c
1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

  
13
#include <chunk.h>
14

  
15
#include "output.h"
16
#include "measures.h"
17
#include "out-stream.h"
18
#include "dbg.h"
19

  
20
#ifdef CRAP
21
#define chunk_write(a,b,c)
22
#endif
23

  
24
static int next_chunk = -1;
25
static int buff_size;
26
extern bool chunk_log;
27

  
28
struct outbuf {
29
  void *data;
30
  int size;
31
  int id;
32
  uint64_t timestamp;
33
};
34
static struct outbuf *buff;
35

  
36
void output_init(int bufsize, const char *config)
37
{
38
#ifdef CRAP
39
  output_init1(bufsize, config);
40
#else
41
  if (out_stream_init(config) < 0) {
42
     fprintf(stderr, "Error: can't initialize output module\n");
43
     exit(1);
44
  }
45
#endif
46
  if (!buff) {
47
    int i;
48

  
49
    buff_size = bufsize;
50
    buff = malloc(sizeof(struct outbuf) * buff_size);
51
    if (!buff) {
52
     fprintf(stderr, "Error: can't allocate output buffer\n");
53
     exit(1);
54
    }
55
    for (i = 0; i < buff_size; i++) {
56
      buff[i].data = NULL;
57
    }
58
  } else {
59
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
60
   exit(1);
61
  }
62
}
63

  
64
void buffer_print()
65
{
66
#ifdef DEBUG
67
  int i;
68

  
69
  if (next_chunk < 0) {
70
    return;
71
  }
72

  
73
  dprintf("\toutbuf: %d-> ",next_chunk);
74
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
75
    if (buff[i % buff_size].data) {
76
      dprintf("%d",i % 10);
77
    } else {
78
      dprintf(".");
79
    }
80
  }
81
  dprintf("\n");
82
#endif
83
}
84

  
85
void buffer_free(int i)
86
{
87
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].data);
88
  chunk_write(buff[i].id, buff[i].data, buff[i].size);
89
  free(buff[i].data);
90
  buff[i].data = NULL;
91
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].id + 1);
92
  reg_chunk_playout(buff[i].id, true, buff[i].timestamp);
93
  next_chunk = buff[i].id + 1;
94
}
95

  
96
void buffer_flush(int id)
97
{
98
  int i = id % buff_size;
99

  
100
  while(buff[i].data) {
101
    buffer_free(i);
102
    i = (i + 1) % buff_size;
103
    if (i == id % buff_size) {
104
      break;
105
    }
106
  }
107
}
108

  
109
void output_deliver(const struct chunk *c)
110
{
111
  if (!buff) {
112
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
113
    output_init(8, NULL);
114
  }
115
#ifdef CRAP
116
  output_deliver1(c);
117
#endif
118

  
119
  dprintf("Chunk %d delivered\n", c->id);
120
  buffer_print();
121
  if (c->id < next_chunk) {
122
    return;
123
  }
124

  
125
  /* Initialize buffer with first chunk */
126
  if (next_chunk == -1) {
127
    next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
128
  }
129

  
130
  if (c->id >= next_chunk + buff_size) {
131
    int i;
132

  
133
    /* We might need some space for storing this chunk,
134
     * or the stored chunks are too old
135
     */
136
    for (i = next_chunk; i <= c->id - buff_size; i++) {
137
      if (buff[i % buff_size].data) {
138
        buffer_free(i % buff_size);
139
      } else {
140
        reg_chunk_playout(c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
141
        next_chunk++;
142
      }
143
    }
144
    buffer_flush(next_chunk);
145
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
146
  }
147

  
148
  dprintf("%d == %d?\n", c->id, next_chunk);
149
  if (c->id == next_chunk) {
150
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
151
    chunk_write(c->id, c->data, c->size);
152
    reg_chunk_playout(c->id, true, c->timestamp);
153
    next_chunk++;
154
    buffer_flush(next_chunk);
155
  } else {
156
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
157
    if (buff[c->id % buff_size].data) {
158
      if (buff[c->id % buff_size].id == c->id) {
159
        /* Duplicate of a stored chunk */
160
        if(chunk_log){fprintf(stderr,"Duplicate! chunkID: %d\n", c->id);}
161
        dprintf("\tDuplicate!\n");
162
        reg_chunk_duplicate();
163
        return;
164
      }
165
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].id);
166
      exit(-1);
167
    }
168
    /* We previously flushed, so we know that c->id is free */
169
    buff[c->id % buff_size].data = malloc(c->size);
170
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
171
    buff[c->id % buff_size].size = c->size;
172
    buff[c->id % buff_size].id = c->id;
173
    buff[c->id % buff_size].timestamp = c->timestamp;
174
  }
175
}

Also available in: Unified diff