Revision 3af4c8d7 src/pstreamer.c

View differences:

src/pstreamer.c
29 29
#include<debug.h>
30 30

  
31 31
#define MAX_PSINSTANCE_CONFIG_LENGTH 255
32
#define STREAMER_PEER_CONF "port=%d,dechunkiser=rtp,base=%d,addr=%s"
33
#define STREAMER_SOURCE_CONF "port=%d,chunkiser=rtp,base=%d,addr=%s,max_delay_ms=50"
32 34

  
33 35
struct pstreamer {
34 36
	char source_ip[MAX_IPADDR_LENGTH];
......
39 41
	uint16_t base_port;
40 42
	struct periodic_task * topology_task;
41 43
	struct periodic_task * offer_task;
44
	struct periodic_task * inject_task;
42 45
	struct periodic_task * msg_task;
43 46
	struct task_manager * tm;
44 47
	timeout topology_interval;
......
52 55
	struct janus_instance const * janus;
53 56
};
54 57

  
55
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_dst_ip, const char * opts)
58
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_ip, const char * fmt, const char * opts)
56 59
/* we assume source_ip and source_port are valid strings */
57 60
{
58 61
	char config[MAX_PSINSTANCE_CONFIG_LENGTH];
59
	char * fmt = "port=%d,dechunkiser=rtp,base=%d,addr=%s";
60 62
	int count;
61 63

  
62
	count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, rtp_dst_ip);
64
	count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, rtp_ip);
63 65
	if (opts && (size_t)(MAX_PSINSTANCE_CONFIG_LENGTH - count) > strlen(opts))
64 66
		snprintf(config + count, MAX_PSINSTANCE_CONFIG_LENGTH - count, ",%s", opts);
65 67
	ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
66 68

  
67 69
	ps->topology_task = NULL;
68 70
	ps->offer_task = NULL;
71
	ps->inject_task = NULL;
69 72
	ps->msg_task = NULL;
70 73
	ps->tm = NULL;
71 74
	ps->topology_interval = 400;
......
85 88
		task_manager_destroy_task(ps->tm, &(ps->topology_task));
86 89
	if (ps->offer_task)
87 90
		task_manager_destroy_task(ps->tm, &(ps->offer_task));
91
	if (ps->inject_task)
92
		task_manager_destroy_task(ps->tm, &(ps->inject_task));
88 93
	if (ps->msg_task)
89 94
		task_manager_destroy_task(ps->tm, &(ps->msg_task));
90 95
	psinstance_destroy(&(ps->psc));
......
168 173
		ord_set_for_each(ps, (*psm)->streamers)
169 174
		{
170 175
			if ((*psm)->janus)
171
				janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
176
				if (pstreamer_is_source(ps))
177
					janus_instance_destroy_videoroom((*psm)->janus, ((struct pstreamer*)ps)->id);
178
				else
179
					janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
172 180
			pstreamer_deinit((struct pstreamer *)ps);
173 181
		}
174 182

  
......
196 204
		ps->topology_task = task_manager_new_task(tm, NULL, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
197 205
		ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
198 206
		ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
207
		if (pstreamer_is_source(ps))
208
			ps->inject_task = task_manager_new_task(tm, NULL, pstreamer_inject_task_callback, 25, ps->psc); 
199 209
	}
200 210
	return 0;
201 211
}
......
263 273
			else
264 274
				if (scc)
265 275
					streamer_creation_callback_trigger(scc, 0);
266
			pstreamer_init(ps, rtp_dst_ip, psm->streamer_opts);
276
			pstreamer_init(ps, rtp_dst_ip, STREAMER_PEER_CONF, psm->streamer_opts);
267 277
			ord_set_insert(psm->streamers, ps, 0);
268 278
		} else
269 279
		{
......
340 350
	}
341 351
	return NULL;
342 352
}
353

  
354
uint8_t pstreamer_is_source(const struct pstreamer * ps)
355
{
356
	return ps && (ps->source_port == 0) ? 1 : 0;
357
}
358

  
359
char * pstreamer_manager_sources_to_json(const struct pstreamer_manager *psm)
360
{
361
	char * res = NULL, * ps_json;
362
	uint32_t pos;
363
	const void * iter;
364
	const struct pstreamer * ps;
365

  
366
	if (psm)
367
	{
368
		res = malloc(sizeof(char)*3);
369
		res[0] = '[';
370
		pos = 1;
371

  
372
		ord_set_for_each(iter, psm->streamers)
373
		{
374
			ps = (const struct pstreamer *) iter;
375
			if (pstreamer_is_source(ps))
376
			{
377
				ps_json = pstreamer_to_json(ps);
378
				if (ps_json)
379
				{
380
					res = realloc(res, sizeof(char)*(pos+strlen(ps_json) + (pos == 1? 2 : 3)));
381
					if (pos > 1)
382
						res[pos++] = ',';
383
					strcpy(res+pos, ps_json);
384
					pos += strlen(ps_json);
385
					free(ps_json);
386
				}
387
			}
388
		}
389
		res[pos++] = ']';
390
		res[pos] = '\0';
391
	}
392
	return res;
393
}
394

  
395
const struct pstreamer * pstreamer_manager_create_source_streamer(struct pstreamer_manager * psm, const char * id, const char * rtp_src_ip, struct streamer_creation_callback * scc)
396
{
397
	struct pstreamer * ps = NULL;
398
	const void * ptr = NULL;
399

  
400
	if (psm && id && rtp_src_ip)
401
	{
402
		ps = malloc(sizeof(struct pstreamer));
403
		strncpy(ps->source_ip, rtp_src_ip, MAX_IPADDR_LENGTH);
404
		ps->source_port = 0;
405
		strncpy(ps->id, id, PSID_LENGTH);
406
		ps->base_port = assign_streaming_ports(psm); 
407
		ptr = ord_set_find(psm->streamers, (const void *) ps);
408
		if (ptr == NULL)
409
		{
410
			pstreamer_touch(ps);
411
			streamer_creation_set_pstreamer_ref(scc, ps);
412
			if (psm->janus)
413
			{
414
				debug("calling janus upon notification of perstreamer source creation\n");
415
				janus_instance_create_videoroom(psm->janus, ps->id, scc);
416
			}
417
			else
418
				if (scc)
419
					streamer_creation_callback_trigger(scc, 0);
420
			pstreamer_init(ps, ps->source_ip, STREAMER_SOURCE_CONF, psm->streamer_opts);
421
			ord_set_insert(psm->streamers, ps, 0);
422
		} else
423
		{
424
			free(ps);
425
			ps = NULL;
426
		}
427
	}
428

  
429
	return ps;
430
}
431

  
432
void pstreamer_source_touch(const struct pstreamer_manager *psm, struct pstreamer *ps, uint64_t janus_id)
433
{
434
	if (psm && ps)
435
	{
436
		pstreamer_touch(ps);
437
		if (psm->janus && pstreamer_is_source(ps))
438
		{
439
			janus_instance_forward_rtp(psm->janus, ps->id, janus_id, ps->source_ip, ps->base_port+1, ps->base_port+3);
440
			ps->janus_streaming_id = janus_id;
441
		}
442
	}
443
}

Also available in: Unified diff