Revision feaa6e2c

View differences:

Chunkiser/input-stream-avs.c
1
/*
2
 *  Copyright (c) 2009 Luca Abeni
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6

  
7
#include <libavformat/avformat.h>
8
#include <stdbool.h>
9

  
10
#include "../dbg.h"
11
#include "../input-stream.h"
12
#include "../payload.h"
13
#include "../input.h"		//TODO: for flags. Check if we can do something smarter
14

  
15
#define STATIC_BUFF_SIZE 1000 * 1024
16
static int header_refresh_period;
17

  
18
struct input_stream {
19
  AVFormatContext *s;
20
  bool loop;	//loop on input file infinitely
21
  int audio_stream;
22
  int video_stream;
23
  int64_t last_ts;
24
  int64_t base_ts;
25
  int frames_since_global_headers;
26
};
27

  
28
static uint8_t codec_type(enum CodecID cid)
29
{
30
  switch (cid) {
31
    case CODEC_ID_MPEG1VIDEO:
32
    case CODEC_ID_MPEG2VIDEO:
33
      return 1;
34
    case CODEC_ID_H261:
35
      return 2;
36
    case CODEC_ID_H263P:
37
    case CODEC_ID_H263:
38
      return 3;
39
    case CODEC_ID_MJPEG:
40
      return 4;
41
    case CODEC_ID_MPEG4:
42
      header_refresh_period = 50;
43
      return 5;
44
    case CODEC_ID_FLV1:
45
      return 6;
46
    case CODEC_ID_SVQ3:
47
      return 7;
48
    case CODEC_ID_DVVIDEO:
49
      return 8;
50
    case CODEC_ID_H264:
51
      header_refresh_period = 50;
52
      return 9;
53
    case CODEC_ID_THEORA:
54
    case CODEC_ID_VP3:
55
      return 10;
56
    case CODEC_ID_SNOW:
57
      return 11;
58
    case CODEC_ID_VP6:
59
      return 12;
60
    case CODEC_ID_DIRAC:
61
      return 13;
62
    default:
63
      fprintf(stderr, "Unknown codec ID %d\n", cid);
64
      return 0;
65
  }
66
}
67

  
68
static void video_header_fill(uint8_t *data, AVStream *st)
69
{
70
  int num, den;
71

  
72
  num = st->avg_frame_rate.num;
73
  den = st->avg_frame_rate.den;
74
//fprintf(stderr, "Rate: %d/%d\n", num, den);
75
  if (num == 0) {
76
    num = st->r_frame_rate.num;
77
    den = st->r_frame_rate.den;
78
  }
79
  if (num > (1 << 16)) {
80
    num /= 1000;
81
    den /= 1000;
82
  }
83
  payload_header_write(data, codec_type(st->codec->codec_id), st->codec->width, st->codec->height, num, den);
84
}
85

  
86
static void frame_header_fill(uint8_t *data, int size, AVPacket *pkt, AVStream *st, int64_t base_ts)
87
{
88
  AVRational fps;
89
  int32_t pts, dts;
90

  
91
  fps = st->avg_frame_rate;
92
  if (fps.num == 0) {
93
    fps = st->r_frame_rate;
94
  }
95
  if (pkt->pts != AV_NOPTS_VALUE) {
96
    pts = av_rescale_q(pkt->pts, st->time_base, (AVRational){fps.den, fps.num}),
97
    pts += av_rescale_q(base_ts, AV_TIME_BASE_Q, (AVRational){fps.den, fps.num});
98
  } else {
99
    pts = -1;
100
  }
101
  dprintf("pkt->pts=%ld PTS=%d",pkt->pts, pts);
102
  if (pkt->dts != AV_NOPTS_VALUE) {
103
    dts = av_rescale_q(pkt->dts, st->time_base, (AVRational){fps.den, fps.num});
104
    dts += av_rescale_q(base_ts, AV_TIME_BASE_Q, (AVRational){fps.den, fps.num});
105
  } else {
106
    fprintf(stderr, "No DTS???\n");
107
    dts = 0;
108
  }
109
  dprintf(" DTS=%d\n",dts);
110
  frame_header_write(data, size, pts, dts);
111
}
112

  
113
struct input_stream *input_stream_open(const char *fname, int *period, uint16_t flags)
114
{
115
  struct input_stream *desc;
116
  int i, res;
117

  
118
  avcodec_register_all();
119
  av_register_all();
120

  
121
  desc = malloc(sizeof(struct input_stream));
122
  if (desc == NULL) {
123
    return NULL;
124
  }
125
  res = av_open_input_file(&desc->s, fname, NULL, 0, NULL);
126
  if (res < 0) {
127
    fprintf(stderr, "Error opening %s: %d\n", fname, res);
128

  
129
    return NULL;
130
  }
131

  
132
  desc->s->flags |= AVFMT_FLAG_GENPTS;
133
  res = av_find_stream_info(desc->s);
134
  if (res < 0) {
135
    fprintf(stderr, "Cannot find codec parameters for %s\n", fname);
136

  
137
    return NULL;
138
  }
139
  desc->video_stream = -1;
140
  desc->audio_stream = -1;
141
  desc->last_ts = 0;
142
  desc->base_ts = 0;
143
  desc->frames_since_global_headers = 0;
144
  desc->loop = flags & INPUT_LOOP;
145
  for (i = 0; i < desc->s->nb_streams; i++) {
146
    if (desc->video_stream == -1 && desc->s->streams[i]->codec->codec_type == CODEC_TYPE_VIDEO) {
147
      desc->video_stream = i;
148
      fprintf(stderr, "Video Frame Rate = %d/%d --- Period: %lld\n",
149
              desc->s->streams[i]->r_frame_rate.num,
150
              desc->s->streams[i]->r_frame_rate.den,
151
              av_rescale(1000000, desc->s->streams[i]->r_frame_rate.den, desc->s->streams[i]->r_frame_rate.num));
152
      *period = av_rescale(1000000, desc->s->streams[i]->r_frame_rate.den, desc->s->streams[i]->r_frame_rate.num);
153
    }
154
    if (desc->audio_stream == -1 && desc->s->streams[i]->codec->codec_type == CODEC_TYPE_AUDIO) {
155
      desc->audio_stream = i;
156
    }
157
  }
158

  
159
  dump_format(desc->s, 0, fname, 0);
160

  
161
  return desc;
162
}
163

  
164
void input_stream_close(struct input_stream *s)
165
{
166
    av_close_input_file(s->s);
167
    free(s);
168
}
169

  
170
int input_stream_rewind(struct input_stream *s)
171
{
172
    int ret;
173

  
174
    ret = av_seek_frame(s->s,-1,0,0);
175
    s->base_ts = s->last_ts;
176

  
177
    return ret;
178
}
179

  
180

  
181
#if 0
182
int input_get_1(struct input_stream *s, struct chunk *c)
183
{
184
    static AVPacket pkt;
185
    static int inited;
186
    AVStream *st;
187
    int res;
188
    static uint8_t static_buff[STATIC_BUFF_SIZE];
189
    static int cid;
190
    uint8_t *p;
191

  
192
    p = static_buff;
193
    if (inited == 0) {
194
        inited = 1;
195
        res = av_read_frame(s->s, &pkt);
196
        if (res < 0) {
197
            fprintf(stderr, "First read failed: %d!!!\n", res);
198

  
199
            return 0;
200
        }
201
        if ((pkt.flags & PKT_FLAG_KEY) == 0) {
202
            fprintf(stderr, "First frame is not key frame!!!\n");
203

  
204
            return 0;
205
        }
206
    }
207
    c->timestamp = pkt.dts;
208
    memcpy(p, pkt.data, pkt.size);
209
    p += pkt.size;
210
    while (1) {
211
        res = av_read_frame(s->s, &pkt);
212
        if (res >= 0) {
213
            st = s->s->streams[pkt.stream_index];
214
            if (pkt.flags & PKT_FLAG_KEY) {
215
                c->size = p - static_buff;
216
                c->data = malloc(c->size);
217
                if (c->data == NULL) {
218
                  return 0;
219
                }
220
                memcpy(c->data, static_buff, c->size);
221
                c->attributes_size = 0;
222
                c->attributes = NULL;
223
                c->id = cid++; 
224
                return 1;
225
            }
226
            memcpy(p, pkt.data, pkt.size);
227
            p += pkt.size;
228
        } else {
229
            if (p - static_buff > 0) {
230
                c->size = p - static_buff;
231
                c->data = malloc(c->size);
232
                if (c->data == NULL) {
233
                  return 0;
234
                }
235
                memcpy(c->data, static_buff, c->size);
236
                c->attributes_size = 0;
237
                c->attributes = NULL;
238
                c->id = cid++; 
239
                return 1;
240
            }
241
            return 0;
242
        }
243
    }
244

  
245
    return 0;
246
}
247
#endif
248

  
249
uint8_t *chunkise(struct input_stream *s, int id, int *size, uint64_t *ts)
250
{
251
    AVPacket pkt;
252
    int res;
253
    uint8_t *data;
254
    int header_out, header_size;
255

  
256
    res = av_read_frame(s->s, &pkt);
257
    if (res < 0) {
258
      if (s->loop) {
259
        if (input_stream_rewind(s) >= 0) {
260
          *size = 0;
261
          *ts = s->last_ts;
262

  
263
          return NULL;
264
        }
265
      }
266
      fprintf(stderr, "AVPacket read failed: %d!!!\n", res);
267
      *size = -1;
268

  
269
      return NULL;
270
    }
271
    if (pkt.stream_index != s->video_stream) {
272
      *size = 0;
273
      *ts = s->last_ts;
274
      av_free_packet(&pkt);
275

  
276
      return NULL;
277
    }
278

  
279
    if (s->s->streams[pkt.stream_index]->codec->codec_type == CODEC_TYPE_VIDEO) {
280
      header_size = VIDEO_PAYLOAD_HEADER_SIZE;
281
    }
282
    if (header_refresh_period) {
283
      header_out = (pkt.flags & PKT_FLAG_KEY) != 0;
284
      if (header_out == 0) {
285
        s->frames_since_global_headers++;
286
        if (s->frames_since_global_headers == header_refresh_period) {
287
          s->frames_since_global_headers = 0;
288
          header_out = 1;
289
        }
290
      }
291
    } else {
292
      header_out = 0;
293
    }
294
    *size = pkt.size + s->s->streams[pkt.stream_index]->codec->extradata_size * header_out + header_size + FRAME_HEADER_SIZE;
295
    data = malloc(*size);
296
    if (data == NULL) {
297
      *size = -1;
298
      av_free_packet(&pkt);
299

  
300
      return NULL;
301
    }
302
    if (s->s->streams[pkt.stream_index]->codec->codec_type == CODEC_TYPE_VIDEO) {
303
      video_header_fill(data, s->s->streams[pkt.stream_index]);
304
    }
305
    data[VIDEO_PAYLOAD_HEADER_SIZE - 1] = 1;
306
    frame_header_fill(data + VIDEO_PAYLOAD_HEADER_SIZE, *size - header_size - FRAME_HEADER_SIZE, &pkt, s->s->streams[pkt.stream_index], s->base_ts);
307

  
308
    if (header_out && s->s->streams[pkt.stream_index]->codec->extradata_size) {
309
      memcpy(data + header_size + FRAME_HEADER_SIZE, s->s->streams[pkt.stream_index]->codec->extradata, s->s->streams[pkt.stream_index]->codec->extradata_size);
310
      memcpy(data + header_size + FRAME_HEADER_SIZE + s->s->streams[pkt.stream_index]->codec->extradata_size, pkt.data, pkt.size);
311
    } else {
312
      memcpy(data + header_size + FRAME_HEADER_SIZE, pkt.data, pkt.size);
313
    }
314
    *ts = av_rescale_q(pkt.dts, s->s->streams[pkt.stream_index]->time_base, AV_TIME_BASE_Q);
315
    dprintf("pkt.dts=%ld TS1=%lu" , pkt.dts, *ts);
316
    *ts += s->base_ts;
317
    dprintf(" TS2=%lu\n",*ts);
318
    s->last_ts = *ts;
319
    av_free_packet(&pkt);
320

  
321
    return data;
322
}
323

  
324
#if 0
325
int chunk_read_avs1(void *s_h, struct chunk *c)
326
{
327
    AVFormatContext *s = s_h;
328
    static AVPacket pkt;
329
    static int inited;
330
    AVStream *st;
331
    int res;
332
    int cnt;
333
    static uint8_t static_buff[STATIC_BUFF_SIZE];
334
    uint8_t *p, *pcurr;
335
    static uint8_t *p1;
336
    static struct chunk c2;
337
    int f1;
338
    static int f2;
339

  
340
    if (p1) {
341
        c2.id = c->id;
342
        *c = c2;
343
        p1 = NULL;
344

  
345
        return f2;
346
    }
347

  
348
    p = static_buff;
349
    p1 = static_buff + STATIC_BUFF_SIZE / 2;
350
    if (inited == 0) {
351
        inited = 1;
352
        res = av_read_frame(s, &pkt);
353
        if (res < 0) {
354
            fprintf(stderr, "First read failed: %d!!!\n", res);
355

  
356
            return 0;
357
        }
358
        if ((pkt.flags & PKT_FLAG_KEY) == 0) {
359
            fprintf(stderr, "First frame is not key frame!!!\n");
360

  
361
            return 0;
362
        }
363
    }
364
    cnt = 0; f1 = 0; f2 = 0;
365
    c->stride_size = 2;
366
    c2.stride_size = 2;
367
    pcurr = p1;
368
    if (pkt.size > 0) {
369
        memcpy(p, pkt.data, pkt.size);
370
        c->frame[0] = p;
371
        c->frame_len[0] = pkt.size;
372
        f1++;
373
        p += pkt.size;
374
    }
375
    while (1) {
376
        res = av_read_frame(s, &pkt);
377
        if (res >= 0) {
378
            st = s->streams[pkt.stream_index];
379
            if (pkt.flags & PKT_FLAG_KEY) {
380
                cnt++;
381
                if (cnt == 2) {
382
                    return f1;
383
                }
384
            }
385
            memcpy(pcurr, pkt.data, pkt.size);
386
            if (pcurr == p) {
387
                c->frame[f1] = pcurr;
388
                c->frame_len[f1] = pkt.size;
389
                p += pkt.size;
390
                pcurr = p1;
391
                f1++;
392
            } else {
393
                c2.frame[f2] = pcurr;
394
                c2.frame_len[f2] = pkt.size;
395
                p1 += pkt.size;
396
                pcurr = p;
397
                f2++;
398
            }
399
        } else {
400
            pkt.size = 0;
401

  
402
            return f1;
403
        }
404
    }
405

  
406
    return 0;
407
}
408
#endif
Makefile
95 95
OBJS += measures.o
96 96
endif
97 97

  
98
IO ?= ffmpeg
98
IO ?= grapes
99 99
ifeq ($(IO), grapes)
100 100
OBJS += input-grapes.o output-grapes.o
101 101
ifdef FFMPEG_DIR
......
110 110
LDLIBS += $(call ld-option, -lbz2)
111 111
endif
112 112
endif
113
ifeq ($(IO), ffmpeg)
114
OBJS += input.o
115
OBJS += Chunkiser/input-stream-avs.o 
116
OBJS += output.o 
117
OBJS += out-stream-avf.o
118
CPPFLAGS += -I$(FFMPEG_DIR)/include
119
LDFLAGS += -L$(FFMPEG_DIR)/lib
120
CFLAGS += -pthread
121
LDFLAGS += -pthread
122
LDLIBS += -lavformat -lavcodec -lavcore -lavutil
123
LDLIBS += -lm
124
LDLIBS += $(call ld-option, -lz)
125
LDLIBS += $(call ld-option, -lbz2)
126
endif
127 113
ifeq ($(IO), httpmhd)
128 114
CPPFLAGS += -DHTTPIO_MHD
129 115
CPPFLAGS += -DHTTPIO
......
161 147
LDFLAGS += -L$(LOCAL_CURL)/lib
162 148
LDLIBS += $(LOCAL_CURL)/lib/libcurl.a -lrt
163 149
endif
164
ifeq ($(IO), dummy)
165
OBJS += input.o
166
OBJS += input-stream-dummy.o 
167
OBJS += output.o 
168
OBJS += out-stream-dummy.o
169
endif
170
ifeq ($(IO), udp)
171
OBJS += input-udp.o
172
OBJS += output.o
173
OBJS += out-stream-udp.o
174
endif
175 150

  
176 151
EXECTARGET = offerstreamer
177 152
ifdef ML
input-stream-dummy.c
1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <string.h>
11

  
12
#include "input-stream.h"
13
#include "payload.h"
14

  
15
static struct input_stream {
16
} fake_descriptor;
17

  
18
struct input_stream *input_stream_open(const char *fname, int *period, uint16_t flags)
19
{
20
  *period = 40000;
21
  return &fake_descriptor;
22
}
23

  
24
void input_stream_close(struct input_stream *dummy)
25
{
26
}
27

  
28
uint8_t *chunkise(struct input_stream *dummy, int id, int *size, uint64_t *ts)
29
{
30
  uint8_t *res;
31
  const int header_size = 1 + 2 + 2 + 2 + 2 + 1; // 1 Frame type + 2 width + 2 height + 2 frame rate num + 2 frame rate den + 1 number of frames
32
  static char buff[80];
33

  
34
  sprintf(buff, "Chunk %d", id);
35
  *ts = 40 * id * 1000;
36
  *size = strlen(buff) + 1 + header_size + FRAME_HEADER_SIZE;
37
  res = malloc(*size);
38
  res[0] = 1;
39
  res[1] = 352 >> 8;
40
  res[2] = 352 & 0xFF;
41
  res[3] = 288 >> 8;
42
  res[4] = 288 & 0xFF;
43
  res[5] = 0;
44
  res[6] = 1;
45
  res[7] = 0;
46
  res[8] = 25;
47
  res[9] = 1;
48
  res[10] = ((*size - header_size - FRAME_HEADER_SIZE)) >> 16 & 0xFF;
49
  res[11] = ((*size - header_size - FRAME_HEADER_SIZE)) >> 8 & 0xFF;
50
  res[12] = (*size - header_size - FRAME_HEADER_SIZE) & 0xFF;
51
  res[13] = *ts >> 24;
52
  res[14] = *ts >> 16;
53
  res[15] = *ts >> 8;
54
  res[16] = *ts & 0xFF;
55
  res[17] = 0;
56
  memcpy(res + header_size + FRAME_HEADER_SIZE, buff, *size - header_size - FRAME_HEADER_SIZE);
57

  
58
  return res;
59
}
input-udp.c
1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#include <sys/socket.h>
8
#include <netinet/in.h>
9
#include <unistd.h>
10
#include <stdlib.h>
11
#include <stdint.h>
12
#include <stdio.h>
13
#include <string.h>
14
#include <limits.h>
15

  
16
#include <chunk.h>
17

  
18
#include "input.h"
19
#include "input-stream.h"
20
#include "dbg.h"
21
#include "io_udp.h"
22

  
23
#define UDP_PORTS_NUM_MAX 10
24

  
25
struct input_desc {
26
  char ip[16];
27
  int base_port;
28
  int ports;
29
  int fds[UDP_PORTS_NUM_MAX + 1];
30
  int id;
31
  int interframe;
32
  uint64_t start_time;
33
  uint8_t *data;
34
  int size;
35
  int counter;
36
  int every;
37
};
38

  
39
static int listen_udp(int port)
40
{
41
  struct sockaddr_in servaddr;
42
  int r;
43
  int fd;
44

  
45
  fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
46
  if (fd < 0) {
47
    return -1;
48
  }
49

  
50
  bzero(&servaddr, sizeof(servaddr));
51
  servaddr.sin_family = AF_INET;
52
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
53
  servaddr.sin_port = htons(port);
54
  r = bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
55
  if (r < 0) {
56
    close(fd);
57
    return -1;
58
  }
59

  
60
  fprintf(stderr,"\topened fd:%d for port:%d\n", fd, port);
61
  return fd;
62
}
63

  
64
static int config_parse(struct input_desc *desc, const char *config)
65
{
66
  int res, i;
67

  
68
  if (!config) {
69
    return -1;
70
  }
71

  
72
  res = sscanf(config, "udp://%15[0-9.]:%d%n", desc->ip, &desc->base_port, &i);
73
  if (res < 2) {
74
    fprintf(stderr,"error parsing input config: %s\n", config);
75
    return -2;
76
  }
77
  if (*(config + i) == 0) {
78
    desc->ports = 1;
79
  } else {
80
    int max_port;
81
    res = sscanf(config + i, "-%d", &max_port);
82
    if (res == 1) {
83
      desc->ports = max_port - desc->base_port + 1;
84
      if (desc->ports <=0) {
85
        fprintf(stderr,"error parsing input config: negative port range %s\n", config + i);
86
        return -3;
87
      }
88
    } else {
89
      fprintf(stderr,"error parsing input config: bad port range %s\n", config + i);
90
      return -4;
91
    }
92
  }
93

  
94
  desc->every = 1;
95

  
96
  return 1;
97
}
98

  
99
struct input_desc *input_open(const char *config, uint16_t flags, int *fds, int fds_size)
100
{
101
  struct input_desc *res;
102
  struct timeval tv;
103
  int ret, i;
104

  
105
  res = malloc(sizeof(struct input_desc));
106
  if (res == NULL) {
107
    return NULL;
108
  }
109

  
110
  ret = config_parse(res, config);
111
  if (ret < 0) {
112
    free(res);
113
    return NULL;
114
  }
115

  
116
  if ( res->ports > UDP_PORTS_NUM_MAX) {
117
    fprintf(stderr, "UDP input supports only %d ports\n", UDP_PORTS_NUM_MAX);
118
    free(res);
119
    return NULL;
120
  }
121

  
122
  if (! fds || fds_size <= res->ports + 1) {
123
    fprintf(stderr, "UDP input module needs more then %d file descriptors\n", fds_size-1);
124
    free(res);
125
    return NULL;
126
  }
127

  
128
  for (i = 0; i < res->ports; i++) {
129
    res->fds[i] = fds[i] = listen_udp(res->base_port + i);
130
    if (fds[i] < 0) {
131
      for (; i>=0 ; i--) {
132
        close(fds[i]);
133
      }
134
      res->fds[0] = fds[0] = -1;
135
      free(res);
136
      return NULL;
137
    }
138
  }
139
  res->fds[i] = fds[i] = -1;
140

  
141
  gettimeofday(&tv, NULL);
142
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
143
  //res->id = (res->start_time / res->interframe) % INT_MAX; //FIXME
144
  res->id = 1;
145

  
146
  res->data = NULL;
147
  res->size = 0;
148
  res->counter = 0;
149

  
150
  return res;
151
}
152

  
153
void input_close(struct input_desc *s)
154
{
155
  int i;
156

  
157
  for (i = 0; i < s->ports; i++) {
158
    close(s->fds[i]);
159
  }
160
  free(s);
161
}
162

  
163
#define UDP_BUF_SIZE 65536
164
int input_get_udp(struct input_desc *s, struct chunk *c, int fd_index)
165
{
166
  uint8_t buf[UDP_BUF_SIZE];
167
  size_t buflen = UDP_BUF_SIZE;
168
  ssize_t msglen;
169
  struct timeval now;
170
  uint8_t *data;
171

  
172
  fprintf(stderr,"\treading on fd:%d (index:%d)\n", s->fds[fd_index], fd_index);
173
  msglen = recv(s->fds[fd_index], buf, buflen, MSG_DONTWAIT);
174
  if (msglen < 0) {
175
    c->data = NULL;
176
    return 0;
177
  }
178
  if (msglen == 0) {
179
    c->data = NULL;
180
    return 0;
181
  }
182
  fprintf(stderr,"\treceived %d bytes\n",msglen);
183

  
184
  s->data = realloc(s->data, s->size + sizeof(struct io_udp_header) + msglen);
185
  data = s->data + s->size;
186
  ((struct io_udp_header*)data)->size = msglen;
187
  ((struct io_udp_header*)data)->portdiff = fd_index;
188
  memcpy(data + sizeof(struct io_udp_header), buf, msglen);
189
  s->size += sizeof(struct io_udp_header) + msglen;
190

  
191
  if (++s->counter % s->every) {
192
    c->data = NULL;
193
    return 0;
194
  }
195

  
196
  c->data = s->data;
197
  s->data = NULL;
198
  c->size = s->size;
199
  s->size = 0;
200
  c->id = s->id++;
201
  c->attributes_size = 0;
202
  c->attributes = NULL;
203
  gettimeofday(&now, NULL);
204
  c->timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
205
  s->counter = 0;
206

  
207
  return 1;
208
}
209

  
210
int input_get(struct input_desc *s, struct chunk *c)
211
{
212
  int i;
213

  
214
  fprintf(stderr,"input_get called\n");
215

  
216
  for (i = 0; i < s->ports; i++) {
217
    if (input_get_udp(s, c, i)) {
218
      return 999999;
219
    }
220
  }
221
  return 999999;
222
}
input.c
1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdint.h>
10
#include <stdio.h>
11
#include <string.h>
12
#include <limits.h>
13

  
14
#include <chunk.h>
15

  
16
#include "input.h"
17
#include "input-stream.h"
18
#include "dbg.h"
19

  
20
struct input_desc {
21
  struct input_stream *s;
22
  int id;
23
  int interframe;
24
  uint64_t start_time;
25
  uint64_t first_ts;
26
};
27

  
28
struct input_desc *input_open(const char *fname, uint16_t flags, int *fds, int fds_size)
29
{
30
  struct input_desc *res;
31
  struct timeval tv;
32

  
33
  if (fds_size >= 1) {
34
    *fds = -1; //This input module needs no fds to monitor
35
  }
36

  
37
  res = malloc(sizeof(struct input_desc));
38
  if (res == NULL) {
39
    return NULL;
40
  }
41
  res->s = input_stream_open(fname, &res->interframe, flags);
42
  if (res->s == NULL) {
43
    free(res);
44
    res = NULL;
45
    return res;
46
  }
47
  gettimeofday(&tv, NULL);
48
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
49
  res->first_ts = 0;
50
  res->id = (res->start_time / res->interframe) % INT_MAX; //TODO: verify 32/64 bit
51

  
52
  return res;
53
}
54

  
55
void input_close(struct input_desc *s)
56
{
57
  input_stream_close(s->s);
58
  free(s);
59
}
60

  
61
int input_get(struct input_desc *s, struct chunk *c)
62
{
63
  struct timeval now;
64
  int64_t delta;
65

  
66
  c->data = chunkise(s->s, s->id, &c->size, &c->timestamp);
67
  if (c->size == -1) {
68
    return -1;
69
  }
70
  if (c->data) {
71
    c->id = s->id++;
72
  }
73
  c->attributes_size = 0;
74
  c->attributes = NULL;
75
  if (s->first_ts == 0) {
76
    s->first_ts = c->timestamp;
77
  }
78
  delta = c->timestamp - s->first_ts + s->interframe;
79
  gettimeofday(&now, NULL);
80
  delta = delta + s->start_time - now.tv_sec * 1000000ULL - now.tv_usec;
81
  dprintf("Delta: %lld\n", delta);
82
  dprintf("Generate Chunk[%d] (TS: %llu)\n", c->id, c->timestamp);
83

  
84
  c->timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
85

  
86
  return delta > 0 ? delta : 0;
87
}
out-stream-avf.c
1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6

  
7
#include <libavformat/avformat.h>
8
#include <stdio.h>
9
#include <string.h>
10

  
11
#include "out-stream.h"
12
#include "dbg.h"
13
#include "payload.h"
14

  
15
static const char *output_format;
16
static const char *output_file;
17

  
18
static int64_t prev_pts, prev_dts;
19

  
20
static enum CodecID libav_codec_id(uint8_t mytype)
21
{
22
  switch (mytype) {
23
    case 1:
24
      return CODEC_ID_MPEG2VIDEO;
25
    case 2:
26
      return CODEC_ID_H261;
27
    case 3:
28
      return CODEC_ID_H263P;
29
    case 4:
30
      return CODEC_ID_MJPEG;
31
    case 5:
32
      return CODEC_ID_MPEG4;
33
    case 6:
34
      return CODEC_ID_FLV1;
35
    case 7:
36
      return CODEC_ID_SVQ3;
37
    case 8:
38
      return CODEC_ID_DVVIDEO;
39
    case 9:
40
      return CODEC_ID_H264;
41
    case 10:
42
      return CODEC_ID_THEORA;
43
    case 11:
44
      return CODEC_ID_SNOW;
45
    case 12:
46
      return CODEC_ID_VP6;
47
    case 13:
48
      return CODEC_ID_DIRAC;
49
    default:
50
      fprintf(stderr, "Unknown codec %d\n", mytype);
51
      return 0;
52
  }
53
}
54

  
55
static AVFormatContext *format_init(const uint8_t *data)
56
{
57
  AVFormatContext *of;
58
  AVCodecContext *c;
59
  AVOutputFormat *outfmt;
60
  int width, height, frame_rate_n, frame_rate_d;
61
  uint8_t codec;
62

  
63
  av_register_all();
64

  
65
  payload_header_parse(data, &codec, &width, &height, &frame_rate_n, &frame_rate_d);
66
  dprintf("Frame size: %dx%d -- Frame rate: %d / %d\n", width, height, frame_rate_n, frame_rate_d);
67
  outfmt = av_guess_format(output_format, output_file, NULL);
68
  of = avformat_alloc_context();
69
  if (of == NULL) {
70
    return NULL;
71
  }
72
  of->oformat = outfmt;
73
  av_new_stream(of, 0);
74
  c = of->streams[0]->codec;
75
  c->codec_id = libav_codec_id(codec);
76
  c->codec_type = CODEC_TYPE_VIDEO;
77
  c->width = width;
78
  c->height= height;
79
  c->time_base.den = frame_rate_n;
80
  c->time_base.num = frame_rate_d;
81
  of->streams[0]->avg_frame_rate.num = frame_rate_n;
82
  of->streams[0]->avg_frame_rate.den = frame_rate_d;
83
  c->pix_fmt = PIX_FMT_YUV420P;
84

  
85
  prev_pts = 0;
86
  prev_dts = 0;
87

  
88
  return of;
89
}
90

  
91
int out_stream_init(const char *config)
92
{
93
  output_format = "nut";
94
  if (config) {
95
    char *colon;
96
    output_file = strdup(config);
97
  } else {
98
    output_file = "/dev/stdout";
99
  }
100

  
101
  return 1;
102
}
103

  
104
void chunk_write(int id, const uint8_t *data, int size)
105
{
106
  static AVFormatContext *outctx;
107
  const int header_size = VIDEO_PAYLOAD_HEADER_SIZE; 
108
  int frames, i;
109
  const uint8_t *p;
110

  
111
  if (data[0] > 127) {
112
    fprintf(stderr, "Error! Non video chunk: %x!!!\n", data[0]);
113
    return;
114
  }
115
  if (outctx == NULL) {
116
    outctx = format_init(data);
117
    if (outctx == NULL) {
118
      fprintf(stderr, "Format init failed\n");
119

  
120
      return;
121
    }
122
    av_set_parameters(outctx, NULL);
123
    snprintf(outctx->filename, sizeof(outctx->filename), "%s", output_file);
124
    dump_format(outctx, 0, output_file, 1);
125
    url_fopen(&outctx->pb, output_file, URL_WRONLY);
126
    av_write_header(outctx);
127
  }
128

  
129
  frames = data[header_size - 1];
130
  p = data + header_size + FRAME_HEADER_SIZE * frames;
131
  for (i = 0; i < frames; i++) {
132
    AVPacket pkt;
133
    int64_t pts, dts;
134
    int frame_size;
135

  
136
    frame_header_parse(data + header_size + FRAME_HEADER_SIZE * i,
137
                       &frame_size, &pts, &dts);
138
    dprintf("Frame %d PTS1: %d\n", i, pts);
139
    av_init_packet(&pkt);
140
    pkt.stream_index = 0;	// FIXME!
141
    if (pts != -1) {
142
      pts += (pts < prev_pts - ((1L << 31) - 1)) ? ((prev_pts >> 32) + 1) << 32 : (prev_pts >> 32) << 32;
143
      dprintf(" PTS2: %d\n", pts);
144
      prev_pts = pts;
145
      dprintf("Frame %d has size %d --- PTS: %lld DTS: %lld\n", i, frame_size,
146
                                             av_rescale_q(pts, outctx->streams[0]->codec->time_base, AV_TIME_BASE_Q),
147
                                             av_rescale_q(dts, outctx->streams[0]->codec->time_base, AV_TIME_BASE_Q));
148
      pkt.pts = av_rescale_q(pts, outctx->streams[0]->codec->time_base, outctx->streams[0]->time_base);
149
    } else {
150
      pkt.pts = AV_NOPTS_VALUE;
151
    }
152
    dts += (dts < prev_dts - ((1L << 31) - 1)) ? ((prev_dts >> 32) + 1) << 32 : (prev_dts >> 32) << 32;
153
    prev_dts = dts;
154
    pkt.dts = av_rescale_q(dts, outctx->streams[0]->codec->time_base, outctx->streams[0]->time_base);
155
    pkt.data = p;
156
    p += frame_size;
157
    pkt.size = frame_size;
158
    av_interleaved_write_frame(outctx, &pkt);
159
  }
160
}
out-stream-dummy.c
1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <unistd.h>
7
#include <stdint.h>
8
#include <stdio.h>
9

  
10
#include "out-stream.h"
11
#include "dbg.h"
12
#include "payload.h"
13

  
14
static int outfd = 1;
15

  
16
int out_stream_init(const char *config)
17
{
18
  return 1;
19
}
20

  
21
void chunk_write(int id, const uint8_t *data, int size)
22
{
23
  const int header_size = VIDEO_PAYLOAD_HEADER_SIZE;
24
  int width, height, frame_rate_n, frame_rate_d, frames;
25
  int i;
26
  uint8_t codec;
27

  
28
  payload_header_parse(data, &codec, &width, &height, &frame_rate_n, &frame_rate_d);
29
  if (codec != 1) {
30
    fprintf(stderr, "Error! Non video chunk: %x!!!\n", codec);
31
    return;
32
  }
33
  dprintf("Frame size: %dx%d -- Frame rate: %d / %d\n", width, height, frame_rate_n, frame_rate_d);
34
  frames = data[9];
35
  for (i = 0; i < frames; i++) {
36
    int frame_size;
37
    int32_t pts, dts;
38

  
39
    frame_header_parse(data, &frame_size, &pts, &dts);
40
    dprintf("Frame %d has size %d\n", i, frame_size);
41
  }
42
#ifdef DEBUGOUT
43
#define buff_size 8 // HACK!
44
  fprintf(stderr, "\tOut Chunk[%d] - %d: %s\n", id, id % buff_size, data + header_size + frames * FRAME_HEADER_SIZE);
45
#else
46
  write(outfd, data + header_size + frames * FRAME_HEADER_SIZE, size - header_size - frames * FRAME_HEADER_SIZE);
47
#endif
48
}
out-stream-udp.c
1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/socket.h>
7
#include <netinet/in.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdint.h>
11
#include <limits.h>
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <strings.h>
15

  
16
#include "out-stream.h"
17
#include "dbg.h"
18
#include "payload.h"
19
#include "io_udp.h"
20

  
21
static int outfd = -1;
22
static char ip[16];
23
static int base_port;
24
static int ports;
25

  
26
static int config_parse(const char *config)
27
{
28
  int res, i;
29

  
30
  res = sscanf(config, "udp://%15[0-9.]:%d%n", ip, &base_port, &i);
31
  if (res < 2) {
32
    fprintf(stderr,"error parsing output config: %s\n", config);
33
    return -1;
34
  }
35
  if (*(config + i) == 0) {
36
    ports = INT_MAX;
37
  } else {
38
    int max_port;
39
    res = sscanf(config + i, "-%d", &max_port);
40
    if (res == 1) {
41
      ports = max_port - base_port + 1;
42
      if (ports <=0) {
43
        fprintf(stderr,"error parsing output config: negative port range %s\n", config + i);
44
        return -2;
45
      }
46
    } else {
47
      fprintf(stderr,"error parsing output config: bad port range %s\n", config + i);
48
      return -3;
49
    }
50
  }
51

  
52
  return 1;
53
}
54

  
55

  
56
int out_stream_init(const char *config)
57
{
58
  int res;
59

  
60
  if (!config) {
61
    fprintf(stderr, "udp output not configured, no output. Use udp://127.0.0.1:<port>\n");
62
    return 1;
63
  }
64

  
65
  outfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
66
  if (outfd < 0) {
67
    fprintf(stderr, "output socket: can't open\n");
68
    return -2;
69
  }
70

  
71
  res = config_parse(config);
72
  if (res < 0) {
73
    close(outfd);
74
    return -3;
75
  }
76

  
77
  return 1;
78
}
79

  
80
void packet_write(const uint8_t *data, int size)
81
{
82
  struct sockaddr_in si_other;
83

  
84
  bzero(&si_other, sizeof(si_other));
85
  si_other.sin_family = AF_INET;
86
  si_other.sin_port = htons((base_port + ((const struct io_udp_header*)data)->portdiff));
87
  if (inet_aton(ip, &si_other.sin_addr) == 0) {
88
     fprintf(stderr, " output socket: inet_aton() failed\n");
89
     return;
90
  }
91

  
92
  sendto(outfd, data + sizeof (struct io_udp_header), size - sizeof (struct io_udp_header), 0, (struct sockaddr *) &si_other, sizeof(si_other));
93
}
94

  
95
void chunk_write(int id, const uint8_t *data, int size)
96
{
97
  int i = 0;
98

  
99
  if (outfd < 0) {
100
    return;
101
  }
102

  
103
  while (i < size) {
104
    int psize = ((const struct io_udp_header*)(data + i))->size;
105
    packet_write(data + i, sizeof(struct io_udp_header)  + psize);
106
    i += sizeof(struct io_udp_header) + psize;
107
  }
108
}

Also available in: Unified diff