Statistics
| Branch: | Revision:

grapes / src / Chunkiser / input-stream-rtp.c @ 54b02213

History | View | Annotate | Download (18.3 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2014 Davide Kirchner
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#ifndef _WIN32
9
#include <sys/socket.h>
10
#include <netinet/in.h>
11
#include <fcntl.h>
12
#else
13
#include <winsock2.h>
14
#endif
15
#include <unistd.h>
16
#include <stdlib.h>
17
#include <stdint.h>
18
#include <stdio.h>
19
#include <string.h>
20
#include <limits.h>
21
#include <string.h>
22
#include <stdarg.h>
23

    
24
#ifdef DEBUG
25
#pragma message "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"
26
#pragma message "RTP Chunkiser compiling in debug mode"
27
#endif
28

    
29
#pragma GCC diagnostic ignored "-Wcast-qual"  // Ignore warnings from pj libs
30
#include <pjlib.h>
31
#include <pjmedia.h>
32
#pragma GCC diagnostic pop  // Restore warnings to command line options
33

    
34
#ifndef DEBUG
35
#define NDEBUG
36
#endif
37
#include <assert.h>
38

    
39
#include "int_coding.h"
40
#include "payload.h"
41
#include "grapes_config.h"
42
#include "chunkiser_iface.h"
43
#include "stream-rtp.h"
44

    
45
// ntp timestamp management utilities
46
#define TS_SHIFT 32
47
#define TS_FRACT_MASK ((1ULL << TS_SHIFT) - 1)
48

    
49
#define UDP_MAX_SIZE 65536   // 2^16
50
//#define RTP_DEFAULT_CHUNK_SIZE 20
51
#define RTP_DEFAULT_CHUNK_SIZE 65536
52
#define RTP_DEFAULT_MAX_DELAY (1ULL << (TS_SHIFT-2))  // 250 ms
53

    
54
struct rtp_ntp_ts {
55
  // both in HOST byte order
56
  uint64_t ntp;
57
  uint32_t rtp;
58
};
59

    
60
struct rtp_stream {
61
  struct pjmedia_rtp_session rtp;
62
  struct pjmedia_rtcp_session rtcp;
63
  struct rtp_ntp_ts tss[2];
64
  int last_updated_ts;  // index in tss
65
};
66

    
67
struct chunkiser_ctx {
68
  // fixed context (set at opening time)
69
  uint64_t start_time;    // TODO:  What is this for?? Is it needed?
70
                          // (it was there un UDP chunkiser)
71
  int max_size;           // max `buff` size
72
  uint64_t max_delay;     // max delay to accumulate in a chunk [ntp format]
73
  int video_stream_id;    // index in `streams`
74
  int rfc3551;
75
  int verbosity;
76
  int rtp_log;
77
  int fds[RTP_UDP_PORTS_NUM_MAX + 1];
78
  int fds_len;  // even if "-1"-terminated, save length to make things easier
79
  struct rtp_stream streams[RTP_STREAMS_NUM_MAX];  // its len is fds_len/2
80
  // running context (set at chunkising time)
81
  uint8_t *buff;          // chunk buffer
82
  int size;               // its current size
83
  int next_fd;            // next fd (index in fsd array) to be tried (in a round-robin)
84
  int counter;            // number of chunks sent
85
  uint64_t min_ntp_ts;    // ntp timestamp of first packet in chunk
86
  uint64_t max_ntp_ts;    // ntp timestamp of last packet in chunk
87
  int ntp_ts_status;      // known (1), yet unkwnown (0) or unknown (-1)
88
};
89

    
90
/* Holds relevant information extracted from each RTP packet */
91
struct rtp_info {
92
  uint16_t valid;
93
  uint16_t marker;
94
  uint64_t ntp_ts;
95
};
96

    
97

    
98
/* Define a printf-like function for logging */
99
static void printf_log(const struct chunkiser_ctx *ctx, int loglevel,
100
                       const char *fmt, ...) {
101
  va_list args;
102
  FILE* s = stderr;
103
  if (loglevel <= ctx->verbosity && strlen(fmt)) {
104
    fprintf(s, "RTP Chunkiser: ");
105
    if (loglevel == 0) {
106
      fprintf(s, "Error: ");
107
    }
108
    va_start(args, fmt);
109
    vfprintf(s, fmt, args);
110
    va_end(args);
111
    fprintf(s, "\n");
112
    if (loglevel == 0) {
113
      fflush(s);
114
    }
115
#ifdef DEBUG
116
    fflush(s);
117
#endif
118
  }
119
}
120

    
121

    
122
/* SUPPORT FUNCTIONS FOR TIMESTAMPS MANAGEMENT AND CONVERSIONS */
123

    
124
/* Multiplies 2 uint64_t, returning 0 and logging error on overflow */
125
static inline uint64_t mult_overflow(const struct chunkiser_ctx *ctx, uint64_t a, uint64_t b) {
126
  int n_a, n_b;
127
  uint64_t res;
128
  if (a == 0ULL || b == 0ULL) {
129
    // skip overflow ckecking (__builtin_clzll may fail)
130
    return 0ULL;
131
  }
132
  res = a * b;
133

    
134
  n_a = 64 - __builtin_clzll(a);
135
  n_b = 64 - __builtin_clzll(b);
136
  if (n_a + n_b > 64) {
137
    /* approximate check failed, proceed with exact check */
138
    if (res / a != b) {
139
      printf_log(ctx, 2, "Overflow during timestamp computation.");
140
      return 0ULL;
141
    }
142
  }
143
  return res;
144
}
145

    
146

    
147
/* Converts timestamps. If impossible, returns 0 */
148
static uint64_t rtptontp(const struct chunkiser_ctx *ctx,
149
                         const struct rtp_stream *stream, uint32_t rtp) {
150
  // latest rtp-ntp matching
151
  const struct rtp_ntp_ts *b = &stream->tss[stream->last_updated_ts];
152
  // second latest rtp-ntp matching
153
  const struct rtp_ntp_ts *a = &stream->tss[1 - stream->last_updated_ts];
154
  uint64_t tmp;
155

    
156
  if ((a->rtp == 0 && a->ntp == 0) || (b->rtp == 0 && b->ntp == 0)) {
157
    printf_log(ctx, 2, "NTP timestamp not (yet) computable.");
158
    return 0ULL;
159
  }
160
  else if (b->rtp - a->rtp == 0) {  // Will have to divide by this
161
    printf_log(ctx, 1,
162
               "Warning: there were two equal timestamps in the RTCP flow!");
163
    return 0ULL;
164
  }
165
  else {
166
    /*
167
         a         b    new
168
      ---|---------|-----*--> RTP timeline
169
      ---|---------|-----?--> NTP timeline
170
      a and b are known via RTCP. We assume (new _after_ a) and (b _after_ a)
171
      Note both timelines are circular
172
     */
173
    // Make sure the time interval (a -> b) is shorter than (b -> a)
174
    // (One of the two intervals will encompass a timer overflow).
175
    // Otherways something is probably very wrong.
176
    assert((b->rtp - a->rtp) < (a->rtp - b->rtp)); 
177
    // Similarly, with newly arrived and a
178
    assert((rtp - a->rtp) < (a->rtp - rtp));
179
    // Similarly with ntp
180
    assert((b->ntp - a->ntp) < (a->ntp - b->ntp));
181

    
182
    // return (rtp - a->rtp) * (b->ntp - a->ntp) / (b->rtp - a->rtp) + a->ntp
183
    // but handle multiplication overflow
184

    
185
    tmp = mult_overflow(ctx, (rtp - a->rtp), (b->ntp - a->ntp));
186
    if (tmp == 0) {
187
      return 0ULL;
188
    }
189
    return tmp / (b->rtp - a->rtp) + a->ntp;
190
  }
191
}
192

    
193

    
194
/* Returns min/max between 2 ntp timestamps, assuming inputs are
195
   "close enough"
196
 */
197
static uint64_t ts_min(uint64_t a, uint64_t b) {
198
  if (b - a < a - b) return a;
199
  else return b;
200
}
201
static uint64_t ts_max(uint64_t a, uint64_t b) {
202
  if (b - a < a - b) return b;
203
  else return a;
204
}
205

    
206

    
207
/* SUPPORT FUNCTIONS FOR RTP LIBRARY MANAGEMENT */
208

    
209
/* Initializes PJSIP. Returns 0 on success, nonzero on failure */
210
static int rtplib_init(struct chunkiser_ctx *ctx) {
211
  int i;
212
  struct pjmedia_rtp_session_setting rtp_s;
213
  struct pjmedia_rtcp_session_setting rtcp_s;
214

    
215
  pj_init();
216

    
217
  rtp_s.flags = 0;
218
  pjmedia_rtcp_session_setting_default(&rtcp_s);
219
  rtcp_s.clock_rate = 1;  // Just to avoid Floating point exception
220
  for (i=0; i<ctx->fds_len/2; i++) {
221
    pjmedia_rtcp_init2(&ctx->streams[i].rtcp, &rtcp_s);
222
    if (pjmedia_rtp_session_init2(&ctx->streams[i].rtp, rtp_s) != PJ_SUCCESS) {
223
      printf_log(ctx, 0, "Error initialising pjmedia RTP session");
224
      return 1;
225
    }
226
    ctx->streams[i].last_updated_ts = 0;
227
    ctx->streams[i].tss[0].ntp = ctx->streams[i].tss[0].rtp = 0;
228
    ctx->streams[i].tss[1].ntp = ctx->streams[i].tss[1].rtp = 0;
229
  }
230
  return 0;
231
}
232

    
233

    
234
/* Inspects the given RTP packet and fills `info` accordingly. */
235
static void rtp_packet_received(struct chunkiser_ctx *ctx, int stream_id, uint8_t *pkt, int size, struct rtp_info *info) {
236
  const struct pjmedia_rtp_hdr *rtp_h;
237
  const void *rtp_p;
238
  int rtp_p_len;
239
  struct rtp_stream *stream = &ctx->streams[stream_id];
240
  if (pjmedia_rtp_decode_rtp(&stream->rtp,
241
                             pkt, size,
242
                             &rtp_h, &rtp_p, &rtp_p_len)
243
      == PJ_SUCCESS) {
244
    pjmedia_rtcp_rx_rtp(&stream->rtcp, ntohs(rtp_h->seq),
245
                        ntohl(rtp_h->ts), rtp_p_len);
246
    pjmedia_rtp_session_update(&stream->rtp, rtp_h, NULL);
247
    printf_log(ctx, 2, "  -> valid RTP packet from ssrc:%u with "
248
               "type:%u, marker:%u, seq:%u, timestamp:%u",
249
               ntohl(rtp_h->ssrc), rtp_h->pt, rtp_h->m, ntohs(rtp_h->seq),
250
               ntohl(rtp_h->ts));
251
    info->valid = 1;
252
    info->marker = rtp_h->m;
253
    info->ntp_ts = rtptontp(ctx, stream, ntohl(rtp_h->ts));
254
  }
255
  else {
256
    printf_log(ctx, 1, "Warning: got invalid RTP packet (forwarding anyway).");
257
    info->valid = 0;
258
  }
259
}
260

    
261

    
262
/* Updates the context upon reception of RTCP packets. */
263
static void rtcp_packet_received(struct chunkiser_ctx *ctx, int stream_id, uint8_t *pkt, int size) {
264
  struct rtp_stream *stream = &ctx->streams[stream_id];
265
  printf_log(ctx, 2, "  -> RTCP packet");
266
  pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, size);
267

    
268
  // Parse RTCP packet with low-level API.
269
  // Few lines taken from pjmedia/src/pjmedia/rtcp.c in pjproject
270
  {
271
    pj_uint8_t *p, *p_end;
272
    p = (pj_uint8_t*)pkt;
273
    p_end = p + size;
274
    while (p < p_end) {
275
      pjmedia_rtcp_common *common = (pjmedia_rtcp_common*)p;
276
      unsigned len = (pj_ntohs((pj_uint16_t)common->length)+1) * 4;
277
      if (common->pt == 200) {  // packet type 200 is Sender Report (SR)
278
        const pjmedia_rtcp_sr *sr =
279
          (pjmedia_rtcp_sr*) (((char*)pkt) + sizeof(pjmedia_rtcp_common));
280
        stream->last_updated_ts = (stream->last_updated_ts + 1) % 2;
281
        stream->tss[stream->last_updated_ts].rtp =
282
          ntohl(sr->rtp_ts);
283
        stream->tss[stream->last_updated_ts].ntp =
284
          (((uint64_t)ntohl(sr->ntp_sec)) << TS_SHIFT)
285
          + (((uint64_t)ntohl(sr->ntp_frac)) & TS_FRACT_MASK);
286
      }
287
      p += len;
288
    }
289
  }
290
  return;
291
}
292

    
293

    
294
/* SUPPORT FUNCTIONS FOR UDP SOCKETS MANAGEMENT */
295

    
296
static int input_get_udp(uint8_t *data, int fd) {
297
  ssize_t msglen;
298

    
299
  msglen = recv(fd, data, UDP_MAX_SIZE, 0);
300
  if (msglen <= 0) {
301
    return 0;
302
  }
303

    
304
  return msglen;
305
}
306

    
307

    
308
static int listen_udp(const struct chunkiser_ctx *ctx, int port) {
309
  struct sockaddr_in servaddr;
310
  int r;
311
  int fd;
312

    
313
  fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
314
  if (fd < 0) {
315
    return -1;
316
  }
317

    
318
#ifndef _WIN32
319
  fcntl(fd, F_SETFL, O_NONBLOCK);
320
#else
321
  {
322
    unsigned long nonblocking = 1;
323
    ioctlsocket(fd, FIONBIO, (unsigned long*) &nonblocking);
324
  }
325
#endif
326

    
327
  memset(&servaddr, 0, sizeof(servaddr));
328
  servaddr.sin_family = AF_INET;
329
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
330
  servaddr.sin_port = htons(port);
331
  r = bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
332
  if (r < 0) {
333
    close(fd);
334

    
335
    return -1;
336
  }
337
  printf_log(ctx, 2, "  opened fd:%d for port:%d", fd, port);
338

    
339
  return fd;
340
}
341

    
342

    
343
/* SUPPORT FUNCTIONS FOR COMMAND-LINE CONFIGURATION */
344

    
345
/*
346
  Read config string `config`, updates `ctx` accordingly.
347
  Also open required UDP ports, so as to save their file descriptors
348
  in context.
349
  Return 0 on success, nonzero on failure.
350
 */
351
static int conf_parse(struct chunkiser_ctx *ctx, const char *config) {
352
  int ports[RTP_UDP_PORTS_NUM_MAX + 1];
353
  struct tag *cfg_tags;
354
  int i;
355
  const char *error_str = NULL;
356
  int chunk_size;
357
  int max_delay_input;
358

    
359
  /* Default context values */
360
  ctx->video_stream_id = -2;
361
  ctx->rfc3551 = 0;
362
  ctx->verbosity = 1;
363
  ctx->rtp_log = 0;
364
  chunk_size = RTP_DEFAULT_CHUNK_SIZE;
365
  ctx->max_size = chunk_size + UDP_MAX_SIZE;
366
  ctx->max_delay = RTP_DEFAULT_MAX_DELAY;
367
  for (i=0; i<RTP_UDP_PORTS_NUM_MAX + 1; i++) {
368
    ports[i] = -1;
369
  }
370

    
371
  /* Parse options */
372
  cfg_tags = grapes_config_parse(config);
373
  if (cfg_tags) {
374
    grapes_config_value_int(cfg_tags, "verbosity", &(ctx->verbosity));
375
    printf_log(ctx, 2, "Verbosity set to %i", ctx->verbosity);
376

    
377
    grapes_config_value_int(cfg_tags, "rtp_log", &(ctx->rtp_log));
378
    if (ctx->rtp_log) {
379
        printf_log(ctx, 2, "Producing parsable rtp log. "
380
                   "TimeStamps are expressed in 2^-32 s.");
381
    }
382

    
383
    grapes_config_value_int(cfg_tags, "rfc3551", &(ctx->rfc3551));
384
    printf_log(ctx, 2, "%ssing RFC 3551",
385
               (ctx->rfc3551 ? "U" : "Not u"));
386

    
387
    if (grapes_config_value_int(cfg_tags, "chunk_size", &chunk_size)) {
388
      ctx->max_size = chunk_size + UDP_MAX_SIZE;
389
    }
390
    printf_log(ctx, 2, "Chunk size is %d bytes", chunk_size);
391
    printf_log(ctx, 2, "Maximum chunk size is thus %d bytes", ctx->max_size);
392

    
393
    if (grapes_config_value_int(cfg_tags, "max_delay_ms", &max_delay_input)) {
394
      ctx->max_delay = max_delay_input * (1ULL << TS_SHIFT) / 1000;
395
    }
396
    else if (grapes_config_value_int(cfg_tags, "max_delay_s", &max_delay_input)) {
397
      ctx->max_delay = max_delay_input * (1ULL << TS_SHIFT);
398
    }
399
    printf_log(ctx, 2, "Maximum delay set to %.0f ms.",
400
               ctx->max_delay * 1000.0 / (1ULL << TS_SHIFT));
401

    
402
    ctx->fds_len =
403
      rtp_ports_parse(cfg_tags, ports, &(ctx->video_stream_id), &error_str);
404

    
405
    if (ctx->rfc3551) {
406
      printf_log(ctx, 2,
407
                 "The video stream for RFC 3551 is the one on ports %d:%d",
408
                 ports[ctx->video_stream_id], ports[ctx->video_stream_id+1]);
409
    }
410
  }
411
  free(cfg_tags);
412

    
413
  if (ctx->fds_len == 0) {
414
    printf_log(ctx, 0, error_str);
415
    return 1;
416
  }
417

    
418
  /* Open ports */
419
  for (i = 0; ports[i] >= 0; i++) {
420
    ctx->fds[i] = listen_udp(ctx, ports[i]);
421
    if (ctx->fds[i] < 0) {
422
      printf_log(ctx, 0, "Cannot open port %d", ports[i]);
423
      for (; i>=0 ; i--) {
424
        close(ctx->fds[i]);
425
      }
426
      return 2;
427
    }
428
  }
429
  ctx->fds[i] = -1;
430
  if (i != ctx->fds_len) {
431
    printf_log(ctx, 0, "Something very wrong happended.");
432
    return 3;
433
  }
434

    
435
  return 0;
436
}
437

    
438

    
439
/* ACTUAL "PUBLIC" FUNCTIONS, exposed via `struct chunkiser_iface in_rtp` */
440

    
441
static struct chunkiser_ctx *rtp_open(const char *fname, int *period, const char *config) {
442
  struct chunkiser_ctx *res;
443
  struct timeval tv;
444

    
445
  res = malloc(sizeof(struct chunkiser_ctx));
446
  if (res == NULL) {
447
    return NULL;
448
  }
449

    
450
  if(conf_parse(res, config) != 0) {
451
    printf_log(res, 0, "Error while parsing input parameters.");
452
    free(res);
453
    return NULL;
454
  }
455
  printf_log(res, 2, "Parameter parsing was successful.");
456

    
457
  if (rtplib_init(res) != 0) {
458
    free(res);
459
    return NULL;
460
  }
461

    
462
  gettimeofday(&tv, NULL);
463
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
464

    
465
  res->buff = NULL;
466
  res->size = 0;
467
  res->counter = 0;
468
  res->next_fd = 0;
469
  res->ntp_ts_status = 0;
470
  *period = 0;
471

    
472
  return res;
473
}
474

    
475

    
476
static void rtp_close(struct chunkiser_ctx  *ctx) {
477
  int i;
478

    
479
  if (ctx->buff != NULL) {
480
    free(ctx->buff);
481
  }
482
  for (i = 0; ctx->fds[i] >= 0; i++) {
483
    close(ctx->fds[i]);
484
  }
485
  free(ctx);
486
}
487

    
488

    
489
/*
490
  Creates a chunk.  If the chunk is created successfully, returns a
491
  pointer to an alloccated memory buffer to chunk content.  The caller
492
  should `free` it up.  In this case, size and ts are set to the
493
  chunk's size and timestamp.
494

495
  If no data is available, returns NULL and size=0
496

497
  In case of error, returns NULL and size=-1
498
 */
499
static uint8_t *rtp_chunkise(struct chunkiser_ctx *ctx, int id, int *size, uint64_t *ts) {
500
  int status;  // -1: buffer full, send now
501
               //  0: Go on, do not send;
502
               //  1: send after loop;
503
               //  2: do one more round-robin loop now
504
  int j;
505
  uint8_t *res;
506

    
507
  // Allocate new buffer if needed
508
  if (ctx->buff == NULL) {
509
    ctx->buff = malloc(ctx->max_size);
510
    ctx->ntp_ts_status = 0;
511
    if (ctx->buff == NULL) {
512
      printf_log(ctx, 0, "Could not alloccate chunk buffer: exiting.");
513
      *size = -1;
514
      return NULL;
515
    }
516
  }
517
  do {
518
    status = 0;
519
    // Check open ports for incoming UDP packets in a round-robin
520
    for (j = 0; j < ctx->fds_len && status >= 0; j++) {
521
      int i = (ctx->next_fd + j) % ctx->fds_len;
522
      int new_pkt_size;
523
      uint8_t *new_pkt_start =
524
        ctx->buff + ctx->size + RTP_PAYLOAD_PER_PKT_HEADER_SIZE;
525
      struct rtp_info info;
526

    
527
      assert((ctx->max_size - ctx->size)
528
             >= (UDP_MAX_SIZE + RTP_PAYLOAD_PER_PKT_HEADER_SIZE));
529

    
530
      new_pkt_size = input_get_udp(new_pkt_start, ctx->fds[i]);
531
      if (new_pkt_size) {
532
        printf_log(ctx, 2, "Got UDP message of size %d from port id #%d",
533
                   new_pkt_size, i);
534
        if (i % 2 == 0) {  // RTP packet
535
          rtp_packet_received(ctx, i/2, new_pkt_start, new_pkt_size, &info);
536
          if (info.valid) {
537
            printf_log(ctx, 2, "  packet has NTP timestamp (seconds) %llu",
538
                       info.ntp_ts >> TS_SHIFT);
539
            if (ctx->rtp_log) {
540
              fprintf(stderr, "[RTP_LOG] timestamp=%llu size=%d port_id=%d\n", info.ntp_ts, new_pkt_size, i);
541
            }
542
            // update chunk timestamp
543
            if (info.ntp_ts == 0ULL) {
544
              // packet with unknown ts, ignore all timestamps
545
              ctx->ntp_ts_status = -1;
546
            }
547
            if (ctx->ntp_ts_status >= 0) {
548
              switch (ctx->ntp_ts_status) {
549
              case 0:
550
                ctx->min_ntp_ts = info.ntp_ts;
551
                ctx->max_ntp_ts = info.ntp_ts;
552
                ctx->ntp_ts_status = 1;
553
                break;
554
              case 1:
555
                ctx->min_ntp_ts = ts_min(ctx->min_ntp_ts, info.ntp_ts);
556
                ctx->max_ntp_ts = ts_max(ctx->max_ntp_ts, info.ntp_ts);
557
                break;
558
              }
559
              if ((ctx->max_ntp_ts - ctx->min_ntp_ts) >= ctx->max_delay) {
560
                printf_log(ctx, 2, "  Max delay reached: %.0f over %.0f ms",
561
                           (ctx->max_ntp_ts - ctx->min_ntp_ts) * 1000.0 / (1ULL << TS_SHIFT),
562
                           ctx->max_delay * 1000.0 / (1ULL << TS_SHIFT));
563
                status = ((status > 1) ? status : 1); // status = max(status, 1)
564
              }
565
            }
566
            // Marker bit semantic for video stream in rfc3551
567
            if (ctx->rfc3551 && i/2 == ctx->video_stream_id && !info.marker) {
568
              printf_log(ctx, 2, "  Waiting for another part of this frame!");
569
              status = 2;
570
            }
571
          }
572
        }
573
        else {  // RTCP packet
574
          rtcp_packet_received(ctx, i/2, new_pkt_start, new_pkt_size);
575
        }
576
        // append packet to chunk
577
        rtp_payload_per_pkt_header_set(ctx->buff + ctx->size, new_pkt_size, i);
578
        ctx->size += new_pkt_size + RTP_PAYLOAD_PER_PKT_HEADER_SIZE;
579

    
580
        if ((ctx->max_size - ctx->size)
581
            < (UDP_MAX_SIZE + RTP_PAYLOAD_PER_PKT_HEADER_SIZE)
582
            ) {  // Not enough space left in buffer: send chunk
583
          printf_log(ctx, 2, "Buffer size reached: (%d over %d - max %d)",
584
                     ctx->size, ctx->max_size - UDP_MAX_SIZE, ctx->max_size);
585
          status = -1;
586
          ctx->next_fd = i;
587
        }
588
      }
589
    }
590
  } while (status >= 2);
591

    
592
  if (status == 0) {
593
    *size = 0;
594
    res = NULL;
595
  }
596
  else {
597
    struct timeval now;
598
    res = ctx->buff;
599
    *size = ctx->size;
600
    gettimeofday(&now, NULL);
601
    *ts = now.tv_sec * 1000000ULL + now.tv_usec;
602
    ctx->counter++;
603
    ctx->buff = NULL;
604
    ctx->size = 0;
605
    printf_log(ctx, 2, "Chunk created: size %i, timestamp %lli", *size, *ts);
606
  }
607

    
608
  return res;
609
}
610

    
611

    
612
const int *rtp_get_fds(const struct chunkiser_ctx *ctx) {
613
  return ctx->fds;
614
}
615

    
616

    
617
struct chunkiser_iface in_rtp = {
618
  .open = rtp_open,
619
  .close = rtp_close,
620
  .chunkise = rtp_chunkise,
621
  .get_fds = rtp_get_fds,
622
};
623

    
624

    
625
#ifdef DEBUG
626
#pragma message "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
627
#endif