Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_streaming.c @ 78955474

History | View | Annotate | Download (130 KB)

1
/*! \file   janus_streaming.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus Streaming plugin
5
 * \details  This is a streaming plugin for Janus, allowing WebRTC peers
6
 * to watch/listen to pre-recorded files or media generated by another tool.
7
 * Specifically, the plugin currently supports three different type of streams:
8
 * 
9
 * -# on-demand streaming of pre-recorded media files (different
10
 * streaming context for each peer);
11
 * -# live streaming of pre-recorded media files (shared streaming
12
 * context for all peers attached to the stream);
13
 * -# live streaming of media generated by another tool (shared
14
 * streaming context for all peers attached to the stream).
15
 * 
16
 * For what concerns types 1. and 2., considering the proof of concept
17
 * nature of the implementation the only pre-recorded media files
18
 * that the plugins supports right now are raw mu-Law and a-Law files:
19
 * support is of course planned for other additional widespread formats
20
 * as well.
21
 * 
22
 * For what concerns type 3., instead, the plugin is configured
23
 * to listen on a couple of ports for RTP: this means that the plugin
24
 * is implemented to receive RTP on those ports and relay them to all
25
 * peers attached to that stream. Any tool that can generate audio/video
26
 * RTP streams and specify a destination is good for the purpose: the
27
 * examples section contains samples that make use of GStreamer (http://gstreamer.freedesktop.org/)
28
 * but other tools like FFmpeg (http://www.ffmpeg.org/), LibAV (http://libav.org/)
29
 * or others are fine as well. This makes it really easy to capture and
30
 * encode whatever you want using your favourite tool, and then have it
31
 * transparently broadcasted via WebRTC using Janus.
32
 * 
33
 * Streams to make available are listed in the plugin configuration file.
34
 * A pre-filled configuration file is provided in \c conf/janus.plugin.streaming.cfg
35
 * and includes a stream of every type.
36
 * 
37
 * To add more streams or modify the existing ones, you can use the following
38
 * syntax:
39
 * 
40
 * \verbatim
41
[stream-name]
42
type = rtp|live|ondemand|rtsp
43
       rtp = stream originated by an external tool (e.g., gstreamer or
44
             ffmpeg) and sent to the plugin via RTP
45
       live = local file streamed live to multiple listeners
46
              (multiple listeners = same streaming context)
47
       ondemand = local file streamed on-demand to a single listener
48
                  (multiple listeners = different streaming contexts)
49
       rtsp = stream originated by an external RTSP feed (only
50
              available if libcurl support was compiled)
51
id = <unique numeric ID>
52
description = This is my awesome stream
53
is_private = yes|no (private streams don't appear when you do a 'list' request)
54
filename = path to the local file to stream (only for live/ondemand)
55
secret = <optional password needed for manipulating (e.g., destroying
56
                or enabling/disabling) the stream>
57
audio = yes|no (do/don't stream audio)
58
video = yes|no (do/don't stream video)
59
   The following options are only valid for the 'rtp' type:
60
audioport = local port for receiving audio frames
61
audiomcast = multicast group port for receiving audio frames
62
audiopt = <audio RTP payload type> (e.g., 111)
63
audiortpmap = RTP map of the audio codec (e.g., opus/48000/2)
64
audiofmtp = Codec specific parameters, if any
65
videoport = local port for receiving video frames (only for rtp)
66
videomcast = multicast group port for receiving video frames
67
videopt = <video RTP payload type> (e.g., 100)
68
videortpmap = RTP map of the video codec (e.g., VP8/90000)
69
videofmtp = Codec specific parameters, if any
70
   The following options are only valid for the 'rstp' type:
71
url = RTSP stream URL (only if type=rtsp)
72
\endverbatim
73
 *
74
 * \section streamapi Streaming API
75
 * 
76
 * The Streaming API supports several requests, some of which are
77
 * synchronous and some asynchronous. There are some situations, though,
78
 * (invalid JSON, invalid request) which will always result in a
79
 * synchronous error response even for asynchronous requests. 
80
 * 
81
 * \c list , \c create , \c destroy , \c recording , \c enable and
82
 * \c disable are synchronous requests, which means you'll
83
 * get a response directly within the context of the transaction. \c list
84
 * lists all the available streams; \c create allows you to create a new
85
 * mountpoint dynamically, as an alternative to using the configuration
86
 * file; \c destroy removes a mountpoint and destroys it; \c recording
87
 * instructs the plugin on whether or not a live RTP stream should be
88
 * recorded while it's broadcasted; \c enable and \c disable respectively
89
 * enable and disable a mountpoint, that is decide whether or not a
90
 * mountpoint should be available to users without destroying it.
91
 * 
92
 * The \c watch , \c start , \c pause , \c switch and \c stop requests
93
 * instead are all asynchronous, which means you'll get a notification
94
 * about their success or failure in an event. \c watch asks the plugin
95
 * to prepare the playout of one of the available streams; \c start
96
 * starts the actual playout; \c pause allows you to pause a playout
97
 * without tearing down the PeerConnection; \c switch allows you to
98
 * switch to a different mountpoint of the same kind (note: only live
99
 * RTP mountpoints supported as of now) without having to stop and watch
100
 * the new one; \c stop stops the playout and tears the PeerConnection
101
 * down.
102
 * 
103
 * Actual API docs: TBD.
104
 * 
105
 * \ingroup plugins
106
 * \ref plugins
107
 */
108

    
109
#include "plugin.h"
110

    
111
#include <jansson.h>
112
#include <errno.h>
113
#include <sys/poll.h>
114
#include <sys/time.h>
115

    
116
#ifdef HAVE_LIBCURL
117
#include <curl/curl.h>
118
#endif
119

    
120
#include "../debug.h"
121
#include "../apierror.h"
122
#include "../config.h"
123
#include "../mutex.h"
124
#include "../rtp.h"
125
#include "../rtcp.h"
126
#include "../record.h"
127
#include "../utils.h"
128

    
129

    
130
/* Plugin information */
131
#define JANUS_STREAMING_VERSION                        5
132
#define JANUS_STREAMING_VERSION_STRING        "0.0.5"
133
#define JANUS_STREAMING_DESCRIPTION                "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by gstreamer."
134
#define JANUS_STREAMING_NAME                        "JANUS Streaming plugin"
135
#define JANUS_STREAMING_AUTHOR                        "Meetecho s.r.l."
136
#define JANUS_STREAMING_PACKAGE                        "janus.plugin.streaming"
137

    
138
/* Plugin methods */
139
janus_plugin *create(void);
140
int janus_streaming_init(janus_callbacks *callback, const char *config_path);
141
void janus_streaming_destroy(void);
142
int janus_streaming_get_api_compatibility(void);
143
int janus_streaming_get_version(void);
144
const char *janus_streaming_get_version_string(void);
145
const char *janus_streaming_get_description(void);
146
const char *janus_streaming_get_name(void);
147
const char *janus_streaming_get_author(void);
148
const char *janus_streaming_get_package(void);
149
void janus_streaming_create_session(janus_plugin_session *handle, int *error);
150
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
151
void janus_streaming_setup_media(janus_plugin_session *handle);
152
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
153
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
154
void janus_streaming_hangup_media(janus_plugin_session *handle);
155
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error);
156
char *janus_streaming_query_session(janus_plugin_session *handle);
157

    
158
/* Plugin setup */
159
static janus_plugin janus_streaming_plugin =
160
        JANUS_PLUGIN_INIT (
161
                .init = janus_streaming_init,
162
                .destroy = janus_streaming_destroy,
163

    
164
                .get_api_compatibility = janus_streaming_get_api_compatibility,
165
                .get_version = janus_streaming_get_version,
166
                .get_version_string = janus_streaming_get_version_string,
167
                .get_description = janus_streaming_get_description,
168
                .get_name = janus_streaming_get_name,
169
                .get_author = janus_streaming_get_author,
170
                .get_package = janus_streaming_get_package,
171
                
172
                .create_session = janus_streaming_create_session,
173
                .handle_message = janus_streaming_handle_message,
174
                .setup_media = janus_streaming_setup_media,
175
                .incoming_rtp = janus_streaming_incoming_rtp,
176
                .incoming_rtcp = janus_streaming_incoming_rtcp,
177
                .hangup_media = janus_streaming_hangup_media,
178
                .destroy_session = janus_streaming_destroy_session,
179
                .query_session = janus_streaming_query_session,
180
        );
181

    
182
/* Plugin creator */
183
janus_plugin *create(void) {
184
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_STREAMING_NAME);
185
        return &janus_streaming_plugin;
186
}
187

    
188

    
189
/* Useful stuff */
190
static gint initialized = 0, stopping = 0;
191
static janus_callbacks *gateway = NULL;
192
static GThread *handler_thread;
193
static GThread *watchdog;
194
static void *janus_streaming_handler(void *data);
195
static void *janus_streaming_ondemand_thread(void *data);
196
static void *janus_streaming_filesource_thread(void *data);
197
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data);
198
static void *janus_streaming_relay_thread(void *data);
199

    
200
typedef enum janus_streaming_type {
201
        janus_streaming_type_none = 0,
202
        janus_streaming_type_live,
203
        janus_streaming_type_on_demand,
204
} janus_streaming_type;
205

    
206
typedef enum janus_streaming_source {
207
        janus_streaming_source_none = 0,
208
        janus_streaming_source_file,
209
        janus_streaming_source_rtp,
210
} janus_streaming_source;
211

    
212
typedef struct janus_streaming_rtp_source {
213
        in_addr_t audio_mcast;
214
        gint audio_port;
215
        in_addr_t video_mcast;
216
        gint video_port;
217
        janus_recorder *arc;        /* The Janus recorder instance for this streams's audio, if enabled */
218
        janus_recorder *vrc;        /* The Janus recorder instance for this streams's video, if enabled */
219
        int audio_fd;
220
        int video_fd;
221
#ifdef HAVE_LIBCURL
222
        CURL* curl;
223
#endif
224
} janus_streaming_rtp_source;
225

    
226
typedef struct janus_streaming_file_source {
227
        char *filename;
228
} janus_streaming_file_source;
229

    
230
typedef struct janus_streaming_codecs {
231
        gint audio_pt;
232
        char *audio_rtpmap;
233
        char *audio_fmtp;
234
        gint video_pt;
235
        char *video_rtpmap;
236
        char *video_fmtp;
237
} janus_streaming_codecs;
238

    
239
typedef struct janus_streaming_mountpoint {
240
        gint64 id;
241
        char *name;
242
        char *description;
243
        gboolean is_private;
244
        char *secret;
245
        gboolean enabled;
246
        gboolean active;
247
        janus_streaming_type streaming_type;
248
        janus_streaming_source streaming_source;
249
        void *source;        /* Can differ according to the source type */
250
        GDestroyNotify source_destroy;
251
        janus_streaming_codecs codecs;
252
        GList/*<unowned janus_streaming_session>*/ *listeners;
253
        gint64 destroyed;
254
        janus_mutex mutex;
255
} janus_streaming_mountpoint;
256
GHashTable *mountpoints;
257
janus_mutex mountpoints_mutex;
258

    
259
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp);
260

    
261
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
262
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
263
                uint64_t id, char *name, char *desc,
264
                gboolean doaudio, char* amcast,uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
265
                gboolean dovideo, char* vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp);
266
/* Helper to create a file/ondemand live source */
267
janus_streaming_mountpoint *janus_streaming_create_file_source(
268
                uint64_t id, char *name, char *desc, char *filename,
269
                gboolean live, gboolean doaudio, gboolean dovideo);
270
/* Helper to create a rtsp live source */
271
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
272
                uint64_t id, char *name, char *desc, char *url,
273
                gboolean doaudio, gboolean dovideo);
274

    
275

    
276
typedef struct janus_streaming_message {
277
        janus_plugin_session *handle;
278
        char *transaction;
279
        json_t *message;
280
        char *sdp_type;
281
        char *sdp;
282
} janus_streaming_message;
283
static GAsyncQueue *messages = NULL;
284

    
285
void janus_streaming_message_free(janus_streaming_message *msg);
286
void janus_streaming_message_free(janus_streaming_message *msg) {
287
        if(!msg)
288
                return;
289

    
290
        msg->handle = NULL;
291

    
292
        g_free(msg->transaction);
293
        msg->transaction = NULL;
294
        if(msg->message)
295
                json_decref(msg->message);
296
        msg->message = NULL;
297
        g_free(msg->sdp_type);
298
        msg->sdp_type = NULL;
299
        g_free(msg->sdp);
300
        msg->sdp = NULL;
301

    
302
        g_free(msg);
303
}
304

    
305

    
306
typedef struct janus_streaming_context {
307
        /* Needed to fix seq and ts in case of stream switching */
308
        uint32_t a_last_ssrc, a_last_ts, a_base_ts, a_base_ts_prev,
309
                        v_last_ssrc, v_last_ts, v_base_ts, v_base_ts_prev;
310
        uint16_t a_last_seq, a_base_seq, a_base_seq_prev,
311
                        v_last_seq, v_base_seq, v_base_seq_prev;
312
} janus_streaming_context;
313

    
314
typedef struct janus_streaming_session {
315
        janus_plugin_session *handle;
316
        janus_streaming_mountpoint *mountpoint;
317
        gboolean started;
318
        gboolean paused;
319
        janus_streaming_context context;
320
        gboolean stopping;
321
        gboolean hangingup;
322
        gint64 destroyed;        /* Time at which this session was marked as destroyed */
323
} janus_streaming_session;
324
static GHashTable *sessions;
325
static GList *old_sessions;
326
static janus_mutex sessions_mutex;
327

    
328
/* Packets we get from gstreamer and relay */
329
typedef struct janus_streaming_rtp_relay_packet {
330
        rtp_header *data;
331
        gint length;
332
        gint is_video;
333
        uint32_t timestamp;
334
        uint16_t seq_number;
335
} janus_streaming_rtp_relay_packet;
336

    
337

    
338
/* Error codes */
339
#define JANUS_STREAMING_ERROR_NO_MESSAGE                        450
340
#define JANUS_STREAMING_ERROR_INVALID_JSON                        451
341
#define JANUS_STREAMING_ERROR_INVALID_REQUEST                452
342
#define JANUS_STREAMING_ERROR_MISSING_ELEMENT                453
343
#define JANUS_STREAMING_ERROR_INVALID_ELEMENT                454
344
#define JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT        455
345
#define JANUS_STREAMING_ERROR_CANT_CREATE                        456
346
#define JANUS_STREAMING_ERROR_UNAUTHORIZED                        457
347
#define JANUS_STREAMING_ERROR_CANT_SWITCH                        458
348
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR                        470
349

    
350

    
351
/* Streaming watchdog/garbage collector (sort of) */
352
void *janus_streaming_watchdog(void *data);
353
void *janus_streaming_watchdog(void *data) {
354
        JANUS_LOG(LOG_INFO, "Streaming watchdog started\n");
355
        gint64 now = 0;
356
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
357
                janus_mutex_lock(&sessions_mutex);
358
                /* Iterate on all the sessions */
359
                now = janus_get_monotonic_time();
360
                if(old_sessions != NULL) {
361
                        GList *sl = old_sessions;
362
                        JANUS_LOG(LOG_HUGE, "Checking %d old Streaming sessions...\n", g_list_length(old_sessions));
363
                        while(sl) {
364
                                janus_streaming_session *session = (janus_streaming_session *)sl->data;
365
                                if(!session) {
366
                                        sl = sl->next;
367
                                        continue;
368
                                }
369
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
370
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
371
                                        JANUS_LOG(LOG_VERB, "Freeing old Streaming session\n");
372
                                        GList *rm = sl->next;
373
                                        old_sessions = g_list_delete_link(old_sessions, sl);
374
                                        sl = rm;
375
                                        session->handle = NULL;
376
                                        g_free(session);
377
                                        session = NULL;
378
                                        continue;
379
                                }
380
                                sl = sl->next;
381
                        }
382
                }
383
                janus_mutex_unlock(&sessions_mutex);
384
                g_usleep(500000);
385
        }
386
        JANUS_LOG(LOG_INFO, "Streaming watchdog stopped\n");
387
        return NULL;
388
}
389

    
390

    
391
/* Plugin implementation */
392
int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
393
#ifdef HAVE_LIBCURL        
394
        curl_global_init(CURL_GLOBAL_ALL);
395
#endif        
396
        if(g_atomic_int_get(&stopping)) {
397
                /* Still stopping from before */
398
                return -1;
399
        }
400
        if(callback == NULL || config_path == NULL) {
401
                /* Invalid arguments */
402
                return -1;
403
        }
404

    
405
        /* Read configuration */
406
        char filename[255];
407
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_STREAMING_PACKAGE);
408
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
409
        janus_config *config = janus_config_parse(filename);
410
        if(config != NULL)
411
                janus_config_print(config);
412
        
413
        mountpoints = g_hash_table_new_full(NULL, NULL, NULL,
414
                                            (GDestroyNotify) janus_streaming_mountpoint_free);
415
        janus_mutex_init(&mountpoints_mutex);
416
        /* Parse configuration to populate the mountpoints */
417
        if(config != NULL) {
418
                janus_config_category *cat = janus_config_get_categories(config);
419
                while(cat != NULL) {
420
                        if(cat->name == NULL) {
421
                                cat = cat->next;
422
                                continue;
423
                        }
424
                        JANUS_LOG(LOG_VERB, "Adding stream '%s'\n", cat->name);
425
                        janus_config_item *type = janus_config_get_item(cat, "type");
426
                        if(type == NULL || type->value == NULL) {
427
                                JANUS_LOG(LOG_VERB, "  -- Invalid type, skipping stream...\n");
428
                                cat = cat->next;
429
                                continue;
430
                        }
431
                        if(!strcasecmp(type->value, "rtp")) {
432
                                /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
433
                                janus_config_item *id = janus_config_get_item(cat, "id");
434
                                janus_config_item *desc = janus_config_get_item(cat, "description");
435
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
436
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
437
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
438
                                janus_config_item *video = janus_config_get_item(cat, "video");
439
                                janus_config_item *amcast = janus_config_get_item(cat, "audiomcast");
440
                                janus_config_item *aport = janus_config_get_item(cat, "audioport");
441
                                janus_config_item *acodec = janus_config_get_item(cat, "audiopt");
442
                                janus_config_item *artpmap = janus_config_get_item(cat, "audiortpmap");
443
                                janus_config_item *afmtp = janus_config_get_item(cat, "audiofmtp");
444
                                janus_config_item *vmcast = janus_config_get_item(cat, "videomcast");
445
                                janus_config_item *vport = janus_config_get_item(cat, "videoport");
446
                                janus_config_item *vcodec = janus_config_get_item(cat, "videopt");
447
                                janus_config_item *vrtpmap = janus_config_get_item(cat, "videortpmap");
448
                                janus_config_item *vfmtp = janus_config_get_item(cat, "videofmtp");
449
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
450
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
451
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
452
                                if(!doaudio && !dovideo) {
453
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', no audio or video have to be streamed...\n", cat->name);
454
                                        cat = cat->next;
455
                                        continue;
456
                                }
457
                                if(doaudio &&
458
                                                (aport == NULL || aport->value == NULL ||
459
                                                acodec == NULL || acodec->value == NULL ||
460
                                                artpmap == NULL || artpmap->value == NULL)) {
461
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for audio...\n", cat->name);
462
                                        cat = cat->next;
463
                                        continue;
464
                                }
465
                                if(dovideo &&
466
                                                (vport == NULL || vport->value == NULL ||
467
                                                vcodec == NULL || vcodec->value == NULL ||
468
                                                vrtpmap == NULL || vrtpmap->value == NULL)) {
469
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for video...\n", cat->name);
470
                                        cat = cat->next;
471
                                        continue;
472
                                }
473
                                if(id == NULL || id->value == NULL) {
474
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
475
                                } else {
476
                                        janus_mutex_lock(&mountpoints_mutex);
477
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
478
                                        janus_mutex_unlock(&mountpoints_mutex);
479
                                        if(mp != NULL) {
480
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
481
                                                cat = cat->next;
482
                                                continue;
483
                                        }
484
                                }
485
                                JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
486
                                janus_streaming_mountpoint *mp = NULL;
487
                                if((mp = janus_streaming_create_rtp_source(
488
                                                (id && id->value) ? atoi(id->value) : 0,
489
                                                (char *)cat->name,
490
                                                desc ? (char *)desc->value : NULL,
491
                                                doaudio,
492
                                                amcast ? (char *)amcast->value : NULL,
493
                                                (aport && aport->value) ? atoi(aport->value) : 0,
494
                                                (acodec && acodec->value) ? atoi(acodec->value) : 0,
495
                                                artpmap ? (char *)artpmap->value : NULL,
496
                                                afmtp ? (char *)afmtp->value : NULL,
497
                                                dovideo,
498
                                                vmcast ? (char *)vmcast->value : NULL,
499
                                                (vport && vport->value) ? atoi(vport->value) : 0,
500
                                                (vcodec && vcodec->value) ? atoi(vcodec->value) : 0,
501
                                                vrtpmap ? (char *)vrtpmap->value : NULL,
502
                                                vfmtp ? (char *)vfmtp->value : NULL)) == NULL) {
503
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream '%s'...\n", cat->name);
504
                                        cat = cat->next;
505
                                        continue;
506
                                }
507
                                mp->is_private = is_private;
508
                                if(secret && secret->value)
509
                                        mp->secret = g_strdup(secret->value);
510
                        } else if(!strcasecmp(type->value, "live")) {
511
                                /* File live source */
512
                                janus_config_item *id = janus_config_get_item(cat, "id");
513
                                janus_config_item *desc = janus_config_get_item(cat, "description");
514
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
515
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
516
                                janus_config_item *file = janus_config_get_item(cat, "filename");
517
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
518
                                janus_config_item *video = janus_config_get_item(cat, "video");
519
                                if(file == NULL || file->value == NULL) {
520
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', missing mandatory information...\n", cat->name);
521
                                        cat = cat->next;
522
                                        continue;
523
                                }
524
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
525
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
526
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
527
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
528
                                if(!doaudio || dovideo) {
529
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', we only support audio file streaming right now...\n", cat->name);
530
                                        cat = cat->next;
531
                                        continue;
532
                                }
533
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
534
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
535
                                        cat = cat->next;
536
                                        continue;
537
                                }
538
                                if(id == NULL || id->value == NULL) {
539
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
540
                                } else {
541
                                        janus_mutex_lock(&mountpoints_mutex);
542
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
543
                                        janus_mutex_unlock(&mountpoints_mutex);
544
                                        if(mp != NULL) {
545
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
546
                                                cat = cat->next;
547
                                                continue;
548
                                        }
549
                                }
550
                                janus_streaming_mountpoint *mp = NULL;
551
                                if((mp = janus_streaming_create_file_source(
552
                                                (id && id->value) ? atoi(id->value) : 0,
553
                                                (char *)cat->name,
554
                                                desc ? (char *)desc->value : NULL,
555
                                                (char *)file->value,
556
                                                TRUE, doaudio, dovideo)) == NULL) {
557
                                        JANUS_LOG(LOG_ERR, "Error creating 'live' stream '%s'...\n", cat->name);
558
                                        cat = cat->next;
559
                                        continue;
560
                                }
561
                                mp->is_private = is_private;
562
                                if(secret && secret->value)
563
                                        mp->secret = g_strdup(secret->value);
564
                        } else if(!strcasecmp(type->value, "ondemand")) {
565
                                /* mu-Law file on demand source */
566
                                janus_config_item *id = janus_config_get_item(cat, "id");
567
                                janus_config_item *desc = janus_config_get_item(cat, "description");
568
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
569
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
570
                                janus_config_item *file = janus_config_get_item(cat, "filename");
571
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
572
                                janus_config_item *video = janus_config_get_item(cat, "video");
573
                                if(file == NULL || file->value == NULL) {
574
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', missing mandatory information...\n", cat->name);
575
                                        cat = cat->next;
576
                                        continue;
577
                                }
578
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
579
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
580
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
581
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
582
                                if(!doaudio || dovideo) {
583
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', we only support audio file streaming right now...\n", cat->name);
584
                                        cat = cat->next;
585
                                        continue;
586
                                }
587
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
588
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
589
                                        cat = cat->next;
590
                                        continue;
591
                                }
592
                                if(id == NULL || id->value == NULL) {
593
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
594
                                } else {
595
                                        janus_mutex_lock(&mountpoints_mutex);
596
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
597
                                        janus_mutex_unlock(&mountpoints_mutex);
598
                                        if(mp != NULL) {
599
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
600
                                                cat = cat->next;
601
                                                continue;
602
                                        }
603
                                }
604
                                janus_streaming_mountpoint *mp = NULL;
605
                                if((mp = janus_streaming_create_file_source(
606
                                                (id && id->value) ? atoi(id->value) : 0,
607
                                                (char *)cat->name,
608
                                                desc ? (char *)desc->value : NULL,
609
                                                (char *)file->value,
610
                                                FALSE, doaudio, dovideo)) == NULL) {
611
                                        JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream '%s'...\n", cat->name);
612
                                        cat = cat->next;
613
                                        continue;
614
                                }
615
                                mp->is_private = is_private;
616
                                if(secret && secret->value)
617
                                        mp->secret = g_strdup(secret->value);
618
                        } else if(!strcasecmp(type->value, "rtsp")) {
619
#ifndef HAVE_LIBCURL
620
                                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', libcurl support not compiled...\n", cat->name);
621
                                cat = cat->next;
622
                                continue;
623
#else
624
                                janus_config_item *id = janus_config_get_item(cat, "id");
625
                                janus_config_item *desc = janus_config_get_item(cat, "description");
626
                                janus_config_item *file = janus_config_get_item(cat, "url");
627
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
628
                                janus_config_item *video = janus_config_get_item(cat, "video");
629
                                if(file == NULL || file->value == NULL) {
630
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', missing mandatory information...\n", cat->name);
631
                                        cat = cat->next;
632
                                        continue;
633
                                }
634
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
635
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
636
                                if(id == NULL || id->value == NULL) {
637
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
638
                                } else {
639
                                        janus_mutex_lock(&mountpoints_mutex);
640
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
641
                                        janus_mutex_unlock(&mountpoints_mutex);
642
                                        if(mp != NULL) {
643
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
644
                                                cat = cat->next;
645
                                                continue;
646
                                        }
647
                                }
648
                                janus_streaming_mountpoint *mp = NULL;
649
                                if((mp = janus_streaming_create_rtsp_source(
650
                                                (id && id->value) ? atoi(id->value) : 0,
651
                                                (char *)cat->name,
652
                                                desc ? (char *)desc->value : NULL,
653
                                                (char *)file->value, doaudio, dovideo)) == NULL) {
654
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream '%s'...\n", cat->name);
655
                                        cat = cat->next;
656
                                        continue;
657
                                }
658
                                
659
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
660
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
661
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
662
                                mp->is_private = is_private;
663
                                if(secret && secret->value)
664
                                        mp->secret = g_strdup(secret->value);
665
#endif
666
                        } else {
667
                                JANUS_LOG(LOG_WARN, "Ignoring unknown stream type '%s' (%s)...\n", type->value, cat->name);
668
                        }
669
                        cat = cat->next;
670
                }
671
                /* Done */
672
                janus_config_destroy(config);
673
                config = NULL;
674
        }
675
        /* Show available mountpoints */
676
        janus_mutex_lock(&mountpoints_mutex);
677
        GHashTableIter iter;
678
        gpointer value;
679
        g_hash_table_iter_init(&iter, mountpoints);
680
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
681
                janus_streaming_mountpoint *mp = value;
682
                JANUS_LOG(LOG_VERB, "  ::: [%"SCNu64"][%s] %s (%s, %s, %s)\n", mp->id, mp->name, mp->description,
683
                        mp->streaming_type == janus_streaming_type_live ? "live" : "on demand",
684
                        mp->streaming_source == janus_streaming_source_rtp ? "RTP source" : "file source",
685
                        mp->is_private ? "private" : "public");
686
        }
687
        janus_mutex_unlock(&mountpoints_mutex);
688

    
689
        sessions = g_hash_table_new(NULL, NULL);
690
        janus_mutex_init(&sessions_mutex);
691
        messages = g_async_queue_new_full((GDestroyNotify) janus_streaming_message_free);
692
        /* This is the callback we'll need to invoke to contact the gateway */
693
        gateway = callback;
694
        g_atomic_int_set(&initialized, 1);
695

    
696
        GError *error = NULL;
697
        /* Start the sessions watchdog */
698
        watchdog = g_thread_try_new("streaming watchdog", &janus_streaming_watchdog, NULL, &error);
699
        if(!watchdog) {
700
                g_atomic_int_set(&initialized, 0);
701
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming watchdog thread...\n", error->code, error->message ? error->message : "??");
702
                return -1;
703
        }
704
        /* Launch the thread that will handle incoming messages */
705
        handler_thread = g_thread_try_new("janus streaming handler", janus_streaming_handler, NULL, &error);
706
        if(error != NULL) {
707
                g_atomic_int_set(&initialized, 0);
708
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming handler thread...\n", error->code, error->message ? error->message : "??");
709
                return -1;
710
        }
711
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_STREAMING_NAME);
712
        return 0;
713
}
714

    
715
void janus_streaming_destroy(void) {
716
        if(!g_atomic_int_get(&initialized))
717
                return;
718
        g_atomic_int_set(&stopping, 1);
719

    
720
        if(handler_thread != NULL) {
721
                g_thread_join(handler_thread);
722
                handler_thread = NULL;
723
        }
724
        if(watchdog != NULL) {
725
                g_thread_join(watchdog);
726
                watchdog = NULL;
727
        }
728

    
729
        /* FIXME We should destroy the sessions cleanly */
730
        usleep(500000);
731
        janus_mutex_lock(&mountpoints_mutex);
732
        g_hash_table_destroy(mountpoints);
733
        janus_mutex_unlock(&mountpoints_mutex);
734
        janus_mutex_lock(&sessions_mutex);
735
        g_hash_table_destroy(sessions);
736
        janus_mutex_unlock(&sessions_mutex);
737
        g_async_queue_unref(messages);
738
        messages = NULL;
739
        sessions = NULL;
740

    
741
        g_atomic_int_set(&initialized, 0);
742
        g_atomic_int_set(&stopping, 0);
743
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_STREAMING_NAME);
744
}
745

    
746
int janus_streaming_get_api_compatibility(void) {
747
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
748
        return JANUS_PLUGIN_API_VERSION;
749
}
750

    
751
int janus_streaming_get_version(void) {
752
        return JANUS_STREAMING_VERSION;
753
}
754

    
755
const char *janus_streaming_get_version_string(void) {
756
        return JANUS_STREAMING_VERSION_STRING;
757
}
758

    
759
const char *janus_streaming_get_description(void) {
760
        return JANUS_STREAMING_DESCRIPTION;
761
}
762

    
763
const char *janus_streaming_get_name(void) {
764
        return JANUS_STREAMING_NAME;
765
}
766

    
767
const char *janus_streaming_get_author(void) {
768
        return JANUS_STREAMING_AUTHOR;
769
}
770

    
771
const char *janus_streaming_get_package(void) {
772
        return JANUS_STREAMING_PACKAGE;
773
}
774

    
775
void janus_streaming_create_session(janus_plugin_session *handle, int *error) {
776
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
777
                *error = -1;
778
                return;
779
        }        
780
        janus_streaming_session *session = (janus_streaming_session *)calloc(1, sizeof(janus_streaming_session));
781
        if(session == NULL) {
782
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
783
                *error = -2;
784
                return;
785
        }
786
        session->handle = handle;
787
        session->mountpoint = NULL;        /* This will happen later */
788
        session->started = FALSE;        /* This will happen later */
789
        session->paused = FALSE;
790
        session->destroyed = 0;
791
        handle->plugin_handle = session;
792
        janus_mutex_lock(&sessions_mutex);
793
        g_hash_table_insert(sessions, handle, session);
794
        janus_mutex_unlock(&sessions_mutex);
795

    
796
        return;
797
}
798

    
799
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error) {
800
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
801
                *error = -1;
802
                return;
803
        }        
804
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; 
805
        if(!session) {
806
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
807
                *error = -2;
808
                return;
809
        }
810
        JANUS_LOG(LOG_VERB, "Removing streaming session...\n");
811
        if(session->mountpoint) {
812
                janus_mutex_lock(&session->mountpoint->mutex);
813
                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
814
                janus_mutex_unlock(&session->mountpoint->mutex);
815
        }
816
        janus_mutex_lock(&sessions_mutex);
817
        if(!session->destroyed) {
818
                session->destroyed = janus_get_monotonic_time();
819
                g_hash_table_remove(sessions, handle);
820
                /* Cleaning up and removing the session is done in a lazy way */
821
                old_sessions = g_list_append(old_sessions, session);
822
        }
823
        janus_mutex_unlock(&sessions_mutex);
824
        return;
825
}
826

    
827
char *janus_streaming_query_session(janus_plugin_session *handle) {
828
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
829
                return NULL;
830
        }        
831
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;
832
        if(!session) {
833
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
834
                return NULL;
835
        }
836
        /* What is this user watching, if anything? */
837
        json_t *info = json_object();
838
        json_object_set_new(info, "state", json_string(session->mountpoint ? "watching" : "idle"));
839
        if(session->mountpoint) {
840
                json_object_set_new(info, "mountpoint_id", json_integer(session->mountpoint->id));
841
                json_object_set_new(info, "mountpoint_name", session->mountpoint->name ? json_string(session->mountpoint->name) : NULL);
842
        }
843
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
844
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
845
        json_decref(info);
846
        return info_text;
847
}
848

    
849
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
850
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
851
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
852

    
853
        /* Pre-parse the message */
854
        int error_code = 0;
855
        char error_cause[512];
856
        json_t *root = NULL;
857
        json_t *response = NULL;
858

    
859
        if(message == NULL) {
860
                JANUS_LOG(LOG_ERR, "No message??\n");
861
                error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
862
                g_snprintf(error_cause, 512, "%s", "No message??");
863
                goto error;
864
        }
865
        JANUS_LOG(LOG_VERB, "Handling message: %s\n", message);
866

    
867
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
868
        if(!session) {
869
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
870
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
871
                g_snprintf(error_cause, 512, "%s", "session associated with this handle...");
872
                goto error;
873
        }
874
        if(session->destroyed) {
875
                JANUS_LOG(LOG_ERR, "Session has already been destroyed...\n");
876
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
877
                g_snprintf(error_cause, 512, "%s", "Session has already been destroyed...");
878
                goto error;
879
        }
880
        json_error_t error;
881
        root = json_loads(message, 0, &error);
882
        if(!root) {
883
                JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
884
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
885
                g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
886
                goto error;
887
        }
888
        if(!json_is_object(root)) {
889
                JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
890
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
891
                g_snprintf(error_cause, 512, "JSON error: not an object");
892
                goto error;
893
        }
894
        /* Get the request first */
895
        json_t *request = json_object_get(root, "request");
896
        if(!request) {
897
                JANUS_LOG(LOG_ERR, "Missing element (request)\n");
898
                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
899
                g_snprintf(error_cause, 512, "Missing element (request)");
900
                goto error;
901
        }
902
        if(!json_is_string(request)) {
903
                JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
904
                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
905
                g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
906
                goto error;
907
        }
908
        /* Some requests ('create' and 'destroy') can be handled synchronously */
909
        const char *request_text = json_string_value(request);
910
        if(!strcasecmp(request_text, "list")) {
911
                json_t *list = json_array();
912
                JANUS_LOG(LOG_VERB, "Request for the list of mountpoints\n");
913
                /* Return a list of all available mountpoints */
914
                janus_mutex_lock(&mountpoints_mutex);
915
                GHashTableIter iter;
916
                gpointer value;
917
                g_hash_table_iter_init(&iter, mountpoints);
918
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
919
                        janus_streaming_mountpoint *mp = value;
920
                        if(mp->is_private) {
921
                                /* Skip private stream */
922
                                JANUS_LOG(LOG_VERB, "Skipping private mountpoint '%s'\n", mp->description);
923
                                continue;
924
                        }
925
                        json_t *ml = json_object();
926
                        json_object_set_new(ml, "id", json_integer(mp->id));
927
                        json_object_set_new(ml, "description", json_string(mp->description));
928
                        json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
929
                        json_array_append_new(list, ml);
930
                }
931
                janus_mutex_unlock(&mountpoints_mutex);
932
                /* Send info back */
933
                response = json_object();
934
                json_object_set_new(response, "streaming", json_string("list"));
935
                json_object_set_new(response, "list", list);
936
                goto plugin_response;
937
        } else if(!strcasecmp(request_text, "create")) {
938
                /* Create a new stream */
939
                json_t *type = json_object_get(root, "type");
940
                if(!type) {
941
                        JANUS_LOG(LOG_ERR, "Missing element (type)\n");
942
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
943
                        g_snprintf(error_cause, 512, "Missing element (type)");
944
                        goto error;
945
                }
946
                if(!json_is_string(type)) {
947
                        JANUS_LOG(LOG_ERR, "Invalid element (type should be a string)\n");
948
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
949
                        g_snprintf(error_cause, 512, "Invalid element (type should be a string)");
950
                        goto error;
951
                }
952
                const char *type_text = json_string_value(type);
953
                json_t *secret = json_object_get(root, "secret");
954
                if(secret && !json_is_string(secret)) {
955
                        JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
956
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
957
                        g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
958
                        goto error;
959
                }
960
                janus_streaming_mountpoint *mp = NULL;
961
                if(!strcasecmp(type_text, "rtp")) {
962
                        /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
963
                        json_t *id = json_object_get(root, "id");
964
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
965
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
966
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
967
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
968
                                goto error;
969
                        }
970
                        json_t *name = json_object_get(root, "name");
971
                        if(name && !json_is_string(name)) {
972
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
973
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
974
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
975
                                goto error;
976
                        }
977
                        json_t *desc = json_object_get(root, "description");
978
                        if(desc && !json_is_string(desc)) {
979
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
980
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
981
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
982
                                goto error;
983
                        }
984
                        json_t *is_private = json_object_get(root, "is_private");
985
                        if(is_private && !json_is_boolean(is_private)) {
986
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
987
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
988
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
989
                                goto error;
990
                        }
991
                        json_t *audio = json_object_get(root, "audio");
992
                        if(audio && !json_is_boolean(audio)) {
993
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
994
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
995
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
996
                                goto error;
997
                        }
998
                        json_t *video = json_object_get(root, "video");
999
                        if(video && !json_is_boolean(video)) {
1000
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1001
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1002
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1003
                                goto error;
1004
                        }
1005
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1006
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1007
                        if(!doaudio && !dovideo) {
1008
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1009
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1010
                                g_snprintf(error_cause, 512, "Can't add 'rtp' stream, no audio or video have to be streamed...");
1011
                                goto error;
1012
                        }
1013
                        uint16_t aport = 0;
1014
                        uint8_t acodec = 0;
1015
                        char *artpmap = NULL, *afmtp = NULL, *amcast = NULL;
1016
                        if(doaudio) {
1017
                                json_t *audiomcast = json_object_get(root, "audiomcast");
1018
                                if(audiomcast && !json_is_string(audiomcast)) {
1019
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiomcast should be a string)\n");
1020
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1021
                                        g_snprintf(error_cause, 512, "Invalid element (audiomcast should be a string)");
1022
                                        goto error;
1023
                                }
1024
                                amcast = (char *)json_string_value(audiomcast);                                
1025
                                json_t *audioport = json_object_get(root, "audioport");
1026
                                if(!audioport) {
1027
                                        JANUS_LOG(LOG_ERR, "Missing element (audioport)\n");
1028
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1029
                                        g_snprintf(error_cause, 512, "Missing element (audioport)");
1030
                                        goto error;
1031
                                }
1032
                                if(!json_is_integer(audioport) || json_integer_value(audioport) < 0) {
1033
                                        JANUS_LOG(LOG_ERR, "Invalid element (audioport should be a positive integer)\n");
1034
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1035
                                        g_snprintf(error_cause, 512, "Invalid element (audioport should be a positive integer)");
1036
                                        goto error;
1037
                                }
1038
                                aport = json_integer_value(audioport);
1039
                                json_t *audiopt = json_object_get(root, "audiopt");
1040
                                if(!audiopt) {
1041
                                        JANUS_LOG(LOG_ERR, "Missing element (audiopt)\n");
1042
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1043
                                        g_snprintf(error_cause, 512, "Missing element (audiopt)");
1044
                                        goto error;
1045
                                }
1046
                                if(!json_is_integer(audiopt) || json_integer_value(audiopt) < 0) {
1047
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiopt should be a positive integer)\n");
1048
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1049
                                        g_snprintf(error_cause, 512, "Invalid element (audiopt should be a positive integer)");
1050
                                        goto error;
1051
                                }
1052
                                acodec = json_integer_value(audiopt);
1053
                                json_t *audiortpmap = json_object_get(root, "audiortpmap");
1054
                                if(!audiortpmap) {
1055
                                        JANUS_LOG(LOG_ERR, "Missing element (audiortpmap)\n");
1056
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1057
                                        g_snprintf(error_cause, 512, "Missing element (audiortpmap)");
1058
                                        goto error;
1059
                                }
1060
                                if(!json_is_string(audiortpmap)) {
1061
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiortpmap should be a string)\n");
1062
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1063
                                        g_snprintf(error_cause, 512, "Invalid element (audiortpmap should be a string)");
1064
                                        goto error;
1065
                                }
1066
                                artpmap = (char *)json_string_value(audiortpmap);
1067
                                json_t *audiofmtp = json_object_get(root, "audiofmtp");
1068
                                if(audiofmtp && !json_is_string(audiofmtp)) {
1069
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiofmtp should be a string)\n");
1070
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1071
                                        g_snprintf(error_cause, 512, "Invalid element (audiofmtp should be a string)");
1072
                                        goto error;
1073
                                }
1074
                                afmtp = (char *)json_string_value(audiofmtp);
1075
                        }
1076
                        uint16_t vport = 0;
1077
                        uint8_t vcodec = 0;
1078
                        char *vrtpmap = NULL, *vfmtp = NULL, *vmcast = NULL;
1079
                        if(dovideo) {
1080
                                json_t *videomcast = json_object_get(root, "videomcast");
1081
                                if(videomcast && !json_is_string(videomcast)) {
1082
                                        JANUS_LOG(LOG_ERR, "Invalid element (videomcast should be a string)\n");
1083
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1084
                                        g_snprintf(error_cause, 512, "Invalid element (videomcast should be a string)");
1085
                                        goto error;
1086
                                }
1087
                                vmcast = (char *)json_string_value(videomcast);
1088
                                json_t *videoport = json_object_get(root, "videoport");
1089
                                if(!videoport) {
1090
                                        JANUS_LOG(LOG_ERR, "Missing element (videoport)\n");
1091
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1092
                                        g_snprintf(error_cause, 512, "Missing element (videoport)");
1093
                                        goto error;
1094
                                }
1095
                                if(!json_is_integer(videoport) || json_integer_value(videoport) < 0) {
1096
                                        JANUS_LOG(LOG_ERR, "Invalid element (videoport should be a positive integer)\n");
1097
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1098
                                        g_snprintf(error_cause, 512, "Invalid element (videoport should be a positive integer)");
1099
                                        goto error;
1100
                                }
1101
                                vport = json_integer_value(videoport);
1102
                                json_t *videopt = json_object_get(root, "videopt");
1103
                                if(!videopt) {
1104
                                        JANUS_LOG(LOG_ERR, "Missing element (videopt)\n");
1105
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1106
                                        g_snprintf(error_cause, 512, "Missing element (videopt)");
1107
                                        goto error;
1108
                                }
1109
                                if(!json_is_integer(videopt) || json_integer_value(videopt) < 0) {
1110
                                        JANUS_LOG(LOG_ERR, "Invalid element (videopt should be a positive integer)\n");
1111
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1112
                                        g_snprintf(error_cause, 512, "Invalid element (videopt should be a positive integer)");
1113
                                        goto error;
1114
                                }
1115
                                vcodec = json_integer_value(videopt);
1116
                                json_t *videortpmap = json_object_get(root, "videortpmap");
1117
                                if(!videortpmap) {
1118
                                        JANUS_LOG(LOG_ERR, "Missing element (videortpmap)\n");
1119
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1120
                                        g_snprintf(error_cause, 512, "Missing element (videortpmap)");
1121
                                        goto error;
1122
                                }
1123
                                if(!json_is_string(videortpmap)) {
1124
                                        JANUS_LOG(LOG_ERR, "Invalid element (videortpmap should be a string)\n");
1125
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1126
                                        g_snprintf(error_cause, 512, "Invalid element (videortpmap should be a string)");
1127
                                        goto error;
1128
                                }
1129
                                vrtpmap = (char *)json_string_value(videortpmap);
1130
                                json_t *videofmtp = json_object_get(root, "videofmtp");
1131
                                if(videofmtp && !json_is_string(videofmtp)) {
1132
                                        JANUS_LOG(LOG_ERR, "Invalid element (videofmtp should be a string)\n");
1133
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1134
                                        g_snprintf(error_cause, 512, "Invalid element (videofmtp should be a string)");
1135
                                        goto error;
1136
                                }
1137
                                vfmtp = (char *)json_string_value(videofmtp);
1138
                        }
1139
                        if(id == NULL) {
1140
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1141
                        } else {
1142
                                janus_mutex_lock(&mountpoints_mutex);
1143
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1144
                                janus_mutex_unlock(&mountpoints_mutex);
1145
                                if(mp != NULL) {
1146
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1147
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1148
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1149
                                        goto error;
1150
                                }
1151
                        }
1152
                        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
1153
                        mp = janus_streaming_create_rtp_source(
1154
                                        id ? json_integer_value(id) : 0,
1155
                                        name ? (char *)json_string_value(name) : NULL,
1156
                                        desc ? (char *)json_string_value(desc) : NULL,
1157
                                        doaudio, amcast, aport, acodec, artpmap, afmtp,
1158
                                        dovideo, vmcast, vport, vcodec, vrtpmap, vfmtp);
1159
                        if(mp == NULL) {
1160
                                JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream...\n");
1161
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1162
                                g_snprintf(error_cause, 512, "Error creating 'rtp' stream");
1163
                                goto error;
1164
                        }
1165
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1166
                } else if(!strcasecmp(type_text, "live")) {
1167
                        /* File live source */
1168
                        json_t *id = json_object_get(root, "id");
1169
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
1170
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1171
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1172
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1173
                                goto error;
1174
                        }
1175
                        json_t *name = json_object_get(root, "name");
1176
                        if(name && !json_is_string(name)) {
1177
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1178
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1179
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1180
                                goto error;
1181
                        }
1182
                        json_t *desc = json_object_get(root, "description");
1183
                        if(desc && !json_is_string(desc)) {
1184
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1185
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1186
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1187
                                goto error;
1188
                        }
1189
                        json_t *is_private = json_object_get(root, "is_private");
1190
                        if(is_private && !json_is_boolean(is_private)) {
1191
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1192
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1193
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1194
                                goto error;
1195
                        }
1196
                        json_t *file = json_object_get(root, "file");
1197
                        if(file && !json_is_string(file)) {
1198
                                JANUS_LOG(LOG_ERR, "Invalid element (file should be a string)\n");
1199
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1200
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1201
                                goto error;
1202
                        }
1203
                        json_t *audio = json_object_get(root, "audio");
1204
                        if(audio && !json_is_boolean(audio)) {
1205
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1206
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1207
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1208
                                goto error;
1209
                        }
1210
                        json_t *video = json_object_get(root, "video");
1211
                        if(video && !json_is_boolean(video)) {
1212
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1213
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1214
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1215
                                goto error;
1216
                        }
1217
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1218
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1219
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1220
                        if(!doaudio || dovideo) {
1221
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, we only support audio file streaming right now...\n");
1222
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1223
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, we only support audio file streaming right now...");
1224
                                goto error;
1225
                        }
1226
                        char *filename = (char *)json_string_value(file);
1227
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1228
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1229
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1230
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1231
                                goto error;
1232
                        }
1233
                        if(id == NULL) {
1234
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1235
                        } else {
1236
                                janus_mutex_lock(&mountpoints_mutex);
1237
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1238
                                janus_mutex_unlock(&mountpoints_mutex);
1239
                                if(mp != NULL) {
1240
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1241
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1242
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1243
                                        goto error;
1244
                                }
1245
                        }
1246
                        mp = janus_streaming_create_file_source(
1247
                                        id ? json_integer_value(id) : 0,
1248
                                        name ? (char *)json_string_value(name) : NULL,
1249
                                        desc ? (char *)json_string_value(desc) : NULL,
1250
                                        filename,
1251
                                        TRUE, doaudio, dovideo);
1252
                        if(mp == NULL) {
1253
                                JANUS_LOG(LOG_ERR, "Error creating 'live' stream...\n");
1254
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1255
                                g_snprintf(error_cause, 512, "Error creating 'live' stream");
1256
                                goto error;
1257
                        }
1258
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1259
                } else if(!strcasecmp(type_text, "ondemand")) {
1260
                        /* mu-Law file on demand source */
1261
                        json_t *id = json_object_get(root, "id");
1262
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
1263
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1264
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1265
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1266
                                goto error;
1267
                        }
1268
                        json_t *name = json_object_get(root, "name");
1269
                        if(name && !json_is_string(name)) {
1270
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1271
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1272
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1273
                                goto error;
1274
                        }
1275
                        json_t *desc = json_object_get(root, "description");
1276
                        if(desc && !json_is_string(desc)) {
1277
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1278
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1279
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1280
                                goto error;
1281
                        }
1282
                        json_t *is_private = json_object_get(root, "is_private");
1283
                        if(is_private && !json_is_boolean(is_private)) {
1284
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1285
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1286
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1287
                                goto error;
1288
                        }
1289
                        json_t *file = json_object_get(root, "file");
1290
                        if(file && !json_is_string(file)) {
1291
                                JANUS_LOG(LOG_ERR, "Invalid element (file should be a string)\n");
1292
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1293
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1294
                                goto error;
1295
                        }
1296
                        json_t *audio = json_object_get(root, "audio");
1297
                        if(audio && !json_is_boolean(audio)) {
1298
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1299
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1300
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1301
                                goto error;
1302
                        }
1303
                        json_t *video = json_object_get(root, "video");
1304
                        if(video && !json_is_boolean(video)) {
1305
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1306
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1307
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1308
                                goto error;
1309
                        }
1310
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1311
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1312
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1313
                        if(!doaudio || dovideo) {
1314
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, we only support audio file streaming right now...\n");
1315
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1316
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, we only support audio file streaming right now...");
1317
                                goto error;
1318
                        }
1319
                        char *filename = (char *)json_string_value(file);
1320
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1321
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1322
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1323
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1324
                                goto error;
1325
                        }
1326
                        if(id == NULL) {
1327
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1328
                        } else {
1329
                                janus_mutex_lock(&mountpoints_mutex);
1330
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1331
                                janus_mutex_unlock(&mountpoints_mutex);
1332
                                if(mp != NULL) {
1333
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1334
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1335
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1336
                                        goto error;
1337
                                }
1338
                        }
1339
                        mp = janus_streaming_create_file_source(
1340
                                        id ? json_integer_value(id) : 0,
1341
                                        name ? (char *)json_string_value(name) : NULL,
1342
                                        desc ? (char *)json_string_value(desc) : NULL,
1343
                                        filename,
1344
                                        FALSE, doaudio, dovideo);
1345
                        if(mp == NULL) {
1346
                                JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream...\n");
1347
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1348
                                g_snprintf(error_cause, 512, "Error creating 'ondemand' stream");
1349
                                goto error;
1350
                        }
1351
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1352
                } else if(!strcasecmp(type_text, "rtsp")) {
1353
#ifndef HAVE_LIBCURL
1354
                        JANUS_LOG(LOG_ERR, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1355
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1356
                        g_snprintf(error_cause, 512, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1357
                        goto error;
1358
#else
1359
                        /* RTSP source*/
1360
                        json_t *id = json_object_get(root, "id");
1361
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
1362
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1363
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1364
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1365
                                goto error;
1366
                        }
1367
                        json_t *name = json_object_get(root, "name");
1368
                        if(name && !json_is_string(name)) {
1369
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1370
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1371
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1372
                                goto error;
1373
                        }
1374
                        json_t *desc = json_object_get(root, "description");
1375
                        if(desc && !json_is_string(desc)) {
1376
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1377
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1378
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1379
                                goto error;
1380
                        }
1381
                        json_t *is_private = json_object_get(root, "is_private");
1382
                        if(is_private && !json_is_boolean(is_private)) {
1383
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1384
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1385
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1386
                                goto error;
1387
                        }
1388
                        json_t *audio = json_object_get(root, "audio");
1389
                        if(audio && !json_is_boolean(audio)) {
1390
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1391
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1392
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1393
                                goto error;
1394
                        }
1395
                        json_t *video = json_object_get(root, "video");
1396
                        if(video && !json_is_boolean(video)) {
1397
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1398
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1399
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1400
                                goto error;
1401
                        }
1402
                        json_t *url = json_object_get(root, "url");
1403
                        if(url && !json_is_string(url)) {
1404
                                JANUS_LOG(LOG_ERR, "Invalid element (url should be a string)\n");
1405
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1406
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1407
                                goto error;
1408
                        }                        
1409
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1410
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1411
                        if(!doaudio && !dovideo) {
1412
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1413
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1414
                                g_snprintf(error_cause, 512, "Can't add 'rtsp' stream, no audio or video have to be streamed...");
1415
                                goto error;
1416
                        }
1417
                        mp = janus_streaming_create_rtsp_source(
1418
                                        id ? json_integer_value(id) : 0,
1419
                                        name ? (char *)json_string_value(name) : NULL,
1420
                                        desc ? (char *)json_string_value(desc) : NULL,
1421
                                        (char *)json_string_value(url),
1422
                                        doaudio, dovideo);
1423
                        if(mp == NULL) {
1424
                                JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream...\n");
1425
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1426
                                g_snprintf(error_cause, 512, "Error creating 'RTSP' stream");
1427
                                goto error;
1428
                        }
1429
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;                        
1430
#endif
1431
                } else {
1432
                        JANUS_LOG(LOG_ERR, "Unknown stream type '%s'...\n", type_text);
1433
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1434
                        g_snprintf(error_cause, 512, "Unknown stream type '%s'...\n", type_text);
1435
                        goto error;
1436
                }
1437
                /* Any secret? */
1438
                if(secret)
1439
                        mp->secret = g_strdup(json_string_value(secret));
1440
                /* Send info back */
1441
                response = json_object();
1442
                json_object_set_new(response, "streaming", json_string("created"));
1443
                json_object_set_new(response, "created", json_string(mp->name));
1444
                json_t *ml = json_object();
1445
                json_object_set_new(ml, "id", json_integer(mp->id));
1446
                json_object_set_new(ml, "description", json_string(mp->description));
1447
                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1448
                json_object_set_new(ml, "is_private", json_string(mp->is_private ? "true" : "false"));
1449
                json_object_set_new(response, "stream", ml);
1450
                goto plugin_response;
1451
        } else if(!strcasecmp(request_text, "destroy")) {
1452
                /* Get rid of an existing stream (notice this doesn't remove it from the config file, though) */
1453
                json_t *id = json_object_get(root, "id");
1454
                if(!id) {
1455
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1456
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1457
                        g_snprintf(error_cause, 512, "Missing element (id)");
1458
                        goto error;
1459
                }
1460
                if(!json_is_integer(id) || json_integer_value(id) < 0) {
1461
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1462
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1463
                        g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1464
                        goto error;
1465
                }
1466
                gint64 id_value = json_integer_value(id);
1467
                janus_mutex_lock(&mountpoints_mutex);
1468
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1469
                if(mp == NULL) {
1470
                        janus_mutex_unlock(&mountpoints_mutex);
1471
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1472
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1473
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1474
                        goto error;
1475
                }
1476
                if(mp->secret) {
1477
                        /* This action requires an authorized user */
1478
                        json_t *secret = json_object_get(root, "secret");
1479
                        if(!secret) {
1480
                                janus_mutex_unlock(&mountpoints_mutex);
1481
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1482
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1483
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1484
                                goto error;
1485
                        }
1486
                        if(!json_is_string(secret)) {
1487
                                janus_mutex_unlock(&mountpoints_mutex);
1488
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1489
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1490
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1491
                                goto error;
1492
                        }
1493
                        if(!janus_strcmp_const_time(mp->secret, json_string_value(secret))) {
1494
                                janus_mutex_unlock(&mountpoints_mutex);
1495
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1496
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1497
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1498
                                goto error;
1499
                        }
1500
                }
1501
                JANUS_LOG(LOG_VERB, "Request to unmount mountpoint/stream %"SCNu64"\n", id_value);
1502
                /* FIXME Should we kick the current viewers as well? */
1503
                janus_mutex_lock(&mp->mutex);
1504
                GList *viewer = g_list_first(mp->listeners);
1505
                /* Prepare JSON event */
1506
                json_t *event = json_object();
1507
                json_object_set_new(event, "streaming", json_string("event"));
1508
                json_t *result = json_object();
1509
                json_object_set_new(result, "status", json_string("stopped"));
1510
                json_object_set_new(event, "result", result);
1511
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1512
                json_decref(event);
1513
                while(viewer) {
1514
                        janus_streaming_session *session = (janus_streaming_session *)viewer->data;
1515
                        if(session != NULL) {
1516
                                session->stopping = TRUE;
1517
                                session->started = FALSE;
1518
                                session->paused = FALSE;
1519
                                session->mountpoint = NULL;
1520
                                /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
1521
                                gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1522
                                gateway->close_pc(session->handle);
1523
                        }
1524
                        mp->listeners = g_list_remove_all(mp->listeners, session);
1525
                        viewer = g_list_first(mp->listeners);
1526
                }
1527
                g_free(event_text);
1528
                janus_mutex_unlock(&mp->mutex);
1529
                /* Remove mountpoint from the hashtable: this will get it destroyed */
1530
                g_hash_table_remove(mountpoints, GINT_TO_POINTER(id_value));
1531
                janus_mutex_unlock(&mountpoints_mutex);
1532
                /* Send info back */
1533
                response = json_object();
1534
                json_object_set_new(response, "streaming", json_string("destroyed"));
1535
                json_object_set_new(response, "destroyed", json_integer(id_value));
1536
                goto plugin_response;
1537
        } else if(!strcasecmp(request_text, "recording")) {
1538
                /* We can start/stop recording a live, RTP-based stream */
1539
                json_t *action = json_object_get(root, "action");
1540
                if(!action) {
1541
                        JANUS_LOG(LOG_ERR, "Missing element (action)\n");
1542
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1543
                        g_snprintf(error_cause, 512, "Missing element (action)");
1544
                        goto error;
1545
                }
1546
                if(!json_is_string(action)) {
1547
                        JANUS_LOG(LOG_ERR, "Invalid element (action should be a string)\n");
1548
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1549
                        g_snprintf(error_cause, 512, "Invalid element (action should be a string)");
1550
                        goto error;
1551
                }
1552
                const char *action_text = json_string_value(action);
1553
                if(strcasecmp(action_text, "start") && strcasecmp(action_text, "stop")) {
1554
                        JANUS_LOG(LOG_ERR, "Invalid action (should be start|stop)\n");
1555
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1556
                        g_snprintf(error_cause, 512, "Invalid action (should be start|stop)");
1557
                        goto error;
1558
                }
1559
                json_t *id = json_object_get(root, "id");
1560
                if(!id) {
1561
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1562
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1563
                        g_snprintf(error_cause, 512, "Missing element (id)");
1564
                        goto error;
1565
                }
1566
                if(!json_is_integer(id) || json_integer_value(id) < 0) {
1567
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1568
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1569
                        g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1570
                        goto error;
1571
                }
1572
                gint64 id_value = json_integer_value(id);
1573
                janus_mutex_lock(&mountpoints_mutex);
1574
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1575
                if(mp == NULL) {
1576
                        janus_mutex_unlock(&mountpoints_mutex);
1577
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1578
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1579
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1580
                        goto error;
1581
                }
1582
                if(mp->streaming_type != janus_streaming_type_live || mp->streaming_source != janus_streaming_source_rtp) {
1583
                        janus_mutex_unlock(&mountpoints_mutex);
1584
                        JANUS_LOG(LOG_ERR, "Recording is only available on RTP-based live streams\n");
1585
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1586
                        g_snprintf(error_cause, 512, "Recording is only available on RTP-based live streams");
1587
                        goto error;
1588
                }
1589
                if(mp->secret) {
1590
                        /* This action requires an authorized user */
1591
                        json_t *secret = json_object_get(root, "secret");
1592
                        if(!secret) {
1593
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1594
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1595
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1596
                                goto error;
1597
                        }
1598
                        if(!json_is_string(secret)) {
1599
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1600
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1601
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1602
                                goto error;
1603
                        }
1604
                        if(!janus_strcmp_const_time(mp->secret, json_string_value(secret))) {
1605
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1606
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1607
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1608
                                goto error;
1609
                        }
1610
                }
1611
                janus_streaming_rtp_source *source = mp->source;
1612
                if(!strcasecmp(action_text, "start")) {
1613
                        /* Start a recording for audio and/or video */
1614
                        json_t *audio = json_object_get(root, "audio");
1615
                        if(audio && !json_is_string(audio)) {
1616
                                janus_mutex_unlock(&mountpoints_mutex);
1617
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a string)\n");
1618
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1619
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a string)");
1620
                                goto error;
1621
                        }
1622
                        json_t *video = json_object_get(root, "video");
1623
                        if(video && !json_is_string(video)) {
1624
                                janus_mutex_unlock(&mountpoints_mutex);
1625
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a string)\n");
1626
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1627
                                g_snprintf(error_cause, 512, "Invalid value (video should be a string)");
1628
                                goto error;
1629
                        }
1630
                        if((audio && source->arc) || (video && source->vrc)) {
1631
                                janus_mutex_unlock(&mountpoints_mutex);
1632
                                JANUS_LOG(LOG_ERR, "Recording for audio and/or video already started for this stream\n");
1633
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1634
                                g_snprintf(error_cause, 512, "Recording for audio and/or video already started for this stream");
1635
                                goto error;
1636
                        }
1637
                        if(!audio && !video) {
1638
                                janus_mutex_unlock(&mountpoints_mutex);
1639
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1640
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1641
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1642
                                goto error;
1643
                        }
1644
                        if(audio) {
1645
                                const char *audiofile = json_string_value(audio);
1646
                                source->arc = janus_recorder_create(NULL, 0, (char *)audiofile);
1647
                                if(source->arc == NULL) {
1648
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for audio\n");
1649
                                } else {
1650
                                        JANUS_LOG(LOG_INFO, "[%s] Audio recording started\n", mp->name);
1651
                                }
1652
                        }
1653
                        if(video) {
1654
                                const char *videofile = json_string_value(video);
1655
                                source->vrc = janus_recorder_create(NULL, 1, (char *)videofile);
1656
                                if(source->vrc == NULL) {
1657
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for video\n");
1658
                                } else {
1659
                                        JANUS_LOG(LOG_INFO, "[%s] Video recording started\n", mp->name);
1660
                                }
1661
                        }
1662
                        janus_mutex_unlock(&mountpoints_mutex);
1663
                        /* Send a success response back */
1664
                        response = json_object();
1665
                        json_object_set_new(response, "streaming", json_string("ok"));
1666
                        goto plugin_response;
1667
                } else if(!strcasecmp(action_text, "stop")) {
1668
                        /* Stop the recording */
1669
                        json_t *audio = json_object_get(root, "audio");
1670
                        if(audio && !json_is_boolean(audio)) {
1671
                                janus_mutex_unlock(&mountpoints_mutex);
1672
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1673
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1674
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1675
                                goto error;
1676
                        }
1677
                        json_t *video = json_object_get(root, "video");
1678
                        if(video && !json_is_boolean(video)) {
1679
                                janus_mutex_unlock(&mountpoints_mutex);
1680
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1681
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1682
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1683
                                goto error;
1684
                        }
1685
                        if(!audio && !video) {
1686
                                janus_mutex_unlock(&mountpoints_mutex);
1687
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1688
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1689
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1690
                                goto error;
1691
                        }
1692
                        if(audio && json_is_true(audio) && source->arc) {
1693
                                /* Close the audio recording */
1694
                                janus_recorder_close(source->arc);
1695
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1696
                                janus_recorder *tmp = source->arc;
1697
                                source->arc = NULL;
1698
                                janus_recorder_free(tmp);
1699
                        }
1700
                        if(video && json_is_true(video) && source->vrc) {
1701
                                /* Close the video recording */
1702
                                janus_recorder_close(source->vrc);
1703
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1704
                                janus_recorder *tmp = source->vrc;
1705
                                source->vrc = NULL;
1706
                                janus_recorder_free(tmp);
1707
                        }
1708
                        janus_mutex_unlock(&mountpoints_mutex);
1709
                        /* Send a success response back */
1710
                        response = json_object();
1711
                        json_object_set_new(response, "streaming", json_string("ok"));
1712
                        goto plugin_response;
1713
                }
1714
        } else if(!strcasecmp(request_text, "enable") || !strcasecmp(request_text, "disable")) {
1715
                /* A request to enable/disable a mountpoint */
1716
                json_t *id = json_object_get(root, "id");
1717
                if(!id) {
1718
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1719
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1720
                        g_snprintf(error_cause, 512, "Missing element (id)");
1721
                        goto error;
1722
                }
1723
                if(!json_is_integer(id) || json_integer_value(id) < 0) {
1724
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1725
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1726
                        g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1727
                        goto error;
1728
                }
1729
                gint64 id_value = json_integer_value(id);
1730
                janus_mutex_lock(&mountpoints_mutex);
1731
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1732
                if(mp == NULL) {
1733
                        janus_mutex_unlock(&mountpoints_mutex);
1734
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1735
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1736
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1737
                        goto error;
1738
                }
1739
                if(mp->secret) {
1740
                        /* This action requires an authorized user */
1741
                        json_t *secret = json_object_get(root, "secret");
1742
                        if(!secret) {
1743
                                janus_mutex_unlock(&mountpoints_mutex);
1744
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1745
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1746
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1747
                                goto error;
1748
                        }
1749
                        if(!json_is_string(secret)) {
1750
                                janus_mutex_unlock(&mountpoints_mutex);
1751
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1752
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1753
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1754
                                goto error;
1755
                        }
1756
                        if(!janus_strcmp_const_time(mp->secret, json_string_value(secret))) {
1757
                                janus_mutex_unlock(&mountpoints_mutex);
1758
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1759
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1760
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1761
                                goto error;
1762
                        }
1763
                }
1764
                if(!strcasecmp(request_text, "enable")) {
1765
                        /* Enable a previously disabled mountpoint */
1766
                        JANUS_LOG(LOG_INFO, "[%s] Stream enabled\n", mp->name);
1767
                        mp->enabled = TRUE;
1768
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1769
                } else {
1770
                        /* Disable a previously enabled mountpoint */
1771
                        JANUS_LOG(LOG_INFO, "[%s] Stream disabled\n", mp->name);
1772
                        mp->enabled = FALSE;
1773
                        /* Any recording to close? */
1774
                        janus_streaming_rtp_source *source = mp->source;
1775
                        if(source->arc) {
1776
                                janus_recorder_close(source->arc);
1777
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1778
                                janus_recorder *tmp = source->arc;
1779
                                source->arc = NULL;
1780
                                janus_recorder_free(tmp);
1781
                        }
1782
                        if(source->vrc) {
1783
                                janus_recorder_close(source->vrc);
1784
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1785
                                janus_recorder *tmp = source->vrc;
1786
                                source->vrc = NULL;
1787
                                janus_recorder_free(tmp);
1788
                        }
1789
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1790
                }
1791
                janus_mutex_unlock(&mountpoints_mutex);
1792
                /* Send a success response back */
1793
                response = json_object();
1794
                json_object_set_new(response, "streaming", json_string("ok"));
1795
                goto plugin_response;
1796
        } else if(!strcasecmp(request_text, "watch") || !strcasecmp(request_text, "start")
1797
                        || !strcasecmp(request_text, "pause") || !strcasecmp(request_text, "stop")
1798
                        || !strcasecmp(request_text, "switch")) {
1799
                /* These messages are handled asynchronously */
1800
                janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
1801
                if(msg == NULL) {
1802
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1803
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1804
                        g_snprintf(error_cause, 512, "Memory error");
1805
                        goto error;
1806
                }
1807

    
1808
                g_free(message);
1809
                msg->handle = handle;
1810
                msg->transaction = transaction;
1811
                msg->message = root;
1812
                msg->sdp_type = sdp_type;
1813
                msg->sdp = sdp;
1814

    
1815
                g_async_queue_push(messages, msg);
1816

    
1817
                return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
1818
        } else {
1819
                JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
1820
                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1821
                g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1822
                goto error;
1823
        }
1824

    
1825
plugin_response:
1826
                {
1827
                        if(!response) {
1828
                                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1829
                                g_snprintf(error_cause, 512, "Invalid response");
1830
                                goto error;
1831
                        }
1832
                        if(root != NULL)
1833
                                json_decref(root);
1834
                        g_free(transaction);
1835
                        g_free(message);
1836
                        g_free(sdp_type);
1837
                        g_free(sdp);
1838

    
1839
                        char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1840
                        json_decref(response);
1841
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1842
                        g_free(response_text);
1843
                        return result;
1844
                }
1845

    
1846
error:
1847
                {
1848
                        if(root != NULL)
1849
                                json_decref(root);
1850
                        g_free(transaction);
1851
                        g_free(message);
1852
                        g_free(sdp_type);
1853
                        g_free(sdp);
1854

    
1855
                        /* Prepare JSON error event */
1856
                        json_t *event = json_object();
1857
                        json_object_set_new(event, "streaming", json_string("event"));
1858
                        json_object_set_new(event, "error_code", json_integer(error_code));
1859
                        json_object_set_new(event, "error", json_string(error_cause));
1860
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1861
                        json_decref(event);
1862
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, event_text);
1863
                        g_free(event_text);
1864
                        return result;
1865
                }
1866

    
1867
}
1868

    
1869
void janus_streaming_setup_media(janus_plugin_session *handle) {
1870
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
1871
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1872
                return;
1873
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1874
        if(!session) {
1875
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1876
                return;
1877
        }
1878
        if(session->destroyed)
1879
                return;
1880
        /* TODO Only start streaming when we get this event */
1881
        session->context.a_last_ssrc = 0;
1882
        session->context.a_last_ssrc = 0;
1883
        session->context.a_last_ts = 0;
1884
        session->context.a_base_ts = 0;
1885
        session->context.a_base_ts_prev = 0;
1886
        session->context.v_last_ssrc = 0;
1887
        session->context.v_last_ts = 0;
1888
        session->context.v_base_ts = 0;
1889
        session->context.v_base_ts_prev = 0;
1890
        session->context.a_last_seq = 0;
1891
        session->context.a_base_seq = 0;
1892
        session->context.a_base_seq_prev = 0;
1893
        session->context.v_last_seq = 0;
1894
        session->context.v_base_seq = 0;
1895
        session->context.v_base_seq_prev = 0;
1896
        session->started = TRUE;
1897
        /* Prepare JSON event */
1898
        json_t *event = json_object();
1899
        json_object_set_new(event, "streaming", json_string("event"));
1900
        json_t *result = json_object();
1901
        json_object_set_new(result, "status", json_string("started"));
1902
        json_object_set_new(event, "result", result);
1903
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1904
        json_decref(event);
1905
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
1906
        int ret = gateway->push_event(handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1907
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
1908
        g_free(event_text);
1909
}
1910

    
1911
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
1912
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1913
                return;
1914
        /* FIXME We don't care about what the browser sends us, we're sendonly */
1915
}
1916

    
1917
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
1918
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1919
                return;
1920
        /* We might interested in the available bandwidth that the user advertizes */
1921
        uint64_t bw = janus_rtcp_get_remb(buf, len);
1922
        if(bw > 0) {
1923
                JANUS_LOG(LOG_HUGE, "REMB for this PeerConnection: %"SCNu64"\n", bw);
1924
                /* TODO Use this somehow (e.g., notification towards application?) */
1925
        }
1926
        /* FIXME Maybe we should care about RTCP, but not now */
1927
}
1928

    
1929
void janus_streaming_hangup_media(janus_plugin_session *handle) {
1930
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
1931
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1932
                return;
1933
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1934
        if(!session) {
1935
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1936
                return;
1937
        }
1938
        if(session->destroyed || session->hangingup)
1939
                return;
1940
        session->hangingup = TRUE;
1941
        /* FIXME Simulate a "stop" coming from the browser */
1942
        janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
1943
        msg->handle = handle;
1944
        msg->message = json_loads("{\"request\":\"stop\"}", 0, NULL);
1945
        msg->transaction = NULL;
1946
        msg->sdp_type = NULL;
1947
        msg->sdp = NULL;
1948
        g_async_queue_push(messages, msg);
1949
        /* Done */
1950
        session->hangingup = FALSE;
1951
}
1952

    
1953
/* Thread to handle incoming messages */
1954
static void *janus_streaming_handler(void *data) {
1955
        JANUS_LOG(LOG_VERB, "Joining Streaming handler thread\n");
1956
        janus_streaming_message *msg = NULL;
1957
        int error_code = 0;
1958
        char *error_cause = calloc(1024, sizeof(char));
1959
        if(error_cause == NULL) {
1960
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1961
                return NULL;
1962
        }
1963
        json_t *root = NULL;
1964
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
1965
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
1966
                        usleep(50000);
1967
                        continue;
1968
                }
1969
                janus_streaming_session *session = (janus_streaming_session *)msg->handle->plugin_handle;
1970
                if(!session) {
1971
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1972
                        janus_streaming_message_free(msg);
1973
                        continue;
1974
                }
1975
                if(session->destroyed) {
1976
                        janus_streaming_message_free(msg);
1977
                        continue;
1978
                }
1979
                /* Handle request */
1980
                error_code = 0;
1981
                root = NULL;
1982
                if(msg->message == NULL) {
1983
                        JANUS_LOG(LOG_ERR, "No message??\n");
1984
                        error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
1985
                        g_snprintf(error_cause, 512, "%s", "No message??");
1986
                        goto error;
1987
                }
1988
                root = msg->message;
1989
                /* Get the request first */
1990
                json_t *request = json_object_get(root, "request");
1991
                if(!request) {
1992
                        JANUS_LOG(LOG_ERR, "Missing element (request)\n");
1993
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1994
                        g_snprintf(error_cause, 512, "Missing element (request)");
1995
                        goto error;
1996
                }
1997
                if(!json_is_string(request)) {
1998
                        JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
1999
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
2000
                        g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
2001
                        goto error;
2002
                }
2003
                const char *request_text = json_string_value(request);
2004
                json_t *result = NULL;
2005
                const char *sdp_type = NULL;
2006
                char *sdp = NULL;
2007
                /* All these requests can only be handled asynchronously */
2008
                if(!strcasecmp(request_text, "watch")) {
2009
                        json_t *id = json_object_get(root, "id");
2010
                        if(!id) {
2011
                                JANUS_LOG(LOG_ERR, "Missing element (id)\n");
2012
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
2013
                                g_snprintf(error_cause, 512, "Missing element (id)");
2014
                                goto error;
2015
                        }
2016
                        if(!json_is_integer(id) || json_integer_value(id) < 0) {
2017
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
2018
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
2019
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
2020
                                goto error;
2021
                        }
2022
                        gint64 id_value = json_integer_value(id);
2023
                        janus_mutex_lock(&mountpoints_mutex);
2024
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
2025
                        if(mp == NULL) {
2026
                                janus_mutex_unlock(&mountpoints_mutex);
2027
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2028
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2029
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2030
                                goto error;
2031
                        }
2032
                        janus_mutex_unlock(&mountpoints_mutex);
2033
                        JANUS_LOG(LOG_VERB, "Request to watch mountpoint/stream %"SCNu64"\n", id_value);
2034
                        session->stopping = FALSE;
2035
                        session->mountpoint = mp;
2036
                        if(mp->streaming_type == janus_streaming_type_on_demand) {
2037
                                GError *error = NULL;
2038
                                g_thread_try_new(session->mountpoint->name, &janus_streaming_ondemand_thread, session, &error);
2039
                                if(error != NULL) {
2040
                                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the on-demand thread...\n", error->code, error->message ? error->message : "??");
2041
                                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
2042
                                        g_snprintf(error_cause, 512, "Got error %d (%s) trying to launch the on-demand thread", error->code, error->message ? error->message : "??");
2043
                                        goto error;
2044
                                }
2045
                        }
2046
                        /* TODO Check if user is already watching a stream, if the video is active, etc. */
2047
                        janus_mutex_lock(&mp->mutex);
2048
                        mp->listeners = g_list_append(mp->listeners, session);
2049
                        janus_mutex_unlock(&mp->mutex);
2050
                        sdp_type = "offer";        /* We're always going to do the offer ourselves, never answer */
2051
                        char sdptemp[2048];
2052
                        memset(sdptemp, 0, 2048);
2053
                        gchar buffer[512];
2054
                        memset(buffer, 0, 512);
2055
                        gint64 sessid = janus_get_monotonic_time();
2056
                        gint64 version = sessid;        /* FIXME This needs to be increased when it changes, so time should be ok */
2057
                        g_snprintf(buffer, 512,
2058
                                "v=0\r\no=%s %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n",
2059
                                        "-", sessid, version);
2060
                        g_strlcat(sdptemp, buffer, 2048);
2061
                        g_strlcat(sdptemp, "s=Streaming Test\r\nt=0 0\r\n", 2048);
2062
                        if(mp->codecs.audio_pt >= 0) {
2063
                                /* Add audio line */
2064
                                g_snprintf(buffer, 512,
2065
                                        "m=audio 1 RTP/SAVPF %d\r\n"
2066
                                        "c=IN IP4 1.1.1.1\r\n",
2067
                                        mp->codecs.audio_pt);
2068
                                g_strlcat(sdptemp, buffer, 2048);
2069
                                if(mp->codecs.audio_rtpmap) {
2070
                                        g_snprintf(buffer, 512,
2071
                                                "a=rtpmap:%d %s\r\n",
2072
                                                mp->codecs.audio_pt, mp->codecs.audio_rtpmap);
2073
                                        g_strlcat(sdptemp, buffer, 2048);
2074
                                }
2075
                                if(mp->codecs.audio_fmtp) {
2076
                                        g_snprintf(buffer, 512,
2077
                                                "a=fmtp:%d %s\r\n",
2078
                                                mp->codecs.audio_pt, mp->codecs.audio_fmtp);
2079
                                        g_strlcat(sdptemp, buffer, 2048);
2080
                                }
2081
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2082
                        }
2083
                        if(mp->codecs.video_pt >= 0) {
2084
                                /* Add video line */
2085
                                g_snprintf(buffer, 512,
2086
                                        "m=video 1 RTP/SAVPF %d\r\n"
2087
                                        "c=IN IP4 1.1.1.1\r\n",
2088
                                        mp->codecs.video_pt);
2089
                                g_strlcat(sdptemp, buffer, 2048);
2090
                                if(mp->codecs.video_rtpmap) {
2091
                                        g_snprintf(buffer, 512,
2092
                                                "a=rtpmap:%d %s\r\n",
2093
                                                mp->codecs.video_pt, mp->codecs.video_rtpmap);
2094
                                        g_strlcat(sdptemp, buffer, 2048);
2095
                                }
2096
                                if(mp->codecs.video_fmtp) {
2097
                                        g_snprintf(buffer, 512,
2098
                                                "a=fmtp:%d %s\r\n",
2099
                                                mp->codecs.video_pt, mp->codecs.video_fmtp);
2100
                                        g_strlcat(sdptemp, buffer, 2048);
2101
                                }
2102
                                g_snprintf(buffer, 512,
2103
                                        "a=rtcp-fb:%d nack\r\n",
2104
                                        mp->codecs.video_pt);
2105
                                g_strlcat(sdptemp, buffer, 2048);
2106
                                g_snprintf(buffer, 512,
2107
                                        "a=rtcp-fb:%d goog-remb\r\n",
2108
                                        mp->codecs.video_pt);
2109
                                g_strlcat(sdptemp, buffer, 2048);
2110
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2111
                        }
2112
                        sdp = g_strdup(sdptemp);
2113
                        JANUS_LOG(LOG_VERB, "Going to offer this SDP:\n%s\n", sdp);
2114
                        result = json_object();
2115
                        json_object_set_new(result, "status", json_string("preparing"));
2116
                } else if(!strcasecmp(request_text, "start")) {
2117
                        if(session->mountpoint == NULL) {
2118
                                JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
2119
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2120
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2121
                                goto error;
2122
                        }
2123
                        JANUS_LOG(LOG_VERB, "Starting the streaming\n");
2124
                        session->paused = FALSE;
2125
                        result = json_object();
2126
                        /* We wait for the setup_media event to start: on the other hand, it may have already arrived */
2127
                        json_object_set_new(result, "status", json_string(session->started ? "started" : "starting"));
2128
                } else if(!strcasecmp(request_text, "pause")) {
2129
                        if(session->mountpoint == NULL) {
2130
                                JANUS_LOG(LOG_VERB, "Can't pause: no mountpoint set\n");
2131
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2132
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2133
                                goto error;
2134
                        }
2135
                        JANUS_LOG(LOG_VERB, "Pausing the streaming\n");
2136
                        session->paused = TRUE;
2137
                        result = json_object();
2138
                        json_object_set_new(result, "status", json_string("pausing"));
2139
                } else if(!strcasecmp(request_text, "switch")) {
2140
                        /* This listener wants to switch to a different mountpoint
2141
                         * NOTE: this only works for live RTP streams as of now: you
2142
                         * cannot, for instance, switch from a live RTP mountpoint to
2143
                         * an on demand one or viceversa (TBD.) */
2144
                        janus_streaming_mountpoint *oldmp = session->mountpoint;
2145
                        if(oldmp == NULL) {
2146
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a mountpoint\n");
2147
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2148
                                g_snprintf(error_cause, 512, "Can't switch: not on a mountpoint");
2149
                                goto error;
2150
                        }
2151
                        if(oldmp->streaming_type != janus_streaming_type_live || 
2152
                                        oldmp->streaming_source != janus_streaming_source_rtp) {
2153
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a live RTP mountpoint\n");
2154
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2155
                                g_snprintf(error_cause, 512, "Can't switch: not on a live RTP mountpoint");
2156
                                goto error;
2157
                        }
2158
                        json_t *id = json_object_get(root, "id");
2159
                        if(!id) {
2160
                                JANUS_LOG(LOG_ERR, "Missing element (id)\n");
2161
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
2162
                                g_snprintf(error_cause, 512, "Missing element (id)");
2163
                                goto error;
2164
                        }
2165
                        if(!json_is_integer(id) || json_integer_value(id) < 0) {
2166
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
2167
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
2168
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
2169
                                goto error;
2170
                        }
2171
                        gint64 id_value = json_integer_value(id);
2172
                        janus_mutex_lock(&mountpoints_mutex);
2173
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
2174
                        if(mp == NULL) {
2175
                                janus_mutex_unlock(&mountpoints_mutex);
2176
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2177
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2178
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2179
                                goto error;
2180
                        }
2181
                        if(mp->streaming_type != janus_streaming_type_live || 
2182
                                        mp->streaming_source != janus_streaming_source_rtp) {
2183
                                janus_mutex_unlock(&mountpoints_mutex);
2184
                                JANUS_LOG(LOG_VERB, "Can't switch: target is not a live RTP mountpoint\n");
2185
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2186
                                g_snprintf(error_cause, 512, "Can't switch: target is not a live RTP mountpoint");
2187
                                goto error;
2188
                        }
2189
                        janus_mutex_unlock(&mountpoints_mutex);
2190
                        JANUS_LOG(LOG_VERB, "Request to switch to mountpoint/stream %"SCNu64" (old: %"SCNu64")\n", id_value, oldmp->id);
2191
                        session->paused = TRUE;
2192
                        /* Unsubscribe from the previous mountpoint and subscribe to the new one */
2193
                        janus_mutex_lock(&oldmp->mutex);
2194
                        oldmp->listeners = g_list_remove_all(oldmp->listeners, session);
2195
                        janus_mutex_unlock(&oldmp->mutex);
2196
                        /* Subscribe to the new one */
2197
                        janus_mutex_lock(&mp->mutex);
2198
                        mp->listeners = g_list_append(mp->listeners, session);
2199
                        janus_mutex_unlock(&mp->mutex);
2200
                        session->mountpoint = mp;
2201
                        session->paused = FALSE;
2202
                        /* Done */
2203
                        result = json_object();
2204
                        json_object_set_new(result, "streaming", json_string("event"));
2205
                        json_object_set_new(result, "switched", json_string("ok"));
2206
                        json_object_set_new(result, "id", json_integer(id_value));
2207
                } else if(!strcasecmp(request_text, "stop")) {
2208
                        if(session->stopping || !session->started) {
2209
                                /* Been there, done that: ignore */
2210
                                janus_streaming_message_free(msg);
2211
                                continue;
2212
                        }
2213
                        JANUS_LOG(LOG_VERB, "Stopping the streaming\n");
2214
                        session->stopping = TRUE;
2215
                        session->started = FALSE;
2216
                        session->paused = FALSE;
2217
                        result = json_object();
2218
                        json_object_set_new(result, "status", json_string("stopping"));
2219
                        if(session->mountpoint) {
2220
                                janus_mutex_lock(&session->mountpoint->mutex);
2221
                                JANUS_LOG(LOG_VERB, "  -- Removing the session from the mountpoint listeners\n");
2222
                                if(g_list_find(session->mountpoint->listeners, session) != NULL) {
2223
                                        JANUS_LOG(LOG_VERB, "  -- -- Found!\n");
2224
                                }
2225
                                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
2226
                                janus_mutex_unlock(&session->mountpoint->mutex);
2227
                        }
2228
                        session->mountpoint = NULL;
2229
                        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
2230
                        gateway->close_pc(session->handle);
2231
                } else {
2232
                        JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
2233
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
2234
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
2235
                        goto error;
2236
                }
2237
                
2238
                /* Any SDP to handle? */
2239
                if(msg->sdp) {
2240
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well (but we really don't care):\n%s\n", msg->sdp_type, msg->sdp);
2241
                }
2242

    
2243
                /* Prepare JSON event */
2244
                json_t *event = json_object();
2245
                json_object_set_new(event, "streaming", json_string("event"));
2246
                if(result != NULL)
2247
                        json_object_set_new(event, "result", result);
2248
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2249
                json_decref(event);
2250
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2251
                int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, sdp_type, sdp);
2252
                JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2253
                g_free(event_text);
2254
                if(sdp)
2255
                        g_free(sdp);
2256
                janus_streaming_message_free(msg);
2257
                continue;
2258
                
2259
error:
2260
                {
2261
                        /* Prepare JSON error event */
2262
                        json_t *event = json_object();
2263
                        json_object_set_new(event, "streaming", json_string("event"));
2264
                        json_object_set_new(event, "error_code", json_integer(error_code));
2265
                        json_object_set_new(event, "error", json_string(error_cause));
2266
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2267
                        json_decref(event);
2268
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2269
                        int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, NULL, NULL);
2270
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2271
                        g_free(event_text);
2272
                        janus_streaming_message_free(msg);
2273
                }
2274
        }
2275
        g_free(error_cause);
2276
        JANUS_LOG(LOG_VERB, "Leaving Streaming handler thread\n");
2277
        return NULL;
2278
}
2279

    
2280

    
2281
/* Helpers to destroy a streaming mountpoint. */
2282
static void janus_streaming_rtp_source_free(janus_streaming_rtp_source *source) {
2283
        if(source->audio_fd > 0) {
2284
                close(source->audio_fd);
2285
        }
2286
        if(source->video_fd > 0) {
2287
                close(source->video_fd);
2288
        }
2289
#ifdef HAVE_LIBCURL
2290
        if(source->curl) {
2291
                curl_easy_cleanup(source->curl);
2292
        }
2293
#endif
2294
        free(source);
2295
}
2296

    
2297
static void janus_streaming_file_source_free(janus_streaming_file_source *source) {
2298
        g_free(source->filename);
2299
        free(source);
2300
}
2301

    
2302
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp) {
2303
        mp->destroyed = janus_get_monotonic_time();
2304
        
2305
        g_free(mp->name);
2306
        g_free(mp->description);
2307
        janus_mutex_lock(&mp->mutex);
2308
        g_list_free(mp->listeners);
2309
        janus_mutex_unlock(&mp->mutex);
2310

    
2311
        if(mp->source != NULL && mp->source_destroy != NULL) {
2312
                mp->source_destroy(mp->source);
2313
        }
2314

    
2315
        g_free(mp->codecs.audio_rtpmap);
2316
        g_free(mp->codecs.audio_fmtp);
2317
        g_free(mp->codecs.video_rtpmap);
2318
        g_free(mp->codecs.video_fmtp);
2319

    
2320
        free(mp);
2321
}
2322

    
2323

    
2324
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
2325
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
2326
                uint64_t id, char *name, char *desc,
2327
                gboolean doaudio, char *amcast, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
2328
                gboolean dovideo, char *vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp) {
2329
        if(name == NULL) {
2330
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2331
        }
2332
        if(id == 0) {
2333
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2334
        }
2335
        if(!doaudio && !dovideo) {
2336
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
2337
                return NULL;
2338
        }
2339
        if(doaudio && (aport == 0 || artpmap == NULL)) {
2340
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for audio...\n");
2341
                return NULL;
2342
        }
2343
        if(dovideo && (vport == 0 || vcodec == 0 || vrtpmap == NULL)) {
2344
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for video...\n");
2345
                return NULL;
2346
        }
2347
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
2348
        janus_streaming_mountpoint *live_rtp = calloc(1, sizeof(janus_streaming_mountpoint));
2349
        if(live_rtp == NULL) {
2350
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2351
                return NULL;
2352
        }
2353
        live_rtp->id = id ? id : g_random_int();
2354
        char tempname[255];
2355
        if(!name) {
2356
                memset(tempname, 0, 255);
2357
                g_snprintf(tempname, 255, "%"SCNu64, live_rtp->id);
2358
        }
2359
        live_rtp->name = g_strdup(name ? name : tempname);
2360
        char *description = NULL;
2361
        if(desc != NULL)
2362
                description = g_strdup(desc);
2363
        else
2364
                description = g_strdup(name ? name : tempname);
2365
        live_rtp->description = description;
2366
        live_rtp->enabled = TRUE;
2367
        live_rtp->active = FALSE;
2368
        live_rtp->streaming_type = janus_streaming_type_live;
2369
        live_rtp->streaming_source = janus_streaming_source_rtp;
2370
        janus_streaming_rtp_source *live_rtp_source = calloc(1, sizeof(janus_streaming_rtp_source));
2371
        if(live_rtp->name == NULL || description == NULL || live_rtp_source == NULL) {
2372
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2373
                if(live_rtp->name)
2374
                        g_free(live_rtp->name);
2375
                if(description)
2376
                        g_free(description);
2377
                if(live_rtp_source)
2378
                        g_free(live_rtp_source);
2379
                g_free(live_rtp);
2380
                return NULL;
2381
        }
2382
        live_rtp_source->audio_mcast = doaudio ? (amcast ? inet_addr(amcast) : INADDR_ANY) : INADDR_ANY;
2383
        live_rtp_source->audio_port = doaudio ? aport : -1;
2384
        live_rtp_source->video_mcast = dovideo ? (vmcast ? inet_addr(vmcast) : INADDR_ANY) : INADDR_ANY;
2385
        live_rtp_source->video_port = dovideo ? vport : -1;
2386
        live_rtp_source->arc = NULL;
2387
        live_rtp_source->vrc = NULL;
2388
        live_rtp_source->audio_fd = -1;
2389
        live_rtp_source->video_fd = -1;        
2390
        live_rtp->source = live_rtp_source;
2391
        live_rtp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2392
        live_rtp->codecs.audio_pt = doaudio ? acodec : -1;
2393
        live_rtp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2394
        live_rtp->codecs.audio_fmtp = doaudio ? (afmtp ? g_strdup(afmtp) : NULL) : NULL;
2395
        live_rtp->codecs.video_pt = dovideo ? vcodec : -1;
2396
        live_rtp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2397
        live_rtp->codecs.video_fmtp = dovideo ? (vfmtp ? g_strdup(vfmtp) : NULL) : NULL;
2398
        live_rtp->listeners = NULL;
2399
        live_rtp->destroyed = 0;
2400
        janus_mutex_init(&live_rtp->mutex);
2401
        janus_mutex_lock(&mountpoints_mutex);
2402
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtp->id), live_rtp);
2403
        janus_mutex_unlock(&mountpoints_mutex);
2404
        GError *error = NULL;
2405
        g_thread_try_new(live_rtp->name, &janus_streaming_relay_thread, live_rtp, &error);
2406
        if(error != NULL) {
2407
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTP thread...\n", error->code, error->message ? error->message : "??");
2408
                if(live_rtp->name)
2409
                        g_free(live_rtp->name);
2410
                if(description)
2411
                        g_free(description);
2412
                if(live_rtp_source)
2413
                        g_free(live_rtp_source);
2414
                g_free(live_rtp);
2415
                return NULL;
2416
        }
2417
        return live_rtp;
2418
}
2419

    
2420
/* Helper to create a file/ondemand live source */
2421
janus_streaming_mountpoint *janus_streaming_create_file_source(
2422
                uint64_t id, char *name, char *desc, char *filename,
2423
                gboolean live, gboolean doaudio, gboolean dovideo) {
2424
        if(filename == NULL) {
2425
                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, missing filename...\n");
2426
                return NULL;
2427
        }
2428
        if(name == NULL) {
2429
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2430
        }
2431
        if(id == 0) {
2432
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2433
        }
2434
        if(!doaudio && !dovideo) {
2435
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, no audio or video have to be streamed...\n");
2436
                return NULL;
2437
        }
2438
        /* FIXME We don't support video streaming from file yet */
2439
        if(!doaudio || dovideo) {
2440
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, we only support audio file streaming right now...\n");
2441
                return NULL;
2442
        }
2443
        /* TODO We should support something more than raw a-Law and mu-Law streams... */
2444
        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
2445
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
2446
                return NULL;
2447
        }
2448
        janus_streaming_mountpoint *file_source = calloc(1, sizeof(janus_streaming_mountpoint));
2449
        if(file_source == NULL) {
2450
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2451
                return NULL;
2452
        }
2453
        file_source->id = id ? id : g_random_int();
2454
        char tempname[255];
2455
        if(!name) {
2456
                memset(tempname, 0, 255);
2457
                g_snprintf(tempname, 255, "%"SCNu64, file_source->id);
2458
        }
2459
        file_source->name = g_strdup(name ? name : tempname);
2460
        char *description = NULL;
2461
        if(desc != NULL)
2462
                description = g_strdup(desc);
2463
        else
2464
                description = g_strdup(name ? name : tempname);
2465
        file_source->description = description;
2466
        file_source->enabled = TRUE;
2467
        file_source->active = FALSE;
2468
        file_source->streaming_type = live ? janus_streaming_type_live : janus_streaming_type_on_demand;
2469
        file_source->streaming_source = janus_streaming_source_file;
2470
        janus_streaming_file_source *file_source_source = calloc(1, sizeof(janus_streaming_file_source));
2471
        if(file_source->name == NULL || description == NULL || file_source_source == NULL) {
2472
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2473
                if(file_source->name)
2474
                        g_free(file_source->name);
2475
                if(description)
2476
                        g_free(description);
2477
                if(file_source_source)
2478
                        g_free(file_source_source);
2479
                g_free(file_source);
2480
                return NULL;
2481
        }
2482
        file_source_source->filename = g_strdup(filename);
2483
        file_source->source = file_source_source;
2484
        file_source->source_destroy = (GDestroyNotify) janus_streaming_file_source_free;
2485
        file_source->codecs.audio_pt = strstr(filename, ".alaw") ? 8 : 0;
2486
        file_source->codecs.audio_rtpmap = g_strdup(strstr(filename, ".alaw") ? "PCMA/8000" : "PCMU/8000");
2487
        file_source->codecs.video_pt = -1;        /* FIXME We don't support video for this type yet */
2488
        file_source->codecs.video_rtpmap = NULL;
2489
        file_source->listeners = NULL;
2490
        file_source->destroyed = 0;
2491
        janus_mutex_init(&file_source->mutex);
2492
        janus_mutex_lock(&mountpoints_mutex);
2493
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(file_source->id), file_source);
2494
        janus_mutex_unlock(&mountpoints_mutex);
2495
        if(live) {
2496
                GError *error = NULL;
2497
                g_thread_try_new(file_source->name, &janus_streaming_filesource_thread, file_source, &error);
2498
                if(error != NULL) {
2499
                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the live filesource thread...\n", error->code, error->message ? error->message : "??");
2500
                        if(file_source->name)
2501
                                g_free(file_source->name);
2502
                        if(description)
2503
                                g_free(description);
2504
                        if(file_source_source)
2505
                                g_free(file_source_source);
2506
                        g_free(file_source);
2507
                        return NULL;
2508
                }
2509
        }
2510
        return file_source;
2511
}
2512

    
2513
#ifdef HAVE_LIBCURL                
2514
typedef struct janus_streaming_buffer {
2515
        char *buffer;
2516
        size_t size;
2517
} janus_streaming_buffer;
2518

    
2519
static size_t janus_streaming_rtsp_curl_callback(void *payload, size_t size, size_t nmemb, void *data) {
2520
        size_t realsize = size * nmemb;
2521
        janus_streaming_buffer *buf = (struct janus_streaming_buffer *)data;
2522
        /* (Re)allocate if needed */
2523
        buf->buffer = realloc(buf->buffer, buf->size+realsize+1);
2524
        if(buf->buffer == NULL) {
2525
                /* Memory error! */ 
2526
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2527
                return 0;
2528
        }
2529
        /* Update the buffer */
2530
        memcpy(&(buf->buffer[buf->size]), payload, realsize);
2531
        buf->size += realsize;
2532
        buf->buffer[buf->size] = 0;
2533
        /* Done! */
2534
        return realsize;
2535
}
2536

    
2537
static int janus_streaming_rtsp_parse_sdp(const char* buffer, const char* name, const char* media, int* pt, int* port, char* rtpmap, char* fmtp, char* control) {
2538
        char pattern[256];
2539
        g_snprintf(pattern, sizeof(pattern), "m=%s", media);
2540
        char *m=strstr(buffer, pattern);
2541
        if(m == NULL) {
2542
                JANUS_LOG(LOG_VERB, "[%s] no media %s...\n", name, media);
2543
                return -1;
2544
        }
2545
        sscanf(m, "m=%*s %d %*s %d", port, pt);
2546
        char *s=strstr(m, "a=control:");
2547
        if(s == NULL) {
2548
                JANUS_LOG(LOG_ERR, "[%s] no control for %s...\n", name, media);
2549
                return -1;
2550
        }                
2551
        sscanf(s, "a=control:%s", control);
2552
        char *r=strstr(m, "a=rtpmap:");
2553
        if(r != NULL) {
2554
                sscanf(r, "a=rtpmap:%*d %s", rtpmap);
2555
        }
2556
        char *f=strstr(m, "a=fmtp:");
2557
        if(f != NULL) {
2558
                sscanf(f, "a=fmtp:%*d %s", fmtp);
2559
        }        
2560
        int fd = socket(AF_INET, SOCK_DGRAM, 0);
2561
        if(fd < 0) {
2562
                JANUS_LOG(LOG_ERR, "[%s] cannot create socket for %s...\n", name, media);
2563
                return -1;
2564
        }
2565
        struct sockaddr_in address;
2566
        address.sin_family = AF_INET;
2567
        address.sin_addr.s_addr = INADDR_ANY;
2568
        address.sin_port = htons(*port);
2569
        if(*port > 0) {
2570
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
2571
                setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
2572
        }                
2573
        if(bind(fd, (struct sockaddr *)(&address), sizeof(struct sockaddr)) < 0) {
2574
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for video (%d)...\n", name, *port);
2575
                close(fd);
2576
                return -1;
2577
        }
2578
        socklen_t len = sizeof(address);
2579
        if(getsockname(fd, (struct sockaddr *)&address, &len) < 0)
2580
        {
2581
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (%d)...\n", name, media, *port);
2582
                close(fd);
2583
                return -1;
2584
        }
2585
        *port=ntohs(address.sin_port);
2586
        
2587
        return fd;
2588
}        
2589

    
2590
/* Helper to create an RTSP source */
2591
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
2592
                uint64_t id, char *name, char *desc, char *url,
2593
                gboolean doaudio, gboolean dovideo) {
2594
        if(url == NULL) {
2595
                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream, missing url...\n");
2596
                return NULL;
2597
        }        
2598
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");        
2599
        CURL* curl = curl_easy_init();
2600
        if(curl == NULL) {
2601
                JANUS_LOG(LOG_ERR, "Can't init CURL\n");
2602
                return NULL;
2603
        }
2604
        curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
2605
        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
2606
        curl_easy_setopt(curl, CURLOPT_URL, url);        
2607
        /* Send an RTSP DESCRIBE */
2608
        janus_streaming_buffer data;
2609
        data.buffer = malloc(1);
2610
        data.size = 0;
2611
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, url);
2612
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_DESCRIBE);
2613
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, janus_streaming_rtsp_curl_callback);                
2614
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &data);
2615
        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, janus_streaming_rtsp_curl_callback);                
2616
        curl_easy_setopt(curl, CURLOPT_HEADERDATA, &data);
2617
        int res = curl_easy_perform(curl);
2618
        if(res != CURLE_OK) {
2619
                JANUS_LOG(LOG_ERR, "Couldn't send DESCRIBE request: %s\n", curl_easy_strerror(res));
2620
                curl_easy_cleanup(curl);
2621
                return NULL;
2622
        }                
2623
        JANUS_LOG(LOG_VERB, "DESCRIBE answer:%s\n",data.buffer);        
2624
        /* Parse the SDP we just got to figure out the negotiated media */
2625
        int vpt = -1;
2626
        int vport = -1;
2627
        char vrtpmap[2048];
2628
        char vfmtp[2048];
2629
        char vcontrol[2048];
2630
        char uri[1024];
2631
        char transport[1024];
2632
        int video_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "video", &vpt, &vport, vrtpmap, vfmtp, vcontrol);
2633
        if(video_fd >= 0) {
2634
                /* Send an RTSP SETUP for video */
2635
                free(data.buffer);
2636
                data.buffer = malloc(1);
2637
                data.size = 0;                
2638
                sprintf(uri, "%s/%s", url, vcontrol);
2639
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2640
                sprintf(transport, "RTP/AVP;unicast;client_port=%d-%d", vport, vport+1);        
2641
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2642
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2643
                res = curl_easy_perform(curl);
2644
                if(res != CURLE_OK) {
2645
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2646
                        curl_easy_cleanup(curl);
2647
                        return NULL;
2648
                }                
2649
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2650
        }
2651
        int apt = -1;
2652
        int aport = -1;
2653
        char artpmap[2048];
2654
        char afmtp[2048];
2655
        char acontrol[2048];
2656
        int audio_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "audio", &apt, &aport, artpmap, afmtp, acontrol);
2657
        if(audio_fd >= 0) {
2658
                /* Send an RTSP SETUP for audio */
2659
                free(data.buffer);
2660
                data.buffer = malloc(1);
2661
                data.size = 0;                
2662
                sprintf(uri, "%s/%s", url, acontrol);
2663
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2664
                sprintf(transport, "RTP/AVP;unicast;client_port=%d-%d", aport, aport+1);        
2665
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2666
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2667
                res = curl_easy_perform(curl);
2668
                if(res != CURLE_OK) {
2669
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2670
                        curl_easy_cleanup(curl);
2671
                        return NULL;
2672
                }                
2673
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2674
        }        
2675
        /* Create an RTP source for the media we'll get */
2676
        char tempname[255];
2677
        if(!name) {
2678
                memset(tempname, 0, 255);
2679
                g_snprintf(tempname, 255, "%"SCNu64, id);
2680
        }
2681
        char *sourcename =  g_strdup(name ? name : tempname);
2682
        if(sourcename == NULL) {
2683
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2684
                curl_easy_cleanup(curl);
2685
                return NULL;
2686
        }        
2687
        char *description = NULL;
2688
        if(desc != NULL) {
2689
                description = g_strdup(desc);
2690
        } else {
2691
                description = g_strdup(name ? name : tempname);
2692
        }
2693
        if(description == NULL) {
2694
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2695
                g_free(sourcename);
2696
                curl_easy_cleanup(curl);
2697
                return NULL;
2698
        }                
2699
        janus_streaming_mountpoint *live_rtsp = calloc(1, sizeof(janus_streaming_mountpoint));
2700
        if(live_rtsp == NULL) {
2701
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2702
                g_free(description);
2703
                g_free(sourcename);
2704
                curl_easy_cleanup(curl);
2705
                return NULL;
2706
        }        
2707
        live_rtsp->id = id ? id : g_random_int();
2708
        live_rtsp->name = sourcename;
2709
        live_rtsp->description = description;
2710
        live_rtsp->enabled = TRUE;
2711
        live_rtsp->active = FALSE;
2712
        live_rtsp->streaming_type = janus_streaming_type_live;
2713
        live_rtsp->streaming_source = janus_streaming_source_rtp;
2714
        janus_streaming_rtp_source *live_rtsp_source = calloc(1, sizeof(janus_streaming_rtp_source));
2715
        live_rtsp_source->arc = NULL;
2716
        live_rtsp_source->vrc = NULL;
2717
        live_rtsp_source->audio_fd = audio_fd;
2718
        live_rtsp_source->video_fd = video_fd;
2719
        live_rtsp_source->curl = curl;
2720
        live_rtsp->source = live_rtsp_source;
2721
        live_rtsp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2722
        live_rtsp->codecs.audio_pt = doaudio ? apt : -1;
2723
        live_rtsp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2724
        live_rtsp->codecs.audio_fmtp = doaudio ? g_strdup(afmtp) : NULL;
2725
        live_rtsp->codecs.video_pt = dovideo ? vpt : -1;
2726
        live_rtsp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2727
        live_rtsp->codecs.video_fmtp = dovideo ? g_strdup(vfmtp) : NULL;
2728
        live_rtsp->listeners = NULL;
2729
        live_rtsp->destroyed = 0;
2730
        janus_mutex_init(&live_rtsp->mutex);
2731
        janus_mutex_lock(&mountpoints_mutex);
2732
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtsp->id), live_rtsp);
2733
        janus_mutex_unlock(&mountpoints_mutex);        
2734
        GError *error = NULL;
2735
        g_thread_try_new(live_rtsp->name, &janus_streaming_relay_thread, live_rtsp, &error);        
2736
        if(error != NULL) {
2737
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTSP thread...\n", error->code, error->message ? error->message : "??");
2738
                g_free(description);
2739
                g_free(sourcename);
2740
                janus_streaming_rtp_source_free(live_rtsp_source);
2741
                g_free(live_rtsp);
2742
                curl_easy_cleanup(curl);
2743
                return NULL;        
2744
        }                                
2745
        /* Send an RTSP PLAY */
2746
        free(data.buffer);        
2747
        data.buffer = malloc(1);
2748
        data.size = 0;                
2749
        sprintf(uri, "%s/", url);
2750
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2751
        curl_easy_setopt(curl, CURLOPT_RANGE, "0.000-");
2752
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_PLAY);                
2753
        res = curl_easy_perform(curl);
2754
        if(res != CURLE_OK) {
2755
                JANUS_LOG(LOG_ERR, "Couldn't send PLAY request: %s\n", curl_easy_strerror(res));
2756
                g_free(description);
2757
                g_free(sourcename);
2758
                janus_streaming_rtp_source_free(live_rtsp_source);
2759
                g_free(live_rtsp);
2760
                curl_easy_cleanup(curl);
2761
                return NULL;
2762
        }                
2763
        JANUS_LOG(LOG_VERB, "PLAY answer:%s\n",data.buffer);        
2764
        free(data.buffer);        
2765
        
2766
        return live_rtsp;
2767
}
2768
#else
2769
/* Helper to create an RTSP source */
2770
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
2771
                uint64_t id, char *name, char *desc, char *url,
2772
                gboolean doaudio, gboolean dovideo) {
2773
        JANUS_LOG(LOG_ERR, "RTSP need libcurl\n");
2774
        return NULL;
2775
}
2776
#endif
2777

    
2778
/* FIXME Thread to send RTP packets from a file (on demand) */
2779
static void *janus_streaming_ondemand_thread(void *data) {
2780
        JANUS_LOG(LOG_VERB, "Filesource (on demand) RTP thread starting...\n");
2781
        janus_streaming_session *session = (janus_streaming_session *)data;
2782
        if(!session) {
2783
                JANUS_LOG(LOG_ERR, "Invalid session!\n");
2784
                g_thread_unref(g_thread_self());
2785
                return NULL;
2786
        }
2787
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
2788
        if(!mountpoint) {
2789
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2790
                g_thread_unref(g_thread_self());
2791
                return NULL;
2792
        }
2793
        if(mountpoint->streaming_source != janus_streaming_source_file) {
2794
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
2795
                g_thread_unref(g_thread_self());
2796
                return NULL;
2797
        }
2798
        if(mountpoint->streaming_type != janus_streaming_type_on_demand) {
2799
                JANUS_LOG(LOG_ERR, "[%s] Not an on-demand file source mountpoint!\n", mountpoint->name);
2800
                g_thread_unref(g_thread_self());
2801
                return NULL;
2802
        }
2803
        janus_streaming_file_source *source = mountpoint->source;
2804
        if(source == NULL || source->filename == NULL) {
2805
                g_thread_unref(g_thread_self());
2806
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
2807
                return NULL;
2808
        }
2809
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
2810
        FILE *audio = fopen(source->filename, "rb");
2811
        if(!audio) {
2812
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
2813
                g_thread_unref(g_thread_self());
2814
                return NULL;
2815
        }
2816
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
2817
        /* Buffer */
2818
        char *buf = calloc(1024, sizeof(char));
2819
        if(buf == NULL) {
2820
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
2821
                g_thread_unref(g_thread_self());
2822
                return NULL;
2823
        }
2824
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2825
        /* Set up RTP */
2826
        gint16 seq = 1;
2827
        gint32 ts = 0;
2828
        rtp_header *header = (rtp_header *)buf;
2829
        header->version = 2;
2830
        header->markerbit = 1;
2831
        header->type = mountpoint->codecs.audio_pt;
2832
        header->seq_number = htons(seq);
2833
        header->timestamp = htonl(ts);
2834
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
2835
        /* Timer */
2836
        struct timeval now, before;
2837
        gettimeofday(&before, NULL);
2838
        now.tv_sec = before.tv_sec;
2839
        now.tv_usec = before.tv_usec;
2840
        time_t passed, d_s, d_us;
2841
        /* Loop */
2842
        gint read = 0;
2843
        janus_streaming_rtp_relay_packet packet;
2844
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed && !session->stopping && !session->destroyed) {
2845
                /* See if it's time to prepare a frame */
2846
                gettimeofday(&now, NULL);
2847
                d_s = now.tv_sec - before.tv_sec;
2848
                d_us = now.tv_usec - before.tv_usec;
2849
                if(d_us < 0) {
2850
                        d_us += 1000000;
2851
                        --d_s;
2852
                }
2853
                passed = d_s*1000000 + d_us;
2854
                if(passed < 18000) {        /* Let's wait about 18ms */
2855
                        usleep(1000);
2856
                        continue;
2857
                }
2858
                /* Update the reference time */
2859
                before.tv_usec += 20000;
2860
                if(before.tv_usec > 1000000) {
2861
                        before.tv_sec++;
2862
                        before.tv_usec -= 1000000;
2863
                }
2864
                /* If not started or paused, wait some more */
2865
                if(!session->started || session->paused || !mountpoint->enabled)
2866
                        continue;
2867
                /* Read frame from file... */
2868
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
2869
                if(feof(audio)) {
2870
                        /* FIXME We're doing this forever... should this be configurable? */
2871
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
2872
                        fseek(audio, 0, SEEK_SET);
2873
                        continue;
2874
                }
2875
                if(read < 0)
2876
                        break;
2877
                if(mountpoint->active == FALSE)
2878
                        mountpoint->active = TRUE;
2879
                //~ JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
2880
                        //~ header->type, ntohs(header->seq_number), ntohl(header->timestamp));
2881
                //~ JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
2882
                /* Relay on all sessions */
2883
                packet.data = header;
2884
                packet.length = RTP_HEADER_SIZE + read;
2885
                packet.is_video = 0;
2886
                /* Backup the actual timestamp and sequence number */
2887
                packet.timestamp = ntohl(packet.data->timestamp);
2888
                packet.seq_number = ntohs(packet.data->seq_number);
2889
                /* Go! */
2890
                janus_streaming_relay_rtp_packet(session, &packet);
2891
                /* Update header */
2892
                seq++;
2893
                header->seq_number = htons(seq);
2894
                ts += 160;
2895
                header->timestamp = htonl(ts);
2896
                header->markerbit = 0;
2897
        }
2898
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (ondemand) thread\n", name);
2899
        g_free(name);
2900
        g_free(buf);
2901
        fclose(audio);
2902
        g_thread_unref(g_thread_self());
2903
        return NULL;
2904
}
2905

    
2906
/* FIXME Thread to send RTP packets from a file (live) */
2907
static void *janus_streaming_filesource_thread(void *data) {
2908
        JANUS_LOG(LOG_VERB, "Filesource (live) thread starting...\n");
2909
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
2910
        if(!mountpoint) {
2911
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2912
                g_thread_unref(g_thread_self());
2913
                return NULL;
2914
        }
2915
        if(mountpoint->streaming_source != janus_streaming_source_file) {
2916
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
2917
                g_thread_unref(g_thread_self());
2918
                return NULL;
2919
        }
2920
        if(mountpoint->streaming_type != janus_streaming_type_live) {
2921
                JANUS_LOG(LOG_ERR, "[%s] Not a live file source mountpoint!\n", mountpoint->name);
2922
                g_thread_unref(g_thread_self());
2923
                return NULL;
2924
        }
2925
        janus_streaming_file_source *source = mountpoint->source;
2926
        if(source == NULL || source->filename == NULL) {
2927
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
2928
                g_thread_unref(g_thread_self());
2929
                return NULL;
2930
        }
2931
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
2932
        FILE *audio = fopen(source->filename, "rb");
2933
        if(!audio) {
2934
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
2935
                g_thread_unref(g_thread_self());
2936
                return NULL;
2937
        }
2938
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
2939
        /* Buffer */
2940
        char *buf = calloc(1024, sizeof(char));
2941
        if(buf == NULL) {
2942
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
2943
                g_thread_unref(g_thread_self());
2944
                return NULL;
2945
        }
2946
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2947
        /* Set up RTP */
2948
        gint16 seq = 1;
2949
        gint32 ts = 0;
2950
        rtp_header *header = (rtp_header *)buf;
2951
        header->version = 2;
2952
        header->markerbit = 1;
2953
        header->type = mountpoint->codecs.audio_pt;
2954
        header->seq_number = htons(seq);
2955
        header->timestamp = htonl(ts);
2956
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
2957
        /* Timer */
2958
        struct timeval now, before;
2959
        gettimeofday(&before, NULL);
2960
        now.tv_sec = before.tv_sec;
2961
        now.tv_usec = before.tv_usec;
2962
        time_t passed, d_s, d_us;
2963
        /* Loop */
2964
        gint read = 0;
2965
        janus_streaming_rtp_relay_packet packet;
2966
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
2967
                /* See if it's time to prepare a frame */
2968
                gettimeofday(&now, NULL);
2969
                d_s = now.tv_sec - before.tv_sec;
2970
                d_us = now.tv_usec - before.tv_usec;
2971
                if(d_us < 0) {
2972
                        d_us += 1000000;
2973
                        --d_s;
2974
                }
2975
                passed = d_s*1000000 + d_us;
2976
                if(passed < 18000) {        /* Let's wait about 18ms */
2977
                        usleep(1000);
2978
                        continue;
2979
                }
2980
                /* Update the reference time */
2981
                before.tv_usec += 20000;
2982
                if(before.tv_usec > 1000000) {
2983
                        before.tv_sec++;
2984
                        before.tv_usec -= 1000000;
2985
                }
2986
                /* If paused, wait some more */
2987
                if(!mountpoint->enabled)
2988
                        continue;
2989
                /* Read frame from file... */
2990
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
2991
                if(feof(audio)) {
2992
                        /* FIXME We're doing this forever... should this be configurable? */
2993
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
2994
                        fseek(audio, 0, SEEK_SET);
2995
                        continue;
2996
                }
2997
                if(read < 0)
2998
                        break;
2999
                if(mountpoint->active == FALSE)
3000
                        mountpoint->active = TRUE;
3001
                // JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
3002
                        // header->type, ntohs(header->seq_number), ntohl(header->timestamp));
3003
                // JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
3004
                /* Relay on all sessions */
3005
                packet.data = header;
3006
                packet.length = RTP_HEADER_SIZE + read;
3007
                packet.is_video = 0;
3008
                /* Backup the actual timestamp and sequence number */
3009
                packet.timestamp = ntohl(packet.data->timestamp);
3010
                packet.seq_number = ntohs(packet.data->seq_number);
3011
                /* Go! */
3012
                janus_mutex_lock_nodebug(&mountpoint->mutex);
3013
                g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3014
                janus_mutex_unlock_nodebug(&mountpoint->mutex);
3015
                /* Update header */
3016
                seq++;
3017
                header->seq_number = htons(seq);
3018
                ts += 160;
3019
                header->timestamp = htonl(ts);
3020
                header->markerbit = 0;
3021
        }
3022
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (live) thread\n", name);
3023
        g_free(name);
3024
        g_free(buf);
3025
        fclose(audio);
3026
        g_thread_unref(g_thread_self());
3027
        return NULL;
3028
}
3029

    
3030
/* FIXME Test thread to relay RTP frames coming from gstreamer/ffmpeg/others */
3031
static void *janus_streaming_relay_thread(void *data) {
3032
        JANUS_LOG(LOG_VERB, "Starting streaming relay thread\n");
3033
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
3034
        if(!mountpoint) {
3035
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
3036
                g_thread_unref(g_thread_self());
3037
                return NULL;
3038
        }
3039
        if(mountpoint->streaming_source != janus_streaming_source_rtp) {
3040
                JANUS_LOG(LOG_ERR, "[%s] Not an RTP source mountpoint!\n", mountpoint->name);
3041
                g_thread_unref(g_thread_self());
3042
                return NULL;
3043
        }
3044
        janus_streaming_rtp_source *source = mountpoint->source;
3045
        if(source == NULL) {
3046
                JANUS_LOG(LOG_ERR, "[%s] Invalid RTP source mountpoint!\n", mountpoint->name);
3047
                g_thread_unref(g_thread_self());
3048
                return NULL;
3049
        }
3050
        gint audio_port = source->audio_port;
3051
        gint video_port = source->video_port;
3052
        /* Socket stuff */
3053
        struct sockaddr_in audio_address, video_address;
3054
        int audio_fd = source->audio_fd;
3055
        if((audio_fd < 0) && (audio_port >= 0)) {
3056
                audio_fd = socket(AF_INET, SOCK_DGRAM, 0);
3057
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
3058
                setsockopt(audio_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
3059
                if(IN_MULTICAST(source->audio_mcast))
3060
                {
3061
                        struct ip_mreq mreq;
3062
                        mreq.imr_multiaddr.s_addr = source->audio_mcast;
3063
                        if(setsockopt(audio_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) {
3064
                                JANUS_LOG(LOG_ERR, "[%s] Audio listener IP_ADD_MEMBERSHIP fail\n", mountpoint->name);
3065
                                g_thread_unref(g_thread_self());
3066
                                return NULL;
3067
                        }
3068
                }
3069

    
3070
                audio_address.sin_family = AF_INET;
3071
                audio_address.sin_port = htons(audio_port);
3072
                audio_address.sin_addr.s_addr = INADDR_ANY;
3073
                if(bind(audio_fd, (struct sockaddr *)(&audio_address), sizeof(struct sockaddr)) < 0) {
3074
                        JANUS_LOG(LOG_ERR, "[%s] Bind failed for audio (port %d)...\n", mountpoint->name, audio_port);
3075
                        g_thread_unref(g_thread_self());
3076
                        return NULL;
3077
                }
3078
                JANUS_LOG(LOG_VERB, "[%s] Audio listener bound to port %d\n", mountpoint->name, audio_port);
3079
        }
3080
        int video_fd = source->video_fd;
3081
        if((video_fd < 0) && (video_port >= 0)) {
3082
                video_fd = socket(AF_INET, SOCK_DGRAM, 0);
3083
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
3084
                setsockopt(video_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
3085
                if(IN_MULTICAST(source->video_mcast))
3086
                {
3087
                        struct ip_mreq mreq;
3088
                        mreq.imr_multiaddr.s_addr = source->video_mcast;
3089
                        if(setsockopt(video_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) {
3090
                                JANUS_LOG(LOG_ERR, "[%s] Video listener IP_ADD_MEMBERSHIP fail\n", mountpoint->name);
3091
                                g_thread_unref(g_thread_self());
3092
                                return NULL;
3093
                        }
3094
                }
3095
                
3096
                video_address.sin_family = AF_INET;
3097
                video_address.sin_port = htons(video_port);
3098
                video_address.sin_addr.s_addr = INADDR_ANY;
3099
                if(bind(video_fd, (struct sockaddr *)(&video_address), sizeof(struct sockaddr)) < 0) {
3100
                        JANUS_LOG(LOG_ERR, "[%s] Bind failed for video (%d)...\n", mountpoint->name, video_port);
3101
                        g_thread_unref(g_thread_self());
3102
                        return NULL;
3103
                }
3104
                JANUS_LOG(LOG_VERB, "[%s] Video listener bound to port %d\n", mountpoint->name, video_port);
3105
        }
3106
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
3107
        /* Needed to fix seq and ts */
3108
        uint32_t a_last_ssrc = 0, a_last_ts = 0, a_base_ts = 0, a_base_ts_prev = 0,
3109
                        v_last_ssrc = 0, v_last_ts = 0, v_base_ts = 0, v_base_ts_prev = 0;
3110
        uint16_t a_last_seq = 0, a_base_seq = 0, a_base_seq_prev = 0,
3111
                        v_last_seq = 0, v_base_seq = 0, v_base_seq_prev = 0;
3112
        /* Loop */
3113
        socklen_t addrlen;
3114
        struct sockaddr_in remote;
3115
        int resfd = 0, bytes = 0;
3116
        struct pollfd fds[2];
3117
        char buffer[1500];
3118
        memset(buffer, 0, 1500);
3119
        janus_streaming_rtp_relay_packet packet;
3120
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
3121
                /* Wait for some data */
3122
                fds[0].fd = 0;
3123
                fds[0].events = 0;
3124
                fds[0].revents = 0;
3125
                if(audio_fd > 0) {
3126
                        fds[0].fd = audio_fd;
3127
                        fds[0].events = POLLIN;
3128
                }
3129
                fds[1].fd = 0;
3130
                fds[1].events = 0;
3131
                fds[1].revents = 0;
3132
                if(video_fd > 0) {
3133
                        fds[1].fd = video_fd;
3134
                        fds[1].events = POLLIN;
3135
                }
3136
                resfd = poll(fds, 2, 1000);
3137
                if(resfd < 0) {
3138
                        JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", mountpoint->name, errno, strerror(errno));
3139
                        break;
3140
                } else if(resfd == 0) {
3141
                        /* No data, keep going */
3142
                        continue;
3143
                }
3144
                if(audio_fd && (fds[0].revents & POLLIN)) {
3145
                        /* Got something audio (RTP) */
3146
                        fds[0].revents = 0;
3147
                        if(mountpoint->active == FALSE)
3148
                                mountpoint->active = TRUE;
3149
                        addrlen = sizeof(remote);
3150
                        bytes = recvfrom(audio_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3151
                        // JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the audio channel...\n", bytes);
3152
                        /* If paused, ignore this packet */
3153
                        if(!mountpoint->enabled)
3154
                                continue;
3155
                        rtp_header *rtp = (rtp_header *)buffer;
3156
                        // JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3157
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3158
                        /* Relay on all sessions */
3159
                        packet.data = rtp;
3160
                        packet.length = bytes;
3161
                        packet.is_video = 0;
3162
                        /* Do we have a new stream? */
3163
                        if(ntohl(packet.data->ssrc) != a_last_ssrc) {
3164
                                a_last_ssrc = ntohl(packet.data->ssrc);
3165
                                JANUS_LOG(LOG_INFO, "[%s] New audio stream! (ssrc=%u)\n", name, a_last_ssrc);
3166
                                a_base_ts_prev = a_last_ts;
3167
                                a_base_ts = ntohl(packet.data->timestamp);
3168
                                a_base_seq_prev = a_last_seq;
3169
                                a_base_seq = ntohs(packet.data->seq_number);
3170
                        }
3171
                        a_last_ts = (ntohl(packet.data->timestamp)-a_base_ts)+a_base_ts_prev+960;        /* FIXME We're assuming Opus here... */
3172
                        packet.data->timestamp = htonl(a_last_ts);
3173
                        a_last_seq = (ntohs(packet.data->seq_number)-a_base_seq)+a_base_seq_prev+1;
3174
                        packet.data->seq_number = htons(a_last_seq);
3175
                        // JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3176
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3177
                        packet.data->type = mountpoint->codecs.audio_pt;
3178
                        /* Is there a recorder? */
3179
                        if(source->arc) {
3180
                                JANUS_LOG(LOG_HUGE, "[%s] Saving audio frame (%d bytes)\n", name, bytes);
3181
                                janus_recorder_save_frame(source->arc, buffer, bytes);
3182
                        }
3183
                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3184
                        packet.timestamp = ntohl(packet.data->timestamp);
3185
                        packet.seq_number = ntohs(packet.data->seq_number);
3186
                        /* Go! */
3187
                        janus_mutex_lock(&mountpoint->mutex);
3188
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3189
                        janus_mutex_unlock(&mountpoint->mutex);
3190
                        continue;
3191
                }
3192
                if(video_fd && (fds[1].revents & POLLIN)) {
3193
                        /* Got something video (RTP) */
3194
                        fds[1].revents = 0;
3195
                        if(mountpoint->active == FALSE)
3196
                                mountpoint->active = TRUE;
3197
                        addrlen = sizeof(remote);
3198
                        bytes = recvfrom(video_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3199
                        //~ JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the video channel...\n", bytes);
3200
                        /* If paused, ignore this packet */
3201
                        if(!mountpoint->enabled)
3202
                                continue;
3203
                        rtp_header *rtp = (rtp_header *)buffer;
3204
                        //~ JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3205
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3206
                        /* Relay on all sessions */
3207
                        packet.data = rtp;
3208
                        packet.length = bytes;
3209
                        packet.is_video = 1;
3210
                        /* Do we have a new stream? */
3211
                        if(ntohl(packet.data->ssrc) != v_last_ssrc) {
3212
                                v_last_ssrc = ntohl(packet.data->ssrc);
3213
                                JANUS_LOG(LOG_INFO, "[%s] New video stream! (ssrc=%u)\n", name, v_last_ssrc);
3214
                                v_base_ts_prev = v_last_ts;
3215
                                v_base_ts = ntohl(packet.data->timestamp);
3216
                                v_base_seq_prev = v_last_seq;
3217
                                v_base_seq = ntohs(packet.data->seq_number);
3218
                        }
3219
                        v_last_ts = (ntohl(packet.data->timestamp)-v_base_ts)+v_base_ts_prev+4500;        /* FIXME We're assuming 15fps here... */
3220
                        packet.data->timestamp = htonl(v_last_ts);
3221
                        v_last_seq = (ntohs(packet.data->seq_number)-v_base_seq)+v_base_seq_prev+1;
3222
                        packet.data->seq_number = htons(v_last_seq);
3223
                        //~ JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3224
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3225
                        packet.data->type = mountpoint->codecs.video_pt;
3226
                        /* Is there a recorder? */
3227
                        if(source->vrc) {
3228
                                JANUS_LOG(LOG_HUGE, "[%s] Saving video frame (%d bytes)\n", name, bytes);
3229
                                janus_recorder_save_frame(source->vrc, buffer, bytes);
3230
                        }
3231
                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3232
                        packet.timestamp = ntohl(packet.data->timestamp);
3233
                        packet.seq_number = ntohs(packet.data->seq_number);
3234
                        /* Go! */
3235
                        janus_mutex_lock(&mountpoint->mutex);
3236
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3237
                        janus_mutex_unlock(&mountpoint->mutex);
3238
                        continue;
3239
                }
3240
        }
3241
        JANUS_LOG(LOG_VERB, "[%s] Leaving streaming relay thread\n", name);
3242
        g_free(name);
3243
        g_thread_unref(g_thread_self());
3244
        return NULL;
3245
}
3246

    
3247
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) {
3248
        janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data;
3249
        if(!packet || !packet->data || packet->length < 1) {
3250
                JANUS_LOG(LOG_ERR, "Invalid packet...\n");
3251
                return;
3252
        }
3253
        janus_streaming_session *session = (janus_streaming_session *)data;
3254
        if(!session || !session->handle) {
3255
                // JANUS_LOG(LOG_ERR, "Invalid session...\n");
3256
                return;
3257
        }
3258
        if(!session->started || session->paused) {
3259
                // JANUS_LOG(LOG_ERR, "Streaming not started yet for this session...\n");
3260
                return;
3261
        }
3262

    
3263
        /* Make sure there hasn't been a publisher switch by checking the SSRC */
3264
        if(packet->is_video) {
3265
                if(ntohl(packet->data->ssrc) != session->context.v_last_ssrc) {
3266
                        session->context.v_last_ssrc = ntohl(packet->data->ssrc);
3267
                        session->context.v_base_ts_prev = session->context.v_last_ts;
3268
                        session->context.v_base_ts = packet->timestamp;
3269
                        session->context.v_base_seq_prev = session->context.v_last_seq;
3270
                        session->context.v_base_seq = packet->seq_number;
3271
                }
3272
                /* Compute a coherent timestamp and sequence number */
3273
                session->context.v_last_ts = (packet->timestamp-session->context.v_base_ts)
3274
                        + session->context.v_base_ts_prev+4500;        /* FIXME When switching, we assume 15fps */
3275
                session->context.v_last_seq = (packet->seq_number-session->context.v_base_seq)+session->context.v_base_seq_prev+1;
3276
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3277
                packet->data->timestamp = htonl(session->context.v_last_ts);
3278
                packet->data->seq_number = htons(session->context.v_last_seq);
3279
                if(gateway != NULL)
3280
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3281
                /* Restore the timestamp and sequence number to what the publisher set them to */
3282
                packet->data->timestamp = htonl(packet->timestamp);
3283
                packet->data->seq_number = htons(packet->seq_number);
3284
        } else {
3285
                if(ntohl(packet->data->ssrc) != session->context.a_last_ssrc) {
3286
                        session->context.a_last_ssrc = ntohl(packet->data->ssrc);
3287
                        session->context.a_base_ts_prev = session->context.a_last_ts;
3288
                        session->context.a_base_ts = packet->timestamp;
3289
                        session->context.a_base_seq_prev = session->context.a_last_seq;
3290
                        session->context.a_base_seq = packet->seq_number;
3291
                }
3292
                /* Compute a coherent timestamp and sequence number */
3293
                session->context.a_last_ts = (packet->timestamp-session->context.a_base_ts)
3294
                        + session->context.a_base_ts_prev+960;        /* FIXME When switching, we assume Opus and so a 960 ts step */
3295
                session->context.a_last_seq = (packet->seq_number-session->context.a_base_seq)+session->context.a_base_seq_prev+1;
3296
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3297
                packet->data->timestamp = htonl(session->context.a_last_ts);
3298
                packet->data->seq_number = htons(session->context.a_last_seq);
3299
                if(gateway != NULL)
3300
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3301
                /* Restore the timestamp and sequence number to what the publisher set them to */
3302
                packet->data->timestamp = htonl(packet->timestamp);
3303
                packet->data->seq_number = htons(packet->seq_number);
3304
        }
3305

    
3306
        return;
3307
}