Statistics
| Branch: | Revision:

streamers / output.c @ 74a5d4ae

History | View | Annotate | Download (3.57 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14

    
15
#include "measures.h"
16
#include "out-stream.h"
17
#include "dbg.h"
18

    
19
static int next_chunk = -1;
20
static int buff_size;
21

    
22
struct outbuf {
23
  void *data;
24
  int size;
25
  int id;
26
  uint64_t timestamp;
27
};
28
static struct outbuf *buff;
29

    
30
void output_init(int bufsize)
31
{
32
  if (!buff) {
33
    int i;
34

    
35
    buff_size = bufsize;
36
    buff = malloc(sizeof(struct outbuf) * buff_size);
37
    if (!buff) {
38
     fprintf(stderr, "Error: can't allocate output buffer\n");
39
     exit(1);
40
    }
41
    for (i = 0; i < buff_size; i++) {
42
      buff[i].data = NULL;
43
    }
44
  } else {
45
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
46
   exit(1);
47
  }
48
}
49

    
50
void buffer_print()
51
{
52
#ifdef DEBUG
53
  int i;
54

    
55
  if (next_chunk < 0) {
56
    return;
57
  }
58

    
59
  dprintf("\toutbuf: %d-> ",next_chunk);
60
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
61
    if (buff[i % buff_size].data) {
62
      dprintf("%d",i % 10);
63
    } else {
64
      dprintf(".");
65
    }
66
  }
67
  dprintf("\n");
68
#endif
69
}
70

    
71
void buffer_free(int i)
72
{
73
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].data);
74
  chunk_write(buff[i].id, buff[i].data, buff[i].size);
75
  free(buff[i].data);
76
  buff[i].data = NULL;
77
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].id + 1);
78
  reg_chunk_playout(buff[i].id, true, buff[i].timestamp);
79
  next_chunk = buff[i].id + 1;
80
}
81

    
82
void buffer_flush(int id)
83
{
84
  int i = id % buff_size;
85

    
86
  while(buff[i].data) {
87
    buffer_free(i);
88
    i = (i + 1) % buff_size;
89
    if (i == id % buff_size) {
90
      break;
91
    }
92
  }
93
}
94

    
95
void output_deliver(const struct chunk *c)
96
{
97
  if (!buff) {
98
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
99
    output_init(8);
100
  }
101

    
102
  dprintf("Chunk %d delivered\n", c->id);
103
  buffer_print();
104
  if (c->id < next_chunk) {
105
    return;
106
  }
107

    
108
  /* Initialize buffer with first chunk */
109
  if (next_chunk == -1) {
110
    next_chunk = (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0;
111
  }
112

    
113
  if (c->id >= next_chunk + buff_size) {
114
    int i;
115

    
116
    /* We might need some space for storing this chunk,
117
     * or the stored chunks are too old
118
     */
119
    for (i = next_chunk; i <= c->id - buff_size; i++) {
120
      if (buff[i % buff_size].data) {
121
        buffer_free(i % buff_size);
122
      } else {
123
        reg_chunk_playout(c->id, false, c->timestamp);
124
        next_chunk++;
125
      }
126
    }
127
    buffer_flush(next_chunk);
128
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
129
  }
130

    
131
  dprintf("%d == %d?\n", c->id, next_chunk);
132
  if (c->id == next_chunk) {
133
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
134
    chunk_write(c->id, c->data, c->size);
135
    reg_chunk_playout(c->id, true, c->timestamp);
136
    next_chunk++;
137
    buffer_flush(next_chunk);
138
  } else {
139
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
140
    if (buff[c->id % buff_size].data) {
141
      if (buff[c->id % buff_size].id == c->id) {
142
        /* Duplicate of a stored chunk */
143
        dprintf("\tDuplicate!\n");
144
        reg_chunk_duplicate();
145
        return;
146
      }
147
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].id);
148
      exit(-1);
149
    }
150
    /* We previously flushed, so we know that c->id is free */
151
    buff[c->id % buff_size].data = malloc(c->size);
152
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
153
    buff[c->id % buff_size].size = c->size;
154
    buff[c->id % buff_size].id = c->id;
155
    buff[c->id % buff_size].timestamp = c->timestamp;
156
  }
157
}