Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_streaming.c @ 71a04f89

History | View | Annotate | Download (147 KB)

1
/*! \file   janus_streaming.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus Streaming plugin
5
 * \details  This is a streaming plugin for Janus, allowing WebRTC peers
6
 * to watch/listen to pre-recorded files or media generated by another tool.
7
 * Specifically, the plugin currently supports three different type of streams:
8
 * 
9
 * -# on-demand streaming of pre-recorded media files (different
10
 * streaming context for each peer);
11
 * -# live streaming of pre-recorded media files (shared streaming
12
 * context for all peers attached to the stream);
13
 * -# live streaming of media generated by another tool (shared
14
 * streaming context for all peers attached to the stream).
15
 * 
16
 * For what concerns types 1. and 2., considering the proof of concept
17
 * nature of the implementation the only pre-recorded media files
18
 * that the plugins supports right now are raw mu-Law and a-Law files:
19
 * support is of course planned for other additional widespread formats
20
 * as well.
21
 * 
22
 * For what concerns type 3., instead, the plugin is configured
23
 * to listen on a couple of ports for RTP: this means that the plugin
24
 * is implemented to receive RTP on those ports and relay them to all
25
 * peers attached to that stream. Any tool that can generate audio/video
26
 * RTP streams and specify a destination is good for the purpose: the
27
 * examples section contains samples that make use of GStreamer (http://gstreamer.freedesktop.org/)
28
 * but other tools like FFmpeg (http://www.ffmpeg.org/), LibAV (http://libav.org/)
29
 * or others are fine as well. This makes it really easy to capture and
30
 * encode whatever you want using your favourite tool, and then have it
31
 * transparently broadcasted via WebRTC using Janus.
32
 * 
33
 * Streams to make available are listed in the plugin configuration file.
34
 * A pre-filled configuration file is provided in \c conf/janus.plugin.streaming.cfg
35
 * and includes a stream of every type.
36
 * 
37
 * To add more streams or modify the existing ones, you can use the following
38
 * syntax:
39
 * 
40
 * \verbatim
41
[stream-name]
42
type = rtp|live|ondemand|rtsp
43
       rtp = stream originated by an external tool (e.g., gstreamer or
44
             ffmpeg) and sent to the plugin via RTP
45
       live = local file streamed live to multiple listeners
46
              (multiple listeners = same streaming context)
47
       ondemand = local file streamed on-demand to a single listener
48
                  (multiple listeners = different streaming contexts)
49
       rtsp = stream originated by an external RTSP feed (only
50
              available if libcurl support was compiled)
51
id = <unique numeric ID>
52
description = This is my awesome stream
53
is_private = yes|no (private streams don't appear when you do a 'list' request)
54
filename = path to the local file to stream (only for live/ondemand)
55
secret = <optional password needed for manipulating (e.g., destroying
56
                or enabling/disabling) the stream>
57
pin = <optional password needed for watching the stream>
58
audio = yes|no (do/don't stream audio)
59
video = yes|no (do/don't stream video)
60
   The following options are only valid for the 'rtp' type:
61
audioport = local port for receiving audio frames
62
audiomcast = multicast group port for receiving audio frames, if any
63
audiopt = <audio RTP payload type> (e.g., 111)
64
audiortpmap = RTP map of the audio codec (e.g., opus/48000/2)
65
audiofmtp = Codec specific parameters, if any
66
videoport = local port for receiving video frames (only for rtp)
67
videomcast = multicast group port for receiving video frames, if any
68
videopt = <video RTP payload type> (e.g., 100)
69
videortpmap = RTP map of the video codec (e.g., VP8/90000)
70
videofmtp = Codec specific parameters, if any
71
videobufferkf = yes|no (whether the plugin should store the latest
72
        keyframe and send it immediately for new viewers, EXPERIMENTAL)
73

74
   The following options are only valid for the 'rstp' type:
75
url = RTSP stream URL (only if type=rtsp)
76
\endverbatim
77
 *
78
 * \section streamapi Streaming API
79
 * 
80
 * The Streaming API supports several requests, some of which are
81
 * synchronous and some asynchronous. There are some situations, though,
82
 * (invalid JSON, invalid request) which will always result in a
83
 * synchronous error response even for asynchronous requests. 
84
 * 
85
 * \c list , \c create , \c destroy , \c recording , \c enable and
86
 * \c disable are synchronous requests, which means you'll
87
 * get a response directly within the context of the transaction. \c list
88
 * lists all the available streams; \c create allows you to create a new
89
 * mountpoint dynamically, as an alternative to using the configuration
90
 * file; \c destroy removes a mountpoint and destroys it; \c recording
91
 * instructs the plugin on whether or not a live RTP stream should be
92
 * recorded while it's broadcasted; \c enable and \c disable respectively
93
 * enable and disable a mountpoint, that is decide whether or not a
94
 * mountpoint should be available to users without destroying it.
95
 * 
96
 * The \c watch , \c start , \c pause , \c switch and \c stop requests
97
 * instead are all asynchronous, which means you'll get a notification
98
 * about their success or failure in an event. \c watch asks the plugin
99
 * to prepare the playout of one of the available streams; \c start
100
 * starts the actual playout; \c pause allows you to pause a playout
101
 * without tearing down the PeerConnection; \c switch allows you to
102
 * switch to a different mountpoint of the same kind (note: only live
103
 * RTP mountpoints supported as of now) without having to stop and watch
104
 * the new one; \c stop stops the playout and tears the PeerConnection
105
 * down.
106
 * 
107
 * Notice that, in general, all users can create mountpoints, no matter
108
 * what type they are. If you want to limit this functionality, you can
109
 * configure an admin \c admin_key in the plugin settings. When
110
 * configured, only "create" requests that include the correct
111
 * \c admin_key value in an "admin_key" property will succeed, and will
112
 * be rejected otherwise.
113
 * 
114
 * Actual API docs: TBD.
115
 * 
116
 * \ingroup plugins
117
 * \ref plugins
118
 */
119

    
120
#include "plugin.h"
121

    
122
#include <jansson.h>
123
#include <errno.h>
124
#include <sys/poll.h>
125
#include <sys/time.h>
126

    
127
#ifdef HAVE_LIBCURL
128
#include <curl/curl.h>
129
#endif
130

    
131
#include "../debug.h"
132
#include "../apierror.h"
133
#include "../config.h"
134
#include "../mutex.h"
135
#include "../rtp.h"
136
#include "../rtcp.h"
137
#include "../record.h"
138
#include "../utils.h"
139

    
140

    
141
/* Plugin information */
142
#define JANUS_STREAMING_VERSION                        6
143
#define JANUS_STREAMING_VERSION_STRING        "0.0.6"
144
#define JANUS_STREAMING_DESCRIPTION                "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by gstreamer."
145
#define JANUS_STREAMING_NAME                        "JANUS Streaming plugin"
146
#define JANUS_STREAMING_AUTHOR                        "Meetecho s.r.l."
147
#define JANUS_STREAMING_PACKAGE                        "janus.plugin.streaming"
148

    
149
/* Plugin methods */
150
janus_plugin *create(void);
151
int janus_streaming_init(janus_callbacks *callback, const char *config_path);
152
void janus_streaming_destroy(void);
153
int janus_streaming_get_api_compatibility(void);
154
int janus_streaming_get_version(void);
155
const char *janus_streaming_get_version_string(void);
156
const char *janus_streaming_get_description(void);
157
const char *janus_streaming_get_name(void);
158
const char *janus_streaming_get_author(void);
159
const char *janus_streaming_get_package(void);
160
void janus_streaming_create_session(janus_plugin_session *handle, int *error);
161
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
162
void janus_streaming_setup_media(janus_plugin_session *handle);
163
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
164
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
165
void janus_streaming_hangup_media(janus_plugin_session *handle);
166
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error);
167
char *janus_streaming_query_session(janus_plugin_session *handle);
168

    
169
/* Plugin setup */
170
static janus_plugin janus_streaming_plugin =
171
        JANUS_PLUGIN_INIT (
172
                .init = janus_streaming_init,
173
                .destroy = janus_streaming_destroy,
174

    
175
                .get_api_compatibility = janus_streaming_get_api_compatibility,
176
                .get_version = janus_streaming_get_version,
177
                .get_version_string = janus_streaming_get_version_string,
178
                .get_description = janus_streaming_get_description,
179
                .get_name = janus_streaming_get_name,
180
                .get_author = janus_streaming_get_author,
181
                .get_package = janus_streaming_get_package,
182
                
183
                .create_session = janus_streaming_create_session,
184
                .handle_message = janus_streaming_handle_message,
185
                .setup_media = janus_streaming_setup_media,
186
                .incoming_rtp = janus_streaming_incoming_rtp,
187
                .incoming_rtcp = janus_streaming_incoming_rtcp,
188
                .hangup_media = janus_streaming_hangup_media,
189
                .destroy_session = janus_streaming_destroy_session,
190
                .query_session = janus_streaming_query_session,
191
        );
192

    
193
/* Plugin creator */
194
janus_plugin *create(void) {
195
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_STREAMING_NAME);
196
        return &janus_streaming_plugin;
197
}
198

    
199
/* Parameter validation */
200
static struct janus_json_parameter request_parameters[] = {
201
        {"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
202
};
203
static struct janus_json_parameter id_parameters[] = {
204
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE}
205
};
206
static struct janus_json_parameter adminkey_parameters[] = {
207
        {"admin_key", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
208
};
209
static struct janus_json_parameter create_parameters[] = {
210
        {"type", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
211
        {"secret", JSON_STRING, 0},
212
        {"pin", JSON_STRING, 0},
213
        {"permanent", JANUS_JSON_BOOL, 0}
214
};
215
static struct janus_json_parameter rtp_parameters[] = {
216
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
217
        {"name", JSON_STRING, 0},
218
        {"description", JSON_STRING, 0},
219
        {"is_private", JANUS_JSON_BOOL, 0},
220
        {"audio", JANUS_JSON_BOOL, 0},
221
        {"video", JANUS_JSON_BOOL, 0}
222
};
223
static struct janus_json_parameter live_parameters[] = {
224
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
225
        {"name", JSON_STRING, 0},
226
        {"description", JSON_STRING, 0},
227
        {"is_private", JANUS_JSON_BOOL, 0},
228
        {"filename", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
229
        {"audio", JANUS_JSON_BOOL, 0},
230
        {"video", JANUS_JSON_BOOL, 0}
231
};
232
static struct janus_json_parameter ondemand_parameters[] = {
233
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
234
        {"name", JSON_STRING, 0},
235
        {"description", JSON_STRING, 0},
236
        {"is_private", JANUS_JSON_BOOL, 0},
237
        {"filename", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
238
        {"audio", JANUS_JSON_BOOL, 0},
239
        {"video", JANUS_JSON_BOOL, 0}
240
};
241
#ifdef HAVE_LIBCURL
242
static struct janus_json_parameter rtsp_parameters[] = {
243
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
244
        {"name", JSON_STRING, 0},
245
        {"description", JSON_STRING, 0},
246
        {"is_private", JANUS_JSON_BOOL, 0},
247
        {"url", JSON_STRING, 0},
248
        {"audio", JANUS_JSON_BOOL, 0},
249
        {"video", JANUS_JSON_BOOL, 0}
250
};
251
#endif
252
static struct janus_json_parameter rtp_audio_parameters[] = {
253
        {"audiomcast", JSON_STRING, 0},
254
        {"audioport", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
255
        {"audiopt", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
256
        {"audiortpmap", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
257
        {"audiofmtp", JSON_STRING, 0}
258
};
259
static struct janus_json_parameter rtp_video_parameters[] = {
260
        {"videomcast", JSON_STRING, 0},
261
        {"videoport", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
262
        {"videopt", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
263
        {"videortpmap", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
264
        {"videofmtp", JSON_STRING, 0},
265
        {"videobufferkf", JANUS_JSON_BOOL, 0}
266
};
267
static struct janus_json_parameter destroy_parameters[] = {
268
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
269
        {"permanent", JANUS_JSON_BOOL, 0}
270
};
271
static struct janus_json_parameter recording_parameters[] = {
272
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
273
        {"action", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
274
};
275
static struct janus_json_parameter recording_start_parameters[] = {
276
        {"audio", JSON_STRING, 0},
277
        {"video", JSON_STRING, 0}
278
};
279
static struct janus_json_parameter recording_stop_parameters[] = {
280
        {"audio", JANUS_JSON_BOOL, 0},
281
        {"video", JANUS_JSON_BOOL, 0}
282
};
283

    
284
/* Static configuration instance */
285
static janus_config *config = NULL;
286
static const char *config_folder = NULL;
287
static janus_mutex config_mutex;
288

    
289
/* Useful stuff */
290
static volatile gint initialized = 0, stopping = 0;
291
static gboolean notify_events = TRUE;
292
static janus_callbacks *gateway = NULL;
293
static GThread *handler_thread;
294
static GThread *watchdog;
295
static void *janus_streaming_handler(void *data);
296
static void *janus_streaming_ondemand_thread(void *data);
297
static void *janus_streaming_filesource_thread(void *data);
298
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data);
299
static void *janus_streaming_relay_thread(void *data);
300
static gboolean janus_streaming_is_keyframe(gint codec, char* buffer, int len);
301

    
302
typedef enum janus_streaming_type {
303
        janus_streaming_type_none = 0,
304
        janus_streaming_type_live,
305
        janus_streaming_type_on_demand,
306
} janus_streaming_type;
307

    
308
typedef enum janus_streaming_source {
309
        janus_streaming_source_none = 0,
310
        janus_streaming_source_file,
311
        janus_streaming_source_rtp,
312
} janus_streaming_source;
313

    
314
typedef struct janus_streaming_rtp_keyframe {
315
        gboolean enabled;
316
        /* If enabled, we store the packets of the last keyframe, to immediately send them for new viewers */
317
        GList *latest_keyframe;
318
        /* This is where we store packets while we're still collecting the whole keyframe */
319
        GList *temp_keyframe;
320
        guint32 temp_ts;
321
        janus_mutex mutex;
322
} janus_streaming_rtp_keyframe;
323

    
324
typedef struct janus_streaming_rtp_source {
325
        gint audio_port;
326
        in_addr_t audio_mcast;
327
        gint video_port;
328
        in_addr_t video_mcast;
329
        janus_recorder *arc;        /* The Janus recorder instance for this streams's audio, if enabled */
330
        janus_recorder *vrc;        /* The Janus recorder instance for this streams's video, if enabled */
331
        janus_mutex rec_mutex;        /* Mutex to protect the recorders from race conditions */
332
        int audio_fd;
333
        int video_fd;
334
        gint64 last_received_video;
335
        gint64 last_received_audio;
336
#ifdef HAVE_LIBCURL
337
        CURL* curl;
338
#endif
339
        janus_streaming_rtp_keyframe keyframe;
340
} janus_streaming_rtp_source;
341

    
342
typedef struct janus_streaming_file_source {
343
        char *filename;
344
} janus_streaming_file_source;
345

    
346
#define JANUS_STREAMING_VP8                0
347
#define JANUS_STREAMING_H264        1
348
#define JANUS_STREAMING_VP9                2
349
typedef struct janus_streaming_codecs {
350
        gint audio_pt;
351
        char *audio_rtpmap;
352
        char *audio_fmtp;
353
        gint video_codec;
354
        gint video_pt;
355
        char *video_rtpmap;
356
        char *video_fmtp;
357
} janus_streaming_codecs;
358

    
359
typedef struct janus_streaming_mountpoint {
360
        guint64 id;
361
        char *name;
362
        char *description;
363
        gboolean is_private;
364
        char *secret;
365
        char *pin;
366
        gboolean enabled;
367
        gboolean active;
368
        janus_streaming_type streaming_type;
369
        janus_streaming_source streaming_source;
370
        void *source;        /* Can differ according to the source type */
371
        GDestroyNotify source_destroy;
372
        janus_streaming_codecs codecs;
373
        GList/*<unowned janus_streaming_session>*/ *listeners;
374
        gint64 destroyed;
375
        janus_mutex mutex;
376
} janus_streaming_mountpoint;
377
GHashTable *mountpoints;
378
static GList *old_mountpoints;
379
janus_mutex mountpoints_mutex;
380
static char *admin_key = NULL;
381

    
382
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp);
383

    
384
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
385
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
386
                uint64_t id, char *name, char *desc,
387
                gboolean doaudio, char* amcast, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
388
                gboolean dovideo, char* vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf);
389
/* Helper to create a file/ondemand live source */
390
janus_streaming_mountpoint *janus_streaming_create_file_source(
391
                uint64_t id, char *name, char *desc, char *filename,
392
                gboolean live, gboolean doaudio, gboolean dovideo);
393
/* Helper to create a rtsp live source */
394
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
395
                uint64_t id, char *name, char *desc, char *url,
396
                gboolean doaudio, gboolean dovideo);
397

    
398

    
399
typedef struct janus_streaming_message {
400
        janus_plugin_session *handle;
401
        char *transaction;
402
        json_t *message;
403
        char *sdp_type;
404
        char *sdp;
405
} janus_streaming_message;
406
static GAsyncQueue *messages = NULL;
407
static janus_streaming_message exit_message;
408

    
409
static void janus_streaming_message_free(janus_streaming_message *msg) {
410
        if(!msg || msg == &exit_message)
411
                return;
412

    
413
        msg->handle = NULL;
414

    
415
        g_free(msg->transaction);
416
        msg->transaction = NULL;
417
        if(msg->message)
418
                json_decref(msg->message);
419
        msg->message = NULL;
420
        g_free(msg->sdp_type);
421
        msg->sdp_type = NULL;
422
        g_free(msg->sdp);
423
        msg->sdp = NULL;
424

    
425
        g_free(msg);
426
}
427

    
428

    
429
typedef struct janus_streaming_context {
430
        /* Needed to fix seq and ts in case of stream switching */
431
        uint32_t a_last_ssrc, a_last_ts, a_base_ts, a_base_ts_prev,
432
                        v_last_ssrc, v_last_ts, v_base_ts, v_base_ts_prev;
433
        uint16_t a_last_seq, a_base_seq, a_base_seq_prev,
434
                        v_last_seq, v_base_seq, v_base_seq_prev;
435
} janus_streaming_context;
436

    
437
typedef struct janus_streaming_session {
438
        janus_plugin_session *handle;
439
        janus_streaming_mountpoint *mountpoint;
440
        gboolean started;
441
        gboolean paused;
442
        janus_streaming_context context;
443
        gboolean stopping;
444
        volatile gint hangingup;
445
        gint64 destroyed;        /* Time at which this session was marked as destroyed */
446
} janus_streaming_session;
447
static GHashTable *sessions;
448
static GList *old_sessions;
449
static janus_mutex sessions_mutex;
450

    
451
/* Packets we get from gstreamer and relay */
452
typedef struct janus_streaming_rtp_relay_packet {
453
        rtp_header *data;
454
        gint length;
455
        gint is_video;
456
        gint is_keyframe;
457
        uint32_t timestamp;
458
        uint16_t seq_number;
459
} janus_streaming_rtp_relay_packet;
460

    
461

    
462
/* Error codes */
463
#define JANUS_STREAMING_ERROR_NO_MESSAGE                        450
464
#define JANUS_STREAMING_ERROR_INVALID_JSON                        451
465
#define JANUS_STREAMING_ERROR_INVALID_REQUEST                452
466
#define JANUS_STREAMING_ERROR_MISSING_ELEMENT                453
467
#define JANUS_STREAMING_ERROR_INVALID_ELEMENT                454
468
#define JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT        455
469
#define JANUS_STREAMING_ERROR_CANT_CREATE                        456
470
#define JANUS_STREAMING_ERROR_UNAUTHORIZED                        457
471
#define JANUS_STREAMING_ERROR_CANT_SWITCH                        458
472
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR                        470
473

    
474

    
475
/* Streaming watchdog/garbage collector (sort of) */
476
void *janus_streaming_watchdog(void *data);
477
void *janus_streaming_watchdog(void *data) {
478
        JANUS_LOG(LOG_INFO, "Streaming watchdog started\n");
479
        gint64 now = 0;
480
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
481
                janus_mutex_lock(&sessions_mutex);
482
                /* Iterate on all the sessions */
483
                now = janus_get_monotonic_time();
484
                if(old_sessions != NULL) {
485
                        GList *sl = old_sessions;
486
                        JANUS_LOG(LOG_HUGE, "Checking %d old Streaming sessions...\n", g_list_length(old_sessions));
487
                        while(sl) {
488
                                janus_streaming_session *session = (janus_streaming_session *)sl->data;
489
                                if(!session) {
490
                                        sl = sl->next;
491
                                        continue;
492
                                }
493
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
494
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
495
                                        JANUS_LOG(LOG_VERB, "Freeing old Streaming session\n");
496
                                        GList *rm = sl->next;
497
                                        old_sessions = g_list_delete_link(old_sessions, sl);
498
                                        sl = rm;
499
                                        session->handle = NULL;
500
                                        g_free(session);
501
                                        session = NULL;
502
                                        continue;
503
                                }
504
                                sl = sl->next;
505
                        }
506
                }
507
                janus_mutex_unlock(&sessions_mutex);
508
                janus_mutex_lock(&mountpoints_mutex);
509
                /* Iterate on all the mountpoints */
510
                if(old_mountpoints != NULL) {
511
                        GList *sl = old_mountpoints;
512
                        JANUS_LOG(LOG_HUGE, "Checking %d old Streaming mountpoints...\n", g_list_length(old_mountpoints));
513
                        while(sl) {
514
                                janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)sl->data;
515
                                if(!mountpoint) {
516
                                        sl = sl->next;
517
                                        continue;
518
                                }
519
                                if(now-mountpoint->destroyed >= 5*G_USEC_PER_SEC) {
520
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
521
                                        JANUS_LOG(LOG_VERB, "Freeing old Streaming mountpoint\n");
522
                                        GList *rm = sl->next;
523
                                        old_mountpoints = g_list_delete_link(old_mountpoints, sl);
524
                                        sl = rm;
525
                                        janus_streaming_mountpoint_free(mountpoint);
526
                                        mountpoint = NULL;
527
                                        continue;
528
                                }
529
                                sl = sl->next;
530
                        }
531
                }
532
                janus_mutex_unlock(&mountpoints_mutex);
533
                g_usleep(500000);
534
        }
535
        JANUS_LOG(LOG_INFO, "Streaming watchdog stopped\n");
536
        return NULL;
537
}
538

    
539

    
540
/* Plugin implementation */
541
int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
542
#ifdef HAVE_LIBCURL        
543
        curl_global_init(CURL_GLOBAL_ALL);
544
#endif        
545
        if(g_atomic_int_get(&stopping)) {
546
                /* Still stopping from before */
547
                return -1;
548
        }
549
        if(callback == NULL || config_path == NULL) {
550
                /* Invalid arguments */
551
                return -1;
552
        }
553

    
554
        /* Read configuration */
555
        char filename[255];
556
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_STREAMING_PACKAGE);
557
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
558
        config = janus_config_parse(filename);
559
        config_folder = config_path;
560
        if(config != NULL)
561
                janus_config_print(config);
562
        janus_mutex_init(&config_mutex);
563
        
564
        mountpoints = g_hash_table_new_full(g_int64_hash, g_int64_equal, (GDestroyNotify)g_free, NULL);
565
        janus_mutex_init(&mountpoints_mutex);
566
        /* Parse configuration to populate the mountpoints */
567
        if(config != NULL) {
568
                /* Any admin key to limit who can "create"? */
569
                janus_config_item *key = janus_config_get_item_drilldown(config, "general", "admin_key");
570
                if(key != NULL && key->value != NULL)
571
                        admin_key = g_strdup(key->value);
572
                janus_config_item *events = janus_config_get_item_drilldown(config, "general", "events");
573
                if(events != NULL && events->value != NULL)
574
                        notify_events = janus_is_true(events->value);
575
                if(!notify_events && callback->events_is_enabled()) {
576
                        JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_STREAMING_NAME);
577
                }
578
                /* Iterate on all rooms */
579
                GList *cl = janus_config_get_categories(config);
580
                while(cl != NULL) {
581
                        janus_config_category *cat = (janus_config_category *)cl->data;
582
                        if(cat->name == NULL || !strcasecmp(cat->name, "general")) {
583
                                cl = cl->next;
584
                                continue;
585
                        }
586
                        JANUS_LOG(LOG_VERB, "Adding stream '%s'\n", cat->name);
587
                        janus_config_item *type = janus_config_get_item(cat, "type");
588
                        if(type == NULL || type->value == NULL) {
589
                                JANUS_LOG(LOG_WARN, "  -- Invalid type, skipping stream '%s'...\n", cat->name);
590
                                cl = cl->next;
591
                                continue;
592
                        }
593
                        if(!strcasecmp(type->value, "rtp")) {
594
                                /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
595
                                janus_config_item *id = janus_config_get_item(cat, "id");
596
                                janus_config_item *desc = janus_config_get_item(cat, "description");
597
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
598
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
599
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
600
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
601
                                janus_config_item *video = janus_config_get_item(cat, "video");
602
                                janus_config_item *amcast = janus_config_get_item(cat, "audiomcast");
603
                                janus_config_item *aport = janus_config_get_item(cat, "audioport");
604
                                janus_config_item *acodec = janus_config_get_item(cat, "audiopt");
605
                                janus_config_item *artpmap = janus_config_get_item(cat, "audiortpmap");
606
                                janus_config_item *afmtp = janus_config_get_item(cat, "audiofmtp");
607
                                janus_config_item *vmcast = janus_config_get_item(cat, "videomcast");
608
                                janus_config_item *vport = janus_config_get_item(cat, "videoport");
609
                                janus_config_item *vcodec = janus_config_get_item(cat, "videopt");
610
                                janus_config_item *vrtpmap = janus_config_get_item(cat, "videortpmap");
611
                                janus_config_item *vfmtp = janus_config_get_item(cat, "videofmtp");
612
                                janus_config_item *vkf = janus_config_get_item(cat, "videobufferkf");
613
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
614
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
615
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
616
                                gboolean bufferkf = video && vkf && vkf->value && janus_is_true(vkf->value);
617
                                if(!doaudio && !dovideo) {
618
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', no audio or video have to be streamed...\n", cat->name);
619
                                        cl = cl->next;
620
                                        continue;
621
                                }
622
                                if(doaudio &&
623
                                                (aport == NULL || aport->value == NULL ||
624
                                                acodec == NULL || acodec->value == NULL ||
625
                                                artpmap == NULL || artpmap->value == NULL)) {
626
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for audio...\n", cat->name);
627
                                        cl = cl->next;
628
                                        continue;
629
                                }
630
                                if(dovideo &&
631
                                                (vport == NULL || vport->value == NULL ||
632
                                                vcodec == NULL || vcodec->value == NULL ||
633
                                                vrtpmap == NULL || vrtpmap->value == NULL)) {
634
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for video...\n", cat->name);
635
                                        cl = cl->next;
636
                                        continue;
637
                                }
638
                                if(id == NULL || id->value == NULL) {
639
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
640
                                } else {
641
                                        janus_mutex_lock(&mountpoints_mutex);
642
                                        guint64 mpid = g_ascii_strtoull(id->value, 0, 10);
643
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &mpid);
644
                                        janus_mutex_unlock(&mountpoints_mutex);
645
                                        if(mp != NULL) {
646
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
647
                                                cl = cl->next;
648
                                                continue;
649
                                        }
650
                                }
651
                                JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
652
                                janus_streaming_mountpoint *mp = NULL;
653
                                if((mp = janus_streaming_create_rtp_source(
654
                                                (id && id->value) ? g_ascii_strtoull(id->value, 0, 10) : 0,
655
                                                (char *)cat->name,
656
                                                desc ? (char *)desc->value : NULL,
657
                                                doaudio,
658
                                                amcast ? (char *)amcast->value : NULL,
659
                                                (aport && aport->value) ? atoi(aport->value) : 0,
660
                                                (acodec && acodec->value) ? atoi(acodec->value) : 0,
661
                                                artpmap ? (char *)artpmap->value : NULL,
662
                                                afmtp ? (char *)afmtp->value : NULL,
663
                                                dovideo,
664
                                                vmcast ? (char *)vmcast->value : NULL,
665
                                                (vport && vport->value) ? atoi(vport->value) : 0,
666
                                                (vcodec && vcodec->value) ? atoi(vcodec->value) : 0,
667
                                                vrtpmap ? (char *)vrtpmap->value : NULL,
668
                                                vfmtp ? (char *)vfmtp->value : NULL,
669
                                                bufferkf)) == NULL) {
670
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream '%s'...\n", cat->name);
671
                                        cl = cl->next;
672
                                        continue;
673
                                }
674
                                mp->is_private = is_private;
675
                                if(secret && secret->value)
676
                                        mp->secret = g_strdup(secret->value);
677
                                if(pin && pin->value)
678
                                        mp->pin = g_strdup(pin->value);
679
                        } else if(!strcasecmp(type->value, "live")) {
680
                                /* File live source */
681
                                janus_config_item *id = janus_config_get_item(cat, "id");
682
                                janus_config_item *desc = janus_config_get_item(cat, "description");
683
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
684
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
685
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
686
                                janus_config_item *file = janus_config_get_item(cat, "filename");
687
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
688
                                janus_config_item *video = janus_config_get_item(cat, "video");
689
                                if(file == NULL || file->value == NULL) {
690
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', missing mandatory information...\n", cat->name);
691
                                        cl = cl->next;
692
                                        continue;
693
                                }
694
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
695
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
696
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
697
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
698
                                if(!doaudio || dovideo) {
699
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', we only support audio file streaming right now...\n", cat->name);
700
                                        cl = cl->next;
701
                                        continue;
702
                                }
703
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
704
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
705
                                        cl = cl->next;
706
                                        continue;
707
                                }
708
                                FILE *audiofile = fopen(file->value, "rb");
709
                                if(!audiofile) {
710
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream, no such file '%s'...\n", file->value);
711
                                        cl = cl->next;
712
                                        continue;
713
                                }
714
                                fclose(audiofile);
715
                                if(id == NULL || id->value == NULL) {
716
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
717
                                } else {
718
                                        janus_mutex_lock(&mountpoints_mutex);
719
                                        guint64 mpid = g_ascii_strtoull(id->value, 0, 10);
720
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &mpid);
721
                                        janus_mutex_unlock(&mountpoints_mutex);
722
                                        if(mp != NULL) {
723
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
724
                                                cl = cl->next;
725
                                                continue;
726
                                        }
727
                                }
728
                                janus_streaming_mountpoint *mp = NULL;
729
                                if((mp = janus_streaming_create_file_source(
730
                                                (id && id->value) ? g_ascii_strtoull(id->value, 0, 10) : 0,
731
                                                (char *)cat->name,
732
                                                desc ? (char *)desc->value : NULL,
733
                                                (char *)file->value,
734
                                                TRUE, doaudio, dovideo)) == NULL) {
735
                                        JANUS_LOG(LOG_ERR, "Error creating 'live' stream '%s'...\n", cat->name);
736
                                        cl = cl->next;
737
                                        continue;
738
                                }
739
                                mp->is_private = is_private;
740
                                if(secret && secret->value)
741
                                        mp->secret = g_strdup(secret->value);
742
                                if(pin && pin->value)
743
                                        mp->pin = g_strdup(pin->value);
744
                        } else if(!strcasecmp(type->value, "ondemand")) {
745
                                /* mu-Law file on demand source */
746
                                janus_config_item *id = janus_config_get_item(cat, "id");
747
                                janus_config_item *desc = janus_config_get_item(cat, "description");
748
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
749
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
750
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
751
                                janus_config_item *file = janus_config_get_item(cat, "filename");
752
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
753
                                janus_config_item *video = janus_config_get_item(cat, "video");
754
                                if(file == NULL || file->value == NULL) {
755
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', missing mandatory information...\n", cat->name);
756
                                        cl = cl->next;
757
                                        continue;
758
                                }
759
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
760
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
761
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
762
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
763
                                if(!doaudio || dovideo) {
764
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', we only support audio file streaming right now...\n", cat->name);
765
                                        cl = cl->next;
766
                                        continue;
767
                                }
768
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
769
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
770
                                        cl = cl->next;
771
                                        continue;
772
                                }
773
                                FILE *audiofile = fopen(file->value, "rb");
774
                                if(!audiofile) {
775
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, no such file '%s'...\n", file->value);
776
                                        cl = cl->next;
777
                                        continue;
778
                                }
779
                                fclose(audiofile);
780
                                if(id == NULL || id->value == NULL) {
781
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
782
                                } else {
783
                                        janus_mutex_lock(&mountpoints_mutex);
784
                                        guint64 mpid = g_ascii_strtoull(id->value, 0, 10);
785
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &mpid);
786
                                        janus_mutex_unlock(&mountpoints_mutex);
787
                                        if(mp != NULL) {
788
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
789
                                                cl = cl->next;
790
                                                continue;
791
                                        }
792
                                }
793
                                janus_streaming_mountpoint *mp = NULL;
794
                                if((mp = janus_streaming_create_file_source(
795
                                                (id && id->value) ? g_ascii_strtoull(id->value, 0, 10) : 0,
796
                                                (char *)cat->name,
797
                                                desc ? (char *)desc->value : NULL,
798
                                                (char *)file->value,
799
                                                FALSE, doaudio, dovideo)) == NULL) {
800
                                        JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream '%s'...\n", cat->name);
801
                                        cl = cl->next;
802
                                        continue;
803
                                }
804
                                mp->is_private = is_private;
805
                                if(secret && secret->value)
806
                                        mp->secret = g_strdup(secret->value);
807
                                if(pin && pin->value)
808
                                        mp->pin = g_strdup(pin->value);
809
                        } else if(!strcasecmp(type->value, "rtsp")) {
810
#ifndef HAVE_LIBCURL
811
                                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', libcurl support not compiled...\n", cat->name);
812
                                cl = cl->next;
813
                                continue;
814
#else
815
                                janus_config_item *id = janus_config_get_item(cat, "id");
816
                                janus_config_item *desc = janus_config_get_item(cat, "description");
817
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
818
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
819
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
820
                                janus_config_item *file = janus_config_get_item(cat, "url");
821
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
822
                                janus_config_item *video = janus_config_get_item(cat, "video");
823
                                if(file == NULL || file->value == NULL) {
824
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', missing mandatory information...\n", cat->name);
825
                                        cl = cl->next;
826
                                        continue;
827
                                }
828
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
829
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
830
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
831
                                if(id == NULL || id->value == NULL) {
832
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
833
                                } else {
834
                                        janus_mutex_lock(&mountpoints_mutex);
835
                                        guint64 mpid = g_ascii_strtoull(id->value, 0, 10);
836
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &mpid);
837
                                        janus_mutex_unlock(&mountpoints_mutex);
838
                                        if(mp != NULL) {
839
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
840
                                                cl = cl->next;
841
                                                continue;
842
                                        }
843
                                }
844
                                janus_streaming_mountpoint *mp = NULL;
845
                                if((mp = janus_streaming_create_rtsp_source(
846
                                                (id && id->value) ? g_ascii_strtoull(id->value, 0, 10) : 0,
847
                                                (char *)cat->name,
848
                                                desc ? (char *)desc->value : NULL,
849
                                                (char *)file->value, doaudio, dovideo)) == NULL) {
850
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream '%s'...\n", cat->name);
851
                                        cl = cl->next;
852
                                        continue;
853
                                }
854
                                mp->is_private = is_private;
855
                                if(secret && secret->value)
856
                                        mp->secret = g_strdup(secret->value);
857
                                if(pin && pin->value)
858
                                        mp->pin = g_strdup(pin->value);
859
#endif
860
                        } else {
861
                                JANUS_LOG(LOG_WARN, "Ignoring unknown stream type '%s' (%s)...\n", type->value, cat->name);
862
                        }
863
                        cl = cl->next;
864
                }
865
                /* Done: we keep the configuration file open in case we get a "create" or "destroy" with permanent=true */
866
        }
867
        /* Show available mountpoints */
868
        janus_mutex_lock(&mountpoints_mutex);
869
        GHashTableIter iter;
870
        gpointer value;
871
        g_hash_table_iter_init(&iter, mountpoints);
872
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
873
                janus_streaming_mountpoint *mp = value;
874
                JANUS_LOG(LOG_VERB, "  ::: [%"SCNu64"][%s] %s (%s, %s, %s, pin: %s)\n", mp->id, mp->name, mp->description,
875
                        mp->streaming_type == janus_streaming_type_live ? "live" : "on demand",
876
                        mp->streaming_source == janus_streaming_source_rtp ? "RTP source" : "file source",
877
                        mp->is_private ? "private" : "public",
878
                        mp->pin ? mp->pin : "no pin");
879
        }
880
        janus_mutex_unlock(&mountpoints_mutex);
881

    
882
        sessions = g_hash_table_new(NULL, NULL);
883
        janus_mutex_init(&sessions_mutex);
884
        messages = g_async_queue_new_full((GDestroyNotify) janus_streaming_message_free);
885
        /* This is the callback we'll need to invoke to contact the gateway */
886
        gateway = callback;
887
        g_atomic_int_set(&initialized, 1);
888

    
889
        GError *error = NULL;
890
        /* Start the sessions watchdog */
891
        watchdog = g_thread_try_new("streaming watchdog", &janus_streaming_watchdog, NULL, &error);
892
        if(!watchdog) {
893
                g_atomic_int_set(&initialized, 0);
894
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming watchdog thread...\n", error->code, error->message ? error->message : "??");
895
                janus_config_destroy(config);
896
                return -1;
897
        }
898
        /* Launch the thread that will handle incoming messages */
899
        handler_thread = g_thread_try_new("janus streaming handler", janus_streaming_handler, NULL, &error);
900
        if(error != NULL) {
901
                g_atomic_int_set(&initialized, 0);
902
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming handler thread...\n", error->code, error->message ? error->message : "??");
903
                janus_config_destroy(config);
904
                return -1;
905
        }
906
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_STREAMING_NAME);
907
        return 0;
908
}
909

    
910
void janus_streaming_destroy(void) {
911
        if(!g_atomic_int_get(&initialized))
912
                return;
913
        g_atomic_int_set(&stopping, 1);
914

    
915
        g_async_queue_push(messages, &exit_message);
916

    
917
        if(handler_thread != NULL) {
918
                g_thread_join(handler_thread);
919
                handler_thread = NULL;
920
        }
921

    
922
        /* Remove all mountpoints */
923
        janus_mutex_unlock(&mountpoints_mutex);
924
        GHashTableIter iter;
925
        gpointer value;
926
        g_hash_table_iter_init(&iter, mountpoints);
927
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
928
                janus_streaming_mountpoint *mp = value;
929
                if(!mp->destroyed) {
930
                        mp->destroyed = janus_get_monotonic_time();
931
                        old_mountpoints = g_list_append(old_mountpoints, mp);
932
                }
933
        }
934
        janus_mutex_unlock(&mountpoints_mutex);
935
        if(watchdog != NULL) {
936
                g_thread_join(watchdog);
937
                watchdog = NULL;
938
        }
939

    
940
        /* FIXME We should destroy the sessions cleanly */
941
        usleep(500000);
942
        janus_mutex_lock(&mountpoints_mutex);
943
        g_hash_table_destroy(mountpoints);
944
        janus_mutex_unlock(&mountpoints_mutex);
945
        janus_mutex_lock(&sessions_mutex);
946
        g_hash_table_destroy(sessions);
947
        janus_mutex_unlock(&sessions_mutex);
948
        g_async_queue_unref(messages);
949
        messages = NULL;
950
        sessions = NULL;
951

    
952
        janus_config_destroy(config);
953
        g_free(admin_key);
954

    
955
        g_atomic_int_set(&initialized, 0);
956
        g_atomic_int_set(&stopping, 0);
957
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_STREAMING_NAME);
958
}
959

    
960
int janus_streaming_get_api_compatibility(void) {
961
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
962
        return JANUS_PLUGIN_API_VERSION;
963
}
964

    
965
int janus_streaming_get_version(void) {
966
        return JANUS_STREAMING_VERSION;
967
}
968

    
969
const char *janus_streaming_get_version_string(void) {
970
        return JANUS_STREAMING_VERSION_STRING;
971
}
972

    
973
const char *janus_streaming_get_description(void) {
974
        return JANUS_STREAMING_DESCRIPTION;
975
}
976

    
977
const char *janus_streaming_get_name(void) {
978
        return JANUS_STREAMING_NAME;
979
}
980

    
981
const char *janus_streaming_get_author(void) {
982
        return JANUS_STREAMING_AUTHOR;
983
}
984

    
985
const char *janus_streaming_get_package(void) {
986
        return JANUS_STREAMING_PACKAGE;
987
}
988

    
989
void janus_streaming_create_session(janus_plugin_session *handle, int *error) {
990
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
991
                *error = -1;
992
                return;
993
        }        
994
        janus_streaming_session *session = (janus_streaming_session *)g_malloc0(sizeof(janus_streaming_session));
995
        if(session == NULL) {
996
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
997
                *error = -2;
998
                return;
999
        }
1000
        session->handle = handle;
1001
        session->mountpoint = NULL;        /* This will happen later */
1002
        session->started = FALSE;        /* This will happen later */
1003
        session->paused = FALSE;
1004
        session->destroyed = 0;
1005
        g_atomic_int_set(&session->hangingup, 0);
1006
        handle->plugin_handle = session;
1007
        janus_mutex_lock(&sessions_mutex);
1008
        g_hash_table_insert(sessions, handle, session);
1009
        janus_mutex_unlock(&sessions_mutex);
1010

    
1011
        return;
1012
}
1013

    
1014
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error) {
1015
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
1016
                *error = -1;
1017
                return;
1018
        }        
1019
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; 
1020
        if(!session) {
1021
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1022
                *error = -2;
1023
                return;
1024
        }
1025
        JANUS_LOG(LOG_VERB, "Removing streaming session...\n");
1026
        if(session->mountpoint) {
1027
                janus_mutex_lock(&session->mountpoint->mutex);
1028
                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
1029
                janus_mutex_unlock(&session->mountpoint->mutex);
1030
        }
1031
        janus_mutex_lock(&sessions_mutex);
1032
        if(!session->destroyed) {
1033
                session->destroyed = janus_get_monotonic_time();
1034
                g_hash_table_remove(sessions, handle);
1035
                /* Cleaning up and removing the session is done in a lazy way */
1036
                old_sessions = g_list_append(old_sessions, session);
1037
        }
1038
        janus_mutex_unlock(&sessions_mutex);
1039
        return;
1040
}
1041

    
1042
char *janus_streaming_query_session(janus_plugin_session *handle) {
1043
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
1044
                return NULL;
1045
        }        
1046
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;
1047
        if(!session) {
1048
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1049
                return NULL;
1050
        }
1051
        /* What is this user watching, if anything? */
1052
        json_t *info = json_object();
1053
        json_object_set_new(info, "state", json_string(session->mountpoint ? "watching" : "idle"));
1054
        if(session->mountpoint) {
1055
                json_object_set_new(info, "mountpoint_id", json_integer(session->mountpoint->id));
1056
                json_object_set_new(info, "mountpoint_name", session->mountpoint->name ? json_string(session->mountpoint->name) : NULL);
1057
        }
1058
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
1059
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1060
        json_decref(info);
1061
        return info_text;
1062
}
1063

    
1064
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
1065
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1066
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
1067

    
1068
        /* Pre-parse the message */
1069
        int error_code = 0;
1070
        char error_cause[512];
1071
        json_t *root = NULL;
1072
        json_t *response = NULL;
1073

    
1074
        if(message == NULL) {
1075
                JANUS_LOG(LOG_ERR, "No message??\n");
1076
                error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
1077
                g_snprintf(error_cause, 512, "%s", "No message??");
1078
                goto error;
1079
        }
1080
        JANUS_LOG(LOG_VERB, "Handling message: %s\n", message);
1081

    
1082
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1083
        if(!session) {
1084
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1085
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1086
                g_snprintf(error_cause, 512, "%s", "session associated with this handle...");
1087
                goto error;
1088
        }
1089
        if(session->destroyed) {
1090
                JANUS_LOG(LOG_ERR, "Session has already been destroyed...\n");
1091
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1092
                g_snprintf(error_cause, 512, "%s", "Session has already been destroyed...");
1093
                goto error;
1094
        }
1095
        json_error_t error;
1096
        root = json_loads(message, 0, &error);
1097
        if(!root) {
1098
                JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
1099
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
1100
                g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
1101
                goto error;
1102
        }
1103
        if(!json_is_object(root)) {
1104
                JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
1105
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
1106
                g_snprintf(error_cause, 512, "JSON error: not an object");
1107
                goto error;
1108
        }
1109
        /* Get the request first */
1110
        JANUS_VALIDATE_JSON_OBJECT(root, request_parameters,
1111
                error_code, error_cause, TRUE,
1112
                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1113
        if(error_code != 0)
1114
                goto error;
1115
        json_t *request = json_object_get(root, "request");
1116
        /* Some requests ('create' and 'destroy') can be handled synchronously */
1117
        const char *request_text = json_string_value(request);
1118
        if(!strcasecmp(request_text, "list")) {
1119
                json_t *list = json_array();
1120
                JANUS_LOG(LOG_VERB, "Request for the list of mountpoints\n");
1121
                /* Return a list of all available mountpoints */
1122
                janus_mutex_lock(&mountpoints_mutex);
1123
                GHashTableIter iter;
1124
                gpointer value;
1125
                g_hash_table_iter_init(&iter, mountpoints);
1126
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
1127
                        janus_streaming_mountpoint *mp = value;
1128
                        if(mp->is_private) {
1129
                                /* Skip private stream */
1130
                                JANUS_LOG(LOG_VERB, "Skipping private mountpoint '%s'\n", mp->description);
1131
                                continue;
1132
                        }
1133
                        json_t *ml = json_object();
1134
                        json_object_set_new(ml, "id", json_integer(mp->id));
1135
                        json_object_set_new(ml, "description", json_string(mp->description));
1136
                        json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1137
                        if(mp->streaming_source == janus_streaming_source_rtp) {
1138
                                janus_streaming_rtp_source *source = mp->source;
1139
                                gint64 now = janus_get_monotonic_time();
1140
                                if(source->audio_fd != -1)
1141
                                        json_object_set_new(ml, "audio_age_ms", json_integer((now - source->last_received_audio) / 1000));
1142
                                if(source->video_fd != -1)
1143
                                        json_object_set_new(ml, "video_age_ms", json_integer((now - source->last_received_video) / 1000));
1144
                        }
1145
                        json_array_append_new(list, ml);
1146
                }
1147
                janus_mutex_unlock(&mountpoints_mutex);
1148
                /* Send info back */
1149
                response = json_object();
1150
                json_object_set_new(response, "streaming", json_string("list"));
1151
                json_object_set_new(response, "list", list);
1152
                goto plugin_response;
1153
        } else if(!strcasecmp(request_text, "info")) {
1154
                JANUS_LOG(LOG_VERB, "Request info on a specific mountpoint\n");
1155
                /* Return info on a specific mountpoint */
1156
                JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
1157
                        error_code, error_cause, TRUE,
1158
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1159
                if(error_code != 0)
1160
                        goto error;
1161
                json_t *id = json_object_get(root, "id");
1162
                guint64 id_value = json_integer_value(id);
1163
                janus_mutex_lock(&mountpoints_mutex);
1164
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &id_value);
1165
                if(mp == NULL) {
1166
                        janus_mutex_unlock(&mountpoints_mutex);
1167
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1168
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1169
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1170
                        goto error;
1171
                }
1172
                json_t *ml = json_object();
1173
                json_object_set_new(ml, "id", json_integer(mp->id));
1174
                json_object_set_new(ml, "description", json_string(mp->description));
1175
                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1176
                if(mp->streaming_source == janus_streaming_source_rtp) {
1177
                        janus_streaming_rtp_source *source = mp->source;
1178
                        gint64 now = janus_get_monotonic_time();
1179
                        if(source->audio_fd != -1)
1180
                                json_object_set_new(ml, "audio_age_ms", json_integer((now - source->last_received_audio) / 1000));
1181
                        if(source->video_fd != -1)
1182
                                json_object_set_new(ml, "video_age_ms", json_integer((now - source->last_received_video) / 1000));
1183
                }
1184
                janus_mutex_unlock(&mountpoints_mutex);
1185
                /* Send info back */
1186
                response = json_object();
1187
                json_object_set_new(response, "streaming", json_string("info"));
1188
                json_object_set_new(response, "info", ml);
1189
                goto plugin_response;
1190
        } else if(!strcasecmp(request_text, "create")) {
1191
                /* Create a new stream */
1192
                JANUS_VALIDATE_JSON_OBJECT(root, create_parameters,
1193
                        error_code, error_cause, TRUE,
1194
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1195
                if(error_code != 0)
1196
                        goto error;
1197
                if(admin_key != NULL) {
1198
                        /* An admin key was specified: make sure it was provided, and that it's valid */
1199
                        JANUS_VALIDATE_JSON_OBJECT(root, adminkey_parameters,
1200
                                error_code, error_cause, TRUE,
1201
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1202
                        if(error_code != 0)
1203
                                goto error;
1204
                        JANUS_CHECK_SECRET(admin_key, root, "admin_key", error_code, error_cause,
1205
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
1206
                        if(error_code != 0)
1207
                                goto error;
1208
                }
1209
                json_t *type = json_object_get(root, "type");
1210
                const char *type_text = json_string_value(type);
1211
                json_t *secret = json_object_get(root, "secret");
1212
                json_t *pin = json_object_get(root, "pin");
1213
                json_t *permanent = json_object_get(root, "permanent");
1214
                gboolean save = permanent ? json_is_true(permanent) : FALSE;
1215
                if(save && config == NULL) {
1216
                        JANUS_LOG(LOG_ERR, "No configuration file, can't create permanent mountpoint\n");
1217
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1218
                        g_snprintf(error_cause, 512, "No configuration file, can't create permanent mountpoint");
1219
                        goto error;
1220
                }
1221
                janus_streaming_mountpoint *mp = NULL;
1222
                if(!strcasecmp(type_text, "rtp")) {
1223
                        /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
1224
                        JANUS_VALIDATE_JSON_OBJECT(root, rtp_parameters,
1225
                                error_code, error_cause, TRUE,
1226
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1227
                        if(error_code != 0)
1228
                                goto error;
1229
                        json_t *id = json_object_get(root, "id");
1230
                        json_t *name = json_object_get(root, "name");
1231
                        json_t *desc = json_object_get(root, "description");
1232
                        json_t *is_private = json_object_get(root, "is_private");
1233
                        json_t *audio = json_object_get(root, "audio");
1234
                        json_t *video = json_object_get(root, "video");
1235
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1236
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1237
                        if(!doaudio && !dovideo) {
1238
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1239
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1240
                                g_snprintf(error_cause, 512, "Can't add 'rtp' stream, no audio or video have to be streamed...");
1241
                                goto error;
1242
                        }
1243
                        uint16_t aport = 0;
1244
                        uint8_t acodec = 0;
1245
                        char *artpmap = NULL, *afmtp = NULL, *amcast = NULL;
1246
                        if(doaudio) {
1247
                                JANUS_VALIDATE_JSON_OBJECT(root, rtp_audio_parameters,
1248
                                        error_code, error_cause, TRUE,
1249
                                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1250
                                if(error_code != 0)
1251
                                        goto error;
1252
                                json_t *audiomcast = json_object_get(root, "audiomcast");
1253
                                amcast = (char *)json_string_value(audiomcast);
1254
                                json_t *audioport = json_object_get(root, "audioport");
1255
                                aport = json_integer_value(audioport);
1256
                                json_t *audiopt = json_object_get(root, "audiopt");
1257
                                acodec = json_integer_value(audiopt);
1258
                                json_t *audiortpmap = json_object_get(root, "audiortpmap");
1259
                                artpmap = (char *)json_string_value(audiortpmap);
1260
                                json_t *audiofmtp = json_object_get(root, "audiofmtp");
1261
                                afmtp = (char *)json_string_value(audiofmtp);
1262
                        }
1263
                        uint16_t vport = 0;
1264
                        uint8_t vcodec = 0;
1265
                        char *vrtpmap = NULL, *vfmtp = NULL, *vmcast = NULL;
1266
                        gboolean bufferkf = FALSE;
1267
                        if(dovideo) {
1268
                                JANUS_VALIDATE_JSON_OBJECT(root, rtp_video_parameters,
1269
                                        error_code, error_cause, TRUE,
1270
                                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1271
                                if(error_code != 0)
1272
                                        goto error;
1273
                                json_t *videomcast = json_object_get(root, "videomcast");
1274
                                vmcast = (char *)json_string_value(videomcast);
1275
                                json_t *videoport = json_object_get(root, "videoport");
1276
                                vport = json_integer_value(videoport);
1277
                                json_t *videopt = json_object_get(root, "videopt");
1278
                                vcodec = json_integer_value(videopt);
1279
                                json_t *videortpmap = json_object_get(root, "videortpmap");
1280
                                vrtpmap = (char *)json_string_value(videortpmap);
1281
                                json_t *videofmtp = json_object_get(root, "videofmtp");
1282
                                vfmtp = (char *)json_string_value(videofmtp);
1283
                                json_t *vkf = json_object_get(root, "videobufferkf");
1284
                                bufferkf = vkf ? json_is_true(vkf) : FALSE;
1285
                        }
1286
                        if(id == NULL) {
1287
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1288
                        } else {
1289
                                janus_mutex_lock(&mountpoints_mutex);
1290
                                guint64 mpid = json_integer_value(id);
1291
                                mp = g_hash_table_lookup(mountpoints, &mpid);
1292
                                janus_mutex_unlock(&mountpoints_mutex);
1293
                                if(mp != NULL) {
1294
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1295
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1296
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1297
                                        goto error;
1298
                                }
1299
                        }
1300
                        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
1301
                        mp = janus_streaming_create_rtp_source(
1302
                                        id ? json_integer_value(id) : 0,
1303
                                        name ? (char *)json_string_value(name) : NULL,
1304
                                        desc ? (char *)json_string_value(desc) : NULL,
1305
                                        doaudio, amcast, aport, acodec, artpmap, afmtp,
1306
                                        dovideo, vmcast, vport, vcodec, vrtpmap, vfmtp, bufferkf);
1307
                        if(mp == NULL) {
1308
                                JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream...\n");
1309
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1310
                                g_snprintf(error_cause, 512, "Error creating 'rtp' stream");
1311
                                goto error;
1312
                        }
1313
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1314
                } else if(!strcasecmp(type_text, "live")) {
1315
                        /* File live source */
1316
                        JANUS_VALIDATE_JSON_OBJECT(root, live_parameters,
1317
                                error_code, error_cause, TRUE,
1318
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1319
                        if(error_code != 0)
1320
                                goto error;
1321
                        json_t *id = json_object_get(root, "id");
1322
                        json_t *name = json_object_get(root, "name");
1323
                        json_t *desc = json_object_get(root, "description");
1324
                        json_t *is_private = json_object_get(root, "is_private");
1325
                        json_t *file = json_object_get(root, "filename");
1326
                        json_t *audio = json_object_get(root, "audio");
1327
                        json_t *video = json_object_get(root, "video");
1328
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1329
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1330
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1331
                        if(!doaudio || dovideo) {
1332
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, we only support audio file streaming right now...\n");
1333
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1334
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, we only support audio file streaming right now...");
1335
                                goto error;
1336
                        }
1337
                        char *filename = (char *)json_string_value(file);
1338
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1339
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1340
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1341
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1342
                                goto error;
1343
                        }
1344
                        FILE *audiofile = fopen(filename, "rb");
1345
                        if(!audiofile) {
1346
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, no such file '%s'...\n", filename);
1347
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1348
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, no such file '%s'\n", filename);
1349
                                goto error;
1350
                        }
1351
                        fclose(audiofile);
1352
                        if(id == NULL) {
1353
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1354
                        } else {
1355
                                janus_mutex_lock(&mountpoints_mutex);
1356
                                guint64 mpid = json_integer_value(id);
1357
                                mp = g_hash_table_lookup(mountpoints, &mpid);
1358
                                janus_mutex_unlock(&mountpoints_mutex);
1359
                                if(mp != NULL) {
1360
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1361
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1362
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1363
                                        goto error;
1364
                                }
1365
                        }
1366
                        mp = janus_streaming_create_file_source(
1367
                                        id ? json_integer_value(id) : 0,
1368
                                        name ? (char *)json_string_value(name) : NULL,
1369
                                        desc ? (char *)json_string_value(desc) : NULL,
1370
                                        filename,
1371
                                        TRUE, doaudio, dovideo);
1372
                        if(mp == NULL) {
1373
                                JANUS_LOG(LOG_ERR, "Error creating 'live' stream...\n");
1374
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1375
                                g_snprintf(error_cause, 512, "Error creating 'live' stream");
1376
                                goto error;
1377
                        }
1378
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1379
                } else if(!strcasecmp(type_text, "ondemand")) {
1380
                        /* mu-Law file on demand source */
1381
                        JANUS_VALIDATE_JSON_OBJECT(root, ondemand_parameters,
1382
                                error_code, error_cause, TRUE,
1383
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1384
                        if(error_code != 0)
1385
                                goto error;
1386
                        json_t *id = json_object_get(root, "id");
1387
                        json_t *name = json_object_get(root, "name");
1388
                        json_t *desc = json_object_get(root, "description");
1389
                        json_t *is_private = json_object_get(root, "is_private");
1390
                        json_t *file = json_object_get(root, "filename");
1391
                        json_t *audio = json_object_get(root, "audio");
1392
                        json_t *video = json_object_get(root, "video");
1393
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1394
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1395
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1396
                        if(!doaudio || dovideo) {
1397
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, we only support audio file streaming right now...\n");
1398
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1399
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, we only support audio file streaming right now...");
1400
                                goto error;
1401
                        }
1402
                        char *filename = (char *)json_string_value(file);
1403
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1404
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1405
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1406
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1407
                                goto error;
1408
                        }
1409
                        FILE *audiofile = fopen(filename, "rb");
1410
                        if(!audiofile) {
1411
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, no such file '%s'...\n", filename);
1412
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1413
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, no such file '%s'\n", filename);
1414
                                goto error;
1415
                        }
1416
                        fclose(audiofile);
1417
                        if(id == NULL) {
1418
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1419
                        } else {
1420
                                janus_mutex_lock(&mountpoints_mutex);
1421
                                guint64 mpid = json_integer_value(id);
1422
                                mp = g_hash_table_lookup(mountpoints, &mpid);
1423
                                janus_mutex_unlock(&mountpoints_mutex);
1424
                                if(mp != NULL) {
1425
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1426
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1427
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1428
                                        goto error;
1429
                                }
1430
                        }
1431
                        mp = janus_streaming_create_file_source(
1432
                                        id ? json_integer_value(id) : 0,
1433
                                        name ? (char *)json_string_value(name) : NULL,
1434
                                        desc ? (char *)json_string_value(desc) : NULL,
1435
                                        filename,
1436
                                        FALSE, doaudio, dovideo);
1437
                        if(mp == NULL) {
1438
                                JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream...\n");
1439
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1440
                                g_snprintf(error_cause, 512, "Error creating 'ondemand' stream");
1441
                                goto error;
1442
                        }
1443
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1444
                } else if(!strcasecmp(type_text, "rtsp")) {
1445
#ifndef HAVE_LIBCURL
1446
                        JANUS_LOG(LOG_ERR, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1447
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1448
                        g_snprintf(error_cause, 512, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1449
                        goto error;
1450
#else
1451
                        JANUS_VALIDATE_JSON_OBJECT(root, rtsp_parameters,
1452
                                error_code, error_cause, TRUE,
1453
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1454
                        if(error_code != 0)
1455
                                goto error;
1456
                        /* RTSP source*/
1457
                        json_t *id = json_object_get(root, "id");
1458
                        json_t *name = json_object_get(root, "name");
1459
                        json_t *desc = json_object_get(root, "description");
1460
                        json_t *is_private = json_object_get(root, "is_private");
1461
                        json_t *audio = json_object_get(root, "audio");
1462
                        json_t *video = json_object_get(root, "video");
1463
                        json_t *url = json_object_get(root, "url");
1464
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1465
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1466
                        if(!doaudio && !dovideo) {
1467
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1468
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1469
                                g_snprintf(error_cause, 512, "Can't add 'rtsp' stream, no audio or video have to be streamed...");
1470
                                goto error;
1471
                        }
1472
                        mp = janus_streaming_create_rtsp_source(
1473
                                        id ? json_integer_value(id) : 0,
1474
                                        name ? (char *)json_string_value(name) : NULL,
1475
                                        desc ? (char *)json_string_value(desc) : NULL,
1476
                                        (char *)json_string_value(url),
1477
                                        doaudio, dovideo);
1478
                        if(mp == NULL) {
1479
                                JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream...\n");
1480
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1481
                                g_snprintf(error_cause, 512, "Error creating 'RTSP' stream");
1482
                                goto error;
1483
                        }
1484
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;                        
1485
#endif
1486
                } else {
1487
                        JANUS_LOG(LOG_ERR, "Unknown stream type '%s'...\n", type_text);
1488
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1489
                        g_snprintf(error_cause, 512, "Unknown stream type '%s'...\n", type_text);
1490
                        goto error;
1491
                }
1492
                /* Any secret? */
1493
                if(secret)
1494
                        mp->secret = g_strdup(json_string_value(secret));
1495
                /* Any PIN? */
1496
                if(pin)
1497
                        mp->pin = g_strdup(json_string_value(pin));
1498
                if(save) {
1499
                        /* This mountpoint is permanent: save to the configuration file too
1500
                         * FIXME: We should check if anything fails... */
1501
                        JANUS_LOG(LOG_VERB, "Saving mountpoint %"SCNu64" permanently in config file\n", mp->id);
1502
                        janus_mutex_lock(&config_mutex);
1503
                        char value[BUFSIZ];
1504
                        /* The category to add is the mountpoint name */
1505
                        janus_config_add_category(config, mp->name);
1506
                        /* Now for the common values */
1507
                        janus_config_add_item(config, mp->name, "type", type_text);
1508
                        g_snprintf(value, BUFSIZ, "%"SCNu64, mp->id);
1509
                        janus_config_add_item(config, mp->name, "id", value);
1510
                        janus_config_add_item(config, mp->name, "description", mp->description);
1511
                        if(mp->is_private)
1512
                                janus_config_add_item(config, mp->name, "is_private", "yes");
1513
                        /* Per type values */
1514
                        if(!strcasecmp(type_text, "rtp")) {
1515
                                janus_config_add_item(config, mp->name, "audio", mp->codecs.audio_pt >= 0 ? "yes" : "no");
1516
                                janus_streaming_rtp_source *source = mp->source;
1517
                                if(mp->codecs.audio_pt >= 0) {
1518
                                        g_snprintf(value, BUFSIZ, "%d", source->audio_port);
1519
                                        janus_config_add_item(config, mp->name, "audioport", value);
1520
                                        json_t *audiomcast = json_object_get(root, "audiomcast");
1521
                                        if(audiomcast)
1522
                                                janus_config_add_item(config, mp->name, "audiomcast", json_string_value(json_object_get(root, "audiomcast")));
1523
                                        g_snprintf(value, BUFSIZ, "%d", mp->codecs.audio_pt);
1524
                                        janus_config_add_item(config, mp->name, "audiopt", value);
1525
                                        janus_config_add_item(config, mp->name, "audiortpmap", mp->codecs.audio_rtpmap);
1526
                                        if(mp->codecs.audio_fmtp)
1527
                                                janus_config_add_item(config, mp->name, "audiofmtp", mp->codecs.audio_fmtp);
1528
                                }
1529
                                janus_config_add_item(config, mp->name, "video", mp->codecs.video_pt >= 0? "yes" : "no");
1530
                                if(mp->codecs.video_pt >= 0) {
1531
                                        g_snprintf(value, BUFSIZ, "%d", source->video_port);
1532
                                        janus_config_add_item(config, mp->name, "videoport", value);
1533
                                        json_t *videomcast = json_object_get(root, "videomcast");
1534
                                        if(videomcast)
1535
                                                janus_config_add_item(config, mp->name, "videomcast", json_string_value(json_object_get(root, "videomcast")));
1536
                                        g_snprintf(value, BUFSIZ, "%d", mp->codecs.video_pt);
1537
                                        janus_config_add_item(config, mp->name, "videopt", value);
1538
                                        janus_config_add_item(config, mp->name, "videortpmap", mp->codecs.video_rtpmap);
1539
                                        if(mp->codecs.video_fmtp)
1540
                                                janus_config_add_item(config, mp->name, "videofmtp", mp->codecs.video_fmtp);
1541
                                        if(source->keyframe.enabled)
1542
                                                janus_config_add_item(config, mp->name, "videobufferkf", "yes");
1543
                                }
1544
                        } else if(!strcasecmp(type_text, "live") || !strcasecmp(type_text, "ondemand")) {
1545
                                janus_streaming_file_source *source = mp->source;
1546
                                janus_config_add_item(config, mp->name, "filename", source->filename);
1547
                                janus_config_add_item(config, mp->name, "audio", mp->codecs.audio_pt ? "yes" : "no");
1548
                                janus_config_add_item(config, mp->name, "video", mp->codecs.video_pt ? "yes" : "no");
1549
                        } else if(!strcasecmp(type_text, "rtsp")) {
1550
                                janus_config_add_item(config, mp->name, "url", json_string_value(json_object_get(root, "url")));
1551
                                janus_config_add_item(config, mp->name, "audio", mp->codecs.audio_pt ? "yes" : "no");
1552
                                janus_config_add_item(config, mp->name, "video", mp->codecs.video_pt ? "yes" : "no");
1553
                        }
1554
                        /* Some more common values */
1555
                        if(mp->secret)
1556
                                janus_config_add_item(config, mp->name, "secret", mp->secret);
1557
                        if(mp->pin)
1558
                                janus_config_add_item(config, mp->name, "pin", mp->pin);
1559
                        /* Save modified configuration */
1560
                        janus_config_save(config, config_folder, JANUS_STREAMING_PACKAGE);
1561
                        janus_mutex_unlock(&config_mutex);
1562
                }
1563
                /* Send info back */
1564
                response = json_object();
1565
                json_object_set_new(response, "streaming", json_string("created"));
1566
                json_object_set_new(response, "created", json_string(mp->name));
1567
                json_t *ml = json_object();
1568
                json_object_set_new(ml, "id", json_integer(mp->id));
1569
                json_object_set_new(ml, "description", json_string(mp->description));
1570
                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1571
                json_object_set_new(ml, "is_private", json_string(mp->is_private ? "true" : "false"));
1572
                json_object_set_new(response, "stream", ml);
1573
                /* Also notify event handlers */
1574
                if(notify_events && gateway->events_is_enabled()) {
1575
                        json_t *info = json_object();
1576
                        json_object_set_new(info, "event", json_string("created"));
1577
                        json_object_set_new(info, "id", json_integer(mp->id));
1578
                        json_object_set_new(info, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1579
                        gateway->notify_event(session->handle, info);
1580
                }
1581
                goto plugin_response;
1582
        } else if(!strcasecmp(request_text, "destroy")) {
1583
                /* Get rid of an existing stream (notice this doesn't remove it from the config file, though) */
1584
                JANUS_VALIDATE_JSON_OBJECT(root, destroy_parameters,
1585
                        error_code, error_cause, TRUE,
1586
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1587
                if(error_code != 0)
1588
                        goto error;
1589
                json_t *id = json_object_get(root, "id");
1590
                json_t *permanent = json_object_get(root, "permanent");
1591
                gboolean save = permanent ? json_is_true(permanent) : FALSE;
1592
                if(save && config == NULL) {
1593
                        JANUS_LOG(LOG_ERR, "No configuration file, can't destroy mountpoint permanently\n");
1594
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1595
                        g_snprintf(error_cause, 512, "No configuration file, can't destroy mountpoint permanently");
1596
                        goto error;
1597
                }
1598
                guint64 id_value = json_integer_value(id);
1599
                janus_mutex_lock(&mountpoints_mutex);
1600
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &id_value);
1601
                if(mp == NULL) {
1602
                        janus_mutex_unlock(&mountpoints_mutex);
1603
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1604
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1605
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1606
                        goto error;
1607
                }
1608
                /* A secret may be required for this action */
1609
                JANUS_CHECK_SECRET(mp->secret, root, "secret", error_code, error_cause,
1610
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
1611
                if(error_code != 0) {
1612
                        janus_mutex_unlock(&mountpoints_mutex);
1613
                        goto error;
1614
                }
1615
                JANUS_LOG(LOG_VERB, "Request to unmount mountpoint/stream %"SCNu64"\n", id_value);
1616
                /* FIXME Should we kick the current viewers as well? */
1617
                janus_mutex_lock(&mp->mutex);
1618
                GList *viewer = g_list_first(mp->listeners);
1619
                /* Prepare JSON event */
1620
                json_t *event = json_object();
1621
                json_object_set_new(event, "streaming", json_string("event"));
1622
                json_t *result = json_object();
1623
                json_object_set_new(result, "status", json_string("stopped"));
1624
                json_object_set_new(event, "result", result);
1625
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1626
                json_decref(event);
1627
                while(viewer) {
1628
                        janus_streaming_session *session = (janus_streaming_session *)viewer->data;
1629
                        if(session != NULL) {
1630
                                session->stopping = TRUE;
1631
                                session->started = FALSE;
1632
                                session->paused = FALSE;
1633
                                session->mountpoint = NULL;
1634
                                /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
1635
                                gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1636
                                gateway->close_pc(session->handle);
1637
                        }
1638
                        mp->listeners = g_list_remove_all(mp->listeners, session);
1639
                        viewer = g_list_first(mp->listeners);
1640
                }
1641
                g_free(event_text);
1642
                janus_mutex_unlock(&mp->mutex);
1643
                if(save) {
1644
                        /* This change is permanent: save to the configuration file too
1645
                         * FIXME: We should check if anything fails... */
1646
                        JANUS_LOG(LOG_VERB, "Destroying mountpoint %"SCNu64" (%s) permanently in config file\n", mp->id, mp->name);
1647
                        janus_mutex_lock(&config_mutex);
1648
                        /* The category to remove is the mountpoint name */
1649
                        janus_config_remove_category(config, mp->name);
1650
                        /* Save modified configuration */
1651
                        janus_config_save(config, config_folder, JANUS_STREAMING_PACKAGE);
1652
                        janus_mutex_unlock(&config_mutex);
1653
                }
1654
                /* Remove mountpoint from the hashtable: this will get it destroyed */
1655
                if(!mp->destroyed) {
1656
                        mp->destroyed = janus_get_monotonic_time();
1657
                        g_hash_table_remove(mountpoints, &id_value);
1658
                        /* Cleaning up and removing the mountpoint is done in a lazy way */
1659
                        old_mountpoints = g_list_append(old_mountpoints, mp);
1660
                }
1661
                /* Also notify event handlers */
1662
                if(notify_events && gateway->events_is_enabled()) {
1663
                        json_t *info = json_object();
1664
                        json_object_set_new(info, "event", json_string("destroyed"));
1665
                        json_object_set_new(info, "id", json_integer(id_value));
1666
                        gateway->notify_event(session->handle, info);
1667
                }
1668
                janus_mutex_unlock(&mountpoints_mutex);
1669
                /* Send info back */
1670
                response = json_object();
1671
                json_object_set_new(response, "streaming", json_string("destroyed"));
1672
                json_object_set_new(response, "destroyed", json_integer(id_value));
1673
                goto plugin_response;
1674
        } else if(!strcasecmp(request_text, "recording")) {
1675
                /* We can start/stop recording a live, RTP-based stream */
1676
                JANUS_VALIDATE_JSON_OBJECT(root, recording_parameters,
1677
                        error_code, error_cause, TRUE,
1678
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1679
                if(error_code != 0)
1680
                        goto error;
1681
                json_t *action = json_object_get(root, "action");
1682
                const char *action_text = json_string_value(action);
1683
                if(strcasecmp(action_text, "start") && strcasecmp(action_text, "stop")) {
1684
                        JANUS_LOG(LOG_ERR, "Invalid action (should be start|stop)\n");
1685
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1686
                        g_snprintf(error_cause, 512, "Invalid action (should be start|stop)");
1687
                        goto error;
1688
                }
1689
                json_t *id = json_object_get(root, "id");
1690
                guint64 id_value = json_integer_value(id);
1691
                janus_mutex_lock(&mountpoints_mutex);
1692
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &id_value);
1693
                if(mp == NULL) {
1694
                        janus_mutex_unlock(&mountpoints_mutex);
1695
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1696
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1697
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1698
                        goto error;
1699
                }
1700
                if(mp->streaming_type != janus_streaming_type_live || mp->streaming_source != janus_streaming_source_rtp) {
1701
                        janus_mutex_unlock(&mountpoints_mutex);
1702
                        JANUS_LOG(LOG_ERR, "Recording is only available on RTP-based live streams\n");
1703
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1704
                        g_snprintf(error_cause, 512, "Recording is only available on RTP-based live streams");
1705
                        goto error;
1706
                }
1707
                /* A secret may be required for this action */
1708
                JANUS_CHECK_SECRET(mp->secret, root, "secret", error_code, error_cause,
1709
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
1710
                if(error_code != 0) {
1711
                        janus_mutex_unlock(&mountpoints_mutex);
1712
                        goto error;
1713
                }
1714
                janus_streaming_rtp_source *source = mp->source;
1715
                if(!strcasecmp(action_text, "start")) {
1716
                        /* Start a recording for audio and/or video */
1717
                        JANUS_VALIDATE_JSON_OBJECT(root, recording_start_parameters,
1718
                                error_code, error_cause, TRUE,
1719
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1720
                        if(error_code != 0) {
1721
                                janus_mutex_unlock(&mountpoints_mutex);
1722
                                goto error;
1723
                        }
1724
                        json_t *audio = json_object_get(root, "audio");
1725
                        json_t *video = json_object_get(root, "video");
1726
                        if((audio && source->arc) || (video && source->vrc)) {
1727
                                janus_mutex_unlock(&mountpoints_mutex);
1728
                                JANUS_LOG(LOG_ERR, "Recording for audio and/or video already started for this stream\n");
1729
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1730
                                g_snprintf(error_cause, 512, "Recording for audio and/or video already started for this stream");
1731
                                goto error;
1732
                        }
1733
                        if(!audio && !video) {
1734
                                janus_mutex_unlock(&mountpoints_mutex);
1735
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1736
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1737
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1738
                                goto error;
1739
                        }
1740
                        if(audio) {
1741
                                const char *codec = NULL;
1742
                                if(strstr(mp->codecs.audio_rtpmap, "opus") || strstr(mp->codecs.audio_rtpmap, "OPUS"))
1743
                                        codec = "opus";
1744
                                else if(strstr(mp->codecs.audio_rtpmap, "pcm") || strstr(mp->codecs.audio_rtpmap, "PCM"))
1745
                                        codec = "g711";
1746
                                const char *audiofile = json_string_value(audio);
1747
                                source->arc = janus_recorder_create(NULL, codec, (char *)audiofile);
1748
                                if(source->arc == NULL) {
1749
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for audio\n");
1750
                                } else {
1751
                                        JANUS_LOG(LOG_INFO, "[%s] Audio recording started\n", mp->name);
1752
                                }
1753
                        }
1754
                        if(video) {
1755
                                const char *codec = NULL;
1756
                                if(strstr(mp->codecs.video_rtpmap, "vp8") || strstr(mp->codecs.video_rtpmap, "VP8"))
1757
                                        codec = "vp8";
1758
                                else if(strstr(mp->codecs.video_rtpmap, "vp9") || strstr(mp->codecs.video_rtpmap, "VP9"))
1759
                                        codec = "vp9";
1760
                                else if(strstr(mp->codecs.video_rtpmap, "h264") || strstr(mp->codecs.video_rtpmap, "H264"))
1761
                                        codec = "h264";
1762
                                const char *videofile = json_string_value(video);
1763
                                source->vrc = janus_recorder_create(NULL, codec, (char *)videofile);
1764
                                if(source->vrc == NULL) {
1765
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for video\n");
1766
                                } else {
1767
                                        JANUS_LOG(LOG_INFO, "[%s] Video recording started\n", mp->name);
1768
                                }
1769
                        }
1770
                        janus_mutex_unlock(&mountpoints_mutex);
1771
                        /* Send a success response back */
1772
                        response = json_object();
1773
                        json_object_set_new(response, "streaming", json_string("ok"));
1774
                        goto plugin_response;
1775
                } else if(!strcasecmp(action_text, "stop")) {
1776
                        /* Stop the recording */
1777
                        JANUS_VALIDATE_JSON_OBJECT(root, recording_stop_parameters,
1778
                                error_code, error_cause, TRUE,
1779
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1780
                        if(error_code != 0) {
1781
                                janus_mutex_unlock(&mountpoints_mutex);
1782
                                goto error;
1783
                        }
1784
                        json_t *audio = json_object_get(root, "audio");
1785
                        json_t *video = json_object_get(root, "video");
1786
                        if(!audio && !video) {
1787
                                janus_mutex_unlock(&mountpoints_mutex);
1788
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1789
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1790
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1791
                                goto error;
1792
                        }
1793
                        janus_mutex_lock(&source->rec_mutex);
1794
                        if(audio && json_is_true(audio) && source->arc) {
1795
                                /* Close the audio recording */
1796
                                janus_recorder_close(source->arc);
1797
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1798
                                janus_recorder *tmp = source->arc;
1799
                                source->arc = NULL;
1800
                                janus_recorder_free(tmp);
1801
                        }
1802
                        if(video && json_is_true(video) && source->vrc) {
1803
                                /* Close the video recording */
1804
                                janus_recorder_close(source->vrc);
1805
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1806
                                janus_recorder *tmp = source->vrc;
1807
                                source->vrc = NULL;
1808
                                janus_recorder_free(tmp);
1809
                        }
1810
                        janus_mutex_unlock(&source->rec_mutex);
1811
                        janus_mutex_unlock(&mountpoints_mutex);
1812
                        /* Send a success response back */
1813
                        response = json_object();
1814
                        json_object_set_new(response, "streaming", json_string("ok"));
1815
                        goto plugin_response;
1816
                }
1817
        } else if(!strcasecmp(request_text, "enable") || !strcasecmp(request_text, "disable")) {
1818
                /* A request to enable/disable a mountpoint */
1819
                JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
1820
                        error_code, error_cause, TRUE,
1821
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1822
                if(error_code != 0)
1823
                        goto error;
1824
                json_t *id = json_object_get(root, "id");
1825
                guint64 id_value = json_integer_value(id);
1826
                janus_mutex_lock(&mountpoints_mutex);
1827
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &id_value);
1828
                if(mp == NULL) {
1829
                        janus_mutex_unlock(&mountpoints_mutex);
1830
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1831
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1832
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1833
                        goto error;
1834
                }
1835
                /* A secret may be required for this action */
1836
                JANUS_CHECK_SECRET(mp->secret, root, "secret", error_code, error_cause,
1837
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
1838
                if(error_code != 0) {
1839
                        janus_mutex_unlock(&mountpoints_mutex);
1840
                        goto error;
1841
                }
1842
                if(!strcasecmp(request_text, "enable")) {
1843
                        /* Enable a previously disabled mountpoint */
1844
                        JANUS_LOG(LOG_INFO, "[%s] Stream enabled\n", mp->name);
1845
                        mp->enabled = TRUE;
1846
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1847
                } else {
1848
                        /* Disable a previously enabled mountpoint */
1849
                        JANUS_LOG(LOG_INFO, "[%s] Stream disabled\n", mp->name);
1850
                        mp->enabled = FALSE;
1851
                        /* Any recording to close? */
1852
                        janus_streaming_rtp_source *source = mp->source;
1853
                        janus_mutex_lock(&source->rec_mutex);
1854
                        if(source->arc) {
1855
                                janus_recorder_close(source->arc);
1856
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1857
                                janus_recorder *tmp = source->arc;
1858
                                source->arc = NULL;
1859
                                janus_recorder_free(tmp);
1860
                        }
1861
                        if(source->vrc) {
1862
                                janus_recorder_close(source->vrc);
1863
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1864
                                janus_recorder *tmp = source->vrc;
1865
                                source->vrc = NULL;
1866
                                janus_recorder_free(tmp);
1867
                        }
1868
                        janus_mutex_unlock(&source->rec_mutex);
1869
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1870
                }
1871
                janus_mutex_unlock(&mountpoints_mutex);
1872
                /* Send a success response back */
1873
                response = json_object();
1874
                json_object_set_new(response, "streaming", json_string("ok"));
1875
                goto plugin_response;
1876
        } else if(!strcasecmp(request_text, "watch") || !strcasecmp(request_text, "start")
1877
                        || !strcasecmp(request_text, "pause") || !strcasecmp(request_text, "stop")
1878
                        || !strcasecmp(request_text, "switch")) {
1879
                /* These messages are handled asynchronously */
1880
                janus_streaming_message *msg = g_malloc0(sizeof(janus_streaming_message));
1881
                if(msg == NULL) {
1882
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1883
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1884
                        g_snprintf(error_cause, 512, "Memory error");
1885
                        goto error;
1886
                }
1887

    
1888
                g_free(message);
1889
                msg->handle = handle;
1890
                msg->transaction = transaction;
1891
                msg->message = root;
1892
                msg->sdp_type = sdp_type;
1893
                msg->sdp = sdp;
1894

    
1895
                g_async_queue_push(messages, msg);
1896

    
1897
                return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
1898
        } else {
1899
                JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
1900
                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1901
                g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1902
                goto error;
1903
        }
1904

    
1905
plugin_response:
1906
                {
1907
                        if(!response) {
1908
                                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1909
                                g_snprintf(error_cause, 512, "Invalid response");
1910
                                goto error;
1911
                        }
1912
                        if(root != NULL)
1913
                                json_decref(root);
1914
                        g_free(transaction);
1915
                        g_free(message);
1916
                        g_free(sdp_type);
1917
                        g_free(sdp);
1918

    
1919
                        char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1920
                        json_decref(response);
1921
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1922
                        g_free(response_text);
1923
                        return result;
1924
                }
1925

    
1926
error:
1927
                {
1928
                        if(root != NULL)
1929
                                json_decref(root);
1930
                        g_free(transaction);
1931
                        g_free(message);
1932
                        g_free(sdp_type);
1933
                        g_free(sdp);
1934

    
1935
                        /* Prepare JSON error event */
1936
                        json_t *event = json_object();
1937
                        json_object_set_new(event, "streaming", json_string("event"));
1938
                        json_object_set_new(event, "error_code", json_integer(error_code));
1939
                        json_object_set_new(event, "error", json_string(error_cause));
1940
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1941
                        json_decref(event);
1942
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, event_text);
1943
                        g_free(event_text);
1944
                        return result;
1945
                }
1946

    
1947
}
1948

    
1949
void janus_streaming_setup_media(janus_plugin_session *handle) {
1950
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
1951
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1952
                return;
1953
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1954
        if(!session) {
1955
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1956
                return;
1957
        }
1958
        if(session->destroyed)
1959
                return;
1960
        g_atomic_int_set(&session->hangingup, 0);
1961
        /* We only start streaming towards this user when we get this event */
1962
        session->context.a_last_ssrc = 0;
1963
        session->context.a_last_ssrc = 0;
1964
        session->context.a_last_ts = 0;
1965
        session->context.a_base_ts = 0;
1966
        session->context.a_base_ts_prev = 0;
1967
        session->context.v_last_ssrc = 0;
1968
        session->context.v_last_ts = 0;
1969
        session->context.v_base_ts = 0;
1970
        session->context.v_base_ts_prev = 0;
1971
        session->context.a_last_seq = 0;
1972
        session->context.a_base_seq = 0;
1973
        session->context.a_base_seq_prev = 0;
1974
        session->context.v_last_seq = 0;
1975
        session->context.v_base_seq = 0;
1976
        session->context.v_base_seq_prev = 0;
1977
        /* If this is related to a live RTP mountpoint, any keyframe we can shoot already? */
1978
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
1979
        if(mountpoint->streaming_source == janus_streaming_source_rtp) {
1980
                janus_streaming_rtp_source *source = mountpoint->source;
1981
                if(source->keyframe.enabled) {
1982
                        JANUS_LOG(LOG_HUGE, "Any keyframe to send?\n");
1983
                        janus_mutex_lock(&source->keyframe.mutex);
1984
                        if(source->keyframe.latest_keyframe != NULL) {
1985
                                JANUS_LOG(LOG_HUGE, "Yep! %d packets\n", g_list_length(source->keyframe.latest_keyframe));
1986
                                GList *temp = source->keyframe.latest_keyframe;
1987
                                while(temp) {
1988
                                        janus_streaming_relay_rtp_packet(session, temp->data);
1989
                                        temp = temp->next;
1990
                                }
1991
                        }
1992
                        janus_mutex_unlock(&source->keyframe.mutex);
1993
                }
1994
        }
1995
        session->started = TRUE;
1996
        /* Prepare JSON event */
1997
        json_t *event = json_object();
1998
        json_object_set_new(event, "streaming", json_string("event"));
1999
        json_t *result = json_object();
2000
        json_object_set_new(result, "status", json_string("started"));
2001
        json_object_set_new(event, "result", result);
2002
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2003
        json_decref(event);
2004
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2005
        int ret = gateway->push_event(handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
2006
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2007
        g_free(event_text);
2008
}
2009

    
2010
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
2011
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
2012
                return;
2013
        /* FIXME We don't care about what the browser sends us, we're sendonly */
2014
}
2015

    
2016
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
2017
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
2018
                return;
2019
        /* We might interested in the available bandwidth that the user advertizes */
2020
        uint64_t bw = janus_rtcp_get_remb(buf, len);
2021
        if(bw > 0) {
2022
                JANUS_LOG(LOG_HUGE, "REMB for this PeerConnection: %"SCNu64"\n", bw);
2023
                /* TODO Use this somehow (e.g., notification towards application?) */
2024
        }
2025
        /* FIXME Maybe we should care about RTCP, but not now */
2026
}
2027

    
2028
void janus_streaming_hangup_media(janus_plugin_session *handle) {
2029
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
2030
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
2031
                return;
2032
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
2033
        if(!session) {
2034
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
2035
                return;
2036
        }
2037
        if(session->destroyed)
2038
                return;
2039
        if(g_atomic_int_add(&session->hangingup, 1))
2040
                return;
2041
        /* FIXME Simulate a "stop" coming from the browser */
2042
        janus_streaming_message *msg = g_malloc0(sizeof(janus_streaming_message));
2043
        msg->handle = handle;
2044
        msg->message = json_loads("{\"request\":\"stop\"}", 0, NULL);
2045
        msg->transaction = NULL;
2046
        msg->sdp_type = NULL;
2047
        msg->sdp = NULL;
2048
        g_async_queue_push(messages, msg);
2049
}
2050

    
2051
/* Thread to handle incoming messages */
2052
static void *janus_streaming_handler(void *data) {
2053
        JANUS_LOG(LOG_VERB, "Joining Streaming handler thread\n");
2054
        janus_streaming_message *msg = NULL;
2055
        int error_code = 0;
2056
        char error_cause[512];
2057
        json_t *root = NULL;
2058
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
2059
                msg = g_async_queue_pop(messages);
2060
                if(msg == NULL)
2061
                        continue;
2062
                if(msg == &exit_message)
2063
                        break;
2064
                if(msg->handle == NULL) {
2065
                        janus_streaming_message_free(msg);
2066
                        continue;
2067
                }
2068
                janus_streaming_session *session = NULL;
2069
                janus_mutex_lock(&sessions_mutex);
2070
                if(g_hash_table_lookup(sessions, msg->handle) != NULL ) {
2071
                        session = (janus_streaming_session *)msg->handle->plugin_handle;
2072
                }
2073
                janus_mutex_unlock(&sessions_mutex);
2074
                if(!session) {
2075
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
2076
                        janus_streaming_message_free(msg);
2077
                        continue;
2078
                }
2079
                if(session->destroyed) {
2080
                        janus_streaming_message_free(msg);
2081
                        continue;
2082
                }
2083
                /* Handle request */
2084
                error_code = 0;
2085
                root = NULL;
2086
                if(msg->message == NULL) {
2087
                        JANUS_LOG(LOG_ERR, "No message??\n");
2088
                        error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
2089
                        g_snprintf(error_cause, 512, "%s", "No message??");
2090
                        goto error;
2091
                }
2092
                root = msg->message;
2093
                /* Get the request first */
2094
                JANUS_VALIDATE_JSON_OBJECT(root, request_parameters,
2095
                        error_code, error_cause, TRUE,
2096
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
2097
                if(error_code != 0)
2098
                        goto error;
2099
                json_t *request = json_object_get(root, "request");
2100
                const char *request_text = json_string_value(request);
2101
                json_t *result = NULL;
2102
                const char *sdp_type = NULL;
2103
                char *sdp = NULL;
2104
                /* All these requests can only be handled asynchronously */
2105
                if(!strcasecmp(request_text, "watch")) {
2106
                        JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
2107
                                error_code, error_cause, TRUE,
2108
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
2109
                        if(error_code != 0)
2110
                                goto error;
2111
                        json_t *id = json_object_get(root, "id");
2112
                        guint64 id_value = json_integer_value(id);
2113
                        janus_mutex_lock(&mountpoints_mutex);
2114
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &id_value);
2115
                        if(mp == NULL) {
2116
                                janus_mutex_unlock(&mountpoints_mutex);
2117
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2118
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2119
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2120
                                goto error;
2121
                        }
2122
                        /* A secret may be required for this action */
2123
                        JANUS_CHECK_SECRET(mp->pin, root, "pin", error_code, error_cause,
2124
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
2125
                        if(error_code != 0) {
2126
                                janus_mutex_unlock(&mountpoints_mutex);
2127
                                goto error;
2128
                        }
2129
                        janus_mutex_unlock(&mountpoints_mutex);
2130
                        JANUS_LOG(LOG_VERB, "Request to watch mountpoint/stream %"SCNu64"\n", id_value);
2131
                        session->stopping = FALSE;
2132
                        session->mountpoint = mp;
2133
                        if(mp->streaming_type == janus_streaming_type_on_demand) {
2134
                                GError *error = NULL;
2135
                                g_thread_try_new(session->mountpoint->name, &janus_streaming_ondemand_thread, session, &error);
2136
                                if(error != NULL) {
2137
                                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the on-demand thread...\n", error->code, error->message ? error->message : "??");
2138
                                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
2139
                                        g_snprintf(error_cause, 512, "Got error %d (%s) trying to launch the on-demand thread", error->code, error->message ? error->message : "??");
2140
                                        goto error;
2141
                                }
2142
                        }
2143
                        /* TODO Check if user is already watching a stream, if the video is active, etc. */
2144
                        janus_mutex_lock(&mp->mutex);
2145
                        mp->listeners = g_list_append(mp->listeners, session);
2146
                        janus_mutex_unlock(&mp->mutex);
2147
                        sdp_type = "offer";        /* We're always going to do the offer ourselves, never answer */
2148
                        char sdptemp[2048];
2149
                        memset(sdptemp, 0, 2048);
2150
                        gchar buffer[512];
2151
                        memset(buffer, 0, 512);
2152
                        gint64 sessid = janus_get_real_time();
2153
                        gint64 version = sessid;        /* FIXME This needs to be increased when it changes, so time should be ok */
2154
                        g_snprintf(buffer, 512,
2155
                                "v=0\r\no=%s %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n",
2156
                                        "-", sessid, version);
2157
                        g_strlcat(sdptemp, buffer, 2048);
2158
                        g_strlcat(sdptemp, "s=Streaming Test\r\nt=0 0\r\n", 2048);
2159
                        if(mp->codecs.audio_pt >= 0) {
2160
                                /* Add audio line */
2161
                                g_snprintf(buffer, 512,
2162
                                        "m=audio 1 RTP/SAVPF %d\r\n"
2163
                                        "c=IN IP4 1.1.1.1\r\n",
2164
                                        mp->codecs.audio_pt);
2165
                                g_strlcat(sdptemp, buffer, 2048);
2166
                                if(mp->codecs.audio_rtpmap) {
2167
                                        g_snprintf(buffer, 512,
2168
                                                "a=rtpmap:%d %s\r\n",
2169
                                                mp->codecs.audio_pt, mp->codecs.audio_rtpmap);
2170
                                        g_strlcat(sdptemp, buffer, 2048);
2171
                                }
2172
                                if(mp->codecs.audio_fmtp) {
2173
                                        g_snprintf(buffer, 512,
2174
                                                "a=fmtp:%d %s\r\n",
2175
                                                mp->codecs.audio_pt, mp->codecs.audio_fmtp);
2176
                                        g_strlcat(sdptemp, buffer, 2048);
2177
                                }
2178
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2179
                        }
2180
                        if(mp->codecs.video_pt >= 0) {
2181
                                /* Add video line */
2182
                                g_snprintf(buffer, 512,
2183
                                        "m=video 1 RTP/SAVPF %d\r\n"
2184
                                        "c=IN IP4 1.1.1.1\r\n",
2185
                                        mp->codecs.video_pt);
2186
                                g_strlcat(sdptemp, buffer, 2048);
2187
                                if(mp->codecs.video_rtpmap) {
2188
                                        g_snprintf(buffer, 512,
2189
                                                "a=rtpmap:%d %s\r\n",
2190
                                                mp->codecs.video_pt, mp->codecs.video_rtpmap);
2191
                                        g_strlcat(sdptemp, buffer, 2048);
2192
                                }
2193
                                if(mp->codecs.video_fmtp) {
2194
                                        g_snprintf(buffer, 512,
2195
                                                "a=fmtp:%d %s\r\n",
2196
                                                mp->codecs.video_pt, mp->codecs.video_fmtp);
2197
                                        g_strlcat(sdptemp, buffer, 2048);
2198
                                }
2199
                                g_snprintf(buffer, 512,
2200
                                        "a=rtcp-fb:%d nack\r\n",
2201
                                        mp->codecs.video_pt);
2202
                                g_strlcat(sdptemp, buffer, 2048);
2203
                                g_snprintf(buffer, 512,
2204
                                        "a=rtcp-fb:%d goog-remb\r\n",
2205
                                        mp->codecs.video_pt);
2206
                                g_strlcat(sdptemp, buffer, 2048);
2207
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2208
                        }
2209
                        sdp = g_strdup(sdptemp);
2210
                        JANUS_LOG(LOG_VERB, "Going to offer this SDP:\n%s\n", sdp);
2211
                        result = json_object();
2212
                        json_object_set_new(result, "status", json_string("preparing"));
2213
                } else if(!strcasecmp(request_text, "start")) {
2214
                        if(session->mountpoint == NULL) {
2215
                                JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
2216
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2217
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2218
                                goto error;
2219
                        }
2220
                        JANUS_LOG(LOG_VERB, "Starting the streaming\n");
2221
                        session->paused = FALSE;
2222
                        result = json_object();
2223
                        /* We wait for the setup_media event to start: on the other hand, it may have already arrived */
2224
                        json_object_set_new(result, "status", json_string(session->started ? "started" : "starting"));
2225
                        /* Also notify event handlers */
2226
                        if(notify_events && gateway->events_is_enabled()) {
2227
                                json_t *info = json_object();
2228
                                json_object_set_new(info, "status", json_string("starting"));
2229
                                json_object_set_new(info, "id", json_integer(session->mountpoint->id));
2230
                                gateway->notify_event(session->handle, info);
2231
                        }
2232
                } else if(!strcasecmp(request_text, "pause")) {
2233
                        if(session->mountpoint == NULL) {
2234
                                JANUS_LOG(LOG_VERB, "Can't pause: no mountpoint set\n");
2235
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2236
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2237
                                goto error;
2238
                        }
2239
                        JANUS_LOG(LOG_VERB, "Pausing the streaming\n");
2240
                        session->paused = TRUE;
2241
                        result = json_object();
2242
                        json_object_set_new(result, "status", json_string("pausing"));
2243
                        /* Also notify event handlers */
2244
                        if(notify_events && gateway->events_is_enabled()) {
2245
                                json_t *info = json_object();
2246
                                json_object_set_new(info, "status", json_string("pausing"));
2247
                                json_object_set_new(info, "id", json_integer(session->mountpoint->id));
2248
                                gateway->notify_event(session->handle, info);
2249
                        }
2250
                } else if(!strcasecmp(request_text, "switch")) {
2251
                        /* This listener wants to switch to a different mountpoint
2252
                         * NOTE: this only works for live RTP streams as of now: you
2253
                         * cannot, for instance, switch from a live RTP mountpoint to
2254
                         * an on demand one or viceversa (TBD.) */
2255
                        janus_streaming_mountpoint *oldmp = session->mountpoint;
2256
                        if(oldmp == NULL) {
2257
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a mountpoint\n");
2258
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2259
                                g_snprintf(error_cause, 512, "Can't switch: not on a mountpoint");
2260
                                goto error;
2261
                        }
2262
                        if(oldmp->streaming_type != janus_streaming_type_live || 
2263
                                        oldmp->streaming_source != janus_streaming_source_rtp) {
2264
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a live RTP mountpoint\n");
2265
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2266
                                g_snprintf(error_cause, 512, "Can't switch: not on a live RTP mountpoint");
2267
                                goto error;
2268
                        }
2269
                        JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
2270
                                error_code, error_cause, TRUE,
2271
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
2272
                        if(error_code != 0)
2273
                                goto error;
2274
                        json_t *id = json_object_get(root, "id");
2275
                        guint64 id_value = json_integer_value(id);
2276
                        janus_mutex_lock(&mountpoints_mutex);
2277
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, &id_value);
2278
                        if(mp == NULL) {
2279
                                janus_mutex_unlock(&mountpoints_mutex);
2280
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2281
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2282
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2283
                                goto error;
2284
                        }
2285
                        if(mp->streaming_type != janus_streaming_type_live || 
2286
                                        mp->streaming_source != janus_streaming_source_rtp) {
2287
                                janus_mutex_unlock(&mountpoints_mutex);
2288
                                JANUS_LOG(LOG_VERB, "Can't switch: target is not a live RTP mountpoint\n");
2289
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2290
                                g_snprintf(error_cause, 512, "Can't switch: target is not a live RTP mountpoint");
2291
                                goto error;
2292
                        }
2293
                        janus_mutex_unlock(&mountpoints_mutex);
2294
                        JANUS_LOG(LOG_VERB, "Request to switch to mountpoint/stream %"SCNu64" (old: %"SCNu64")\n", id_value, oldmp->id);
2295
                        session->paused = TRUE;
2296
                        /* Unsubscribe from the previous mountpoint and subscribe to the new one */
2297
                        janus_mutex_lock(&oldmp->mutex);
2298
                        oldmp->listeners = g_list_remove_all(oldmp->listeners, session);
2299
                        janus_mutex_unlock(&oldmp->mutex);
2300
                        /* Subscribe to the new one */
2301
                        janus_mutex_lock(&mp->mutex);
2302
                        mp->listeners = g_list_append(mp->listeners, session);
2303
                        janus_mutex_unlock(&mp->mutex);
2304
                        session->mountpoint = mp;
2305
                        session->paused = FALSE;
2306
                        /* Done */
2307
                        result = json_object();
2308
                        json_object_set_new(result, "streaming", json_string("event"));
2309
                        json_object_set_new(result, "switched", json_string("ok"));
2310
                        json_object_set_new(result, "id", json_integer(id_value));
2311
                        /* Also notify event handlers */
2312
                        if(notify_events && gateway->events_is_enabled()) {
2313
                                json_t *info = json_object();
2314
                                json_object_set_new(info, "status", json_string("switching"));
2315
                                json_object_set_new(info, "id", json_integer(id_value));
2316
                                gateway->notify_event(session->handle, info);
2317
                        }
2318
                } else if(!strcasecmp(request_text, "stop")) {
2319
                        if(session->stopping || !session->started) {
2320
                                /* Been there, done that: ignore */
2321
                                janus_streaming_message_free(msg);
2322
                                continue;
2323
                        }
2324
                        JANUS_LOG(LOG_VERB, "Stopping the streaming\n");
2325
                        session->stopping = TRUE;
2326
                        session->started = FALSE;
2327
                        session->paused = FALSE;
2328
                        result = json_object();
2329
                        json_object_set_new(result, "status", json_string("stopping"));
2330
                        if(session->mountpoint) {
2331
                                janus_mutex_lock(&session->mountpoint->mutex);
2332
                                JANUS_LOG(LOG_VERB, "  -- Removing the session from the mountpoint listeners\n");
2333
                                if(g_list_find(session->mountpoint->listeners, session) != NULL) {
2334
                                        JANUS_LOG(LOG_VERB, "  -- -- Found!\n");
2335
                                }
2336
                                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
2337
                                janus_mutex_unlock(&session->mountpoint->mutex);
2338
                        }
2339
                        /* Also notify event handlers */
2340
                        if(notify_events && gateway->events_is_enabled()) {
2341
                                json_t *info = json_object();
2342
                                json_object_set_new(info, "status", json_string("stopping"));
2343
                                json_object_set_new(info, "id", json_integer(session->mountpoint->id));
2344
                                gateway->notify_event(session->handle, info);
2345
                        }
2346
                        session->mountpoint = NULL;
2347
                        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
2348
                        gateway->close_pc(session->handle);
2349
                } else {
2350
                        JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
2351
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
2352
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
2353
                        goto error;
2354
                }
2355
                
2356
                /* Any SDP to handle? */
2357
                if(msg->sdp) {
2358
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well (but we really don't care):\n%s\n", msg->sdp_type, msg->sdp);
2359
                }
2360

    
2361
                /* Prepare JSON event */
2362
                json_t *event = json_object();
2363
                json_object_set_new(event, "streaming", json_string("event"));
2364
                if(result != NULL)
2365
                        json_object_set_new(event, "result", result);
2366
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2367
                json_decref(event);
2368
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2369
                int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, sdp_type, sdp);
2370
                JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2371
                g_free(event_text);
2372
                if(sdp)
2373
                        g_free(sdp);
2374
                janus_streaming_message_free(msg);
2375
                continue;
2376
                
2377
error:
2378
                {
2379
                        /* Prepare JSON error event */
2380
                        json_t *event = json_object();
2381
                        json_object_set_new(event, "streaming", json_string("event"));
2382
                        json_object_set_new(event, "error_code", json_integer(error_code));
2383
                        json_object_set_new(event, "error", json_string(error_cause));
2384
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2385
                        json_decref(event);
2386
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2387
                        int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, NULL, NULL);
2388
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2389
                        g_free(event_text);
2390
                        janus_streaming_message_free(msg);
2391
                }
2392
        }
2393
        JANUS_LOG(LOG_VERB, "Leaving Streaming handler thread\n");
2394
        return NULL;
2395
}
2396

    
2397
/* Helpers to create a listener filedescriptor */
2398
static int janus_streaming_create_fd(int port, in_addr_t mcast, const char* listenername, const char* medianame, const char* mountpointname) {
2399
        struct sockaddr_in address;
2400
        int fd = socket(AF_INET, SOCK_DGRAM, 0);
2401
        if(fd < 0) {
2402
                JANUS_LOG(LOG_ERR, "[%s] Cannot create socket for %s...\n", mountpointname, medianame);
2403
                return -1;
2404
        }        
2405
        if(port > 0) {
2406
                if(IN_MULTICAST(ntohl(mcast))) {
2407
#ifdef IP_MULTICAST_ALL                        
2408
                        int mc_all = 0;
2409
                        if((setsockopt(fd, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all))) < 0) {
2410
                                JANUS_LOG(LOG_ERR, "[%s] %s listener setsockopt IP_MULTICAST_ALL failed\n", mountpointname, listenername);
2411
                                close(fd);
2412
                                return -1;
2413
                        }                        
2414
#endif                        
2415
                        struct ip_mreq mreq;
2416
                        memset(&mreq, 0, sizeof(mreq));
2417
                        mreq.imr_multiaddr.s_addr = mcast;
2418
                        if(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) {
2419
                                JANUS_LOG(LOG_ERR, "[%s] %s listener IP_ADD_MEMBERSHIP failed\n", mountpointname, listenername);
2420
                                close(fd);
2421
                                return -1;
2422
                        }
2423
                        JANUS_LOG(LOG_ERR, "[%s] %s listener IP_ADD_MEMBERSHIP ok\n", mountpointname, listenername);
2424
                }
2425
        }
2426

    
2427
        address.sin_family = AF_INET;
2428
        address.sin_port = htons(port);
2429
        address.sin_addr.s_addr = INADDR_ANY;
2430
        if(bind(fd, (struct sockaddr *)(&address), sizeof(struct sockaddr)) < 0) {
2431
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (port %d)...\n", mountpointname, medianame, port);
2432
                close(fd);
2433
                return -1;
2434
        }
2435
        return fd;
2436
}
2437

    
2438
/* Helpers to destroy a streaming mountpoint. */
2439
static void janus_streaming_rtp_source_free(janus_streaming_rtp_source *source) {
2440
        if(source->audio_fd > 0) {
2441
                close(source->audio_fd);
2442
        }
2443
        if(source->video_fd > 0) {
2444
                close(source->video_fd);
2445
        }
2446
        janus_mutex_lock(&source->keyframe.mutex);
2447
        GList *temp = NULL;
2448
        while(source->keyframe.latest_keyframe) {
2449
                temp = g_list_first(source->keyframe.latest_keyframe);
2450
                source->keyframe.latest_keyframe = g_list_remove_link(source->keyframe.latest_keyframe, temp);
2451
                janus_streaming_rtp_relay_packet *pkt = (janus_streaming_rtp_relay_packet *)temp->data;
2452
                g_free(pkt->data);
2453
                g_free(pkt);
2454
                g_list_free(temp);
2455
        }
2456
        source->keyframe.latest_keyframe = NULL;
2457
        while(source->keyframe.temp_keyframe) {
2458
                temp = g_list_first(source->keyframe.temp_keyframe);
2459
                source->keyframe.temp_keyframe = g_list_remove_link(source->keyframe.temp_keyframe, temp);
2460
                janus_streaming_rtp_relay_packet *pkt = (janus_streaming_rtp_relay_packet *)temp->data;
2461
                g_free(pkt->data);
2462
                g_free(pkt);
2463
                g_list_free(temp);
2464
        }
2465
        source->keyframe.latest_keyframe = NULL;
2466
        janus_mutex_unlock(&source->keyframe.mutex);
2467
#ifdef HAVE_LIBCURL
2468
        if(source->curl) {
2469
                /* Send an RTSP TEARDOWN */
2470
                curl_easy_setopt(source->curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_TEARDOWN);                
2471
                int res = curl_easy_perform(source->curl);
2472
                if(res != CURLE_OK) {
2473
                        JANUS_LOG(LOG_ERR, "Couldn't send TEARDOWN request: %s\n", curl_easy_strerror(res));
2474
                }                        
2475
                curl_easy_cleanup(source->curl);
2476
        }
2477
#endif
2478
        g_free(source);
2479
}
2480

    
2481
static void janus_streaming_file_source_free(janus_streaming_file_source *source) {
2482
        g_free(source->filename);
2483
        g_free(source);
2484
}
2485

    
2486
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp) {
2487
        mp->destroyed = janus_get_monotonic_time();
2488
        
2489
        g_free(mp->name);
2490
        g_free(mp->description);
2491
        g_free(mp->secret);
2492
        g_free(mp->pin);
2493
        janus_mutex_lock(&mp->mutex);
2494
        g_list_free(mp->listeners);
2495
        janus_mutex_unlock(&mp->mutex);
2496

    
2497
        if(mp->source != NULL && mp->source_destroy != NULL) {
2498
                mp->source_destroy(mp->source);
2499
        }
2500

    
2501
        g_free(mp->codecs.audio_rtpmap);
2502
        g_free(mp->codecs.audio_fmtp);
2503
        g_free(mp->codecs.video_rtpmap);
2504
        g_free(mp->codecs.video_fmtp);
2505

    
2506
        g_free(mp);
2507
}
2508

    
2509

    
2510
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
2511
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
2512
                uint64_t id, char *name, char *desc,
2513
                gboolean doaudio, char *amcast, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
2514
                gboolean dovideo, char *vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf) {
2515
        janus_mutex_lock(&mountpoints_mutex);
2516
        if(id == 0) {
2517
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2518
                while(id == 0) {
2519
                        id = janus_random_uint64();
2520
                        if(g_hash_table_lookup(mountpoints, &id) != NULL) {
2521
                                /* ID already in use, try another one */
2522
                                id = 0;
2523
                        }
2524
                }
2525
        }
2526
        char tempname[255];
2527
        if(name == NULL) {
2528
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2529
                memset(tempname, 0, 255);
2530
                g_snprintf(tempname, 255, "%"SCNu64, id);
2531
        }
2532
        if(!doaudio && !dovideo) {
2533
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
2534
                janus_mutex_unlock(&mountpoints_mutex);
2535
                return NULL;
2536
        }
2537
        if(doaudio && (aport == 0 || artpmap == NULL)) {
2538
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for audio...\n");
2539
                janus_mutex_unlock(&mountpoints_mutex);
2540
                return NULL;
2541
        }
2542
        if(dovideo && (vport == 0 || vcodec == 0 || vrtpmap == NULL)) {
2543
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for video...\n");
2544
                janus_mutex_unlock(&mountpoints_mutex);
2545
                return NULL;
2546
        }
2547
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
2548
        /* First of all, let's check if the requested ports are free */
2549
        int audio_fd = -1;
2550
        if(doaudio) {
2551
                audio_fd = janus_streaming_create_fd(aport, amcast ? inet_addr(amcast) : INADDR_ANY,
2552
                        "Audio", "audio", name ? name : tempname);
2553
                if(audio_fd < 0) {
2554
                        JANUS_LOG(LOG_ERR, "Can't bind to port %d for audio...\n", aport);
2555
                        janus_mutex_unlock(&mountpoints_mutex);
2556
                        return NULL;
2557
                }
2558
        }
2559
        int video_fd = -1;
2560
        if(dovideo) {
2561
                video_fd = janus_streaming_create_fd(vport, vmcast ? inet_addr(vmcast) : INADDR_ANY,
2562
                        "Video", "video", name ? name : tempname);
2563
                if(video_fd < 0) {
2564
                        JANUS_LOG(LOG_ERR, "Can't bind to port %d for video...\n", vport);
2565
                        if(audio_fd > 0)
2566
                                close(audio_fd);
2567
                        janus_mutex_unlock(&mountpoints_mutex);
2568
                        return NULL;
2569
                }
2570
        }
2571
        /* Create the mountpoint */
2572
        janus_streaming_mountpoint *live_rtp = g_malloc0(sizeof(janus_streaming_mountpoint));
2573
        if(live_rtp == NULL) {
2574
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2575
                janus_mutex_unlock(&mountpoints_mutex);
2576
                return NULL;
2577
        }
2578
        live_rtp->id = id;
2579
        live_rtp->name = g_strdup(name ? name : tempname);
2580
        char *description = NULL;
2581
        if(desc != NULL)
2582
                description = g_strdup(desc);
2583
        else
2584
                description = g_strdup(name ? name : tempname);
2585
        live_rtp->description = description;
2586
        live_rtp->enabled = TRUE;
2587
        live_rtp->active = FALSE;
2588
        live_rtp->streaming_type = janus_streaming_type_live;
2589
        live_rtp->streaming_source = janus_streaming_source_rtp;
2590
        janus_streaming_rtp_source *live_rtp_source = g_malloc0(sizeof(janus_streaming_rtp_source));
2591
        if(live_rtp->name == NULL || description == NULL || live_rtp_source == NULL) {
2592
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2593
                if(live_rtp->name)
2594
                        g_free(live_rtp->name);
2595
                if(description)
2596
                        g_free(description);
2597
                if(live_rtp_source)
2598
                        g_free(live_rtp_source);
2599
                g_free(live_rtp);
2600
                janus_mutex_unlock(&mountpoints_mutex);
2601
                return NULL;
2602
        }
2603
        live_rtp_source->audio_mcast = doaudio ? (amcast ? inet_addr(amcast) : INADDR_ANY) : INADDR_ANY;
2604
        live_rtp_source->audio_port = doaudio ? aport : -1;
2605
        live_rtp_source->video_mcast = dovideo ? (vmcast ? inet_addr(vmcast) : INADDR_ANY) : INADDR_ANY;
2606
        live_rtp_source->video_port = dovideo ? vport : -1;
2607
        live_rtp_source->arc = NULL;
2608
        live_rtp_source->vrc = NULL;
2609
        janus_mutex_init(&live_rtp_source->rec_mutex);
2610
        live_rtp_source->audio_fd = audio_fd;
2611
        live_rtp_source->video_fd = video_fd;
2612
        live_rtp_source->last_received_audio = janus_get_monotonic_time();
2613
        live_rtp_source->last_received_video = janus_get_monotonic_time();
2614
        live_rtp_source->keyframe.enabled = bufferkf;
2615
        live_rtp_source->keyframe.latest_keyframe = NULL;
2616
        live_rtp_source->keyframe.temp_keyframe = NULL;
2617
        live_rtp_source->keyframe.temp_ts = 0;
2618
        janus_mutex_init(&live_rtp_source->keyframe.mutex);
2619
        live_rtp->source = live_rtp_source;
2620
        live_rtp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2621
        live_rtp->codecs.audio_pt = doaudio ? acodec : -1;
2622
        live_rtp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2623
        live_rtp->codecs.audio_fmtp = doaudio ? (afmtp ? g_strdup(afmtp) : NULL) : NULL;
2624
        live_rtp->codecs.video_codec = -1;
2625
        if(dovideo) {
2626
                if(strstr(vrtpmap, "vp8") || strstr(vrtpmap, "VP8"))
2627
                        live_rtp->codecs.video_codec = JANUS_STREAMING_VP8;
2628
                else if(strstr(vrtpmap, "vp9") || strstr(vrtpmap, "VP9"))
2629
                        live_rtp->codecs.video_codec = JANUS_STREAMING_VP9;
2630
                else if(strstr(vrtpmap, "h264") || strstr(vrtpmap, "H264"))
2631
                        live_rtp->codecs.video_codec = JANUS_STREAMING_H264;
2632
        }
2633
        live_rtp->codecs.video_pt = dovideo ? vcodec : -1;
2634
        live_rtp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2635
        live_rtp->codecs.video_fmtp = dovideo ? (vfmtp ? g_strdup(vfmtp) : NULL) : NULL;
2636
        live_rtp->listeners = NULL;
2637
        live_rtp->destroyed = 0;
2638
        janus_mutex_init(&live_rtp->mutex);
2639
        g_hash_table_insert(mountpoints, janus_uint64_dup(live_rtp->id), live_rtp);
2640
        janus_mutex_unlock(&mountpoints_mutex);
2641
        GError *error = NULL;
2642
        g_thread_try_new(live_rtp->name, &janus_streaming_relay_thread, live_rtp, &error);
2643
        if(error != NULL) {
2644
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTP thread...\n", error->code, error->message ? error->message : "??");
2645
                if(live_rtp->name)
2646
                        g_free(live_rtp->name);
2647
                if(description)
2648
                        g_free(description);
2649
                if(live_rtp_source)
2650
                        g_free(live_rtp_source);
2651
                g_free(live_rtp);
2652
                return NULL;
2653
        }
2654
        return live_rtp;
2655
}
2656

    
2657
/* Helper to create a file/ondemand live source */
2658
janus_streaming_mountpoint *janus_streaming_create_file_source(
2659
                uint64_t id, char *name, char *desc, char *filename,
2660
                gboolean live, gboolean doaudio, gboolean dovideo) {
2661
        janus_mutex_lock(&mountpoints_mutex);
2662
        if(filename == NULL) {
2663
                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, missing filename...\n");
2664
                janus_mutex_unlock(&mountpoints_mutex);
2665
                return NULL;
2666
        }
2667
        if(name == NULL) {
2668
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2669
        }
2670
        if(id == 0) {
2671
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2672
                while(id == 0) {
2673
                        id = janus_random_uint64();
2674
                        if(g_hash_table_lookup(mountpoints, &id) != NULL) {
2675
                                /* ID already in use, try another one */
2676
                                id = 0;
2677
                        }
2678
                }
2679
        }
2680
        if(!doaudio && !dovideo) {
2681
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, no audio or video have to be streamed...\n");
2682
                janus_mutex_unlock(&mountpoints_mutex);
2683
                return NULL;
2684
        }
2685
        /* FIXME We don't support video streaming from file yet */
2686
        if(!doaudio || dovideo) {
2687
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, we only support audio file streaming right now...\n");
2688
                janus_mutex_unlock(&mountpoints_mutex);
2689
                return NULL;
2690
        }
2691
        /* TODO We should support something more than raw a-Law and mu-Law streams... */
2692
        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
2693
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
2694
                janus_mutex_unlock(&mountpoints_mutex);
2695
                return NULL;
2696
        }
2697
        janus_streaming_mountpoint *file_source = g_malloc0(sizeof(janus_streaming_mountpoint));
2698
        if(file_source == NULL) {
2699
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2700
                janus_mutex_unlock(&mountpoints_mutex);
2701
                return NULL;
2702
        }
2703
        file_source->id = id;
2704
        char tempname[255];
2705
        if(!name) {
2706
                memset(tempname, 0, 255);
2707
                g_snprintf(tempname, 255, "%"SCNu64, file_source->id);
2708
        }
2709
        file_source->name = g_strdup(name ? name : tempname);
2710
        char *description = NULL;
2711
        if(desc != NULL)
2712
                description = g_strdup(desc);
2713
        else
2714
                description = g_strdup(name ? name : tempname);
2715
        file_source->description = description;
2716
        file_source->enabled = TRUE;
2717
        file_source->active = FALSE;
2718
        file_source->streaming_type = live ? janus_streaming_type_live : janus_streaming_type_on_demand;
2719
        file_source->streaming_source = janus_streaming_source_file;
2720
        janus_streaming_file_source *file_source_source = g_malloc0(sizeof(janus_streaming_file_source));
2721
        if(file_source->name == NULL || description == NULL || file_source_source == NULL) {
2722
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2723
                if(file_source->name)
2724
                        g_free(file_source->name);
2725
                if(description)
2726
                        g_free(description);
2727
                if(file_source_source)
2728
                        g_free(file_source_source);
2729
                g_free(file_source);
2730
                janus_mutex_unlock(&mountpoints_mutex);
2731
                return NULL;
2732
        }
2733
        file_source_source->filename = g_strdup(filename);
2734
        file_source->source = file_source_source;
2735
        file_source->source_destroy = (GDestroyNotify) janus_streaming_file_source_free;
2736
        file_source->codecs.audio_pt = strstr(filename, ".alaw") ? 8 : 0;
2737
        file_source->codecs.audio_rtpmap = g_strdup(strstr(filename, ".alaw") ? "PCMA/8000" : "PCMU/8000");
2738
        file_source->codecs.video_pt = -1;        /* FIXME We don't support video for this type yet */
2739
        file_source->codecs.video_rtpmap = NULL;
2740
        file_source->listeners = NULL;
2741
        file_source->destroyed = 0;
2742
        janus_mutex_init(&file_source->mutex);
2743
        g_hash_table_insert(mountpoints, janus_uint64_dup(file_source->id), file_source);
2744
        janus_mutex_unlock(&mountpoints_mutex);
2745
        if(live) {
2746
                GError *error = NULL;
2747
                g_thread_try_new(file_source->name, &janus_streaming_filesource_thread, file_source, &error);
2748
                if(error != NULL) {
2749
                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the live filesource thread...\n", error->code, error->message ? error->message : "??");
2750
                        if(file_source->name)
2751
                                g_free(file_source->name);
2752
                        if(description)
2753
                                g_free(description);
2754
                        if(file_source_source)
2755
                                g_free(file_source_source);
2756
                        g_free(file_source);
2757
                        return NULL;
2758
                }
2759
        }
2760
        return file_source;
2761
}
2762

    
2763
#ifdef HAVE_LIBCURL                
2764
typedef struct janus_streaming_buffer {
2765
        char *buffer;
2766
        size_t size;
2767
} janus_streaming_buffer;
2768

    
2769
static size_t janus_streaming_rtsp_curl_callback(void *payload, size_t size, size_t nmemb, void *data) {
2770
        size_t realsize = size * nmemb;
2771
        janus_streaming_buffer *buf = (struct janus_streaming_buffer *)data;
2772
        /* (Re)allocate if needed */
2773
        buf->buffer = realloc(buf->buffer, buf->size+realsize+1);
2774
        if(buf->buffer == NULL) {
2775
                /* Memory error! */ 
2776
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2777
                return 0;
2778
        }
2779
        /* Update the buffer */
2780
        memcpy(&(buf->buffer[buf->size]), payload, realsize);
2781
        buf->size += realsize;
2782
        buf->buffer[buf->size] = 0;
2783
        /* Done! */
2784
        return realsize;
2785
}
2786

    
2787
static int janus_streaming_rtsp_parse_sdp(const char* buffer, const char* name, const char* media, int* pt, char* transport, char* rtpmap, char* fmtp, char* control) {
2788
        int port=-1;
2789
        char pattern[256];
2790
        g_snprintf(pattern, sizeof(pattern), "m=%s", media);
2791
        char *m=strstr(buffer, pattern);
2792
        if(m == NULL) {
2793
                JANUS_LOG(LOG_VERB, "[%s] no media %s...\n", name, media);
2794
                return -1;
2795
        }
2796
        sscanf(m, "m=%*s %d %*s %d", &port, pt);
2797
        char *s=strstr(m, "a=control:");
2798
        if(s == NULL) {
2799
                JANUS_LOG(LOG_ERR, "[%s] no control for %s...\n", name, media);
2800
                return -1;
2801
        }                
2802
        sscanf(s, "a=control:%s", control);
2803
        char *r=strstr(m, "a=rtpmap:");
2804
        if(r != NULL) {
2805
                sscanf(r, "a=rtpmap:%*d %s", rtpmap);
2806
        }
2807
        char *f=strstr(m, "a=fmtp:");
2808
        if(f != NULL) {
2809
                sscanf(f, "a=fmtp:%*d %s", fmtp);
2810
        }        
2811
        char *c=strstr(m, "c=IN IP4");
2812
        char ip[256];
2813
        in_addr_t mcast = INADDR_ANY;
2814
        if(c != NULL) {
2815
                if(sscanf(c, "c=IN IP4 %[^/]", ip) != 0) {
2816
                        mcast = inet_addr(ip);
2817
                }
2818
        }        
2819
        int fd = janus_streaming_create_fd(port, mcast, media, media, name);
2820
        if(fd < 0) {
2821
                return -1;
2822
        }
2823
        struct sockaddr_in address;        
2824
        socklen_t len = sizeof(address);
2825
        if(getsockname(fd, (struct sockaddr *)&address, &len) < 0) {
2826
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (%d)...\n", name, media, port);
2827
                close(fd);
2828
                return -1;
2829
        }
2830
        port=ntohs(address.sin_port);
2831
        if(IN_MULTICAST(ntohl(mcast))) {
2832
                sprintf(transport, "RTP/AVP;multicast;client_port=%d-%d", port, port+1);        
2833
        } else {
2834
                sprintf(transport, "RTP/AVP;unicast;client_port=%d-%d", port, port+1);        
2835
        }
2836

    
2837
        return fd;
2838
}        
2839

    
2840
/* Helper to create an RTSP source */
2841
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
2842
                uint64_t id, char *name, char *desc, char *url,
2843
                gboolean doaudio, gboolean dovideo) {
2844
        if(url == NULL) {
2845
                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream, missing url...\n");
2846
                return NULL;
2847
        }        
2848
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");        
2849
        CURL* curl = curl_easy_init();
2850
        if(curl == NULL) {
2851
                JANUS_LOG(LOG_ERR, "Can't init CURL\n");
2852
                return NULL;
2853
        }
2854
        if(janus_log_level > LOG_INFO)
2855
                curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
2856
        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
2857
        curl_easy_setopt(curl, CURLOPT_URL, url);        
2858
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); 
2859
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 0L);          
2860
        /* Send an RTSP DESCRIBE */
2861
        janus_streaming_buffer data;
2862
        data.buffer = g_malloc0(1);
2863
        data.size = 0;
2864
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, url);
2865
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_DESCRIBE);
2866
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, janus_streaming_rtsp_curl_callback);                
2867
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &data);
2868
        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, janus_streaming_rtsp_curl_callback);                
2869
        curl_easy_setopt(curl, CURLOPT_HEADERDATA, &data);
2870
        int res = curl_easy_perform(curl);
2871
        if(res != CURLE_OK) {
2872
                JANUS_LOG(LOG_ERR, "Couldn't send DESCRIBE request: %s\n", curl_easy_strerror(res));
2873
                curl_easy_cleanup(curl);
2874
                return NULL;
2875
        }                
2876
        long code = 0;
2877
        res = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &code);
2878
        if(res != CURLE_OK) {
2879
                JANUS_LOG(LOG_ERR, "Couldn't get DESCRIBE answer: %s\n", curl_easy_strerror(res));
2880
                curl_easy_cleanup(curl);
2881
                return NULL;
2882
        } else if(code != 200) {
2883
                JANUS_LOG(LOG_ERR, "Couldn't get DESCRIBE code: %ld\n", code);
2884
                curl_easy_cleanup(curl);
2885
                return NULL;
2886
        }                         
2887
        JANUS_LOG(LOG_VERB, "DESCRIBE answer:%s\n",data.buffer);        
2888
        /* Parse the SDP we just got to figure out the negotiated media */
2889
        int vpt = -1;
2890
        char vrtpmap[2048];
2891
        char vfmtp[2048];
2892
        char vcontrol[2048];
2893
        char uri[1024];
2894
        char transport[1024];
2895
        int video_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "video", &vpt, transport, vrtpmap, vfmtp, vcontrol);
2896
        if(video_fd != -1) {
2897
                /* Send an RTSP SETUP for video */
2898
                g_free(data.buffer);
2899
                data.buffer = g_malloc0(1);
2900
                data.size = 0;                
2901
                sprintf(uri, "%s/%s", url, vcontrol);
2902
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2903
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2904
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2905
                res = curl_easy_perform(curl);
2906
                if(res != CURLE_OK) {
2907
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2908
                        curl_easy_cleanup(curl);
2909
                        return NULL;
2910
                }                
2911
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2912
        }
2913
        int apt = -1;
2914
        char artpmap[2048];
2915
        char afmtp[2048];
2916
        char acontrol[2048];
2917
        int audio_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "audio", &apt, transport, artpmap, afmtp, acontrol);
2918
        if(audio_fd != -1) {
2919
                /* Send an RTSP SETUP for audio */
2920
                g_free(data.buffer);
2921
                data.buffer = g_malloc0(1);
2922
                data.size = 0;                
2923
                sprintf(uri, "%s/%s", url, acontrol);
2924
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2925
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2926
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2927
                res = curl_easy_perform(curl);
2928
                if(res != CURLE_OK) {
2929
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2930
                        curl_easy_cleanup(curl);
2931
                        return NULL;
2932
                }                
2933
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2934
        }        
2935
        janus_mutex_lock(&mountpoints_mutex);
2936
        /* Create an RTP source for the media we'll get */
2937
        if(id == 0) {
2938
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2939
                while(id == 0) {
2940
                        id = janus_random_uint64();
2941
                        if(g_hash_table_lookup(mountpoints, &id) != NULL) {
2942
                                /* ID already in use, try another one */
2943
                                id = 0;
2944
                        }
2945
                }
2946
        }
2947
        char tempname[255];
2948
        if(name == NULL) {
2949
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2950
                memset(tempname, 0, 255);
2951
                g_snprintf(tempname, 255, "%"SCNu64, id);
2952
        }
2953
        char *sourcename =  g_strdup(name ? name : tempname);
2954
        if(sourcename == NULL) {
2955
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2956
                janus_mutex_unlock(&mountpoints_mutex);
2957
                curl_easy_cleanup(curl);
2958
                return NULL;
2959
        }        
2960
        char *description = NULL;
2961
        if(desc != NULL) {
2962
                description = g_strdup(desc);
2963
        } else {
2964
                description = g_strdup(name ? name : tempname);
2965
        }
2966
        if(description == NULL) {
2967
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2968
                g_free(sourcename);
2969
                janus_mutex_unlock(&mountpoints_mutex);
2970
                curl_easy_cleanup(curl);
2971
                return NULL;
2972
        }                
2973
        janus_streaming_mountpoint *live_rtsp = g_malloc0(sizeof(janus_streaming_mountpoint));
2974
        if(live_rtsp == NULL) {
2975
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2976
                g_free(description);
2977
                g_free(sourcename);
2978
                janus_mutex_unlock(&mountpoints_mutex);
2979
                curl_easy_cleanup(curl);
2980
                return NULL;
2981
        }        
2982
        live_rtsp->id = id ? id : janus_random_uint64();
2983
        live_rtsp->name = sourcename;
2984
        live_rtsp->description = description;
2985
        live_rtsp->enabled = TRUE;
2986
        live_rtsp->active = FALSE;
2987
        live_rtsp->streaming_type = janus_streaming_type_live;
2988
        live_rtsp->streaming_source = janus_streaming_source_rtp;
2989
        janus_streaming_rtp_source *live_rtsp_source = g_malloc0(sizeof(janus_streaming_rtp_source));
2990
        live_rtsp_source->arc = NULL;
2991
        live_rtsp_source->vrc = NULL;
2992
        live_rtsp_source->audio_fd = audio_fd;
2993
        live_rtsp_source->video_fd = video_fd;
2994
        live_rtsp_source->curl = curl;
2995
        live_rtsp->source = live_rtsp_source;
2996
        live_rtsp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2997
        live_rtsp->codecs.audio_pt = doaudio ? apt : -1;
2998
        live_rtsp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2999
        live_rtsp->codecs.audio_fmtp = doaudio ? g_strdup(afmtp) : NULL;
3000
        live_rtsp->codecs.video_pt = dovideo ? vpt : -1;
3001
        live_rtsp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
3002
        live_rtsp->codecs.video_fmtp = dovideo ? g_strdup(vfmtp) : NULL;
3003
        live_rtsp->listeners = NULL;
3004
        live_rtsp->destroyed = 0;
3005
        janus_mutex_init(&live_rtsp->mutex);
3006
        g_hash_table_insert(mountpoints, janus_uint64_dup(live_rtsp->id), live_rtsp);
3007
        janus_mutex_unlock(&mountpoints_mutex);        
3008
        GError *error = NULL;
3009
        g_thread_try_new(live_rtsp->name, &janus_streaming_relay_thread, live_rtsp, &error);        
3010
        if(error != NULL) {
3011
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTSP thread...\n", error->code, error->message ? error->message : "??");
3012
                janus_streaming_mountpoint_free(live_rtsp);
3013
                return NULL;        
3014
        }                                
3015
        /* Send an RTSP PLAY */
3016
        g_free(data.buffer);
3017
        data.buffer = g_malloc0(1);
3018
        data.size = 0;                
3019
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, url);
3020
        curl_easy_setopt(curl, CURLOPT_RANGE, "0.000-");
3021
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_PLAY);                
3022
        res = curl_easy_perform(curl);
3023
        if(res != CURLE_OK) {
3024
                JANUS_LOG(LOG_ERR, "Couldn't send PLAY request: %s\n", curl_easy_strerror(res));
3025
                janus_streaming_mountpoint_free(live_rtsp);
3026
                return NULL;
3027
        }                
3028
        JANUS_LOG(LOG_VERB, "PLAY answer:%s\n",data.buffer);        
3029
        g_free(data.buffer);
3030
        
3031
        return live_rtsp;
3032
}
3033
#else
3034
/* Helper to create an RTSP source */
3035
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
3036
                uint64_t id, char *name, char *desc, char *url,
3037
                gboolean doaudio, gboolean dovideo) {
3038
        JANUS_LOG(LOG_ERR, "RTSP need libcurl\n");
3039
        return NULL;
3040
}
3041
#endif
3042

    
3043
/* FIXME Thread to send RTP packets from a file (on demand) */
3044
static void *janus_streaming_ondemand_thread(void *data) {
3045
        JANUS_LOG(LOG_VERB, "Filesource (on demand) RTP thread starting...\n");
3046
        janus_streaming_session *session = (janus_streaming_session *)data;
3047
        if(!session) {
3048
                JANUS_LOG(LOG_ERR, "Invalid session!\n");
3049
                g_thread_unref(g_thread_self());
3050
                return NULL;
3051
        }
3052
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
3053
        if(!mountpoint) {
3054
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
3055
                g_thread_unref(g_thread_self());
3056
                return NULL;
3057
        }
3058
        if(mountpoint->streaming_source != janus_streaming_source_file) {
3059
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
3060
                g_thread_unref(g_thread_self());
3061
                return NULL;
3062
        }
3063
        if(mountpoint->streaming_type != janus_streaming_type_on_demand) {
3064
                JANUS_LOG(LOG_ERR, "[%s] Not an on-demand file source mountpoint!\n", mountpoint->name);
3065
                g_thread_unref(g_thread_self());
3066
                return NULL;
3067
        }
3068
        janus_streaming_file_source *source = mountpoint->source;
3069
        if(source == NULL || source->filename == NULL) {
3070
                g_thread_unref(g_thread_self());
3071
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
3072
                return NULL;
3073
        }
3074
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
3075
        FILE *audio = fopen(source->filename, "rb");
3076
        if(!audio) {
3077
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
3078
                g_thread_unref(g_thread_self());
3079
                return NULL;
3080
        }
3081
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
3082
        /* Buffer */
3083
        char *buf = g_malloc0(1024);
3084
        if(buf == NULL) {
3085
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
3086
                g_thread_unref(g_thread_self());
3087
                return NULL;
3088
        }
3089
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
3090
        /* Set up RTP */
3091
        gint16 seq = 1;
3092
        gint32 ts = 0;
3093
        rtp_header *header = (rtp_header *)buf;
3094
        header->version = 2;
3095
        header->markerbit = 1;
3096
        header->type = mountpoint->codecs.audio_pt;
3097
        header->seq_number = htons(seq);
3098
        header->timestamp = htonl(ts);
3099
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
3100
        /* Timer */
3101
        struct timeval now, before;
3102
        gettimeofday(&before, NULL);
3103
        now.tv_sec = before.tv_sec;
3104
        now.tv_usec = before.tv_usec;
3105
        time_t passed, d_s, d_us;
3106
        /* Loop */
3107
        gint read = 0;
3108
        janus_streaming_rtp_relay_packet packet;
3109
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed && !session->stopping && !session->destroyed) {
3110
                /* See if it's time to prepare a frame */
3111
                gettimeofday(&now, NULL);
3112
                d_s = now.tv_sec - before.tv_sec;
3113
                d_us = now.tv_usec - before.tv_usec;
3114
                if(d_us < 0) {
3115
                        d_us += 1000000;
3116
                        --d_s;
3117
                }
3118
                passed = d_s*1000000 + d_us;
3119
                if(passed < 18000) {        /* Let's wait about 18ms */
3120
                        usleep(1000);
3121
                        continue;
3122
                }
3123
                /* Update the reference time */
3124
                before.tv_usec += 20000;
3125
                if(before.tv_usec > 1000000) {
3126
                        before.tv_sec++;
3127
                        before.tv_usec -= 1000000;
3128
                }
3129
                /* If not started or paused, wait some more */
3130
                if(!session->started || session->paused || !mountpoint->enabled)
3131
                        continue;
3132
                /* Read frame from file... */
3133
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
3134
                if(feof(audio)) {
3135
                        /* FIXME We're doing this forever... should this be configurable? */
3136
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
3137
                        fseek(audio, 0, SEEK_SET);
3138
                        continue;
3139
                }
3140
                if(read < 0)
3141
                        break;
3142
                if(mountpoint->active == FALSE)
3143
                        mountpoint->active = TRUE;
3144
                //~ JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
3145
                        //~ header->type, ntohs(header->seq_number), ntohl(header->timestamp));
3146
                //~ JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
3147
                /* Relay on all sessions */
3148
                packet.data = header;
3149
                packet.length = RTP_HEADER_SIZE + read;
3150
                packet.is_video = 0;
3151
                packet.is_keyframe = 0;
3152
                /* Backup the actual timestamp and sequence number */
3153
                packet.timestamp = ntohl(packet.data->timestamp);
3154
                packet.seq_number = ntohs(packet.data->seq_number);
3155
                /* Go! */
3156
                janus_streaming_relay_rtp_packet(session, &packet);
3157
                /* Update header */
3158
                seq++;
3159
                header->seq_number = htons(seq);
3160
                ts += 160;
3161
                header->timestamp = htonl(ts);
3162
                header->markerbit = 0;
3163
        }
3164
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (ondemand) thread\n", name);
3165
        g_free(name);
3166
        g_free(buf);
3167
        fclose(audio);
3168
        g_thread_unref(g_thread_self());
3169
        return NULL;
3170
}
3171

    
3172
/* FIXME Thread to send RTP packets from a file (live) */
3173
static void *janus_streaming_filesource_thread(void *data) {
3174
        JANUS_LOG(LOG_VERB, "Filesource (live) thread starting...\n");
3175
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
3176
        if(!mountpoint) {
3177
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
3178
                g_thread_unref(g_thread_self());
3179
                return NULL;
3180
        }
3181
        if(mountpoint->streaming_source != janus_streaming_source_file) {
3182
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
3183
                g_thread_unref(g_thread_self());
3184
                return NULL;
3185
        }
3186
        if(mountpoint->streaming_type != janus_streaming_type_live) {
3187
                JANUS_LOG(LOG_ERR, "[%s] Not a live file source mountpoint!\n", mountpoint->name);
3188
                g_thread_unref(g_thread_self());
3189
                return NULL;
3190
        }
3191
        janus_streaming_file_source *source = mountpoint->source;
3192
        if(source == NULL || source->filename == NULL) {
3193
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
3194
                g_thread_unref(g_thread_self());
3195
                return NULL;
3196
        }
3197
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
3198
        FILE *audio = fopen(source->filename, "rb");
3199
        if(!audio) {
3200
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
3201
                g_thread_unref(g_thread_self());
3202
                return NULL;
3203
        }
3204
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
3205
        /* Buffer */
3206
        char *buf = g_malloc0(1024);
3207
        if(buf == NULL) {
3208
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
3209
                g_thread_unref(g_thread_self());
3210
                return NULL;
3211
        }
3212
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
3213
        /* Set up RTP */
3214
        gint16 seq = 1;
3215
        gint32 ts = 0;
3216
        rtp_header *header = (rtp_header *)buf;
3217
        header->version = 2;
3218
        header->markerbit = 1;
3219
        header->type = mountpoint->codecs.audio_pt;
3220
        header->seq_number = htons(seq);
3221
        header->timestamp = htonl(ts);
3222
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
3223
        /* Timer */
3224
        struct timeval now, before;
3225
        gettimeofday(&before, NULL);
3226
        now.tv_sec = before.tv_sec;
3227
        now.tv_usec = before.tv_usec;
3228
        time_t passed, d_s, d_us;
3229
        /* Loop */
3230
        gint read = 0;
3231
        janus_streaming_rtp_relay_packet packet;
3232
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
3233
                /* See if it's time to prepare a frame */
3234
                gettimeofday(&now, NULL);
3235
                d_s = now.tv_sec - before.tv_sec;
3236
                d_us = now.tv_usec - before.tv_usec;
3237
                if(d_us < 0) {
3238
                        d_us += 1000000;
3239
                        --d_s;
3240
                }
3241
                passed = d_s*1000000 + d_us;
3242
                if(passed < 18000) {        /* Let's wait about 18ms */
3243
                        usleep(1000);
3244
                        continue;
3245
                }
3246
                /* Update the reference time */
3247
                before.tv_usec += 20000;
3248
                if(before.tv_usec > 1000000) {
3249
                        before.tv_sec++;
3250
                        before.tv_usec -= 1000000;
3251
                }
3252
                /* If paused, wait some more */
3253
                if(!mountpoint->enabled)
3254
                        continue;
3255
                /* Read frame from file... */
3256
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
3257
                if(feof(audio)) {
3258
                        /* FIXME We're doing this forever... should this be configurable? */
3259
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
3260
                        fseek(audio, 0, SEEK_SET);
3261
                        continue;
3262
                }
3263
                if(read < 0)
3264
                        break;
3265
                if(mountpoint->active == FALSE)
3266
                        mountpoint->active = TRUE;
3267
                // JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
3268
                        // header->type, ntohs(header->seq_number), ntohl(header->timestamp));
3269
                // JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
3270
                /* Relay on all sessions */
3271
                packet.data = header;
3272
                packet.length = RTP_HEADER_SIZE + read;
3273
                packet.is_video = 0;
3274
                packet.is_keyframe = 0;
3275
                /* Backup the actual timestamp and sequence number */
3276
                packet.timestamp = ntohl(packet.data->timestamp);
3277
                packet.seq_number = ntohs(packet.data->seq_number);
3278
                /* Go! */
3279
                janus_mutex_lock_nodebug(&mountpoint->mutex);
3280
                g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3281
                janus_mutex_unlock_nodebug(&mountpoint->mutex);
3282
                /* Update header */
3283
                seq++;
3284
                header->seq_number = htons(seq);
3285
                ts += 160;
3286
                header->timestamp = htonl(ts);
3287
                header->markerbit = 0;
3288
        }
3289
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (live) thread\n", name);
3290
        g_free(name);
3291
        g_free(buf);
3292
        fclose(audio);
3293
        g_thread_unref(g_thread_self());
3294
        return NULL;
3295
}
3296
                
3297
/* FIXME Test thread to relay RTP frames coming from gstreamer/ffmpeg/others */
3298
static void *janus_streaming_relay_thread(void *data) {
3299
        JANUS_LOG(LOG_VERB, "Starting streaming relay thread\n");
3300
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
3301
        if(!mountpoint) {
3302
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
3303
                g_thread_unref(g_thread_self());
3304
                return NULL;
3305
        }
3306
        if(mountpoint->streaming_source != janus_streaming_source_rtp) {
3307
                JANUS_LOG(LOG_ERR, "[%s] Not an RTP source mountpoint!\n", mountpoint->name);
3308
                g_thread_unref(g_thread_self());
3309
                return NULL;
3310
        }
3311
        janus_streaming_rtp_source *source = mountpoint->source;
3312
        if(source == NULL) {
3313
                JANUS_LOG(LOG_ERR, "[%s] Invalid RTP source mountpoint!\n", mountpoint->name);
3314
                g_thread_unref(g_thread_self());
3315
                return NULL;
3316
        }
3317
        int audio_fd = source->audio_fd;
3318
        int video_fd = source->video_fd;
3319
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
3320
        /* Needed to fix seq and ts */
3321
        uint32_t a_last_ssrc = 0, a_last_ts = 0, a_base_ts = 0, a_base_ts_prev = 0,
3322
                        v_last_ssrc = 0, v_last_ts = 0, v_base_ts = 0, v_base_ts_prev = 0;
3323
        uint16_t a_last_seq = 0, a_base_seq = 0, a_base_seq_prev = 0,
3324
                        v_last_seq = 0, v_base_seq = 0, v_base_seq_prev = 0;
3325
        /* File descriptors */
3326
        socklen_t addrlen;
3327
        struct sockaddr_in remote;
3328
        int resfd = 0, bytes = 0;
3329
        struct pollfd fds[2];
3330
        char buffer[1500];
3331
        memset(buffer, 0, 1500);
3332
        /* Loop */
3333
        int num = 0;
3334
        janus_streaming_rtp_relay_packet packet;
3335
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
3336
                /* Prepare poll */
3337
                num = 0;
3338
                if(audio_fd != -1) {
3339
                        fds[num].fd = audio_fd;
3340
                        fds[num].events = POLLIN;
3341
                        fds[num].revents = 0;
3342
                        num++;
3343
                }
3344
                if(video_fd != -1) {
3345
                        fds[num].fd = video_fd;
3346
                        fds[num].events = POLLIN;
3347
                        fds[num].revents = 0;
3348
                        num++;
3349
                }
3350
                /* Wait for some data */
3351
                resfd = poll(fds, num, 1000);
3352
                if(resfd < 0) {
3353
                        JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", mountpoint->name, errno, strerror(errno));
3354
                        mountpoint->enabled = FALSE;
3355
                        break;
3356
                } else if(resfd == 0) {
3357
                        /* No data, keep going */
3358
                        continue;
3359
                }
3360
                int i = 0;
3361
                for(i=0; i<num; i++) {
3362
                        if(fds[i].revents & (POLLERR | POLLHUP)) {
3363
                                /* Socket error? */
3364
                                JANUS_LOG(LOG_ERR, "[%s] Error polling: %s... %d (%s)\n", mountpoint->name,
3365
                                        fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP", errno, strerror(errno));
3366
                                mountpoint->enabled = FALSE;
3367
                                break;
3368
                        } else if(fds[i].revents & POLLIN) {
3369
                                /* Got an RTP packet */
3370
                                if(audio_fd != -1 && fds[i].fd == audio_fd) {
3371
                                        /* Got something audio (RTP) */
3372
                                        if(mountpoint->active == FALSE)
3373
                                                mountpoint->active = TRUE;
3374
                                        source->last_received_audio = janus_get_monotonic_time();
3375
                                        addrlen = sizeof(remote);
3376
                                        bytes = recvfrom(audio_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3377
                                        //~ JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the audio channel...\n", bytes);
3378
                                        /* If paused, ignore this packet */
3379
                                        if(!mountpoint->enabled)
3380
                                                continue;
3381
                                        rtp_header *rtp = (rtp_header *)buffer;
3382
                                        //~ JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3383
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3384
                                        /* Relay on all sessions */
3385
                                        packet.data = rtp;
3386
                                        packet.length = bytes;
3387
                                        packet.is_video = 0;
3388
                                        packet.is_keyframe = 0;
3389
                                        /* Do we have a new stream? */
3390
                                        if(ntohl(packet.data->ssrc) != a_last_ssrc) {
3391
                                                a_last_ssrc = ntohl(packet.data->ssrc);
3392
                                                JANUS_LOG(LOG_INFO, "[%s] New audio stream! (ssrc=%u)\n", name, a_last_ssrc);
3393
                                                a_base_ts_prev = a_last_ts;
3394
                                                a_base_ts = ntohl(packet.data->timestamp);
3395
                                                a_base_seq_prev = a_last_seq;
3396
                                                a_base_seq = ntohs(packet.data->seq_number);
3397
                                        }
3398
                                        a_last_ts = (ntohl(packet.data->timestamp)-a_base_ts)+a_base_ts_prev+960;        /* FIXME We're assuming Opus here... */
3399
                                        packet.data->timestamp = htonl(a_last_ts);
3400
                                        a_last_seq = (ntohs(packet.data->seq_number)-a_base_seq)+a_base_seq_prev+1;
3401
                                        packet.data->seq_number = htons(a_last_seq);
3402
                                        //~ JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3403
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3404
                                        packet.data->type = mountpoint->codecs.audio_pt;
3405
                                        /* Is there a recorder? */
3406
                                        janus_recorder_save_frame(source->arc, buffer, bytes);
3407
                                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3408
                                        packet.timestamp = ntohl(packet.data->timestamp);
3409
                                        packet.seq_number = ntohs(packet.data->seq_number);
3410
                                        /* Go! */
3411
                                        janus_mutex_lock(&mountpoint->mutex);
3412
                                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3413
                                        janus_mutex_unlock(&mountpoint->mutex);
3414
                                        continue;
3415
                                } else if(video_fd != -1 && fds[i].fd == video_fd) {
3416
                                        /* Got something video (RTP) */
3417
                                        if(mountpoint->active == FALSE)
3418
                                                mountpoint->active = TRUE;
3419
                                        source->last_received_video = janus_get_monotonic_time();
3420
                                        addrlen = sizeof(remote);
3421
                                        bytes = recvfrom(video_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3422
                                        //~ JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the video channel...\n", bytes);
3423
                                        rtp_header *rtp = (rtp_header *)buffer;
3424
                                        /* First of all, let's check if this is (part of) a keyframe that we may need to save it for future reference */
3425
                                        if(source->keyframe.enabled) {
3426
                                                if(source->keyframe.temp_ts > 0 && ntohl(rtp->timestamp) != source->keyframe.temp_ts) {
3427
                                                        /* We received the last part of the keyframe, get rid of the old one and use this from now on */
3428
                                                        JANUS_LOG(LOG_HUGE, "[%s] ... ... last part of keyframe received! ts=%"SCNu32", %d packets\n",
3429
                                                                mountpoint->name, source->keyframe.temp_ts, g_list_length(source->keyframe.temp_keyframe));
3430
                                                        source->keyframe.temp_ts = 0;
3431
                                                        janus_mutex_lock(&source->keyframe.mutex);
3432
                                                        GList *temp = NULL;
3433
                                                        while(source->keyframe.latest_keyframe) {
3434
                                                                temp = g_list_first(source->keyframe.latest_keyframe);
3435
                                                                source->keyframe.latest_keyframe = g_list_remove_link(source->keyframe.latest_keyframe, temp);
3436
                                                                janus_streaming_rtp_relay_packet *pkt = (janus_streaming_rtp_relay_packet *)temp->data;
3437
                                                                g_free(pkt->data);
3438
                                                                g_free(pkt);
3439
                                                                g_list_free(temp);
3440
                                                        }
3441
                                                        source->keyframe.latest_keyframe = source->keyframe.temp_keyframe;
3442
                                                        source->keyframe.temp_keyframe = NULL;
3443
                                                        janus_mutex_unlock(&source->keyframe.mutex);
3444
                                                } else if(ntohl(rtp->timestamp) == source->keyframe.temp_ts) {
3445
                                                        /* Part of the keyframe we're currently saving, store */
3446
                                                        janus_mutex_lock(&source->keyframe.mutex);
3447
                                                        JANUS_LOG(LOG_HUGE, "[%s] ... other part of keyframe received! ts=%"SCNu32"\n", mountpoint->name, source->keyframe.temp_ts);
3448
                                                        janus_streaming_rtp_relay_packet *pkt = g_malloc0(sizeof(janus_streaming_rtp_relay_packet));
3449
                                                        pkt->data = g_malloc0(bytes);
3450
                                                        memcpy(pkt->data, buffer, bytes);
3451
                                                        pkt->data->ssrc = htons(1);
3452
                                                        pkt->data->type = mountpoint->codecs.video_pt;
3453
                                                        pkt->is_video = 1;
3454
                                                        pkt->is_keyframe = 1;
3455
                                                        pkt->length = bytes;
3456
                                                        pkt->timestamp = source->keyframe.temp_ts;
3457
                                                        pkt->seq_number = ntohs(rtp->seq_number);
3458
                                                        source->keyframe.temp_keyframe = g_list_append(source->keyframe.temp_keyframe, pkt);
3459
                                                        janus_mutex_unlock(&source->keyframe.mutex);
3460
                                                } else if(janus_streaming_is_keyframe(mountpoint->codecs.video_codec, buffer, bytes)) {
3461
                                                        /* New keyframe, start saving it */
3462
                                                        source->keyframe.temp_ts = ntohl(rtp->timestamp);
3463
                                                        JANUS_LOG(LOG_HUGE, "[%s] New keyframe received! ts=%"SCNu32"\n", mountpoint->name, source->keyframe.temp_ts);
3464
                                                        janus_mutex_lock(&source->keyframe.mutex);
3465
                                                        janus_streaming_rtp_relay_packet *pkt = g_malloc0(sizeof(janus_streaming_rtp_relay_packet));
3466
                                                        pkt->data = g_malloc0(bytes);
3467
                                                        memcpy(pkt->data, buffer, bytes);
3468
                                                        pkt->data->ssrc = htons(1);
3469
                                                        pkt->data->type = mountpoint->codecs.video_pt;
3470
                                                        pkt->is_video = 1;
3471
                                                        pkt->is_keyframe = 1;
3472
                                                        pkt->length = bytes;
3473
                                                        pkt->timestamp = source->keyframe.temp_ts;
3474
                                                        pkt->seq_number = ntohs(rtp->seq_number);
3475
                                                        source->keyframe.temp_keyframe = g_list_append(source->keyframe.temp_keyframe, pkt);
3476
                                                        janus_mutex_unlock(&source->keyframe.mutex);
3477
                                                }
3478
                                        }
3479
                                        /* If paused, ignore this packet */
3480
                                        if(!mountpoint->enabled)
3481
                                                continue;
3482
                                        //~ JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3483
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3484
                                        /* Relay on all sessions */
3485
                                        packet.data = rtp;
3486
                                        packet.length = bytes;
3487
                                        packet.is_video = 1;
3488
                                        packet.is_keyframe = 0;
3489
                                        /* Do we have a new stream? */
3490
                                        if(ntohl(packet.data->ssrc) != v_last_ssrc) {
3491
                                                v_last_ssrc = ntohl(packet.data->ssrc);
3492
                                                JANUS_LOG(LOG_INFO, "[%s] New video stream! (ssrc=%u)\n", name, v_last_ssrc);
3493
                                                v_base_ts_prev = v_last_ts;
3494
                                                v_base_ts = ntohl(packet.data->timestamp);
3495
                                                v_base_seq_prev = v_last_seq;
3496
                                                v_base_seq = ntohs(packet.data->seq_number);
3497
                                        }
3498
                                        v_last_ts = (ntohl(packet.data->timestamp)-v_base_ts)+v_base_ts_prev+4500;        /* FIXME We're assuming 15fps here... */
3499
                                        packet.data->timestamp = htonl(v_last_ts);
3500
                                        v_last_seq = (ntohs(packet.data->seq_number)-v_base_seq)+v_base_seq_prev+1;
3501
                                        packet.data->seq_number = htons(v_last_seq);
3502
                                        //~ JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3503
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3504
                                        packet.data->type = mountpoint->codecs.video_pt;
3505
                                        /* Is there a recorder? */
3506
                                        janus_recorder_save_frame(source->vrc, buffer, bytes);
3507
                                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3508
                                        packet.timestamp = ntohl(packet.data->timestamp);
3509
                                        packet.seq_number = ntohs(packet.data->seq_number);
3510
                                        /* Go! */
3511
                                        janus_mutex_lock(&mountpoint->mutex);
3512
                                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3513
                                        janus_mutex_unlock(&mountpoint->mutex);
3514
                                        continue;
3515
                                }
3516
                        }
3517
                }
3518
        }
3519

    
3520
        /* Notify users this mountpoint is done */
3521
        janus_mutex_lock(&mountpoint->mutex);
3522
        GList *viewer = g_list_first(mountpoint->listeners);
3523
        /* Prepare JSON event */
3524
        json_t *event = json_object();
3525
        json_object_set_new(event, "streaming", json_string("event"));
3526
        json_t *result = json_object();
3527
        json_object_set_new(result, "status", json_string("stopped"));
3528
        json_object_set_new(event, "result", result);
3529
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
3530
        json_decref(event);
3531
        while(viewer) {
3532
                janus_streaming_session *session = (janus_streaming_session *)viewer->data;
3533
                if(session != NULL) {
3534
                        session->stopping = TRUE;
3535
                        session->started = FALSE;
3536
                        session->paused = FALSE;
3537
                        session->mountpoint = NULL;
3538
                        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
3539
                        gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
3540
                        gateway->close_pc(session->handle);
3541
                }
3542
                mountpoint->listeners = g_list_remove_all(mountpoint->listeners, session);
3543
                viewer = g_list_first(mountpoint->listeners);
3544
        }
3545
        g_free(event_text);
3546
        janus_mutex_unlock(&mountpoint->mutex);
3547

    
3548
        JANUS_LOG(LOG_VERB, "[%s] Leaving streaming relay thread\n", name);
3549
        g_free(name);
3550
        g_thread_unref(g_thread_self());
3551
        return NULL;
3552
}
3553

    
3554
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) {
3555
        janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data;
3556
        if(!packet || !packet->data || packet->length < 1) {
3557
                JANUS_LOG(LOG_ERR, "Invalid packet...\n");
3558
                return;
3559
        }
3560
        janus_streaming_session *session = (janus_streaming_session *)data;
3561
        if(!session || !session->handle) {
3562
                //~ JANUS_LOG(LOG_ERR, "Invalid session...\n");
3563
                return;
3564
        }
3565
        if(!packet->is_keyframe && (!session->started || session->paused)) {
3566
                //~ JANUS_LOG(LOG_ERR, "Streaming not started yet for this session...\n");
3567
                return;
3568
        }
3569

    
3570
        /* Make sure there hasn't been a publisher switch by checking the SSRC */
3571
        if(packet->is_video) {
3572
                if(ntohl(packet->data->ssrc) != session->context.v_last_ssrc) {
3573
                        session->context.v_last_ssrc = ntohl(packet->data->ssrc);
3574
                        session->context.v_base_ts_prev = session->context.v_last_ts;
3575
                        session->context.v_base_ts = packet->timestamp;
3576
                        session->context.v_base_seq_prev = session->context.v_last_seq;
3577
                        session->context.v_base_seq = packet->seq_number;
3578
                }
3579
                /* Compute a coherent timestamp and sequence number */
3580
                session->context.v_last_ts = (packet->timestamp-session->context.v_base_ts)
3581
                        + session->context.v_base_ts_prev+4500;        /* FIXME When switching, we assume 15fps */
3582
                session->context.v_last_seq = (packet->seq_number-session->context.v_base_seq)+session->context.v_base_seq_prev+1;
3583
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3584
                packet->data->timestamp = htonl(session->context.v_last_ts);
3585
                packet->data->seq_number = htons(session->context.v_last_seq);
3586
                if(gateway != NULL)
3587
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3588
                /* Restore the timestamp and sequence number to what the publisher set them to */
3589
                packet->data->timestamp = htonl(packet->timestamp);
3590
                packet->data->seq_number = htons(packet->seq_number);
3591
        } else {
3592
                if(ntohl(packet->data->ssrc) != session->context.a_last_ssrc) {
3593
                        session->context.a_last_ssrc = ntohl(packet->data->ssrc);
3594
                        session->context.a_base_ts_prev = session->context.a_last_ts;
3595
                        session->context.a_base_ts = packet->timestamp;
3596
                        session->context.a_base_seq_prev = session->context.a_last_seq;
3597
                        session->context.a_base_seq = packet->seq_number;
3598
                }
3599
                /* Compute a coherent timestamp and sequence number */
3600
                session->context.a_last_ts = (packet->timestamp-session->context.a_base_ts)
3601
                        + session->context.a_base_ts_prev+960;        /* FIXME When switching, we assume Opus and so a 960 ts step */
3602
                session->context.a_last_seq = (packet->seq_number-session->context.a_base_seq)+session->context.a_base_seq_prev+1;
3603
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3604
                packet->data->timestamp = htonl(session->context.a_last_ts);
3605
                packet->data->seq_number = htons(session->context.a_last_seq);
3606
                if(gateway != NULL)
3607
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3608
                /* Restore the timestamp and sequence number to what the publisher set them to */
3609
                packet->data->timestamp = htonl(packet->timestamp);
3610
                packet->data->seq_number = htons(packet->seq_number);
3611
        }
3612

    
3613
        return;
3614
}
3615

    
3616
/* Helpers to check if frame is a key frame (see post processor code) */
3617
#if defined(__ppc__) || defined(__ppc64__)
3618
        # define swap2(d)  \
3619
        ((d&0x000000ff)<<8) |  \
3620
        ((d&0x0000ff00)>>8)
3621
#else
3622
        # define swap2(d) d
3623
#endif
3624

    
3625
static gboolean janus_streaming_is_keyframe(gint codec, char* buffer, int len) {
3626
        if(codec == JANUS_STREAMING_VP8) {
3627
                /* VP8 packet */
3628
                if(!buffer || len < 28)
3629
                        return FALSE;
3630
                /* Parse RTP header first */
3631
                rtp_header *header = (rtp_header *)buffer;
3632
                guint32 timestamp = ntohl(header->timestamp);
3633
                guint16 seq = ntohs(header->seq_number);
3634
                JANUS_LOG(LOG_HUGE, "Checking if VP8 packet (size=%d, seq=%"SCNu16", ts=%"SCNu32") is a key frame...\n",
3635
                        len, seq, timestamp);
3636
                uint16_t skip = 0;
3637
                if(header->extension) {
3638
                        janus_rtp_header_extension *ext = (janus_rtp_header_extension *)(buffer+12);
3639
                        JANUS_LOG(LOG_HUGE, "  -- RTP extension found (type=%"SCNu16", length=%"SCNu16")\n",
3640
                                ntohs(ext->type), ntohs(ext->length));
3641
                        skip = 4 + ntohs(ext->length)*4;
3642
                }
3643
                buffer += 12+skip;
3644
                /* Parse VP8 header now */
3645
                uint8_t vp8pd = *buffer;
3646
                uint8_t xbit = (vp8pd & 0x80);
3647
                uint8_t sbit = (vp8pd & 0x10);
3648
                if(xbit) {
3649
                        JANUS_LOG(LOG_HUGE, "  -- X bit is set!\n");
3650
                        /* Read the Extended control bits octet */
3651
                        buffer++;
3652
                        vp8pd = *buffer;
3653
                        uint8_t ibit = (vp8pd & 0x80);
3654
                        uint8_t lbit = (vp8pd & 0x40);
3655
                        uint8_t tbit = (vp8pd & 0x20);
3656
                        uint8_t kbit = (vp8pd & 0x10);
3657
                        if(ibit) {
3658
                                JANUS_LOG(LOG_HUGE, "  -- I bit is set!\n");
3659
                                /* Read the PictureID octet */
3660
                                buffer++;
3661
                                vp8pd = *buffer;
3662
                                uint16_t picid = vp8pd, wholepicid = picid;
3663
                                uint8_t mbit = (vp8pd & 0x80);
3664
                                if(mbit) {
3665
                                        JANUS_LOG(LOG_HUGE, "  -- M bit is set!\n");
3666
                                        memcpy(&picid, buffer, sizeof(uint16_t));
3667
                                        wholepicid = ntohs(picid);
3668
                                        picid = (wholepicid & 0x7FFF);
3669
                                        buffer++;
3670
                                }
3671
                                JANUS_LOG(LOG_HUGE, "  -- -- PictureID: %"SCNu16"\n", picid);
3672
                        }
3673
                        if(lbit) {
3674
                                JANUS_LOG(LOG_HUGE, "  -- L bit is set!\n");
3675
                                /* Read the TL0PICIDX octet */
3676
                                buffer++;
3677
                                vp8pd = *buffer;
3678
                        }
3679
                        if(tbit || kbit) {
3680
                                JANUS_LOG(LOG_HUGE, "  -- T/K bit is set!\n");
3681
                                /* Read the TID/KEYIDX octet */
3682
                                buffer++;
3683
                                vp8pd = *buffer;
3684
                        }
3685
                        buffer++;        /* Now we're in the payload */
3686
                        if(sbit) {
3687
                                JANUS_LOG(LOG_HUGE, "  -- S bit is set!\n");
3688
                                unsigned long int vp8ph = 0;
3689
                                memcpy(&vp8ph, buffer, 4);
3690
                                vp8ph = ntohl(vp8ph);
3691
                                uint8_t pbit = ((vp8ph & 0x01000000) >> 24);
3692
                                if(!pbit) {
3693
                                        JANUS_LOG(LOG_HUGE, "  -- P bit is NOT set!\n");
3694
                                        /* It is a key frame! Get resolution for debugging */
3695
                                        unsigned char *c = (unsigned char *)buffer+3;
3696
                                        /* vet via sync code */
3697
                                        if(c[0]!=0x9d||c[1]!=0x01||c[2]!=0x2a) {
3698
                                                JANUS_LOG(LOG_WARN, "First 3-bytes after header not what they're supposed to be?\n");
3699
                                        } else {
3700
                                                int vp8w = swap2(*(unsigned short*)(c+3))&0x3fff;
3701
                                                int vp8ws = swap2(*(unsigned short*)(c+3))>>14;
3702
                                                int vp8h = swap2(*(unsigned short*)(c+5))&0x3fff;
3703
                                                int vp8hs = swap2(*(unsigned short*)(c+5))>>14;
3704
                                                JANUS_LOG(LOG_HUGE, "Got a VP8 key frame: %dx%d (scale=%dx%d)\n", vp8w, vp8h, vp8ws, vp8hs);
3705
                                                return TRUE;
3706
                                        }
3707
                                }
3708
                        }
3709
                }
3710
                /* If we got here it's not a key frame */
3711
                return FALSE;
3712
        } else if(codec == JANUS_STREAMING_H264) {
3713
                /* Parse RTP header first */
3714
                rtp_header *header = (rtp_header *)buffer;
3715
                guint32 timestamp = ntohl(header->timestamp);
3716
                guint16 seq = ntohs(header->seq_number);
3717
                JANUS_LOG(LOG_HUGE, "Checking if H264 packet (size=%d, seq=%"SCNu16", ts=%"SCNu32") is a key frame...\n",
3718
                        len, seq, timestamp);
3719
                uint16_t skip = 0;
3720
                if(header->extension) {
3721
                        janus_rtp_header_extension *ext = (janus_rtp_header_extension *)(buffer+12);
3722
                        JANUS_LOG(LOG_HUGE, "  -- RTP extension found (type=%"SCNu16", length=%"SCNu16")\n",
3723
                                ntohs(ext->type), ntohs(ext->length));
3724
                        skip = 4 + ntohs(ext->length)*4;
3725
                }
3726
                buffer += 12+skip;
3727
                /* Parse H264 header now */
3728
                uint8_t fragment = *buffer & 0x1F;
3729
                uint8_t nal = *(buffer+1) & 0x1F;
3730
                uint8_t start_bit = *(buffer+1) & 0x80;
3731
                JANUS_LOG(LOG_HUGE, "Fragment=%d, NAL=%d, Start=%d\n", fragment, nal, start_bit);
3732
                if(fragment == 5 ||
3733
                                ((fragment == 28 || fragment == 29) && nal == 5 && start_bit == 128)) {
3734
                        JANUS_LOG(LOG_HUGE, "Got an H264 key frame\n");
3735
                        return TRUE;
3736
                }
3737
                /* If we got here it's not a key frame */
3738
                return FALSE;
3739
        } else {
3740
                /* FIXME Not a clue */
3741
                return FALSE;
3742
        }
3743
}