Revision 3af4c8d7

View differences:

Makefile
14 14
MONGOOSE_OPTS+=-DMG_DISABLE_MQTT -DMG_DISABLE_JSON_RPC -DMG_DISABLE_SOCKETPAIR  -DMG_DISABLE_CGI # -DMG_DISABLE_HTTP_WEBSOCKET
15 15
LDFLAGS+=-lpstreamer -lgrapes -lm
16 16

  
17
all: $(EXE) Tools/janus
17
all: $(EXE) Tools/janus/bin/janus
18 18

  
19 19
$(EXE): $(LIBS) $(OBJS) peerstreamer-ng.c
20 20
	$(CC) -o peerstreamer-ng  peerstreamer-ng.c $(OBJS) Libs/mongoose/mongoose.o $(CFLAGS) $(LDFLAGS)
......
37 37
	git submodule update Libs/pstreamer/
38 38
	make -C Libs/pstreamer/ 
39 39

  
40
Tools/janus:
40
Tools/janus/bin/janus:
41 41
	git submodule init Libs/janus-gateway/
42 42
	git submodule update Libs/janus-gateway/
43 43
	cd $(PWD)/Libs/janus-gateway/ && ./autogen.sh
44
	cd $(PWD)/Libs/janus-gateway/ && SRTP15X_CFLAGS="-I$(PWD)/Libs/janus-gateway/Libs/libsrtp/include" SRTP15X_LIBS="-L$(PWD)/Libs/janus-gateway/Libs/libsrtp" PKG_CONFIG_PATH=$(PWD)/Libs/janus-gateway/Libs/libsrtp ./configure --disable-all-plugins --disable-all-transports --disable-all-handlers --enable-rest --disable-turn-rest-api --enable-static --prefix=$(PWD)/Tools/janus --enable-plugin-streaming #--enable-libsrtp2
44
	cd $(PWD)/Libs/janus-gateway/ && SRTP15X_CFLAGS="-I$(PWD)/Libs/janus-gateway/Libs/libsrtp/include" SRTP15X_LIBS="-L$(PWD)/Libs/janus-gateway/Libs/libsrtp" PKG_CONFIG_PATH=$(PWD)/Libs/janus-gateway/Libs/libsrtp ./configure --disable-all-plugins --disable-all-transports --disable-all-handlers --enable-rest --disable-turn-rest-api --enable-static --prefix=$(PWD)/Tools/janus --enable-plugin-streaming --enable-plugin-videoroom #--enable-libsrtp2
45 45
	make -C Libs/janus-gateway/ install
46 46

  
47 47
tests:
Test/Makefile
9 9
CFLAGS += -g -W -Wall -Wno-unused-function -Wno-unused-parameter -O0 -I../Libs/mongoose/  -I../src -I../Libs/pstreamer/include -I../Libs/GRAPES/include
10 10
CFLAGS+=-L../Libs/GRAPES/src  -L../Libs/pstreamer/src 
11 11

  
12
all: $(LIBS) $(TARGET_SRC) $(TARGET_OBJS) $(OBJS) ../Tools/janus
12
all: $(LIBS) $(TARGET_SRC) $(TARGET_OBJS) $(OBJS) ../Tools/janus/bin/janus
13 13

  
14
../Tools/janus:
15
	make -C ../ Tools/janus
14
../Tools/janus/bin/janus:
15
	make -C ../ Tools/janus/bin/janus
16 16

  
17 17
Libs/mongoose/mongoose.o:
18 18
	git submodule init Libs/mongoose/
Test/janus_instance_test.c
110 110

  
111 111
int main(int argv, char ** argc)
112 112
{
113
	sprintf(janus_conf, "janus_executable=%s/../Tools/janus/bin/janus", dirname(argc[0]));
113
	char wdir[200];
114
	strncpy(wdir, dirname(argc[0]), 200);
115
	sprintf(janus_conf, "janus_executable=%s/../Tools/janus/bin/janus,janus_param=--configs-folder=%s/../Tools/janus_conf", wdir, wdir);
114 116

  
115 117
	janus_instance_create_test();
116 118
	janus_instance_launch_test();
Test/pstreamer_manager_test.c
9 9
#include<stdio.h>
10 10
#include<string.h>
11 11

  
12
#include<janus_instance.h>
12 13
#include"pstreamer.h"
13 14

  
14 15

  
......
66 67
	ps = pstreamer_manager_create_streamer(psm, "10.0.0.1", "6000", "42", "127.0.0.1", NULL);
67 68

  
68 69
	json = pstreamer_to_json(ps);
69
	fprintf(stderr, "[DEBUG] %s\n", json);
70 70
	assert(strcmp(json, "{\"id\":\"42\",\"source_ip\":\"10.0.0.1\",\"source_port\":\"6000\",\"janus_streaming_id\":\"0\"}") == 0);
71 71
	free(json);
72 72

  
......
119 119
	fprintf(stderr,"%s successfully passed!\n",__func__);
120 120
}
121 121

  
122
void pstreamer_manager_create_source_streamer_test()
123
{
124
	struct pstreamer_manager * psm = NULL;
125
	const struct pstreamer * ps = NULL;
126

  
127
	ps = pstreamer_manager_create_source_streamer(NULL, NULL, NULL, NULL);
128
	assert(ps == NULL);
129

  
130
	psm = pstreamer_manager_new(6000, NULL);
131

  
132
	ps = pstreamer_manager_create_source_streamer(psm, NULL, NULL, NULL);
133
	assert(ps == NULL);
134

  
135
	ps = pstreamer_manager_create_source_streamer(psm, "room1", "127.0.0.1", NULL);
136
	assert(ps);
137

  
138
	ps = pstreamer_manager_create_source_streamer(psm, "room1", "127.0.0.1", NULL);
139
	assert(ps == NULL);
140

  
141
	pstreamer_manager_destroy(&psm);
142
	fprintf(stderr,"%s successfully passed!\n",__func__);
143
}
144

  
145
void pstreamer_is_source_test()
146
{
147
	struct pstreamer_manager * psm = NULL;
148
	const struct pstreamer * ps = NULL;
149

  
150
	psm = pstreamer_manager_new(6000, NULL);
151

  
152
	assert(pstreamer_is_source(NULL) == 0);
153

  
154
	ps = pstreamer_manager_create_source_streamer(psm, "room1", "127.0.0.1", NULL);
155
	assert(ps);
156
	assert(pstreamer_is_source(ps));
157

  
158
	ps = pstreamer_manager_create_streamer(psm, "10.0.0.1", "6000", "42", "127.0.0.1", NULL);
159
	assert(ps);
160
	assert(pstreamer_is_source(ps) == 0);
161

  
162
	pstreamer_manager_destroy(&psm);
163
	fprintf(stderr,"%s successfully passed!\n",__func__);
164
}
165

  
166
void pstreamer_manager_sources_to_json_test()
167
{
168
	struct pstreamer_manager * psm = NULL;
169
	char * res;
170

  
171
	res = pstreamer_manager_sources_to_json(NULL);
172
	assert(res == NULL);
173

  
174
	psm = pstreamer_manager_new(6000, NULL);
175

  
176
	res = pstreamer_manager_sources_to_json(psm);
177
	assert(res);
178
	assert(strcmp(res,"[]")==0);
179
	free(res);
180

  
181
	pstreamer_manager_create_source_streamer(psm, "room1", "127.0.0.1", NULL);
182
	res = pstreamer_manager_sources_to_json(psm);
183
	assert(res);
184
	assert(strcmp(res,"[{\"id\":\"room1\",\"source_ip\":\"127.0.0.1\",\"source_port\":\"0\",\"janus_streaming_id\":\"0\"}]")==0);
185
	free(res);
186

  
187
	pstreamer_manager_create_streamer(psm, "10.0.0.1", "6000", "42", "127.0.0.1", NULL);
188
	res = pstreamer_manager_sources_to_json(psm);
189
	assert(res);
190
	assert(strcmp(res,"[{\"id\":\"room1\",\"source_ip\":\"127.0.0.1\",\"source_port\":\"0\",\"janus_streaming_id\":\"0\"}]")==0);
191
	free(res);
192

  
193
	pstreamer_manager_create_source_streamer(psm, "room2", "127.0.0.1", NULL);
194
	res = pstreamer_manager_sources_to_json(psm);
195
	assert(res);
196
	assert(strcmp(res,"[{\"id\":\"room1\",\"source_ip\":\"127.0.0.1\",\"source_port\":\"0\",\"janus_streaming_id\":\"0\"},\
197
{\"id\":\"room2\",\"source_ip\":\"127.0.0.1\",\"source_port\":\"0\",\"janus_streaming_id\":\"0\"}]")==0);
198
	free(res);
199

  
200
	pstreamer_manager_destroy(&psm);
201
	fprintf(stderr,"%s successfully passed!\n",__func__);
202
}
203

  
122 204
int main(int argv, char ** argc)
123 205
{
124 206
	pstreamer_manager_destroy_test();
......
126 208
	pstreamer_to_json_test();
127 209
	pstreamer_manager_destroy_streamer_test();
128 210
	pstreamer_manager_set_streamer_options_test();
211

  
212
	pstreamer_manager_create_source_streamer_test();
213
	pstreamer_is_source_test();
214
	pstreamer_manager_sources_to_json_test();
129 215
	return 0;
130 216
}
Tools/janus_conf/janus.plugin.videoroom.cfg
1
; [<unique room ID>]
2
; description = This is my awesome room
3
; is_private = yes|no (whether this room should be in the public list, default=yes)
4
; secret = <optional password needed for manipulating (e.g. destroying) the room>
5
; pin = <optional password needed for joining the room>
6
; require_pvtid = yes|no (whether subscriptions are required to provide a valid
7
;			a valid private_id to associate with a publisher, default=no)
8
; publishers = <max number of concurrent senders> (e.g., 6 for a video
9
;              conference or 1 for a webinar)
10
; bitrate = <max video bitrate for senders> (e.g., 128000)
11
; fir_freq = <send a FIR to publishers every fir_freq seconds> (0=disable)
12
; audiocodec = opus|g722|pcmu|pcma|isac32|isac16 (audio codec(s) to force on publishers, default=opus
13
;			can be a comma separated list in order of preference, e.g., opus,pcmu)
14
; videocodec = vp8|vp9|h264 (video codec(s) to force on publishers, default=vp8
15
;			can be a comma separated list in order of preference, e.g., vp9,vp8,h264)
16
; video_svc = yes|no (whether SVC support must be enabled; works only for VP9, default=no)
17
; audiolevel_ext = yes|no (whether the ssrc-audio-level RTP extension must
18
;		be negotiated/used or not for new publishers, default=yes)
19
; audiolevel_event = yes|no (whether to emit event to other users or not, default=no)
20
; audio_active_packets = 100 (number of packets with audio level, default=100, 2 seconds)
21
; audio_level_average = 25 (average value of audio level, 127=muted, 0='too loud', default=25)
22
; videoorient_ext = yes|no (whether the video-orientation RTP extension must
23
;		be negotiated/used or not for new publishers, default=yes)
24
; playoutdelay_ext = yes|no (whether the playout-delay RTP extension must
25
;		be negotiated/used or not for new publishers, default=yes)
26
; record = true|false (whether this room should be recorded, default=false)
27
; rec_dir = <folder where recordings should be stored, when enabled>
28
; notify_joining = true|false (optional, whether to notify all participants when a new
29
;               participant joins the room. The Videoroom plugin by design only notifies
30
;               new feeds (publishers), and enabling this may result extra notification
31
;               traffic. This flag is particularly useful when enabled with require_pvtid
32
;               for admin to manage listening only participants. default=false)
33

  
34
[general]
35
;admin_key = supersecret		; If set, rooms can be created via API only
36
								; if this key is provided in the request
37
;events = no					; Whether events should be sent to event
38
								; handlers (default is yes)
39

  
40
[1234]
41
description = WebRTC2RTP 
42
publishers = 1
43
bitrate = 128000
44
fir_freq = 10
45
;audiocodec = opus
46
;videocodec = vp8
47
record = false
48
;rec_dir = /path/to/recordings-folder
src/janus_instance.c
24 24
#include<unistd.h>
25 25
#include<tokens.h>
26 26
#include<grapes_config.h>
27
#include <sys/prctl.h>  // for process sync
27 28

  
28 29
#define INVALID_PID -1
29 30

  
30 31
#define JANUS_MSG_SESSION_CREATE "{\"transaction\": \"random\", \"janus\": \"create\"}"
31 32
#define JANUS_MSG_SESSION_KEEPALIVE "{\"transaction\": \"ciao\", \"janus\": \"keepalive\"}"
32
#define JANUS_MSG_PLUGIN_CREATE "{\"transaction\": \"ciao\", \"janus\": \"attach\", \"plugin\":\"janus.plugin.streaming\"}"
33
#define JANUS_MSG_STREAMING_PLUGIN_CREATE "{\"transaction\": \"ciao\", \"janus\": \"attach\", \"plugin\":\"janus.plugin.streaming\"}"
34
#define JANUS_MSG_VIDEOROOM_PLUGIN_CREATE "{\"transaction\": \"ciao\", \"janus\": \"attach\", \"plugin\":\"janus.plugin.videoroom\"}"
33 35

  
34 36

  
35 37
struct janus_instance {
......
41 43
	struct mg_mgr *mongoose_srv;
42 44
	struct periodic_task * heartbeat;
43 45
	uint64_t management_session;
44
	uint64_t plugin_handle;
46
	uint64_t streaming_plugin_handle;
47
	uint64_t videoroom_plugin_handle;
45 48
	struct task_manager * tm;
46 49
};
47 50

  
......
63 66
	return res;
64 67
}
65 68

  
66
char * janus_instance_handle_path(const struct janus_instance * janus)
69
char * janus_instance_streaming_handle_path(const struct janus_instance * janus)
67 70
{
68 71
	char * res = NULL;
69
	if (janus && janus->management_session && janus->plugin_handle && janus->endpoint)
72
	if (janus && janus->management_session && janus->streaming_plugin_handle && janus->endpoint)
70 73
	{
71 74
		res = malloc(sizeof(char) * (strlen(janus->endpoint) + 43));  // each identifier comprises of 20 characters at most
72
		sprintf(res, "%s/%"PRId64"/%"PRId64"", janus->endpoint, janus->management_session, janus->plugin_handle);
75
		sprintf(res, "%s/%"PRId64"/%"PRId64"", janus->endpoint, janus->management_session, janus->streaming_plugin_handle);
76
	}
77
	return res;
78
}
79

  
80
char * janus_instance_videoroom_handle_path(const struct janus_instance * janus)
81
{
82
	char * res = NULL;
83
	if (janus && janus->management_session && janus->videoroom_plugin_handle && janus->endpoint)
84
	{
85
		res = malloc(sizeof(char) * (strlen(janus->endpoint) + 43));  // each identifier comprises of 20 characters at most
86
		sprintf(res, "%s/%"PRId64"/%"PRId64"", janus->endpoint, janus->management_session, janus->videoroom_plugin_handle);
73 87
	}
74 88
	return res;
75 89
}
......
101 115
		ji->logfile = strdup(grapes_config_value_str_default(tags, "janus_logfile", "janus.log"));
102 116
		ji->janus_pid = INVALID_PID;
103 117
		ji->management_session = 0;
104
		ji->plugin_handle = 0;
118
		ji->streaming_plugin_handle = 0;
119
		ji->videoroom_plugin_handle = 0;
105 120
		ji->tm = tm;
106 121
		ji->heartbeat = NULL;
107 122
		ji->mongoose_srv = mongoose_srv;
......
155 170
	}
156 171
}
157 172

  
158
void janus_instance_plugin_handler(struct mg_connection *nc, int ev, void *ev_data)
173
void janus_instance_streaming_plugin_handler(struct mg_connection *nc, int ev, void *ev_data)
159 174
{
160 175
	struct janus_instance * janus;
161 176
	struct http_message *hm = (struct http_message *) ev_data;
......
173 188
					buff = malloc(sizeof(char) * (hm->body.len + 1));
174 189
					strncpy(buff, hm->body.p, hm->body.len);
175 190
					buff[hm->body.len] = '\0';  // make sure string terminates
176
					janus->plugin_handle = janus_instance_msg_get_id(buff);
191
					janus->streaming_plugin_handle = janus_instance_msg_get_id(buff);
177 192
					free(buff);
178
					debug("Got plugin handle!\n");
193
					debug("Got plugin streaming_handle!\n");
194
				default:
195
					debug("Janus answers: %d\n", hm->resp_code);
196
			}
197
			nc->flags |= MG_F_SEND_AND_CLOSE;
198
			break;
199
		case MG_EV_CLOSE:
200
			debug("Janus server closed connection\n");
201
			break;
202
		default:
203
			break;
204
	}
205
}
206

  
207
void janus_instance_videoroom_plugin_handler(struct mg_connection *nc, int ev, void *ev_data)
208
{
209
	struct janus_instance * janus;
210
	struct http_message *hm = (struct http_message *) ev_data;
211
	char *buff;
212

  
213
	janus = nc->user_data;
214
	switch (ev) {
215
		case MG_EV_CONNECT:
216
			if (*(int *) ev_data != 0)
217
				debug("Janus communication failure\n");
218
			break;
219
		case MG_EV_HTTP_REPLY:
220
			switch (hm->resp_code) {
221
				case 200:
222
					buff = malloc(sizeof(char) * (hm->body.len + 1));
223
					strncpy(buff, hm->body.p, hm->body.len);
224
					buff[hm->body.len] = '\0';  // make sure string terminates
225
					janus->videoroom_plugin_handle = janus_instance_msg_get_id(buff);
226
					free(buff);
227
					debug("Got plugin videoroom_handle!\n");
179 228
				default:
180 229
					debug("Janus answers: %d\n", hm->resp_code);
181 230
			}
......
210 259
					buff[hm->body.len] = '\0';  // make sure string terminates
211 260
					janus->management_session = janus_instance_msg_get_id(buff);
212 261
					free(buff);
262
					
263
					// Requesting handle for the streaming plugin
264
					buff = malloc(sizeof(char) * (strlen(janus->endpoint) + 22));
265
					sprintf(buff, "%s/%"PRId64"", janus->endpoint, janus->management_session);
266
					conn = mg_connect_http(janus->mongoose_srv, janus_instance_streaming_plugin_handler, buff, NULL, JANUS_MSG_STREAMING_PLUGIN_CREATE);
267
					free(buff);
268
					if (conn)
269
						conn->user_data = (void *) janus;
270

  
271
					// Requesting handle for the videoroom plugin
213 272
					buff = malloc(sizeof(char) * (strlen(janus->endpoint) + 22));
214 273
					sprintf(buff, "%s/%"PRId64"", janus->endpoint, janus->management_session);
215
					conn = mg_connect_http(janus->mongoose_srv, janus_instance_plugin_handler, buff, NULL, JANUS_MSG_PLUGIN_CREATE);
274
					conn = mg_connect_http(janus->mongoose_srv, janus_instance_videoroom_plugin_handler, buff, NULL, JANUS_MSG_VIDEOROOM_PLUGIN_CREATE);
216 275
					free(buff);
217 276
					if (conn)
218 277
						conn->user_data = (void *) janus;
......
273 332

  
274 333
	if (ji && ji->janus_pid == INVALID_PID)
275 334
	{
335
		info("%s - %s\n", ji->executable, ji->conf_param);
276 336
		res = stat(ji->executable, &s);
277 337
		// check exe existence
278 338
		if (res == 0 && S_ISREG(s.st_mode))
......
294 354
					dup2(fd, 2);   // make stderr go to file - you may choose to not do this
295 355
					close(fd);
296 356

  
357
					prctl(PR_SET_PDEATHSIG, SIGHUP); // makes kernel dispatch me a SIGHUP if parent dies
358

  
297 359
					argv[0] = ji->executable;
298 360
					argv[1] = ji->conf_param;
299 361
					argv[2] = NULL;
......
359 421
	}
360 422
}
361 423

  
424
void janus_instance_videoroom_creation_handler(struct mg_connection *nc, int ev, void *ev_data)
425
{
426
	struct http_message *hm = (struct http_message *) ev_data;
427
	void ** data;
428
	struct streamer_creation_callback * scc;
429

  
430
	data = nc->user_data;
431
	scc = data[0];
432
	switch (ev) {
433
		case MG_EV_CONNECT:
434
			if (*(int *) ev_data != 0)
435
				debug("Janus communication failure\n");
436
			break;
437
		case MG_EV_HTTP_REPLY:
438
			switch (hm->resp_code) {
439
				case 200:
440
					info("Room created\n");
441
				default:
442
					debug("Janus answers: %d\n", hm->resp_code);
443
			}
444
			nc->flags |= MG_F_SEND_AND_CLOSE;
445
			break;
446
		case MG_EV_CLOSE:
447
			debug("Janus server closed connection\n");
448
			if (scc)
449
			{
450
				debug("Janus instance calls creation trigger\n");
451
				streamer_creation_callback_trigger(scc, 0);
452
				streamer_creation_callback_destroy(&scc);
453
				free(data);
454
			}
455
			break;
456
		default:
457
			break;
458
	}
459
}
460

  
362 461
int8_t janus_instance_create_streaming_point(struct janus_instance const * janus, uint64_t *mp_id, uint16_t audio_port, uint16_t video_port, struct streamer_creation_callback *scc)
363 462
{
364 463
	struct mg_connection * conn;
......
372 471

  
373 472
	if (janus && mp_id && audio_port && video_port)
374 473
	{
375
		uri = janus_instance_handle_path(janus);
474
		uri = janus_instance_streaming_handle_path(janus);
376 475
		if (uri)
377 476
		{
378 477
			sprintf(buff, fmt, audio_port, video_port);
......
403 502

  
404 503
	if (janus && mp_id)
405 504
	{
406
		uri = janus_instance_handle_path(janus);
505
		uri = janus_instance_streaming_handle_path(janus);
407 506
		if (uri)
408 507
		{
409 508
			sprintf(buff, fmt, mp_id);
......
418 517
	}
419 518
	return res;
420 519
}
520

  
521
int8_t janus_instance_create_videoroom(struct janus_instance const * janus, const char * room_id, struct streamer_creation_callback *scc)
522
{
523
	struct mg_connection * conn;
524
	int8_t res = -1;
525
	char * uri;
526
	char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"create\",\"room\":%s}}";
527

  
528
	char buff[280];
529
	void ** data;
530

  
531
	if (janus && room_id)
532
	{
533
		uri = janus_instance_videoroom_handle_path(janus);
534
		if (uri)
535
		{
536
			sprintf(buff, fmt, room_id);
537
		   debug("Conctating Janus to create a new video room\n");	
538
			conn = mg_connect_http(janus->mongoose_srv, janus_instance_videoroom_creation_handler, uri, NULL, buff);
539
			if (conn)
540
			{
541
				data = malloc(sizeof(void *));
542
				data[0] = scc;
543
				conn->user_data = data;
544
				res = 0;
545
			} else
546
			   debug("Aaargh, no connection!\n");	
547
			free(uri);
548
		}
549
	}
550
	return res;
551
}
552

  
553
int8_t janus_instance_destroy_videoroom(struct janus_instance const * janus, const char * room_id)
554
{
555
	struct mg_connection * conn;
556
	int8_t res = -1;
557
	char * uri;
558
	char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"destroy\",\"room\": %s}}";
559
	char buff[120];
560

  
561
	if (janus && room_id)
562
	{
563
		uri = janus_instance_videoroom_handle_path(janus);
564
		if (uri)
565
		{
566
			sprintf(buff, fmt, room_id);
567
			conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, buff);
568
			if (conn)
569
			{
570
				conn->user_data = (void *) room_id;
571
				res = 0;
572
			} 
573
			free(uri);
574
		}
575
	}
576
	return res;
577
}
578

  
579
int8_t janus_instance_forward_rtp(struct janus_instance const * janus, const char * room_id, uint64_t participant_id, const char * rtp_dest, uint16_t audio_port, uint16_t video_port)
580
{
581
	struct mg_connection * conn;
582
	int8_t res = -1;
583
	char * uri;
584
	char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"rtp_forward\",\"room\":%s,\"publisher_id\":%"PRId64", \"host\": %s,\"audio_port\":%"PRId16",\"video_port\":%"PRId16",\"audio_pt\":111,\"video_pt\":98}}";
585

  
586
	char buff[280];
587

  
588
	if (janus && room_id && rtp_dest && audio_port > 0 && video_port > 0)
589
	{
590
		uri = janus_instance_videoroom_handle_path(janus);
591
		if (uri)
592
		{
593
			sprintf(buff, fmt, room_id, participant_id, rtp_dest, audio_port, video_port);
594
		   debug("Conctating Janus to create a new video room\n");	
595
			conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, buff);
596
			if (conn)
597
				res = 0;
598
			else
599
			   debug("Aaargh, no connection!\n");	
600
			free(uri);
601
		}
602
	}
603
	return res;
604
}
src/janus_instance.h
39 39

  
40 40
int8_t janus_instance_destroy_streaming_point(struct janus_instance const * janus, uint64_t mp_id);
41 41

  
42
int8_t janus_instance_create_videoroom(struct janus_instance const * janus, const char * room_id, struct streamer_creation_callback *scc);
43

  
44
int8_t janus_instance_destroy_videoroom(struct janus_instance const * janus, const char * room_id);
45

  
46
int8_t janus_instance_forward_rtp(struct janus_instance const * janus, const char * room_id, uint64_t participant_id, const char * rtp_ip, uint16_t audio_port, uint16_t video_port);
47

  
42 48
#endif
src/name_lengths.h
9 9
#define MAX_URI_LENGTH 80
10 10
#define MAX_SDPFILENAME_LENGTH 255
11 11
#define MAX_PATH_LENGTH 255
12
#define MAX_JANUS_USERID_LENGTH 80
12 13

  
13 14
#endif
src/path_handlers.c
28 28
#include<mongoose.h>
29 29
#include<streamer_creation_callback.h>
30 30

  
31
void mg_connection_remote_ip(char * ip, const struct mg_connection *nc)
32
{
33
	const struct sockaddr_in *sender;
34
	sender = (struct sockaddr_in *)&(nc->sa);
35
	inet_ntop(AF_INET, &(sender->sin_addr), ip, MAX_IPADDR_LENGTH);
36
}
37

  
38 31
char * mg_uri_field(struct http_message *hm, uint8_t pos)
39 32
{
40 33
	char * uri;
......
71 64
	free(channels);
72 65
}
73 66

  
67
int8_t source_streamer_creation_handler(struct mg_connection *nc, const struct pschannel_bucket *psb, const struct pstreamer * ps, int8_t ret)
68
{
69
	char * json = NULL;
70
	int8_t res = -1;
71

  
72
	info("Inside source creation handler\n");
73
	if (ps)
74
		json = pstreamer_to_json(ps);
75

  
76
	if (ret == 0 && json)
77
	{
78
		mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nContent-type: application/json\r\n\r\n");
79
		mg_printf_http_chunk(nc, json);
80
		res = 0;
81
		info("Source room created and served\n");
82
	} else {
83
		mg_printf(nc, "%s", "HTTP/1.1 500 Internal server error\r\nTransfer-Encoding: chunked\r\n\r\n");
84
		// destroy ps?
85
		info("Stream room cannot be correctly created\n");
86
		if (ret)
87
			debug(json);
88
		else
89
			debug("PS does not exist");
90
	}
91
	if (json)
92
		free(json);
93
	mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
94

  
95
	return res;
96
}
97

  
74 98
int8_t streamer_creation_handler(struct mg_connection *nc, const struct pschannel_bucket *psb, const struct pstreamer * ps, int8_t ret)
75 99
{
76 100
	char * json = NULL;
......
125 149
	mg_get_http_var(&hm->body, "port", port, MAX_PORT_LENGTH);
126 150

  
127 151
	id = mg_uri_field(hm, 1);
128
	mg_connection_remote_ip(rtp_dst_ip, nc);
152
	mg_conn_addr_to_str(nc, rtp_dst_ip, MAX_IPADDR_LENGTH, MG_SOCK_STRINGIFY_IP|MG_SOCK_STRINGIFY_REMOTE);
129 153

  
130 154
	info("POST request for resource %s from %s\n", id, rtp_dst_ip);
131 155
	ch = pschannel_bucket_find(c->pb, ipaddr, port);
......
180 204

  
181 205
	free(id);
182 206
}
207

  
208
void source_index(struct mg_connection *nc, struct http_message *hm)
209
{
210
	char * channels;
211
	const struct context * c;
212

  
213
	c = (const struct context *) nc->user_data;
214

  
215
	info("GET request for source\n");
216
	channels = pstreamer_manager_sources_to_json(c->psm);
217
	debug("\t%s\n", channels);
218

  
219
	mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nContent-type: application/json\r\n\r\n");
220
	mg_printf_http_chunk(nc, "%s", channels);
221
	mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
222

  
223
	free(channels);
224
}
225

  
226
void source_streamer_create(struct mg_connection *nc, struct http_message *hm)
227
{
228
	const struct context * c;
229
	char rtp_source_ip[MAX_IPADDR_LENGTH];
230
	char * id;
231
	const struct pstreamer * ps = NULL;
232
	const struct pschannel * ch = NULL;
233

  
234
	c = (const struct context *) nc->user_data;
235

  
236
	id = mg_uri_field(hm, 1);
237
	mg_conn_addr_to_str(nc, rtp_source_ip, MAX_IPADDR_LENGTH, MG_SOCK_STRINGIFY_IP);
238

  
239
	info("POST request for source resource %s from %s\n", id, rtp_source_ip);
240

  
241
	ps = pstreamer_manager_create_source_streamer(c->psm, id, rtp_source_ip, streamer_creation_callback_new(nc, c->pb, source_streamer_creation_handler)); 
242
	if(ps)
243
	{
244
		pstreamer_schedule_tasks((struct pstreamer*)ps, c->tm);
245
		info("Source streamer instance created\n");
246
	} else {
247
		info("Source streamer could not be launched\n");
248
		mg_printf(nc, "%s", "HTTP/1.1 409 Conflict\r\nTransfer-Encoding: chunked\r\n\r\n");
249
		mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
250
	}
251

  
252
	free(id);
253
}
254

  
255
void source_streamer_update(struct mg_connection *nc, struct http_message *hm)
256
{
257
	char * id, * json;
258
	char janus_user_id[MAX_JANUS_USERID_LENGTH+1];
259
	const struct pstreamer * ps;
260
	const struct context * c;
261

  
262
	c = (const struct context *) nc->user_data;
263
	id = mg_uri_field(hm, 1);
264

  
265
	ps = pstreamer_manager_get_streamer(c->psm, id);
266
	info("UPDATE request for source resource %s\n", id);
267
	if (ps)
268
	{
269
		mg_get_http_var(&hm->body, "participant_id", janus_user_id, MAX_JANUS_USERID_LENGTH);
270

  
271
		pstreamer_source_touch(c->psm, (struct pstreamer*) ps, atoll(janus_user_id));
272
		info("\tSource instance %s found and touched\n", id);
273
		json = pstreamer_to_json(ps);
274
		mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nContent-type: application/json\r\n\r\n");
275
		mg_printf_http_chunk(nc, "%s", json);
276
		mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
277
		free(json);
278
	} else {
279
		info("\tInstance %s not found\n", id);
280
		mg_printf(nc, "%s", "HTTP/1.1 404 Not Found\r\nTransfer-Encoding: chunked\r\n\r\n");
281
		mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
282
	}
283
	free(id);
284
}
285

  
src/path_handlers.h
29 29
char * mg_uri_field(struct http_message *hm, uint8_t pos);
30 30

  
31 31
void channel_index(struct mg_connection *nc, struct http_message *hm);
32

  
33 32
void streamer_create(struct mg_connection *nc, struct http_message *hm);
34

  
35 33
void streamer_update(struct mg_connection *nc, struct http_message *hm);
36 34

  
35
void source_index(struct mg_connection *nc, struct http_message *hm);
36
void source_streamer_create(struct mg_connection *nc, struct http_message *hm);
37
void source_streamer_update(struct mg_connection *nc, struct http_message *hm);
38

  
37 39
uint8_t load_path_handlers(struct router *r)
38 40
{
39 41
	uint8_t res = 0;
......
42 44
	res |= router_add_route(r, "POST", "^/channels/[a-zA-Z0-9]+$", streamer_create);
43 45
	res |= router_add_route(r, "UPDATE", "^/channels/[a-zA-Z0-9]+$", streamer_update);
44 46

  
47
	res |= router_add_route(r, "GET", "^/sources$", source_index);
48
	res |= router_add_route(r, "POST", "^/sources/[a-zA-Z0-9]+$", source_streamer_create);
49
	res |= router_add_route(r, "UPDATE", "^/sources/[a-zA-Z0-9]+$", source_streamer_update);
50

  
45 51
	return res;
46 52
}
47 53

  
src/periodic_task_intfs.c
155 155

  
156 156
	return 0;
157 157
}
158

  
159
uint8_t pstreamer_inject_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds)
160
{
161
	struct psinstance * ps;
162
	ps = (struct psinstance *) periodic_task_get_data(pt);
163
	if (ret == 0)
164
	{
165
		debug("Chunk seeding time\n");
166
		psinstance_inject_chunk(ps);
167
	}
168
	return 0;
169
}
src/periodic_task_intfs.h
33 33

  
34 34
uint8_t pstreamer_offer_task_reinit(struct periodic_task * pt);
35 35

  
36
uint8_t pstreamer_inject_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds);
37

  
36 38
uint8_t pstreamer_msg_handling_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds);
37 39

  
38 40
uint8_t pstreamer_msg_handling_task_reinit(struct periodic_task * pt);
src/pstreamer.c
29 29
#include<debug.h>
30 30

  
31 31
#define MAX_PSINSTANCE_CONFIG_LENGTH 255
32
#define STREAMER_PEER_CONF "port=%d,dechunkiser=rtp,base=%d,addr=%s"
33
#define STREAMER_SOURCE_CONF "port=%d,chunkiser=rtp,base=%d,addr=%s,max_delay_ms=50"
32 34

  
33 35
struct pstreamer {
34 36
	char source_ip[MAX_IPADDR_LENGTH];
......
39 41
	uint16_t base_port;
40 42
	struct periodic_task * topology_task;
41 43
	struct periodic_task * offer_task;
44
	struct periodic_task * inject_task;
42 45
	struct periodic_task * msg_task;
43 46
	struct task_manager * tm;
44 47
	timeout topology_interval;
......
52 55
	struct janus_instance const * janus;
53 56
};
54 57

  
55
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_dst_ip, const char * opts)
58
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_ip, const char * fmt, const char * opts)
56 59
/* we assume source_ip and source_port are valid strings */
57 60
{
58 61
	char config[MAX_PSINSTANCE_CONFIG_LENGTH];
59
	char * fmt = "port=%d,dechunkiser=rtp,base=%d,addr=%s";
60 62
	int count;
61 63

  
62
	count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, rtp_dst_ip);
64
	count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, rtp_ip);
63 65
	if (opts && (size_t)(MAX_PSINSTANCE_CONFIG_LENGTH - count) > strlen(opts))
64 66
		snprintf(config + count, MAX_PSINSTANCE_CONFIG_LENGTH - count, ",%s", opts);
65 67
	ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
66 68

  
67 69
	ps->topology_task = NULL;
68 70
	ps->offer_task = NULL;
71
	ps->inject_task = NULL;
69 72
	ps->msg_task = NULL;
70 73
	ps->tm = NULL;
71 74
	ps->topology_interval = 400;
......
85 88
		task_manager_destroy_task(ps->tm, &(ps->topology_task));
86 89
	if (ps->offer_task)
87 90
		task_manager_destroy_task(ps->tm, &(ps->offer_task));
91
	if (ps->inject_task)
92
		task_manager_destroy_task(ps->tm, &(ps->inject_task));
88 93
	if (ps->msg_task)
89 94
		task_manager_destroy_task(ps->tm, &(ps->msg_task));
90 95
	psinstance_destroy(&(ps->psc));
......
168 173
		ord_set_for_each(ps, (*psm)->streamers)
169 174
		{
170 175
			if ((*psm)->janus)
171
				janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
176
				if (pstreamer_is_source(ps))
177
					janus_instance_destroy_videoroom((*psm)->janus, ((struct pstreamer*)ps)->id);
178
				else
179
					janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
172 180
			pstreamer_deinit((struct pstreamer *)ps);
173 181
		}
174 182

  
......
196 204
		ps->topology_task = task_manager_new_task(tm, NULL, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
197 205
		ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
198 206
		ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
207
		if (pstreamer_is_source(ps))
208
			ps->inject_task = task_manager_new_task(tm, NULL, pstreamer_inject_task_callback, 25, ps->psc); 
199 209
	}
200 210
	return 0;
201 211
}
......
263 273
			else
264 274
				if (scc)
265 275
					streamer_creation_callback_trigger(scc, 0);
266
			pstreamer_init(ps, rtp_dst_ip, psm->streamer_opts);
276
			pstreamer_init(ps, rtp_dst_ip, STREAMER_PEER_CONF, psm->streamer_opts);
267 277
			ord_set_insert(psm->streamers, ps, 0);
268 278
		} else
269 279
		{
......
340 350
	}
341 351
	return NULL;
342 352
}
353

  
354
uint8_t pstreamer_is_source(const struct pstreamer * ps)
355
{
356
	return ps && (ps->source_port == 0) ? 1 : 0;
357
}
358

  
359
char * pstreamer_manager_sources_to_json(const struct pstreamer_manager *psm)
360
{
361
	char * res = NULL, * ps_json;
362
	uint32_t pos;
363
	const void * iter;
364
	const struct pstreamer * ps;
365

  
366
	if (psm)
367
	{
368
		res = malloc(sizeof(char)*3);
369
		res[0] = '[';
370
		pos = 1;
371

  
372
		ord_set_for_each(iter, psm->streamers)
373
		{
374
			ps = (const struct pstreamer *) iter;
375
			if (pstreamer_is_source(ps))
376
			{
377
				ps_json = pstreamer_to_json(ps);
378
				if (ps_json)
379
				{
380
					res = realloc(res, sizeof(char)*(pos+strlen(ps_json) + (pos == 1? 2 : 3)));
381
					if (pos > 1)
382
						res[pos++] = ',';
383
					strcpy(res+pos, ps_json);
384
					pos += strlen(ps_json);
385
					free(ps_json);
386
				}
387
			}
388
		}
389
		res[pos++] = ']';
390
		res[pos] = '\0';
391
	}
392
	return res;
393
}
394

  
395
const struct pstreamer * pstreamer_manager_create_source_streamer(struct pstreamer_manager * psm, const char * id, const char * rtp_src_ip, struct streamer_creation_callback * scc)
396
{
397
	struct pstreamer * ps = NULL;
398
	const void * ptr = NULL;
399

  
400
	if (psm && id && rtp_src_ip)
401
	{
402
		ps = malloc(sizeof(struct pstreamer));
403
		strncpy(ps->source_ip, rtp_src_ip, MAX_IPADDR_LENGTH);
404
		ps->source_port = 0;
405
		strncpy(ps->id, id, PSID_LENGTH);
406
		ps->base_port = assign_streaming_ports(psm); 
407
		ptr = ord_set_find(psm->streamers, (const void *) ps);
408
		if (ptr == NULL)
409
		{
410
			pstreamer_touch(ps);
411
			streamer_creation_set_pstreamer_ref(scc, ps);
412
			if (psm->janus)
413
			{
414
				debug("calling janus upon notification of perstreamer source creation\n");
415
				janus_instance_create_videoroom(psm->janus, ps->id, scc);
416
			}
417
			else
418
				if (scc)
419
					streamer_creation_callback_trigger(scc, 0);
420
			pstreamer_init(ps, ps->source_ip, STREAMER_SOURCE_CONF, psm->streamer_opts);
421
			ord_set_insert(psm->streamers, ps, 0);
422
		} else
423
		{
424
			free(ps);
425
			ps = NULL;
426
		}
427
	}
428

  
429
	return ps;
430
}
431

  
432
void pstreamer_source_touch(const struct pstreamer_manager *psm, struct pstreamer *ps, uint64_t janus_id)
433
{
434
	if (psm && ps)
435
	{
436
		pstreamer_touch(ps);
437
		if (psm->janus && pstreamer_is_source(ps))
438
		{
439
			janus_instance_forward_rtp(psm->janus, ps->id, janus_id, ps->source_ip, ps->base_port+1, ps->base_port+3);
440
			ps->janus_streaming_id = janus_id;
441
		}
442
	}
443
}
src/pstreamer.h
60 60

  
61 61
int8_t pstreamer_manager_set_streamer_options(struct pstreamer_manager *psm, const char * opts);
62 62

  
63
uint8_t pstreamer_is_source(const struct pstreamer * ps);
64

  
65
char * pstreamer_manager_sources_to_json(const struct pstreamer_manager *psm);
66

  
67
const struct pstreamer * pstreamer_manager_create_source_streamer(struct pstreamer_manager * psm, const char * id, const char * rtp_src_ip, struct streamer_creation_callback * scc);
68

  
69
void pstreamer_source_touch(const struct pstreamer_manager *psm, struct pstreamer *ps, uint64_t janus_id);
70

  
63 71
#endif
src/task_manager.h
24 24
#include<sys/select.h>
25 25
#include<mongoose.h>
26 26

  
27
typedef uint16_t timeout;
27
typedef uint16_t timeout; // milliseconds
28 28

  
29 29
struct periodic_task;
30 30

  

Also available in: Unified diff