Statistics
| Branch: | Revision:

streamers / output.c @ 4aaa8891

History | View | Annotate | Download (3.99 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14

    
15
#include "output.h"
16
#include "measures.h"
17
#include "out-stream.h"
18
#include "dbg.h"
19

    
20
static int next_chunk = -1;
21
static int buff_size;
22

    
23
struct outbuf {
24
  void *data;
25
  int size;
26
  int id;
27
  uint64_t timestamp;
28
};
29
static struct outbuf *buff;
30

    
31
void output_init(int bufsize, const char *config)
32
{
33
  if (out_stream_init(config) < 0) {
34
     fprintf(stderr, "Error: can't initialize output module\n");
35
     exit(1);
36
  }
37

    
38
  if (!buff) {
39
    int i;
40

    
41
    buff_size = bufsize;
42
    buff = malloc(sizeof(struct outbuf) * buff_size);
43
    if (!buff) {
44
     fprintf(stderr, "Error: can't allocate output buffer\n");
45
     exit(1);
46
    }
47
    for (i = 0; i < buff_size; i++) {
48
      buff[i].data = NULL;
49
    }
50
  } else {
51
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
52
   exit(1);
53
  }
54
}
55

    
56
void buffer_print()
57
{
58
#ifdef DEBUG
59
  int i;
60

    
61
  if (next_chunk < 0) {
62
    return;
63
  }
64

    
65
  dprintf("\toutbuf: %d-> ",next_chunk);
66
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
67
    if (buff[i % buff_size].data) {
68
      dprintf("%d",i % 10);
69
    } else {
70
      dprintf(".");
71
    }
72
  }
73
  dprintf("\n");
74
#endif
75
}
76

    
77
void buffer_free(int i)
78
{
79
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].data);
80
  chunk_write(buff[i].id, buff[i].data, buff[i].size);
81
  free(buff[i].data);
82
  buff[i].data = NULL;
83
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].id + 1);
84
  reg_chunk_playout(buff[i].id, true, buff[i].timestamp);
85
  next_chunk = buff[i].id + 1;
86
}
87

    
88
void buffer_flush(int id)
89
{
90
  int i = id % buff_size;
91

    
92
  while(buff[i].data) {
93
    buffer_free(i);
94
    i = (i + 1) % buff_size;
95
    if (i == id % buff_size) {
96
      break;
97
    }
98
  }
99
}
100

    
101
void output_deliver(const struct chunk *c)
102
{
103
  if (!buff) {
104
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
105
    output_init(8, NULL);
106
  }
107

    
108
  dprintf("Chunk %d delivered\n", c->id);
109
  buffer_print();
110
  if (c->id < next_chunk) {
111
    return;
112
  }
113

    
114
  /* Initialize buffer with first chunk */
115
  if (next_chunk == -1) {
116
    next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
117
  }
118

    
119
  if (c->id >= next_chunk + buff_size) {
120
    int i;
121

    
122
    /* We might need some space for storing this chunk,
123
     * or the stored chunks are too old
124
     */
125
    for (i = next_chunk; i <= c->id - buff_size; i++) {
126
      if (buff[i % buff_size].data) {
127
        buffer_free(i % buff_size);
128
      } else {
129
        reg_chunk_playout(c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
130
        next_chunk++;
131
      }
132
    }
133
    buffer_flush(next_chunk);
134
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
135
  }
136

    
137
  dprintf("%d == %d?\n", c->id, next_chunk);
138
  if (c->id == next_chunk) {
139
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
140
    chunk_write(c->id, c->data, c->size);
141
    reg_chunk_playout(c->id, true, c->timestamp);
142
    next_chunk++;
143
    buffer_flush(next_chunk);
144
  } else {
145
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
146
    if (buff[c->id % buff_size].data) {
147
      if (buff[c->id % buff_size].id == c->id) {
148
        /* Duplicate of a stored chunk */
149
        extern bool log_on; //ENST
150
        if(log_on){fprintf(stderr,"Duplicate! chunkID: %d\n", c->id);} // ENST
151
        dprintf("\tDuplicate!\n");
152
        reg_chunk_duplicate();
153
        return;
154
      }
155
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, buff[c->id % buff_size].id);
156
      exit(-1);
157
    }
158
    /* We previously flushed, so we know that c->id is free */
159
    buff[c->id % buff_size].data = malloc(c->size);
160
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
161
    buff[c->id % buff_size].size = c->size;
162
    buff[c->id % buff_size].id = c->id;
163
    buff[c->id % buff_size].timestamp = c->timestamp;
164
  }
165
}