Statistics
| Branch: | Revision:

pstreamer / src / output-grapes.c @ fe735c05

History | View | Annotate | Download (7.3 KB)

1
/*
2
 * Copyright (c) 2010-2011 Luca Abeni
3
 * Copyright (c) 2010-2011 Csaba Kiraly
4
 * Copyright (c) 2017 Luca Baldesi
5
 *
6
 * This file is part of PeerStreamer.
7
 *
8
 * PeerStreamer is free software: you can redistribute it and/or
9
 * modify it under the terms of the GNU Affero General Public License as
10
 * published by the Free Software Foundation, either version 3 of the
11
 * License, or (at your option) any later version.
12
 *
13
 * PeerStreamer is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
16
 * General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU Affero General Public License
19
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
20
 *
21
 */
22
#include <unistd.h>
23
#include <stdlib.h>
24
#include <string.h>
25
#include <stdint.h>
26
#include <stdbool.h>
27
#include <stdio.h>
28

    
29
#include <chunk.h>
30
#include <chunkiser.h>
31
#include <psinstance_internal.h>
32
#include <measures.h>
33

    
34
#include "output.h"
35
#include "dbg.h"
36

    
37
struct chunk_output {
38
        int last_chunk;
39
        int next_chunk;
40
        int buff_size;
41
        int start_id;
42
        int end_id;
43
        
44
        char sflag;
45
        char eflag;
46
        struct chunk *buff;
47
        struct output_stream *out;
48
        const struct psinstance * ps;
49
        
50
        bool reorder; 
51
};
52

    
53
//struct outbuf {
54
//  struct chunk c;
55
//};
56
//static struct outbuf *buff;
57
//static struct output_stream *out;
58

    
59

    
60
void output_init(struct chunk_output * outg, int bufsize, const char *config)
61
{
62
  //char * c;
63

    
64
  // c = strchr(config,',');  // this is actually ignored
65
  // if (c) {
66
  //   *(c++) = 0;
67
  // }
68
  outg->out = out_stream_init("/dev/stdout", config);
69
  if (outg->out == NULL) {
70
     fprintf(stderr, "Error: can't initialize output module\n");
71
     exit(1);
72
  }
73
  if (!outg->buff) {
74
    int i;
75

    
76
    outg->buff_size = bufsize;
77
    outg->buff = malloc(sizeof(struct chunk) * outg->buff_size);
78
    if (!outg->buff) {
79
     fprintf(stderr, "Error: can't allocate output buffer\n");
80
     exit(1);
81
    }
82
    for (i = 0; i < outg->buff_size; i++) {
83
      (outg->buff)[i].data = NULL;
84
    }
85
  } else {
86
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
87
   exit(1);
88
  }
89
        outg->last_chunk = -1;
90
        outg->next_chunk = -1;
91
        outg->sflag = 0;
92
        outg->eflag = 0;
93
        outg->start_id = -1;
94
        outg->end_id = -1;
95
        outg->reorder = true; // should be zero if using chunkstream (legacy code from PeerStreamer)
96
}
97

    
98
static void buffer_print(const struct chunk_output * outg)
99
{
100
#ifdef DEBUG
101
  int i;
102

    
103
  if (outg->next_chunk < 0) {
104
    return;
105
  }
106

    
107
  dprintf("\toutbuf: %d-> ",outg->next_chunk);
108
  for (i = outg->next_chunk; i < outg->next_chunk + outg->buff_size; i++) {
109
    if (outg->buff[i % outg->buff_size].data) {
110
      dprintf("%d",i % 10);
111
    } else {
112
      dprintf(".");
113
    }
114
  }
115
  dprintf("\n");
116
#endif
117
}
118

    
119
static void buffer_free(struct chunk_output * outg, int i)
120
{
121
  dprintf("\t\tFlush Buf %d: %s\n", i, outg->buff[i].data);
122
  if(outg->start_id == -1 || outg->buff[i].id >= outg->start_id) {
123
    if(outg->end_id == -1 || outg->buff[i].id <= outg->end_id) {
124
      if(outg->sflag == 0) {
125
        fprintf(stderr, "\nFirst chunk id played out: %d\n\n",outg->buff[i].id);
126
        outg->sflag = 1;
127
      }
128
      if (outg->reorder) chunk_write(outg->out, &(outg->buff[i]));
129
      outg->last_chunk = outg->buff[i].id;
130
    } else if (outg->eflag == 0 && outg->last_chunk != -1) {
131
      fprintf(stderr, "\nLast chunk id played out: %d\n\n", outg->last_chunk);
132
      outg->eflag = 1;
133
    }
134
  }
135

    
136
  free(outg->buff[i].data);
137
  outg->buff[i].data = NULL;
138
  dprintf("Next Chunk: %d -> %d\n", outg->next_chunk, outg->buff[i].id + 1);
139
  reg_chunk_playout(psinstance_measures(outg->ps), outg->buff[i].id, true, outg->buff[i].timestamp);
140
  outg->next_chunk = outg->buff[i].id + 1;
141
}
142

    
143
static void buffer_flush(struct chunk_output * outg, int id)
144
{
145
  int i = id % outg->buff_size;
146

    
147
  while(outg->buff[i].data) {
148
    buffer_free(outg, i);
149
    i = (i + 1) % outg->buff_size;
150
    if (i == id % outg->buff_size) {
151
      break;
152
    }
153
  }
154
}
155

    
156
void output_deliver(struct chunk_output * outg, const struct chunk *c)
157
{
158
  if (!outg->buff) {
159
    fprintf(stderr, "Warning: buff not initialized!!! Setting output buffer to 8\n");
160
  }
161

    
162
  if (!outg->reorder) chunk_write(outg->out, c);
163

    
164
  dprintf("Chunk %d delivered\n", c->id);
165
  buffer_print(outg);
166
  if (c->id < outg->next_chunk) {
167
    return;
168
  }
169

    
170
  /* Initialize buffer with first chunk */
171
  if (outg->next_chunk == -1) {
172
    outg->next_chunk = c->id; // FIXME: could be anything between c->id and (c->id - buff_size + 1 > 0) ? c->id - buff_size + 1 : 0
173
    fprintf(stderr,"First RX Chunk ID: %d\n", c->id);
174
  }
175

    
176
  if (c->id >= outg->next_chunk + outg->buff_size) {
177
    int i;
178

    
179
    /* We might need some space for storing this chunk,
180
     * or the stored chunks are too old
181
     */
182
    for (i = outg->next_chunk; i <= c->id - outg->buff_size; i++) {
183
      if (outg->buff[i % outg->buff_size].data) {
184
        buffer_free(outg, i % outg->buff_size);
185
      } else {
186
        reg_chunk_playout(psinstance_measures(outg->ps), c->id, false, c->timestamp); // FIXME: some chunks could be counted as lost at the beginning, depending on the initialization of next_chunk
187
        outg->next_chunk++;
188
      }
189
    }
190
    buffer_flush(outg, outg->next_chunk);
191
    dprintf("Next is now %d, chunk is %d\n", outg->next_chunk, c->id);
192
  }
193

    
194
  dprintf("%d == %d?\n", c->id, outg->next_chunk);
195
  if (c->id == outg->next_chunk) {
196
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % outg->buff_size, c->data);
197

    
198
    if(outg->start_id == -1 || c->id >= outg->start_id) {
199
      if(outg->end_id == -1 || c->id <= outg->end_id) {
200
        if(outg->sflag == 0) {
201
          fprintf(stderr, "\nFirst chunk id played out: %d\n\n",c->id);
202
          outg->sflag = 1;
203
        }
204
        if (outg->reorder) chunk_write(outg->out, c);
205
        outg->last_chunk = c->id;
206
      } else if (outg->eflag == 0 && outg->last_chunk != -1) {
207
        fprintf(stderr, "\nLast chunk id played out: %d\n\n", outg->last_chunk);
208
        outg->eflag = 1;
209
      }
210
    }
211
    reg_chunk_playout(psinstance_measures(outg->ps), c->id, true, c->timestamp);
212
    outg->next_chunk++;
213
    buffer_flush(outg, outg->next_chunk);
214
  } else {
215
    dprintf("Storing %d (in %d)\n", c->id, c->id % outg->buff_size);
216
    if (outg->buff[c->id % outg->buff_size].data) {
217
      if (outg->buff[c->id % outg->buff_size].id == c->id) {
218
        /* Duplicate of a stored chunk */
219
        dprintf("\tDuplicate!\n");
220
        reg_chunk_duplicate(psinstance_measures(outg->ps));
221
        return;
222
      }
223
      fprintf(stderr, "Crap!, chunkid:%d, storedid: %d\n", c->id, outg->buff[c->id % outg->buff_size].id);
224
      exit(-1);
225
    }
226
    /* We previously flushed, so we know that c->id is free */
227
    memcpy(&(outg->buff[c->id % outg->buff_size]), c, sizeof(struct chunk));
228
    outg->buff[c->id % outg->buff_size].data = malloc(c->size);
229
    memcpy(outg->buff[c->id % outg->buff_size].data, c->data, c->size);
230
  }
231
}
232

    
233
struct chunk_output * output_create(int bufsize, const char *config, const struct psinstance * ps)
234
{
235
        struct chunk_output * outg;
236

    
237
        outg = malloc(sizeof(struct chunk_output));
238
        outg->ps = ps;
239
        outg->buff = NULL;
240
        outg->out = NULL;
241
        output_init(outg, bufsize, config);
242

    
243
        return outg;
244
}
245

    
246
void output_destroy(struct chunk_output ** outg)
247
{
248
        if (outg && *outg)
249
        {
250
                buffer_flush(*outg, 0);
251
                if((*outg)->buff)
252
                        free((*outg)->buff);
253
                if((*outg)->out)
254
                        out_stream_close((*outg)->out);
255
                free(*outg);
256
        }
257
}