Statistics
| Branch: | Revision:

streamers / output-grapes.c @ 03de31e0

History | View | Annotate | Download (4.98 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14
#include <chunkiser.h>
15

    
16
#include "output.h"
17
#include "measures.h"
18
#include "dbg.h"
19

    
20
static int last_chunk = -1;
21
static int next_chunk = -1;
22
static int buff_size;
23
extern bool chunk_log;
24
extern int start_id;
25
extern int end_id;
26

    
27
static char sflag = 0;
28
static char eflag = 0;
29

    
30
struct outbuf {
31
  struct chunk c;
32
};
33
static struct outbuf *buff;
34
static struct output_stream *out;
35

    
36
void output_init(int bufsize, const char *config)
37
{
38
  char *c;
39

    
40
  c = strchr(config,',');
41
  if (c) {
42
    *(c++) = 0;
43
  }
44
  out = out_stream_init(config, c);
45
  if (out == NULL) {
46
     fprintf(stderr, "Error: can't initialize output module\n");
47
     exit(1);
48
  }
49
  if (!buff) {
50
    int i;
51

    
52
    buff_size = bufsize;
53
    buff = malloc(sizeof(struct outbuf) * buff_size);
54
    if (!buff) {
55
     fprintf(stderr, "Error: can't allocate output buffer\n");
56
     exit(1);
57
    }
58
    for (i = 0; i < buff_size; i++) {
59
      buff[i].c.data = NULL;
60
    }
61
  } else {
62
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
63
   exit(1);
64
  }
65
}
66

    
67
static void buffer_print(void)
68
{
69
#ifdef DEBUG
70
  int i;
71

    
72
  if (next_chunk < 0) {
73
    return;
74
  }
75

    
76
  dprintf("\toutbuf: %d-> ",next_chunk);
77
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
78
    if (buff[i % buff_size].c.data) {
79
      dprintf("%d",i % 10);
80
    } else {
81
      dprintf(".");
82
    }
83
  }
84
  dprintf("\n");
85
#endif
86
}
87

    
88
static void buffer_free(int i)
89
{
90
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].c.data);
91
  if(start_id == -1 || buff[i].c.id >= start_id) {
92
    if(end_id == -1 || buff[i].c.id <= end_id) {
93
      if(sflag == 0) {
94
        fprintf(stderr, "\nFirst chunk id played out: %d\n\n",buff[i].c.id);
95
        sflag = 1;
96
      }
97
      chunk_write(out, &buff[i].c);
98
      last_chunk = buff[i].c.id;
99
    } else if (eflag == 0 && last_chunk != -1) {
100
      fprintf(stderr, "\nLast chunk id played out: %d\n\n", last_chunk);
101
      eflag = 1;
102
    }
103
  }
104

    
105
  free(buff[i].c.data);
106
  buff[i].c.data = NULL;
107
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].c.id + 1);
108
  reg_chunk_playout(buff[i].c.id, true, buff[i].c.timestamp);
109
  next_chunk = buff[i].c.id + 1;
110
}
111

    
112
static void buffer_flush(int id)
113
{
114
  int i = id % buff_size;
115

    
116
  while(buff[i].c.data) {
117
    buffer_free(i);
118
    i = (i + 1) % buff_size;
119
    if (i == id % buff_size) {
120
      break;
121
    }
122
  }
123
}
124

    
125
void output_deliver(const struct chunk *c)
126
{
127
  if (!buff) {
128
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
129
    output_init(8, NULL);
130
  }
131

    
132
  dprintf("Chunk %d delivered\n", c->id);
133
  buffer_print();
134
  if (c->id < next_chunk) {
135
    return;
136
  }
137

    
138
  /* Initialize buffer with first chunk */
139
  if (next_chunk == -1) {
140
    next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
141
    fprintf(stderr,"First RX Chunk ID: %d\n", c->id);
142
  }
143

    
144
  if (c->id >= next_chunk + buff_size) {
145
    int i;
146

    
147
    /* We might need some space for storing this chunk,
148
     * or the stored chunks are too old
149
     */
150
    for (i = next_chunk; i <= c->id - buff_size; i++) {
151
      if (buff[i % buff_size].c.data) {
152
        buffer_free(i % buff_size);
153
      } else {
154
        reg_chunk_playout(c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
155
        next_chunk++;
156
      }
157
    }
158
    buffer_flush(next_chunk);
159
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
160
  }
161

    
162
  dprintf("%d == %d?\n", c->id, next_chunk);
163
  if (c->id == next_chunk) {
164
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
165

    
166
    if(start_id == -1 || c->id >= start_id) {
167
      if(end_id == -1 || c->id <= end_id) {
168
        if(sflag == 0) {
169
          fprintf(stderr, "\nFirst chunk id played out: %d\n\n",c->id);
170
          sflag = 1;
171
        }
172
        chunk_write(out, c);
173
        last_chunk = c->id;
174
      } else if (eflag == 0 && last_chunk != -1) {
175
        fprintf(stderr, "\nLast chunk id played out: %d\n\n", last_chunk);
176
        eflag = 1;
177
      }
178
    }
179
    reg_chunk_playout(c->id, true, c->timestamp);
180
    next_chunk++;
181
    buffer_flush(next_chunk);
182
  } else {
183
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
184
    if (buff[c->id % buff_size].c.data) {
185
      if (buff[c->id % buff_size].c.id == c->id) {
186
        /* Duplicate of a stored chunk */
187
        if(chunk_log){fprintf(stderr,"Duplicate! chunkID: %d\n", c->id);}
188
        dprintf("\tDuplicate!\n");
189
        reg_chunk_duplicate();
190
        return;
191
      }
192
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].c.id);
193
      exit(-1);
194
    }
195
    /* We previously flushed, so we know that c->id is free */
196
    memcpy(&buff[c->id % buff_size].c, c, sizeof(struct chunk));
197
    buff[c->id % buff_size].c.data = malloc(c->size);
198
    memcpy(buff[c->id % buff_size].c.data, c->data, c->size);
199
  }
200
}