Statistics
| Branch: | Revision:

streamers / output-grapes.c @ f14985ba

History | View | Annotate | Download (4.2 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14
#include <chunkiser.h>
15

    
16
#include "output.h"
17
#include "measures.h"
18
#include "dbg.h"
19

    
20
static int next_chunk = -1;
21
static int buff_size;
22
extern bool chunk_log;
23

    
24
struct outbuf {
25
  struct chunk c;
26
};
27
static struct outbuf *buff;
28
static struct output_stream *out;
29

    
30
void output_init(int bufsize, const char *config)
31
{
32
  char cfg[256];
33

    
34
  if (config && (strlen(config) > 4) && (memcmp(config, "udp:", 4) == 0)) {
35
    config += 4;
36
    sprintf(cfg, "dechunkiser=udp");
37
    sprintf(cfg + strlen(cfg), ",%s", config);
38
  } else {
39
    sprintf(cfg, "dechunkiser=avf,media=av");
40
  }
41
  out = out_stream_init(config, cfg);
42
  if (out == NULL) {
43
     fprintf(stderr, "Error: can't initialize output module\n");
44
     exit(1);
45
  }
46
  if (!buff) {
47
    int i;
48

    
49
    buff_size = bufsize;
50
    buff = malloc(sizeof(struct outbuf) * buff_size);
51
    if (!buff) {
52
     fprintf(stderr, "Error: can't allocate output buffer\n");
53
     exit(1);
54
    }
55
    for (i = 0; i < buff_size; i++) {
56
      buff[i].c.data = NULL;
57
    }
58
  } else {
59
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
60
   exit(1);
61
  }
62
}
63

    
64
static void buffer_print(void)
65
{
66
#ifdef DEBUG
67
  int i;
68

    
69
  if (next_chunk < 0) {
70
    return;
71
  }
72

    
73
  dprintf("\toutbuf: %d-> ",next_chunk);
74
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
75
    if (buff[i % buff_size].c.data) {
76
      dprintf("%d",i % 10);
77
    } else {
78
      dprintf(".");
79
    }
80
  }
81
  dprintf("\n");
82
#endif
83
}
84

    
85
static void buffer_free(int i)
86
{
87
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].c.data);
88
  chunk_write(out, &buff[i].c);
89
  free(buff[i].c.data);
90
  buff[i].c.data = NULL;
91
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].c.id + 1);
92
  reg_chunk_playout(buff[i].c.id, true, buff[i].c.timestamp);
93
  next_chunk = buff[i].c.id + 1;
94
}
95

    
96
static void buffer_flush(int id)
97
{
98
  int i = id % buff_size;
99

    
100
  while(buff[i].c.data) {
101
    buffer_free(i);
102
    i = (i + 1) % buff_size;
103
    if (i == id % buff_size) {
104
      break;
105
    }
106
  }
107
}
108

    
109
void output_deliver(const struct chunk *c)
110
{
111
  if (!buff) {
112
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
113
    output_init(8, NULL);
114
  }
115

    
116
  dprintf("Chunk %d delivered\n", c->id);
117
  buffer_print();
118
  if (c->id < next_chunk) {
119
    return;
120
  }
121

    
122
  /* Initialize buffer with first chunk */
123
  if (next_chunk == -1) {
124
    next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
125
  }
126

    
127
  if (c->id >= next_chunk + buff_size) {
128
    int i;
129

    
130
    /* We might need some space for storing this chunk,
131
     * or the stored chunks are too old
132
     */
133
    for (i = next_chunk; i <= c->id - buff_size; i++) {
134
      if (buff[i % buff_size].c.data) {
135
        buffer_free(i % buff_size);
136
      } else {
137
        reg_chunk_playout(c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
138
        next_chunk++;
139
      }
140
    }
141
    buffer_flush(next_chunk);
142
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
143
  }
144

    
145
  dprintf("%d == %d?\n", c->id, next_chunk);
146
  if (c->id == next_chunk) {
147
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
148
    chunk_write(out, c);
149
    reg_chunk_playout(c->id, true, c->timestamp);
150
    next_chunk++;
151
    buffer_flush(next_chunk);
152
  } else {
153
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
154
    if (buff[c->id % buff_size].c.data) {
155
      if (buff[c->id % buff_size].c.id == c->id) {
156
        /* Duplicate of a stored chunk */
157
        if(chunk_log){fprintf(stderr,"Duplicate! chunkID: %d\n", c->id);}
158
        dprintf("\tDuplicate!\n");
159
        reg_chunk_duplicate();
160
        return;
161
      }
162
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].c.id);
163
      exit(-1);
164
    }
165
    /* We previously flushed, so we know that c->id is free */
166
    memcpy(&buff[c->id % buff_size].c, c, sizeof(struct chunk));
167
    buff[c->id % buff_size].c.data = malloc(c->size);
168
    memcpy(buff[c->id % buff_size].c.data, c->data, c->size);
169
  }
170
}