Statistics
| Branch: | Revision:

streamers / output.c @ bc1ddc15

History | View | Annotate | Download (3.79 KB)

1 8fed7779 CsabaKiraly
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 d3a583ca Luca
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10 89e893e2 Luca
#include <stdint.h>
11 c792c999 Luca
#include <stdio.h>
12 89e893e2 Luca
13
#include <chunk.h>
14
15 094f504c Csaba Kiraly
#include "measures.h"
16 54b631d9 Luca Abeni
#include "out-stream.h"
17 c792c999 Luca
#include "dbg.h"
18
19 d6a213f3 Csaba Kiraly
static int next_chunk = -1;
20 af2907a9 Csaba Kiraly
static int buff_size;
21 d3a583ca Luca
22 af2907a9 Csaba Kiraly
struct outbuf {
23 74a5d4ae CsabaKiraly
  void *data;
24 d3a583ca Luca
  int size;
25 c792c999 Luca
  int id;
26 19d6b4ca Csaba Kiraly
  uint64_t timestamp;
27 af2907a9 Csaba Kiraly
};
28
static struct outbuf *buff;
29
30
void output_init(int bufsize)
31
{
32
  if (!buff) {
33 80b2d694 Csaba Kiraly
    int i;
34
35 af2907a9 Csaba Kiraly
    buff_size = bufsize;
36 74a5d4ae CsabaKiraly
    buff = malloc(sizeof(struct outbuf) * buff_size);
37 af2907a9 Csaba Kiraly
    if (!buff) {
38
     fprintf(stderr, "Error: can't allocate output buffer\n");
39
     exit(1);
40
    }
41 80b2d694 Csaba Kiraly
    for (i = 0; i < buff_size; i++) {
42
      buff[i].data = NULL;
43
    }
44 af2907a9 Csaba Kiraly
  } else {
45
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
46
   exit(1);
47
  }
48
}
49 d3a583ca Luca
50 fdcdce95 Csaba Kiraly
void buffer_print()
51
{
52 3c50e0b1 Csaba Kiraly
#ifdef DEBUG
53 fdcdce95 Csaba Kiraly
  int i;
54
55 0bee8166 Csaba Kiraly
  if (next_chunk < 0) {
56
    return;
57
  }
58
59 fdcdce95 Csaba Kiraly
  dprintf("\toutbuf: %d-> ",next_chunk);
60
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
61
    if (buff[i % buff_size].data) {
62
      dprintf("%d",i % 10);
63
    } else {
64
      dprintf(".");
65
    }
66
  }
67
  dprintf("\n");
68 3c50e0b1 Csaba Kiraly
#endif
69 fdcdce95 Csaba Kiraly
}
70
71 c792c999 Luca
void buffer_free(int i)
72
{
73 298a8869 Luca Abeni
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].data);
74 f05acd38 Csaba Kiraly
  chunk_write(buff[i].id, buff[i].data, buff[i].size);
75 c792c999 Luca
  free(buff[i].data);
76
  buff[i].data = NULL;
77 b76a7006 Luca
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].id + 1);
78 19d6b4ca Csaba Kiraly
  reg_chunk_playout(buff[i].id, true, buff[i].timestamp);
79 c792c999 Luca
  next_chunk = buff[i].id + 1;
80
}
81
82 d3a583ca Luca
void buffer_flush(int id)
83
{
84
  int i = id % buff_size;
85
86
  while(buff[i].data) {
87 c792c999 Luca
    buffer_free(i);
88 d3a583ca Luca
    i = (i + 1) % buff_size;
89 c792c999 Luca
    if (i == id % buff_size) {
90 d3a583ca Luca
      break;
91
    }
92
  }
93
}
94
95 89e893e2 Luca
void output_deliver(const struct chunk *c)
96
{
97 af2907a9 Csaba Kiraly
  if (!buff) {
98
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
99
    output_init(8);
100
  }
101
102 c792c999 Luca
  dprintf("Chunk %d delivered\n", c->id);
103 9132cb6f Csaba Kiraly
  buffer_print();
104 c792c999 Luca
  if (c->id < next_chunk) {
105
    return;
106
  }
107
108 402088f8 Csaba Kiraly
  /* Initialize buffer with first chunk */
109
  if (next_chunk == -1) {
110 e9397abf CsabaKiraly
    next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
111 402088f8 Csaba Kiraly
  }
112
113 c792c999 Luca
  if (c->id >= next_chunk + buff_size) {
114
    int i;
115
116
    /* We might need some space for storing this chunk,
117
     * or the stored chunks are too old
118
     */
119
    for (i = next_chunk; i <= c->id - buff_size; i++) {
120
      if (buff[i % buff_size].data) {
121
        buffer_free(i % buff_size);
122
      } else {
123 e9397abf CsabaKiraly
        reg_chunk_playout(c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
124 c792c999 Luca
        next_chunk++;
125
      }
126
    }
127
    buffer_flush(next_chunk);
128
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
129
  }
130
131
  dprintf("%d == %d?\n", c->id, next_chunk);
132 d3a583ca Luca
  if (c->id == next_chunk) {
133 298a8869 Luca Abeni
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
134 54b631d9 Luca Abeni
    chunk_write(c->id, c->data, c->size);
135 19d6b4ca Csaba Kiraly
    reg_chunk_playout(c->id, true, c->timestamp);
136 d3a583ca Luca
    next_chunk++;
137
    buffer_flush(next_chunk);
138
  } else {
139 c792c999 Luca
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
140 e8654707 Luca
    if (buff[c->id % buff_size].data) {
141
      if (buff[c->id % buff_size].id == c->id) {
142
        /* Duplicate of a stored chunk */
143 bc1ddc15 MatteoSammarco
        fprintf(stderr,"Duplicate! chunkID: %d\n", c->id); // ENST
144 13d85fc6 Csaba Kiraly
        dprintf("\tDuplicate!\n");
145 30c89aac Csaba Kiraly
        reg_chunk_duplicate();
146 e8654707 Luca
        return;
147
      }
148 402088f8 Csaba Kiraly
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].id);
149 c792c999 Luca
      exit(-1);
150
    }
151
    /* We previously flushed, so we know that c->id is free */
152 74a5d4ae CsabaKiraly
    buff[c->id % buff_size].data = malloc(c->size);
153 d3a583ca Luca
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
154
    buff[c->id % buff_size].size = c->size;
155 c792c999 Luca
    buff[c->id % buff_size].id = c->id;
156 19d6b4ca Csaba Kiraly
    buff[c->id % buff_size].timestamp = c->timestamp;
157 d3a583ca Luca
  }
158 89e893e2 Luca
}