Revision a8a91b7e

View differences:

src/msg_buffer.c
1
#include <stdio.h>
2
#include <stdlib.h>
3
#include <string.h>
4
#include "msg_buffer.h"
5

  
6

  
7
struct msg_buffer_slot {
8
	uint8_t *buf;
9
	uint32_t size;
10
	int busy;
11
};
12

  
13
struct msg_buffer {
14
	struct msg_buffer_slot *slots;
15
	uint32_t nslots;
16
	int32_t next_slot_push;
17
	int32_t next_slot_pop;
18

  
19
	struct timeval last_push;
20
	struct timeval last_pop;
21

  
22
	uint32_t size;
23
	uint32_t busy_slots;
24

  
25
	int buffer_started;
26
	struct timeval buffer_started_tv;
27
	uint32_t start_buf_to_us;
28
	uint32_t start_buf_size_th;
29
	int start_buf_complete;
30
	int data_ready;
31
	int slow_mode; /* 0 disabled, 1 use th1 parameters, 2 use th2 parameters */
32
	uint32_t th1_size;
33
	uint32_t th1_to;
34
	uint32_t th2_size;
35
	uint32_t th2_to;
36

  
37
	uint32_t flush_to_us;
38
	int flushing;
39

  
40
	uint32_t min_to_us;
41

  
42
	int32_t next_slot_parse;
43

  
44
	int debug;
45
	char *buf_id;
46
};
47

  
48
/* Return 1 if the difference (t2 - t1) is negative, otherwise 0.  */
49
static int msg_buffer_timeval_subtract(struct timeval *result,
50
				       struct timeval *t2,
51
				       struct timeval *t1)
52
{
53
	long int diff = (t2->tv_usec + 1000000 * t2->tv_sec) -
54
				(t1->tv_usec + 1000000 * t1->tv_sec);
55
	result->tv_sec = diff / 1000000;
56
	result->tv_usec = diff % 1000000;
57

  
58
	return (diff < 0);
59
}
60

  
61
static int msg_buffer_increase_n_slots(struct msg_buffer *msgb)
62
{
63
	uint32_t new_nslots = msgb->nslots << 1;
64
	int i;
65
	int32_t next_slot_copy = msgb->next_slot_pop;
66

  
67
	struct msg_buffer_slot * new_slots = (struct msg_buffer_slot *)
68
			malloc(sizeof(struct msg_buffer_slot) * new_nslots);
69

  
70
	if (!new_slots) {
71
		fprintf(stderr,
72
			"RTP_BUFFER ERROR (%s): new_slots allocation failed\n",
73
			msgb->buf_id);
74
		return -1;
75
	}
76

  
77
	i = 0;
78
	while (i < msgb->nslots) {
79
		(new_slots + i)->buf = (msgb->slots + next_slot_copy)->buf;
80
		(new_slots + i)->size = (msgb->slots + next_slot_copy)->size;
81
		(new_slots + i)->busy = (msgb->slots + next_slot_copy)->busy;
82

  
83
		i++;
84

  
85
		next_slot_copy = (next_slot_copy + 1) % msgb->nslots;
86
	}
87

  
88
	free(msgb->slots);
89

  
90
	if (msgb->next_slot_parse != MSG_BUFFER_INVALID_SLOT_IDX) {
91
		if (msgb->next_slot_parse >= msgb->next_slot_pop) {
92
			msgb->next_slot_parse = msgb->next_slot_parse -
93
						msgb->next_slot_pop;
94
		} else {
95
			msgb->next_slot_parse =
96
					(msgb->nslots - msgb->next_slot_pop) +
97
					msgb->next_slot_parse;
98
		}
99
	}
100

  
101
	msgb->slots = new_slots;
102
	msgb->nslots = new_nslots;
103
	msgb->next_slot_pop = 0;
104
	msgb->next_slot_push = i;
105

  
106
	return 0;
107
}
108

  
109
static void msg_buffer_check_start_buf_to(struct msg_buffer *msgb)
110
{
111
	if (msgb->buffer_started && (msgb->start_buf_to_us > 0)) {
112
		struct timeval tv_now, tv_diff;
113
		gettimeofday(&tv_now, NULL);
114
		msg_buffer_timeval_subtract(&tv_diff, &tv_now,
115
					&(msgb->buffer_started_tv));
116
		if ((tv_diff.tv_usec + 1000000 * tv_diff.tv_sec) >=
117
		    msgb->start_buf_to_us) {
118
			if (msgb->debug) {
119
				fprintf(stdout,
120
				    "RTP_BUFFER (%s): initial buffering TO\n",
121
				    msgb->buf_id);
122
			}
123
			msgb->start_buf_complete = 1;
124
		}
125
	}
126
}
127

  
128
struct msg_buffer *msg_buffer_init(int debug, char *id)
129
{
130
	struct msg_buffer *msgb = (struct msg_buffer *)
131
					malloc(sizeof(struct msg_buffer));
132

  
133
	if (!msgb) {
134
		fprintf(stderr,
135
			"ERROR: struct msg_buffer memory allocation failed\n");
136
		goto msgb_err;
137
	}
138

  
139
	memset(msgb, 0, sizeof(struct msg_buffer));
140

  
141
	if (!id) {
142
		fprintf(stderr, "RTP_BUFFER ERROR: id parameter is NULL\n");
143
		goto id_err;
144
	}
145

  
146
	msgb->buf_id = (char *) malloc(strlen(id) + 1);
147

  
148
	if (!(msgb->buf_id)) {
149
		fprintf(stderr, "RTP_BUFFER ERROR: buf_id allocation failed\n");
150
		goto id_err;
151
	}
152

  
153
	strncpy(msgb->buf_id, id, strlen(id) + 1);
154

  
155
	msgb->slots = (struct msg_buffer_slot *)
156
				malloc(sizeof(struct msg_buffer_slot) *
157
				MSG_BUFFER_N_START_SLOT);
158

  
159
	if (!(msgb->slots)) {
160
		fprintf(stderr, "RTP_BUFFER ERROR (%s): "
161
			"struct msg_buffer_slots memory allocation failed\n",
162
			msgb->buf_id);
163
		goto slots_err;
164
	}
165

  
166
	memset(msgb->slots, 0,
167
	       sizeof(struct msg_buffer_slot) * MSG_BUFFER_N_START_SLOT);
168

  
169
	msgb->nslots = MSG_BUFFER_N_START_SLOT;
170
	msgb->next_slot_push = 0;
171
	msgb->next_slot_pop = MSG_BUFFER_INVALID_SLOT_IDX;
172
	gettimeofday(&(msgb->last_push), NULL);
173
	gettimeofday(&(msgb->last_pop), NULL);
174
	msgb->size = 0;
175
	msgb->busy_slots = 0;
176
	msgb->buffer_started = 0;
177
	msgb->start_buf_to_us = 0;
178
	msgb->start_buf_size_th = MSG_BUFFER_START_BUF_SIZE_TH;
179
	msgb->start_buf_complete = 0;
180
	msgb->data_ready = 0;
181
	msgb->slow_mode = MSG_BUFFER_SLOW_MODE_TH2;
182
	msgb->th1_size = MSG_BUFFER_TH1_SIZE;
183
	msgb->th1_to = MSG_BUFFER_TH1_TO_US;
184
	msgb->th2_size = MSG_BUFFER_TH2_SIZE;
185
	msgb->th2_to = MSG_BUFFER_TH2_TO_US;
186
	msgb->flush_to_us = MSG_BUFFER_FLUSH_TO_US;
187
	msgb->flushing = 0;
188
	msgb->min_to_us = MSG_BUFFER_MIN_TO_US;
189
	msgb->next_slot_parse = MSG_BUFFER_INVALID_SLOT_IDX;
190
	msgb->debug = debug;
191

  
192
	return msgb;
193

  
194
slots_err:
195
	free(msgb->buf_id);
196
id_err:
197
	free(msgb);
198
msgb_err:
199
	return NULL;
200
}
201

  
202
void msg_buffer_destroy(struct msg_buffer **msgb)
203
{
204
	uint32_t i;
205

  
206
	if (*msgb) {
207
		for (i = 0; i < (*msgb)->nslots; i++) {
208
		if (((*msgb)->slots + i)->busy) {
209
			free(((*msgb)->slots + i)->buf);
210
			((*msgb)->slots + i)->buf = NULL;
211
			((*msgb)->slots + i)->size = 0;
212
			((*msgb)->slots + i)->busy = 0;
213
		}
214
	}
215

  
216
		free((*msgb)->slots);
217
		free((*msgb)->buf_id);
218
		free((*msgb));
219

  
220
		*msgb = NULL;
221
	}
222
}
223

  
224
int msg_buffer_push(struct msg_buffer *msgb, uint8_t *buf, uint32_t size)
225
{
226
	int res = 0;
227
	uint8_t *new_buf = NULL;
228

  
229
	/* If all slots are busy, allocate new ones */
230
	if ((msgb->busy_slots > 0) &&
231
	    (msgb->next_slot_push == msgb->next_slot_pop)) {
232
		if ((res = msg_buffer_increase_n_slots(msgb)) == -1) {
233
			fprintf(stderr, "RTP_BUFFER ERROR (%s): "
234
				"msg_buffer_increase_n_slots()\n",
235
				msgb->buf_id);
236
			return res;
237
		}
238
	}
239

  
240
	/* Copy the new buffer into the next slot */
241
	new_buf = (uint8_t *) malloc(size);
242
	if (!new_buf) {
243
		fprintf(stderr, "RTP_BUFFER ERROR (%s): buffer push failed\n",
244
			msgb->buf_id);
245
		res = -1;
246
		return res;
247
	}
248

  
249
	memcpy(new_buf, buf, size);
250
	(msgb->slots + msgb->next_slot_push)->buf = new_buf;
251
	(msgb->slots + msgb->next_slot_push)->size = size;
252
	(msgb->slots + msgb->next_slot_push)->busy = 1;
253

  
254
	if (msgb->debug) {
255
		fprintf(stdout, "RTP_BUFFER (%s): "
256
			"pushed %d bytes in slot %d (tot. size %u)\n",
257
			msgb->buf_id, size, msgb->next_slot_push,
258
			msgb->size + size);
259
	}
260

  
261
	if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) {
262
		msgb->next_slot_pop = msgb->next_slot_push;
263
	}
264

  
265
	msgb->next_slot_push = (msgb->next_slot_push + 1) % msgb->nslots;
266
	msgb->size += size;
267
	msgb->busy_slots++;
268
	msgb->flushing = 0;
269
	gettimeofday(&(msgb->last_push), NULL);
270

  
271
	if (!(msgb->buffer_started)) {
272
		msgb->buffer_started = 1;
273
		gettimeofday(&(msgb->buffer_started_tv), NULL);
274
	}
275

  
276
	if (msgb->size >= msgb->th1_size) {
277
		msgb->data_ready = 1;
278
		msgb->slow_mode = MSG_BUFFER_SLOW_MODE_DISABLED;
279
	}
280
	/* else if (msgb->size >= msgb->th2_size) {
281
		msgb->slow_mode = 1;
282
	} */
283

  
284
	if (!(msgb->start_buf_complete)) {
285
		if (msgb->size >= msgb->start_buf_size_th) {
286
			msgb->start_buf_complete = 1;
287
		} else {
288
			msg_buffer_check_start_buf_to(msgb);
289
		}
290
	}
291

  
292
	return res;
293
}
294

  
295
int msg_buffer_pop(struct msg_buffer *msgb, uint8_t **buf)
296
{
297
	int size = 0;
298

  
299
	if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) {
300
		*buf = NULL;
301
		return size;
302
	}
303

  
304
	*buf = (msgb->slots + msgb->next_slot_pop)->buf;
305
	size = (msgb->slots + msgb->next_slot_pop)->size;
306

  
307
	(msgb->slots + msgb->next_slot_pop)->buf = NULL;
308
	(msgb->slots + msgb->next_slot_pop)->size = 0;
309
	(msgb->slots + msgb->next_slot_pop)->busy = 0;
310

  
311
	if (msgb->debug) {
312
		fprintf(stdout, "RTP_BUFFER (%s): "
313
			"popped %d bytes in slot %d (tot. size %u)\n",
314
			msgb->buf_id, size, msgb->next_slot_pop,
315
			msgb->size - size);
316
	}
317

  
318
	msgb->next_slot_pop = (msgb->next_slot_pop + 1) % msgb->nslots;
319
	(msgb->busy_slots)--;
320
	msgb->size -= size;
321
	gettimeofday(&(msgb->last_pop), NULL);
322

  
323
	if (msgb->busy_slots == 0) {
324
		msgb->next_slot_pop = MSG_BUFFER_INVALID_SLOT_IDX;
325
	}
326

  
327
	if (msgb->size < msgb->th1_size) {
328
		msgb->data_ready = 0;
329

  
330
		if (msgb->slow_mode != MSG_BUFFER_SLOW_MODE_TH2) {
331
			msgb->slow_mode = MSG_BUFFER_SLOW_MODE_TH1;
332
		}
333
	}
334

  
335
	if (msgb->size < msgb->th2_size) {
336
		msgb->slow_mode = MSG_BUFFER_SLOW_MODE_TH2;
337
	}
338

  
339
	return size;
340
}
341

  
342
int msg_buffer_get_status(struct msg_buffer *msgb, struct timeval *tv)
343
{
344
	struct timeval tv_now, tv_flush, tv_diff;
345
	long to;
346

  
347
	msg_buffer_check_start_buf_to(msgb);
348

  
349
	gettimeofday(&tv_now, NULL);
350
	msg_buffer_timeval_subtract(&tv_flush, &tv_now, &(msgb->last_push));
351

  
352
	if (!(msgb->flushing) && msgb->start_buf_complete &&
353
	    ((tv_flush.tv_usec + 1000000 * tv_flush.tv_sec) >
354
	     msgb->flush_to_us)) {
355
		fprintf(stdout,
356
			"RTP_BUFFER (%s): flush (size %u, busy slots %u)\n",
357
			msgb->buf_id, msgb->size, msgb->busy_slots);
358
		msgb->flushing = 1;
359
	}
360

  
361
	if ((msgb->next_slot_pop != MSG_BUFFER_INVALID_SLOT_IDX) &&
362
	    msgb->flushing) {
363
		if (tv) {
364
			tv->tv_sec = 0;
365
			tv->tv_usec = msgb->min_to_us;
366
		}
367
		return MSG_BUFFER_DATA_READY;
368
	}
369

  
370
	if ((msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) &&
371
	    (msgb->flushing)) {
372
		fprintf(stdout,
373
			"RTP_BUFFER (%s): flush complete\n", msgb->buf_id);
374
		if (tv) {
375
			tv->tv_sec = 0;
376
			tv->tv_usec = msgb->min_to_us;
377
		}
378
		return MSG_BUFFER_DATA_FLUSHED;
379
	}
380

  
381
	msg_buffer_timeval_subtract(&tv_diff, &tv_now, &(msgb->last_pop));
382

  
383
	if (msgb->debug) {
384
		fprintf(stdout, "RTP_BUFFER (%s): "
385
			"size %u (busy slots %u), TH2 %u, TH1 %u\n",
386
			msgb->buf_id, msgb->size, msgb->busy_slots,
387
			msgb->th2_size, msgb->th1_size);
388
	}
389

  
390
	if (!(msgb->start_buf_complete) ||
391
	    (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX)) {
392

  
393
		if (tv) {
394
			to = msgb->th2_to;
395
			tv->tv_sec = to / 1000000;
396
			tv->tv_usec = to % 1000000;
397
		}
398

  
399
		if (msgb->debug) {
400
			fprintf(stdout, "RTP_BUFFER (%s): "
401
				"start buffering incomplete or buffer empty"
402
				"(size %u)\n", msgb->buf_id, msgb->size);
403
		}
404

  
405
		if (!(msgb->start_buf_complete)) {
406
			return MSG_BUFFER_DATA_START_BUF;
407
		} else {
408
			return MSG_BUFFER_DATA_NOT_READY;
409
		}
410
	}
411

  
412
	if (msgb->data_ready) {
413
		if (tv) {
414
			tv->tv_sec = 0;
415
			tv->tv_usec = msgb->min_to_us;
416
		}
417

  
418
		if (msgb->debug) {
419
			fprintf(stdout,
420
				"RTP_BUFFER (%s): DATA READY\n", msgb->buf_id);
421
		}
422

  
423
		return MSG_BUFFER_DATA_READY;
424
	}
425

  
426
	if (msgb->slow_mode == MSG_BUFFER_SLOW_MODE_TH2) {
427
		if ((tv_diff.tv_usec + 1000000 * tv_diff.tv_sec) >=
428
		    msgb->th2_to) {
429
			if (tv) {
430
				tv->tv_sec = 0;
431
				tv->tv_usec = msgb->min_to_us;
432
			}
433

  
434
			if (msgb->debug) {
435
				fprintf(stdout, "RTP_BUFFER (%s): "
436
					"DATA READY TIMEOUT TH2\n",
437
					msgb->buf_id);
438
			}
439

  
440
			return MSG_BUFFER_DATA_READY;
441
		}
442

  
443
		if (tv) {
444
			to = msgb->th2_to - (tv->tv_usec + 1000000 * tv->tv_sec);
445
			tv->tv_sec = to / 1000000;
446
			tv->tv_usec = to % 1000000;
447
		}
448
	}
449

  
450
	if (msgb->slow_mode == MSG_BUFFER_SLOW_MODE_TH1) {
451
		if ((tv_diff.tv_usec + 1000000 * tv_diff.tv_sec) >=
452
		    msgb->th1_to) {
453
			if (tv) {
454
				tv->tv_sec = 0;
455
				tv->tv_usec = msgb->min_to_us;
456
			}
457

  
458
			if (msgb->debug) {
459
				fprintf(stdout, "RTP_BUFFER (%s): "
460
					"DATA READY TIMEOUT TH1\n",
461
					msgb->buf_id);
462
			}
463

  
464
			return MSG_BUFFER_DATA_READY;
465
		}
466

  
467
		if (tv) {
468
			to = msgb->th1_to -
469
				(tv->tv_usec + 1000000 * tv->tv_sec);
470
			tv->tv_sec = to / 1000000;
471
			tv->tv_usec = to % 1000000;
472
		}
473
	}
474

  
475
	if (msgb->debug) {
476
		fprintf(stdout, "RTP_BUFFER (%s): DATA NOT READY\n",
477
			msgb->buf_id);
478
	}
479

  
480
	return MSG_BUFFER_DATA_NOT_READY;
481
}
482

  
483
int msg_buffer_parse_start(struct msg_buffer *msgb)
484
{
485
	if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX) {
486
		if (msgb->debug) {
487
			fprintf(stderr,
488
				"RTP_BUFFER (%s): msg_buffer_parse_start() "
489
				"failed (buffer empty)\n",
490
				msgb->buf_id);
491
		}
492
		return -1;
493
	}
494

  
495
	msgb->next_slot_parse = msgb->next_slot_pop;
496

  
497
	return 0;
498
}
499

  
500
int msg_buffer_parse_next(struct msg_buffer *msgb, uint8_t **buf)
501
{
502
	int size = 0;
503

  
504
	if (msgb->next_slot_pop == MSG_BUFFER_INVALID_SLOT_IDX ||
505
	    msgb->next_slot_parse == MSG_BUFFER_INVALID_SLOT_IDX) {
506
		*buf = NULL;
507
		return size;
508
	}
509

  
510
	*buf = (msgb->slots + msgb->next_slot_parse)->buf;
511
	size = (msgb->slots + msgb->next_slot_parse)->size;
512

  
513
	if (msgb->debug) {
514
		fprintf(stdout, "RTP_BUFFER (%s): parsed %d bytes in slot %d\n",
515
		    msgb->buf_id, size, msgb->next_slot_parse);
516
	}
517

  
518
	msgb->next_slot_parse = (msgb->next_slot_parse + 1) % msgb->nslots;
519

  
520
	if (msgb->next_slot_parse == msgb->next_slot_push) {
521
		msgb->next_slot_parse = MSG_BUFFER_INVALID_SLOT_IDX;
522
	}
523

  
524
	return size;
525

  
526
}
527

  
528
void msg_buffer_parse_stop(struct msg_buffer *msgb)
529
{
530
	msgb->next_slot_parse = MSG_BUFFER_INVALID_SLOT_IDX;
531
}
532

  
533
uint32_t msg_buffer_get_start_buf_size(struct msg_buffer *msgb)
534
{
535
	if (!msgb) {
536
		return 0;
537
	}
538

  
539
	return msgb->start_buf_size_th;
540
}
541

  
542
uint32_t msg_buffer_get_current_size(struct msg_buffer *msgb)
543
{
544
	if (!msgb) {
545
		return 0;
546
	}
547

  
548
	return msgb->size;
549
}
550

  
551
void msg_buffer_set_ths_to(struct msg_buffer *msgb,
552
			   uint32_t th1_to_us,
553
			   uint32_t th2_to_us)
554
{
555
	if (msgb) {
556
		msgb->th1_to = th1_to_us;
557
		msgb->th2_to = th2_to_us;
558
	}
559
}
560

  
561
void msg_buffer_set_ths_size(struct msg_buffer *msgb,
562
			     uint32_t th1_size,
563
			     uint32_t th2_size)
564
{
565
	if (msgb) {
566
		msgb->th1_size = th1_size;
567
		msgb->th2_size = th2_size;
568
	}
569
}
570

  
571
void msg_buffer_set_initial_buffering_to_us(struct msg_buffer *msgb,
572
					    uint32_t to_us)
573
{
574
	if (msgb) {
575
		msgb->start_buf_to_us = to_us;
576
		msgb->buffer_started = 0;
577
	}
578
}
579

  
580
void msg_buffer_set_start_buf_size_th(struct msg_buffer *msgb,
581
				      uint32_t size_th)
582
{
583
	if (msgb) {
584
		msgb->start_buf_size_th = size_th;
585
	}
586
}
587

  
588
void msg_buffer_set_flush_to_us(struct msg_buffer *msgb, uint32_t to_us)
589
{
590
	if (msgb) {
591
		msgb->flush_to_us = to_us;
592
	}
593
}
594

  
595
void msg_buffer_start_buf_reinit(struct msg_buffer *msgb, uint32_t to_us)
596
{
597
	if (msgb) {
598
		msgb->flushing = 0;
599
		msgb->start_buf_complete = 0;
600
		msgb->buffer_started = 1;
601
		msgb->start_buf_to_us = to_us;
602
		gettimeofday(&(msgb->buffer_started_tv), NULL);
603
	}
604
}
605

  
606
int msg_buffer_is_flushing(struct msg_buffer *msgb)
607
{
608
	if (msgb) {
609
		if (msgb->flushing) {
610
			return 1;
611
		}
612
	}
613

  
614
	return 0;
615
}
616

  
617
uint32_t msg_buffer_get_nslots(struct msg_buffer *msgb)
618
{
619
	if (msgb) {
620
		return msgb->nslots;
621
	}
622
}
src/msg_buffer.h
1
#ifndef MSG_BUFFER
2
#define MSG_BUFFER
3

  
4
#include <stdint.h>
5
#include <sys/time.h>
6

  
7
/*
8
 * Thresholds sizes and timeouts should be carefully selected depending on the
9
 * type of content stored in the buffer.
10
 *
11
 * In case of audio/video content the values should depend on the stream
12
 * parameters:
13
 *
14
 *  AVStream *st = ic->streams[i]; where ic is an AVFormatContext
15
 *
16
 */
17

  
18
#define MSG_BUFFER_START_BUF_SIZE_TH	(0x40000) /* 256KiB */
19
#define MSG_BUFFER_TH1_SIZE		(0x20000) /* 128KiB */
20
#define MSG_BUFFER_TH1_TO_US		(33000)   /* 33ms (e.g., 30fps video) */
21
#define MSG_BUFFER_TH2_SIZE		(0x10000) /* 64KiB */
22
#define MSG_BUFFER_TH2_TO_US		(66000)   /* 66ms */
23

  
24
#define MSG_BUFFER_FLUSH_TO_US		(300000)  /* 300ms */
25

  
26
#define MSG_BUFFER_MIN_TO_US		(0)      /* 0ms */
27

  
28
#define MSG_BUFFER_N_START_SLOT		(4096)    /* Default # starting slots */
29

  
30
#define MSG_BUFFER_INVALID_SLOT_IDX	(-1)
31

  
32
#define MSG_BUFFER_DATA_START_BUF	(0x4)
33
#define MSG_BUFFER_DATA_FLUSHED		(0x2)
34
#define MSG_BUFFER_DATA_READY		(0x1)
35
#define MSG_BUFFER_DATA_NOT_READY	(0x0)
36

  
37
#define MSG_BUFFER_SLOW_MODE_DISABLED	(0x0)
38
#define MSG_BUFFER_SLOW_MODE_TH1	(0x1)
39
#define MSG_BUFFER_SLOW_MODE_TH2	(0x2)
40

  
41
/* msg_buffer_init must be called for initializing the msg_buffer.
42
 *
43
 * - debug: 1 for printing verbose debuggin messages, 0 otherwise
44
 * - id: identifier string for this buffer
45
 *
46
 * Return a pointer to a struct msg_buffer that must be passed as parameter to
47
 * all the other functions listed in this file.
48
 */
49
struct msg_buffer *msg_buffer_init(int debug, char *id);
50

  
51
/* msg_buffer_destroy frees the resources used by the msg_buffer.
52
 *
53
 * - msgb: pointer to a struct msg_buffer previously initialized with
54
 *   msg_buffer_init
55
 */
56
void msg_buffer_destroy(struct msg_buffer **msgb);
57

  
58
/* Push a new message in the msg_buffer */
59
int msg_buffer_push(struct msg_buffer *msgb, uint8_t *buf, uint32_t size);
60
/* Check the status of the buffer.
61
 *
62
 * Return:
63
 * - MSG_BUFFER_DATA_READY: a new message can be popped from the buffer
64
 * - MSG_BUFFER_DATA_FLUSHED: buffer is now empty after the flush
65
 * - MSG_BUFFER_DATA_START_BUF: buffer is waiting to reach the initial buffering
66
 *   threshold
67
 * - MSG_BUFFER_DATA_NOT_READY: there are no messages to pop from the buffer
68
 *   */
69
int msg_buffer_get_status(struct msg_buffer *msgb, struct timeval *tv);
70
/* Pop a message from the buffer.
71
 * The caller has the responsibility to free buf.
72
 */
73
int msg_buffer_pop(struct msg_buffer *msgb, uint8_t **buf);
74

  
75
/* The following three functions are used to parse the content of msg_buffer
76
 * (starting from next_slot_pop) without actually removing the slots from the
77
 * buffer
78
 *
79
 * This is useful for parsing the content of the buffer without actually
80
 * removing the content in order to consume it later.
81
 *
82
 * The usage patter is the following:
83
 * - call msg_buffer_parse_start(), if it returns 0 then
84
 * - call (also multiple times) msg_buffer_parse_next() for parsing the content
85
 *   of the buffer
86
 * - When the parsing is complete, close the transaction with
87
 *   msg_buffer_parse_stop
88
 */
89

  
90
/* Return -1 if the msg_buffer is empty, 0 otherwise */
91
int msg_buffer_parse_start(struct msg_buffer *msgb);
92
int msg_buffer_parse_next(struct msg_buffer *msgb, uint8_t **buf);
93
void msg_buffer_parse_stop(struct msg_buffer *msgb);
94

  
95
/* Reinitialize the bootstrap of the buffer.
96
 * The next data will be ready when the buffer reaches a size greater or equal
97
 * to the start buffering threshold (configured using
98
 * msg_buffer_set_start_buf_size_th)
99
 */
100
void msg_buffer_start_buf_reinit(struct msg_buffer *msgb, uint32_t to_us);
101

  
102
/* Get/Set functions */
103

  
104
/* return the initial buffering threshold */
105
uint32_t msg_buffer_get_start_buf_size(struct msg_buffer *msgb);
106
/* return the current buffer size in byte */
107
uint32_t msg_buffer_get_current_size(struct msg_buffer *msgb);
108
/* set the timeouts (us) for threshold1 (TH1) and threshold2 (TH2) */
109
void msg_buffer_set_ths_to(struct msg_buffer *msgb,
110
			   uint32_t th1_to_us,
111
			   uint32_t th2_to_us);
112
/* set the sizes (bytes) for threshold1 (TH1) and threshold2 (TH2) */
113
void msg_buffer_set_ths_size(struct msg_buffer *msgb,
114
			     uint32_t th1_size,
115
			     uint32_t th2_size);
116
/* set the timeout (us) for the initial buffering */
117
void msg_buffer_set_initial_buffering_to_us(struct msg_buffer *msgb,
118
					    uint32_t to_us);
119
/* set the size (bytes) of the initial buffering
120
 *
121
 * msg_buffer_get_status() will return MSG_BUFFER_DATA_NOT_READY until the start
122
 * buffer size is reached or until the initail buffer timeout expires (whatever
123
 * comes first)
124
 */
125
void msg_buffer_set_start_buf_size_th(struct msg_buffer *msgb,
126
				      uint32_t size_th);
127
/* Set the flush timeout for the buffer.
128
 * The buffer start flushing if the timeout expires.
129
 * The timeout is reset every time a new message is pushed into the buffer
130
 */
131
void msg_buffer_set_flush_to_us(struct msg_buffer *msgb, uint32_t to_us);
132
/* Return 1 if the msg_buffer is currently flushing, 0 otherwise */
133
int msg_buffer_is_flushing(struct msg_buffer *msgb);
134
/* Return the number of slots in the buffer */
135
uint32_t msg_buffer_get_nslots(struct msg_buffer *msgb);
136

  
137
#endif // MSG_BUFFER

Also available in: Unified diff