Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_streaming.c @ 793d18b1

History | View | Annotate | Download (130 KB)

1
/*! \file   janus_streaming.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus Streaming plugin
5
 * \details  This is a streaming plugin for Janus, allowing WebRTC peers
6
 * to watch/listen to pre-recorded files or media generated by another tool.
7
 * Specifically, the plugin currently supports three different type of streams:
8
 * 
9
 * -# on-demand streaming of pre-recorded media files (different
10
 * streaming context for each peer);
11
 * -# live streaming of pre-recorded media files (shared streaming
12
 * context for all peers attached to the stream);
13
 * -# live streaming of media generated by another tool (shared
14
 * streaming context for all peers attached to the stream).
15
 * 
16
 * For what concerns types 1. and 2., considering the proof of concept
17
 * nature of the implementation the only pre-recorded media files
18
 * that the plugins supports right now are raw mu-Law and a-Law files:
19
 * support is of course planned for other additional widespread formats
20
 * as well.
21
 * 
22
 * For what concerns type 3., instead, the plugin is configured
23
 * to listen on a couple of ports for RTP: this means that the plugin
24
 * is implemented to receive RTP on those ports and relay them to all
25
 * peers attached to that stream. Any tool that can generate audio/video
26
 * RTP streams and specify a destination is good for the purpose: the
27
 * examples section contains samples that make use of GStreamer (http://gstreamer.freedesktop.org/)
28
 * but other tools like FFmpeg (http://www.ffmpeg.org/), LibAV (http://libav.org/)
29
 * or others are fine as well. This makes it really easy to capture and
30
 * encode whatever you want using your favourite tool, and then have it
31
 * transparently broadcasted via WebRTC using Janus.
32
 * 
33
 * Streams to make available are listed in the plugin configuration file.
34
 * A pre-filled configuration file is provided in \c conf/janus.plugin.streaming.cfg
35
 * and includes a stream of every type.
36
 * 
37
 * To add more streams or modify the existing ones, you can use the following
38
 * syntax:
39
 * 
40
 * \verbatim
41
[stream-name]
42
type = rtp|live|ondemand|rtsp
43
       rtp = stream originated by an external tool (e.g., gstreamer or
44
             ffmpeg) and sent to the plugin via RTP
45
       live = local file streamed live to multiple listeners
46
              (multiple listeners = same streaming context)
47
       ondemand = local file streamed on-demand to a single listener
48
                  (multiple listeners = different streaming contexts)
49
       rtsp = stream originated by an external RTSP feed (only
50
              available if libcurl support was compiled)
51
id = <unique numeric ID>
52
description = This is my awesome stream
53
is_private = yes|no (private streams don't appear when you do a 'list' request)
54
filename = path to the local file to stream (only for live/ondemand)
55
secret = <optional password needed for manipulating (e.g., destroying
56
                or enabling/disabling) the stream>
57
audio = yes|no (do/don't stream audio)
58
video = yes|no (do/don't stream video)
59
   The following options are only valid for the 'rtp' type:
60
audioport = local port for receiving audio frames
61
audiomcast = multicast group port for receiving audio frames
62
audiopt = <audio RTP payload type> (e.g., 111)
63
audiortpmap = RTP map of the audio codec (e.g., opus/48000/2)
64
audiofmtp = Codec specific parameters, if any
65
videoport = local port for receiving video frames (only for rtp)
66
videomcast = multicast group port for receiving video frames
67
videopt = <video RTP payload type> (e.g., 100)
68
videortpmap = RTP map of the video codec (e.g., VP8/90000)
69
videofmtp = Codec specific parameters, if any
70
   The following options are only valid for the 'rstp' type:
71
url = RTSP stream URL (only if type=rtsp)
72
\endverbatim
73
 *
74
 * \section streamapi Streaming API
75
 * 
76
 * The Streaming API supports several requests, some of which are
77
 * synchronous and some asynchronous. There are some situations, though,
78
 * (invalid JSON, invalid request) which will always result in a
79
 * synchronous error response even for asynchronous requests. 
80
 * 
81
 * \c list , \c create , \c destroy , \c recording , \c enable and
82
 * \c disable are synchronous requests, which means you'll
83
 * get a response directly within the context of the transaction. \c list
84
 * lists all the available streams; \c create allows you to create a new
85
 * mountpoint dynamically, as an alternative to using the configuration
86
 * file; \c destroy removes a mountpoint and destroys it; \c recording
87
 * instructs the plugin on whether or not a live RTP stream should be
88
 * recorded while it's broadcasted; \c enable and \c disable respectively
89
 * enable and disable a mountpoint, that is decide whether or not a
90
 * mountpoint should be available to users without destroying it.
91
 * 
92
 * The \c watch , \c start , \c pause , \c switch and \c stop requests
93
 * instead are all asynchronous, which means you'll get a notification
94
 * about their success or failure in an event. \c watch asks the plugin
95
 * to prepare the playout of one of the available streams; \c start
96
 * starts the actual playout; \c pause allows you to pause a playout
97
 * without tearing down the PeerConnection; \c switch allows you to
98
 * switch to a different mountpoint of the same kind (note: only live
99
 * RTP mountpoints supported as of now) without having to stop and watch
100
 * the new one; \c stop stops the playout and tears the PeerConnection
101
 * down.
102
 * 
103
 * Actual API docs: TBD.
104
 * 
105
 * \ingroup plugins
106
 * \ref plugins
107
 */
108

    
109
#include "plugin.h"
110

    
111
#include <jansson.h>
112
#include <errno.h>
113
#include <sys/poll.h>
114
#include <sys/time.h>
115

    
116
#ifdef HAVE_LIBCURL
117
#include <curl/curl.h>
118
#endif
119

    
120
#include "../debug.h"
121
#include "../apierror.h"
122
#include "../config.h"
123
#include "../mutex.h"
124
#include "../rtp.h"
125
#include "../rtcp.h"
126
#include "../record.h"
127
#include "../utils.h"
128

    
129

    
130
/* Plugin information */
131
#define JANUS_STREAMING_VERSION                        5
132
#define JANUS_STREAMING_VERSION_STRING        "0.0.5"
133
#define JANUS_STREAMING_DESCRIPTION                "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by gstreamer."
134
#define JANUS_STREAMING_NAME                        "JANUS Streaming plugin"
135
#define JANUS_STREAMING_AUTHOR                        "Meetecho s.r.l."
136
#define JANUS_STREAMING_PACKAGE                        "janus.plugin.streaming"
137

    
138
/* Plugin methods */
139
janus_plugin *create(void);
140
int janus_streaming_init(janus_callbacks *callback, const char *config_path);
141
void janus_streaming_destroy(void);
142
int janus_streaming_get_api_compatibility(void);
143
int janus_streaming_get_version(void);
144
const char *janus_streaming_get_version_string(void);
145
const char *janus_streaming_get_description(void);
146
const char *janus_streaming_get_name(void);
147
const char *janus_streaming_get_author(void);
148
const char *janus_streaming_get_package(void);
149
void janus_streaming_create_session(janus_plugin_session *handle, int *error);
150
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
151
void janus_streaming_setup_media(janus_plugin_session *handle);
152
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
153
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
154
void janus_streaming_hangup_media(janus_plugin_session *handle);
155
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error);
156
char *janus_streaming_query_session(janus_plugin_session *handle);
157

    
158
/* Plugin setup */
159
static janus_plugin janus_streaming_plugin =
160
        JANUS_PLUGIN_INIT (
161
                .init = janus_streaming_init,
162
                .destroy = janus_streaming_destroy,
163

    
164
                .get_api_compatibility = janus_streaming_get_api_compatibility,
165
                .get_version = janus_streaming_get_version,
166
                .get_version_string = janus_streaming_get_version_string,
167
                .get_description = janus_streaming_get_description,
168
                .get_name = janus_streaming_get_name,
169
                .get_author = janus_streaming_get_author,
170
                .get_package = janus_streaming_get_package,
171
                
172
                .create_session = janus_streaming_create_session,
173
                .handle_message = janus_streaming_handle_message,
174
                .setup_media = janus_streaming_setup_media,
175
                .incoming_rtp = janus_streaming_incoming_rtp,
176
                .incoming_rtcp = janus_streaming_incoming_rtcp,
177
                .hangup_media = janus_streaming_hangup_media,
178
                .destroy_session = janus_streaming_destroy_session,
179
                .query_session = janus_streaming_query_session,
180
        );
181

    
182
/* Plugin creator */
183
janus_plugin *create(void) {
184
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_STREAMING_NAME);
185
        return &janus_streaming_plugin;
186
}
187

    
188

    
189
/* Useful stuff */
190
static gint initialized = 0, stopping = 0;
191
static janus_callbacks *gateway = NULL;
192
static GThread *handler_thread;
193
static GThread *watchdog;
194
static void *janus_streaming_handler(void *data);
195
static void *janus_streaming_ondemand_thread(void *data);
196
static void *janus_streaming_filesource_thread(void *data);
197
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data);
198
static void *janus_streaming_relay_thread(void *data);
199

    
200
typedef enum janus_streaming_type {
201
        janus_streaming_type_none = 0,
202
        janus_streaming_type_live,
203
        janus_streaming_type_on_demand,
204
} janus_streaming_type;
205

    
206
typedef enum janus_streaming_source {
207
        janus_streaming_source_none = 0,
208
        janus_streaming_source_file,
209
        janus_streaming_source_rtp,
210
} janus_streaming_source;
211

    
212
typedef struct janus_streaming_rtp_source {
213
        in_addr_t audio_mcast;
214
        gint audio_port;
215
        in_addr_t video_mcast;
216
        gint video_port;
217
        janus_recorder *arc;        /* The Janus recorder instance for this streams's audio, if enabled */
218
        janus_recorder *vrc;        /* The Janus recorder instance for this streams's video, if enabled */
219
        int audio_fd;
220
        int video_fd;
221
#ifdef HAVE_LIBCURL
222
        CURL* curl;
223
#endif
224
} janus_streaming_rtp_source;
225

    
226
typedef struct janus_streaming_file_source {
227
        char *filename;
228
} janus_streaming_file_source;
229

    
230
typedef struct janus_streaming_codecs {
231
        gint audio_pt;
232
        char *audio_rtpmap;
233
        char *audio_fmtp;
234
        gint video_pt;
235
        char *video_rtpmap;
236
        char *video_fmtp;
237
} janus_streaming_codecs;
238

    
239
typedef struct janus_streaming_mountpoint {
240
        gint64 id;
241
        char *name;
242
        char *description;
243
        gboolean is_private;
244
        char *secret;
245
        gboolean enabled;
246
        gboolean active;
247
        janus_streaming_type streaming_type;
248
        janus_streaming_source streaming_source;
249
        void *source;        /* Can differ according to the source type */
250
        GDestroyNotify source_destroy;
251
        janus_streaming_codecs codecs;
252
        GList/*<unowned janus_streaming_session>*/ *listeners;
253
        gint64 destroyed;
254
        janus_mutex mutex;
255
} janus_streaming_mountpoint;
256
GHashTable *mountpoints;
257
janus_mutex mountpoints_mutex;
258

    
259
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp);
260

    
261
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
262
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
263
                uint64_t id, char *name, char *desc,
264
                gboolean doaudio, char* amcast,uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
265
                gboolean dovideo, char* vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp);
266
/* Helper to create a file/ondemand live source */
267
janus_streaming_mountpoint *janus_streaming_create_file_source(
268
                uint64_t id, char *name, char *desc, char *filename,
269
                gboolean live, gboolean doaudio, gboolean dovideo);
270
/* Helper to create a rtsp live source */
271
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
272
                uint64_t id, char *name, char *desc, char *url,
273
                gboolean doaudio, gboolean dovideo);
274

    
275

    
276
typedef struct janus_streaming_message {
277
        janus_plugin_session *handle;
278
        char *transaction;
279
        json_t *message;
280
        char *sdp_type;
281
        char *sdp;
282
} janus_streaming_message;
283
static GAsyncQueue *messages = NULL;
284

    
285
void janus_streaming_message_free(janus_streaming_message *msg);
286
void janus_streaming_message_free(janus_streaming_message *msg) {
287
        if(!msg)
288
                return;
289

    
290
        msg->handle = NULL;
291

    
292
        g_free(msg->transaction);
293
        msg->transaction = NULL;
294
        if(msg->message)
295
                json_decref(msg->message);
296
        msg->message = NULL;
297
        g_free(msg->sdp_type);
298
        msg->sdp_type = NULL;
299
        g_free(msg->sdp);
300
        msg->sdp = NULL;
301

    
302
        g_free(msg);
303
}
304

    
305

    
306
typedef struct janus_streaming_context {
307
        /* Needed to fix seq and ts in case of stream switching */
308
        uint32_t a_last_ssrc, a_last_ts, a_base_ts, a_base_ts_prev,
309
                        v_last_ssrc, v_last_ts, v_base_ts, v_base_ts_prev;
310
        uint16_t a_last_seq, a_base_seq, a_base_seq_prev,
311
                        v_last_seq, v_base_seq, v_base_seq_prev;
312
} janus_streaming_context;
313

    
314
typedef struct janus_streaming_session {
315
        janus_plugin_session *handle;
316
        janus_streaming_mountpoint *mountpoint;
317
        gboolean started;
318
        gboolean paused;
319
        janus_streaming_context context;
320
        gboolean stopping;
321
        guint64 destroyed;        /* Time at which this session was marked as destroyed */
322
} janus_streaming_session;
323
static GHashTable *sessions;
324
static GList *old_sessions;
325
static janus_mutex sessions_mutex;
326

    
327
/* Packets we get from gstreamer and relay */
328
typedef struct janus_streaming_rtp_relay_packet {
329
        rtp_header *data;
330
        gint length;
331
        gint is_video;
332
        uint32_t timestamp;
333
        uint16_t seq_number;
334
} janus_streaming_rtp_relay_packet;
335

    
336

    
337
/* Error codes */
338
#define JANUS_STREAMING_ERROR_NO_MESSAGE                        450
339
#define JANUS_STREAMING_ERROR_INVALID_JSON                        451
340
#define JANUS_STREAMING_ERROR_INVALID_REQUEST                452
341
#define JANUS_STREAMING_ERROR_MISSING_ELEMENT                453
342
#define JANUS_STREAMING_ERROR_INVALID_ELEMENT                454
343
#define JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT        455
344
#define JANUS_STREAMING_ERROR_CANT_CREATE                        456
345
#define JANUS_STREAMING_ERROR_UNAUTHORIZED                        457
346
#define JANUS_STREAMING_ERROR_CANT_SWITCH                        458
347
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR                        470
348

    
349

    
350
/* Streaming watchdog/garbage collector (sort of) */
351
void *janus_streaming_watchdog(void *data);
352
void *janus_streaming_watchdog(void *data) {
353
        JANUS_LOG(LOG_INFO, "Streaming watchdog started\n");
354
        gint64 now = 0;
355
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
356
                janus_mutex_lock(&sessions_mutex);
357
                /* Iterate on all the sessions */
358
                now = janus_get_monotonic_time();
359
                if(old_sessions != NULL) {
360
                        GList *sl = old_sessions;
361
                        JANUS_LOG(LOG_HUGE, "Checking %d old Streaming sessions...\n", g_list_length(old_sessions));
362
                        while(sl) {
363
                                janus_streaming_session *session = (janus_streaming_session *)sl->data;
364
                                if(!session) {
365
                                        sl = sl->next;
366
                                        continue;
367
                                }
368
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
369
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
370
                                        JANUS_LOG(LOG_VERB, "Freeing old Streaming session\n");
371
                                        GList *rm = sl->next;
372
                                        old_sessions = g_list_delete_link(old_sessions, sl);
373
                                        sl = rm;
374
                                        session->handle = NULL;
375
                                        g_free(session);
376
                                        session = NULL;
377
                                        continue;
378
                                }
379
                                sl = sl->next;
380
                        }
381
                }
382
                janus_mutex_unlock(&sessions_mutex);
383
                g_usleep(500000);
384
        }
385
        JANUS_LOG(LOG_INFO, "Streaming watchdog stopped\n");
386
        return NULL;
387
}
388

    
389

    
390
/* Plugin implementation */
391
int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
392
#ifdef HAVE_LIBCURL        
393
        curl_global_init(CURL_GLOBAL_ALL);
394
#endif        
395
        if(g_atomic_int_get(&stopping)) {
396
                /* Still stopping from before */
397
                return -1;
398
        }
399
        if(callback == NULL || config_path == NULL) {
400
                /* Invalid arguments */
401
                return -1;
402
        }
403

    
404
        /* Read configuration */
405
        char filename[255];
406
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_STREAMING_PACKAGE);
407
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
408
        janus_config *config = janus_config_parse(filename);
409
        if(config != NULL)
410
                janus_config_print(config);
411
        
412
        mountpoints = g_hash_table_new_full(NULL, NULL, NULL,
413
                                            (GDestroyNotify) janus_streaming_mountpoint_free);
414
        janus_mutex_init(&mountpoints_mutex);
415
        /* Parse configuration to populate the mountpoints */
416
        if(config != NULL) {
417
                janus_config_category *cat = janus_config_get_categories(config);
418
                while(cat != NULL) {
419
                        if(cat->name == NULL) {
420
                                cat = cat->next;
421
                                continue;
422
                        }
423
                        JANUS_LOG(LOG_VERB, "Adding stream '%s'\n", cat->name);
424
                        janus_config_item *type = janus_config_get_item(cat, "type");
425
                        if(type == NULL || type->value == NULL) {
426
                                JANUS_LOG(LOG_VERB, "  -- Invalid type, skipping stream...\n");
427
                                cat = cat->next;
428
                                continue;
429
                        }
430
                        if(!strcasecmp(type->value, "rtp")) {
431
                                /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
432
                                janus_config_item *id = janus_config_get_item(cat, "id");
433
                                janus_config_item *desc = janus_config_get_item(cat, "description");
434
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
435
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
436
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
437
                                janus_config_item *video = janus_config_get_item(cat, "video");
438
                                janus_config_item *amcast = janus_config_get_item(cat, "audiomcast");
439
                                janus_config_item *aport = janus_config_get_item(cat, "audioport");
440
                                janus_config_item *acodec = janus_config_get_item(cat, "audiopt");
441
                                janus_config_item *artpmap = janus_config_get_item(cat, "audiortpmap");
442
                                janus_config_item *afmtp = janus_config_get_item(cat, "audiofmtp");
443
                                janus_config_item *vmcast = janus_config_get_item(cat, "videomcast");
444
                                janus_config_item *vport = janus_config_get_item(cat, "videoport");
445
                                janus_config_item *vcodec = janus_config_get_item(cat, "videopt");
446
                                janus_config_item *vrtpmap = janus_config_get_item(cat, "videortpmap");
447
                                janus_config_item *vfmtp = janus_config_get_item(cat, "videofmtp");
448
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
449
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
450
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
451
                                if(!doaudio && !dovideo) {
452
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', no audio or video have to be streamed...\n", cat->name);
453
                                        cat = cat->next;
454
                                        continue;
455
                                }
456
                                if(doaudio &&
457
                                                (aport == NULL || aport->value == NULL ||
458
                                                acodec == NULL || acodec->value == NULL ||
459
                                                artpmap == NULL || artpmap->value == NULL)) {
460
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for audio...\n", cat->name);
461
                                        cat = cat->next;
462
                                        continue;
463
                                }
464
                                if(dovideo &&
465
                                                (vport == NULL || vport->value == NULL ||
466
                                                vcodec == NULL || vcodec->value == NULL ||
467
                                                vrtpmap == NULL || vrtpmap->value == NULL)) {
468
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for video...\n", cat->name);
469
                                        cat = cat->next;
470
                                        continue;
471
                                }
472
                                if(id == NULL || id->value == NULL) {
473
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
474
                                } else {
475
                                        janus_mutex_lock(&mountpoints_mutex);
476
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
477
                                        janus_mutex_unlock(&mountpoints_mutex);
478
                                        if(mp != NULL) {
479
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
480
                                                cat = cat->next;
481
                                                continue;
482
                                        }
483
                                }
484
                                JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
485
                                janus_streaming_mountpoint *mp = NULL;
486
                                if((mp = janus_streaming_create_rtp_source(
487
                                                (id && id->value) ? atoi(id->value) : 0,
488
                                                (char *)cat->name,
489
                                                desc ? (char *)desc->value : NULL,
490
                                                doaudio,
491
                                                amcast ? (char *)amcast->value : NULL,
492
                                                (aport && aport->value) ? atoi(aport->value) : 0,
493
                                                (acodec && acodec->value) ? atoi(acodec->value) : 0,
494
                                                artpmap ? (char *)artpmap->value : NULL,
495
                                                afmtp ? (char *)afmtp->value : NULL,
496
                                                dovideo,
497
                                                vmcast ? (char *)vmcast->value : NULL,
498
                                                (vport && vport->value) ? atoi(vport->value) : 0,
499
                                                (vcodec && vcodec->value) ? atoi(vcodec->value) : 0,
500
                                                vrtpmap ? (char *)vrtpmap->value : NULL,
501
                                                vfmtp ? (char *)vfmtp->value : NULL)) == NULL) {
502
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream '%s'...\n", cat->name);
503
                                        cat = cat->next;
504
                                        continue;
505
                                }
506
                                mp->is_private = is_private;
507
                                if(secret && secret->value)
508
                                        mp->secret = g_strdup(secret->value);
509
                        } else if(!strcasecmp(type->value, "live")) {
510
                                /* File live source */
511
                                janus_config_item *id = janus_config_get_item(cat, "id");
512
                                janus_config_item *desc = janus_config_get_item(cat, "description");
513
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
514
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
515
                                janus_config_item *file = janus_config_get_item(cat, "filename");
516
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
517
                                janus_config_item *video = janus_config_get_item(cat, "video");
518
                                if(file == NULL || file->value == NULL) {
519
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', missing mandatory information...\n", cat->name);
520
                                        cat = cat->next;
521
                                        continue;
522
                                }
523
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
524
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
525
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
526
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
527
                                if(!doaudio || dovideo) {
528
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', we only support audio file streaming right now...\n", cat->name);
529
                                        cat = cat->next;
530
                                        continue;
531
                                }
532
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
533
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
534
                                        cat = cat->next;
535
                                        continue;
536
                                }
537
                                if(id == NULL || id->value == NULL) {
538
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
539
                                } else {
540
                                        janus_mutex_lock(&mountpoints_mutex);
541
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
542
                                        janus_mutex_unlock(&mountpoints_mutex);
543
                                        if(mp != NULL) {
544
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
545
                                                cat = cat->next;
546
                                                continue;
547
                                        }
548
                                }
549
                                janus_streaming_mountpoint *mp = NULL;
550
                                if((mp = janus_streaming_create_file_source(
551
                                                (id && id->value) ? atoi(id->value) : 0,
552
                                                (char *)cat->name,
553
                                                desc ? (char *)desc->value : NULL,
554
                                                (char *)file->value,
555
                                                TRUE, doaudio, dovideo)) == NULL) {
556
                                        JANUS_LOG(LOG_ERR, "Error creating 'live' stream '%s'...\n", cat->name);
557
                                        cat = cat->next;
558
                                        continue;
559
                                }
560
                                mp->is_private = is_private;
561
                                if(secret && secret->value)
562
                                        mp->secret = g_strdup(secret->value);
563
                        } else if(!strcasecmp(type->value, "ondemand")) {
564
                                /* mu-Law file on demand source */
565
                                janus_config_item *id = janus_config_get_item(cat, "id");
566
                                janus_config_item *desc = janus_config_get_item(cat, "description");
567
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
568
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
569
                                janus_config_item *file = janus_config_get_item(cat, "filename");
570
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
571
                                janus_config_item *video = janus_config_get_item(cat, "video");
572
                                if(file == NULL || file->value == NULL) {
573
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', missing mandatory information...\n", cat->name);
574
                                        cat = cat->next;
575
                                        continue;
576
                                }
577
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
578
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
579
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
580
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
581
                                if(!doaudio || dovideo) {
582
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', we only support audio file streaming right now...\n", cat->name);
583
                                        cat = cat->next;
584
                                        continue;
585
                                }
586
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
587
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
588
                                        cat = cat->next;
589
                                        continue;
590
                                }
591
                                if(id == NULL || id->value == NULL) {
592
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
593
                                } else {
594
                                        janus_mutex_lock(&mountpoints_mutex);
595
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
596
                                        janus_mutex_unlock(&mountpoints_mutex);
597
                                        if(mp != NULL) {
598
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
599
                                                cat = cat->next;
600
                                                continue;
601
                                        }
602
                                }
603
                                janus_streaming_mountpoint *mp = NULL;
604
                                if((mp = janus_streaming_create_file_source(
605
                                                (id && id->value) ? atoi(id->value) : 0,
606
                                                (char *)cat->name,
607
                                                desc ? (char *)desc->value : NULL,
608
                                                (char *)file->value,
609
                                                FALSE, doaudio, dovideo)) == NULL) {
610
                                        JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream '%s'...\n", cat->name);
611
                                        cat = cat->next;
612
                                        continue;
613
                                }
614
                                mp->is_private = is_private;
615
                                if(secret && secret->value)
616
                                        mp->secret = g_strdup(secret->value);
617
                        } else if(!strcasecmp(type->value, "rtsp")) {
618
#ifndef HAVE_LIBCURL
619
                                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', libcurl support not compiled...\n", cat->name);
620
                                cat = cat->next;
621
                                continue;
622
#else
623
                                janus_config_item *id = janus_config_get_item(cat, "id");
624
                                janus_config_item *desc = janus_config_get_item(cat, "description");
625
                                janus_config_item *file = janus_config_get_item(cat, "url");
626
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
627
                                janus_config_item *video = janus_config_get_item(cat, "video");
628
                                if(file == NULL || file->value == NULL) {
629
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', missing mandatory information...\n", cat->name);
630
                                        cat = cat->next;
631
                                        continue;
632
                                }
633
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
634
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
635
                                if(id == NULL || id->value == NULL) {
636
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
637
                                } else {
638
                                        janus_mutex_lock(&mountpoints_mutex);
639
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
640
                                        janus_mutex_unlock(&mountpoints_mutex);
641
                                        if(mp != NULL) {
642
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
643
                                                cat = cat->next;
644
                                                continue;
645
                                        }
646
                                }
647
                                janus_streaming_mountpoint *mp = NULL;
648
                                if((mp = janus_streaming_create_rtsp_source(
649
                                                (id && id->value) ? atoi(id->value) : 0,
650
                                                (char *)cat->name,
651
                                                desc ? (char *)desc->value : NULL,
652
                                                (char *)file->value, doaudio, dovideo)) == NULL) {
653
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream '%s'...\n", cat->name);
654
                                        cat = cat->next;
655
                                        continue;
656
                                }
657
                                
658
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
659
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
660
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
661
                                mp->is_private = is_private;
662
                                if(secret && secret->value)
663
                                        mp->secret = g_strdup(secret->value);
664
#endif
665
                        } else {
666
                                JANUS_LOG(LOG_WARN, "Ignoring unknown stream type '%s' (%s)...\n", type->value, cat->name);
667
                        }
668
                        cat = cat->next;
669
                }
670
                /* Done */
671
                janus_config_destroy(config);
672
                config = NULL;
673
        }
674
        /* Show available mountpoints */
675
        janus_mutex_lock(&mountpoints_mutex);
676
        GHashTableIter iter;
677
        gpointer value;
678
        g_hash_table_iter_init(&iter, mountpoints);
679
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
680
                janus_streaming_mountpoint *mp = value;
681
                JANUS_LOG(LOG_VERB, "  ::: [%"SCNu64"][%s] %s (%s, %s, %s)\n", mp->id, mp->name, mp->description,
682
                        mp->streaming_type == janus_streaming_type_live ? "live" : "on demand",
683
                        mp->streaming_source == janus_streaming_source_rtp ? "RTP source" : "file source",
684
                        mp->is_private ? "private" : "public");
685
        }
686
        janus_mutex_unlock(&mountpoints_mutex);
687

    
688
        sessions = g_hash_table_new(NULL, NULL);
689
        janus_mutex_init(&sessions_mutex);
690
        messages = g_async_queue_new_full((GDestroyNotify) janus_streaming_message_free);
691
        /* This is the callback we'll need to invoke to contact the gateway */
692
        gateway = callback;
693
        g_atomic_int_set(&initialized, 1);
694

    
695
        GError *error = NULL;
696
        /* Start the sessions watchdog */
697
        watchdog = g_thread_try_new("streaming watchdog", &janus_streaming_watchdog, NULL, &error);
698
        if(!watchdog) {
699
                g_atomic_int_set(&initialized, 0);
700
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming watchdog thread...\n", error->code, error->message ? error->message : "??");
701
                return -1;
702
        }
703
        /* Launch the thread that will handle incoming messages */
704
        handler_thread = g_thread_try_new("janus streaming handler", janus_streaming_handler, NULL, &error);
705
        if(error != NULL) {
706
                g_atomic_int_set(&initialized, 0);
707
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming handler thread...\n", error->code, error->message ? error->message : "??");
708
                return -1;
709
        }
710
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_STREAMING_NAME);
711
        return 0;
712
}
713

    
714
void janus_streaming_destroy(void) {
715
        if(!g_atomic_int_get(&initialized))
716
                return;
717
        g_atomic_int_set(&stopping, 1);
718

    
719
        if(handler_thread != NULL) {
720
                g_thread_join(handler_thread);
721
                handler_thread = NULL;
722
        }
723
        if(watchdog != NULL) {
724
                g_thread_join(watchdog);
725
                watchdog = NULL;
726
        }
727

    
728
        /* FIXME We should destroy the sessions cleanly */
729
        usleep(500000);
730
        janus_mutex_lock(&mountpoints_mutex);
731
        g_hash_table_destroy(mountpoints);
732
        janus_mutex_unlock(&mountpoints_mutex);
733
        janus_mutex_lock(&sessions_mutex);
734
        g_hash_table_destroy(sessions);
735
        janus_mutex_unlock(&sessions_mutex);
736
        g_async_queue_unref(messages);
737
        messages = NULL;
738
        sessions = NULL;
739

    
740
        g_atomic_int_set(&initialized, 0);
741
        g_atomic_int_set(&stopping, 0);
742
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_STREAMING_NAME);
743
}
744

    
745
int janus_streaming_get_api_compatibility(void) {
746
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
747
        return JANUS_PLUGIN_API_VERSION;
748
}
749

    
750
int janus_streaming_get_version(void) {
751
        return JANUS_STREAMING_VERSION;
752
}
753

    
754
const char *janus_streaming_get_version_string(void) {
755
        return JANUS_STREAMING_VERSION_STRING;
756
}
757

    
758
const char *janus_streaming_get_description(void) {
759
        return JANUS_STREAMING_DESCRIPTION;
760
}
761

    
762
const char *janus_streaming_get_name(void) {
763
        return JANUS_STREAMING_NAME;
764
}
765

    
766
const char *janus_streaming_get_author(void) {
767
        return JANUS_STREAMING_AUTHOR;
768
}
769

    
770
const char *janus_streaming_get_package(void) {
771
        return JANUS_STREAMING_PACKAGE;
772
}
773

    
774
void janus_streaming_create_session(janus_plugin_session *handle, int *error) {
775
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
776
                *error = -1;
777
                return;
778
        }        
779
        janus_streaming_session *session = (janus_streaming_session *)calloc(1, sizeof(janus_streaming_session));
780
        if(session == NULL) {
781
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
782
                *error = -2;
783
                return;
784
        }
785
        session->handle = handle;
786
        session->mountpoint = NULL;        /* This will happen later */
787
        session->started = FALSE;        /* This will happen later */
788
        session->paused = FALSE;
789
        session->destroyed = 0;
790
        handle->plugin_handle = session;
791
        janus_mutex_lock(&sessions_mutex);
792
        g_hash_table_insert(sessions, handle, session);
793
        janus_mutex_unlock(&sessions_mutex);
794

    
795
        return;
796
}
797

    
798
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error) {
799
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
800
                *error = -1;
801
                return;
802
        }        
803
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; 
804
        if(!session) {
805
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
806
                *error = -2;
807
                return;
808
        }
809
        JANUS_LOG(LOG_VERB, "Removing streaming session...\n");
810
        if(session->mountpoint) {
811
                janus_mutex_lock(&session->mountpoint->mutex);
812
                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
813
                janus_mutex_unlock(&session->mountpoint->mutex);
814
        }
815
        janus_mutex_lock(&sessions_mutex);
816
        if(!session->destroyed) {
817
                session->destroyed = janus_get_monotonic_time();
818
                g_hash_table_remove(sessions, handle);
819
                /* Cleaning up and removing the session is done in a lazy way */
820
                old_sessions = g_list_append(old_sessions, session);
821
        }
822
        janus_mutex_unlock(&sessions_mutex);
823
        return;
824
}
825

    
826
char *janus_streaming_query_session(janus_plugin_session *handle) {
827
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
828
                return NULL;
829
        }        
830
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;
831
        if(!session) {
832
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
833
                return NULL;
834
        }
835
        /* What is this user watching, if anything? */
836
        json_t *info = json_object();
837
        json_object_set_new(info, "state", json_string(session->mountpoint ? "watching" : "idle"));
838
        if(session->mountpoint) {
839
                json_object_set_new(info, "mountpoint_id", json_integer(session->mountpoint->id));
840
                json_object_set_new(info, "mountpoint_name", session->mountpoint->name ? json_string(session->mountpoint->name) : NULL);
841
        }
842
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
843
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
844
        json_decref(info);
845
        return info_text;
846
}
847

    
848
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
849
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
850
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
851

    
852
        /* Pre-parse the message */
853
        int error_code = 0;
854
        char error_cause[512];
855
        json_t *root = NULL;
856
        json_t *response = NULL;
857

    
858
        if(message == NULL) {
859
                JANUS_LOG(LOG_ERR, "No message??\n");
860
                error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
861
                g_snprintf(error_cause, 512, "%s", "No message??");
862
                goto error;
863
        }
864
        JANUS_LOG(LOG_VERB, "Handling message: %s\n", message);
865

    
866
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
867
        if(!session) {
868
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
869
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
870
                g_snprintf(error_cause, 512, "%s", "session associated with this handle...");
871
                goto error;
872
        }
873
        if(session->destroyed) {
874
                JANUS_LOG(LOG_ERR, "Session has already been destroyed...\n");
875
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
876
                g_snprintf(error_cause, 512, "%s", "Session has already been destroyed...");
877
                goto error;
878
        }
879
        json_error_t error;
880
        root = json_loads(message, 0, &error);
881
        if(!root) {
882
                JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
883
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
884
                g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
885
                goto error;
886
        }
887
        if(!json_is_object(root)) {
888
                JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
889
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
890
                g_snprintf(error_cause, 512, "JSON error: not an object");
891
                goto error;
892
        }
893
        /* Get the request first */
894
        json_t *request = json_object_get(root, "request");
895
        if(!request) {
896
                JANUS_LOG(LOG_ERR, "Missing element (request)\n");
897
                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
898
                g_snprintf(error_cause, 512, "Missing element (request)");
899
                goto error;
900
        }
901
        if(!json_is_string(request)) {
902
                JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
903
                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
904
                g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
905
                goto error;
906
        }
907
        /* Some requests ('create' and 'destroy') can be handled synchronously */
908
        const char *request_text = json_string_value(request);
909
        if(!strcasecmp(request_text, "list")) {
910
                json_t *list = json_array();
911
                JANUS_LOG(LOG_VERB, "Request for the list of mountpoints\n");
912
                /* Return a list of all available mountpoints */
913
                janus_mutex_lock(&mountpoints_mutex);
914
                GHashTableIter iter;
915
                gpointer value;
916
                g_hash_table_iter_init(&iter, mountpoints);
917
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
918
                        janus_streaming_mountpoint *mp = value;
919
                        if(mp->is_private) {
920
                                /* Skip private stream */
921
                                JANUS_LOG(LOG_VERB, "Skipping private mountpoint '%s'\n", mp->description);
922
                                continue;
923
                        }
924
                        json_t *ml = json_object();
925
                        json_object_set_new(ml, "id", json_integer(mp->id));
926
                        json_object_set_new(ml, "description", json_string(mp->description));
927
                        json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
928
                        json_array_append_new(list, ml);
929
                }
930
                janus_mutex_unlock(&mountpoints_mutex);
931
                /* Send info back */
932
                response = json_object();
933
                json_object_set_new(response, "streaming", json_string("list"));
934
                json_object_set_new(response, "list", list);
935
                goto plugin_response;
936
        } else if(!strcasecmp(request_text, "create")) {
937
                /* Create a new stream */
938
                json_t *type = json_object_get(root, "type");
939
                if(!type) {
940
                        JANUS_LOG(LOG_ERR, "Missing element (type)\n");
941
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
942
                        g_snprintf(error_cause, 512, "Missing element (type)");
943
                        goto error;
944
                }
945
                if(!json_is_string(type)) {
946
                        JANUS_LOG(LOG_ERR, "Invalid element (type should be a string)\n");
947
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
948
                        g_snprintf(error_cause, 512, "Invalid element (type should be a string)");
949
                        goto error;
950
                }
951
                const char *type_text = json_string_value(type);
952
                json_t *secret = json_object_get(root, "secret");
953
                if(secret && !json_is_string(secret)) {
954
                        JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
955
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
956
                        g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
957
                        goto error;
958
                }
959
                janus_streaming_mountpoint *mp = NULL;
960
                if(!strcasecmp(type_text, "rtp")) {
961
                        /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
962
                        json_t *id = json_object_get(root, "id");
963
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
964
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
965
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
966
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
967
                                goto error;
968
                        }
969
                        json_t *name = json_object_get(root, "name");
970
                        if(name && !json_is_string(name)) {
971
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
972
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
973
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
974
                                goto error;
975
                        }
976
                        json_t *desc = json_object_get(root, "description");
977
                        if(desc && !json_is_string(desc)) {
978
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
979
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
980
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
981
                                goto error;
982
                        }
983
                        json_t *is_private = json_object_get(root, "is_private");
984
                        if(is_private && !json_is_boolean(is_private)) {
985
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
986
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
987
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
988
                                goto error;
989
                        }
990
                        json_t *audio = json_object_get(root, "audio");
991
                        if(audio && !json_is_boolean(audio)) {
992
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
993
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
994
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
995
                                goto error;
996
                        }
997
                        json_t *video = json_object_get(root, "video");
998
                        if(video && !json_is_boolean(video)) {
999
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1000
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1001
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1002
                                goto error;
1003
                        }
1004
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1005
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1006
                        if(!doaudio && !dovideo) {
1007
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1008
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1009
                                g_snprintf(error_cause, 512, "Can't add 'rtp' stream, no audio or video have to be streamed...");
1010
                                goto error;
1011
                        }
1012
                        uint16_t aport = 0;
1013
                        uint8_t acodec = 0;
1014
                        char *artpmap = NULL, *afmtp = NULL, *amcast = NULL;
1015
                        if(doaudio) {
1016
                                json_t *audiomcast = json_object_get(root, "audiomcast");
1017
                                if(audiomcast && !json_is_string(audiomcast)) {
1018
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiomcast should be a string)\n");
1019
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1020
                                        g_snprintf(error_cause, 512, "Invalid element (audiomcast should be a string)");
1021
                                        goto error;
1022
                                }
1023
                                amcast = (char *)json_string_value(audiomcast);                                
1024
                                json_t *audioport = json_object_get(root, "audioport");
1025
                                if(!audioport) {
1026
                                        JANUS_LOG(LOG_ERR, "Missing element (audioport)\n");
1027
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1028
                                        g_snprintf(error_cause, 512, "Missing element (audioport)");
1029
                                        goto error;
1030
                                }
1031
                                if(!json_is_integer(audioport) || json_integer_value(audioport) < 0) {
1032
                                        JANUS_LOG(LOG_ERR, "Invalid element (audioport should be a positive integer)\n");
1033
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1034
                                        g_snprintf(error_cause, 512, "Invalid element (audioport should be a positive integer)");
1035
                                        goto error;
1036
                                }
1037
                                aport = json_integer_value(audioport);
1038
                                json_t *audiopt = json_object_get(root, "audiopt");
1039
                                if(!audiopt) {
1040
                                        JANUS_LOG(LOG_ERR, "Missing element (audiopt)\n");
1041
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1042
                                        g_snprintf(error_cause, 512, "Missing element (audiopt)");
1043
                                        goto error;
1044
                                }
1045
                                if(!json_is_integer(audiopt) || json_integer_value(audiopt) < 0) {
1046
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiopt should be a positive integer)\n");
1047
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1048
                                        g_snprintf(error_cause, 512, "Invalid element (audiopt should be a positive integer)");
1049
                                        goto error;
1050
                                }
1051
                                acodec = json_integer_value(audiopt);
1052
                                json_t *audiortpmap = json_object_get(root, "audiortpmap");
1053
                                if(!audiortpmap) {
1054
                                        JANUS_LOG(LOG_ERR, "Missing element (audiortpmap)\n");
1055
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1056
                                        g_snprintf(error_cause, 512, "Missing element (audiortpmap)");
1057
                                        goto error;
1058
                                }
1059
                                if(!json_is_string(audiortpmap)) {
1060
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiortpmap should be a string)\n");
1061
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1062
                                        g_snprintf(error_cause, 512, "Invalid element (audiortpmap should be a string)");
1063
                                        goto error;
1064
                                }
1065
                                artpmap = (char *)json_string_value(audiortpmap);
1066
                                json_t *audiofmtp = json_object_get(root, "audiofmtp");
1067
                                if(audiofmtp && !json_is_string(audiofmtp)) {
1068
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiofmtp should be a string)\n");
1069
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1070
                                        g_snprintf(error_cause, 512, "Invalid element (audiofmtp should be a string)");
1071
                                        goto error;
1072
                                }
1073
                                afmtp = (char *)json_string_value(audiofmtp);
1074
                        }
1075
                        uint16_t vport = 0;
1076
                        uint8_t vcodec = 0;
1077
                        char *vrtpmap = NULL, *vfmtp = NULL, *vmcast = NULL;
1078
                        if(dovideo) {
1079
                                json_t *videomcast = json_object_get(root, "videomcast");
1080
                                if(videomcast && !json_is_string(videomcast)) {
1081
                                        JANUS_LOG(LOG_ERR, "Invalid element (videomcast should be a string)\n");
1082
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1083
                                        g_snprintf(error_cause, 512, "Invalid element (videomcast should be a string)");
1084
                                        goto error;
1085
                                }
1086
                                vmcast = (char *)json_string_value(videomcast);
1087
                                json_t *videoport = json_object_get(root, "videoport");
1088
                                if(!videoport) {
1089
                                        JANUS_LOG(LOG_ERR, "Missing element (videoport)\n");
1090
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1091
                                        g_snprintf(error_cause, 512, "Missing element (videoport)");
1092
                                        goto error;
1093
                                }
1094
                                if(!json_is_integer(videoport) || json_integer_value(videoport) < 0) {
1095
                                        JANUS_LOG(LOG_ERR, "Invalid element (videoport should be a positive integer)\n");
1096
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1097
                                        g_snprintf(error_cause, 512, "Invalid element (videoport should be a positive integer)");
1098
                                        goto error;
1099
                                }
1100
                                vport = json_integer_value(videoport);
1101
                                json_t *videopt = json_object_get(root, "videopt");
1102
                                if(!videopt) {
1103
                                        JANUS_LOG(LOG_ERR, "Missing element (videopt)\n");
1104
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1105
                                        g_snprintf(error_cause, 512, "Missing element (videopt)");
1106
                                        goto error;
1107
                                }
1108
                                if(!json_is_integer(videopt) || json_integer_value(videopt) < 0) {
1109
                                        JANUS_LOG(LOG_ERR, "Invalid element (videopt should be a positive integer)\n");
1110
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1111
                                        g_snprintf(error_cause, 512, "Invalid element (videopt should be a positive integer)");
1112
                                        goto error;
1113
                                }
1114
                                vcodec = json_integer_value(videopt);
1115
                                json_t *videortpmap = json_object_get(root, "videortpmap");
1116
                                if(!videortpmap) {
1117
                                        JANUS_LOG(LOG_ERR, "Missing element (videortpmap)\n");
1118
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1119
                                        g_snprintf(error_cause, 512, "Missing element (videortpmap)");
1120
                                        goto error;
1121
                                }
1122
                                if(!json_is_string(videortpmap)) {
1123
                                        JANUS_LOG(LOG_ERR, "Invalid element (videortpmap should be a string)\n");
1124
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1125
                                        g_snprintf(error_cause, 512, "Invalid element (videortpmap should be a string)");
1126
                                        goto error;
1127
                                }
1128
                                vrtpmap = (char *)json_string_value(videortpmap);
1129
                                json_t *videofmtp = json_object_get(root, "videofmtp");
1130
                                if(videofmtp && !json_is_string(videofmtp)) {
1131
                                        JANUS_LOG(LOG_ERR, "Invalid element (videofmtp should be a string)\n");
1132
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1133
                                        g_snprintf(error_cause, 512, "Invalid element (videofmtp should be a string)");
1134
                                        goto error;
1135
                                }
1136
                                vfmtp = (char *)json_string_value(videofmtp);
1137
                        }
1138
                        if(id == NULL) {
1139
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1140
                        } else {
1141
                                janus_mutex_lock(&mountpoints_mutex);
1142
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1143
                                janus_mutex_unlock(&mountpoints_mutex);
1144
                                if(mp != NULL) {
1145
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1146
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1147
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1148
                                        goto error;
1149
                                }
1150
                        }
1151
                        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
1152
                        mp = janus_streaming_create_rtp_source(
1153
                                        id ? json_integer_value(id) : 0,
1154
                                        name ? (char *)json_string_value(name) : NULL,
1155
                                        desc ? (char *)json_string_value(desc) : NULL,
1156
                                        doaudio, amcast, aport, acodec, artpmap, afmtp,
1157
                                        dovideo, vmcast, vport, vcodec, vrtpmap, vfmtp);
1158
                        if(mp == NULL) {
1159
                                JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream...\n");
1160
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1161
                                g_snprintf(error_cause, 512, "Error creating 'rtp' stream");
1162
                                goto error;
1163
                        }
1164
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1165
                } else if(!strcasecmp(type_text, "live")) {
1166
                        /* File live source */
1167
                        json_t *id = json_object_get(root, "id");
1168
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
1169
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1170
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1171
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1172
                                goto error;
1173
                        }
1174
                        json_t *name = json_object_get(root, "name");
1175
                        if(name && !json_is_string(name)) {
1176
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1177
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1178
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1179
                                goto error;
1180
                        }
1181
                        json_t *desc = json_object_get(root, "description");
1182
                        if(desc && !json_is_string(desc)) {
1183
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1184
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1185
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1186
                                goto error;
1187
                        }
1188
                        json_t *is_private = json_object_get(root, "is_private");
1189
                        if(is_private && !json_is_boolean(is_private)) {
1190
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1191
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1192
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1193
                                goto error;
1194
                        }
1195
                        json_t *file = json_object_get(root, "file");
1196
                        if(file && !json_is_string(file)) {
1197
                                JANUS_LOG(LOG_ERR, "Invalid element (file should be a string)\n");
1198
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1199
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1200
                                goto error;
1201
                        }
1202
                        json_t *audio = json_object_get(root, "audio");
1203
                        if(audio && !json_is_boolean(audio)) {
1204
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1205
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1206
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1207
                                goto error;
1208
                        }
1209
                        json_t *video = json_object_get(root, "video");
1210
                        if(video && !json_is_boolean(video)) {
1211
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1212
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1213
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1214
                                goto error;
1215
                        }
1216
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1217
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1218
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1219
                        if(!doaudio || dovideo) {
1220
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, we only support audio file streaming right now...\n");
1221
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1222
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, we only support audio file streaming right now...");
1223
                                goto error;
1224
                        }
1225
                        char *filename = (char *)json_string_value(file);
1226
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1227
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1228
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1229
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1230
                                goto error;
1231
                        }
1232
                        if(id == NULL) {
1233
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1234
                        } else {
1235
                                janus_mutex_lock(&mountpoints_mutex);
1236
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1237
                                janus_mutex_unlock(&mountpoints_mutex);
1238
                                if(mp != NULL) {
1239
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1240
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1241
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1242
                                        goto error;
1243
                                }
1244
                        }
1245
                        mp = janus_streaming_create_file_source(
1246
                                        id ? json_integer_value(id) : 0,
1247
                                        name ? (char *)json_string_value(name) : NULL,
1248
                                        desc ? (char *)json_string_value(desc) : NULL,
1249
                                        filename,
1250
                                        TRUE, doaudio, dovideo);
1251
                        if(mp == NULL) {
1252
                                JANUS_LOG(LOG_ERR, "Error creating 'live' stream...\n");
1253
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1254
                                g_snprintf(error_cause, 512, "Error creating 'live' stream");
1255
                                goto error;
1256
                        }
1257
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1258
                } else if(!strcasecmp(type_text, "ondemand")) {
1259
                        /* mu-Law file on demand source */
1260
                        json_t *id = json_object_get(root, "id");
1261
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
1262
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1263
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1264
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1265
                                goto error;
1266
                        }
1267
                        json_t *name = json_object_get(root, "name");
1268
                        if(name && !json_is_string(name)) {
1269
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1270
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1271
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1272
                                goto error;
1273
                        }
1274
                        json_t *desc = json_object_get(root, "description");
1275
                        if(desc && !json_is_string(desc)) {
1276
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1277
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1278
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1279
                                goto error;
1280
                        }
1281
                        json_t *is_private = json_object_get(root, "is_private");
1282
                        if(is_private && !json_is_boolean(is_private)) {
1283
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1284
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1285
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1286
                                goto error;
1287
                        }
1288
                        json_t *file = json_object_get(root, "file");
1289
                        if(file && !json_is_string(file)) {
1290
                                JANUS_LOG(LOG_ERR, "Invalid element (file should be a string)\n");
1291
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1292
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1293
                                goto error;
1294
                        }
1295
                        json_t *audio = json_object_get(root, "audio");
1296
                        if(audio && !json_is_boolean(audio)) {
1297
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1298
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1299
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1300
                                goto error;
1301
                        }
1302
                        json_t *video = json_object_get(root, "video");
1303
                        if(video && !json_is_boolean(video)) {
1304
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1305
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1306
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1307
                                goto error;
1308
                        }
1309
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1310
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1311
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1312
                        if(!doaudio || dovideo) {
1313
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, we only support audio file streaming right now...\n");
1314
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1315
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, we only support audio file streaming right now...");
1316
                                goto error;
1317
                        }
1318
                        char *filename = (char *)json_string_value(file);
1319
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1320
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1321
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1322
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1323
                                goto error;
1324
                        }
1325
                        if(id == NULL) {
1326
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1327
                        } else {
1328
                                janus_mutex_lock(&mountpoints_mutex);
1329
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1330
                                janus_mutex_unlock(&mountpoints_mutex);
1331
                                if(mp != NULL) {
1332
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1333
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1334
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1335
                                        goto error;
1336
                                }
1337
                        }
1338
                        mp = janus_streaming_create_file_source(
1339
                                        id ? json_integer_value(id) : 0,
1340
                                        name ? (char *)json_string_value(name) : NULL,
1341
                                        desc ? (char *)json_string_value(desc) : NULL,
1342
                                        filename,
1343
                                        FALSE, doaudio, dovideo);
1344
                        if(mp == NULL) {
1345
                                JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream...\n");
1346
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1347
                                g_snprintf(error_cause, 512, "Error creating 'ondemand' stream");
1348
                                goto error;
1349
                        }
1350
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1351
                } else if(!strcasecmp(type_text, "rtsp")) {
1352
#ifndef HAVE_LIBCURL
1353
                        JANUS_LOG(LOG_ERR, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1354
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1355
                        g_snprintf(error_cause, 512, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1356
                        goto error;
1357
#else
1358
                        /* RTSP source*/
1359
                        json_t *id = json_object_get(root, "id");
1360
                        if(id && (!json_is_integer(id) || json_integer_value(id) < 0)) {
1361
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1362
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1363
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1364
                                goto error;
1365
                        }
1366
                        json_t *name = json_object_get(root, "name");
1367
                        if(name && !json_is_string(name)) {
1368
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1369
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1370
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1371
                                goto error;
1372
                        }
1373
                        json_t *desc = json_object_get(root, "description");
1374
                        if(desc && !json_is_string(desc)) {
1375
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1376
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1377
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1378
                                goto error;
1379
                        }
1380
                        json_t *is_private = json_object_get(root, "is_private");
1381
                        if(is_private && !json_is_boolean(is_private)) {
1382
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1383
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1384
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1385
                                goto error;
1386
                        }
1387
                        json_t *audio = json_object_get(root, "audio");
1388
                        if(audio && !json_is_boolean(audio)) {
1389
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1390
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1391
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1392
                                goto error;
1393
                        }
1394
                        json_t *video = json_object_get(root, "video");
1395
                        if(video && !json_is_boolean(video)) {
1396
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1397
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1398
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1399
                                goto error;
1400
                        }
1401
                        json_t *url = json_object_get(root, "url");
1402
                        if(url && !json_is_string(url)) {
1403
                                JANUS_LOG(LOG_ERR, "Invalid element (url should be a string)\n");
1404
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1405
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1406
                                goto error;
1407
                        }                        
1408
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1409
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1410
                        if(!doaudio && !dovideo) {
1411
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1412
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1413
                                g_snprintf(error_cause, 512, "Can't add 'rtsp' stream, no audio or video have to be streamed...");
1414
                                goto error;
1415
                        }
1416
                        mp = janus_streaming_create_rtsp_source(
1417
                                        id ? json_integer_value(id) : 0,
1418
                                        name ? (char *)json_string_value(name) : NULL,
1419
                                        desc ? (char *)json_string_value(desc) : NULL,
1420
                                        (char *)json_string_value(url),
1421
                                        doaudio, dovideo);
1422
                        if(mp == NULL) {
1423
                                JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream...\n");
1424
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1425
                                g_snprintf(error_cause, 512, "Error creating 'RTSP' stream");
1426
                                goto error;
1427
                        }
1428
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;                        
1429
#endif
1430
                } else {
1431
                        JANUS_LOG(LOG_ERR, "Unknown stream type '%s'...\n", type_text);
1432
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1433
                        g_snprintf(error_cause, 512, "Unknown stream type '%s'...\n", type_text);
1434
                        goto error;
1435
                }
1436
                /* Any secret? */
1437
                if(secret)
1438
                        mp->secret = g_strdup(json_string_value(secret));
1439
                /* Send info back */
1440
                response = json_object();
1441
                json_object_set_new(response, "streaming", json_string("created"));
1442
                json_object_set_new(response, "created", json_string(mp->name));
1443
                json_t *ml = json_object();
1444
                json_object_set_new(ml, "id", json_integer(mp->id));
1445
                json_object_set_new(ml, "description", json_string(mp->description));
1446
                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1447
                json_object_set_new(ml, "is_private", json_string(mp->is_private ? "true" : "false"));
1448
                json_object_set_new(response, "stream", ml);
1449
                goto plugin_response;
1450
        } else if(!strcasecmp(request_text, "destroy")) {
1451
                /* Get rid of an existing stream (notice this doesn't remove it from the config file, though) */
1452
                json_t *id = json_object_get(root, "id");
1453
                if(!id) {
1454
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1455
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1456
                        g_snprintf(error_cause, 512, "Missing element (id)");
1457
                        goto error;
1458
                }
1459
                if(!json_is_integer(id) || json_integer_value(id) < 0) {
1460
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1461
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1462
                        g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1463
                        goto error;
1464
                }
1465
                gint64 id_value = json_integer_value(id);
1466
                janus_mutex_lock(&mountpoints_mutex);
1467
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1468
                if(mp == NULL) {
1469
                        janus_mutex_unlock(&mountpoints_mutex);
1470
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1471
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1472
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1473
                        goto error;
1474
                }
1475
                if(mp->secret) {
1476
                        /* This action requires an authorized user */
1477
                        json_t *secret = json_object_get(root, "secret");
1478
                        if(!secret) {
1479
                                janus_mutex_unlock(&mountpoints_mutex);
1480
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1481
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1482
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1483
                                goto error;
1484
                        }
1485
                        if(!json_is_string(secret)) {
1486
                                janus_mutex_unlock(&mountpoints_mutex);
1487
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1488
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1489
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1490
                                goto error;
1491
                        }
1492
                        if(!janus_strcmp_const_time(mp->secret, json_string_value(secret))) {
1493
                                janus_mutex_unlock(&mountpoints_mutex);
1494
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1495
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1496
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1497
                                goto error;
1498
                        }
1499
                }
1500
                JANUS_LOG(LOG_VERB, "Request to unmount mountpoint/stream %"SCNu64"\n", id_value);
1501
                /* FIXME Should we kick the current viewers as well? */
1502
                janus_mutex_lock(&mp->mutex);
1503
                GList *viewer = g_list_first(mp->listeners);
1504
                /* Prepare JSON event */
1505
                json_t *event = json_object();
1506
                json_object_set_new(event, "streaming", json_string("event"));
1507
                json_t *result = json_object();
1508
                json_object_set_new(result, "status", json_string("stopped"));
1509
                json_object_set_new(event, "result", result);
1510
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1511
                json_decref(event);
1512
                while(viewer) {
1513
                        janus_streaming_session *session = (janus_streaming_session *)viewer->data;
1514
                        if(session != NULL) {
1515
                                session->stopping = TRUE;
1516
                                session->started = FALSE;
1517
                                session->paused = FALSE;
1518
                                session->mountpoint = NULL;
1519
                                /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
1520
                                gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1521
                                gateway->close_pc(session->handle);
1522
                        }
1523
                        mp->listeners = g_list_remove_all(mp->listeners, session);
1524
                        viewer = g_list_first(mp->listeners);
1525
                }
1526
                g_free(event_text);
1527
                janus_mutex_unlock(&mp->mutex);
1528
                /* Remove mountpoint from the hashtable: this will get it destroyed */
1529
                g_hash_table_remove(mountpoints, GINT_TO_POINTER(id_value));
1530
                janus_mutex_unlock(&mountpoints_mutex);
1531
                /* Send info back */
1532
                response = json_object();
1533
                json_object_set_new(response, "streaming", json_string("destroyed"));
1534
                json_object_set_new(response, "destroyed", json_integer(id_value));
1535
                goto plugin_response;
1536
        } else if(!strcasecmp(request_text, "recording")) {
1537
                /* We can start/stop recording a live, RTP-based stream */
1538
                json_t *action = json_object_get(root, "action");
1539
                if(!action) {
1540
                        JANUS_LOG(LOG_ERR, "Missing element (action)\n");
1541
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1542
                        g_snprintf(error_cause, 512, "Missing element (action)");
1543
                        goto error;
1544
                }
1545
                if(!json_is_string(action)) {
1546
                        JANUS_LOG(LOG_ERR, "Invalid element (action should be a string)\n");
1547
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1548
                        g_snprintf(error_cause, 512, "Invalid element (action should be a string)");
1549
                        goto error;
1550
                }
1551
                const char *action_text = json_string_value(action);
1552
                if(strcasecmp(action_text, "start") && strcasecmp(action_text, "stop")) {
1553
                        JANUS_LOG(LOG_ERR, "Invalid action (should be start|stop)\n");
1554
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1555
                        g_snprintf(error_cause, 512, "Invalid action (should be start|stop)");
1556
                        goto error;
1557
                }
1558
                json_t *id = json_object_get(root, "id");
1559
                if(!id) {
1560
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1561
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1562
                        g_snprintf(error_cause, 512, "Missing element (id)");
1563
                        goto error;
1564
                }
1565
                if(!json_is_integer(id) || json_integer_value(id) < 0) {
1566
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1567
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1568
                        g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1569
                        goto error;
1570
                }
1571
                gint64 id_value = json_integer_value(id);
1572
                janus_mutex_lock(&mountpoints_mutex);
1573
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1574
                if(mp == NULL) {
1575
                        janus_mutex_unlock(&mountpoints_mutex);
1576
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1577
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1578
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1579
                        goto error;
1580
                }
1581
                if(mp->streaming_type != janus_streaming_type_live || mp->streaming_source != janus_streaming_source_rtp) {
1582
                        janus_mutex_unlock(&mountpoints_mutex);
1583
                        JANUS_LOG(LOG_ERR, "Recording is only available on RTP-based live streams\n");
1584
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1585
                        g_snprintf(error_cause, 512, "Recording is only available on RTP-based live streams");
1586
                        goto error;
1587
                }
1588
                if(mp->secret) {
1589
                        /* This action requires an authorized user */
1590
                        json_t *secret = json_object_get(root, "secret");
1591
                        if(!secret) {
1592
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1593
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1594
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1595
                                goto error;
1596
                        }
1597
                        if(!json_is_string(secret)) {
1598
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1599
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1600
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1601
                                goto error;
1602
                        }
1603
                        if(!janus_strcmp_const_time(mp->secret, json_string_value(secret))) {
1604
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1605
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1606
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1607
                                goto error;
1608
                        }
1609
                }
1610
                janus_streaming_rtp_source *source = mp->source;
1611
                if(!strcasecmp(action_text, "start")) {
1612
                        /* Start a recording for audio and/or video */
1613
                        json_t *audio = json_object_get(root, "audio");
1614
                        if(audio && !json_is_string(audio)) {
1615
                                janus_mutex_unlock(&mountpoints_mutex);
1616
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a string)\n");
1617
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1618
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a string)");
1619
                                goto error;
1620
                        }
1621
                        json_t *video = json_object_get(root, "video");
1622
                        if(video && !json_is_string(video)) {
1623
                                janus_mutex_unlock(&mountpoints_mutex);
1624
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a string)\n");
1625
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1626
                                g_snprintf(error_cause, 512, "Invalid value (video should be a string)");
1627
                                goto error;
1628
                        }
1629
                        if((audio && source->arc) || (video && source->vrc)) {
1630
                                janus_mutex_unlock(&mountpoints_mutex);
1631
                                JANUS_LOG(LOG_ERR, "Recording for audio and/or video already started for this stream\n");
1632
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1633
                                g_snprintf(error_cause, 512, "Recording for audio and/or video already started for this stream");
1634
                                goto error;
1635
                        }
1636
                        if(!audio && !video) {
1637
                                janus_mutex_unlock(&mountpoints_mutex);
1638
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1639
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1640
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1641
                                goto error;
1642
                        }
1643
                        if(audio) {
1644
                                const char *audiofile = json_string_value(audio);
1645
                                source->arc = janus_recorder_create(NULL, 0, (char *)audiofile);
1646
                                if(source->arc == NULL) {
1647
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for audio\n");
1648
                                } else {
1649
                                        JANUS_LOG(LOG_INFO, "[%s] Audio recording started\n", mp->name);
1650
                                }
1651
                        }
1652
                        if(video) {
1653
                                const char *videofile = json_string_value(video);
1654
                                source->vrc = janus_recorder_create(NULL, 1, (char *)videofile);
1655
                                if(source->vrc == NULL) {
1656
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for video\n");
1657
                                } else {
1658
                                        JANUS_LOG(LOG_INFO, "[%s] Video recording started\n", mp->name);
1659
                                }
1660
                        }
1661
                        janus_mutex_unlock(&mountpoints_mutex);
1662
                        /* Send a success response back */
1663
                        response = json_object();
1664
                        json_object_set_new(response, "streaming", json_string("ok"));
1665
                        goto plugin_response;
1666
                } else if(!strcasecmp(action_text, "stop")) {
1667
                        /* Stop the recording */
1668
                        json_t *audio = json_object_get(root, "audio");
1669
                        if(audio && !json_is_boolean(audio)) {
1670
                                janus_mutex_unlock(&mountpoints_mutex);
1671
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1672
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1673
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1674
                                goto error;
1675
                        }
1676
                        json_t *video = json_object_get(root, "video");
1677
                        if(video && !json_is_boolean(video)) {
1678
                                janus_mutex_unlock(&mountpoints_mutex);
1679
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1680
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1681
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1682
                                goto error;
1683
                        }
1684
                        if(!audio && !video) {
1685
                                janus_mutex_unlock(&mountpoints_mutex);
1686
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1687
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1688
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1689
                                goto error;
1690
                        }
1691
                        if(audio && json_is_true(audio) && source->arc) {
1692
                                /* Close the audio recording */
1693
                                janus_recorder_close(source->arc);
1694
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1695
                                janus_recorder *tmp = source->arc;
1696
                                source->arc = NULL;
1697
                                janus_recorder_free(tmp);
1698
                        }
1699
                        if(video && json_is_true(video) && source->vrc) {
1700
                                /* Close the video recording */
1701
                                janus_recorder_close(source->vrc);
1702
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1703
                                janus_recorder *tmp = source->vrc;
1704
                                source->vrc = NULL;
1705
                                janus_recorder_free(tmp);
1706
                        }
1707
                        janus_mutex_unlock(&mountpoints_mutex);
1708
                        /* Send a success response back */
1709
                        response = json_object();
1710
                        json_object_set_new(response, "streaming", json_string("ok"));
1711
                        goto plugin_response;
1712
                }
1713
        } else if(!strcasecmp(request_text, "enable") || !strcasecmp(request_text, "disable")) {
1714
                /* A request to enable/disable a mountpoint */
1715
                json_t *id = json_object_get(root, "id");
1716
                if(!id) {
1717
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1718
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1719
                        g_snprintf(error_cause, 512, "Missing element (id)");
1720
                        goto error;
1721
                }
1722
                if(!json_is_integer(id) || json_integer_value(id) < 0) {
1723
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
1724
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1725
                        g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
1726
                        goto error;
1727
                }
1728
                gint64 id_value = json_integer_value(id);
1729
                janus_mutex_lock(&mountpoints_mutex);
1730
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1731
                if(mp == NULL) {
1732
                        janus_mutex_unlock(&mountpoints_mutex);
1733
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1734
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1735
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1736
                        goto error;
1737
                }
1738
                if(mp->secret) {
1739
                        /* This action requires an authorized user */
1740
                        json_t *secret = json_object_get(root, "secret");
1741
                        if(!secret) {
1742
                                janus_mutex_unlock(&mountpoints_mutex);
1743
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1744
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1745
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1746
                                goto error;
1747
                        }
1748
                        if(!json_is_string(secret)) {
1749
                                janus_mutex_unlock(&mountpoints_mutex);
1750
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1751
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1752
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1753
                                goto error;
1754
                        }
1755
                        if(!janus_strcmp_const_time(mp->secret, json_string_value(secret))) {
1756
                                janus_mutex_unlock(&mountpoints_mutex);
1757
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1758
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1759
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1760
                                goto error;
1761
                        }
1762
                }
1763
                if(!strcasecmp(request_text, "enable")) {
1764
                        /* Enable a previously disabled mountpoint */
1765
                        JANUS_LOG(LOG_INFO, "[%s] Stream enabled\n", mp->name);
1766
                        mp->enabled = TRUE;
1767
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1768
                } else {
1769
                        /* Disable a previously enabled mountpoint */
1770
                        JANUS_LOG(LOG_INFO, "[%s] Stream disabled\n", mp->name);
1771
                        mp->enabled = FALSE;
1772
                        /* Any recording to close? */
1773
                        janus_streaming_rtp_source *source = mp->source;
1774
                        if(source->arc) {
1775
                                janus_recorder_close(source->arc);
1776
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1777
                                janus_recorder *tmp = source->arc;
1778
                                source->arc = NULL;
1779
                                janus_recorder_free(tmp);
1780
                        }
1781
                        if(source->vrc) {
1782
                                janus_recorder_close(source->vrc);
1783
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1784
                                janus_recorder *tmp = source->vrc;
1785
                                source->vrc = NULL;
1786
                                janus_recorder_free(tmp);
1787
                        }
1788
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1789
                }
1790
                janus_mutex_unlock(&mountpoints_mutex);
1791
                /* Send a success response back */
1792
                response = json_object();
1793
                json_object_set_new(response, "streaming", json_string("ok"));
1794
                goto plugin_response;
1795
        } else if(!strcasecmp(request_text, "watch") || !strcasecmp(request_text, "start")
1796
                        || !strcasecmp(request_text, "pause") || !strcasecmp(request_text, "stop")
1797
                        || !strcasecmp(request_text, "switch")) {
1798
                /* These messages are handled asynchronously */
1799
                janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
1800
                if(msg == NULL) {
1801
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1802
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1803
                        g_snprintf(error_cause, 512, "Memory error");
1804
                        goto error;
1805
                }
1806

    
1807
                g_free(message);
1808
                msg->handle = handle;
1809
                msg->transaction = transaction;
1810
                msg->message = root;
1811
                msg->sdp_type = sdp_type;
1812
                msg->sdp = sdp;
1813

    
1814
                g_async_queue_push(messages, msg);
1815

    
1816
                return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
1817
        } else {
1818
                JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
1819
                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1820
                g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1821
                goto error;
1822
        }
1823

    
1824
plugin_response:
1825
                {
1826
                        if(!response) {
1827
                                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1828
                                g_snprintf(error_cause, 512, "Invalid response");
1829
                                goto error;
1830
                        }
1831
                        if(root != NULL)
1832
                                json_decref(root);
1833
                        g_free(transaction);
1834
                        g_free(message);
1835
                        g_free(sdp_type);
1836
                        g_free(sdp);
1837

    
1838
                        char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1839
                        json_decref(response);
1840
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1841
                        g_free(response_text);
1842
                        return result;
1843
                }
1844

    
1845
error:
1846
                {
1847
                        if(root != NULL)
1848
                                json_decref(root);
1849
                        g_free(transaction);
1850
                        g_free(message);
1851
                        g_free(sdp_type);
1852
                        g_free(sdp);
1853

    
1854
                        /* Prepare JSON error event */
1855
                        json_t *event = json_object();
1856
                        json_object_set_new(event, "streaming", json_string("event"));
1857
                        json_object_set_new(event, "error_code", json_integer(error_code));
1858
                        json_object_set_new(event, "error", json_string(error_cause));
1859
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1860
                        json_decref(event);
1861
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, event_text);
1862
                        g_free(event_text);
1863
                        return result;
1864
                }
1865

    
1866
}
1867

    
1868
void janus_streaming_setup_media(janus_plugin_session *handle) {
1869
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
1870
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1871
                return;
1872
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1873
        if(!session) {
1874
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1875
                return;
1876
        }
1877
        if(session->destroyed)
1878
                return;
1879
        /* TODO Only start streaming when we get this event */
1880
        session->context.a_last_ssrc = 0;
1881
        session->context.a_last_ssrc = 0;
1882
        session->context.a_last_ts = 0;
1883
        session->context.a_base_ts = 0;
1884
        session->context.a_base_ts_prev = 0;
1885
        session->context.v_last_ssrc = 0;
1886
        session->context.v_last_ts = 0;
1887
        session->context.v_base_ts = 0;
1888
        session->context.v_base_ts_prev = 0;
1889
        session->context.a_last_seq = 0;
1890
        session->context.a_base_seq = 0;
1891
        session->context.a_base_seq_prev = 0;
1892
        session->context.v_last_seq = 0;
1893
        session->context.v_base_seq = 0;
1894
        session->context.v_base_seq_prev = 0;
1895
        session->started = TRUE;
1896
        /* Prepare JSON event */
1897
        json_t *event = json_object();
1898
        json_object_set_new(event, "streaming", json_string("event"));
1899
        json_t *result = json_object();
1900
        json_object_set_new(result, "status", json_string("started"));
1901
        json_object_set_new(event, "result", result);
1902
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1903
        json_decref(event);
1904
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
1905
        int ret = gateway->push_event(handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1906
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
1907
        g_free(event_text);
1908
}
1909

    
1910
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
1911
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1912
                return;
1913
        /* FIXME We don't care about what the browser sends us, we're sendonly */
1914
}
1915

    
1916
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
1917
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1918
                return;
1919
        /* We might interested in the available bandwidth that the user advertizes */
1920
        uint64_t bw = janus_rtcp_get_remb(buf, len);
1921
        if(bw > 0) {
1922
                JANUS_LOG(LOG_HUGE, "REMB for this PeerConnection: %"SCNu64"\n", bw);
1923
                /* TODO Use this somehow (e.g., notification towards application?) */
1924
        }
1925
        /* FIXME Maybe we should care about RTCP, but not now */
1926
}
1927

    
1928
void janus_streaming_hangup_media(janus_plugin_session *handle) {
1929
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
1930
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1931
                return;
1932
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1933
        if(!session) {
1934
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1935
                return;
1936
        }
1937
        if(session->destroyed)
1938
                return;
1939
        /* FIXME Simulate a "stop" coming from the browser */
1940
        janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
1941
        if(msg == NULL) {
1942
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1943
                return;
1944
        }
1945
        msg->handle = handle;
1946
        msg->message = json_loads("{\"request\":\"stop\"}", 0, NULL);
1947
        msg->transaction = NULL;
1948
        msg->sdp_type = NULL;
1949
        msg->sdp = NULL;
1950
        g_async_queue_push(messages, msg);
1951
}
1952

    
1953
/* Thread to handle incoming messages */
1954
static void *janus_streaming_handler(void *data) {
1955
        JANUS_LOG(LOG_VERB, "Joining Streaming handler thread\n");
1956
        janus_streaming_message *msg = NULL;
1957
        int error_code = 0;
1958
        char *error_cause = calloc(1024, sizeof(char));
1959
        if(error_cause == NULL) {
1960
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1961
                return NULL;
1962
        }
1963
        json_t *root = NULL;
1964
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
1965
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
1966
                        usleep(50000);
1967
                        continue;
1968
                }
1969
                janus_streaming_session *session = (janus_streaming_session *)msg->handle->plugin_handle;
1970
                if(!session) {
1971
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1972
                        janus_streaming_message_free(msg);
1973
                        continue;
1974
                }
1975
                if(session->destroyed) {
1976
                        janus_streaming_message_free(msg);
1977
                        continue;
1978
                }
1979
                /* Handle request */
1980
                error_code = 0;
1981
                root = NULL;
1982
                if(msg->message == NULL) {
1983
                        JANUS_LOG(LOG_ERR, "No message??\n");
1984
                        error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
1985
                        g_snprintf(error_cause, 512, "%s", "No message??");
1986
                        goto error;
1987
                }
1988
                root = msg->message;
1989
                /* Get the request first */
1990
                json_t *request = json_object_get(root, "request");
1991
                if(!request) {
1992
                        JANUS_LOG(LOG_ERR, "Missing element (request)\n");
1993
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1994
                        g_snprintf(error_cause, 512, "Missing element (request)");
1995
                        goto error;
1996
                }
1997
                if(!json_is_string(request)) {
1998
                        JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
1999
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
2000
                        g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
2001
                        goto error;
2002
                }
2003
                const char *request_text = json_string_value(request);
2004
                json_t *result = NULL;
2005
                const char *sdp_type = NULL;
2006
                char *sdp = NULL;
2007
                /* All these requests can only be handled asynchronously */
2008
                if(!strcasecmp(request_text, "watch")) {
2009
                        json_t *id = json_object_get(root, "id");
2010
                        if(!id) {
2011
                                JANUS_LOG(LOG_ERR, "Missing element (id)\n");
2012
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
2013
                                g_snprintf(error_cause, 512, "Missing element (id)");
2014
                                goto error;
2015
                        }
2016
                        if(!json_is_integer(id) || json_integer_value(id) < 0) {
2017
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
2018
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
2019
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
2020
                                goto error;
2021
                        }
2022
                        gint64 id_value = json_integer_value(id);
2023
                        janus_mutex_lock(&mountpoints_mutex);
2024
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
2025
                        if(mp == NULL) {
2026
                                janus_mutex_unlock(&mountpoints_mutex);
2027
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2028
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2029
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2030
                                goto error;
2031
                        }
2032
                        janus_mutex_unlock(&mountpoints_mutex);
2033
                        JANUS_LOG(LOG_VERB, "Request to watch mountpoint/stream %"SCNu64"\n", id_value);
2034
                        session->stopping = FALSE;
2035
                        session->mountpoint = mp;
2036
                        if(mp->streaming_type == janus_streaming_type_on_demand) {
2037
                                GError *error = NULL;
2038
                                g_thread_try_new(session->mountpoint->name, &janus_streaming_ondemand_thread, session, &error);
2039
                                if(error != NULL) {
2040
                                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the on-demand thread...\n", error->code, error->message ? error->message : "??");
2041
                                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
2042
                                        g_snprintf(error_cause, 512, "Got error %d (%s) trying to launch the on-demand thread", error->code, error->message ? error->message : "??");
2043
                                        goto error;
2044
                                }
2045
                        }
2046
                        /* TODO Check if user is already watching a stream, if the video is active, etc. */
2047
                        janus_mutex_lock(&mp->mutex);
2048
                        mp->listeners = g_list_append(mp->listeners, session);
2049
                        janus_mutex_unlock(&mp->mutex);
2050
                        sdp_type = "offer";        /* We're always going to do the offer ourselves, never answer */
2051
                        char sdptemp[2048];
2052
                        memset(sdptemp, 0, 2048);
2053
                        gchar buffer[512];
2054
                        memset(buffer, 0, 512);
2055
                        gint64 sessid = janus_get_monotonic_time();
2056
                        gint64 version = sessid;        /* FIXME This needs to be increased when it changes, so time should be ok */
2057
                        g_snprintf(buffer, 512,
2058
                                "v=0\r\no=%s %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n",
2059
                                        "-", sessid, version);
2060
                        g_strlcat(sdptemp, buffer, 2048);
2061
                        g_strlcat(sdptemp, "s=Streaming Test\r\nt=0 0\r\n", 2048);
2062
                        if(mp->codecs.audio_pt >= 0) {
2063
                                /* Add audio line */
2064
                                g_snprintf(buffer, 512,
2065
                                        "m=audio 1 RTP/SAVPF %d\r\n"
2066
                                        "c=IN IP4 1.1.1.1\r\n",
2067
                                        mp->codecs.audio_pt);
2068
                                g_strlcat(sdptemp, buffer, 2048);
2069
                                if(mp->codecs.audio_rtpmap) {
2070
                                        g_snprintf(buffer, 512,
2071
                                                "a=rtpmap:%d %s\r\n",
2072
                                                mp->codecs.audio_pt, mp->codecs.audio_rtpmap);
2073
                                        g_strlcat(sdptemp, buffer, 2048);
2074
                                }
2075
                                if(mp->codecs.audio_fmtp) {
2076
                                        g_snprintf(buffer, 512,
2077
                                                "a=fmtp:%d %s\r\n",
2078
                                                mp->codecs.audio_pt, mp->codecs.audio_fmtp);
2079
                                        g_strlcat(sdptemp, buffer, 2048);
2080
                                }
2081
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2082
                        }
2083
                        if(mp->codecs.video_pt >= 0) {
2084
                                /* Add video line */
2085
                                g_snprintf(buffer, 512,
2086
                                        "m=video 1 RTP/SAVPF %d\r\n"
2087
                                        "c=IN IP4 1.1.1.1\r\n",
2088
                                        mp->codecs.video_pt);
2089
                                g_strlcat(sdptemp, buffer, 2048);
2090
                                if(mp->codecs.video_rtpmap) {
2091
                                        g_snprintf(buffer, 512,
2092
                                                "a=rtpmap:%d %s\r\n",
2093
                                                mp->codecs.video_pt, mp->codecs.video_rtpmap);
2094
                                        g_strlcat(sdptemp, buffer, 2048);
2095
                                }
2096
                                if(mp->codecs.video_fmtp) {
2097
                                        g_snprintf(buffer, 512,
2098
                                                "a=fmtp:%d %s\r\n",
2099
                                                mp->codecs.video_pt, mp->codecs.video_fmtp);
2100
                                        g_strlcat(sdptemp, buffer, 2048);
2101
                                }
2102
                                g_snprintf(buffer, 512,
2103
                                        "a=rtcp-fb:%d nack\r\n",
2104
                                        mp->codecs.video_pt);
2105
                                g_strlcat(sdptemp, buffer, 2048);
2106
                                g_snprintf(buffer, 512,
2107
                                        "a=rtcp-fb:%d goog-remb\r\n",
2108
                                        mp->codecs.video_pt);
2109
                                g_strlcat(sdptemp, buffer, 2048);
2110
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2111
                        }
2112
                        sdp = g_strdup(sdptemp);
2113
                        JANUS_LOG(LOG_VERB, "Going to offer this SDP:\n%s\n", sdp);
2114
                        result = json_object();
2115
                        json_object_set_new(result, "status", json_string("preparing"));
2116
                } else if(!strcasecmp(request_text, "start")) {
2117
                        if(session->mountpoint == NULL) {
2118
                                JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
2119
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2120
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2121
                                goto error;
2122
                        }
2123
                        JANUS_LOG(LOG_VERB, "Starting the streaming\n");
2124
                        session->paused = FALSE;
2125
                        result = json_object();
2126
                        /* We wait for the setup_media event to start: on the other hand, it may have already arrived */
2127
                        json_object_set_new(result, "status", json_string(session->started ? "started" : "starting"));
2128
                } else if(!strcasecmp(request_text, "pause")) {
2129
                        if(session->mountpoint == NULL) {
2130
                                JANUS_LOG(LOG_VERB, "Can't pause: no mountpoint set\n");
2131
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2132
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2133
                                goto error;
2134
                        }
2135
                        JANUS_LOG(LOG_VERB, "Pausing the streaming\n");
2136
                        session->paused = TRUE;
2137
                        result = json_object();
2138
                        json_object_set_new(result, "status", json_string("pausing"));
2139
                } else if(!strcasecmp(request_text, "switch")) {
2140
                        /* This listener wants to switch to a different mountpoint
2141
                         * NOTE: this only works for live RTP streams as of now: you
2142
                         * cannot, for instance, switch from a live RTP mountpoint to
2143
                         * an on demand one or viceversa (TBD.) */
2144
                        janus_streaming_mountpoint *oldmp = session->mountpoint;
2145
                        if(oldmp == NULL) {
2146
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a mountpoint\n");
2147
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2148
                                g_snprintf(error_cause, 512, "Can't switch: not on a mountpoint");
2149
                                goto error;
2150
                        }
2151
                        if(oldmp->streaming_type != janus_streaming_type_live || 
2152
                                        oldmp->streaming_source != janus_streaming_source_rtp) {
2153
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a live RTP mountpoint\n");
2154
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2155
                                g_snprintf(error_cause, 512, "Can't switch: not on a live RTP mountpoint");
2156
                                goto error;
2157
                        }
2158
                        json_t *id = json_object_get(root, "id");
2159
                        if(!id) {
2160
                                JANUS_LOG(LOG_ERR, "Missing element (id)\n");
2161
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
2162
                                g_snprintf(error_cause, 512, "Missing element (id)");
2163
                                goto error;
2164
                        }
2165
                        if(!json_is_integer(id) || json_integer_value(id) < 0) {
2166
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be a positive integer)\n");
2167
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
2168
                                g_snprintf(error_cause, 512, "Invalid element (id should be a positive integer)");
2169
                                goto error;
2170
                        }
2171
                        gint64 id_value = json_integer_value(id);
2172
                        janus_mutex_lock(&mountpoints_mutex);
2173
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
2174
                        if(mp == NULL) {
2175
                                janus_mutex_unlock(&mountpoints_mutex);
2176
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2177
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2178
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2179
                                goto error;
2180
                        }
2181
                        if(mp->streaming_type != janus_streaming_type_live || 
2182
                                        mp->streaming_source != janus_streaming_source_rtp) {
2183
                                janus_mutex_unlock(&mountpoints_mutex);
2184
                                JANUS_LOG(LOG_VERB, "Can't switch: target is not a live RTP mountpoint\n");
2185
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2186
                                g_snprintf(error_cause, 512, "Can't switch: target is not a live RTP mountpoint");
2187
                                goto error;
2188
                        }
2189
                        janus_mutex_unlock(&mountpoints_mutex);
2190
                        JANUS_LOG(LOG_VERB, "Request to switch to mountpoint/stream %"SCNu64" (old: %"SCNu64")\n", id_value, oldmp->id);
2191
                        session->paused = TRUE;
2192
                        /* Unsubscribe from the previous mountpoint and subscribe to the new one */
2193
                        janus_mutex_lock(&oldmp->mutex);
2194
                        oldmp->listeners = g_list_remove_all(oldmp->listeners, session);
2195
                        janus_mutex_unlock(&oldmp->mutex);
2196
                        /* Subscribe to the new one */
2197
                        janus_mutex_lock(&mp->mutex);
2198
                        mp->listeners = g_list_append(mp->listeners, session);
2199
                        janus_mutex_unlock(&mp->mutex);
2200
                        session->mountpoint = mp;
2201
                        session->paused = FALSE;
2202
                        /* Done */
2203
                        result = json_object();
2204
                        json_object_set_new(result, "streaming", json_string("event"));
2205
                        json_object_set_new(result, "switched", json_string("ok"));
2206
                        json_object_set_new(result, "id", json_integer(id_value));
2207
                } else if(!strcasecmp(request_text, "stop")) {
2208
                        if(session->stopping || !session->started) {
2209
                                /* Been there, done that: ignore */
2210
                                janus_streaming_message_free(msg);
2211
                                continue;
2212
                        }
2213
                        JANUS_LOG(LOG_VERB, "Stopping the streaming\n");
2214
                        session->stopping = TRUE;
2215
                        session->started = FALSE;
2216
                        session->paused = FALSE;
2217
                        result = json_object();
2218
                        json_object_set_new(result, "status", json_string("stopping"));
2219
                        if(session->mountpoint) {
2220
                                janus_mutex_lock(&session->mountpoint->mutex);
2221
                                JANUS_LOG(LOG_VERB, "  -- Removing the session from the mountpoint listeners\n");
2222
                                if(g_list_find(session->mountpoint->listeners, session) != NULL) {
2223
                                        JANUS_LOG(LOG_VERB, "  -- -- Found!\n");
2224
                                }
2225
                                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
2226
                                janus_mutex_unlock(&session->mountpoint->mutex);
2227
                        }
2228
                        session->mountpoint = NULL;
2229
                        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
2230
                        gateway->close_pc(session->handle);
2231
                } else {
2232
                        JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
2233
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
2234
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
2235
                        goto error;
2236
                }
2237
                
2238
                /* Any SDP to handle? */
2239
                if(msg->sdp) {
2240
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well (but we really don't care):\n%s\n", msg->sdp_type, msg->sdp);
2241
                }
2242

    
2243
                /* Prepare JSON event */
2244
                json_t *event = json_object();
2245
                json_object_set_new(event, "streaming", json_string("event"));
2246
                if(result != NULL)
2247
                        json_object_set_new(event, "result", result);
2248
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2249
                json_decref(event);
2250
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2251
                int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, sdp_type, sdp);
2252
                JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2253
                g_free(event_text);
2254
                if(sdp)
2255
                        g_free(sdp);
2256
                janus_streaming_message_free(msg);
2257
                continue;
2258
                
2259
error:
2260
                {
2261
                        /* Prepare JSON error event */
2262
                        json_t *event = json_object();
2263
                        json_object_set_new(event, "streaming", json_string("event"));
2264
                        json_object_set_new(event, "error_code", json_integer(error_code));
2265
                        json_object_set_new(event, "error", json_string(error_cause));
2266
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2267
                        json_decref(event);
2268
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2269
                        int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, NULL, NULL);
2270
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2271
                        g_free(event_text);
2272
                        janus_streaming_message_free(msg);
2273
                }
2274
        }
2275
        g_free(error_cause);
2276
        JANUS_LOG(LOG_VERB, "Leaving Streaming handler thread\n");
2277
        return NULL;
2278
}
2279

    
2280

    
2281
/* Helpers to destroy a streaming mountpoint. */
2282
static void janus_streaming_rtp_source_free(janus_streaming_rtp_source *source) {
2283
        if(source->audio_fd > 0) {
2284
                close(source->audio_fd);
2285
        }
2286
        if(source->video_fd > 0) {
2287
                close(source->video_fd);
2288
        }
2289
#ifdef HAVE_LIBCURL
2290
        if(source->curl) {
2291
                curl_easy_cleanup(source->curl);
2292
        }
2293
#endif
2294
        free(source);
2295
}
2296

    
2297
static void janus_streaming_file_source_free(janus_streaming_file_source *source) {
2298
        g_free(source->filename);
2299
        free(source);
2300
}
2301

    
2302
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp) {
2303
        mp->destroyed = janus_get_monotonic_time();
2304
        
2305
        g_free(mp->name);
2306
        g_free(mp->description);
2307
        janus_mutex_lock(&mp->mutex);
2308
        g_list_free(mp->listeners);
2309
        janus_mutex_unlock(&mp->mutex);
2310

    
2311
        if(mp->source != NULL && mp->source_destroy != NULL) {
2312
                mp->source_destroy(mp->source);
2313
        }
2314

    
2315
        g_free(mp->codecs.audio_rtpmap);
2316
        g_free(mp->codecs.audio_fmtp);
2317
        g_free(mp->codecs.video_rtpmap);
2318
        g_free(mp->codecs.video_fmtp);
2319

    
2320
        free(mp);
2321
}
2322

    
2323

    
2324
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
2325
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
2326
                uint64_t id, char *name, char *desc,
2327
                gboolean doaudio, char *amcast, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
2328
                gboolean dovideo, char *vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp) {
2329
        if(name == NULL) {
2330
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2331
        }
2332
        if(id == 0) {
2333
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2334
        }
2335
        if(!doaudio && !dovideo) {
2336
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
2337
                return NULL;
2338
        }
2339
        if(doaudio && (aport == 0 || artpmap == NULL)) {
2340
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for audio...\n");
2341
                return NULL;
2342
        }
2343
        if(dovideo && (vport == 0 || vcodec == 0 || vrtpmap == NULL)) {
2344
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for video...\n");
2345
                return NULL;
2346
        }
2347
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
2348
        janus_streaming_mountpoint *live_rtp = calloc(1, sizeof(janus_streaming_mountpoint));
2349
        if(live_rtp == NULL) {
2350
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2351
                return NULL;
2352
        }
2353
        live_rtp->id = id ? id : g_random_int();
2354
        char tempname[255];
2355
        if(!name) {
2356
                memset(tempname, 0, 255);
2357
                g_snprintf(tempname, 255, "%"SCNu64, live_rtp->id);
2358
        }
2359
        live_rtp->name = g_strdup(name ? name : tempname);
2360
        char *description = NULL;
2361
        if(desc != NULL)
2362
                description = g_strdup(desc);
2363
        else
2364
                description = g_strdup(name ? name : tempname);
2365
        live_rtp->description = description;
2366
        live_rtp->enabled = TRUE;
2367
        live_rtp->active = FALSE;
2368
        live_rtp->streaming_type = janus_streaming_type_live;
2369
        live_rtp->streaming_source = janus_streaming_source_rtp;
2370
        janus_streaming_rtp_source *live_rtp_source = calloc(1, sizeof(janus_streaming_rtp_source));
2371
        if(live_rtp->name == NULL || description == NULL || live_rtp_source == NULL) {
2372
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2373
                if(live_rtp->name)
2374
                        g_free(live_rtp->name);
2375
                if(description)
2376
                        g_free(description);
2377
                if(live_rtp_source)
2378
                        g_free(live_rtp_source);
2379
                g_free(live_rtp);
2380
                return NULL;
2381
        }
2382
        live_rtp_source->audio_mcast = doaudio ? (amcast ? inet_addr(amcast) : INADDR_ANY) : INADDR_ANY;
2383
        live_rtp_source->audio_port = doaudio ? aport : -1;
2384
        live_rtp_source->video_mcast = dovideo ? (vmcast ? inet_addr(vmcast) : INADDR_ANY) : INADDR_ANY;
2385
        live_rtp_source->video_port = dovideo ? vport : -1;
2386
        live_rtp_source->arc = NULL;
2387
        live_rtp_source->vrc = NULL;
2388
        live_rtp_source->audio_fd = -1;
2389
        live_rtp_source->video_fd = -1;        
2390
        live_rtp->source = live_rtp_source;
2391
        live_rtp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2392
        live_rtp->codecs.audio_pt = doaudio ? acodec : -1;
2393
        live_rtp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2394
        live_rtp->codecs.audio_fmtp = doaudio ? (afmtp ? g_strdup(afmtp) : NULL) : NULL;
2395
        live_rtp->codecs.video_pt = dovideo ? vcodec : -1;
2396
        live_rtp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2397
        live_rtp->codecs.video_fmtp = dovideo ? (vfmtp ? g_strdup(vfmtp) : NULL) : NULL;
2398
        live_rtp->listeners = NULL;
2399
        live_rtp->destroyed = 0;
2400
        janus_mutex_init(&live_rtp->mutex);
2401
        janus_mutex_lock(&mountpoints_mutex);
2402
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtp->id), live_rtp);
2403
        janus_mutex_unlock(&mountpoints_mutex);
2404
        GError *error = NULL;
2405
        g_thread_try_new(live_rtp->name, &janus_streaming_relay_thread, live_rtp, &error);
2406
        if(error != NULL) {
2407
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTP thread...\n", error->code, error->message ? error->message : "??");
2408
                if(live_rtp->name)
2409
                        g_free(live_rtp->name);
2410
                if(description)
2411
                        g_free(description);
2412
                if(live_rtp_source)
2413
                        g_free(live_rtp_source);
2414
                g_free(live_rtp);
2415
                return NULL;
2416
        }
2417
        return live_rtp;
2418
}
2419

    
2420
/* Helper to create a file/ondemand live source */
2421
janus_streaming_mountpoint *janus_streaming_create_file_source(
2422
                uint64_t id, char *name, char *desc, char *filename,
2423
                gboolean live, gboolean doaudio, gboolean dovideo) {
2424
        if(filename == NULL) {
2425
                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, missing filename...\n");
2426
                return NULL;
2427
        }
2428
        if(name == NULL) {
2429
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2430
        }
2431
        if(id == 0) {
2432
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2433
        }
2434
        if(!doaudio && !dovideo) {
2435
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, no audio or video have to be streamed...\n");
2436
                return NULL;
2437
        }
2438
        /* FIXME We don't support video streaming from file yet */
2439
        if(!doaudio || dovideo) {
2440
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, we only support audio file streaming right now...\n");
2441
                return NULL;
2442
        }
2443
        /* TODO We should support something more than raw a-Law and mu-Law streams... */
2444
        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
2445
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
2446
                return NULL;
2447
        }
2448
        janus_streaming_mountpoint *file_source = calloc(1, sizeof(janus_streaming_mountpoint));
2449
        if(file_source == NULL) {
2450
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2451
                return NULL;
2452
        }
2453
        file_source->id = id ? id : g_random_int();
2454
        char tempname[255];
2455
        if(!name) {
2456
                memset(tempname, 0, 255);
2457
                g_snprintf(tempname, 255, "%"SCNu64, file_source->id);
2458
        }
2459
        file_source->name = g_strdup(name ? name : tempname);
2460
        char *description = NULL;
2461
        if(desc != NULL)
2462
                description = g_strdup(desc);
2463
        else
2464
                description = g_strdup(name ? name : tempname);
2465
        file_source->description = description;
2466
        file_source->enabled = TRUE;
2467
        file_source->active = FALSE;
2468
        file_source->streaming_type = live ? janus_streaming_type_live : janus_streaming_type_on_demand;
2469
        file_source->streaming_source = janus_streaming_source_file;
2470
        janus_streaming_file_source *file_source_source = calloc(1, sizeof(janus_streaming_file_source));
2471
        if(file_source->name == NULL || description == NULL || file_source_source == NULL) {
2472
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2473
                if(file_source->name)
2474
                        g_free(file_source->name);
2475
                if(description)
2476
                        g_free(description);
2477
                if(file_source_source)
2478
                        g_free(file_source_source);
2479
                g_free(file_source);
2480
                return NULL;
2481
        }
2482
        file_source_source->filename = g_strdup(filename);
2483
        file_source->source = file_source_source;
2484
        file_source->source_destroy = (GDestroyNotify) janus_streaming_file_source_free;
2485
        file_source->codecs.audio_pt = strstr(filename, ".alaw") ? 8 : 0;
2486
        file_source->codecs.audio_rtpmap = g_strdup(strstr(filename, ".alaw") ? "PCMA/8000" : "PCMU/8000");
2487
        file_source->codecs.video_pt = -1;        /* FIXME We don't support video for this type yet */
2488
        file_source->codecs.video_rtpmap = NULL;
2489
        file_source->listeners = NULL;
2490
        file_source->destroyed = 0;
2491
        janus_mutex_init(&file_source->mutex);
2492
        janus_mutex_lock(&mountpoints_mutex);
2493
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(file_source->id), file_source);
2494
        janus_mutex_unlock(&mountpoints_mutex);
2495
        if(live) {
2496
                GError *error = NULL;
2497
                g_thread_try_new(file_source->name, &janus_streaming_filesource_thread, file_source, &error);
2498
                if(error != NULL) {
2499
                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the live filesource thread...\n", error->code, error->message ? error->message : "??");
2500
                        if(file_source->name)
2501
                                g_free(file_source->name);
2502
                        if(description)
2503
                                g_free(description);
2504
                        if(file_source_source)
2505
                                g_free(file_source_source);
2506
                        g_free(file_source);
2507
                        return NULL;
2508
                }
2509
        }
2510
        return file_source;
2511
}
2512

    
2513
#ifdef HAVE_LIBCURL                
2514
typedef struct janus_streaming_buffer {
2515
        char *buffer;
2516
        size_t size;
2517
} janus_streaming_buffer;
2518

    
2519
static size_t janus_streaming_rtsp_curl_callback(void *payload, size_t size, size_t nmemb, void *data) {
2520
        size_t realsize = size * nmemb;
2521
        janus_streaming_buffer *buf = (struct janus_streaming_buffer *)data;
2522
        /* (Re)allocate if needed */
2523
        buf->buffer = realloc(buf->buffer, buf->size+realsize+1);
2524
        if(buf->buffer == NULL) {
2525
                /* Memory error! */ 
2526
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2527
                return 0;
2528
        }
2529
        /* Update the buffer */
2530
        memcpy(&(buf->buffer[buf->size]), payload, realsize);
2531
        buf->size += realsize;
2532
        buf->buffer[buf->size] = 0;
2533
        /* Done! */
2534
        return realsize;
2535
}
2536

    
2537
static int janus_streaming_rtsp_parse_sdp(const char* buffer, const char* name, const char* media, int* pt, int* port, char* rtpmap, char* fmtp, char* control) {
2538
        char pattern[256];
2539
        g_snprintf(pattern, sizeof(pattern), "m=%s", media);
2540
        char *m=strstr(buffer, pattern);
2541
        if(m == NULL) {
2542
                JANUS_LOG(LOG_VERB, "[%s] no media %s...\n", name, media);
2543
                return -1;
2544
        }
2545
        sscanf(m, "m=%*s %d %*s %d", port, pt);
2546
        char *s=strstr(m, "a=control:");
2547
        if(s == NULL) {
2548
                JANUS_LOG(LOG_ERR, "[%s] no control for %s...\n", name, media);
2549
                return -1;
2550
        }                
2551
        sscanf(s, "a=control:%s", control);
2552
        char *r=strstr(m, "a=rtpmap:");
2553
        if(r != NULL) {
2554
                sscanf(r, "a=rtpmap:%*d %s", rtpmap);
2555
        }
2556
        char *f=strstr(m, "a=fmtp:");
2557
        if(f != NULL) {
2558
                sscanf(f, "a=fmtp:%*d %s", fmtp);
2559
        }        
2560
        int fd = socket(AF_INET, SOCK_DGRAM, 0);
2561
        if(fd < 0) {
2562
                JANUS_LOG(LOG_ERR, "[%s] cannot create socket for %s...\n", name, media);
2563
                return -1;
2564
        }
2565
        struct sockaddr_in address;
2566
        address.sin_family = AF_INET;
2567
        address.sin_addr.s_addr = INADDR_ANY;
2568
        address.sin_port = htons(*port);
2569
        if(*port > 0) {
2570
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
2571
                setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
2572
        }                
2573
        if(bind(fd, (struct sockaddr *)(&address), sizeof(struct sockaddr)) < 0) {
2574
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for video (%d)...\n", name, *port);
2575
                close(fd);
2576
                return -1;
2577
        }
2578
        socklen_t len = sizeof(address);
2579
        if(getsockname(fd, (struct sockaddr *)&address, &len) < 0)
2580
        {
2581
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (%d)...\n", name, media, *port);
2582
                close(fd);
2583
                return -1;
2584
        }
2585
        *port=ntohs(address.sin_port);
2586
        
2587
        return fd;
2588
}        
2589

    
2590
/* Helper to create an RTSP source */
2591
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
2592
                uint64_t id, char *name, char *desc, char *url,
2593
                gboolean doaudio, gboolean dovideo) {
2594
        if(url == NULL) {
2595
                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream, missing url...\n");
2596
                return NULL;
2597
        }        
2598
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");        
2599
        CURL* curl = curl_easy_init();
2600
        if(curl == NULL) {
2601
                JANUS_LOG(LOG_ERR, "Can't init CURL\n");
2602
                return NULL;
2603
        }
2604
        curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
2605
        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
2606
        curl_easy_setopt(curl, CURLOPT_URL, url);        
2607
        /* Send an RTSP DESCRIBE */
2608
        janus_streaming_buffer data;
2609
        data.buffer = malloc(1);
2610
        data.size = 0;
2611
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, url);
2612
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_DESCRIBE);
2613
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, janus_streaming_rtsp_curl_callback);                
2614
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &data);
2615
        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, janus_streaming_rtsp_curl_callback);                
2616
        curl_easy_setopt(curl, CURLOPT_HEADERDATA, &data);
2617
        int res = curl_easy_perform(curl);
2618
        if(res != CURLE_OK) {
2619
                JANUS_LOG(LOG_ERR, "Couldn't send DESCRIBE request: %s\n", curl_easy_strerror(res));
2620
                curl_easy_cleanup(curl);
2621
                return NULL;
2622
        }                
2623
        JANUS_LOG(LOG_VERB, "DESCRIBE answer:%s\n",data.buffer);        
2624
        /* Parse the SDP we just got to figure out the negotiated media */
2625
        int vpt = -1;
2626
        int vport = -1;
2627
        char vrtpmap[2048];
2628
        char vfmtp[2048];
2629
        char vcontrol[2048];
2630
        char uri[1024];
2631
        char transport[1024];
2632
        int video_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "video", &vpt, &vport, vrtpmap, vfmtp, vcontrol);
2633
        if(video_fd >= 0) {
2634
                /* Send an RTSP SETUP for video */
2635
                free(data.buffer);
2636
                data.buffer = malloc(1);
2637
                data.size = 0;                
2638
                sprintf(uri, "%s/%s", url, vcontrol);
2639
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2640
                sprintf(transport, "RTP/AVP;unicast;client_port=%d-%d", vport, vport+1);        
2641
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2642
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2643
                res = curl_easy_perform(curl);
2644
                if(res != CURLE_OK) {
2645
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2646
                        curl_easy_cleanup(curl);
2647
                        return NULL;
2648
                }                
2649
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2650
        }
2651
        int apt = -1;
2652
        int aport = -1;
2653
        char artpmap[2048];
2654
        char afmtp[2048];
2655
        char acontrol[2048];
2656
        int audio_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "audio", &apt, &aport, artpmap, afmtp, acontrol);
2657
        if(audio_fd >= 0) {
2658
                /* Send an RTSP SETUP for audio */
2659
                free(data.buffer);
2660
                data.buffer = malloc(1);
2661
                data.size = 0;                
2662
                sprintf(uri, "%s/%s", url, acontrol);
2663
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2664
                sprintf(transport, "RTP/AVP;unicast;client_port=%d-%d", aport, aport+1);        
2665
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2666
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2667
                res = curl_easy_perform(curl);
2668
                if(res != CURLE_OK) {
2669
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2670
                        curl_easy_cleanup(curl);
2671
                        return NULL;
2672
                }                
2673
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2674
        }        
2675
        /* Create an RTP source for the media we'll get */
2676
        char tempname[255];
2677
        if(!name) {
2678
                memset(tempname, 0, 255);
2679
                g_snprintf(tempname, 255, "%"SCNu64, id);
2680
        }
2681
        char *sourcename =  g_strdup(name ? name : tempname);
2682
        if(sourcename == NULL) {
2683
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2684
                curl_easy_cleanup(curl);
2685
                return NULL;
2686
        }        
2687
        char *description = NULL;
2688
        if(desc != NULL) {
2689
                description = g_strdup(desc);
2690
        } else {
2691
                description = g_strdup(name ? name : tempname);
2692
        }
2693
        if(description == NULL) {
2694
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2695
                g_free(sourcename);
2696
                curl_easy_cleanup(curl);
2697
                return NULL;
2698
        }                
2699
        janus_streaming_mountpoint *live_rtsp = calloc(1, sizeof(janus_streaming_mountpoint));
2700
        if(live_rtsp == NULL) {
2701
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2702
                g_free(description);
2703
                g_free(sourcename);
2704
                curl_easy_cleanup(curl);
2705
                return NULL;
2706
        }        
2707
        live_rtsp->id = id ? id : g_random_int();
2708
        live_rtsp->name = sourcename;
2709
        live_rtsp->description = description;
2710
        live_rtsp->enabled = TRUE;
2711
        live_rtsp->active = FALSE;
2712
        live_rtsp->streaming_type = janus_streaming_type_live;
2713
        live_rtsp->streaming_source = janus_streaming_source_rtp;
2714
        janus_streaming_rtp_source *live_rtsp_source = calloc(1, sizeof(janus_streaming_rtp_source));
2715
        live_rtsp_source->arc = NULL;
2716
        live_rtsp_source->vrc = NULL;
2717
        live_rtsp_source->audio_fd = audio_fd;
2718
        live_rtsp_source->video_fd = video_fd;
2719
        live_rtsp_source->curl = curl;
2720
        live_rtsp->source = live_rtsp_source;
2721
        live_rtsp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2722
        live_rtsp->codecs.audio_pt = doaudio ? apt : -1;
2723
        live_rtsp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2724
        live_rtsp->codecs.audio_fmtp = doaudio ? g_strdup(afmtp) : NULL;
2725
        live_rtsp->codecs.video_pt = dovideo ? vpt : -1;
2726
        live_rtsp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2727
        live_rtsp->codecs.video_fmtp = dovideo ? g_strdup(vfmtp) : NULL;
2728
        live_rtsp->listeners = NULL;
2729
        live_rtsp->destroyed = 0;
2730
        janus_mutex_init(&live_rtsp->mutex);
2731
        janus_mutex_lock(&mountpoints_mutex);
2732
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtsp->id), live_rtsp);
2733
        janus_mutex_unlock(&mountpoints_mutex);        
2734
        GError *error = NULL;
2735
        g_thread_try_new(live_rtsp->name, &janus_streaming_relay_thread, live_rtsp, &error);        
2736
        if(error != NULL) {
2737
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTSP thread...\n", error->code, error->message ? error->message : "??");
2738
                g_free(description);
2739
                g_free(sourcename);
2740
                janus_streaming_rtp_source_free(live_rtsp_source);
2741
                g_free(live_rtsp);
2742
                curl_easy_cleanup(curl);
2743
                return NULL;        
2744
        }                                
2745
        /* Send an RTSP PLAY */
2746
        free(data.buffer);        
2747
        data.buffer = malloc(1);
2748
        data.size = 0;                
2749
        sprintf(uri, "%s/", url);
2750
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2751
        curl_easy_setopt(curl, CURLOPT_RANGE, "0.000-");
2752
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_PLAY);                
2753
        res = curl_easy_perform(curl);
2754
        if(res != CURLE_OK) {
2755
                JANUS_LOG(LOG_ERR, "Couldn't send PLAY request: %s\n", curl_easy_strerror(res));
2756
                g_free(description);
2757
                g_free(sourcename);
2758
                janus_streaming_rtp_source_free(live_rtsp_source);
2759
                g_free(live_rtsp);
2760
                curl_easy_cleanup(curl);
2761
                return NULL;
2762
        }                
2763
        JANUS_LOG(LOG_VERB, "PLAY answer:%s\n",data.buffer);        
2764
        free(data.buffer);        
2765
        
2766
        return live_rtsp;
2767
}
2768
#else
2769
/* Helper to create an RTSP source */
2770
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
2771
                uint64_t id, char *name, char *desc, char *url,
2772
                gboolean doaudio, gboolean dovideo) {
2773
        JANUS_LOG(LOG_ERR, "RTSP need libcurl\n");
2774
        return NULL;
2775
}
2776
#endif
2777

    
2778
/* FIXME Thread to send RTP packets from a file (on demand) */
2779
static void *janus_streaming_ondemand_thread(void *data) {
2780
        JANUS_LOG(LOG_VERB, "Filesource (on demand) RTP thread starting...\n");
2781
        janus_streaming_session *session = (janus_streaming_session *)data;
2782
        if(!session) {
2783
                JANUS_LOG(LOG_ERR, "Invalid session!\n");
2784
                g_thread_unref(g_thread_self());
2785
                return NULL;
2786
        }
2787
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
2788
        if(!mountpoint) {
2789
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2790
                g_thread_unref(g_thread_self());
2791
                return NULL;
2792
        }
2793
        if(mountpoint->streaming_source != janus_streaming_source_file) {
2794
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
2795
                g_thread_unref(g_thread_self());
2796
                return NULL;
2797
        }
2798
        if(mountpoint->streaming_type != janus_streaming_type_on_demand) {
2799
                JANUS_LOG(LOG_ERR, "[%s] Not an on-demand file source mountpoint!\n", mountpoint->name);
2800
                g_thread_unref(g_thread_self());
2801
                return NULL;
2802
        }
2803
        janus_streaming_file_source *source = mountpoint->source;
2804
        if(source == NULL || source->filename == NULL) {
2805
                g_thread_unref(g_thread_self());
2806
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
2807
                return NULL;
2808
        }
2809
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
2810
        FILE *audio = fopen(source->filename, "rb");
2811
        if(!audio) {
2812
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
2813
                g_thread_unref(g_thread_self());
2814
                return NULL;
2815
        }
2816
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
2817
        /* Buffer */
2818
        char *buf = calloc(1024, sizeof(char));
2819
        if(buf == NULL) {
2820
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
2821
                g_thread_unref(g_thread_self());
2822
                return NULL;
2823
        }
2824
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2825
        /* Set up RTP */
2826
        gint16 seq = 1;
2827
        gint32 ts = 0;
2828
        rtp_header *header = (rtp_header *)buf;
2829
        header->version = 2;
2830
        header->markerbit = 1;
2831
        header->type = mountpoint->codecs.audio_pt;
2832
        header->seq_number = htons(seq);
2833
        header->timestamp = htonl(ts);
2834
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
2835
        /* Timer */
2836
        struct timeval now, before;
2837
        gettimeofday(&before, NULL);
2838
        now.tv_sec = before.tv_sec;
2839
        now.tv_usec = before.tv_usec;
2840
        time_t passed, d_s, d_us;
2841
        /* Loop */
2842
        gint read = 0;
2843
        janus_streaming_rtp_relay_packet packet;
2844
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed && !session->stopping && !session->destroyed) {
2845
                /* See if it's time to prepare a frame */
2846
                gettimeofday(&now, NULL);
2847
                d_s = now.tv_sec - before.tv_sec;
2848
                d_us = now.tv_usec - before.tv_usec;
2849
                if(d_us < 0) {
2850
                        d_us += 1000000;
2851
                        --d_s;
2852
                }
2853
                passed = d_s*1000000 + d_us;
2854
                if(passed < 18000) {        /* Let's wait about 18ms */
2855
                        usleep(1000);
2856
                        continue;
2857
                }
2858
                /* Update the reference time */
2859
                before.tv_usec += 20000;
2860
                if(before.tv_usec > 1000000) {
2861
                        before.tv_sec++;
2862
                        before.tv_usec -= 1000000;
2863
                }
2864
                /* If not started or paused, wait some more */
2865
                if(!session->started || session->paused || !mountpoint->enabled)
2866
                        continue;
2867
                /* Read frame from file... */
2868
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
2869
                if(feof(audio)) {
2870
                        /* FIXME We're doing this forever... should this be configurable? */
2871
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
2872
                        fseek(audio, 0, SEEK_SET);
2873
                        continue;
2874
                }
2875
                if(read < 0)
2876
                        break;
2877
                if(mountpoint->active == FALSE)
2878
                        mountpoint->active = TRUE;
2879
                //~ JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
2880
                        //~ header->type, ntohs(header->seq_number), ntohl(header->timestamp));
2881
                //~ JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
2882
                /* Relay on all sessions */
2883
                packet.data = header;
2884
                packet.length = RTP_HEADER_SIZE + read;
2885
                packet.is_video = 0;
2886
                /* Backup the actual timestamp and sequence number */
2887
                packet.timestamp = ntohl(packet.data->timestamp);
2888
                packet.seq_number = ntohs(packet.data->seq_number);
2889
                /* Go! */
2890
                janus_streaming_relay_rtp_packet(session, &packet);
2891
                /* Update header */
2892
                seq++;
2893
                header->seq_number = htons(seq);
2894
                ts += 160;
2895
                header->timestamp = htonl(ts);
2896
                header->markerbit = 0;
2897
        }
2898
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (ondemand) thread\n", name);
2899
        g_free(name);
2900
        g_free(buf);
2901
        fclose(audio);
2902
        g_thread_unref(g_thread_self());
2903
        return NULL;
2904
}
2905

    
2906
/* FIXME Thread to send RTP packets from a file (live) */
2907
static void *janus_streaming_filesource_thread(void *data) {
2908
        JANUS_LOG(LOG_VERB, "Filesource (live) thread starting...\n");
2909
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
2910
        if(!mountpoint) {
2911
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2912
                g_thread_unref(g_thread_self());
2913
                return NULL;
2914
        }
2915
        if(mountpoint->streaming_source != janus_streaming_source_file) {
2916
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
2917
                g_thread_unref(g_thread_self());
2918
                return NULL;
2919
        }
2920
        if(mountpoint->streaming_type != janus_streaming_type_live) {
2921
                JANUS_LOG(LOG_ERR, "[%s] Not a live file source mountpoint!\n", mountpoint->name);
2922
                g_thread_unref(g_thread_self());
2923
                return NULL;
2924
        }
2925
        janus_streaming_file_source *source = mountpoint->source;
2926
        if(source == NULL || source->filename == NULL) {
2927
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
2928
                g_thread_unref(g_thread_self());
2929
                return NULL;
2930
        }
2931
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
2932
        FILE *audio = fopen(source->filename, "rb");
2933
        if(!audio) {
2934
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
2935
                g_thread_unref(g_thread_self());
2936
                return NULL;
2937
        }
2938
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
2939
        /* Buffer */
2940
        char *buf = calloc(1024, sizeof(char));
2941
        if(buf == NULL) {
2942
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
2943
                g_thread_unref(g_thread_self());
2944
                return NULL;
2945
        }
2946
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2947
        /* Set up RTP */
2948
        gint16 seq = 1;
2949
        gint32 ts = 0;
2950
        rtp_header *header = (rtp_header *)buf;
2951
        header->version = 2;
2952
        header->markerbit = 1;
2953
        header->type = mountpoint->codecs.audio_pt;
2954
        header->seq_number = htons(seq);
2955
        header->timestamp = htonl(ts);
2956
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
2957
        /* Timer */
2958
        struct timeval now, before;
2959
        gettimeofday(&before, NULL);
2960
        now.tv_sec = before.tv_sec;
2961
        now.tv_usec = before.tv_usec;
2962
        time_t passed, d_s, d_us;
2963
        /* Loop */
2964
        gint read = 0;
2965
        janus_streaming_rtp_relay_packet packet;
2966
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
2967
                /* See if it's time to prepare a frame */
2968
                gettimeofday(&now, NULL);
2969
                d_s = now.tv_sec - before.tv_sec;
2970
                d_us = now.tv_usec - before.tv_usec;
2971
                if(d_us < 0) {
2972
                        d_us += 1000000;
2973
                        --d_s;
2974
                }
2975
                passed = d_s*1000000 + d_us;
2976
                if(passed < 18000) {        /* Let's wait about 18ms */
2977
                        usleep(1000);
2978
                        continue;
2979
                }
2980
                /* Update the reference time */
2981
                before.tv_usec += 20000;
2982
                if(before.tv_usec > 1000000) {
2983
                        before.tv_sec++;
2984
                        before.tv_usec -= 1000000;
2985
                }
2986
                /* If paused, wait some more */
2987
                if(!mountpoint->enabled)
2988
                        continue;
2989
                /* Read frame from file... */
2990
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
2991
                if(feof(audio)) {
2992
                        /* FIXME We're doing this forever... should this be configurable? */
2993
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
2994
                        fseek(audio, 0, SEEK_SET);
2995
                        continue;
2996
                }
2997
                if(read < 0)
2998
                        break;
2999
                if(mountpoint->active == FALSE)
3000
                        mountpoint->active = TRUE;
3001
                // JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
3002
                        // header->type, ntohs(header->seq_number), ntohl(header->timestamp));
3003
                // JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
3004
                /* Relay on all sessions */
3005
                packet.data = header;
3006
                packet.length = RTP_HEADER_SIZE + read;
3007
                packet.is_video = 0;
3008
                /* Backup the actual timestamp and sequence number */
3009
                packet.timestamp = ntohl(packet.data->timestamp);
3010
                packet.seq_number = ntohs(packet.data->seq_number);
3011
                /* Go! */
3012
                janus_mutex_lock_nodebug(&mountpoint->mutex);
3013
                g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3014
                janus_mutex_unlock_nodebug(&mountpoint->mutex);
3015
                /* Update header */
3016
                seq++;
3017
                header->seq_number = htons(seq);
3018
                ts += 160;
3019
                header->timestamp = htonl(ts);
3020
                header->markerbit = 0;
3021
        }
3022
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (live) thread\n", name);
3023
        g_free(name);
3024
        g_free(buf);
3025
        fclose(audio);
3026
        g_thread_unref(g_thread_self());
3027
        return NULL;
3028
}
3029

    
3030
/* FIXME Test thread to relay RTP frames coming from gstreamer/ffmpeg/others */
3031
static void *janus_streaming_relay_thread(void *data) {
3032
        JANUS_LOG(LOG_VERB, "Starting streaming relay thread\n");
3033
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
3034
        if(!mountpoint) {
3035
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
3036
                g_thread_unref(g_thread_self());
3037
                return NULL;
3038
        }
3039
        if(mountpoint->streaming_source != janus_streaming_source_rtp) {
3040
                JANUS_LOG(LOG_ERR, "[%s] Not an RTP source mountpoint!\n", mountpoint->name);
3041
                g_thread_unref(g_thread_self());
3042
                return NULL;
3043
        }
3044
        janus_streaming_rtp_source *source = mountpoint->source;
3045
        if(source == NULL) {
3046
                JANUS_LOG(LOG_ERR, "[%s] Invalid RTP source mountpoint!\n", mountpoint->name);
3047
                g_thread_unref(g_thread_self());
3048
                return NULL;
3049
        }
3050
        gint audio_port = source->audio_port;
3051
        gint video_port = source->video_port;
3052
        /* Socket stuff */
3053
        struct sockaddr_in audio_address, video_address;
3054
        int audio_fd = source->audio_fd;
3055
        if((audio_fd < 0) && (audio_port >= 0)) {
3056
                audio_fd = socket(AF_INET, SOCK_DGRAM, 0);
3057
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
3058
                setsockopt(audio_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
3059
                if(IN_MULTICAST(source->audio_mcast))
3060
                {
3061
                        struct ip_mreq mreq;
3062
                        mreq.imr_multiaddr.s_addr = source->audio_mcast;
3063
                        if(setsockopt(audio_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) {
3064
                                JANUS_LOG(LOG_ERR, "[%s] Audio listener IP_ADD_MEMBERSHIP fail\n", mountpoint->name);
3065
                                g_thread_unref(g_thread_self());
3066
                                return NULL;
3067
                        }
3068
                }
3069

    
3070
                audio_address.sin_family = AF_INET;
3071
                audio_address.sin_port = htons(audio_port);
3072
                audio_address.sin_addr.s_addr = INADDR_ANY;
3073
                if(bind(audio_fd, (struct sockaddr *)(&audio_address), sizeof(struct sockaddr)) < 0) {
3074
                        JANUS_LOG(LOG_ERR, "[%s] Bind failed for audio (port %d)...\n", mountpoint->name, audio_port);
3075
                        g_thread_unref(g_thread_self());
3076
                        return NULL;
3077
                }
3078
                JANUS_LOG(LOG_VERB, "[%s] Audio listener bound to port %d\n", mountpoint->name, audio_port);
3079
        }
3080
        int video_fd = source->video_fd;
3081
        if((video_fd < 0) && (video_port >= 0)) {
3082
                video_fd = socket(AF_INET, SOCK_DGRAM, 0);
3083
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
3084
                setsockopt(video_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
3085
                if(IN_MULTICAST(source->video_mcast))
3086
                {
3087
                        struct ip_mreq mreq;
3088
                        mreq.imr_multiaddr.s_addr = source->video_mcast;
3089
                        if(setsockopt(video_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) {
3090
                                JANUS_LOG(LOG_ERR, "[%s] Video listener IP_ADD_MEMBERSHIP fail\n", mountpoint->name);
3091
                                g_thread_unref(g_thread_self());
3092
                                return NULL;
3093
                        }
3094
                }
3095
                
3096
                video_address.sin_family = AF_INET;
3097
                video_address.sin_port = htons(video_port);
3098
                video_address.sin_addr.s_addr = INADDR_ANY;
3099
                if(bind(video_fd, (struct sockaddr *)(&video_address), sizeof(struct sockaddr)) < 0) {
3100
                        JANUS_LOG(LOG_ERR, "[%s] Bind failed for video (%d)...\n", mountpoint->name, video_port);
3101
                        g_thread_unref(g_thread_self());
3102
                        return NULL;
3103
                }
3104
                JANUS_LOG(LOG_VERB, "[%s] Video listener bound to port %d\n", mountpoint->name, video_port);
3105
        }
3106
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
3107
        /* Needed to fix seq and ts */
3108
        uint32_t a_last_ssrc = 0, a_last_ts = 0, a_base_ts = 0, a_base_ts_prev = 0,
3109
                        v_last_ssrc = 0, v_last_ts = 0, v_base_ts = 0, v_base_ts_prev = 0;
3110
        uint16_t a_last_seq = 0, a_base_seq = 0, a_base_seq_prev = 0,
3111
                        v_last_seq = 0, v_base_seq = 0, v_base_seq_prev = 0;
3112
        /* Loop */
3113
        socklen_t addrlen;
3114
        struct sockaddr_in remote;
3115
        int resfd = 0, bytes = 0;
3116
        struct pollfd fds[2];
3117
        char buffer[1500];
3118
        memset(buffer, 0, 1500);
3119
        janus_streaming_rtp_relay_packet packet;
3120
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
3121
                /* Wait for some data */
3122
                fds[0].fd = 0;
3123
                fds[0].events = 0;
3124
                fds[0].revents = 0;
3125
                if(audio_fd > 0) {
3126
                        fds[0].fd = audio_fd;
3127
                        fds[0].events = POLLIN;
3128
                }
3129
                fds[1].fd = 0;
3130
                fds[1].events = 0;
3131
                fds[1].revents = 0;
3132
                if(video_fd > 0) {
3133
                        fds[1].fd = video_fd;
3134
                        fds[1].events = POLLIN;
3135
                }
3136
                resfd = poll(fds, 2, 1000);
3137
                if(resfd < 0) {
3138
                        JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", mountpoint->name, errno, strerror(errno));
3139
                        break;
3140
                } else if(resfd == 0) {
3141
                        /* No data, keep going */
3142
                        continue;
3143
                }
3144
                if(audio_fd && (fds[0].revents & POLLIN)) {
3145
                        /* Got something audio (RTP) */
3146
                        fds[0].revents = 0;
3147
                        if(mountpoint->active == FALSE)
3148
                                mountpoint->active = TRUE;
3149
                        addrlen = sizeof(remote);
3150
                        bytes = recvfrom(audio_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3151
                        // JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the audio channel...\n", bytes);
3152
                        /* If paused, ignore this packet */
3153
                        if(!mountpoint->enabled)
3154
                                continue;
3155
                        rtp_header *rtp = (rtp_header *)buffer;
3156
                        // JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3157
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3158
                        /* Relay on all sessions */
3159
                        packet.data = rtp;
3160
                        packet.length = bytes;
3161
                        packet.is_video = 0;
3162
                        /* Do we have a new stream? */
3163
                        if(ntohl(packet.data->ssrc) != a_last_ssrc) {
3164
                                a_last_ssrc = ntohl(packet.data->ssrc);
3165
                                JANUS_LOG(LOG_INFO, "[%s] New audio stream! (ssrc=%u)\n", name, a_last_ssrc);
3166
                                a_base_ts_prev = a_last_ts;
3167
                                a_base_ts = ntohl(packet.data->timestamp);
3168
                                a_base_seq_prev = a_last_seq;
3169
                                a_base_seq = ntohs(packet.data->seq_number);
3170
                        }
3171
                        a_last_ts = (ntohl(packet.data->timestamp)-a_base_ts)+a_base_ts_prev+960;        /* FIXME We're assuming Opus here... */
3172
                        packet.data->timestamp = htonl(a_last_ts);
3173
                        a_last_seq = (ntohs(packet.data->seq_number)-a_base_seq)+a_base_seq_prev+1;
3174
                        packet.data->seq_number = htons(a_last_seq);
3175
                        // JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3176
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3177
                        packet.data->type = mountpoint->codecs.audio_pt;
3178
                        /* Is there a recorder? */
3179
                        if(source->arc) {
3180
                                JANUS_LOG(LOG_HUGE, "[%s] Saving audio frame (%d bytes)\n", name, bytes);
3181
                                janus_recorder_save_frame(source->arc, buffer, bytes);
3182
                        }
3183
                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3184
                        packet.timestamp = ntohl(packet.data->timestamp);
3185
                        packet.seq_number = ntohs(packet.data->seq_number);
3186
                        /* Go! */
3187
                        janus_mutex_lock(&mountpoint->mutex);
3188
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3189
                        janus_mutex_unlock(&mountpoint->mutex);
3190
                        continue;
3191
                }
3192
                if(video_fd && (fds[1].revents & POLLIN)) {
3193
                        /* Got something video (RTP) */
3194
                        fds[1].revents = 0;
3195
                        if(mountpoint->active == FALSE)
3196
                                mountpoint->active = TRUE;
3197
                        addrlen = sizeof(remote);
3198
                        bytes = recvfrom(video_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3199
                        //~ JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the video channel...\n", bytes);
3200
                        /* If paused, ignore this packet */
3201
                        if(!mountpoint->enabled)
3202
                                continue;
3203
                        rtp_header *rtp = (rtp_header *)buffer;
3204
                        //~ JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3205
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3206
                        /* Relay on all sessions */
3207
                        packet.data = rtp;
3208
                        packet.length = bytes;
3209
                        packet.is_video = 1;
3210
                        /* Do we have a new stream? */
3211
                        if(ntohl(packet.data->ssrc) != v_last_ssrc) {
3212
                                v_last_ssrc = ntohl(packet.data->ssrc);
3213
                                JANUS_LOG(LOG_INFO, "[%s] New video stream! (ssrc=%u)\n", name, v_last_ssrc);
3214
                                v_base_ts_prev = v_last_ts;
3215
                                v_base_ts = ntohl(packet.data->timestamp);
3216
                                v_base_seq_prev = v_last_seq;
3217
                                v_base_seq = ntohs(packet.data->seq_number);
3218
                        }
3219
                        v_last_ts = (ntohl(packet.data->timestamp)-v_base_ts)+v_base_ts_prev+4500;        /* FIXME We're assuming 15fps here... */
3220
                        packet.data->timestamp = htonl(v_last_ts);
3221
                        v_last_seq = (ntohs(packet.data->seq_number)-v_base_seq)+v_base_seq_prev+1;
3222
                        packet.data->seq_number = htons(v_last_seq);
3223
                        //~ JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3224
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3225
                        packet.data->type = mountpoint->codecs.video_pt;
3226
                        /* Is there a recorder? */
3227
                        if(source->vrc) {
3228
                                JANUS_LOG(LOG_HUGE, "[%s] Saving video frame (%d bytes)\n", name, bytes);
3229
                                janus_recorder_save_frame(source->vrc, buffer, bytes);
3230
                        }
3231
                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3232
                        packet.timestamp = ntohl(packet.data->timestamp);
3233
                        packet.seq_number = ntohs(packet.data->seq_number);
3234
                        /* Go! */
3235
                        janus_mutex_lock(&mountpoint->mutex);
3236
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3237
                        janus_mutex_unlock(&mountpoint->mutex);
3238
                        continue;
3239
                }
3240
        }
3241
        JANUS_LOG(LOG_VERB, "[%s] Leaving streaming relay thread\n", name);
3242
        g_free(name);
3243
        g_thread_unref(g_thread_self());
3244
        return NULL;
3245
}
3246

    
3247
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) {
3248
        janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data;
3249
        if(!packet || !packet->data || packet->length < 1) {
3250
                JANUS_LOG(LOG_ERR, "Invalid packet...\n");
3251
                return;
3252
        }
3253
        janus_streaming_session *session = (janus_streaming_session *)data;
3254
        if(!session || !session->handle) {
3255
                // JANUS_LOG(LOG_ERR, "Invalid session...\n");
3256
                return;
3257
        }
3258
        if(!session->started || session->paused) {
3259
                // JANUS_LOG(LOG_ERR, "Streaming not started yet for this session...\n");
3260
                return;
3261
        }
3262

    
3263
        /* Make sure there hasn't been a publisher switch by checking the SSRC */
3264
        if(packet->is_video) {
3265
                if(ntohl(packet->data->ssrc) != session->context.v_last_ssrc) {
3266
                        session->context.v_last_ssrc = ntohl(packet->data->ssrc);
3267
                        session->context.v_base_ts_prev = session->context.v_last_ts;
3268
                        session->context.v_base_ts = packet->timestamp;
3269
                        session->context.v_base_seq_prev = session->context.v_last_seq;
3270
                        session->context.v_base_seq = packet->seq_number;
3271
                }
3272
                /* Compute a coherent timestamp and sequence number */
3273
                session->context.v_last_ts = (packet->timestamp-session->context.v_base_ts)
3274
                        + session->context.v_base_ts_prev+4500;        /* FIXME When switching, we assume 15fps */
3275
                session->context.v_last_seq = (packet->seq_number-session->context.v_base_seq)+session->context.v_base_seq_prev+1;
3276
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3277
                packet->data->timestamp = htonl(session->context.v_last_ts);
3278
                packet->data->seq_number = htons(session->context.v_last_seq);
3279
                if(gateway != NULL)
3280
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3281
                /* Restore the timestamp and sequence number to what the publisher set them to */
3282
                packet->data->timestamp = htonl(packet->timestamp);
3283
                packet->data->seq_number = htons(packet->seq_number);
3284
        } else {
3285
                if(ntohl(packet->data->ssrc) != session->context.a_last_ssrc) {
3286
                        session->context.a_last_ssrc = ntohl(packet->data->ssrc);
3287
                        session->context.a_base_ts_prev = session->context.a_last_ts;
3288
                        session->context.a_base_ts = packet->timestamp;
3289
                        session->context.a_base_seq_prev = session->context.a_last_seq;
3290
                        session->context.a_base_seq = packet->seq_number;
3291
                }
3292
                /* Compute a coherent timestamp and sequence number */
3293
                session->context.a_last_ts = (packet->timestamp-session->context.a_base_ts)
3294
                        + session->context.a_base_ts_prev+960;        /* FIXME When switching, we assume Opus and so a 960 ts step */
3295
                session->context.a_last_seq = (packet->seq_number-session->context.a_base_seq)+session->context.a_base_seq_prev+1;
3296
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3297
                packet->data->timestamp = htonl(session->context.a_last_ts);
3298
                packet->data->seq_number = htons(session->context.a_last_seq);
3299
                if(gateway != NULL)
3300
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3301
                /* Restore the timestamp and sequence number to what the publisher set them to */
3302
                packet->data->timestamp = htonl(packet->timestamp);
3303
                packet->data->seq_number = htons(packet->seq_number);
3304
        }
3305

    
3306
        return;
3307
}