Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_streaming.c @ d223cef1

History | View | Annotate | Download (142 KB)

1
/*! \file   janus_streaming.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus Streaming plugin
5
 * \details  This is a streaming plugin for Janus, allowing WebRTC peers
6
 * to watch/listen to pre-recorded files or media generated by another tool.
7
 * Specifically, the plugin currently supports three different type of streams:
8
 * 
9
 * -# on-demand streaming of pre-recorded media files (different
10
 * streaming context for each peer);
11
 * -# live streaming of pre-recorded media files (shared streaming
12
 * context for all peers attached to the stream);
13
 * -# live streaming of media generated by another tool (shared
14
 * streaming context for all peers attached to the stream).
15
 * 
16
 * For what concerns types 1. and 2., considering the proof of concept
17
 * nature of the implementation the only pre-recorded media files
18
 * that the plugins supports right now are raw mu-Law and a-Law files:
19
 * support is of course planned for other additional widespread formats
20
 * as well.
21
 * 
22
 * For what concerns type 3., instead, the plugin is configured
23
 * to listen on a couple of ports for RTP: this means that the plugin
24
 * is implemented to receive RTP on those ports and relay them to all
25
 * peers attached to that stream. Any tool that can generate audio/video
26
 * RTP streams and specify a destination is good for the purpose: the
27
 * examples section contains samples that make use of GStreamer (http://gstreamer.freedesktop.org/)
28
 * but other tools like FFmpeg (http://www.ffmpeg.org/), LibAV (http://libav.org/)
29
 * or others are fine as well. This makes it really easy to capture and
30
 * encode whatever you want using your favourite tool, and then have it
31
 * transparently broadcasted via WebRTC using Janus.
32
 * 
33
 * Streams to make available are listed in the plugin configuration file.
34
 * A pre-filled configuration file is provided in \c conf/janus.plugin.streaming.cfg
35
 * and includes a stream of every type.
36
 * 
37
 * To add more streams or modify the existing ones, you can use the following
38
 * syntax:
39
 * 
40
 * \verbatim
41
[stream-name]
42
type = rtp|live|ondemand|rtsp
43
       rtp = stream originated by an external tool (e.g., gstreamer or
44
             ffmpeg) and sent to the plugin via RTP
45
       live = local file streamed live to multiple listeners
46
              (multiple listeners = same streaming context)
47
       ondemand = local file streamed on-demand to a single listener
48
                  (multiple listeners = different streaming contexts)
49
       rtsp = stream originated by an external RTSP feed (only
50
              available if libcurl support was compiled)
51
id = <unique numeric ID>
52
description = This is my awesome stream
53
is_private = yes|no (private streams don't appear when you do a 'list' request)
54
filename = path to the local file to stream (only for live/ondemand)
55
secret = <optional password needed for manipulating (e.g., destroying
56
                or enabling/disabling) the stream>
57
pin = <optional password needed for watching the stream>
58
audio = yes|no (do/don't stream audio)
59
video = yes|no (do/don't stream video)
60
   The following options are only valid for the 'rtp' type:
61
audioport = local port for receiving audio frames
62
audiomcast = multicast group port for receiving audio frames, if any
63
audiopt = <audio RTP payload type> (e.g., 111)
64
audiortpmap = RTP map of the audio codec (e.g., opus/48000/2)
65
audiofmtp = Codec specific parameters, if any
66
videoport = local port for receiving video frames (only for rtp)
67
videomcast = multicast group port for receiving video frames, if any
68
videopt = <video RTP payload type> (e.g., 100)
69
videortpmap = RTP map of the video codec (e.g., VP8/90000)
70
videofmtp = Codec specific parameters, if any
71
videobufferkf = yes|no (whether the plugin should store the latest
72
        keyframe and send it immediately for new viewers, EXPERIMENTAL)
73

74
   The following options are only valid for the 'rstp' type:
75
url = RTSP stream URL (only if type=rtsp)
76
\endverbatim
77
 *
78
 * \section streamapi Streaming API
79
 * 
80
 * The Streaming API supports several requests, some of which are
81
 * synchronous and some asynchronous. There are some situations, though,
82
 * (invalid JSON, invalid request) which will always result in a
83
 * synchronous error response even for asynchronous requests. 
84
 * 
85
 * \c list , \c create , \c destroy , \c recording , \c enable and
86
 * \c disable are synchronous requests, which means you'll
87
 * get a response directly within the context of the transaction. \c list
88
 * lists all the available streams; \c create allows you to create a new
89
 * mountpoint dynamically, as an alternative to using the configuration
90
 * file; \c destroy removes a mountpoint and destroys it; \c recording
91
 * instructs the plugin on whether or not a live RTP stream should be
92
 * recorded while it's broadcasted; \c enable and \c disable respectively
93
 * enable and disable a mountpoint, that is decide whether or not a
94
 * mountpoint should be available to users without destroying it.
95
 * 
96
 * The \c watch , \c start , \c pause , \c switch and \c stop requests
97
 * instead are all asynchronous, which means you'll get a notification
98
 * about their success or failure in an event. \c watch asks the plugin
99
 * to prepare the playout of one of the available streams; \c start
100
 * starts the actual playout; \c pause allows you to pause a playout
101
 * without tearing down the PeerConnection; \c switch allows you to
102
 * switch to a different mountpoint of the same kind (note: only live
103
 * RTP mountpoints supported as of now) without having to stop and watch
104
 * the new one; \c stop stops the playout and tears the PeerConnection
105
 * down.
106
 * 
107
 * Actual API docs: TBD.
108
 * 
109
 * \ingroup plugins
110
 * \ref plugins
111
 */
112

    
113
#include "plugin.h"
114

    
115
#include <jansson.h>
116
#include <errno.h>
117
#include <sys/poll.h>
118
#include <sys/time.h>
119

    
120
#ifdef HAVE_LIBCURL
121
#include <curl/curl.h>
122
#endif
123

    
124
#include "../debug.h"
125
#include "../apierror.h"
126
#include "../config.h"
127
#include "../mutex.h"
128
#include "../rtp.h"
129
#include "../rtcp.h"
130
#include "../record.h"
131
#include "../utils.h"
132

    
133

    
134
/* Plugin information */
135
#define JANUS_STREAMING_VERSION                        5
136
#define JANUS_STREAMING_VERSION_STRING        "0.0.5"
137
#define JANUS_STREAMING_DESCRIPTION                "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by gstreamer."
138
#define JANUS_STREAMING_NAME                        "JANUS Streaming plugin"
139
#define JANUS_STREAMING_AUTHOR                        "Meetecho s.r.l."
140
#define JANUS_STREAMING_PACKAGE                        "janus.plugin.streaming"
141

    
142
/* Plugin methods */
143
janus_plugin *create(void);
144
int janus_streaming_init(janus_callbacks *callback, const char *config_path);
145
void janus_streaming_destroy(void);
146
int janus_streaming_get_api_compatibility(void);
147
int janus_streaming_get_version(void);
148
const char *janus_streaming_get_version_string(void);
149
const char *janus_streaming_get_description(void);
150
const char *janus_streaming_get_name(void);
151
const char *janus_streaming_get_author(void);
152
const char *janus_streaming_get_package(void);
153
void janus_streaming_create_session(janus_plugin_session *handle, int *error);
154
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
155
void janus_streaming_setup_media(janus_plugin_session *handle);
156
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
157
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
158
void janus_streaming_hangup_media(janus_plugin_session *handle);
159
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error);
160
char *janus_streaming_query_session(janus_plugin_session *handle);
161

    
162
/* Plugin setup */
163
static janus_plugin janus_streaming_plugin =
164
        JANUS_PLUGIN_INIT (
165
                .init = janus_streaming_init,
166
                .destroy = janus_streaming_destroy,
167

    
168
                .get_api_compatibility = janus_streaming_get_api_compatibility,
169
                .get_version = janus_streaming_get_version,
170
                .get_version_string = janus_streaming_get_version_string,
171
                .get_description = janus_streaming_get_description,
172
                .get_name = janus_streaming_get_name,
173
                .get_author = janus_streaming_get_author,
174
                .get_package = janus_streaming_get_package,
175
                
176
                .create_session = janus_streaming_create_session,
177
                .handle_message = janus_streaming_handle_message,
178
                .setup_media = janus_streaming_setup_media,
179
                .incoming_rtp = janus_streaming_incoming_rtp,
180
                .incoming_rtcp = janus_streaming_incoming_rtcp,
181
                .hangup_media = janus_streaming_hangup_media,
182
                .destroy_session = janus_streaming_destroy_session,
183
                .query_session = janus_streaming_query_session,
184
        );
185

    
186
/* Plugin creator */
187
janus_plugin *create(void) {
188
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_STREAMING_NAME);
189
        return &janus_streaming_plugin;
190
}
191

    
192
/* Parameter validation */
193
static struct janus_json_parameter request_parameters[] = {
194
        {"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
195
};
196
static struct janus_json_parameter id_parameters[] = {
197
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE}
198
};
199
static struct janus_json_parameter create_parameters[] = {
200
        {"type", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
201
        {"secret", JSON_STRING, 0},
202
        {"pin", JSON_STRING, 0},
203
        {"permanent", JANUS_JSON_BOOL, 0}
204
};
205
static struct janus_json_parameter rtp_parameters[] = {
206
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
207
        {"name", JSON_STRING, 0},
208
        {"description", JSON_STRING, 0},
209
        {"is_private", JANUS_JSON_BOOL, 0},
210
        {"audio", JANUS_JSON_BOOL, 0},
211
        {"video", JANUS_JSON_BOOL, 0}
212
};
213
static struct janus_json_parameter live_parameters[] = {
214
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
215
        {"name", JSON_STRING, 0},
216
        {"description", JSON_STRING, 0},
217
        {"is_private", JANUS_JSON_BOOL, 0},
218
        {"file", JSON_STRING, 0},
219
        {"audio", JANUS_JSON_BOOL, 0},
220
        {"video", JANUS_JSON_BOOL, 0}
221
};
222
static struct janus_json_parameter ondemand_parameters[] = {
223
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
224
        {"name", JSON_STRING, 0},
225
        {"description", JSON_STRING, 0},
226
        {"is_private", JANUS_JSON_BOOL, 0},
227
        {"file", JSON_STRING, 0},
228
        {"audio", JANUS_JSON_BOOL, 0},
229
        {"video", JANUS_JSON_BOOL, 0}
230
};
231
static struct janus_json_parameter rtsp_parameters[] = {
232
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
233
        {"name", JSON_STRING, 0},
234
        {"description", JSON_STRING, 0},
235
        {"is_private", JANUS_JSON_BOOL, 0},
236
        {"url", JSON_STRING, 0},
237
        {"audio", JANUS_JSON_BOOL, 0},
238
        {"video", JANUS_JSON_BOOL, 0}
239
};
240
static struct janus_json_parameter rtp_audio_parameters[] = {
241
        {"audiomcast", JSON_STRING, 0},
242
        {"audioport", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
243
        {"audiopt", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
244
        {"audiortpmap", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
245
        {"audiofmtp", JSON_STRING, 0}
246
};
247
static struct janus_json_parameter rtp_video_parameters[] = {
248
        {"videomcast", JSON_STRING, 0},
249
        {"videoport", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
250
        {"videopt", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
251
        {"videortpmap", JSON_STRING, JANUS_JSON_PARAM_REQUIRED},
252
        {"videofmtp", JSON_STRING, 0},
253
        {"videobufferkf", JANUS_JSON_BOOL, 0}
254
};
255
static struct janus_json_parameter destroy_parameters[] = {
256
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
257
        {"permanent", JANUS_JSON_BOOL, 0}
258
};
259
static struct janus_json_parameter recording_parameters[] = {
260
        {"id", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE},
261
        {"action", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
262
};
263
static struct janus_json_parameter recording_start_parameters[] = {
264
        {"audio", JSON_STRING, 0},
265
        {"video", JSON_STRING, 0}
266
};
267
static struct janus_json_parameter recording_stop_parameters[] = {
268
        {"audio", JANUS_JSON_BOOL, 0},
269
        {"video", JANUS_JSON_BOOL, 0}
270
};
271

    
272
/* Static configuration instance */
273
static janus_config *config = NULL;
274
static const char *config_folder = NULL;
275
static janus_mutex config_mutex;
276

    
277
/* Useful stuff */
278
static volatile gint initialized = 0, stopping = 0;
279
static janus_callbacks *gateway = NULL;
280
static GThread *handler_thread;
281
static GThread *watchdog;
282
static void *janus_streaming_handler(void *data);
283
static void *janus_streaming_ondemand_thread(void *data);
284
static void *janus_streaming_filesource_thread(void *data);
285
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data);
286
static void *janus_streaming_relay_thread(void *data);
287
static gboolean janus_streaming_is_keyframe(gint codec, char* buffer, int len);
288

    
289
typedef enum janus_streaming_type {
290
        janus_streaming_type_none = 0,
291
        janus_streaming_type_live,
292
        janus_streaming_type_on_demand,
293
} janus_streaming_type;
294

    
295
typedef enum janus_streaming_source {
296
        janus_streaming_source_none = 0,
297
        janus_streaming_source_file,
298
        janus_streaming_source_rtp,
299
} janus_streaming_source;
300

    
301
typedef struct janus_streaming_rtp_keyframe {
302
        gboolean enabled;
303
        /* If enabled, we store the packets of the last keyframe, to immediately send them for new viewers */
304
        GList *latest_keyframe;
305
        /* This is where we store packets while we're still collecting the whole keyframe */
306
        GList *temp_keyframe;
307
        guint32 temp_ts;
308
        janus_mutex mutex;
309
} janus_streaming_rtp_keyframe;
310

    
311
typedef struct janus_streaming_rtp_source {
312
        gint audio_port;
313
        in_addr_t audio_mcast;
314
        gint video_port;
315
        in_addr_t video_mcast;
316
        janus_recorder *arc;        /* The Janus recorder instance for this streams's audio, if enabled */
317
        janus_recorder *vrc;        /* The Janus recorder instance for this streams's video, if enabled */
318
        janus_mutex rec_mutex;        /* Mutex to protect the recorders from race conditions */
319
        int audio_fd;
320
        int video_fd;
321
        gint64 last_received_video;
322
        gint64 last_received_audio;
323
#ifdef HAVE_LIBCURL
324
        CURL* curl;
325
#endif
326
        janus_streaming_rtp_keyframe keyframe;
327
} janus_streaming_rtp_source;
328

    
329
typedef struct janus_streaming_file_source {
330
        char *filename;
331
} janus_streaming_file_source;
332

    
333
#define JANUS_STREAMING_VP8                0
334
#define JANUS_STREAMING_H264        1
335
#define JANUS_STREAMING_VP9                2
336
typedef struct janus_streaming_codecs {
337
        gint audio_pt;
338
        char *audio_rtpmap;
339
        char *audio_fmtp;
340
        gint video_codec;
341
        gint video_pt;
342
        char *video_rtpmap;
343
        char *video_fmtp;
344
} janus_streaming_codecs;
345

    
346
typedef struct janus_streaming_mountpoint {
347
        gint64 id;
348
        char *name;
349
        char *description;
350
        gboolean is_private;
351
        char *secret;
352
        char *pin;
353
        gboolean enabled;
354
        gboolean active;
355
        janus_streaming_type streaming_type;
356
        janus_streaming_source streaming_source;
357
        void *source;        /* Can differ according to the source type */
358
        GDestroyNotify source_destroy;
359
        janus_streaming_codecs codecs;
360
        GList/*<unowned janus_streaming_session>*/ *listeners;
361
        gint64 destroyed;
362
        janus_mutex mutex;
363
} janus_streaming_mountpoint;
364
GHashTable *mountpoints;
365
static GList *old_mountpoints;
366
janus_mutex mountpoints_mutex;
367

    
368
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp);
369

    
370
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
371
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
372
                uint64_t id, char *name, char *desc,
373
                gboolean doaudio, char* amcast, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
374
                gboolean dovideo, char* vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf);
375
/* Helper to create a file/ondemand live source */
376
janus_streaming_mountpoint *janus_streaming_create_file_source(
377
                uint64_t id, char *name, char *desc, char *filename,
378
                gboolean live, gboolean doaudio, gboolean dovideo);
379
/* Helper to create a rtsp live source */
380
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
381
                uint64_t id, char *name, char *desc, char *url,
382
                gboolean doaudio, gboolean dovideo);
383

    
384

    
385
typedef struct janus_streaming_message {
386
        janus_plugin_session *handle;
387
        char *transaction;
388
        json_t *message;
389
        char *sdp_type;
390
        char *sdp;
391
} janus_streaming_message;
392
static GAsyncQueue *messages = NULL;
393
static janus_streaming_message exit_message;
394

    
395
static void janus_streaming_message_free(janus_streaming_message *msg) {
396
        if(!msg || msg == &exit_message)
397
                return;
398

    
399
        msg->handle = NULL;
400

    
401
        g_free(msg->transaction);
402
        msg->transaction = NULL;
403
        if(msg->message)
404
                json_decref(msg->message);
405
        msg->message = NULL;
406
        g_free(msg->sdp_type);
407
        msg->sdp_type = NULL;
408
        g_free(msg->sdp);
409
        msg->sdp = NULL;
410

    
411
        g_free(msg);
412
}
413

    
414

    
415
typedef struct janus_streaming_context {
416
        /* Needed to fix seq and ts in case of stream switching */
417
        uint32_t a_last_ssrc, a_last_ts, a_base_ts, a_base_ts_prev,
418
                        v_last_ssrc, v_last_ts, v_base_ts, v_base_ts_prev;
419
        uint16_t a_last_seq, a_base_seq, a_base_seq_prev,
420
                        v_last_seq, v_base_seq, v_base_seq_prev;
421
} janus_streaming_context;
422

    
423
typedef struct janus_streaming_session {
424
        janus_plugin_session *handle;
425
        janus_streaming_mountpoint *mountpoint;
426
        gboolean started;
427
        gboolean paused;
428
        janus_streaming_context context;
429
        gboolean stopping;
430
        volatile gint hangingup;
431
        gint64 destroyed;        /* Time at which this session was marked as destroyed */
432
} janus_streaming_session;
433
static GHashTable *sessions;
434
static GList *old_sessions;
435
static janus_mutex sessions_mutex;
436

    
437
/* Packets we get from gstreamer and relay */
438
typedef struct janus_streaming_rtp_relay_packet {
439
        rtp_header *data;
440
        gint length;
441
        gint is_video;
442
        gint is_keyframe;
443
        uint32_t timestamp;
444
        uint16_t seq_number;
445
} janus_streaming_rtp_relay_packet;
446

    
447

    
448
/* Error codes */
449
#define JANUS_STREAMING_ERROR_NO_MESSAGE                        450
450
#define JANUS_STREAMING_ERROR_INVALID_JSON                        451
451
#define JANUS_STREAMING_ERROR_INVALID_REQUEST                452
452
#define JANUS_STREAMING_ERROR_MISSING_ELEMENT                453
453
#define JANUS_STREAMING_ERROR_INVALID_ELEMENT                454
454
#define JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT        455
455
#define JANUS_STREAMING_ERROR_CANT_CREATE                        456
456
#define JANUS_STREAMING_ERROR_UNAUTHORIZED                        457
457
#define JANUS_STREAMING_ERROR_CANT_SWITCH                        458
458
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR                        470
459

    
460

    
461
/* Streaming watchdog/garbage collector (sort of) */
462
void *janus_streaming_watchdog(void *data);
463
void *janus_streaming_watchdog(void *data) {
464
        JANUS_LOG(LOG_INFO, "Streaming watchdog started\n");
465
        gint64 now = 0;
466
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
467
                janus_mutex_lock(&sessions_mutex);
468
                /* Iterate on all the sessions */
469
                now = janus_get_monotonic_time();
470
                if(old_sessions != NULL) {
471
                        GList *sl = old_sessions;
472
                        JANUS_LOG(LOG_HUGE, "Checking %d old Streaming sessions...\n", g_list_length(old_sessions));
473
                        while(sl) {
474
                                janus_streaming_session *session = (janus_streaming_session *)sl->data;
475
                                if(!session) {
476
                                        sl = sl->next;
477
                                        continue;
478
                                }
479
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
480
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
481
                                        JANUS_LOG(LOG_VERB, "Freeing old Streaming session\n");
482
                                        GList *rm = sl->next;
483
                                        old_sessions = g_list_delete_link(old_sessions, sl);
484
                                        sl = rm;
485
                                        session->handle = NULL;
486
                                        g_free(session);
487
                                        session = NULL;
488
                                        continue;
489
                                }
490
                                sl = sl->next;
491
                        }
492
                }
493
                janus_mutex_unlock(&sessions_mutex);
494
                janus_mutex_lock(&mountpoints_mutex);
495
                /* Iterate on all the mountpoints */
496
                if(old_mountpoints != NULL) {
497
                        GList *sl = old_mountpoints;
498
                        JANUS_LOG(LOG_HUGE, "Checking %d old Streaming mountpoints...\n", g_list_length(old_mountpoints));
499
                        while(sl) {
500
                                janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)sl->data;
501
                                if(!mountpoint) {
502
                                        sl = sl->next;
503
                                        continue;
504
                                }
505
                                if(now-mountpoint->destroyed >= 5*G_USEC_PER_SEC) {
506
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
507
                                        JANUS_LOG(LOG_VERB, "Freeing old Streaming mountpoint\n");
508
                                        GList *rm = sl->next;
509
                                        old_mountpoints = g_list_delete_link(old_mountpoints, sl);
510
                                        sl = rm;
511
                                        janus_streaming_mountpoint_free(mountpoint);
512
                                        mountpoint = NULL;
513
                                        continue;
514
                                }
515
                                sl = sl->next;
516
                        }
517
                }
518
                janus_mutex_unlock(&mountpoints_mutex);
519
                g_usleep(500000);
520
        }
521
        JANUS_LOG(LOG_INFO, "Streaming watchdog stopped\n");
522
        return NULL;
523
}
524

    
525

    
526
/* Plugin implementation */
527
int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
528
#ifdef HAVE_LIBCURL        
529
        curl_global_init(CURL_GLOBAL_ALL);
530
#endif        
531
        if(g_atomic_int_get(&stopping)) {
532
                /* Still stopping from before */
533
                return -1;
534
        }
535
        if(callback == NULL || config_path == NULL) {
536
                /* Invalid arguments */
537
                return -1;
538
        }
539

    
540
        /* Read configuration */
541
        char filename[255];
542
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_STREAMING_PACKAGE);
543
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
544
        config = janus_config_parse(filename);
545
        config_folder = config_path;
546
        if(config != NULL)
547
                janus_config_print(config);
548
        janus_mutex_init(&config_mutex);
549
        
550
        mountpoints = g_hash_table_new(NULL, NULL);
551
        janus_mutex_init(&mountpoints_mutex);
552
        /* Parse configuration to populate the mountpoints */
553
        if(config != NULL) {
554
                GList *cl = janus_config_get_categories(config);
555
                while(cl != NULL) {
556
                        janus_config_category *cat = (janus_config_category *)cl->data;
557
                        if(cat->name == NULL) {
558
                                cl = cl->next;
559
                                continue;
560
                        }
561
                        JANUS_LOG(LOG_VERB, "Adding stream '%s'\n", cat->name);
562
                        janus_config_item *type = janus_config_get_item(cat, "type");
563
                        if(type == NULL || type->value == NULL) {
564
                                JANUS_LOG(LOG_WARN, "  -- Invalid type, skipping stream '%s'...\n", cat->name);
565
                                cl = cl->next;
566
                                continue;
567
                        }
568
                        if(!strcasecmp(type->value, "rtp")) {
569
                                /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
570
                                janus_config_item *id = janus_config_get_item(cat, "id");
571
                                janus_config_item *desc = janus_config_get_item(cat, "description");
572
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
573
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
574
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
575
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
576
                                janus_config_item *video = janus_config_get_item(cat, "video");
577
                                janus_config_item *amcast = janus_config_get_item(cat, "audiomcast");
578
                                janus_config_item *aport = janus_config_get_item(cat, "audioport");
579
                                janus_config_item *acodec = janus_config_get_item(cat, "audiopt");
580
                                janus_config_item *artpmap = janus_config_get_item(cat, "audiortpmap");
581
                                janus_config_item *afmtp = janus_config_get_item(cat, "audiofmtp");
582
                                janus_config_item *vmcast = janus_config_get_item(cat, "videomcast");
583
                                janus_config_item *vport = janus_config_get_item(cat, "videoport");
584
                                janus_config_item *vcodec = janus_config_get_item(cat, "videopt");
585
                                janus_config_item *vrtpmap = janus_config_get_item(cat, "videortpmap");
586
                                janus_config_item *vfmtp = janus_config_get_item(cat, "videofmtp");
587
                                janus_config_item *vkf = janus_config_get_item(cat, "videobufferkf");
588
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
589
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
590
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
591
                                gboolean bufferkf = video && vkf && vkf->value && janus_is_true(vkf->value);
592
                                if(!doaudio && !dovideo) {
593
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', no audio or video have to be streamed...\n", cat->name);
594
                                        cl = cl->next;
595
                                        continue;
596
                                }
597
                                if(doaudio &&
598
                                                (aport == NULL || aport->value == NULL ||
599
                                                acodec == NULL || acodec->value == NULL ||
600
                                                artpmap == NULL || artpmap->value == NULL)) {
601
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for audio...\n", cat->name);
602
                                        cl = cl->next;
603
                                        continue;
604
                                }
605
                                if(dovideo &&
606
                                                (vport == NULL || vport->value == NULL ||
607
                                                vcodec == NULL || vcodec->value == NULL ||
608
                                                vrtpmap == NULL || vrtpmap->value == NULL)) {
609
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for video...\n", cat->name);
610
                                        cl = cl->next;
611
                                        continue;
612
                                }
613
                                if(id == NULL || id->value == NULL) {
614
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
615
                                } else {
616
                                        janus_mutex_lock(&mountpoints_mutex);
617
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
618
                                        janus_mutex_unlock(&mountpoints_mutex);
619
                                        if(mp != NULL) {
620
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
621
                                                cl = cl->next;
622
                                                continue;
623
                                        }
624
                                }
625
                                JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
626
                                janus_streaming_mountpoint *mp = NULL;
627
                                if((mp = janus_streaming_create_rtp_source(
628
                                                (id && id->value) ? atol(id->value) : 0,
629
                                                (char *)cat->name,
630
                                                desc ? (char *)desc->value : NULL,
631
                                                doaudio,
632
                                                amcast ? (char *)amcast->value : NULL,
633
                                                (aport && aport->value) ? atoi(aport->value) : 0,
634
                                                (acodec && acodec->value) ? atoi(acodec->value) : 0,
635
                                                artpmap ? (char *)artpmap->value : NULL,
636
                                                afmtp ? (char *)afmtp->value : NULL,
637
                                                dovideo,
638
                                                vmcast ? (char *)vmcast->value : NULL,
639
                                                (vport && vport->value) ? atoi(vport->value) : 0,
640
                                                (vcodec && vcodec->value) ? atoi(vcodec->value) : 0,
641
                                                vrtpmap ? (char *)vrtpmap->value : NULL,
642
                                                vfmtp ? (char *)vfmtp->value : NULL,
643
                                                bufferkf)) == NULL) {
644
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream '%s'...\n", cat->name);
645
                                        cl = cl->next;
646
                                        continue;
647
                                }
648
                                mp->is_private = is_private;
649
                                if(secret && secret->value)
650
                                        mp->secret = g_strdup(secret->value);
651
                                if(pin && pin->value)
652
                                        mp->pin = g_strdup(pin->value);
653
                        } else if(!strcasecmp(type->value, "live")) {
654
                                /* File live source */
655
                                janus_config_item *id = janus_config_get_item(cat, "id");
656
                                janus_config_item *desc = janus_config_get_item(cat, "description");
657
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
658
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
659
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
660
                                janus_config_item *file = janus_config_get_item(cat, "filename");
661
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
662
                                janus_config_item *video = janus_config_get_item(cat, "video");
663
                                if(file == NULL || file->value == NULL) {
664
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', missing mandatory information...\n", cat->name);
665
                                        cl = cl->next;
666
                                        continue;
667
                                }
668
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
669
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
670
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
671
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
672
                                if(!doaudio || dovideo) {
673
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', we only support audio file streaming right now...\n", cat->name);
674
                                        cl = cl->next;
675
                                        continue;
676
                                }
677
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
678
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
679
                                        cl = cl->next;
680
                                        continue;
681
                                }
682
                                if(id == NULL || id->value == NULL) {
683
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
684
                                } else {
685
                                        janus_mutex_lock(&mountpoints_mutex);
686
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
687
                                        janus_mutex_unlock(&mountpoints_mutex);
688
                                        if(mp != NULL) {
689
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
690
                                                cl = cl->next;
691
                                                continue;
692
                                        }
693
                                }
694
                                janus_streaming_mountpoint *mp = NULL;
695
                                if((mp = janus_streaming_create_file_source(
696
                                                (id && id->value) ? atol(id->value) : 0,
697
                                                (char *)cat->name,
698
                                                desc ? (char *)desc->value : NULL,
699
                                                (char *)file->value,
700
                                                TRUE, doaudio, dovideo)) == NULL) {
701
                                        JANUS_LOG(LOG_ERR, "Error creating 'live' stream '%s'...\n", cat->name);
702
                                        cl = cl->next;
703
                                        continue;
704
                                }
705
                                mp->is_private = is_private;
706
                                if(secret && secret->value)
707
                                        mp->secret = g_strdup(secret->value);
708
                                if(pin && pin->value)
709
                                        mp->pin = g_strdup(pin->value);
710
                        } else if(!strcasecmp(type->value, "ondemand")) {
711
                                /* mu-Law file on demand source */
712
                                janus_config_item *id = janus_config_get_item(cat, "id");
713
                                janus_config_item *desc = janus_config_get_item(cat, "description");
714
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
715
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
716
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
717
                                janus_config_item *file = janus_config_get_item(cat, "filename");
718
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
719
                                janus_config_item *video = janus_config_get_item(cat, "video");
720
                                if(file == NULL || file->value == NULL) {
721
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', missing mandatory information...\n", cat->name);
722
                                        cl = cl->next;
723
                                        continue;
724
                                }
725
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
726
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
727
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
728
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
729
                                if(!doaudio || dovideo) {
730
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', we only support audio file streaming right now...\n", cat->name);
731
                                        cl = cl->next;
732
                                        continue;
733
                                }
734
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
735
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
736
                                        cl = cl->next;
737
                                        continue;
738
                                }
739
                                if(id == NULL || id->value == NULL) {
740
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
741
                                } else {
742
                                        janus_mutex_lock(&mountpoints_mutex);
743
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
744
                                        janus_mutex_unlock(&mountpoints_mutex);
745
                                        if(mp != NULL) {
746
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
747
                                                cl = cl->next;
748
                                                continue;
749
                                        }
750
                                }
751
                                janus_streaming_mountpoint *mp = NULL;
752
                                if((mp = janus_streaming_create_file_source(
753
                                                (id && id->value) ? atol(id->value) : 0,
754
                                                (char *)cat->name,
755
                                                desc ? (char *)desc->value : NULL,
756
                                                (char *)file->value,
757
                                                FALSE, doaudio, dovideo)) == NULL) {
758
                                        JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream '%s'...\n", cat->name);
759
                                        cl = cl->next;
760
                                        continue;
761
                                }
762
                                mp->is_private = is_private;
763
                                if(secret && secret->value)
764
                                        mp->secret = g_strdup(secret->value);
765
                                if(pin && pin->value)
766
                                        mp->pin = g_strdup(pin->value);
767
                        } else if(!strcasecmp(type->value, "rtsp")) {
768
#ifndef HAVE_LIBCURL
769
                                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', libcurl support not compiled...\n", cat->name);
770
                                cl = cl->next;
771
                                continue;
772
#else
773
                                janus_config_item *id = janus_config_get_item(cat, "id");
774
                                janus_config_item *desc = janus_config_get_item(cat, "description");
775
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
776
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
777
                                janus_config_item *pin = janus_config_get_item(cat, "pin");
778
                                janus_config_item *file = janus_config_get_item(cat, "url");
779
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
780
                                janus_config_item *video = janus_config_get_item(cat, "video");
781
                                if(file == NULL || file->value == NULL) {
782
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream '%s', missing mandatory information...\n", cat->name);
783
                                        cl = cl->next;
784
                                        continue;
785
                                }
786
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
787
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
788
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
789
                                if(id == NULL || id->value == NULL) {
790
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
791
                                } else {
792
                                        janus_mutex_lock(&mountpoints_mutex);
793
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
794
                                        janus_mutex_unlock(&mountpoints_mutex);
795
                                        if(mp != NULL) {
796
                                                JANUS_LOG(LOG_ERR, "A stream with the provided ID %s already exists, skipping '%s'\n", id->value, cat->name);
797
                                                cl = cl->next;
798
                                                continue;
799
                                        }
800
                                }
801
                                janus_streaming_mountpoint *mp = NULL;
802
                                if((mp = janus_streaming_create_rtsp_source(
803
                                                (id && id->value) ? atol(id->value) : 0,
804
                                                (char *)cat->name,
805
                                                desc ? (char *)desc->value : NULL,
806
                                                (char *)file->value, doaudio, dovideo)) == NULL) {
807
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream '%s'...\n", cat->name);
808
                                        cl = cl->next;
809
                                        continue;
810
                                }
811
                                mp->is_private = is_private;
812
                                if(secret && secret->value)
813
                                        mp->secret = g_strdup(secret->value);
814
                                if(pin && pin->value)
815
                                        mp->pin = g_strdup(pin->value);
816
#endif
817
                        } else {
818
                                JANUS_LOG(LOG_WARN, "Ignoring unknown stream type '%s' (%s)...\n", type->value, cat->name);
819
                        }
820
                        cl = cl->next;
821
                }
822
                /* Done: we keep the configuration file open in case we get a "create" or "destroy" with permanent=true */
823
        }
824
        /* Show available mountpoints */
825
        janus_mutex_lock(&mountpoints_mutex);
826
        GHashTableIter iter;
827
        gpointer value;
828
        g_hash_table_iter_init(&iter, mountpoints);
829
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
830
                janus_streaming_mountpoint *mp = value;
831
                JANUS_LOG(LOG_VERB, "  ::: [%"SCNu64"][%s] %s (%s, %s, %s, pin: %s)\n", mp->id, mp->name, mp->description,
832
                        mp->streaming_type == janus_streaming_type_live ? "live" : "on demand",
833
                        mp->streaming_source == janus_streaming_source_rtp ? "RTP source" : "file source",
834
                        mp->is_private ? "private" : "public",
835
                        mp->pin ? mp->pin : "no pin");
836
        }
837
        janus_mutex_unlock(&mountpoints_mutex);
838

    
839
        sessions = g_hash_table_new(NULL, NULL);
840
        janus_mutex_init(&sessions_mutex);
841
        messages = g_async_queue_new_full((GDestroyNotify) janus_streaming_message_free);
842
        /* This is the callback we'll need to invoke to contact the gateway */
843
        gateway = callback;
844
        g_atomic_int_set(&initialized, 1);
845

    
846
        GError *error = NULL;
847
        /* Start the sessions watchdog */
848
        watchdog = g_thread_try_new("streaming watchdog", &janus_streaming_watchdog, NULL, &error);
849
        if(!watchdog) {
850
                g_atomic_int_set(&initialized, 0);
851
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming watchdog thread...\n", error->code, error->message ? error->message : "??");
852
                janus_config_destroy(config);
853
                return -1;
854
        }
855
        /* Launch the thread that will handle incoming messages */
856
        handler_thread = g_thread_try_new("janus streaming handler", janus_streaming_handler, NULL, &error);
857
        if(error != NULL) {
858
                g_atomic_int_set(&initialized, 0);
859
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming handler thread...\n", error->code, error->message ? error->message : "??");
860
                janus_config_destroy(config);
861
                return -1;
862
        }
863
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_STREAMING_NAME);
864
        return 0;
865
}
866

    
867
void janus_streaming_destroy(void) {
868
        if(!g_atomic_int_get(&initialized))
869
                return;
870
        g_atomic_int_set(&stopping, 1);
871

    
872
        g_async_queue_push(messages, &exit_message);
873

    
874
        if(handler_thread != NULL) {
875
                g_thread_join(handler_thread);
876
                handler_thread = NULL;
877
        }
878

    
879
        /* Remove all mountpoints */
880
        janus_mutex_unlock(&mountpoints_mutex);
881
        GHashTableIter iter;
882
        gpointer value;
883
        g_hash_table_iter_init(&iter, mountpoints);
884
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
885
                janus_streaming_mountpoint *mp = value;
886
                if(!mp->destroyed) {
887
                        mp->destroyed = janus_get_monotonic_time();
888
                        old_mountpoints = g_list_append(old_mountpoints, mp);
889
                }
890
        }
891
        janus_mutex_unlock(&mountpoints_mutex);
892
        if(watchdog != NULL) {
893
                g_thread_join(watchdog);
894
                watchdog = NULL;
895
        }
896

    
897
        /* FIXME We should destroy the sessions cleanly */
898
        usleep(500000);
899
        janus_mutex_lock(&mountpoints_mutex);
900
        g_hash_table_destroy(mountpoints);
901
        janus_mutex_unlock(&mountpoints_mutex);
902
        janus_mutex_lock(&sessions_mutex);
903
        g_hash_table_destroy(sessions);
904
        janus_mutex_unlock(&sessions_mutex);
905
        g_async_queue_unref(messages);
906
        messages = NULL;
907
        sessions = NULL;
908

    
909
        janus_config_destroy(config);
910

    
911
        g_atomic_int_set(&initialized, 0);
912
        g_atomic_int_set(&stopping, 0);
913
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_STREAMING_NAME);
914
}
915

    
916
int janus_streaming_get_api_compatibility(void) {
917
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
918
        return JANUS_PLUGIN_API_VERSION;
919
}
920

    
921
int janus_streaming_get_version(void) {
922
        return JANUS_STREAMING_VERSION;
923
}
924

    
925
const char *janus_streaming_get_version_string(void) {
926
        return JANUS_STREAMING_VERSION_STRING;
927
}
928

    
929
const char *janus_streaming_get_description(void) {
930
        return JANUS_STREAMING_DESCRIPTION;
931
}
932

    
933
const char *janus_streaming_get_name(void) {
934
        return JANUS_STREAMING_NAME;
935
}
936

    
937
const char *janus_streaming_get_author(void) {
938
        return JANUS_STREAMING_AUTHOR;
939
}
940

    
941
const char *janus_streaming_get_package(void) {
942
        return JANUS_STREAMING_PACKAGE;
943
}
944

    
945
void janus_streaming_create_session(janus_plugin_session *handle, int *error) {
946
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
947
                *error = -1;
948
                return;
949
        }        
950
        janus_streaming_session *session = (janus_streaming_session *)g_malloc0(sizeof(janus_streaming_session));
951
        if(session == NULL) {
952
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
953
                *error = -2;
954
                return;
955
        }
956
        session->handle = handle;
957
        session->mountpoint = NULL;        /* This will happen later */
958
        session->started = FALSE;        /* This will happen later */
959
        session->paused = FALSE;
960
        session->destroyed = 0;
961
        g_atomic_int_set(&session->hangingup, 0);
962
        handle->plugin_handle = session;
963
        janus_mutex_lock(&sessions_mutex);
964
        g_hash_table_insert(sessions, handle, session);
965
        janus_mutex_unlock(&sessions_mutex);
966

    
967
        return;
968
}
969

    
970
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error) {
971
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
972
                *error = -1;
973
                return;
974
        }        
975
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; 
976
        if(!session) {
977
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
978
                *error = -2;
979
                return;
980
        }
981
        JANUS_LOG(LOG_VERB, "Removing streaming session...\n");
982
        if(session->mountpoint) {
983
                janus_mutex_lock(&session->mountpoint->mutex);
984
                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
985
                janus_mutex_unlock(&session->mountpoint->mutex);
986
        }
987
        janus_mutex_lock(&sessions_mutex);
988
        if(!session->destroyed) {
989
                session->destroyed = janus_get_monotonic_time();
990
                g_hash_table_remove(sessions, handle);
991
                /* Cleaning up and removing the session is done in a lazy way */
992
                old_sessions = g_list_append(old_sessions, session);
993
        }
994
        janus_mutex_unlock(&sessions_mutex);
995
        return;
996
}
997

    
998
char *janus_streaming_query_session(janus_plugin_session *handle) {
999
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
1000
                return NULL;
1001
        }        
1002
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;
1003
        if(!session) {
1004
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1005
                return NULL;
1006
        }
1007
        /* What is this user watching, if anything? */
1008
        json_t *info = json_object();
1009
        json_object_set_new(info, "state", json_string(session->mountpoint ? "watching" : "idle"));
1010
        if(session->mountpoint) {
1011
                json_object_set_new(info, "mountpoint_id", json_integer(session->mountpoint->id));
1012
                json_object_set_new(info, "mountpoint_name", session->mountpoint->name ? json_string(session->mountpoint->name) : NULL);
1013
        }
1014
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
1015
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1016
        json_decref(info);
1017
        return info_text;
1018
}
1019

    
1020
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
1021
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1022
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
1023

    
1024
        /* Pre-parse the message */
1025
        int error_code = 0;
1026
        char error_cause[512];
1027
        json_t *root = NULL;
1028
        json_t *response = NULL;
1029

    
1030
        if(message == NULL) {
1031
                JANUS_LOG(LOG_ERR, "No message??\n");
1032
                error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
1033
                g_snprintf(error_cause, 512, "%s", "No message??");
1034
                goto error;
1035
        }
1036
        JANUS_LOG(LOG_VERB, "Handling message: %s\n", message);
1037

    
1038
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1039
        if(!session) {
1040
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1041
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1042
                g_snprintf(error_cause, 512, "%s", "session associated with this handle...");
1043
                goto error;
1044
        }
1045
        if(session->destroyed) {
1046
                JANUS_LOG(LOG_ERR, "Session has already been destroyed...\n");
1047
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1048
                g_snprintf(error_cause, 512, "%s", "Session has already been destroyed...");
1049
                goto error;
1050
        }
1051
        json_error_t error;
1052
        root = json_loads(message, 0, &error);
1053
        if(!root) {
1054
                JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
1055
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
1056
                g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
1057
                goto error;
1058
        }
1059
        if(!json_is_object(root)) {
1060
                JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
1061
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
1062
                g_snprintf(error_cause, 512, "JSON error: not an object");
1063
                goto error;
1064
        }
1065
        /* Get the request first */
1066
        JANUS_VALIDATE_JSON_OBJECT(root, request_parameters,
1067
                error_code, error_cause, TRUE,
1068
                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1069
        if(error_code != 0)
1070
                goto error;
1071
        json_t *request = json_object_get(root, "request");
1072
        /* Some requests ('create' and 'destroy') can be handled synchronously */
1073
        const char *request_text = json_string_value(request);
1074
        if(!strcasecmp(request_text, "list")) {
1075
                json_t *list = json_array();
1076
                JANUS_LOG(LOG_VERB, "Request for the list of mountpoints\n");
1077
                /* Return a list of all available mountpoints */
1078
                janus_mutex_lock(&mountpoints_mutex);
1079
                GHashTableIter iter;
1080
                gpointer value;
1081
                g_hash_table_iter_init(&iter, mountpoints);
1082
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
1083
                        janus_streaming_mountpoint *mp = value;
1084
                        if(mp->is_private) {
1085
                                /* Skip private stream */
1086
                                JANUS_LOG(LOG_VERB, "Skipping private mountpoint '%s'\n", mp->description);
1087
                                continue;
1088
                        }
1089
                        json_t *ml = json_object();
1090
                        json_object_set_new(ml, "id", json_integer(mp->id));
1091
                        json_object_set_new(ml, "description", json_string(mp->description));
1092
                        json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1093
                        if(mp->streaming_source == janus_streaming_source_rtp) {
1094
                                janus_streaming_rtp_source *source = mp->source;
1095
                                gint64 now = janus_get_monotonic_time();
1096
                                if(source->audio_fd != -1)
1097
                                        json_object_set_new(ml, "audio_age_ms", json_integer((now - source->last_received_audio) / 1000));
1098
                                if(source->video_fd != -1)
1099
                                        json_object_set_new(ml, "video_age_ms", json_integer((now - source->last_received_video) / 1000));
1100
                        }
1101
                        json_array_append_new(list, ml);
1102
                }
1103
                janus_mutex_unlock(&mountpoints_mutex);
1104
                /* Send info back */
1105
                response = json_object();
1106
                json_object_set_new(response, "streaming", json_string("list"));
1107
                json_object_set_new(response, "list", list);
1108
                goto plugin_response;
1109
        } else if(!strcasecmp(request_text, "info")) {
1110
                JANUS_LOG(LOG_VERB, "Request info on a specific mountpoint\n");
1111
                /* Return info on a specific mountpoint */
1112
                JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
1113
                        error_code, error_cause, TRUE,
1114
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1115
                if(error_code != 0)
1116
                        goto error;
1117
                json_t *id = json_object_get(root, "id");
1118
                gint64 id_value = json_integer_value(id);
1119
                janus_mutex_lock(&mountpoints_mutex);
1120
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1121
                if(mp == NULL) {
1122
                        janus_mutex_unlock(&mountpoints_mutex);
1123
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1124
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1125
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1126
                        goto error;
1127
                }
1128
                json_t *ml = json_object();
1129
                json_object_set_new(ml, "id", json_integer(mp->id));
1130
                json_object_set_new(ml, "description", json_string(mp->description));
1131
                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1132
                if(mp->streaming_source == janus_streaming_source_rtp) {
1133
                        janus_streaming_rtp_source *source = mp->source;
1134
                        gint64 now = janus_get_monotonic_time();
1135
                        if(source->audio_fd != -1)
1136
                                json_object_set_new(ml, "audio_age_ms", json_integer((now - source->last_received_audio) / 1000));
1137
                        if(source->video_fd != -1)
1138
                                json_object_set_new(ml, "video_age_ms", json_integer((now - source->last_received_video) / 1000));
1139
                }
1140
                janus_mutex_unlock(&mountpoints_mutex);
1141
                /* Send info back */
1142
                response = json_object();
1143
                json_object_set_new(response, "streaming", json_string("info"));
1144
                json_object_set_new(response, "info", ml);
1145
                goto plugin_response;
1146
        } else if(!strcasecmp(request_text, "create")) {
1147
                /* Create a new stream */
1148
                JANUS_VALIDATE_JSON_OBJECT(root, create_parameters,
1149
                        error_code, error_cause, TRUE,
1150
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1151
                if(error_code != 0)
1152
                        goto error;
1153
                json_t *type = json_object_get(root, "type");
1154
                const char *type_text = json_string_value(type);
1155
                json_t *secret = json_object_get(root, "secret");
1156
                json_t *pin = json_object_get(root, "pin");
1157
                json_t *permanent = json_object_get(root, "permanent");
1158
                gboolean save = permanent ? json_is_true(permanent) : FALSE;
1159
                if(save && config == NULL) {
1160
                        JANUS_LOG(LOG_ERR, "No configuration file, can't create permanent mountpoint\n");
1161
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1162
                        g_snprintf(error_cause, 512, "No configuration file, can't create permanent mountpoint");
1163
                        goto error;
1164
                }
1165
                janus_streaming_mountpoint *mp = NULL;
1166
                if(!strcasecmp(type_text, "rtp")) {
1167
                        /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
1168
                        JANUS_VALIDATE_JSON_OBJECT(root, rtp_parameters,
1169
                                error_code, error_cause, TRUE,
1170
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1171
                        if(error_code != 0)
1172
                                goto error;
1173
                        json_t *id = json_object_get(root, "id");
1174
                        json_t *name = json_object_get(root, "name");
1175
                        json_t *desc = json_object_get(root, "description");
1176
                        json_t *is_private = json_object_get(root, "is_private");
1177
                        json_t *audio = json_object_get(root, "audio");
1178
                        json_t *video = json_object_get(root, "video");
1179
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1180
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1181
                        if(!doaudio && !dovideo) {
1182
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1183
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1184
                                g_snprintf(error_cause, 512, "Can't add 'rtp' stream, no audio or video have to be streamed...");
1185
                                goto error;
1186
                        }
1187
                        uint16_t aport = 0;
1188
                        uint8_t acodec = 0;
1189
                        char *artpmap = NULL, *afmtp = NULL, *amcast = NULL;
1190
                        if(doaudio) {
1191
                                JANUS_VALIDATE_JSON_OBJECT(root, rtp_audio_parameters,
1192
                                        error_code, error_cause, TRUE,
1193
                                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1194
                                if(error_code != 0)
1195
                                        goto error;
1196
                                json_t *audiomcast = json_object_get(root, "audiomcast");
1197
                                amcast = (char *)json_string_value(audiomcast);
1198
                                json_t *audioport = json_object_get(root, "audioport");
1199
                                aport = json_integer_value(audioport);
1200
                                json_t *audiopt = json_object_get(root, "audiopt");
1201
                                acodec = json_integer_value(audiopt);
1202
                                json_t *audiortpmap = json_object_get(root, "audiortpmap");
1203
                                artpmap = (char *)json_string_value(audiortpmap);
1204
                                json_t *audiofmtp = json_object_get(root, "audiofmtp");
1205
                                afmtp = (char *)json_string_value(audiofmtp);
1206
                        }
1207
                        uint16_t vport = 0;
1208
                        uint8_t vcodec = 0;
1209
                        char *vrtpmap = NULL, *vfmtp = NULL, *vmcast = NULL;
1210
                        gboolean bufferkf = FALSE;
1211
                        if(dovideo) {
1212
                                JANUS_VALIDATE_JSON_OBJECT(root, rtp_video_parameters,
1213
                                        error_code, error_cause, TRUE,
1214
                                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1215
                                if(error_code != 0)
1216
                                        goto error;
1217
                                json_t *videomcast = json_object_get(root, "videomcast");
1218
                                vmcast = (char *)json_string_value(videomcast);
1219
                                json_t *videoport = json_object_get(root, "videoport");
1220
                                vport = json_integer_value(videoport);
1221
                                json_t *videopt = json_object_get(root, "videopt");
1222
                                vcodec = json_integer_value(videopt);
1223
                                json_t *videortpmap = json_object_get(root, "videortpmap");
1224
                                vrtpmap = (char *)json_string_value(videortpmap);
1225
                                json_t *videofmtp = json_object_get(root, "videofmtp");
1226
                                vfmtp = (char *)json_string_value(videofmtp);
1227
                                json_t *vkf = json_object_get(root, "videobufferkf");
1228
                                bufferkf = vkf ? json_is_true(vkf) : FALSE;
1229
                        }
1230
                        if(id == NULL) {
1231
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1232
                        } else {
1233
                                janus_mutex_lock(&mountpoints_mutex);
1234
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1235
                                janus_mutex_unlock(&mountpoints_mutex);
1236
                                if(mp != NULL) {
1237
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1238
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1239
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1240
                                        goto error;
1241
                                }
1242
                        }
1243
                        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
1244
                        mp = janus_streaming_create_rtp_source(
1245
                                        id ? json_integer_value(id) : 0,
1246
                                        name ? (char *)json_string_value(name) : NULL,
1247
                                        desc ? (char *)json_string_value(desc) : NULL,
1248
                                        doaudio, amcast, aport, acodec, artpmap, afmtp,
1249
                                        dovideo, vmcast, vport, vcodec, vrtpmap, vfmtp, bufferkf);
1250
                        if(mp == NULL) {
1251
                                JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream...\n");
1252
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1253
                                g_snprintf(error_cause, 512, "Error creating 'rtp' stream");
1254
                                goto error;
1255
                        }
1256
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1257
                } else if(!strcasecmp(type_text, "live")) {
1258
                        /* File live source */
1259
                        JANUS_VALIDATE_JSON_OBJECT(root, live_parameters,
1260
                                error_code, error_cause, TRUE,
1261
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1262
                        if(error_code != 0)
1263
                                goto error;
1264
                        json_t *id = json_object_get(root, "id");
1265
                        json_t *name = json_object_get(root, "name");
1266
                        json_t *desc = json_object_get(root, "description");
1267
                        json_t *is_private = json_object_get(root, "is_private");
1268
                        json_t *file = json_object_get(root, "file");
1269
                        json_t *audio = json_object_get(root, "audio");
1270
                        json_t *video = json_object_get(root, "video");
1271
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1272
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1273
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1274
                        if(!doaudio || dovideo) {
1275
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, we only support audio file streaming right now...\n");
1276
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1277
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, we only support audio file streaming right now...");
1278
                                goto error;
1279
                        }
1280
                        char *filename = (char *)json_string_value(file);
1281
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1282
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1283
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1284
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1285
                                goto error;
1286
                        }
1287
                        if(id == NULL) {
1288
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1289
                        } else {
1290
                                janus_mutex_lock(&mountpoints_mutex);
1291
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1292
                                janus_mutex_unlock(&mountpoints_mutex);
1293
                                if(mp != NULL) {
1294
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1295
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1296
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1297
                                        goto error;
1298
                                }
1299
                        }
1300
                        mp = janus_streaming_create_file_source(
1301
                                        id ? json_integer_value(id) : 0,
1302
                                        name ? (char *)json_string_value(name) : NULL,
1303
                                        desc ? (char *)json_string_value(desc) : NULL,
1304
                                        filename,
1305
                                        TRUE, doaudio, dovideo);
1306
                        if(mp == NULL) {
1307
                                JANUS_LOG(LOG_ERR, "Error creating 'live' stream...\n");
1308
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1309
                                g_snprintf(error_cause, 512, "Error creating 'live' stream");
1310
                                goto error;
1311
                        }
1312
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1313
                } else if(!strcasecmp(type_text, "ondemand")) {
1314
                        /* mu-Law file on demand source */
1315
                        JANUS_VALIDATE_JSON_OBJECT(root, ondemand_parameters,
1316
                                error_code, error_cause, TRUE,
1317
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1318
                        if(error_code != 0)
1319
                                goto error;
1320
                        json_t *id = json_object_get(root, "id");
1321
                        json_t *name = json_object_get(root, "name");
1322
                        json_t *desc = json_object_get(root, "description");
1323
                        json_t *is_private = json_object_get(root, "is_private");
1324
                        json_t *file = json_object_get(root, "file");
1325
                        json_t *audio = json_object_get(root, "audio");
1326
                        json_t *video = json_object_get(root, "video");
1327
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1328
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1329
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1330
                        if(!doaudio || dovideo) {
1331
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, we only support audio file streaming right now...\n");
1332
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1333
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, we only support audio file streaming right now...");
1334
                                goto error;
1335
                        }
1336
                        char *filename = (char *)json_string_value(file);
1337
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1338
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1339
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1340
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1341
                                goto error;
1342
                        }
1343
                        if(id == NULL) {
1344
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1345
                        } else {
1346
                                janus_mutex_lock(&mountpoints_mutex);
1347
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1348
                                janus_mutex_unlock(&mountpoints_mutex);
1349
                                if(mp != NULL) {
1350
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1351
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1352
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1353
                                        goto error;
1354
                                }
1355
                        }
1356
                        mp = janus_streaming_create_file_source(
1357
                                        id ? json_integer_value(id) : 0,
1358
                                        name ? (char *)json_string_value(name) : NULL,
1359
                                        desc ? (char *)json_string_value(desc) : NULL,
1360
                                        filename,
1361
                                        FALSE, doaudio, dovideo);
1362
                        if(mp == NULL) {
1363
                                JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream...\n");
1364
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1365
                                g_snprintf(error_cause, 512, "Error creating 'ondemand' stream");
1366
                                goto error;
1367
                        }
1368
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1369
                } else if(!strcasecmp(type_text, "rtsp")) {
1370
#ifndef HAVE_LIBCURL
1371
                        JANUS_LOG(LOG_ERR, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1372
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1373
                        g_snprintf(error_cause, 512, "Can't create 'rtsp' mountpoint, libcurl support not compiled...\n");
1374
                        goto error;
1375
#else
1376
                        JANUS_VALIDATE_JSON_OBJECT(root, rtsp_parameters,
1377
                                error_code, error_cause, TRUE,
1378
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1379
                        if(error_code != 0)
1380
                                goto error;
1381
                        /* RTSP source*/
1382
                        json_t *id = json_object_get(root, "id");
1383
                        json_t *name = json_object_get(root, "name");
1384
                        json_t *desc = json_object_get(root, "description");
1385
                        json_t *is_private = json_object_get(root, "is_private");
1386
                        json_t *audio = json_object_get(root, "audio");
1387
                        json_t *video = json_object_get(root, "video");
1388
                        json_t *url = json_object_get(root, "url");
1389
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1390
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1391
                        if(!doaudio && !dovideo) {
1392
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
1393
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1394
                                g_snprintf(error_cause, 512, "Can't add 'rtsp' stream, no audio or video have to be streamed...");
1395
                                goto error;
1396
                        }
1397
                        mp = janus_streaming_create_rtsp_source(
1398
                                        id ? json_integer_value(id) : 0,
1399
                                        name ? (char *)json_string_value(name) : NULL,
1400
                                        desc ? (char *)json_string_value(desc) : NULL,
1401
                                        (char *)json_string_value(url),
1402
                                        doaudio, dovideo);
1403
                        if(mp == NULL) {
1404
                                JANUS_LOG(LOG_ERR, "Error creating 'rtsp' stream...\n");
1405
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1406
                                g_snprintf(error_cause, 512, "Error creating 'RTSP' stream");
1407
                                goto error;
1408
                        }
1409
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;                        
1410
#endif
1411
                } else {
1412
                        JANUS_LOG(LOG_ERR, "Unknown stream type '%s'...\n", type_text);
1413
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1414
                        g_snprintf(error_cause, 512, "Unknown stream type '%s'...\n", type_text);
1415
                        goto error;
1416
                }
1417
                /* Any secret? */
1418
                if(secret)
1419
                        mp->secret = g_strdup(json_string_value(secret));
1420
                /* Any PIN? */
1421
                if(pin)
1422
                        mp->pin = g_strdup(json_string_value(pin));
1423
                if(save) {
1424
                        /* This mountpoint is permanent: save to the configuration file too
1425
                         * FIXME: We should check if anything fails... */
1426
                        JANUS_LOG(LOG_VERB, "Saving mountpoint %"SCNu64" permanently in config file\n", mp->id);
1427
                        janus_mutex_lock(&config_mutex);
1428
                        char value[BUFSIZ];
1429
                        /* The category to add is the mountpoint name */
1430
                        janus_config_add_category(config, mp->name);
1431
                        /* Now for the common values */
1432
                        janus_config_add_item(config, mp->name, "type", type_text);
1433
                        g_snprintf(value, BUFSIZ, "%"SCNu64, mp->id);
1434
                        janus_config_add_item(config, mp->name, "id", value);
1435
                        janus_config_add_item(config, mp->name, "description", mp->description);
1436
                        if(mp->is_private)
1437
                                janus_config_add_item(config, mp->name, "is_private", "yes");
1438
                        /* Per type values */
1439
                        if(!strcasecmp(type_text, "rtp")) {
1440
                                janus_config_add_item(config, mp->name, "audio", mp->codecs.audio_pt >= 0 ? "yes" : "no");
1441
                                janus_streaming_rtp_source *source = mp->source;
1442
                                if(mp->codecs.audio_pt >= 0) {
1443
                                        g_snprintf(value, BUFSIZ, "%d", source->audio_port);
1444
                                        janus_config_add_item(config, mp->name, "audioport", value);
1445
                                        json_t *audiomcast = json_object_get(root, "audiomcast");
1446
                                        if(audiomcast)
1447
                                                janus_config_add_item(config, mp->name, "audiomcast", json_string_value(json_object_get(root, "audiomcast")));
1448
                                        g_snprintf(value, BUFSIZ, "%d", mp->codecs.audio_pt);
1449
                                        janus_config_add_item(config, mp->name, "audiopt", value);
1450
                                        janus_config_add_item(config, mp->name, "audiortpmap", mp->codecs.audio_rtpmap);
1451
                                        if(mp->codecs.audio_fmtp)
1452
                                                janus_config_add_item(config, mp->name, "audiofmtp", mp->codecs.audio_fmtp);
1453
                                }
1454
                                janus_config_add_item(config, mp->name, "video", mp->codecs.video_pt >= 0? "yes" : "no");
1455
                                if(mp->codecs.video_pt >= 0) {
1456
                                        g_snprintf(value, BUFSIZ, "%d", source->video_port);
1457
                                        janus_config_add_item(config, mp->name, "videoport", value);
1458
                                        json_t *videomcast = json_object_get(root, "videomcast");
1459
                                        if(videomcast)
1460
                                                janus_config_add_item(config, mp->name, "videomcast", json_string_value(json_object_get(root, "videomcast")));
1461
                                        g_snprintf(value, BUFSIZ, "%d", mp->codecs.video_pt);
1462
                                        janus_config_add_item(config, mp->name, "videopt", value);
1463
                                        janus_config_add_item(config, mp->name, "videortpmap", mp->codecs.video_rtpmap);
1464
                                        if(mp->codecs.video_fmtp)
1465
                                                janus_config_add_item(config, mp->name, "videofmtp", mp->codecs.video_fmtp);
1466
                                        if(source->keyframe.enabled)
1467
                                                janus_config_add_item(config, mp->name, "videobufferkf", "yes");
1468
                                }
1469
                        } else if(!strcasecmp(type_text, "live") || !strcasecmp(type_text, "ondemand")) {
1470
                                janus_streaming_file_source *source = mp->source;
1471
                                janus_config_add_item(config, mp->name, "filename", source->filename);
1472
                                janus_config_add_item(config, mp->name, "audio", mp->codecs.audio_pt ? "yes" : "no");
1473
                                janus_config_add_item(config, mp->name, "video", mp->codecs.video_pt ? "yes" : "no");
1474
                        } else if(!strcasecmp(type_text, "rtsp")) {
1475
                                janus_config_add_item(config, mp->name, "url", json_string_value(json_object_get(root, "url")));
1476
                                janus_config_add_item(config, mp->name, "audio", mp->codecs.audio_pt ? "yes" : "no");
1477
                                janus_config_add_item(config, mp->name, "video", mp->codecs.video_pt ? "yes" : "no");
1478
                        }
1479
                        /* Some more common values */
1480
                        if(mp->secret)
1481
                                janus_config_add_item(config, mp->name, "secret", mp->secret);
1482
                        if(mp->pin)
1483
                                janus_config_add_item(config, mp->name, "pin", mp->pin);
1484
                        /* Save modified configuration */
1485
                        janus_config_save(config, config_folder, JANUS_STREAMING_PACKAGE);
1486
                        janus_mutex_unlock(&config_mutex);
1487
                }
1488
                /* Send info back */
1489
                response = json_object();
1490
                json_object_set_new(response, "streaming", json_string("created"));
1491
                json_object_set_new(response, "created", json_string(mp->name));
1492
                json_t *ml = json_object();
1493
                json_object_set_new(ml, "id", json_integer(mp->id));
1494
                json_object_set_new(ml, "description", json_string(mp->description));
1495
                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1496
                json_object_set_new(ml, "is_private", json_string(mp->is_private ? "true" : "false"));
1497
                json_object_set_new(response, "stream", ml);
1498
                goto plugin_response;
1499
        } else if(!strcasecmp(request_text, "destroy")) {
1500
                /* Get rid of an existing stream (notice this doesn't remove it from the config file, though) */
1501
                JANUS_VALIDATE_JSON_OBJECT(root, destroy_parameters,
1502
                        error_code, error_cause, TRUE,
1503
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1504
                if(error_code != 0)
1505
                        goto error;
1506
                json_t *id = json_object_get(root, "id");
1507
                json_t *permanent = json_object_get(root, "permanent");
1508
                gboolean save = permanent ? json_is_true(permanent) : FALSE;
1509
                if(save && config == NULL) {
1510
                        JANUS_LOG(LOG_ERR, "No configuration file, can't destroy mountpoint permanently\n");
1511
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1512
                        g_snprintf(error_cause, 512, "No configuration file, can't destroy mountpoint permanently");
1513
                        goto error;
1514
                }
1515
                gint64 id_value = json_integer_value(id);
1516
                janus_mutex_lock(&mountpoints_mutex);
1517
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1518
                if(mp == NULL) {
1519
                        janus_mutex_unlock(&mountpoints_mutex);
1520
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1521
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1522
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1523
                        goto error;
1524
                }
1525
                /* A secret may be required for this action */
1526
                JANUS_CHECK_SECRET(mp->secret, root, "secret", error_code, error_cause,
1527
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
1528
                if(error_code != 0) {
1529
                        janus_mutex_unlock(&mountpoints_mutex);
1530
                        goto error;
1531
                }
1532
                JANUS_LOG(LOG_VERB, "Request to unmount mountpoint/stream %"SCNu64"\n", id_value);
1533
                /* FIXME Should we kick the current viewers as well? */
1534
                janus_mutex_lock(&mp->mutex);
1535
                GList *viewer = g_list_first(mp->listeners);
1536
                /* Prepare JSON event */
1537
                json_t *event = json_object();
1538
                json_object_set_new(event, "streaming", json_string("event"));
1539
                json_t *result = json_object();
1540
                json_object_set_new(result, "status", json_string("stopped"));
1541
                json_object_set_new(event, "result", result);
1542
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1543
                json_decref(event);
1544
                while(viewer) {
1545
                        janus_streaming_session *session = (janus_streaming_session *)viewer->data;
1546
                        if(session != NULL) {
1547
                                session->stopping = TRUE;
1548
                                session->started = FALSE;
1549
                                session->paused = FALSE;
1550
                                session->mountpoint = NULL;
1551
                                /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
1552
                                gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1553
                                gateway->close_pc(session->handle);
1554
                        }
1555
                        mp->listeners = g_list_remove_all(mp->listeners, session);
1556
                        viewer = g_list_first(mp->listeners);
1557
                }
1558
                g_free(event_text);
1559
                janus_mutex_unlock(&mp->mutex);
1560
                if(save) {
1561
                        /* This change is permanent: save to the configuration file too
1562
                         * FIXME: We should check if anything fails... */
1563
                        JANUS_LOG(LOG_VERB, "Destroying mountpoint %"SCNu64" (%s) permanently in config file\n", mp->id, mp->name);
1564
                        janus_mutex_lock(&config_mutex);
1565
                        /* The category to remove is the mountpoint name */
1566
                        janus_config_remove_category(config, mp->name);
1567
                        /* Save modified configuration */
1568
                        janus_config_save(config, config_folder, JANUS_STREAMING_PACKAGE);
1569
                        janus_mutex_unlock(&config_mutex);
1570
                }
1571
                /* Remove mountpoint from the hashtable: this will get it destroyed */
1572
                if(!mp->destroyed) {
1573
                        mp->destroyed = janus_get_monotonic_time();
1574
                        g_hash_table_remove(mountpoints, GINT_TO_POINTER(id_value));
1575
                        /* Cleaning up and removing the mountpoint is done in a lazy way */
1576
                        old_mountpoints = g_list_append(old_mountpoints, mp);
1577
                }
1578
                janus_mutex_unlock(&mountpoints_mutex);
1579
                /* Send info back */
1580
                response = json_object();
1581
                json_object_set_new(response, "streaming", json_string("destroyed"));
1582
                json_object_set_new(response, "destroyed", json_integer(id_value));
1583
                goto plugin_response;
1584
        } else if(!strcasecmp(request_text, "recording")) {
1585
                /* We can start/stop recording a live, RTP-based stream */
1586
                JANUS_VALIDATE_JSON_OBJECT(root, recording_parameters,
1587
                        error_code, error_cause, TRUE,
1588
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1589
                if(error_code != 0)
1590
                        goto error;
1591
                json_t *action = json_object_get(root, "action");
1592
                const char *action_text = json_string_value(action);
1593
                if(strcasecmp(action_text, "start") && strcasecmp(action_text, "stop")) {
1594
                        JANUS_LOG(LOG_ERR, "Invalid action (should be start|stop)\n");
1595
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1596
                        g_snprintf(error_cause, 512, "Invalid action (should be start|stop)");
1597
                        goto error;
1598
                }
1599
                json_t *id = json_object_get(root, "id");
1600
                gint64 id_value = json_integer_value(id);
1601
                janus_mutex_lock(&mountpoints_mutex);
1602
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1603
                if(mp == NULL) {
1604
                        janus_mutex_unlock(&mountpoints_mutex);
1605
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1606
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1607
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1608
                        goto error;
1609
                }
1610
                if(mp->streaming_type != janus_streaming_type_live || mp->streaming_source != janus_streaming_source_rtp) {
1611
                        janus_mutex_unlock(&mountpoints_mutex);
1612
                        JANUS_LOG(LOG_ERR, "Recording is only available on RTP-based live streams\n");
1613
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1614
                        g_snprintf(error_cause, 512, "Recording is only available on RTP-based live streams");
1615
                        goto error;
1616
                }
1617
                /* A secret may be required for this action */
1618
                JANUS_CHECK_SECRET(mp->secret, root, "secret", error_code, error_cause,
1619
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
1620
                if(error_code != 0) {
1621
                        janus_mutex_unlock(&mountpoints_mutex);
1622
                        goto error;
1623
                }
1624
                janus_streaming_rtp_source *source = mp->source;
1625
                if(!strcasecmp(action_text, "start")) {
1626
                        /* Start a recording for audio and/or video */
1627
                        JANUS_VALIDATE_JSON_OBJECT(root, recording_start_parameters,
1628
                                error_code, error_cause, TRUE,
1629
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1630
                        if(error_code != 0) {
1631
                                janus_mutex_unlock(&mountpoints_mutex);
1632
                                goto error;
1633
                        }
1634
                        json_t *audio = json_object_get(root, "audio");
1635
                        json_t *video = json_object_get(root, "video");
1636
                        if((audio && source->arc) || (video && source->vrc)) {
1637
                                janus_mutex_unlock(&mountpoints_mutex);
1638
                                JANUS_LOG(LOG_ERR, "Recording for audio and/or video already started for this stream\n");
1639
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1640
                                g_snprintf(error_cause, 512, "Recording for audio and/or video already started for this stream");
1641
                                goto error;
1642
                        }
1643
                        if(!audio && !video) {
1644
                                janus_mutex_unlock(&mountpoints_mutex);
1645
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1646
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1647
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1648
                                goto error;
1649
                        }
1650
                        if(audio) {
1651
                                const char *codec = NULL;
1652
                                if(strstr(mp->codecs.audio_rtpmap, "opus") || strstr(mp->codecs.audio_rtpmap, "OPUS"))
1653
                                        codec = "opus";
1654
                                else if(strstr(mp->codecs.audio_rtpmap, "pcm") || strstr(mp->codecs.audio_rtpmap, "PCM"))
1655
                                        codec = "g711";
1656
                                const char *audiofile = json_string_value(audio);
1657
                                source->arc = janus_recorder_create(NULL, codec, (char *)audiofile);
1658
                                if(source->arc == NULL) {
1659
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for audio\n");
1660
                                } else {
1661
                                        JANUS_LOG(LOG_INFO, "[%s] Audio recording started\n", mp->name);
1662
                                }
1663
                        }
1664
                        if(video) {
1665
                                const char *codec = NULL;
1666
                                if(strstr(mp->codecs.video_rtpmap, "vp8") || strstr(mp->codecs.video_rtpmap, "VP8"))
1667
                                        codec = "vp8";
1668
                                else if(strstr(mp->codecs.video_rtpmap, "vp9") || strstr(mp->codecs.video_rtpmap, "VP9"))
1669
                                        codec = "vp9";
1670
                                else if(strstr(mp->codecs.video_rtpmap, "h264") || strstr(mp->codecs.video_rtpmap, "H264"))
1671
                                        codec = "h264";
1672
                                const char *videofile = json_string_value(video);
1673
                                source->vrc = janus_recorder_create(NULL, codec, (char *)videofile);
1674
                                if(source->vrc == NULL) {
1675
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for video\n");
1676
                                } else {
1677
                                        JANUS_LOG(LOG_INFO, "[%s] Video recording started\n", mp->name);
1678
                                }
1679
                        }
1680
                        janus_mutex_unlock(&mountpoints_mutex);
1681
                        /* Send a success response back */
1682
                        response = json_object();
1683
                        json_object_set_new(response, "streaming", json_string("ok"));
1684
                        goto plugin_response;
1685
                } else if(!strcasecmp(action_text, "stop")) {
1686
                        /* Stop the recording */
1687
                        JANUS_VALIDATE_JSON_OBJECT(root, recording_stop_parameters,
1688
                                error_code, error_cause, TRUE,
1689
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1690
                        if(error_code != 0) {
1691
                                janus_mutex_unlock(&mountpoints_mutex);
1692
                                goto error;
1693
                        }
1694
                        json_t *audio = json_object_get(root, "audio");
1695
                        json_t *video = json_object_get(root, "video");
1696
                        if(!audio && !video) {
1697
                                janus_mutex_unlock(&mountpoints_mutex);
1698
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1699
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1700
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1701
                                goto error;
1702
                        }
1703
                        janus_mutex_lock(&source->rec_mutex);
1704
                        if(audio && json_is_true(audio) && source->arc) {
1705
                                /* Close the audio recording */
1706
                                janus_recorder_close(source->arc);
1707
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1708
                                janus_recorder *tmp = source->arc;
1709
                                source->arc = NULL;
1710
                                janus_recorder_free(tmp);
1711
                        }
1712
                        if(video && json_is_true(video) && source->vrc) {
1713
                                /* Close the video recording */
1714
                                janus_recorder_close(source->vrc);
1715
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1716
                                janus_recorder *tmp = source->vrc;
1717
                                source->vrc = NULL;
1718
                                janus_recorder_free(tmp);
1719
                        }
1720
                        janus_mutex_unlock(&source->rec_mutex);
1721
                        janus_mutex_unlock(&mountpoints_mutex);
1722
                        /* Send a success response back */
1723
                        response = json_object();
1724
                        json_object_set_new(response, "streaming", json_string("ok"));
1725
                        goto plugin_response;
1726
                }
1727
        } else if(!strcasecmp(request_text, "enable") || !strcasecmp(request_text, "disable")) {
1728
                /* A request to enable/disable a mountpoint */
1729
                JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
1730
                        error_code, error_cause, TRUE,
1731
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
1732
                if(error_code != 0)
1733
                        goto error;
1734
                json_t *id = json_object_get(root, "id");
1735
                gint64 id_value = json_integer_value(id);
1736
                janus_mutex_lock(&mountpoints_mutex);
1737
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1738
                if(mp == NULL) {
1739
                        janus_mutex_unlock(&mountpoints_mutex);
1740
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1741
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1742
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1743
                        goto error;
1744
                }
1745
                /* A secret may be required for this action */
1746
                JANUS_CHECK_SECRET(mp->secret, root, "secret", error_code, error_cause,
1747
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
1748
                if(error_code != 0) {
1749
                        janus_mutex_unlock(&mountpoints_mutex);
1750
                        goto error;
1751
                }
1752
                if(!strcasecmp(request_text, "enable")) {
1753
                        /* Enable a previously disabled mountpoint */
1754
                        JANUS_LOG(LOG_INFO, "[%s] Stream enabled\n", mp->name);
1755
                        mp->enabled = TRUE;
1756
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1757
                } else {
1758
                        /* Disable a previously enabled mountpoint */
1759
                        JANUS_LOG(LOG_INFO, "[%s] Stream disabled\n", mp->name);
1760
                        mp->enabled = FALSE;
1761
                        /* Any recording to close? */
1762
                        janus_streaming_rtp_source *source = mp->source;
1763
                        janus_mutex_lock(&source->rec_mutex);
1764
                        if(source->arc) {
1765
                                janus_recorder_close(source->arc);
1766
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1767
                                janus_recorder *tmp = source->arc;
1768
                                source->arc = NULL;
1769
                                janus_recorder_free(tmp);
1770
                        }
1771
                        if(source->vrc) {
1772
                                janus_recorder_close(source->vrc);
1773
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1774
                                janus_recorder *tmp = source->vrc;
1775
                                source->vrc = NULL;
1776
                                janus_recorder_free(tmp);
1777
                        }
1778
                        janus_mutex_unlock(&source->rec_mutex);
1779
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1780
                }
1781
                janus_mutex_unlock(&mountpoints_mutex);
1782
                /* Send a success response back */
1783
                response = json_object();
1784
                json_object_set_new(response, "streaming", json_string("ok"));
1785
                goto plugin_response;
1786
        } else if(!strcasecmp(request_text, "watch") || !strcasecmp(request_text, "start")
1787
                        || !strcasecmp(request_text, "pause") || !strcasecmp(request_text, "stop")
1788
                        || !strcasecmp(request_text, "switch")) {
1789
                /* These messages are handled asynchronously */
1790
                janus_streaming_message *msg = g_malloc0(sizeof(janus_streaming_message));
1791
                if(msg == NULL) {
1792
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1793
                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1794
                        g_snprintf(error_cause, 512, "Memory error");
1795
                        goto error;
1796
                }
1797

    
1798
                g_free(message);
1799
                msg->handle = handle;
1800
                msg->transaction = transaction;
1801
                msg->message = root;
1802
                msg->sdp_type = sdp_type;
1803
                msg->sdp = sdp;
1804

    
1805
                g_async_queue_push(messages, msg);
1806

    
1807
                return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
1808
        } else {
1809
                JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
1810
                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1811
                g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1812
                goto error;
1813
        }
1814

    
1815
plugin_response:
1816
                {
1817
                        if(!response) {
1818
                                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1819
                                g_snprintf(error_cause, 512, "Invalid response");
1820
                                goto error;
1821
                        }
1822
                        if(root != NULL)
1823
                                json_decref(root);
1824
                        g_free(transaction);
1825
                        g_free(message);
1826
                        g_free(sdp_type);
1827
                        g_free(sdp);
1828

    
1829
                        char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1830
                        json_decref(response);
1831
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1832
                        g_free(response_text);
1833
                        return result;
1834
                }
1835

    
1836
error:
1837
                {
1838
                        if(root != NULL)
1839
                                json_decref(root);
1840
                        g_free(transaction);
1841
                        g_free(message);
1842
                        g_free(sdp_type);
1843
                        g_free(sdp);
1844

    
1845
                        /* Prepare JSON error event */
1846
                        json_t *event = json_object();
1847
                        json_object_set_new(event, "streaming", json_string("event"));
1848
                        json_object_set_new(event, "error_code", json_integer(error_code));
1849
                        json_object_set_new(event, "error", json_string(error_cause));
1850
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1851
                        json_decref(event);
1852
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, event_text);
1853
                        g_free(event_text);
1854
                        return result;
1855
                }
1856

    
1857
}
1858

    
1859
void janus_streaming_setup_media(janus_plugin_session *handle) {
1860
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
1861
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1862
                return;
1863
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1864
        if(!session) {
1865
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1866
                return;
1867
        }
1868
        if(session->destroyed)
1869
                return;
1870
        g_atomic_int_set(&session->hangingup, 0);
1871
        /* We only start streaming towards this user when we get this event */
1872
        session->context.a_last_ssrc = 0;
1873
        session->context.a_last_ssrc = 0;
1874
        session->context.a_last_ts = 0;
1875
        session->context.a_base_ts = 0;
1876
        session->context.a_base_ts_prev = 0;
1877
        session->context.v_last_ssrc = 0;
1878
        session->context.v_last_ts = 0;
1879
        session->context.v_base_ts = 0;
1880
        session->context.v_base_ts_prev = 0;
1881
        session->context.a_last_seq = 0;
1882
        session->context.a_base_seq = 0;
1883
        session->context.a_base_seq_prev = 0;
1884
        session->context.v_last_seq = 0;
1885
        session->context.v_base_seq = 0;
1886
        session->context.v_base_seq_prev = 0;
1887
        /* If this is related to a live RTP mountpoint, any keyframe we can shoot already? */
1888
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
1889
        if(mountpoint->streaming_source == janus_streaming_source_rtp) {
1890
                janus_streaming_rtp_source *source = mountpoint->source;
1891
                if(source->keyframe.enabled) {
1892
                        JANUS_LOG(LOG_HUGE, "Any keyframe to send?\n");
1893
                        janus_mutex_lock(&source->keyframe.mutex);
1894
                        if(source->keyframe.latest_keyframe != NULL) {
1895
                                JANUS_LOG(LOG_HUGE, "Yep! %d packets\n", g_list_length(source->keyframe.latest_keyframe));
1896
                                GList *temp = source->keyframe.latest_keyframe;
1897
                                while(temp) {
1898
                                        janus_streaming_relay_rtp_packet(session, temp->data);
1899
                                        temp = temp->next;
1900
                                }
1901
                        }
1902
                        janus_mutex_unlock(&source->keyframe.mutex);
1903
                }
1904
        }
1905
        session->started = TRUE;
1906
        /* Prepare JSON event */
1907
        json_t *event = json_object();
1908
        json_object_set_new(event, "streaming", json_string("event"));
1909
        json_t *result = json_object();
1910
        json_object_set_new(result, "status", json_string("started"));
1911
        json_object_set_new(event, "result", result);
1912
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1913
        json_decref(event);
1914
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
1915
        int ret = gateway->push_event(handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1916
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
1917
        g_free(event_text);
1918
}
1919

    
1920
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
1921
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1922
                return;
1923
        /* FIXME We don't care about what the browser sends us, we're sendonly */
1924
}
1925

    
1926
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
1927
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1928
                return;
1929
        /* We might interested in the available bandwidth that the user advertizes */
1930
        uint64_t bw = janus_rtcp_get_remb(buf, len);
1931
        if(bw > 0) {
1932
                JANUS_LOG(LOG_HUGE, "REMB for this PeerConnection: %"SCNu64"\n", bw);
1933
                /* TODO Use this somehow (e.g., notification towards application?) */
1934
        }
1935
        /* FIXME Maybe we should care about RTCP, but not now */
1936
}
1937

    
1938
void janus_streaming_hangup_media(janus_plugin_session *handle) {
1939
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
1940
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1941
                return;
1942
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1943
        if(!session) {
1944
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1945
                return;
1946
        }
1947
        if(session->destroyed)
1948
                return;
1949
        if(g_atomic_int_add(&session->hangingup, 1))
1950
                return;
1951
        /* FIXME Simulate a "stop" coming from the browser */
1952
        janus_streaming_message *msg = g_malloc0(sizeof(janus_streaming_message));
1953
        msg->handle = handle;
1954
        msg->message = json_loads("{\"request\":\"stop\"}", 0, NULL);
1955
        msg->transaction = NULL;
1956
        msg->sdp_type = NULL;
1957
        msg->sdp = NULL;
1958
        g_async_queue_push(messages, msg);
1959
}
1960

    
1961
/* Thread to handle incoming messages */
1962
static void *janus_streaming_handler(void *data) {
1963
        JANUS_LOG(LOG_VERB, "Joining Streaming handler thread\n");
1964
        janus_streaming_message *msg = NULL;
1965
        int error_code = 0;
1966
        char error_cause[512];
1967
        json_t *root = NULL;
1968
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
1969
                msg = g_async_queue_pop(messages);
1970
                if(msg == NULL)
1971
                        continue;
1972
                if(msg == &exit_message)
1973
                        break;
1974
                if(msg->handle == NULL) {
1975
                        janus_streaming_message_free(msg);
1976
                        continue;
1977
                }
1978
                janus_streaming_session *session = NULL;
1979
                janus_mutex_lock(&sessions_mutex);
1980
                if(g_hash_table_lookup(sessions, msg->handle) != NULL ) {
1981
                        session = (janus_streaming_session *)msg->handle->plugin_handle;
1982
                }
1983
                janus_mutex_unlock(&sessions_mutex);
1984
                if(!session) {
1985
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1986
                        janus_streaming_message_free(msg);
1987
                        continue;
1988
                }
1989
                if(session->destroyed) {
1990
                        janus_streaming_message_free(msg);
1991
                        continue;
1992
                }
1993
                /* Handle request */
1994
                error_code = 0;
1995
                root = NULL;
1996
                if(msg->message == NULL) {
1997
                        JANUS_LOG(LOG_ERR, "No message??\n");
1998
                        error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
1999
                        g_snprintf(error_cause, 512, "%s", "No message??");
2000
                        goto error;
2001
                }
2002
                root = msg->message;
2003
                /* Get the request first */
2004
                JANUS_VALIDATE_JSON_OBJECT(root, request_parameters,
2005
                        error_code, error_cause, TRUE,
2006
                        JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
2007
                if(error_code != 0)
2008
                        goto error;
2009
                json_t *request = json_object_get(root, "request");
2010
                const char *request_text = json_string_value(request);
2011
                json_t *result = NULL;
2012
                const char *sdp_type = NULL;
2013
                char *sdp = NULL;
2014
                /* All these requests can only be handled asynchronously */
2015
                if(!strcasecmp(request_text, "watch")) {
2016
                        JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
2017
                                error_code, error_cause, TRUE,
2018
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
2019
                        if(error_code != 0)
2020
                                goto error;
2021
                        json_t *id = json_object_get(root, "id");
2022
                        gint64 id_value = json_integer_value(id);
2023
                        janus_mutex_lock(&mountpoints_mutex);
2024
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
2025
                        if(mp == NULL) {
2026
                                janus_mutex_unlock(&mountpoints_mutex);
2027
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2028
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2029
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2030
                                goto error;
2031
                        }
2032
                        /* A secret may be required for this action */
2033
                        JANUS_CHECK_SECRET(mp->pin, root, "pin", error_code, error_cause,
2034
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
2035
                        if(error_code != 0) {
2036
                                janus_mutex_unlock(&mountpoints_mutex);
2037
                                goto error;
2038
                        }
2039
                        janus_mutex_unlock(&mountpoints_mutex);
2040
                        JANUS_LOG(LOG_VERB, "Request to watch mountpoint/stream %"SCNu64"\n", id_value);
2041
                        session->stopping = FALSE;
2042
                        session->mountpoint = mp;
2043
                        if(mp->streaming_type == janus_streaming_type_on_demand) {
2044
                                GError *error = NULL;
2045
                                g_thread_try_new(session->mountpoint->name, &janus_streaming_ondemand_thread, session, &error);
2046
                                if(error != NULL) {
2047
                                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the on-demand thread...\n", error->code, error->message ? error->message : "??");
2048
                                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
2049
                                        g_snprintf(error_cause, 512, "Got error %d (%s) trying to launch the on-demand thread", error->code, error->message ? error->message : "??");
2050
                                        goto error;
2051
                                }
2052
                        }
2053
                        /* TODO Check if user is already watching a stream, if the video is active, etc. */
2054
                        janus_mutex_lock(&mp->mutex);
2055
                        mp->listeners = g_list_append(mp->listeners, session);
2056
                        janus_mutex_unlock(&mp->mutex);
2057
                        sdp_type = "offer";        /* We're always going to do the offer ourselves, never answer */
2058
                        char sdptemp[2048];
2059
                        memset(sdptemp, 0, 2048);
2060
                        gchar buffer[512];
2061
                        memset(buffer, 0, 512);
2062
                        gint64 sessid = janus_get_real_time();
2063
                        gint64 version = sessid;        /* FIXME This needs to be increased when it changes, so time should be ok */
2064
                        g_snprintf(buffer, 512,
2065
                                "v=0\r\no=%s %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n",
2066
                                        "-", sessid, version);
2067
                        g_strlcat(sdptemp, buffer, 2048);
2068
                        g_strlcat(sdptemp, "s=Streaming Test\r\nt=0 0\r\n", 2048);
2069
                        if(mp->codecs.audio_pt >= 0) {
2070
                                /* Add audio line */
2071
                                g_snprintf(buffer, 512,
2072
                                        "m=audio 1 RTP/SAVPF %d\r\n"
2073
                                        "c=IN IP4 1.1.1.1\r\n",
2074
                                        mp->codecs.audio_pt);
2075
                                g_strlcat(sdptemp, buffer, 2048);
2076
                                if(mp->codecs.audio_rtpmap) {
2077
                                        g_snprintf(buffer, 512,
2078
                                                "a=rtpmap:%d %s\r\n",
2079
                                                mp->codecs.audio_pt, mp->codecs.audio_rtpmap);
2080
                                        g_strlcat(sdptemp, buffer, 2048);
2081
                                }
2082
                                if(mp->codecs.audio_fmtp) {
2083
                                        g_snprintf(buffer, 512,
2084
                                                "a=fmtp:%d %s\r\n",
2085
                                                mp->codecs.audio_pt, mp->codecs.audio_fmtp);
2086
                                        g_strlcat(sdptemp, buffer, 2048);
2087
                                }
2088
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2089
                        }
2090
                        if(mp->codecs.video_pt >= 0) {
2091
                                /* Add video line */
2092
                                g_snprintf(buffer, 512,
2093
                                        "m=video 1 RTP/SAVPF %d\r\n"
2094
                                        "c=IN IP4 1.1.1.1\r\n",
2095
                                        mp->codecs.video_pt);
2096
                                g_strlcat(sdptemp, buffer, 2048);
2097
                                if(mp->codecs.video_rtpmap) {
2098
                                        g_snprintf(buffer, 512,
2099
                                                "a=rtpmap:%d %s\r\n",
2100
                                                mp->codecs.video_pt, mp->codecs.video_rtpmap);
2101
                                        g_strlcat(sdptemp, buffer, 2048);
2102
                                }
2103
                                if(mp->codecs.video_fmtp) {
2104
                                        g_snprintf(buffer, 512,
2105
                                                "a=fmtp:%d %s\r\n",
2106
                                                mp->codecs.video_pt, mp->codecs.video_fmtp);
2107
                                        g_strlcat(sdptemp, buffer, 2048);
2108
                                }
2109
                                g_snprintf(buffer, 512,
2110
                                        "a=rtcp-fb:%d nack\r\n",
2111
                                        mp->codecs.video_pt);
2112
                                g_strlcat(sdptemp, buffer, 2048);
2113
                                g_snprintf(buffer, 512,
2114
                                        "a=rtcp-fb:%d goog-remb\r\n",
2115
                                        mp->codecs.video_pt);
2116
                                g_strlcat(sdptemp, buffer, 2048);
2117
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
2118
                        }
2119
                        sdp = g_strdup(sdptemp);
2120
                        JANUS_LOG(LOG_VERB, "Going to offer this SDP:\n%s\n", sdp);
2121
                        result = json_object();
2122
                        json_object_set_new(result, "status", json_string("preparing"));
2123
                } else if(!strcasecmp(request_text, "start")) {
2124
                        if(session->mountpoint == NULL) {
2125
                                JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
2126
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2127
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2128
                                goto error;
2129
                        }
2130
                        JANUS_LOG(LOG_VERB, "Starting the streaming\n");
2131
                        session->paused = FALSE;
2132
                        result = json_object();
2133
                        /* We wait for the setup_media event to start: on the other hand, it may have already arrived */
2134
                        json_object_set_new(result, "status", json_string(session->started ? "started" : "starting"));
2135
                } else if(!strcasecmp(request_text, "pause")) {
2136
                        if(session->mountpoint == NULL) {
2137
                                JANUS_LOG(LOG_VERB, "Can't pause: no mountpoint set\n");
2138
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2139
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
2140
                                goto error;
2141
                        }
2142
                        JANUS_LOG(LOG_VERB, "Pausing the streaming\n");
2143
                        session->paused = TRUE;
2144
                        result = json_object();
2145
                        json_object_set_new(result, "status", json_string("pausing"));
2146
                } else if(!strcasecmp(request_text, "switch")) {
2147
                        /* This listener wants to switch to a different mountpoint
2148
                         * NOTE: this only works for live RTP streams as of now: you
2149
                         * cannot, for instance, switch from a live RTP mountpoint to
2150
                         * an on demand one or viceversa (TBD.) */
2151
                        janus_streaming_mountpoint *oldmp = session->mountpoint;
2152
                        if(oldmp == NULL) {
2153
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a mountpoint\n");
2154
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2155
                                g_snprintf(error_cause, 512, "Can't switch: not on a mountpoint");
2156
                                goto error;
2157
                        }
2158
                        if(oldmp->streaming_type != janus_streaming_type_live || 
2159
                                        oldmp->streaming_source != janus_streaming_source_rtp) {
2160
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a live RTP mountpoint\n");
2161
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2162
                                g_snprintf(error_cause, 512, "Can't switch: not on a live RTP mountpoint");
2163
                                goto error;
2164
                        }
2165
                        JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
2166
                                error_code, error_cause, TRUE,
2167
                                JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
2168
                        if(error_code != 0)
2169
                                goto error;
2170
                        json_t *id = json_object_get(root, "id");
2171
                        gint64 id_value = json_integer_value(id);
2172
                        janus_mutex_lock(&mountpoints_mutex);
2173
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
2174
                        if(mp == NULL) {
2175
                                janus_mutex_unlock(&mountpoints_mutex);
2176
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
2177
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
2178
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
2179
                                goto error;
2180
                        }
2181
                        if(mp->streaming_type != janus_streaming_type_live || 
2182
                                        mp->streaming_source != janus_streaming_source_rtp) {
2183
                                janus_mutex_unlock(&mountpoints_mutex);
2184
                                JANUS_LOG(LOG_VERB, "Can't switch: target is not a live RTP mountpoint\n");
2185
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
2186
                                g_snprintf(error_cause, 512, "Can't switch: target is not a live RTP mountpoint");
2187
                                goto error;
2188
                        }
2189
                        janus_mutex_unlock(&mountpoints_mutex);
2190
                        JANUS_LOG(LOG_VERB, "Request to switch to mountpoint/stream %"SCNu64" (old: %"SCNu64")\n", id_value, oldmp->id);
2191
                        session->paused = TRUE;
2192
                        /* Unsubscribe from the previous mountpoint and subscribe to the new one */
2193
                        janus_mutex_lock(&oldmp->mutex);
2194
                        oldmp->listeners = g_list_remove_all(oldmp->listeners, session);
2195
                        janus_mutex_unlock(&oldmp->mutex);
2196
                        /* Subscribe to the new one */
2197
                        janus_mutex_lock(&mp->mutex);
2198
                        mp->listeners = g_list_append(mp->listeners, session);
2199
                        janus_mutex_unlock(&mp->mutex);
2200
                        session->mountpoint = mp;
2201
                        session->paused = FALSE;
2202
                        /* Done */
2203
                        result = json_object();
2204
                        json_object_set_new(result, "streaming", json_string("event"));
2205
                        json_object_set_new(result, "switched", json_string("ok"));
2206
                        json_object_set_new(result, "id", json_integer(id_value));
2207
                } else if(!strcasecmp(request_text, "stop")) {
2208
                        if(session->stopping || !session->started) {
2209
                                /* Been there, done that: ignore */
2210
                                janus_streaming_message_free(msg);
2211
                                continue;
2212
                        }
2213
                        JANUS_LOG(LOG_VERB, "Stopping the streaming\n");
2214
                        session->stopping = TRUE;
2215
                        session->started = FALSE;
2216
                        session->paused = FALSE;
2217
                        result = json_object();
2218
                        json_object_set_new(result, "status", json_string("stopping"));
2219
                        if(session->mountpoint) {
2220
                                janus_mutex_lock(&session->mountpoint->mutex);
2221
                                JANUS_LOG(LOG_VERB, "  -- Removing the session from the mountpoint listeners\n");
2222
                                if(g_list_find(session->mountpoint->listeners, session) != NULL) {
2223
                                        JANUS_LOG(LOG_VERB, "  -- -- Found!\n");
2224
                                }
2225
                                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
2226
                                janus_mutex_unlock(&session->mountpoint->mutex);
2227
                        }
2228
                        session->mountpoint = NULL;
2229
                        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
2230
                        gateway->close_pc(session->handle);
2231
                } else {
2232
                        JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
2233
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
2234
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
2235
                        goto error;
2236
                }
2237
                
2238
                /* Any SDP to handle? */
2239
                if(msg->sdp) {
2240
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well (but we really don't care):\n%s\n", msg->sdp_type, msg->sdp);
2241
                }
2242

    
2243
                /* Prepare JSON event */
2244
                json_t *event = json_object();
2245
                json_object_set_new(event, "streaming", json_string("event"));
2246
                if(result != NULL)
2247
                        json_object_set_new(event, "result", result);
2248
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2249
                json_decref(event);
2250
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2251
                int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, sdp_type, sdp);
2252
                JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2253
                g_free(event_text);
2254
                if(sdp)
2255
                        g_free(sdp);
2256
                janus_streaming_message_free(msg);
2257
                continue;
2258
                
2259
error:
2260
                {
2261
                        /* Prepare JSON error event */
2262
                        json_t *event = json_object();
2263
                        json_object_set_new(event, "streaming", json_string("event"));
2264
                        json_object_set_new(event, "error_code", json_integer(error_code));
2265
                        json_object_set_new(event, "error", json_string(error_cause));
2266
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2267
                        json_decref(event);
2268
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2269
                        int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, NULL, NULL);
2270
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2271
                        g_free(event_text);
2272
                        janus_streaming_message_free(msg);
2273
                }
2274
        }
2275
        JANUS_LOG(LOG_VERB, "Leaving Streaming handler thread\n");
2276
        return NULL;
2277
}
2278

    
2279
/* Helpers to create a listener filedescriptor */
2280
static int janus_streaming_create_fd(int port, in_addr_t mcast, const char* listenername, const char* medianame, const char* mountpointname) {
2281
        struct sockaddr_in address;
2282
        int fd = socket(AF_INET, SOCK_DGRAM, 0);
2283
        if(fd < 0) {
2284
                JANUS_LOG(LOG_ERR, "[%s] Cannot create socket for %s...\n", mountpointname, medianame);
2285
                return -1;
2286
        }        
2287
        if(port > 0) {
2288
                if(IN_MULTICAST(ntohl(mcast))) {
2289
#ifdef IP_MULTICAST_ALL                        
2290
                        int mc_all = 0;
2291
                        if((setsockopt(fd, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all))) < 0) {
2292
                                JANUS_LOG(LOG_ERR, "[%s] %s listener setsockopt IP_MULTICAST_ALL failed\n", mountpointname, listenername);
2293
                                close(fd);
2294
                                return -1;
2295
                        }                        
2296
#endif                        
2297
                        struct ip_mreq mreq;
2298
                        memset(&mreq, 0, sizeof(mreq));
2299
                        mreq.imr_multiaddr.s_addr = mcast;
2300
                        if(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) {
2301
                                JANUS_LOG(LOG_ERR, "[%s] %s listener IP_ADD_MEMBERSHIP failed\n", mountpointname, listenername);
2302
                                close(fd);
2303
                                return -1;
2304
                        }
2305
                        JANUS_LOG(LOG_ERR, "[%s] %s listener IP_ADD_MEMBERSHIP ok\n", mountpointname, listenername);
2306
                }
2307
        }
2308

    
2309
        address.sin_family = AF_INET;
2310
        address.sin_port = htons(port);
2311
        address.sin_addr.s_addr = INADDR_ANY;
2312
        if(bind(fd, (struct sockaddr *)(&address), sizeof(struct sockaddr)) < 0) {
2313
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (port %d)...\n", mountpointname, medianame, port);
2314
                close(fd);
2315
                return -1;
2316
        }
2317
        return fd;
2318
}
2319

    
2320
/* Helpers to destroy a streaming mountpoint. */
2321
static void janus_streaming_rtp_source_free(janus_streaming_rtp_source *source) {
2322
        if(source->audio_fd > 0) {
2323
                close(source->audio_fd);
2324
        }
2325
        if(source->video_fd > 0) {
2326
                close(source->video_fd);
2327
        }
2328
        janus_mutex_lock(&source->keyframe.mutex);
2329
        GList *temp = NULL;
2330
        while(source->keyframe.latest_keyframe) {
2331
                temp = g_list_first(source->keyframe.latest_keyframe);
2332
                source->keyframe.latest_keyframe = g_list_remove_link(source->keyframe.latest_keyframe, temp);
2333
                janus_streaming_rtp_relay_packet *pkt = (janus_streaming_rtp_relay_packet *)temp->data;
2334
                g_free(pkt->data);
2335
                g_free(pkt);
2336
                g_list_free(temp);
2337
        }
2338
        source->keyframe.latest_keyframe = NULL;
2339
        while(source->keyframe.temp_keyframe) {
2340
                temp = g_list_first(source->keyframe.temp_keyframe);
2341
                source->keyframe.temp_keyframe = g_list_remove_link(source->keyframe.temp_keyframe, temp);
2342
                janus_streaming_rtp_relay_packet *pkt = (janus_streaming_rtp_relay_packet *)temp->data;
2343
                g_free(pkt->data);
2344
                g_free(pkt);
2345
                g_list_free(temp);
2346
        }
2347
        source->keyframe.latest_keyframe = NULL;
2348
        janus_mutex_unlock(&source->keyframe.mutex);
2349
#ifdef HAVE_LIBCURL
2350
        if(source->curl) {
2351
                /* Send an RTSP TEARDOWN */
2352
                curl_easy_setopt(source->curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_TEARDOWN);                
2353
                int res = curl_easy_perform(source->curl);
2354
                if(res != CURLE_OK) {
2355
                        JANUS_LOG(LOG_ERR, "Couldn't send TEARDOWN request: %s\n", curl_easy_strerror(res));
2356
                }                        
2357
                curl_easy_cleanup(source->curl);
2358
        }
2359
#endif
2360
        g_free(source);
2361
}
2362

    
2363
static void janus_streaming_file_source_free(janus_streaming_file_source *source) {
2364
        g_free(source->filename);
2365
        g_free(source);
2366
}
2367

    
2368
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp) {
2369
        mp->destroyed = janus_get_monotonic_time();
2370
        
2371
        g_free(mp->name);
2372
        g_free(mp->description);
2373
        g_free(mp->secret);
2374
        g_free(mp->pin);
2375
        janus_mutex_lock(&mp->mutex);
2376
        g_list_free(mp->listeners);
2377
        janus_mutex_unlock(&mp->mutex);
2378

    
2379
        if(mp->source != NULL && mp->source_destroy != NULL) {
2380
                mp->source_destroy(mp->source);
2381
        }
2382

    
2383
        g_free(mp->codecs.audio_rtpmap);
2384
        g_free(mp->codecs.audio_fmtp);
2385
        g_free(mp->codecs.video_rtpmap);
2386
        g_free(mp->codecs.video_fmtp);
2387

    
2388
        g_free(mp);
2389
}
2390

    
2391

    
2392
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
2393
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
2394
                uint64_t id, char *name, char *desc,
2395
                gboolean doaudio, char *amcast, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
2396
                gboolean dovideo, char *vmcast, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf) {
2397
        janus_mutex_lock(&mountpoints_mutex);
2398
        if(id == 0) {
2399
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2400
                while(id == 0) {
2401
                        id = g_random_int();
2402
                        if(g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id)) != NULL) {
2403
                                /* ID already in use, try another one */
2404
                                id = 0;
2405
                        }
2406
                }
2407
        }
2408
        char tempname[255];
2409
        if(name == NULL) {
2410
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2411
                memset(tempname, 0, 255);
2412
                g_snprintf(tempname, 255, "%"SCNu64, id);
2413
        }
2414
        if(!doaudio && !dovideo) {
2415
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
2416
                janus_mutex_unlock(&mountpoints_mutex);
2417
                return NULL;
2418
        }
2419
        if(doaudio && (aport == 0 || artpmap == NULL)) {
2420
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for audio...\n");
2421
                janus_mutex_unlock(&mountpoints_mutex);
2422
                return NULL;
2423
        }
2424
        if(dovideo && (vport == 0 || vcodec == 0 || vrtpmap == NULL)) {
2425
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for video...\n");
2426
                janus_mutex_unlock(&mountpoints_mutex);
2427
                return NULL;
2428
        }
2429
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
2430
        /* First of all, let's check if the requested ports are free */
2431
        int audio_fd = -1;
2432
        if(doaudio) {
2433
                audio_fd = janus_streaming_create_fd(aport, amcast ? inet_addr(amcast) : INADDR_ANY,
2434
                        "Audio", "audio", name ? name : tempname);
2435
                if(audio_fd < 0) {
2436
                        JANUS_LOG(LOG_ERR, "Can't bind to port %d for audio...\n", aport);
2437
                        janus_mutex_unlock(&mountpoints_mutex);
2438
                        return NULL;
2439
                }
2440
        }
2441
        int video_fd = -1;
2442
        if(dovideo) {
2443
                video_fd = janus_streaming_create_fd(vport, vmcast ? inet_addr(vmcast) : INADDR_ANY,
2444
                        "Video", "video", name ? name : tempname);
2445
                if(video_fd < 0) {
2446
                        JANUS_LOG(LOG_ERR, "Can't bind to port %d for video...\n", vport);
2447
                        if(audio_fd > 0)
2448
                                close(audio_fd);
2449
                        janus_mutex_unlock(&mountpoints_mutex);
2450
                        return NULL;
2451
                }
2452
        }
2453
        /* Create the mountpoint */
2454
        janus_streaming_mountpoint *live_rtp = g_malloc0(sizeof(janus_streaming_mountpoint));
2455
        if(live_rtp == NULL) {
2456
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2457
                janus_mutex_unlock(&mountpoints_mutex);
2458
                return NULL;
2459
        }
2460
        live_rtp->id = id;
2461
        live_rtp->name = g_strdup(name ? name : tempname);
2462
        char *description = NULL;
2463
        if(desc != NULL)
2464
                description = g_strdup(desc);
2465
        else
2466
                description = g_strdup(name ? name : tempname);
2467
        live_rtp->description = description;
2468
        live_rtp->enabled = TRUE;
2469
        live_rtp->active = FALSE;
2470
        live_rtp->streaming_type = janus_streaming_type_live;
2471
        live_rtp->streaming_source = janus_streaming_source_rtp;
2472
        janus_streaming_rtp_source *live_rtp_source = g_malloc0(sizeof(janus_streaming_rtp_source));
2473
        if(live_rtp->name == NULL || description == NULL || live_rtp_source == NULL) {
2474
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2475
                if(live_rtp->name)
2476
                        g_free(live_rtp->name);
2477
                if(description)
2478
                        g_free(description);
2479
                if(live_rtp_source)
2480
                        g_free(live_rtp_source);
2481
                g_free(live_rtp);
2482
                janus_mutex_unlock(&mountpoints_mutex);
2483
                return NULL;
2484
        }
2485
        live_rtp_source->audio_mcast = doaudio ? (amcast ? inet_addr(amcast) : INADDR_ANY) : INADDR_ANY;
2486
        live_rtp_source->audio_port = doaudio ? aport : -1;
2487
        live_rtp_source->video_mcast = dovideo ? (vmcast ? inet_addr(vmcast) : INADDR_ANY) : INADDR_ANY;
2488
        live_rtp_source->video_port = dovideo ? vport : -1;
2489
        live_rtp_source->arc = NULL;
2490
        live_rtp_source->vrc = NULL;
2491
        janus_mutex_init(&live_rtp_source->rec_mutex);
2492
        live_rtp_source->audio_fd = audio_fd;
2493
        live_rtp_source->video_fd = video_fd;
2494
        live_rtp_source->last_received_audio = janus_get_monotonic_time();
2495
        live_rtp_source->last_received_video = janus_get_monotonic_time();
2496
        live_rtp_source->keyframe.enabled = bufferkf;
2497
        live_rtp_source->keyframe.latest_keyframe = NULL;
2498
        live_rtp_source->keyframe.temp_keyframe = NULL;
2499
        live_rtp_source->keyframe.temp_ts = 0;
2500
        janus_mutex_init(&live_rtp_source->keyframe.mutex);
2501
        live_rtp->source = live_rtp_source;
2502
        live_rtp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2503
        live_rtp->codecs.audio_pt = doaudio ? acodec : -1;
2504
        live_rtp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2505
        live_rtp->codecs.audio_fmtp = doaudio ? (afmtp ? g_strdup(afmtp) : NULL) : NULL;
2506
        live_rtp->codecs.video_codec = -1;
2507
        if(dovideo) {
2508
                if(strstr(vrtpmap, "vp8") || strstr(vrtpmap, "VP8"))
2509
                        live_rtp->codecs.video_codec = JANUS_STREAMING_VP8;
2510
                else if(strstr(vrtpmap, "vp9") || strstr(vrtpmap, "VP9"))
2511
                        live_rtp->codecs.video_codec = JANUS_STREAMING_VP9;
2512
                else if(strstr(vrtpmap, "h264") || strstr(vrtpmap, "H264"))
2513
                        live_rtp->codecs.video_codec = JANUS_STREAMING_H264;
2514
        }
2515
        live_rtp->codecs.video_pt = dovideo ? vcodec : -1;
2516
        live_rtp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2517
        live_rtp->codecs.video_fmtp = dovideo ? (vfmtp ? g_strdup(vfmtp) : NULL) : NULL;
2518
        live_rtp->listeners = NULL;
2519
        live_rtp->destroyed = 0;
2520
        janus_mutex_init(&live_rtp->mutex);
2521
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtp->id), live_rtp);
2522
        janus_mutex_unlock(&mountpoints_mutex);
2523
        GError *error = NULL;
2524
        g_thread_try_new(live_rtp->name, &janus_streaming_relay_thread, live_rtp, &error);
2525
        if(error != NULL) {
2526
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTP thread...\n", error->code, error->message ? error->message : "??");
2527
                if(live_rtp->name)
2528
                        g_free(live_rtp->name);
2529
                if(description)
2530
                        g_free(description);
2531
                if(live_rtp_source)
2532
                        g_free(live_rtp_source);
2533
                g_free(live_rtp);
2534
                return NULL;
2535
        }
2536
        return live_rtp;
2537
}
2538

    
2539
/* Helper to create a file/ondemand live source */
2540
janus_streaming_mountpoint *janus_streaming_create_file_source(
2541
                uint64_t id, char *name, char *desc, char *filename,
2542
                gboolean live, gboolean doaudio, gboolean dovideo) {
2543
        janus_mutex_lock(&mountpoints_mutex);
2544
        if(filename == NULL) {
2545
                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, missing filename...\n");
2546
                janus_mutex_unlock(&mountpoints_mutex);
2547
                return NULL;
2548
        }
2549
        if(name == NULL) {
2550
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2551
        }
2552
        if(id == 0) {
2553
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2554
                while(id == 0) {
2555
                        id = g_random_int();
2556
                        if(g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id)) != NULL) {
2557
                                /* ID already in use, try another one */
2558
                                id = 0;
2559
                        }
2560
                }
2561
        }
2562
        if(!doaudio && !dovideo) {
2563
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, no audio or video have to be streamed...\n");
2564
                janus_mutex_unlock(&mountpoints_mutex);
2565
                return NULL;
2566
        }
2567
        /* FIXME We don't support video streaming from file yet */
2568
        if(!doaudio || dovideo) {
2569
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, we only support audio file streaming right now...\n");
2570
                janus_mutex_unlock(&mountpoints_mutex);
2571
                return NULL;
2572
        }
2573
        /* TODO We should support something more than raw a-Law and mu-Law streams... */
2574
        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
2575
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
2576
                janus_mutex_unlock(&mountpoints_mutex);
2577
                return NULL;
2578
        }
2579
        janus_streaming_mountpoint *file_source = g_malloc0(sizeof(janus_streaming_mountpoint));
2580
        if(file_source == NULL) {
2581
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2582
                janus_mutex_unlock(&mountpoints_mutex);
2583
                return NULL;
2584
        }
2585
        file_source->id = id;
2586
        char tempname[255];
2587
        if(!name) {
2588
                memset(tempname, 0, 255);
2589
                g_snprintf(tempname, 255, "%"SCNu64, file_source->id);
2590
        }
2591
        file_source->name = g_strdup(name ? name : tempname);
2592
        char *description = NULL;
2593
        if(desc != NULL)
2594
                description = g_strdup(desc);
2595
        else
2596
                description = g_strdup(name ? name : tempname);
2597
        file_source->description = description;
2598
        file_source->enabled = TRUE;
2599
        file_source->active = FALSE;
2600
        file_source->streaming_type = live ? janus_streaming_type_live : janus_streaming_type_on_demand;
2601
        file_source->streaming_source = janus_streaming_source_file;
2602
        janus_streaming_file_source *file_source_source = g_malloc0(sizeof(janus_streaming_file_source));
2603
        if(file_source->name == NULL || description == NULL || file_source_source == NULL) {
2604
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2605
                if(file_source->name)
2606
                        g_free(file_source->name);
2607
                if(description)
2608
                        g_free(description);
2609
                if(file_source_source)
2610
                        g_free(file_source_source);
2611
                g_free(file_source);
2612
                janus_mutex_unlock(&mountpoints_mutex);
2613
                return NULL;
2614
        }
2615
        file_source_source->filename = g_strdup(filename);
2616
        file_source->source = file_source_source;
2617
        file_source->source_destroy = (GDestroyNotify) janus_streaming_file_source_free;
2618
        file_source->codecs.audio_pt = strstr(filename, ".alaw") ? 8 : 0;
2619
        file_source->codecs.audio_rtpmap = g_strdup(strstr(filename, ".alaw") ? "PCMA/8000" : "PCMU/8000");
2620
        file_source->codecs.video_pt = -1;        /* FIXME We don't support video for this type yet */
2621
        file_source->codecs.video_rtpmap = NULL;
2622
        file_source->listeners = NULL;
2623
        file_source->destroyed = 0;
2624
        janus_mutex_init(&file_source->mutex);
2625
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(file_source->id), file_source);
2626
        janus_mutex_unlock(&mountpoints_mutex);
2627
        if(live) {
2628
                GError *error = NULL;
2629
                g_thread_try_new(file_source->name, &janus_streaming_filesource_thread, file_source, &error);
2630
                if(error != NULL) {
2631
                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the live filesource thread...\n", error->code, error->message ? error->message : "??");
2632
                        if(file_source->name)
2633
                                g_free(file_source->name);
2634
                        if(description)
2635
                                g_free(description);
2636
                        if(file_source_source)
2637
                                g_free(file_source_source);
2638
                        g_free(file_source);
2639
                        return NULL;
2640
                }
2641
        }
2642
        return file_source;
2643
}
2644

    
2645
#ifdef HAVE_LIBCURL                
2646
typedef struct janus_streaming_buffer {
2647
        char *buffer;
2648
        size_t size;
2649
} janus_streaming_buffer;
2650

    
2651
static size_t janus_streaming_rtsp_curl_callback(void *payload, size_t size, size_t nmemb, void *data) {
2652
        size_t realsize = size * nmemb;
2653
        janus_streaming_buffer *buf = (struct janus_streaming_buffer *)data;
2654
        /* (Re)allocate if needed */
2655
        buf->buffer = realloc(buf->buffer, buf->size+realsize+1);
2656
        if(buf->buffer == NULL) {
2657
                /* Memory error! */ 
2658
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2659
                return 0;
2660
        }
2661
        /* Update the buffer */
2662
        memcpy(&(buf->buffer[buf->size]), payload, realsize);
2663
        buf->size += realsize;
2664
        buf->buffer[buf->size] = 0;
2665
        /* Done! */
2666
        return realsize;
2667
}
2668

    
2669
static int janus_streaming_rtsp_parse_sdp(const char* buffer, const char* name, const char* media, int* pt, char* transport, char* rtpmap, char* fmtp, char* control) {
2670
        int port=-1;
2671
        char pattern[256];
2672
        g_snprintf(pattern, sizeof(pattern), "m=%s", media);
2673
        char *m=strstr(buffer, pattern);
2674
        if(m == NULL) {
2675
                JANUS_LOG(LOG_VERB, "[%s] no media %s...\n", name, media);
2676
                return -1;
2677
        }
2678
        sscanf(m, "m=%*s %d %*s %d", &port, pt);
2679
        char *s=strstr(m, "a=control:");
2680
        if(s == NULL) {
2681
                JANUS_LOG(LOG_ERR, "[%s] no control for %s...\n", name, media);
2682
                return -1;
2683
        }                
2684
        sscanf(s, "a=control:%s", control);
2685
        char *r=strstr(m, "a=rtpmap:");
2686
        if(r != NULL) {
2687
                sscanf(r, "a=rtpmap:%*d %s", rtpmap);
2688
        }
2689
        char *f=strstr(m, "a=fmtp:");
2690
        if(f != NULL) {
2691
                sscanf(f, "a=fmtp:%*d %s", fmtp);
2692
        }        
2693
        char *c=strstr(m, "c=IN IP4");
2694
        char ip[256];
2695
        in_addr_t mcast = INADDR_ANY;
2696
        if(c != NULL) {
2697
                if(sscanf(c, "c=IN IP4 %[^/]", ip) != 0) {
2698
                        mcast = inet_addr(ip);
2699
                }
2700
        }        
2701
        int fd = janus_streaming_create_fd(port, mcast, media, media, name);
2702
        if(fd < 0) {
2703
                return -1;
2704
        }
2705
        struct sockaddr_in address;        
2706
        socklen_t len = sizeof(address);
2707
        if(getsockname(fd, (struct sockaddr *)&address, &len) < 0) {
2708
                JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (%d)...\n", name, media, port);
2709
                close(fd);
2710
                return -1;
2711
        }
2712
        port=ntohs(address.sin_port);
2713
        if(IN_MULTICAST(ntohl(mcast))) {
2714
                sprintf(transport, "RTP/AVP;multicast;client_port=%d-%d", port, port+1);        
2715
        } else {
2716
                sprintf(transport, "RTP/AVP;unicast;client_port=%d-%d", port, port+1);        
2717
        }
2718

    
2719
        return fd;
2720
}        
2721

    
2722
/* Helper to create an RTSP source */
2723
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
2724
                uint64_t id, char *name, char *desc, char *url,
2725
                gboolean doaudio, gboolean dovideo) {
2726
        if(url == NULL) {
2727
                JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream, missing url...\n");
2728
                return NULL;
2729
        }        
2730
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");        
2731
        CURL* curl = curl_easy_init();
2732
        if(curl == NULL) {
2733
                JANUS_LOG(LOG_ERR, "Can't init CURL\n");
2734
                return NULL;
2735
        }
2736
        if(janus_log_level > LOG_INFO)
2737
                curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
2738
        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
2739
        curl_easy_setopt(curl, CURLOPT_URL, url);        
2740
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); 
2741
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 0L);          
2742
        /* Send an RTSP DESCRIBE */
2743
        janus_streaming_buffer data;
2744
        data.buffer = g_malloc0(1);
2745
        data.size = 0;
2746
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, url);
2747
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_DESCRIBE);
2748
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, janus_streaming_rtsp_curl_callback);                
2749
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &data);
2750
        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, janus_streaming_rtsp_curl_callback);                
2751
        curl_easy_setopt(curl, CURLOPT_HEADERDATA, &data);
2752
        int res = curl_easy_perform(curl);
2753
        if(res != CURLE_OK) {
2754
                JANUS_LOG(LOG_ERR, "Couldn't send DESCRIBE request: %s\n", curl_easy_strerror(res));
2755
                curl_easy_cleanup(curl);
2756
                return NULL;
2757
        }                
2758
        long code = 0;
2759
        res = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &code);
2760
        if(res != CURLE_OK) {
2761
                JANUS_LOG(LOG_ERR, "Couldn't get DESCRIBE answer: %s\n", curl_easy_strerror(res));
2762
                curl_easy_cleanup(curl);
2763
                return NULL;
2764
        } else if(code != 200) {
2765
                JANUS_LOG(LOG_ERR, "Couldn't get DESCRIBE code: %ld\n", code);
2766
                curl_easy_cleanup(curl);
2767
                return NULL;
2768
        }                         
2769
        JANUS_LOG(LOG_VERB, "DESCRIBE answer:%s\n",data.buffer);        
2770
        /* Parse the SDP we just got to figure out the negotiated media */
2771
        int vpt = -1;
2772
        char vrtpmap[2048];
2773
        char vfmtp[2048];
2774
        char vcontrol[2048];
2775
        char uri[1024];
2776
        char transport[1024];
2777
        int video_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "video", &vpt, transport, vrtpmap, vfmtp, vcontrol);
2778
        if(video_fd != -1) {
2779
                /* Send an RTSP SETUP for video */
2780
                g_free(data.buffer);
2781
                data.buffer = g_malloc0(1);
2782
                data.size = 0;                
2783
                sprintf(uri, "%s/%s", url, vcontrol);
2784
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2785
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2786
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2787
                res = curl_easy_perform(curl);
2788
                if(res != CURLE_OK) {
2789
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2790
                        curl_easy_cleanup(curl);
2791
                        return NULL;
2792
                }                
2793
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2794
        }
2795
        int apt = -1;
2796
        char artpmap[2048];
2797
        char afmtp[2048];
2798
        char acontrol[2048];
2799
        int audio_fd = janus_streaming_rtsp_parse_sdp(data.buffer, name, "audio", &apt, transport, artpmap, afmtp, acontrol);
2800
        if(audio_fd != -1) {
2801
                /* Send an RTSP SETUP for audio */
2802
                g_free(data.buffer);
2803
                data.buffer = g_malloc0(1);
2804
                data.size = 0;                
2805
                sprintf(uri, "%s/%s", url, acontrol);
2806
                curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2807
                curl_easy_setopt(curl, CURLOPT_RTSP_TRANSPORT, transport);
2808
                curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP);
2809
                res = curl_easy_perform(curl);
2810
                if(res != CURLE_OK) {
2811
                        JANUS_LOG(LOG_ERR, "Couldn't send SETUP request: %s\n", curl_easy_strerror(res));
2812
                        curl_easy_cleanup(curl);
2813
                        return NULL;
2814
                }                
2815
                JANUS_LOG(LOG_VERB, "SETUP answer:%s\n",data.buffer);
2816
        }        
2817
        janus_mutex_lock(&mountpoints_mutex);
2818
        /* Create an RTP source for the media we'll get */
2819
        if(id == 0) {
2820
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2821
                while(id == 0) {
2822
                        id = g_random_int();
2823
                        if(g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id)) != NULL) {
2824
                                /* ID already in use, try another one */
2825
                                id = 0;
2826
                        }
2827
                }
2828
        }
2829
        char tempname[255];
2830
        if(name == NULL) {
2831
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2832
                memset(tempname, 0, 255);
2833
                g_snprintf(tempname, 255, "%"SCNu64, id);
2834
        }
2835
        char *sourcename =  g_strdup(name ? name : tempname);
2836
        if(sourcename == NULL) {
2837
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2838
                janus_mutex_unlock(&mountpoints_mutex);
2839
                curl_easy_cleanup(curl);
2840
                return NULL;
2841
        }        
2842
        char *description = NULL;
2843
        if(desc != NULL) {
2844
                description = g_strdup(desc);
2845
        } else {
2846
                description = g_strdup(name ? name : tempname);
2847
        }
2848
        if(description == NULL) {
2849
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2850
                g_free(sourcename);
2851
                janus_mutex_unlock(&mountpoints_mutex);
2852
                curl_easy_cleanup(curl);
2853
                return NULL;
2854
        }                
2855
        janus_streaming_mountpoint *live_rtsp = g_malloc0(sizeof(janus_streaming_mountpoint));
2856
        if(live_rtsp == NULL) {
2857
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2858
                g_free(description);
2859
                g_free(sourcename);
2860
                janus_mutex_unlock(&mountpoints_mutex);
2861
                curl_easy_cleanup(curl);
2862
                return NULL;
2863
        }        
2864
        live_rtsp->id = id ? id : g_random_int();
2865
        live_rtsp->name = sourcename;
2866
        live_rtsp->description = description;
2867
        live_rtsp->enabled = TRUE;
2868
        live_rtsp->active = FALSE;
2869
        live_rtsp->streaming_type = janus_streaming_type_live;
2870
        live_rtsp->streaming_source = janus_streaming_source_rtp;
2871
        janus_streaming_rtp_source *live_rtsp_source = g_malloc0(sizeof(janus_streaming_rtp_source));
2872
        live_rtsp_source->arc = NULL;
2873
        live_rtsp_source->vrc = NULL;
2874
        live_rtsp_source->audio_fd = audio_fd;
2875
        live_rtsp_source->video_fd = video_fd;
2876
        live_rtsp_source->curl = curl;
2877
        live_rtsp->source = live_rtsp_source;
2878
        live_rtsp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2879
        live_rtsp->codecs.audio_pt = doaudio ? apt : -1;
2880
        live_rtsp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2881
        live_rtsp->codecs.audio_fmtp = doaudio ? g_strdup(afmtp) : NULL;
2882
        live_rtsp->codecs.video_pt = dovideo ? vpt : -1;
2883
        live_rtsp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2884
        live_rtsp->codecs.video_fmtp = dovideo ? g_strdup(vfmtp) : NULL;
2885
        live_rtsp->listeners = NULL;
2886
        live_rtsp->destroyed = 0;
2887
        janus_mutex_init(&live_rtsp->mutex);
2888
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtsp->id), live_rtsp);
2889
        janus_mutex_unlock(&mountpoints_mutex);        
2890
        GError *error = NULL;
2891
        g_thread_try_new(live_rtsp->name, &janus_streaming_relay_thread, live_rtsp, &error);        
2892
        if(error != NULL) {
2893
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTSP thread...\n", error->code, error->message ? error->message : "??");
2894
                janus_streaming_mountpoint_free(live_rtsp);
2895
                return NULL;        
2896
        }                                
2897
        /* Send an RTSP PLAY */
2898
        g_free(data.buffer);
2899
        data.buffer = g_malloc0(1);
2900
        data.size = 0;                
2901
        sprintf(uri, "%s/", url);
2902
        curl_easy_setopt(curl, CURLOPT_RTSP_STREAM_URI, uri);
2903
        curl_easy_setopt(curl, CURLOPT_RANGE, "0.000-");
2904
        curl_easy_setopt(curl, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_PLAY);                
2905
        res = curl_easy_perform(curl);
2906
        if(res != CURLE_OK) {
2907
                JANUS_LOG(LOG_ERR, "Couldn't send PLAY request: %s\n", curl_easy_strerror(res));
2908
                janus_streaming_mountpoint_free(live_rtsp);
2909
                return NULL;
2910
        }                
2911
        JANUS_LOG(LOG_VERB, "PLAY answer:%s\n",data.buffer);        
2912
        g_free(data.buffer);
2913
        
2914
        return live_rtsp;
2915
}
2916
#else
2917
/* Helper to create an RTSP source */
2918
janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
2919
                uint64_t id, char *name, char *desc, char *url,
2920
                gboolean doaudio, gboolean dovideo) {
2921
        JANUS_LOG(LOG_ERR, "RTSP need libcurl\n");
2922
        return NULL;
2923
}
2924
#endif
2925

    
2926
/* FIXME Thread to send RTP packets from a file (on demand) */
2927
static void *janus_streaming_ondemand_thread(void *data) {
2928
        JANUS_LOG(LOG_VERB, "Filesource (on demand) RTP thread starting...\n");
2929
        janus_streaming_session *session = (janus_streaming_session *)data;
2930
        if(!session) {
2931
                JANUS_LOG(LOG_ERR, "Invalid session!\n");
2932
                g_thread_unref(g_thread_self());
2933
                return NULL;
2934
        }
2935
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
2936
        if(!mountpoint) {
2937
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2938
                g_thread_unref(g_thread_self());
2939
                return NULL;
2940
        }
2941
        if(mountpoint->streaming_source != janus_streaming_source_file) {
2942
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
2943
                g_thread_unref(g_thread_self());
2944
                return NULL;
2945
        }
2946
        if(mountpoint->streaming_type != janus_streaming_type_on_demand) {
2947
                JANUS_LOG(LOG_ERR, "[%s] Not an on-demand file source mountpoint!\n", mountpoint->name);
2948
                g_thread_unref(g_thread_self());
2949
                return NULL;
2950
        }
2951
        janus_streaming_file_source *source = mountpoint->source;
2952
        if(source == NULL || source->filename == NULL) {
2953
                g_thread_unref(g_thread_self());
2954
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
2955
                return NULL;
2956
        }
2957
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
2958
        FILE *audio = fopen(source->filename, "rb");
2959
        if(!audio) {
2960
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
2961
                g_thread_unref(g_thread_self());
2962
                return NULL;
2963
        }
2964
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
2965
        /* Buffer */
2966
        char *buf = g_malloc0(1024);
2967
        if(buf == NULL) {
2968
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
2969
                g_thread_unref(g_thread_self());
2970
                return NULL;
2971
        }
2972
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2973
        /* Set up RTP */
2974
        gint16 seq = 1;
2975
        gint32 ts = 0;
2976
        rtp_header *header = (rtp_header *)buf;
2977
        header->version = 2;
2978
        header->markerbit = 1;
2979
        header->type = mountpoint->codecs.audio_pt;
2980
        header->seq_number = htons(seq);
2981
        header->timestamp = htonl(ts);
2982
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
2983
        /* Timer */
2984
        struct timeval now, before;
2985
        gettimeofday(&before, NULL);
2986
        now.tv_sec = before.tv_sec;
2987
        now.tv_usec = before.tv_usec;
2988
        time_t passed, d_s, d_us;
2989
        /* Loop */
2990
        gint read = 0;
2991
        janus_streaming_rtp_relay_packet packet;
2992
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed && !session->stopping && !session->destroyed) {
2993
                /* See if it's time to prepare a frame */
2994
                gettimeofday(&now, NULL);
2995
                d_s = now.tv_sec - before.tv_sec;
2996
                d_us = now.tv_usec - before.tv_usec;
2997
                if(d_us < 0) {
2998
                        d_us += 1000000;
2999
                        --d_s;
3000
                }
3001
                passed = d_s*1000000 + d_us;
3002
                if(passed < 18000) {        /* Let's wait about 18ms */
3003
                        usleep(1000);
3004
                        continue;
3005
                }
3006
                /* Update the reference time */
3007
                before.tv_usec += 20000;
3008
                if(before.tv_usec > 1000000) {
3009
                        before.tv_sec++;
3010
                        before.tv_usec -= 1000000;
3011
                }
3012
                /* If not started or paused, wait some more */
3013
                if(!session->started || session->paused || !mountpoint->enabled)
3014
                        continue;
3015
                /* Read frame from file... */
3016
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
3017
                if(feof(audio)) {
3018
                        /* FIXME We're doing this forever... should this be configurable? */
3019
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
3020
                        fseek(audio, 0, SEEK_SET);
3021
                        continue;
3022
                }
3023
                if(read < 0)
3024
                        break;
3025
                if(mountpoint->active == FALSE)
3026
                        mountpoint->active = TRUE;
3027
                //~ JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
3028
                        //~ header->type, ntohs(header->seq_number), ntohl(header->timestamp));
3029
                //~ JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
3030
                /* Relay on all sessions */
3031
                packet.data = header;
3032
                packet.length = RTP_HEADER_SIZE + read;
3033
                packet.is_video = 0;
3034
                packet.is_keyframe = 0;
3035
                /* Backup the actual timestamp and sequence number */
3036
                packet.timestamp = ntohl(packet.data->timestamp);
3037
                packet.seq_number = ntohs(packet.data->seq_number);
3038
                /* Go! */
3039
                janus_streaming_relay_rtp_packet(session, &packet);
3040
                /* Update header */
3041
                seq++;
3042
                header->seq_number = htons(seq);
3043
                ts += 160;
3044
                header->timestamp = htonl(ts);
3045
                header->markerbit = 0;
3046
        }
3047
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (ondemand) thread\n", name);
3048
        g_free(name);
3049
        g_free(buf);
3050
        fclose(audio);
3051
        g_thread_unref(g_thread_self());
3052
        return NULL;
3053
}
3054

    
3055
/* FIXME Thread to send RTP packets from a file (live) */
3056
static void *janus_streaming_filesource_thread(void *data) {
3057
        JANUS_LOG(LOG_VERB, "Filesource (live) thread starting...\n");
3058
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
3059
        if(!mountpoint) {
3060
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
3061
                g_thread_unref(g_thread_self());
3062
                return NULL;
3063
        }
3064
        if(mountpoint->streaming_source != janus_streaming_source_file) {
3065
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
3066
                g_thread_unref(g_thread_self());
3067
                return NULL;
3068
        }
3069
        if(mountpoint->streaming_type != janus_streaming_type_live) {
3070
                JANUS_LOG(LOG_ERR, "[%s] Not a live file source mountpoint!\n", mountpoint->name);
3071
                g_thread_unref(g_thread_self());
3072
                return NULL;
3073
        }
3074
        janus_streaming_file_source *source = mountpoint->source;
3075
        if(source == NULL || source->filename == NULL) {
3076
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
3077
                g_thread_unref(g_thread_self());
3078
                return NULL;
3079
        }
3080
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
3081
        FILE *audio = fopen(source->filename, "rb");
3082
        if(!audio) {
3083
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
3084
                g_thread_unref(g_thread_self());
3085
                return NULL;
3086
        }
3087
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
3088
        /* Buffer */
3089
        char *buf = g_malloc0(1024);
3090
        if(buf == NULL) {
3091
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
3092
                g_thread_unref(g_thread_self());
3093
                return NULL;
3094
        }
3095
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
3096
        /* Set up RTP */
3097
        gint16 seq = 1;
3098
        gint32 ts = 0;
3099
        rtp_header *header = (rtp_header *)buf;
3100
        header->version = 2;
3101
        header->markerbit = 1;
3102
        header->type = mountpoint->codecs.audio_pt;
3103
        header->seq_number = htons(seq);
3104
        header->timestamp = htonl(ts);
3105
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
3106
        /* Timer */
3107
        struct timeval now, before;
3108
        gettimeofday(&before, NULL);
3109
        now.tv_sec = before.tv_sec;
3110
        now.tv_usec = before.tv_usec;
3111
        time_t passed, d_s, d_us;
3112
        /* Loop */
3113
        gint read = 0;
3114
        janus_streaming_rtp_relay_packet packet;
3115
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
3116
                /* See if it's time to prepare a frame */
3117
                gettimeofday(&now, NULL);
3118
                d_s = now.tv_sec - before.tv_sec;
3119
                d_us = now.tv_usec - before.tv_usec;
3120
                if(d_us < 0) {
3121
                        d_us += 1000000;
3122
                        --d_s;
3123
                }
3124
                passed = d_s*1000000 + d_us;
3125
                if(passed < 18000) {        /* Let's wait about 18ms */
3126
                        usleep(1000);
3127
                        continue;
3128
                }
3129
                /* Update the reference time */
3130
                before.tv_usec += 20000;
3131
                if(before.tv_usec > 1000000) {
3132
                        before.tv_sec++;
3133
                        before.tv_usec -= 1000000;
3134
                }
3135
                /* If paused, wait some more */
3136
                if(!mountpoint->enabled)
3137
                        continue;
3138
                /* Read frame from file... */
3139
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
3140
                if(feof(audio)) {
3141
                        /* FIXME We're doing this forever... should this be configurable? */
3142
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
3143
                        fseek(audio, 0, SEEK_SET);
3144
                        continue;
3145
                }
3146
                if(read < 0)
3147
                        break;
3148
                if(mountpoint->active == FALSE)
3149
                        mountpoint->active = TRUE;
3150
                // JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
3151
                        // header->type, ntohs(header->seq_number), ntohl(header->timestamp));
3152
                // JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
3153
                /* Relay on all sessions */
3154
                packet.data = header;
3155
                packet.length = RTP_HEADER_SIZE + read;
3156
                packet.is_video = 0;
3157
                packet.is_keyframe = 0;
3158
                /* Backup the actual timestamp and sequence number */
3159
                packet.timestamp = ntohl(packet.data->timestamp);
3160
                packet.seq_number = ntohs(packet.data->seq_number);
3161
                /* Go! */
3162
                janus_mutex_lock_nodebug(&mountpoint->mutex);
3163
                g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3164
                janus_mutex_unlock_nodebug(&mountpoint->mutex);
3165
                /* Update header */
3166
                seq++;
3167
                header->seq_number = htons(seq);
3168
                ts += 160;
3169
                header->timestamp = htonl(ts);
3170
                header->markerbit = 0;
3171
        }
3172
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (live) thread\n", name);
3173
        g_free(name);
3174
        g_free(buf);
3175
        fclose(audio);
3176
        g_thread_unref(g_thread_self());
3177
        return NULL;
3178
}
3179
                
3180
/* FIXME Test thread to relay RTP frames coming from gstreamer/ffmpeg/others */
3181
static void *janus_streaming_relay_thread(void *data) {
3182
        JANUS_LOG(LOG_VERB, "Starting streaming relay thread\n");
3183
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
3184
        if(!mountpoint) {
3185
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
3186
                g_thread_unref(g_thread_self());
3187
                return NULL;
3188
        }
3189
        if(mountpoint->streaming_source != janus_streaming_source_rtp) {
3190
                JANUS_LOG(LOG_ERR, "[%s] Not an RTP source mountpoint!\n", mountpoint->name);
3191
                g_thread_unref(g_thread_self());
3192
                return NULL;
3193
        }
3194
        janus_streaming_rtp_source *source = mountpoint->source;
3195
        if(source == NULL) {
3196
                JANUS_LOG(LOG_ERR, "[%s] Invalid RTP source mountpoint!\n", mountpoint->name);
3197
                g_thread_unref(g_thread_self());
3198
                return NULL;
3199
        }
3200
        int audio_fd = source->audio_fd;
3201
        int video_fd = source->video_fd;
3202
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
3203
        /* Needed to fix seq and ts */
3204
        uint32_t a_last_ssrc = 0, a_last_ts = 0, a_base_ts = 0, a_base_ts_prev = 0,
3205
                        v_last_ssrc = 0, v_last_ts = 0, v_base_ts = 0, v_base_ts_prev = 0;
3206
        uint16_t a_last_seq = 0, a_base_seq = 0, a_base_seq_prev = 0,
3207
                        v_last_seq = 0, v_base_seq = 0, v_base_seq_prev = 0;
3208
        /* File descriptors */
3209
        socklen_t addrlen;
3210
        struct sockaddr_in remote;
3211
        int resfd = 0, bytes = 0;
3212
        struct pollfd fds[2];
3213
        char buffer[1500];
3214
        memset(buffer, 0, 1500);
3215
        /* Loop */
3216
        int num = 0;
3217
        janus_streaming_rtp_relay_packet packet;
3218
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
3219
                /* Prepare poll */
3220
                num = 0;
3221
                if(audio_fd != -1) {
3222
                        fds[num].fd = audio_fd;
3223
                        fds[num].events = POLLIN;
3224
                        fds[num].revents = 0;
3225
                        num++;
3226
                }
3227
                if(video_fd != -1) {
3228
                        fds[num].fd = video_fd;
3229
                        fds[num].events = POLLIN;
3230
                        fds[num].revents = 0;
3231
                        num++;
3232
                }
3233
                /* Wait for some data */
3234
                resfd = poll(fds, num, 1000);
3235
                if(resfd < 0) {
3236
                        JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", mountpoint->name, errno, strerror(errno));
3237
                        mountpoint->enabled = FALSE;
3238
                        break;
3239
                } else if(resfd == 0) {
3240
                        /* No data, keep going */
3241
                        continue;
3242
                }
3243
                int i = 0;
3244
                for(i=0; i<num; i++) {
3245
                        if(fds[i].revents & (POLLERR | POLLHUP)) {
3246
                                /* Socket error? */
3247
                                JANUS_LOG(LOG_ERR, "[%s] Error polling: %s... %d (%s)\n", mountpoint->name,
3248
                                        fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP", errno, strerror(errno));
3249
                                mountpoint->enabled = FALSE;
3250
                                break;
3251
                        } else if(fds[i].revents & POLLIN) {
3252
                                /* Got an RTP packet */
3253
                                if(audio_fd != -1 && fds[i].fd == audio_fd) {
3254
                                        /* Got something audio (RTP) */
3255
                                        if(mountpoint->active == FALSE)
3256
                                                mountpoint->active = TRUE;
3257
                                        source->last_received_audio = janus_get_monotonic_time();
3258
                                        addrlen = sizeof(remote);
3259
                                        bytes = recvfrom(audio_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3260
                                        //~ JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the audio channel...\n", bytes);
3261
                                        /* If paused, ignore this packet */
3262
                                        if(!mountpoint->enabled)
3263
                                                continue;
3264
                                        rtp_header *rtp = (rtp_header *)buffer;
3265
                                        //~ JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3266
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3267
                                        /* Relay on all sessions */
3268
                                        packet.data = rtp;
3269
                                        packet.length = bytes;
3270
                                        packet.is_video = 0;
3271
                                        packet.is_keyframe = 0;
3272
                                        /* Do we have a new stream? */
3273
                                        if(ntohl(packet.data->ssrc) != a_last_ssrc) {
3274
                                                a_last_ssrc = ntohl(packet.data->ssrc);
3275
                                                JANUS_LOG(LOG_INFO, "[%s] New audio stream! (ssrc=%u)\n", name, a_last_ssrc);
3276
                                                a_base_ts_prev = a_last_ts;
3277
                                                a_base_ts = ntohl(packet.data->timestamp);
3278
                                                a_base_seq_prev = a_last_seq;
3279
                                                a_base_seq = ntohs(packet.data->seq_number);
3280
                                        }
3281
                                        a_last_ts = (ntohl(packet.data->timestamp)-a_base_ts)+a_base_ts_prev+960;        /* FIXME We're assuming Opus here... */
3282
                                        packet.data->timestamp = htonl(a_last_ts);
3283
                                        a_last_seq = (ntohs(packet.data->seq_number)-a_base_seq)+a_base_seq_prev+1;
3284
                                        packet.data->seq_number = htons(a_last_seq);
3285
                                        //~ JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3286
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3287
                                        packet.data->type = mountpoint->codecs.audio_pt;
3288
                                        /* Is there a recorder? */
3289
                                        janus_recorder_save_frame(source->arc, buffer, bytes);
3290
                                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3291
                                        packet.timestamp = ntohl(packet.data->timestamp);
3292
                                        packet.seq_number = ntohs(packet.data->seq_number);
3293
                                        /* Go! */
3294
                                        janus_mutex_lock(&mountpoint->mutex);
3295
                                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3296
                                        janus_mutex_unlock(&mountpoint->mutex);
3297
                                        continue;
3298
                                } else if(video_fd != -1 && fds[i].fd == video_fd) {
3299
                                        /* Got something video (RTP) */
3300
                                        if(mountpoint->active == FALSE)
3301
                                                mountpoint->active = TRUE;
3302
                                        source->last_received_video = janus_get_monotonic_time();
3303
                                        addrlen = sizeof(remote);
3304
                                        bytes = recvfrom(video_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
3305
                                        //~ JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the video channel...\n", bytes);
3306
                                        rtp_header *rtp = (rtp_header *)buffer;
3307
                                        /* First of all, let's check if this is (part of) a keyframe that we may need to save it for future reference */
3308
                                        if(source->keyframe.enabled) {
3309
                                                if(source->keyframe.temp_ts > 0 && ntohl(rtp->timestamp) != source->keyframe.temp_ts) {
3310
                                                        /* We received the last part of the keyframe, get rid of the old one and use this from now on */
3311
                                                        JANUS_LOG(LOG_HUGE, "[%s] ... ... last part of keyframe received! ts=%"SCNu32", %d packets\n",
3312
                                                                mountpoint->name, source->keyframe.temp_ts, g_list_length(source->keyframe.temp_keyframe));
3313
                                                        source->keyframe.temp_ts = 0;
3314
                                                        janus_mutex_lock(&source->keyframe.mutex);
3315
                                                        GList *temp = NULL;
3316
                                                        while(source->keyframe.latest_keyframe) {
3317
                                                                temp = g_list_first(source->keyframe.latest_keyframe);
3318
                                                                source->keyframe.latest_keyframe = g_list_remove_link(source->keyframe.latest_keyframe, temp);
3319
                                                                janus_streaming_rtp_relay_packet *pkt = (janus_streaming_rtp_relay_packet *)temp->data;
3320
                                                                g_free(pkt->data);
3321
                                                                g_free(pkt);
3322
                                                                g_list_free(temp);
3323
                                                        }
3324
                                                        source->keyframe.latest_keyframe = source->keyframe.temp_keyframe;
3325
                                                        source->keyframe.temp_keyframe = NULL;
3326
                                                        janus_mutex_unlock(&source->keyframe.mutex);
3327
                                                } else if(ntohl(rtp->timestamp) == source->keyframe.temp_ts) {
3328
                                                        /* Part of the keyframe we're currently saving, store */
3329
                                                        janus_mutex_lock(&source->keyframe.mutex);
3330
                                                        JANUS_LOG(LOG_HUGE, "[%s] ... other part of keyframe received! ts=%"SCNu32"\n", mountpoint->name, source->keyframe.temp_ts);
3331
                                                        janus_streaming_rtp_relay_packet *pkt = g_malloc0(sizeof(janus_streaming_rtp_relay_packet));
3332
                                                        pkt->data = g_malloc0(bytes);
3333
                                                        memcpy(pkt->data, buffer, bytes);
3334
                                                        pkt->data->ssrc = htons(1);
3335
                                                        pkt->data->type = mountpoint->codecs.video_pt;
3336
                                                        pkt->is_video = 1;
3337
                                                        pkt->is_keyframe = 1;
3338
                                                        pkt->length = bytes;
3339
                                                        pkt->timestamp = source->keyframe.temp_ts;
3340
                                                        pkt->seq_number = ntohs(rtp->seq_number);
3341
                                                        source->keyframe.temp_keyframe = g_list_append(source->keyframe.temp_keyframe, pkt);
3342
                                                        janus_mutex_unlock(&source->keyframe.mutex);
3343
                                                } else if(janus_streaming_is_keyframe(mountpoint->codecs.video_codec, buffer, bytes)) {
3344
                                                        /* New keyframe, start saving it */
3345
                                                        source->keyframe.temp_ts = ntohl(rtp->timestamp);
3346
                                                        JANUS_LOG(LOG_HUGE, "[%s] New keyframe received! ts=%"SCNu32"\n", mountpoint->name, source->keyframe.temp_ts);
3347
                                                        janus_mutex_lock(&source->keyframe.mutex);
3348
                                                        janus_streaming_rtp_relay_packet *pkt = g_malloc0(sizeof(janus_streaming_rtp_relay_packet));
3349
                                                        pkt->data = g_malloc0(bytes);
3350
                                                        memcpy(pkt->data, buffer, bytes);
3351
                                                        pkt->data->ssrc = htons(1);
3352
                                                        pkt->data->type = mountpoint->codecs.video_pt;
3353
                                                        pkt->is_video = 1;
3354
                                                        pkt->is_keyframe = 1;
3355
                                                        pkt->length = bytes;
3356
                                                        pkt->timestamp = source->keyframe.temp_ts;
3357
                                                        pkt->seq_number = ntohs(rtp->seq_number);
3358
                                                        source->keyframe.temp_keyframe = g_list_append(source->keyframe.temp_keyframe, pkt);
3359
                                                        janus_mutex_unlock(&source->keyframe.mutex);
3360
                                                }
3361
                                        }
3362
                                        /* If paused, ignore this packet */
3363
                                        if(!mountpoint->enabled)
3364
                                                continue;
3365
                                        //~ JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3366
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3367
                                        /* Relay on all sessions */
3368
                                        packet.data = rtp;
3369
                                        packet.length = bytes;
3370
                                        packet.is_video = 1;
3371
                                        packet.is_keyframe = 0;
3372
                                        /* Do we have a new stream? */
3373
                                        if(ntohl(packet.data->ssrc) != v_last_ssrc) {
3374
                                                v_last_ssrc = ntohl(packet.data->ssrc);
3375
                                                JANUS_LOG(LOG_INFO, "[%s] New video stream! (ssrc=%u)\n", name, v_last_ssrc);
3376
                                                v_base_ts_prev = v_last_ts;
3377
                                                v_base_ts = ntohl(packet.data->timestamp);
3378
                                                v_base_seq_prev = v_last_seq;
3379
                                                v_base_seq = ntohs(packet.data->seq_number);
3380
                                        }
3381
                                        v_last_ts = (ntohl(packet.data->timestamp)-v_base_ts)+v_base_ts_prev+4500;        /* FIXME We're assuming 15fps here... */
3382
                                        packet.data->timestamp = htonl(v_last_ts);
3383
                                        v_last_seq = (ntohs(packet.data->seq_number)-v_base_seq)+v_base_seq_prev+1;
3384
                                        packet.data->seq_number = htons(v_last_seq);
3385
                                        //~ JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
3386
                                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
3387
                                        packet.data->type = mountpoint->codecs.video_pt;
3388
                                        /* Is there a recorder? */
3389
                                        janus_recorder_save_frame(source->vrc, buffer, bytes);
3390
                                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
3391
                                        packet.timestamp = ntohl(packet.data->timestamp);
3392
                                        packet.seq_number = ntohs(packet.data->seq_number);
3393
                                        /* Go! */
3394
                                        janus_mutex_lock(&mountpoint->mutex);
3395
                                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
3396
                                        janus_mutex_unlock(&mountpoint->mutex);
3397
                                        continue;
3398
                                }
3399
                        }
3400
                }
3401
        }
3402

    
3403
        /* Notify users this mountpoint is done */
3404
        janus_mutex_lock(&mountpoint->mutex);
3405
        GList *viewer = g_list_first(mountpoint->listeners);
3406
        /* Prepare JSON event */
3407
        json_t *event = json_object();
3408
        json_object_set_new(event, "streaming", json_string("event"));
3409
        json_t *result = json_object();
3410
        json_object_set_new(result, "status", json_string("stopped"));
3411
        json_object_set_new(event, "result", result);
3412
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
3413
        json_decref(event);
3414
        while(viewer) {
3415
                janus_streaming_session *session = (janus_streaming_session *)viewer->data;
3416
                if(session != NULL) {
3417
                        session->stopping = TRUE;
3418
                        session->started = FALSE;
3419
                        session->paused = FALSE;
3420
                        session->mountpoint = NULL;
3421
                        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
3422
                        gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
3423
                        gateway->close_pc(session->handle);
3424
                }
3425
                mountpoint->listeners = g_list_remove_all(mountpoint->listeners, session);
3426
                viewer = g_list_first(mountpoint->listeners);
3427
        }
3428
        g_free(event_text);
3429
        janus_mutex_unlock(&mountpoint->mutex);
3430

    
3431
        JANUS_LOG(LOG_VERB, "[%s] Leaving streaming relay thread\n", name);
3432
        g_free(name);
3433
        g_thread_unref(g_thread_self());
3434
        return NULL;
3435
}
3436

    
3437
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) {
3438
        janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data;
3439
        if(!packet || !packet->data || packet->length < 1) {
3440
                JANUS_LOG(LOG_ERR, "Invalid packet...\n");
3441
                return;
3442
        }
3443
        janus_streaming_session *session = (janus_streaming_session *)data;
3444
        if(!session || !session->handle) {
3445
                //~ JANUS_LOG(LOG_ERR, "Invalid session...\n");
3446
                return;
3447
        }
3448
        if(!packet->is_keyframe && (!session->started || session->paused)) {
3449
                //~ JANUS_LOG(LOG_ERR, "Streaming not started yet for this session...\n");
3450
                return;
3451
        }
3452

    
3453
        /* Make sure there hasn't been a publisher switch by checking the SSRC */
3454
        if(packet->is_video) {
3455
                if(ntohl(packet->data->ssrc) != session->context.v_last_ssrc) {
3456
                        session->context.v_last_ssrc = ntohl(packet->data->ssrc);
3457
                        session->context.v_base_ts_prev = session->context.v_last_ts;
3458
                        session->context.v_base_ts = packet->timestamp;
3459
                        session->context.v_base_seq_prev = session->context.v_last_seq;
3460
                        session->context.v_base_seq = packet->seq_number;
3461
                }
3462
                /* Compute a coherent timestamp and sequence number */
3463
                session->context.v_last_ts = (packet->timestamp-session->context.v_base_ts)
3464
                        + session->context.v_base_ts_prev+4500;        /* FIXME When switching, we assume 15fps */
3465
                session->context.v_last_seq = (packet->seq_number-session->context.v_base_seq)+session->context.v_base_seq_prev+1;
3466
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3467
                packet->data->timestamp = htonl(session->context.v_last_ts);
3468
                packet->data->seq_number = htons(session->context.v_last_seq);
3469
                if(gateway != NULL)
3470
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3471
                /* Restore the timestamp and sequence number to what the publisher set them to */
3472
                packet->data->timestamp = htonl(packet->timestamp);
3473
                packet->data->seq_number = htons(packet->seq_number);
3474
        } else {
3475
                if(ntohl(packet->data->ssrc) != session->context.a_last_ssrc) {
3476
                        session->context.a_last_ssrc = ntohl(packet->data->ssrc);
3477
                        session->context.a_base_ts_prev = session->context.a_last_ts;
3478
                        session->context.a_base_ts = packet->timestamp;
3479
                        session->context.a_base_seq_prev = session->context.a_last_seq;
3480
                        session->context.a_base_seq = packet->seq_number;
3481
                }
3482
                /* Compute a coherent timestamp and sequence number */
3483
                session->context.a_last_ts = (packet->timestamp-session->context.a_base_ts)
3484
                        + session->context.a_base_ts_prev+960;        /* FIXME When switching, we assume Opus and so a 960 ts step */
3485
                session->context.a_last_seq = (packet->seq_number-session->context.a_base_seq)+session->context.a_base_seq_prev+1;
3486
                /* Update the timestamp and sequence number in the RTP packet, and send it */
3487
                packet->data->timestamp = htonl(session->context.a_last_ts);
3488
                packet->data->seq_number = htons(session->context.a_last_seq);
3489
                if(gateway != NULL)
3490
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
3491
                /* Restore the timestamp and sequence number to what the publisher set them to */
3492
                packet->data->timestamp = htonl(packet->timestamp);
3493
                packet->data->seq_number = htons(packet->seq_number);
3494
        }
3495

    
3496
        return;
3497
}
3498

    
3499
/* Helpers to check if frame is a key frame (see post processor code) */
3500
#if defined(__ppc__) || defined(__ppc64__)
3501
        # define swap2(d)  \
3502
        ((d&0x000000ff)<<8) |  \
3503
        ((d&0x0000ff00)>>8)
3504
#else
3505
        # define swap2(d) d
3506
#endif
3507

    
3508
static gboolean janus_streaming_is_keyframe(gint codec, char* buffer, int len) {
3509
        if(codec == JANUS_STREAMING_VP8) {
3510
                /* VP8 packet */
3511
                if(!buffer || len < 28)
3512
                        return FALSE;
3513
                /* Parse RTP header first */
3514
                rtp_header *header = (rtp_header *)buffer;
3515
                guint32 timestamp = ntohl(header->timestamp);
3516
                guint16 seq = ntohs(header->seq_number);
3517
                JANUS_LOG(LOG_HUGE, "Checking if VP8 packet (size=%d, seq=%"SCNu16", ts=%"SCNu32") is a key frame...\n",
3518
                        len, seq, timestamp);
3519
                uint16_t skip = 0;
3520
                if(header->extension) {
3521
                        janus_rtp_header_extension *ext = (janus_rtp_header_extension *)(buffer+12);
3522
                        JANUS_LOG(LOG_HUGE, "  -- RTP extension found (type=%"SCNu16", length=%"SCNu16")\n",
3523
                                ntohs(ext->type), ntohs(ext->length));
3524
                        skip = 4 + ntohs(ext->length)*4;
3525
                }
3526
                buffer += 12+skip;
3527
                /* Parse VP8 header now */
3528
                uint8_t vp8pd = *buffer;
3529
                uint8_t xbit = (vp8pd & 0x80);
3530
                uint8_t sbit = (vp8pd & 0x10);
3531
                if(xbit) {
3532
                        JANUS_LOG(LOG_HUGE, "  -- X bit is set!\n");
3533
                        /* Read the Extended control bits octet */
3534
                        buffer++;
3535
                        vp8pd = *buffer;
3536
                        uint8_t ibit = (vp8pd & 0x80);
3537
                        uint8_t lbit = (vp8pd & 0x40);
3538
                        uint8_t tbit = (vp8pd & 0x20);
3539
                        uint8_t kbit = (vp8pd & 0x10);
3540
                        if(ibit) {
3541
                                JANUS_LOG(LOG_HUGE, "  -- I bit is set!\n");
3542
                                /* Read the PictureID octet */
3543
                                buffer++;
3544
                                vp8pd = *buffer;
3545
                                uint16_t picid = vp8pd, wholepicid = picid;
3546
                                uint8_t mbit = (vp8pd & 0x80);
3547
                                if(mbit) {
3548
                                        JANUS_LOG(LOG_HUGE, "  -- M bit is set!\n");
3549
                                        memcpy(&picid, buffer, sizeof(uint16_t));
3550
                                        wholepicid = ntohs(picid);
3551
                                        picid = (wholepicid & 0x7FFF);
3552
                                        buffer++;
3553
                                }
3554
                                JANUS_LOG(LOG_HUGE, "  -- -- PictureID: %"SCNu16"\n", picid);
3555
                        }
3556
                        if(lbit) {
3557
                                JANUS_LOG(LOG_HUGE, "  -- L bit is set!\n");
3558
                                /* Read the TL0PICIDX octet */
3559
                                buffer++;
3560
                                vp8pd = *buffer;
3561
                        }
3562
                        if(tbit || kbit) {
3563
                                JANUS_LOG(LOG_HUGE, "  -- T/K bit is set!\n");
3564
                                /* Read the TID/KEYIDX octet */
3565
                                buffer++;
3566
                                vp8pd = *buffer;
3567
                        }
3568
                        buffer++;        /* Now we're in the payload */
3569
                        if(sbit) {
3570
                                JANUS_LOG(LOG_HUGE, "  -- S bit is set!\n");
3571
                                unsigned long int vp8ph = 0;
3572
                                memcpy(&vp8ph, buffer, 4);
3573
                                vp8ph = ntohl(vp8ph);
3574
                                uint8_t pbit = ((vp8ph & 0x01000000) >> 24);
3575
                                if(!pbit) {
3576
                                        JANUS_LOG(LOG_HUGE, "  -- P bit is NOT set!\n");
3577
                                        /* It is a key frame! Get resolution for debugging */
3578
                                        unsigned char *c = (unsigned char *)buffer+3;
3579
                                        /* vet via sync code */
3580
                                        if(c[0]!=0x9d||c[1]!=0x01||c[2]!=0x2a) {
3581
                                                JANUS_LOG(LOG_WARN, "First 3-bytes after header not what they're supposed to be?\n");
3582
                                        } else {
3583
                                                int vp8w = swap2(*(unsigned short*)(c+3))&0x3fff;
3584
                                                int vp8ws = swap2(*(unsigned short*)(c+3))>>14;
3585
                                                int vp8h = swap2(*(unsigned short*)(c+5))&0x3fff;
3586
                                                int vp8hs = swap2(*(unsigned short*)(c+5))>>14;
3587
                                                JANUS_LOG(LOG_HUGE, "Got a VP8 key frame: %dx%d (scale=%dx%d)\n", vp8w, vp8h, vp8ws, vp8hs);
3588
                                                return TRUE;
3589
                                        }
3590
                                }
3591
                        }
3592
                }
3593
                /* If we got here it's not a key frame */
3594
                return FALSE;
3595
        } else if(codec == JANUS_STREAMING_H264) {
3596
                /* Parse RTP header first */
3597
                rtp_header *header = (rtp_header *)buffer;
3598
                guint32 timestamp = ntohl(header->timestamp);
3599
                guint16 seq = ntohs(header->seq_number);
3600
                JANUS_LOG(LOG_HUGE, "Checking if H264 packet (size=%d, seq=%"SCNu16", ts=%"SCNu32") is a key frame...\n",
3601
                        len, seq, timestamp);
3602
                uint16_t skip = 0;
3603
                if(header->extension) {
3604
                        janus_rtp_header_extension *ext = (janus_rtp_header_extension *)(buffer+12);
3605
                        JANUS_LOG(LOG_HUGE, "  -- RTP extension found (type=%"SCNu16", length=%"SCNu16")\n",
3606
                                ntohs(ext->type), ntohs(ext->length));
3607
                        skip = 4 + ntohs(ext->length)*4;
3608
                }
3609
                buffer += 12+skip;
3610
                /* Parse H264 header now */
3611
                uint8_t fragment = *buffer & 0x1F;
3612
                uint8_t nal = *(buffer+1) & 0x1F;
3613
                uint8_t start_bit = *(buffer+1) & 0x80;
3614
                JANUS_LOG(LOG_HUGE, "Fragment=%d, NAL=%d, Start=%d\n", fragment, nal, start_bit);
3615
                if(fragment == 5 ||
3616
                                ((fragment == 28 || fragment == 29) && nal == 5 && start_bit == 128)) {
3617
                        JANUS_LOG(LOG_HUGE, "Got an H264 key frame\n");
3618
                        return TRUE;
3619
                }
3620
                /* If we got here it's not a key frame */
3621
                return FALSE;
3622
        } else {
3623
                /* FIXME Not a clue */
3624
                return FALSE;
3625
        }
3626
}