Statistics
| Branch: | Revision:

streamers / output-grapes.c @ a2fa286f

History | View | Annotate | Download (5.36 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14
#include <chunkiser.h>
15

    
16
#include "output.h"
17
#include "measures.h"
18
#include "dbg.h"
19

    
20
static int last_chunk = -1;
21
static int next_chunk = -1;
22
static int buff_size;
23
extern bool chunk_log;
24
extern int start_id;
25
extern int end_id;
26

    
27
static char sflag = 0;
28
static char eflag = 0;
29

    
30
struct outbuf {
31
  struct chunk c;
32
};
33
static struct outbuf *buff;
34
static struct output_stream *out;
35

    
36
void output_init(int bufsize, const char *config)
37
{
38
  char cfg[256];
39

    
40
  if (config && (strlen(config) > 4) && (memcmp(config, "udp:", 4) == 0)) {
41
    config += 4;
42
    sprintf(cfg, "dechunkiser=udp");
43
    sprintf(cfg + strlen(cfg), ",%s", config);
44
  } else if (config && (strlen(config) >= 6) && (memcmp(config, "dummy:", 6) == 0)) {
45
    config += 6;
46
    sprintf(cfg, "dechunkiser=dummy,type=stats");
47
    sprintf(cfg + strlen(cfg), ",%s", config);
48
  } else {
49
    sprintf(cfg, "dechunkiser=avf,media=av");
50
  }
51
  out = out_stream_init(config, cfg);
52
  if (out == NULL) {
53
     fprintf(stderr, "Error: can't initialize output module\n");
54
     exit(1);
55
  }
56
  if (!buff) {
57
    int i;
58

    
59
    buff_size = bufsize;
60
    buff = malloc(sizeof(struct outbuf) * buff_size);
61
    if (!buff) {
62
     fprintf(stderr, "Error: can't allocate output buffer\n");
63
     exit(1);
64
    }
65
    for (i = 0; i < buff_size; i++) {
66
      buff[i].c.data = NULL;
67
    }
68
  } else {
69
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
70
   exit(1);
71
  }
72
}
73

    
74
static void buffer_print(void)
75
{
76
#ifdef DEBUG
77
  int i;
78

    
79
  if (next_chunk < 0) {
80
    return;
81
  }
82

    
83
  dprintf("\toutbuf: %d-> ",next_chunk);
84
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
85
    if (buff[i % buff_size].c.data) {
86
      dprintf("%d",i % 10);
87
    } else {
88
      dprintf(".");
89
    }
90
  }
91
  dprintf("\n");
92
#endif
93
}
94

    
95
static void buffer_free(int i)
96
{
97
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].c.data);
98
  if(start_id == -1 || buff[i].c.id >= start_id) {
99
    if(end_id == -1 || buff[i].c.id <= end_id) {
100
      if(sflag == 0) {
101
        fprintf(stderr, "\nFirst chunk id played out: %d\n\n",buff[i].c.id);
102
        sflag = 1;
103
      }
104
      chunk_write(out, &buff[i].c);
105
      last_chunk = buff[i].c.id;
106
    } else if (eflag == 0 && last_chunk != -1) {
107
      fprintf(stderr, "\nLast chunk id played out: %d\n\n", last_chunk);
108
      eflag = 1;
109
    }
110
  }
111

    
112
  free(buff[i].c.data);
113
  buff[i].c.data = NULL;
114
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].c.id + 1);
115
  reg_chunk_playout(buff[i].c.id, true, buff[i].c.timestamp);
116
  next_chunk = buff[i].c.id + 1;
117
}
118

    
119
static void buffer_flush(int id)
120
{
121
  int i = id % buff_size;
122

    
123
  while(buff[i].c.data) {
124
    buffer_free(i);
125
    i = (i + 1) % buff_size;
126
    if (i == id % buff_size) {
127
      break;
128
    }
129
  }
130
}
131

    
132
void output_deliver(const struct chunk *c)
133
{
134
  if (!buff) {
135
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
136
    output_init(8, NULL);
137
  }
138

    
139
  dprintf("Chunk %d delivered\n", c->id);
140
  buffer_print();
141
  if (c->id < next_chunk) {
142
    return;
143
  }
144

    
145
  /* Initialize buffer with first chunk */
146
  if (next_chunk == -1) {
147
    next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
148
    fprintf(stderr,"First RX Chunk ID: %d\n", c->id);
149
  }
150

    
151
  if (c->id >= next_chunk + buff_size) {
152
    int i;
153

    
154
    /* We might need some space for storing this chunk,
155
     * or the stored chunks are too old
156
     */
157
    for (i = next_chunk; i <= c->id - buff_size; i++) {
158
      if (buff[i % buff_size].c.data) {
159
        buffer_free(i % buff_size);
160
      } else {
161
        reg_chunk_playout(c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
162
        next_chunk++;
163
      }
164
    }
165
    buffer_flush(next_chunk);
166
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
167
  }
168

    
169
  dprintf("%d == %d?\n", c->id, next_chunk);
170
  if (c->id == next_chunk) {
171
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
172

    
173
    if(start_id == -1 || c->id >= start_id) {
174
      if(end_id == -1 || c->id <= end_id) {
175
        if(sflag == 0) {
176
          fprintf(stderr, "\nFirst chunk id played out: %d\n\n",c->id);
177
          sflag = 1;
178
        }
179
        chunk_write(out, c);
180
        last_chunk = c->id;
181
      } else if (eflag == 0 && last_chunk != -1) {
182
        fprintf(stderr, "\nLast chunk id played out: %d\n\n", last_chunk);
183
        eflag = 1;
184
      }
185
    }
186
    reg_chunk_playout(c->id, true, c->timestamp);
187
    next_chunk++;
188
    buffer_flush(next_chunk);
189
  } else {
190
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
191
    if (buff[c->id % buff_size].c.data) {
192
      if (buff[c->id % buff_size].c.id == c->id) {
193
        /* Duplicate of a stored chunk */
194
        if(chunk_log){fprintf(stderr,"Duplicate! chunkID: %d\n", c->id);}
195
        dprintf("\tDuplicate!\n");
196
        reg_chunk_duplicate();
197
        return;
198
      }
199
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].c.id);
200
      exit(-1);
201
    }
202
    /* We previously flushed, so we know that c->id is free */
203
    memcpy(&buff[c->id % buff_size].c, c, sizeof(struct chunk));
204
    buff[c->id % buff_size].c.data = malloc(c->size);
205
    memcpy(buff[c->id % buff_size].c.data, c->data, c->size);
206
  }
207
}