Statistics
| Branch: | Revision:

streamers / output-grapes.c @ e816f3de

History | View | Annotate | Download (5.08 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14
#include <chunkiser.h>
15

    
16
#include "output.h"
17
#include "measures.h"
18
#include "dbg.h"
19

    
20
static int last_chunk = -1;
21
static int next_chunk = -1;
22
static int buff_size;
23
extern bool chunk_log;
24
extern int start_id;
25
extern int end_id;
26

    
27
static char sflag = 0;
28
static char eflag = 0;
29

    
30
bool reorder = OUTPUT_REORDER;
31

    
32
struct outbuf {
33
  struct chunk c;
34
};
35
static struct outbuf *buff;
36
static struct output_stream *out;
37

    
38
void output_init(int bufsize, const char *config)
39
{
40
  char *c;
41

    
42
  c = strchr(config,',');
43
  if (c) {
44
    *(c++) = 0;
45
  }
46
  out = out_stream_init(config, c);
47
  if (out == NULL) {
48
     fprintf(stderr, "Error: can't initialize output module\n");
49
     exit(1);
50
  }
51
  if (!buff) {
52
    int i;
53

    
54
    buff_size = bufsize;
55
    buff = malloc(sizeof(struct outbuf) * buff_size);
56
    if (!buff) {
57
     fprintf(stderr, "Error: can't allocate output buffer\n");
58
     exit(1);
59
    }
60
    for (i = 0; i < buff_size; i++) {
61
      buff[i].c.data = NULL;
62
    }
63
  } else {
64
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
65
   exit(1);
66
  }
67
}
68

    
69
static void buffer_print(void)
70
{
71
#ifdef DEBUG
72
  int i;
73

    
74
  if (next_chunk < 0) {
75
    return;
76
  }
77

    
78
  dprintf("\toutbuf: %d-> ",next_chunk);
79
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
80
    if (buff[i % buff_size].c.data) {
81
      dprintf("%d",i % 10);
82
    } else {
83
      dprintf(".");
84
    }
85
  }
86
  dprintf("\n");
87
#endif
88
}
89

    
90
static void buffer_free(int i)
91
{
92
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].c.data);
93
  if(start_id == -1 || buff[i].c.id >= start_id) {
94
    if(end_id == -1 || buff[i].c.id <= end_id) {
95
      if(sflag == 0) {
96
        fprintf(stderr, "\nFirst chunk id played out: %d\n\n",buff[i].c.id);
97
        sflag = 1;
98
      }
99
      if (reorder) chunk_write(out, &buff[i].c);
100
      last_chunk = buff[i].c.id;
101
    } else if (eflag == 0 && last_chunk != -1) {
102
      fprintf(stderr, "\nLast chunk id played out: %d\n\n", last_chunk);
103
      eflag = 1;
104
    }
105
  }
106

    
107
  free(buff[i].c.data);
108
  buff[i].c.data = NULL;
109
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].c.id + 1);
110
  reg_chunk_playout(buff[i].c.id, true, buff[i].c.timestamp);
111
  next_chunk = buff[i].c.id + 1;
112
}
113

    
114
static void buffer_flush(int id)
115
{
116
  int i = id % buff_size;
117

    
118
  while(buff[i].c.data) {
119
    buffer_free(i);
120
    i = (i + 1) % buff_size;
121
    if (i == id % buff_size) {
122
      break;
123
    }
124
  }
125
}
126

    
127
void output_deliver(const struct chunk *c)
128
{
129
  if (!buff) {
130
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
131
    output_init(8, NULL);
132
  }
133

    
134
  if (!reorder) chunk_write(out, c);
135

    
136
  dprintf("Chunk %d delivered\n", c->id);
137
  buffer_print();
138
  if (c->id < next_chunk) {
139
    return;
140
  }
141

    
142
  /* Initialize buffer with first chunk */
143
  if (next_chunk == -1) {
144
    next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
145
    fprintf(stderr,"First RX Chunk ID: %d\n", c->id);
146
  }
147

    
148
  if (c->id >= next_chunk + buff_size) {
149
    int i;
150

    
151
    /* We might need some space for storing this chunk,
152
     * or the stored chunks are too old
153
     */
154
    for (i = next_chunk; i <= c->id - buff_size; i++) {
155
      if (buff[i % buff_size].c.data) {
156
        buffer_free(i % buff_size);
157
      } else {
158
        reg_chunk_playout(c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
159
        next_chunk++;
160
      }
161
    }
162
    buffer_flush(next_chunk);
163
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
164
  }
165

    
166
  dprintf("%d == %d?\n", c->id, next_chunk);
167
  if (c->id == next_chunk) {
168
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
169

    
170
    if(start_id == -1 || c->id >= start_id) {
171
      if(end_id == -1 || c->id <= end_id) {
172
        if(sflag == 0) {
173
          fprintf(stderr, "\nFirst chunk id played out: %d\n\n",c->id);
174
          sflag = 1;
175
        }
176
        if (reorder) chunk_write(out, c);
177
        last_chunk = c->id;
178
      } else if (eflag == 0 && last_chunk != -1) {
179
        fprintf(stderr, "\nLast chunk id played out: %d\n\n", last_chunk);
180
        eflag = 1;
181
      }
182
    }
183
    reg_chunk_playout(c->id, true, c->timestamp);
184
    next_chunk++;
185
    buffer_flush(next_chunk);
186
  } else {
187
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
188
    if (buff[c->id % buff_size].c.data) {
189
      if (buff[c->id % buff_size].c.id == c->id) {
190
        /* Duplicate of a stored chunk */
191
        if(chunk_log){fprintf(stderr,"Duplicate! chunkID: %d\n", c->id);}
192
        dprintf("\tDuplicate!\n");
193
        reg_chunk_duplicate();
194
        return;
195
      }
196
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].c.id);
197
      exit(-1);
198
    }
199
    /* We previously flushed, so we know that c->id is free */
200
    memcpy(&buff[c->id % buff_size].c, c, sizeof(struct chunk));
201
    buff[c->id % buff_size].c.data = malloc(c->size);
202
    memcpy(buff[c->id % buff_size].c.data, c->data, c->size);
203
  }
204
}