Statistics
| Branch: | Revision:

peerstreamer-src / src / msg_buffer.c @ b87e7c7a

History | View | Annotate | Download (13.5 KB)

1
#include <stdio.h>
2
#include <stdlib.h>
3
#include <string.h>
4
#include "msg_buffer.h"
5

    
6

    
7
struct msg_buffer_slot {
8
        uint8_t *buf;
9
        uint32_t size;
10
        int busy;
11
};
12

    
13
struct msg_buffer {
14
        struct msg_buffer_slot *slots;
15
        uint32_t nslots;
16
        int32_t next_slot_push;
17
        int32_t next_slot_pop;
18

    
19
        struct timeval last_push;
20
        struct timeval last_pop;
21

    
22
        uint32_t size;
23
        uint32_t busy_slots;
24

    
25
        int buffer_started;
26
        struct timeval buffer_started_tv;
27
        uint32_t start_buf_to_us;
28
        uint32_t start_buf_size_th;
29
        int start_buf_complete;
30
        int data_ready;
31
        int slow_mode; /* 0 disabled, 1 use th1 parameters, 2 use th2 parameters */
32
        uint32_t th1_size;
33
        uint32_t th1_to;
34
        uint32_t th2_size;
35
        uint32_t th2_to;
36

    
37
        uint32_t flush_to_us;
38
        int flushing;
39

    
40
        uint32_t min_to_us;
41

    
42
        int32_t next_slot_parse;
43

    
44
        int debug;
45
        char *buf_id;
46
};
47

    
48
/* Return 1 if the difference (t2 - t1) is negative, otherwise 0.  */
49
static int msg_buffer_timeval_subtract(struct timeval *result,
50
                                       struct timeval *t2,
51
                                       struct timeval *t1)
52
{
53
        long int diff = (t2->tv_usec + 1000000 * t2->tv_sec) -
54
                                (t1->tv_usec + 1000000 * t1->tv_sec);
55
        result->tv_sec = diff / 1000000;
56
        result->tv_usec = diff % 1000000;
57

    
58
        return (diff < 0);
59
}
60

    
61
static int msg_buffer_increase_n_slots(struct msg_buffer *msgb)
62
{
63
        uint32_t new_nslots = msgb->nslots << 1;
64
        uint32_t i;
65
        int32_t next_slot_copy = msgb->next_slot_pop;
66

    
67
        struct msg_buffer_slot * new_slots = (struct msg_buffer_slot *)
68
                        malloc(sizeof(struct msg_buffer_slot) * new_nslots);
69

    
70
        if (!new_slots) {
71
                fprintf(stderr,
72
                        "RTP_BUFFER ERROR (%s): new_slots allocation failed\n",
73
                        msgb->buf_id);
74
                return -1;
75
        }
76

    
77
        i = 0;
78
        while (i < msgb->nslots) {
79
                (new_slots + i)->buf = (msgb->slots + next_slot_copy)->buf;
80
                (new_slots + i)->size = (msgb->slots + next_slot_copy)->size;
81
                (new_slots + i)->busy = (msgb->slots + next_slot_copy)->busy;
82

    
83
                i++;
84

    
85
                next_slot_copy = (next_slot_copy + 1) % msgb->nslots;
86
        }
87

    
88
        free(msgb->slots);
89

    
90
        if (msgb->next_slot_parse != MSG_BUFFER_INVALID_SLOT_IDX) {
91
                if (msgb->next_slot_parse >= msgb->next_slot_pop) {
92
                        msgb->next_slot_parse = msgb->next_slot_parse -
93
                                                msgb->next_slot_pop;
94
                } else {
95
                        msgb->next_slot_parse =
96
                                        (msgb->nslots - msgb->next_slot_pop) +
97
                                        msgb->next_slot_parse;
98
                }
99
        }
100

    
101
        msgb->slots = new_slots;
102
        msgb->nslots = new_nslots;
103
        msgb->next_slot_pop = 0;
104
        msgb->next_slot_push = i;
105

    
106
        return 0;
107
}
108

    
109
static void msg_buffer_check_start_buf_to(struct msg_buffer *msgb)
110
{
111
        if (msgb->buffer_started && (msgb->start_buf_to_us > 0)) {
112
                struct timeval tv_now, tv_diff;
113
                gettimeofday(&tv_now, NULL);
114
                msg_buffer_timeval_subtract(&tv_diff, &tv_now,
115
                                        &(msgb->buffer_started_tv));
116
                if ((tv_diff.tv_usec + 1000000 * tv_diff.tv_sec) >=
117
                    msgb->start_buf_to_us) {
118
                        if (msgb->debug) {
119
                                fprintf(stdout,
120
                                    "RTP_BUFFER (%s): initial buffering TO\n",
121
                                    msgb->buf_id);
122
                        }
123
                        msgb->start_buf_complete = 1;
124
                }
125
        }
126
}
127

    
128
struct msg_buffer *msg_buffer_init(int debug, char *id)
129
{
130
        struct msg_buffer *msgb = (struct msg_buffer *)
131
                                        malloc(sizeof(struct msg_buffer));
132

    
133
        if (!msgb) {
134
                fprintf(stderr,
135
                        "ERROR: struct msg_buffer memory allocation failed\n");
136
                goto msgb_err;
137
        }
138

    
139
        memset(msgb, 0, sizeof(struct msg_buffer));
140

    
141
        if (!id) {
142
                fprintf(stderr, "RTP_BUFFER ERROR: id parameter is NULL\n");
143
                goto id_err;
144
        }
145

    
146
        msgb->buf_id = (char *) malloc(strlen(id) + 1);
147

    
148
        if (!(msgb->buf_id)) {
149
                fprintf(stderr, "RTP_BUFFER ERROR: buf_id allocation failed\n");
150
                goto id_err;
151
        }
152

    
153
        strncpy(msgb->buf_id, id, strlen(id) + 1);
154

    
155
        msgb->slots = (struct msg_buffer_slot *)
156
                                malloc(sizeof(struct msg_buffer_slot) *
157
                                MSG_BUFFER_N_START_SLOT);
158

    
159
        if (!(msgb->slots)) {
160
                fprintf(stderr, "RTP_BUFFER ERROR (%s): "
161
                        "struct msg_buffer_slots memory allocation failed\n",
162
                        msgb->buf_id);
163
                goto slots_err;
164
        }
165

    
166
        memset(msgb->slots, 0,
167
               sizeof(struct msg_buffer_slot) * MSG_BUFFER_N_START_SLOT);
168

    
169
        msgb->nslots = MSG_BUFFER_N_START_SLOT;
170
        msgb->next_slot_push = 0;
171
        msgb->next_slot_pop = MSG_BUFFER_INVALID_SLOT_IDX;
172
        gettimeofday(&(msgb->last_push), NULL);
173
        gettimeofday(&(msgb->last_pop), NULL);
174
        msgb->size = 0;
175
        msgb->busy_slots = 0;
176
        msgb->buffer_started = 0;
177
        msgb->start_buf_to_us = 0;
178
        msgb->start_buf_size_th = MSG_BUFFER_START_BUF_SIZE_TH;
179
        msgb->start_buf_complete = 0;
180
        msgb->data_ready = 0;
181
        msgb->slow_mode = MSG_BUFFER_SLOW_MODE_TH2;
182
        msgb->th1_size = MSG_BUFFER_TH1_SIZE;
183
        msgb->th1_to = MSG_BUFFER_TH1_TO_US;
184
        msgb->th2_size = MSG_BUFFER_TH2_SIZE;
185
        msgb->th2_to = MSG_BUFFER_TH2_TO_US;
186
        msgb->flush_to_us = MSG_BUFFER_FLUSH_TO_US;
187
        msgb->flushing = 0;
188
        msgb->min_to_us = MSG_BUFFER_MIN_TO_US;
189
        msgb->next_slot_parse = MSG_BUFFER_INVALID_SLOT_IDX;
190
        msgb->debug = debug;
191

    
192
        return msgb;
193

    
194
slots_err:
195
        free(msgb->buf_id);
196
id_err:
197
        free(msgb);
198
msgb_err:
199
        return NULL;
200
}
201

    
202
void msg_buffer_destroy(struct msg_buffer **msgb)
203
{
204
        uint32_t i;
205

    
206
        if (*msgb) {
207
                for (i = 0; i < (*msgb)->nslots; i++) {
208
                if (((*msgb)->slots + i)->busy) {
209
                        free(((*msgb)->slots + i)->buf);
210
                        ((*msgb)->slots + i)->buf = NULL;
211
                        ((*msgb)->slots + i)->size = 0;
212
                        ((*msgb)->slots + i)->busy = 0;
213
                }
214
        }
215

    
216
                free((*msgb)->slots);
217
                free((*msgb)->buf_id);
218
                free((*msgb));
219

    
220
                *msgb = NULL;
221
        }
222
}
223

    
224
int msg_buffer_push(struct msg_buffer *msgb, uint8_t *buf, uint32_t size)
225
{
226
        int res = 0;
227
        uint8_t *new_buf = NULL;
228

    
229
        /* If all slots are busy, allocate new ones */
230
        if ((msgb->busy_slots > 0) &&
231
            (msgb->next_slot_push == msgb->next_slot_pop)) {
232
                if ((res = msg_buffer_increase_n_slots(msgb)) == -1) {
233
                        fprintf(stderr, "RTP_BUFFER ERROR (%s): "
234
                                "msg_buffer_increase_n_slots()\n",
235
                                msgb->buf_id);
236
                        return res;
237
                }
238
        }
239

    
240
        /* Copy the new buffer into the next slot */
241
        new_buf = (uint8_t *) malloc(size);
242
        if (!new_buf) {
243
                fprintf(stderr, "RTP_BUFFER ERROR (%s): buffer push failed\n",
244
                        msgb->buf_id);
245
                res = -1;
246
                return res;
247
        }
248

    
249
        memcpy(new_buf, buf, size);
250
        (msgb->slots + msgb->next_slot_push)->buf = new_buf;
251
        (msgb->slots + msgb->next_slot_push)->size = size;
252
        (msgb->slots + msgb->next_slot_push)->busy = 1;
253

    
254
        if (msgb->debug) {
255
                fprintf(stdout, "RTP_BUFFER (%s): "
256
                        "pushed %d bytes in slot %d (tot. size %u)\n",
257
                        msgb->buf_id, size, msgb->next_slot_push,
258
                        msgb->size + size);
259
        }
260

    
261
        if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) {
262
                msgb->next_slot_pop = msgb->next_slot_push;
263
        }
264

    
265
        msgb->next_slot_push = (msgb->next_slot_push + 1) % msgb->nslots;
266
        msgb->size += size;
267
        msgb->busy_slots++;
268
        msgb->flushing = 0;
269
        gettimeofday(&(msgb->last_push), NULL);
270

    
271
        if (!(msgb->buffer_started)) {
272
                msgb->buffer_started = 1;
273
                gettimeofday(&(msgb->buffer_started_tv), NULL);
274
        }
275

    
276
        if (msgb->size >= msgb->th1_size) {
277
                msgb->data_ready = 1;
278
                msgb->slow_mode = MSG_BUFFER_SLOW_MODE_DISABLED;
279
        }
280
        /* else if (msgb->size >= msgb->th2_size) {
281
                msgb->slow_mode = 1;
282
        } */
283

    
284
        if (!(msgb->start_buf_complete)) {
285
                if (msgb->size >= msgb->start_buf_size_th) {
286
                        msgb->start_buf_complete = 1;
287
                } else {
288
                        msg_buffer_check_start_buf_to(msgb);
289
                }
290
        }
291

    
292
        return res;
293
}
294

    
295
int msg_buffer_pop(struct msg_buffer *msgb, uint8_t **buf)
296
{
297
        int size = 0;
298

    
299
        if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) {
300
                *buf = NULL;
301
                return size;
302
        }
303

    
304
        *buf = (msgb->slots + msgb->next_slot_pop)->buf;
305
        size = (msgb->slots + msgb->next_slot_pop)->size;
306

    
307
        (msgb->slots + msgb->next_slot_pop)->buf = NULL;
308
        (msgb->slots + msgb->next_slot_pop)->size = 0;
309
        (msgb->slots + msgb->next_slot_pop)->busy = 0;
310

    
311
        if (msgb->debug) {
312
                fprintf(stdout, "RTP_BUFFER (%s): "
313
                        "popped %d bytes in slot %d (tot. size %u)\n",
314
                        msgb->buf_id, size, msgb->next_slot_pop,
315
                        msgb->size - size);
316
        }
317

    
318
        msgb->next_slot_pop = (msgb->next_slot_pop + 1) % msgb->nslots;
319
        (msgb->busy_slots)--;
320
        msgb->size -= size;
321
        gettimeofday(&(msgb->last_pop), NULL);
322

    
323
        if (msgb->busy_slots == 0) {
324
                msgb->next_slot_pop = MSG_BUFFER_INVALID_SLOT_IDX;
325
        }
326

    
327
        if (msgb->size < msgb->th1_size) {
328
                msgb->data_ready = 0;
329

    
330
                if (msgb->slow_mode != MSG_BUFFER_SLOW_MODE_TH2) {
331
                        msgb->slow_mode = MSG_BUFFER_SLOW_MODE_TH1;
332
                }
333
        }
334

    
335
        if (msgb->size < msgb->th2_size) {
336
                msgb->slow_mode = MSG_BUFFER_SLOW_MODE_TH2;
337
        }
338

    
339
        return size;
340
}
341

    
342
int msg_buffer_get_status(struct msg_buffer *msgb, struct timeval *tv)
343
{
344
        struct timeval tv_now, tv_flush, tv_diff;
345
        long to;
346

    
347
        msg_buffer_check_start_buf_to(msgb);
348

    
349
        gettimeofday(&tv_now, NULL);
350
        msg_buffer_timeval_subtract(&tv_flush, &tv_now, &(msgb->last_push));
351

    
352
        if (!(msgb->flushing) && msgb->start_buf_complete &&
353
            ((tv_flush.tv_usec + 1000000 * tv_flush.tv_sec) >
354
             msgb->flush_to_us)) {
355
                fprintf(stdout,
356
                        "RTP_BUFFER (%s): flush (size %u, busy slots %u)\n",
357
                        msgb->buf_id, msgb->size, msgb->busy_slots);
358
                msgb->flushing = 1;
359
        }
360

    
361
        if ((msgb->next_slot_pop != MSG_BUFFER_INVALID_SLOT_IDX) &&
362
            msgb->flushing) {
363
                if (tv) {
364
                        tv->tv_sec = 0;
365
                        tv->tv_usec = msgb->min_to_us;
366
                }
367
                return MSG_BUFFER_DATA_READY;
368
        }
369

    
370
        if ((msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) &&
371
            (msgb->flushing)) {
372
                fprintf(stdout,
373
                        "RTP_BUFFER (%s): flush complete\n", msgb->buf_id);
374
                if (tv) {
375
                        tv->tv_sec = 0;
376
                        tv->tv_usec = msgb->min_to_us;
377
                }
378
                return MSG_BUFFER_DATA_FLUSHED;
379
        }
380

    
381
        msg_buffer_timeval_subtract(&tv_diff, &tv_now, &(msgb->last_pop));
382

    
383
        if (msgb->debug) {
384
                fprintf(stdout, "RTP_BUFFER (%s): "
385
                        "size %u (busy slots %u), TH2 %u, TH1 %u\n",
386
                        msgb->buf_id, msgb->size, msgb->busy_slots,
387
                        msgb->th2_size, msgb->th1_size);
388
        }
389

    
390
        if (!(msgb->start_buf_complete) ||
391
            (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX)) {
392

    
393
                if (tv) {
394
                        to = msgb->th2_to;
395
                        tv->tv_sec = to / 1000000;
396
                        tv->tv_usec = to % 1000000;
397
                }
398

    
399
                if (msgb->debug) {
400
                        fprintf(stdout, "RTP_BUFFER (%s): "
401
                                "start buffering incomplete or buffer empty"
402
                                "(size %u)\n", msgb->buf_id, msgb->size);
403
                }
404

    
405
                if (!(msgb->start_buf_complete)) {
406
                        return MSG_BUFFER_DATA_START_BUF;
407
                } else {
408
                        return MSG_BUFFER_DATA_NOT_READY;
409
                }
410
        }
411

    
412
        if (msgb->data_ready) {
413
                if (tv) {
414
                        tv->tv_sec = 0;
415
                        tv->tv_usec = msgb->min_to_us;
416
                }
417

    
418
                if (msgb->debug) {
419
                        fprintf(stdout,
420
                                "RTP_BUFFER (%s): DATA READY\n", msgb->buf_id);
421
                }
422

    
423
                return MSG_BUFFER_DATA_READY;
424
        }
425

    
426
        if (msgb->slow_mode == MSG_BUFFER_SLOW_MODE_TH2) {
427
                if ((tv_diff.tv_usec + 1000000 * tv_diff.tv_sec) >=
428
                    msgb->th2_to) {
429
                        if (tv) {
430
                                tv->tv_sec = 0;
431
                                tv->tv_usec = msgb->min_to_us;
432
                        }
433

    
434
                        if (msgb->debug) {
435
                                fprintf(stdout, "RTP_BUFFER (%s): "
436
                                        "DATA READY TIMEOUT TH2\n",
437
                                        msgb->buf_id);
438
                        }
439

    
440
                        return MSG_BUFFER_DATA_READY;
441
                }
442

    
443
                if (tv) {
444
                        to = msgb->th2_to - (tv->tv_usec + 1000000 * tv->tv_sec);
445
                        tv->tv_sec = to / 1000000;
446
                        tv->tv_usec = to % 1000000;
447
                }
448
        }
449

    
450
        if (msgb->slow_mode == MSG_BUFFER_SLOW_MODE_TH1) {
451
                if ((tv_diff.tv_usec + 1000000 * tv_diff.tv_sec) >=
452
                    msgb->th1_to) {
453
                        if (tv) {
454
                                tv->tv_sec = 0;
455
                                tv->tv_usec = msgb->min_to_us;
456
                        }
457

    
458
                        if (msgb->debug) {
459
                                fprintf(stdout, "RTP_BUFFER (%s): "
460
                                        "DATA READY TIMEOUT TH1\n",
461
                                        msgb->buf_id);
462
                        }
463

    
464
                        return MSG_BUFFER_DATA_READY;
465
                }
466

    
467
                if (tv) {
468
                        to = msgb->th1_to -
469
                                (tv->tv_usec + 1000000 * tv->tv_sec);
470
                        tv->tv_sec = to / 1000000;
471
                        tv->tv_usec = to % 1000000;
472
                }
473
        }
474

    
475
        if (msgb->debug) {
476
                fprintf(stdout, "RTP_BUFFER (%s): DATA NOT READY\n",
477
                        msgb->buf_id);
478
        }
479

    
480
        return MSG_BUFFER_DATA_NOT_READY;
481
}
482

    
483
int msg_buffer_parse_start(struct msg_buffer *msgb)
484
{
485
        if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) {
486
                if (msgb->debug) {
487
                        fprintf(stderr,
488
                                "RTP_BUFFER (%s): msg_buffer_parse_start() "
489
                                "failed (buffer empty)\n",
490
                                msgb->buf_id);
491
                }
492
                return -1;
493
        }
494

    
495
        msgb->next_slot_parse = msgb->next_slot_pop;
496

    
497
        return 0;
498
}
499

    
500
int msg_buffer_parse_next(struct msg_buffer *msgb, uint8_t **buf)
501
{
502
        int size = 0;
503

    
504
        if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX ||
505
            msgb->next_slot_parse == MSG_BUFFER_INVALID_SLOT_IDX) {
506
                *buf = NULL;
507
                return size;
508
        }
509

    
510
        *buf = (msgb->slots + msgb->next_slot_parse)->buf;
511
        size = (msgb->slots + msgb->next_slot_parse)->size;
512

    
513
        if (msgb->debug) {
514
                fprintf(stdout, "RTP_BUFFER (%s): parsed %d bytes in slot %d\n",
515
                    msgb->buf_id, size, msgb->next_slot_parse);
516
        }
517

    
518
        msgb->next_slot_parse = (msgb->next_slot_parse + 1) % msgb->nslots;
519

    
520
        if (msgb->next_slot_parse == msgb->next_slot_push) {
521
                msgb->next_slot_parse = MSG_BUFFER_INVALID_SLOT_IDX;
522
        }
523

    
524
        return size;
525

    
526
}
527

    
528
void msg_buffer_parse_stop(struct msg_buffer *msgb)
529
{
530
        msgb->next_slot_parse = MSG_BUFFER_INVALID_SLOT_IDX;
531
}
532

    
533
uint32_t msg_buffer_get_start_buf_size(struct msg_buffer *msgb)
534
{
535
        if (!msgb) {
536
                return 0;
537
        }
538

    
539
        return msgb->start_buf_size_th;
540
}
541

    
542
uint32_t msg_buffer_get_current_size(struct msg_buffer *msgb)
543
{
544
        if (!msgb) {
545
                return 0;
546
        }
547

    
548
        return msgb->size;
549
}
550

    
551
void msg_buffer_set_ths_to(struct msg_buffer *msgb,
552
                           uint32_t th1_to_us,
553
                           uint32_t th2_to_us)
554
{
555
        if (msgb) {
556
                msgb->th1_to = th1_to_us;
557
                msgb->th2_to = th2_to_us;
558
        }
559
}
560

    
561
void msg_buffer_set_ths_size(struct msg_buffer *msgb,
562
                             uint32_t th1_size,
563
                             uint32_t th2_size)
564
{
565
        if (msgb) {
566
                msgb->th1_size = th1_size;
567
                msgb->th2_size = th2_size;
568
        }
569
}
570

    
571
void msg_buffer_set_start_buffering_to_us(struct msg_buffer *msgb,
572
                                            uint32_t to_us)
573
{
574
        if (msgb) {
575
                msgb->start_buf_to_us = to_us;
576
                msgb->buffer_started = 0;
577
        }
578
}
579

    
580
void msg_buffer_set_start_buf_size_th(struct msg_buffer *msgb,
581
                                      uint32_t size_th)
582
{
583
        if (msgb) {
584
                msgb->start_buf_size_th = size_th;
585
        }
586
}
587

    
588
void msg_buffer_set_flush_to_us(struct msg_buffer *msgb, uint32_t to_us)
589
{
590
        if (msgb) {
591
                msgb->flush_to_us = to_us;
592
        }
593
}
594

    
595
void msg_buffer_start_buf_reinit(struct msg_buffer *msgb, uint32_t to_us)
596
{
597
        if (msgb) {
598
                msgb->flushing = 0;
599
                msgb->start_buf_complete = 0;
600
                msgb->buffer_started = 1;
601
                msgb->start_buf_to_us = to_us;
602
                gettimeofday(&(msgb->buffer_started_tv), NULL);
603
        }
604
}
605

    
606
int msg_buffer_is_flushing(struct msg_buffer *msgb)
607
{
608
        if (msgb) {
609
                if (msgb->flushing) {
610
                        return 1;
611
                }
612
        }
613

    
614
        return 0;
615
}
616

    
617
uint32_t msg_buffer_get_nslots(struct msg_buffer *msgb)
618
{
619
        if (msgb) {
620
                return msgb->nslots;
621
        }
622

    
623
        return 0;
624
}