Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_streaming.c @ febef1ea

History | View | Annotate | Download (109 KB)

1
/*! \file   janus_streaming.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus Streaming plugin
5
 * \details  This is a streaming plugin for Janus, allowing WebRTC peers
6
 * to watch/listen to pre-recorded files or media generated by another tool.
7
 * Specifically, the plugin currently supports three different type of streams:
8
 * 
9
 * -# on-demand streaming of pre-recorded media files (different
10
 * streaming context for each peer);
11
 * -# live streaming of pre-recorded media files (shared streaming
12
 * context for all peers attached to the stream);
13
 * -# live streaming of media generated by another tool (shared
14
 * streaming context for all peers attached to the stream).
15
 * 
16
 * For what concerns types 1. and 2., considering the proof of concept
17
 * nature of the implementation the only pre-recorded media files
18
 * that the plugins supports right now are raw mu-Law and a-Law files:
19
 * support is of course planned for other additional widespread formats
20
 * as well.
21
 * 
22
 * For what concerns type 3., instead, the plugin is configured
23
 * to listen on a couple of ports for RTP: this means that the plugin
24
 * is implemented to receive RTP on those ports and relay them to all
25
 * peers attached to that stream. Any tool that can generate audio/video
26
 * RTP streams and specify a destination is good for the purpose: the
27
 * examples section contains samples that make use of GStreamer (http://gstreamer.freedesktop.org/)
28
 * but other tools like FFmpeg (http://www.ffmpeg.org/), LibAV (http://libav.org/)
29
 * or others are fine as well. This makes it really easy to capture and
30
 * encode whatever you want using your favourite tool, and then have it
31
 * transparently broadcasted via WebRTC using Janus.
32
 * 
33
 * Streams to make available are listed in the plugin configuration file.
34
 * A pre-filled configuration file is provided in \c conf/janus.plugin.streaming.cfg
35
 * and includes a stream of every type.
36
 * 
37
 * To add more streams or modify the existing ones, you can use the following
38
 * syntax:
39
 * 
40
 * \verbatim
41
[stream-name]
42
type = rtp|live|ondemand
43
       rtp = stream originated by an external tool (e.g., gstreamer or
44
             ffmpeg) and sent to the plugin via RTP
45
       live = local file streamed live to multiple listeners
46
              (multiple listeners = same streaming context)
47
       ondemand = local file streamed on-demand to a single listener
48
                  (multiple listeners = different streaming contexts)
49
id = <unique numeric ID>
50
description = This is my awesome stream
51
is_private = yes|no (private streams don't appear when you do a 'list' request)
52
filename = path to the local file to stream (only for live/ondemand)
53
secret = <optional password needed for manipulating (e.g., destroying
54
                or enabling/disabling) the stream>
55
audio = yes|no (do/don't stream audio)
56
video = yes|no (do/don't stream video)
57
   The following options are only valid for the 'rtp' type:
58
audioport = local port for receiving audio frames
59
audiopt = <audio RTP payload type> (e.g., 111)
60
audiortpmap = RTP map of the audio codec (e.g., opus/48000/2)
61
audiofmtp = Codec specific parameters, if any
62
videoport = local port for receiving video frames (only for rtp)
63
videopt = <video RTP payload type> (e.g., 100)
64
videortpmap = RTP map of the video codec (e.g., VP8/90000)
65
videofmtp = Codec specific parameters, if any
66
\endverbatim
67
 *
68
 * \ingroup plugins
69
 * \ref plugins
70
 */
71

    
72
#include "plugin.h"
73

    
74
#include <jansson.h>
75
#include <sys/time.h>
76

    
77
#include "../debug.h"
78
#include "../apierror.h"
79
#include "../config.h"
80
#include "../mutex.h"
81
#include "../rtp.h"
82
#include "../rtcp.h"
83
#include "../record.h"
84
#include "../utils.h"
85

    
86

    
87
/* Plugin information */
88
#define JANUS_STREAMING_VERSION                        4
89
#define JANUS_STREAMING_VERSION_STRING        "0.0.4"
90
#define JANUS_STREAMING_DESCRIPTION                "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by gstreamer."
91
#define JANUS_STREAMING_NAME                        "JANUS Streaming plugin"
92
#define JANUS_STREAMING_AUTHOR                        "Meetecho s.r.l."
93
#define JANUS_STREAMING_PACKAGE                        "janus.plugin.streaming"
94

    
95
/* Plugin methods */
96
janus_plugin *create(void);
97
int janus_streaming_init(janus_callbacks *callback, const char *config_path);
98
void janus_streaming_destroy(void);
99
int janus_streaming_get_api_compatibility(void);
100
int janus_streaming_get_version(void);
101
const char *janus_streaming_get_version_string(void);
102
const char *janus_streaming_get_description(void);
103
const char *janus_streaming_get_name(void);
104
const char *janus_streaming_get_author(void);
105
const char *janus_streaming_get_package(void);
106
void janus_streaming_create_session(janus_plugin_session *handle, int *error);
107
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
108
void janus_streaming_setup_media(janus_plugin_session *handle);
109
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
110
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
111
void janus_streaming_hangup_media(janus_plugin_session *handle);
112
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error);
113
char *janus_streaming_query_session(janus_plugin_session *handle);
114

    
115
/* Plugin setup */
116
static janus_plugin janus_streaming_plugin =
117
        {
118
                .init = janus_streaming_init,
119
                .destroy = janus_streaming_destroy,
120

    
121
                .get_api_compatibility = janus_streaming_get_api_compatibility,
122
                .get_version = janus_streaming_get_version,
123
                .get_version_string = janus_streaming_get_version_string,
124
                .get_description = janus_streaming_get_description,
125
                .get_name = janus_streaming_get_name,
126
                .get_author = janus_streaming_get_author,
127
                .get_package = janus_streaming_get_package,
128
                
129
                .create_session = janus_streaming_create_session,
130
                .handle_message = janus_streaming_handle_message,
131
                .setup_media = janus_streaming_setup_media,
132
                .incoming_rtp = janus_streaming_incoming_rtp,
133
                .incoming_rtcp = janus_streaming_incoming_rtcp,
134
                .hangup_media = janus_streaming_hangup_media,
135
                .destroy_session = janus_streaming_destroy_session,
136
                .query_session = janus_streaming_query_session,
137
        }; 
138

    
139
/* Plugin creator */
140
janus_plugin *create(void) {
141
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_STREAMING_NAME);
142
        return &janus_streaming_plugin;
143
}
144

    
145

    
146
/* Useful stuff */
147
static gint initialized = 0, stopping = 0;
148
static janus_callbacks *gateway = NULL;
149
static GThread *handler_thread;
150
static GThread *watchdog;
151
static void *janus_streaming_handler(void *data);
152
static void *janus_streaming_ondemand_thread(void *data);
153
static void *janus_streaming_filesource_thread(void *data);
154
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data);
155
static void *janus_streaming_relay_thread(void *data);
156

    
157
typedef enum janus_streaming_type {
158
        janus_streaming_type_none = 0,
159
        janus_streaming_type_live,
160
        janus_streaming_type_on_demand,
161
} janus_streaming_type;
162

    
163
typedef enum janus_streaming_source {
164
        janus_streaming_source_none = 0,
165
        janus_streaming_source_file,
166
        janus_streaming_source_rtp,
167
} janus_streaming_source;
168

    
169
typedef struct janus_streaming_rtp_source {
170
        gint audio_port;
171
        gint video_port;
172
        janus_recorder *arc;        /* The Janus recorder instance for this streams's audio, if enabled */
173
        janus_recorder *vrc;        /* The Janus recorder instance for this streams's video, if enabled */
174
} janus_streaming_rtp_source;
175

    
176
typedef struct janus_streaming_file_source {
177
        char *filename;
178
} janus_streaming_file_source;
179

    
180
typedef struct janus_streaming_codecs {
181
        gint audio_pt;
182
        char *audio_rtpmap;
183
        char *audio_fmtp;
184
        gint video_pt;
185
        char *video_rtpmap;
186
        char *video_fmtp;
187
} janus_streaming_codecs;
188

    
189
typedef struct janus_streaming_mountpoint {
190
        gint64 id;
191
        char *name;
192
        char *description;
193
        gboolean is_private;
194
        char *secret;
195
        gboolean enabled;
196
        gboolean active;
197
        janus_streaming_type streaming_type;
198
        janus_streaming_source streaming_source;
199
        void *source;        /* Can differ according to the source type */
200
        GDestroyNotify source_destroy;
201
        janus_streaming_codecs codecs;
202
        GList/*<unowned janus_streaming_session>*/ *listeners;
203
        gint64 destroyed;
204
        janus_mutex mutex;
205
} janus_streaming_mountpoint;
206
GHashTable *mountpoints;
207
janus_mutex mountpoints_mutex;
208

    
209
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp);
210

    
211
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
212
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
213
                uint64_t id, char *name, char *desc,
214
                gboolean doaudio, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
215
                gboolean dovideo, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp);
216
/* Helper to create a file/ondemand live source */
217
janus_streaming_mountpoint *janus_streaming_create_file_source(
218
                uint64_t id, char *name, char *desc, char *filename,
219
                gboolean live, gboolean doaudio, gboolean dovideo);
220

    
221

    
222
typedef struct janus_streaming_message {
223
        janus_plugin_session *handle;
224
        char *transaction;
225
        json_t *message;
226
        char *sdp_type;
227
        char *sdp;
228
} janus_streaming_message;
229
static GAsyncQueue *messages = NULL;
230

    
231
void janus_streaming_message_free(janus_streaming_message *msg);
232
void janus_streaming_message_free(janus_streaming_message *msg) {
233
        if(!msg)
234
                return;
235

    
236
        msg->handle = NULL;
237

    
238
        g_free(msg->transaction);
239
        msg->transaction = NULL;
240
        if(msg->message)
241
                json_decref(msg->message);
242
        msg->message = NULL;
243
        g_free(msg->sdp_type);
244
        msg->sdp_type = NULL;
245
        g_free(msg->sdp);
246
        msg->sdp = NULL;
247

    
248
        g_free(msg);
249
}
250

    
251

    
252
typedef struct janus_streaming_context {
253
        /* Needed to fix seq and ts in case of stream switching */
254
        uint32_t a_last_ssrc, a_last_ts, a_base_ts, a_base_ts_prev,
255
                        v_last_ssrc, v_last_ts, v_base_ts, v_base_ts_prev;
256
        uint16_t a_last_seq, a_base_seq, a_base_seq_prev,
257
                        v_last_seq, v_base_seq, v_base_seq_prev;
258
} janus_streaming_context;
259

    
260
typedef struct janus_streaming_session {
261
        janus_plugin_session *handle;
262
        janus_streaming_mountpoint *mountpoint;
263
        gboolean started;
264
        gboolean paused;
265
        janus_streaming_context context;
266
        gboolean stopping;
267
        guint64 destroyed;        /* Time at which this session was marked as destroyed */
268
} janus_streaming_session;
269
static GHashTable *sessions;
270
static GList *old_sessions;
271
static janus_mutex sessions_mutex;
272

    
273
/* Packets we get from gstreamer and relay */
274
typedef struct janus_streaming_rtp_relay_packet {
275
        rtp_header *data;
276
        gint length;
277
        gint is_video;
278
        uint32_t timestamp;
279
        uint16_t seq_number;
280
} janus_streaming_rtp_relay_packet;
281

    
282

    
283
/* Error codes */
284
#define JANUS_STREAMING_ERROR_NO_MESSAGE                        450
285
#define JANUS_STREAMING_ERROR_INVALID_JSON                        451
286
#define JANUS_STREAMING_ERROR_INVALID_REQUEST                452
287
#define JANUS_STREAMING_ERROR_MISSING_ELEMENT                453
288
#define JANUS_STREAMING_ERROR_INVALID_ELEMENT                454
289
#define JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT        455
290
#define JANUS_STREAMING_ERROR_CANT_CREATE                        456
291
#define JANUS_STREAMING_ERROR_UNAUTHORIZED                        457
292
#define JANUS_STREAMING_ERROR_CANT_SWITCH                        458
293
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR                        470
294

    
295

    
296
/* Streaming watchdog/garbage collector (sort of) */
297
void *janus_streaming_watchdog(void *data);
298
void *janus_streaming_watchdog(void *data) {
299
        JANUS_LOG(LOG_INFO, "Streaming watchdog started\n");
300
        gint64 now = 0;
301
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
302
                janus_mutex_lock(&sessions_mutex);
303
                /* Iterate on all the sessions */
304
                now = janus_get_monotonic_time();
305
                if(old_sessions != NULL) {
306
                        GList *sl = old_sessions;
307
                        JANUS_LOG(LOG_VERB, "Checking %d old sessions\n", g_list_length(old_sessions));
308
                        while(sl) {
309
                                janus_streaming_session *session = (janus_streaming_session *)sl->data;
310
                                if(!session) {
311
                                        sl = sl->next;
312
                                        continue;
313
                                }
314
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
315
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
316
                                        GList *rm = sl->next;
317
                                        old_sessions = g_list_delete_link(old_sessions, sl);
318
                                        sl = rm;
319
                                        session->handle = NULL;
320
                                        g_free(session);
321
                                        session = NULL;
322
                                        continue;
323
                                }
324
                                sl = sl->next;
325
                        }
326
                }
327
                janus_mutex_unlock(&sessions_mutex);
328
                g_usleep(2000000);
329
        }
330
        JANUS_LOG(LOG_INFO, "Streaming watchdog stopped\n");
331
        return NULL;
332
}
333

    
334

    
335
/* Plugin implementation */
336
int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
337
        if(g_atomic_int_get(&stopping)) {
338
                /* Still stopping from before */
339
                return -1;
340
        }
341
        if(callback == NULL || config_path == NULL) {
342
                /* Invalid arguments */
343
                return -1;
344
        }
345

    
346
        /* Read configuration */
347
        char filename[255];
348
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_STREAMING_PACKAGE);
349
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
350
        janus_config *config = janus_config_parse(filename);
351
        if(config != NULL)
352
                janus_config_print(config);
353
        
354
        mountpoints = g_hash_table_new_full(NULL, NULL, NULL,
355
                                            (GDestroyNotify) janus_streaming_mountpoint_free);
356
        janus_mutex_init(&mountpoints_mutex);
357
        /* Parse configuration to populate the mountpoints */
358
        if(config != NULL) {
359
                janus_config_category *cat = janus_config_get_categories(config);
360
                while(cat != NULL) {
361
                        if(cat->name == NULL) {
362
                                cat = cat->next;
363
                                continue;
364
                        }
365
                        JANUS_LOG(LOG_VERB, "Adding stream '%s'\n", cat->name);
366
                        janus_config_item *type = janus_config_get_item(cat, "type");
367
                        if(type == NULL || type->value == NULL) {
368
                                JANUS_LOG(LOG_VERB, "  -- Invalid type, skipping stream...\n");
369
                                cat = cat->next;
370
                                continue;
371
                        }
372
                        if(!strcasecmp(type->value, "rtp")) {
373
                                /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
374
                                janus_config_item *id = janus_config_get_item(cat, "id");
375
                                janus_config_item *desc = janus_config_get_item(cat, "description");
376
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
377
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
378
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
379
                                janus_config_item *video = janus_config_get_item(cat, "video");
380
                                janus_config_item *aport = janus_config_get_item(cat, "audioport");
381
                                janus_config_item *acodec = janus_config_get_item(cat, "audiopt");
382
                                janus_config_item *artpmap = janus_config_get_item(cat, "audiortpmap");
383
                                janus_config_item *afmtp = janus_config_get_item(cat, "audiofmtp");
384
                                janus_config_item *vport = janus_config_get_item(cat, "videoport");
385
                                janus_config_item *vcodec = janus_config_get_item(cat, "videopt");
386
                                janus_config_item *vrtpmap = janus_config_get_item(cat, "videortpmap");
387
                                janus_config_item *vfmtp = janus_config_get_item(cat, "videofmtp");
388
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
389
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
390
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
391
                                if(!doaudio && !dovideo) {
392
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', no audio or video have to be streamed...\n", cat->name);
393
                                        cat = cat->next;
394
                                        continue;
395
                                }
396
                                if(doaudio &&
397
                                                (aport == NULL || aport->value == NULL ||
398
                                                acodec == NULL || acodec->value == NULL ||
399
                                                artpmap == NULL || artpmap->value == NULL)) {
400
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for audio...\n", cat->name);
401
                                        cat = cat->next;
402
                                        continue;
403
                                }
404
                                if(dovideo &&
405
                                                (vport == NULL || vport->value == NULL ||
406
                                                vcodec == NULL || vcodec->value == NULL ||
407
                                                vrtpmap == NULL || vrtpmap->value == NULL)) {
408
                                        JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream '%s', missing mandatory information for video...\n", cat->name);
409
                                        cat = cat->next;
410
                                        continue;
411
                                }
412
                                if(id == NULL || id->value == NULL) {
413
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
414
                                } else {
415
                                        janus_mutex_lock(&mountpoints_mutex);
416
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
417
                                        janus_mutex_unlock(&mountpoints_mutex);
418
                                        if(mp != NULL) {
419
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
420
                                                cat = cat->next;
421
                                                continue;
422
                                        }
423
                                }
424
                                JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
425
                                janus_streaming_mountpoint *mp = NULL;
426
                                if((mp = janus_streaming_create_rtp_source(
427
                                                (id && id->value) ? atoi(id->value) : 0,
428
                                                (char *)cat->name,
429
                                                desc ? (char *)desc->value : NULL,
430
                                                doaudio,
431
                                                (aport && aport->value) ? atoi(aport->value) : 0,
432
                                                (acodec && acodec->value) ? atoi(acodec->value) : 0,
433
                                                artpmap ? (char *)artpmap->value : NULL,
434
                                                afmtp ? (char *)afmtp->value : NULL,
435
                                                dovideo,
436
                                                (vport && vport->value) ? atoi(vport->value) : 0,
437
                                                (vcodec && vcodec->value) ? atoi(vcodec->value) : 0,
438
                                                vrtpmap ? (char *)vrtpmap->value : NULL,
439
                                                vfmtp ? (char *)vfmtp->value : NULL)) == NULL) {
440
                                        JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream '%s'...\n", cat->name);
441
                                        continue;
442
                                }
443
                                mp->is_private = is_private;
444
                                if(secret && secret->value)
445
                                        mp->secret = g_strdup(secret->value);
446
                        } else if(!strcasecmp(type->value, "live")) {
447
                                /* File live source */
448
                                janus_config_item *id = janus_config_get_item(cat, "id");
449
                                janus_config_item *desc = janus_config_get_item(cat, "description");
450
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
451
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
452
                                janus_config_item *file = janus_config_get_item(cat, "filename");
453
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
454
                                janus_config_item *video = janus_config_get_item(cat, "video");
455
                                if(file == NULL || file->value == NULL) {
456
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', missing mandatory information...\n", cat->name);
457
                                        cat = cat->next;
458
                                        continue;
459
                                }
460
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
461
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
462
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
463
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
464
                                if(!doaudio || dovideo) {
465
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', we only support audio file streaming right now...\n", cat->name);
466
                                        cat = cat->next;
467
                                        continue;
468
                                }
469
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
470
                                        JANUS_LOG(LOG_ERR, "Can't add 'live' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
471
                                        cat = cat->next;
472
                                        continue;
473
                                }
474
                                if(id == NULL || id->value == NULL) {
475
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
476
                                } else {
477
                                        janus_mutex_lock(&mountpoints_mutex);
478
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
479
                                        janus_mutex_unlock(&mountpoints_mutex);
480
                                        if(mp != NULL) {
481
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
482
                                                cat = cat->next;
483
                                                continue;
484
                                        }
485
                                }
486
                                janus_streaming_mountpoint *mp = NULL;
487
                                if((mp = janus_streaming_create_file_source(
488
                                                (id && id->value) ? atoi(id->value) : 0,
489
                                                (char *)cat->name,
490
                                                desc ? (char *)desc->value : NULL,
491
                                                (char *)file->value,
492
                                                TRUE, doaudio, dovideo)) == NULL) {
493
                                        JANUS_LOG(LOG_ERR, "Error creating 'live' stream '%s'...\n", cat->name);
494
                                        continue;
495
                                }
496
                                mp->is_private = is_private;
497
                                if(secret && secret->value)
498
                                        mp->secret = g_strdup(secret->value);
499
                        } else if(!strcasecmp(type->value, "ondemand")) {
500
                                /* mu-Law file on demand source */
501
                                janus_config_item *id = janus_config_get_item(cat, "id");
502
                                janus_config_item *desc = janus_config_get_item(cat, "description");
503
                                janus_config_item *priv = janus_config_get_item(cat, "is_private");
504
                                janus_config_item *secret = janus_config_get_item(cat, "secret");
505
                                janus_config_item *file = janus_config_get_item(cat, "filename");
506
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
507
                                janus_config_item *video = janus_config_get_item(cat, "video");
508
                                if(file == NULL || file->value == NULL) {
509
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', missing mandatory information...\n", cat->name);
510
                                        cat = cat->next;
511
                                        continue;
512
                                }
513
                                gboolean is_private = priv && priv->value && janus_is_true(priv->value);
514
                                gboolean doaudio = audio && audio->value && janus_is_true(audio->value);
515
                                gboolean dovideo = video && video->value && janus_is_true(video->value);
516
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
517
                                if(!doaudio || dovideo) {
518
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', we only support audio file streaming right now...\n", cat->name);
519
                                        cat = cat->next;
520
                                        continue;
521
                                }
522
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
523
                                        JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name);
524
                                        cat = cat->next;
525
                                        continue;
526
                                }
527
                                if(id == NULL || id->value == NULL) {
528
                                        JANUS_LOG(LOG_VERB, "Missing id for stream '%s', will generate a random one...\n", cat->name);
529
                                } else {
530
                                        janus_mutex_lock(&mountpoints_mutex);
531
                                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(atol(id->value)));
532
                                        janus_mutex_unlock(&mountpoints_mutex);
533
                                        if(mp != NULL) {
534
                                                JANUS_LOG(LOG_WARN, "A stream with the provided ID already exists, skipping '%s'\n", cat->name);
535
                                                cat = cat->next;
536
                                                continue;
537
                                        }
538
                                }
539
                                janus_streaming_mountpoint *mp = NULL;
540
                                if((mp = janus_streaming_create_file_source(
541
                                                (id && id->value) ? atoi(id->value) : 0,
542
                                                (char *)cat->name,
543
                                                desc ? (char *)desc->value : NULL,
544
                                                (char *)file->value,
545
                                                FALSE, doaudio, dovideo)) == NULL) {
546
                                        JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream '%s'...\n", cat->name);
547
                                        continue;
548
                                }
549
                                mp->is_private = is_private;
550
                                if(secret && secret->value)
551
                                        mp->secret = g_strdup(secret->value);
552
                        } else {
553
                                JANUS_LOG(LOG_WARN, "Ignoring unknown stream type '%s' (%s)...\n", type->value, cat->name);
554
                        }
555
                        cat = cat->next;
556
                }
557
                /* Done */
558
                janus_config_destroy(config);
559
                config = NULL;
560
        }
561
        /* Show available mountpoints */
562
        janus_mutex_lock(&mountpoints_mutex);
563
        GHashTableIter iter;
564
        gpointer value;
565
        g_hash_table_iter_init(&iter, mountpoints);
566
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
567
                janus_streaming_mountpoint *mp = value;
568
                JANUS_LOG(LOG_VERB, "  ::: [%"SCNu64"][%s] %s (%s, %s, %s)\n", mp->id, mp->name, mp->description,
569
                        mp->streaming_type == janus_streaming_type_live ? "live" : "on demand",
570
                        mp->streaming_source == janus_streaming_source_rtp ? "RTP source" : "file source",
571
                        mp->is_private ? "private" : "public");
572
        }
573
        janus_mutex_unlock(&mountpoints_mutex);
574

    
575
        sessions = g_hash_table_new(NULL, NULL);
576
        janus_mutex_init(&sessions_mutex);
577
        messages = g_async_queue_new_full((GDestroyNotify) janus_streaming_message_free);
578
        /* This is the callback we'll need to invoke to contact the gateway */
579
        gateway = callback;
580
        g_atomic_int_set(&initialized, 1);
581

    
582
        GError *error = NULL;
583
        /* Start the sessions watchdog */
584
        watchdog = g_thread_try_new("streaming watchdog", &janus_streaming_watchdog, NULL, &error);
585
        if(!watchdog) {
586
                g_atomic_int_set(&initialized, 0);
587
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming watchdog thread...\n", error->code, error->message ? error->message : "??");
588
                return -1;
589
        }
590
        /* Launch the thread that will handle incoming messages */
591
        handler_thread = g_thread_try_new("janus streaming handler", janus_streaming_handler, NULL, &error);
592
        if(error != NULL) {
593
                g_atomic_int_set(&initialized, 0);
594
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Streaming handler thread...\n", error->code, error->message ? error->message : "??");
595
                return -1;
596
        }
597
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_STREAMING_NAME);
598
        return 0;
599
}
600

    
601
void janus_streaming_destroy(void) {
602
        if(!g_atomic_int_get(&initialized))
603
                return;
604
        g_atomic_int_set(&stopping, 1);
605

    
606
        if(handler_thread != NULL) {
607
                g_thread_join(handler_thread);
608
                handler_thread = NULL;
609
        }
610
        if(watchdog != NULL) {
611
                g_thread_join(watchdog);
612
                watchdog = NULL;
613
        }
614

    
615
        /* FIXME We should destroy the sessions cleanly */
616
        usleep(500000);
617
        janus_mutex_lock(&mountpoints_mutex);
618
        g_hash_table_destroy(mountpoints);
619
        janus_mutex_unlock(&mountpoints_mutex);
620
        janus_mutex_lock(&sessions_mutex);
621
        g_hash_table_destroy(sessions);
622
        janus_mutex_unlock(&sessions_mutex);
623
        g_async_queue_unref(messages);
624
        messages = NULL;
625
        sessions = NULL;
626

    
627
        g_atomic_int_set(&initialized, 0);
628
        g_atomic_int_set(&stopping, 0);
629
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_STREAMING_NAME);
630
}
631

    
632
int janus_streaming_get_api_compatibility(void) {
633
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
634
        return JANUS_PLUGIN_API_VERSION;
635
}
636

    
637
int janus_streaming_get_version(void) {
638
        return JANUS_STREAMING_VERSION;
639
}
640

    
641
const char *janus_streaming_get_version_string(void) {
642
        return JANUS_STREAMING_VERSION_STRING;
643
}
644

    
645
const char *janus_streaming_get_description(void) {
646
        return JANUS_STREAMING_DESCRIPTION;
647
}
648

    
649
const char *janus_streaming_get_name(void) {
650
        return JANUS_STREAMING_NAME;
651
}
652

    
653
const char *janus_streaming_get_author(void) {
654
        return JANUS_STREAMING_AUTHOR;
655
}
656

    
657
const char *janus_streaming_get_package(void) {
658
        return JANUS_STREAMING_PACKAGE;
659
}
660

    
661
void janus_streaming_create_session(janus_plugin_session *handle, int *error) {
662
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
663
                *error = -1;
664
                return;
665
        }        
666
        janus_streaming_session *session = (janus_streaming_session *)calloc(1, sizeof(janus_streaming_session));
667
        if(session == NULL) {
668
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
669
                *error = -2;
670
                return;
671
        }
672
        session->handle = handle;
673
        session->mountpoint = NULL;        /* This will happen later */
674
        session->started = FALSE;        /* This will happen later */
675
        session->paused = FALSE;
676
        session->destroyed = 0;
677
        handle->plugin_handle = session;
678
        janus_mutex_lock(&sessions_mutex);
679
        g_hash_table_insert(sessions, handle, session);
680
        janus_mutex_unlock(&sessions_mutex);
681

    
682
        return;
683
}
684

    
685
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error) {
686
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
687
                *error = -1;
688
                return;
689
        }        
690
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; 
691
        if(!session) {
692
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
693
                *error = -2;
694
                return;
695
        }
696
        JANUS_LOG(LOG_VERB, "Removing streaming session...\n");
697
        if(session->mountpoint) {
698
                janus_mutex_lock(&session->mountpoint->mutex);
699
                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
700
                janus_mutex_unlock(&session->mountpoint->mutex);
701
        }
702
        janus_mutex_lock(&sessions_mutex);
703
        g_hash_table_remove(sessions, handle);
704
        janus_mutex_unlock(&sessions_mutex);
705
        /* Cleaning up and removing the session is done in a lazy way */
706
        session->destroyed = janus_get_monotonic_time();
707
        janus_mutex_lock(&sessions_mutex);
708
        old_sessions = g_list_append(old_sessions, session);
709
        janus_mutex_unlock(&sessions_mutex);
710
        return;
711
}
712

    
713
char *janus_streaming_query_session(janus_plugin_session *handle) {
714
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
715
                return NULL;
716
        }        
717
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;
718
        if(!session) {
719
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
720
                return NULL;
721
        }
722
        /* What is this user watching, if anything? */
723
        json_t *info = json_object();
724
        json_object_set_new(info, "state", json_string(session->mountpoint ? "watching" : "idle"));
725
        if(session->mountpoint) {
726
                json_object_set_new(info, "mountpoint_id", json_integer(session->mountpoint->id));
727
                json_object_set_new(info, "mountpoint_name", session->mountpoint->name ? json_string(session->mountpoint->name) : NULL);
728
        }
729
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
730
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
731
        json_decref(info);
732
        return info_text;
733
}
734

    
735
struct janus_plugin_result *janus_streaming_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
736
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
737
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
738
        JANUS_LOG(LOG_VERB, "%s\n", message);
739
        janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
740
        if(msg == NULL) {
741
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
742
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
743
        }
744

    
745
        /* Pre-parse the message */
746
        int error_code = 0;
747
        char error_cause[512];        /* FIXME 512 should be enough, but anyway... */
748
        json_t *root = NULL;
749
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
750
        if(!session) {
751
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
752
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
753
                g_snprintf(error_cause, 512, "%s", "session associated with this handle...");
754
                goto error;
755
        }
756
        if(session->destroyed) {
757
                JANUS_LOG(LOG_ERR, "Session has already been destroyed...\n");
758
                error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
759
                g_snprintf(error_cause, 512, "%s", "Session has already been destroyed...");
760
                goto error;
761
        }
762
        error_code = 0;
763
        JANUS_LOG(LOG_VERB, "Handling message: %s\n", message);
764
        if(message == NULL) {
765
                JANUS_LOG(LOG_ERR, "No message??\n");
766
                error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
767
                g_snprintf(error_cause, 512, "%s", "No message??");
768
                goto error;
769
        }
770
        json_error_t error;
771
        root = json_loads(message, 0, &error);
772
        if(!root) {
773
                JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
774
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
775
                g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
776
                goto error;
777
        }
778
        if(!json_is_object(root)) {
779
                JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
780
                error_code = JANUS_STREAMING_ERROR_INVALID_JSON;
781
                g_snprintf(error_cause, 512, "JSON error: not an object");
782
                goto error;
783
        }
784
        /* Get the request first */
785
        json_t *request = json_object_get(root, "request");
786
        if(!request) {
787
                JANUS_LOG(LOG_ERR, "Missing element (request)\n");
788
                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
789
                g_snprintf(error_cause, 512, "Missing element (request)");
790
                goto error;
791
        }
792
        if(!json_is_string(request)) {
793
                JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
794
                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
795
                g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
796
                goto error;
797
        }
798
        /* Some requests ('create' and 'destroy') can be handled synchronously */
799
        const char *request_text = json_string_value(request);
800
        if(!strcasecmp(request_text, "list")) {
801
                json_t *list = json_array();
802
                JANUS_LOG(LOG_VERB, "Request for the list of mountpoints\n");
803
                /* Return a list of all available mountpoints */
804
                janus_mutex_lock(&mountpoints_mutex);
805
                GHashTableIter iter;
806
                gpointer value;
807
                g_hash_table_iter_init(&iter, mountpoints);
808
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
809
                        janus_streaming_mountpoint *mp = value;
810
                        if(mp->is_private) {
811
                                /* Skip private stream */
812
                                JANUS_LOG(LOG_VERB, "Skipping private mountpoint '%s'\n", mp->description);
813
                                continue;
814
                        }
815
                        json_t *ml = json_object();
816
                        json_object_set_new(ml, "id", json_integer(mp->id));
817
                        json_object_set_new(ml, "description", json_string(mp->description));
818
                        json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
819
                        json_array_append_new(list, ml);
820
                }
821
                janus_mutex_unlock(&mountpoints_mutex);
822
                /* Send info back */
823
                json_t *response = json_object();
824
                json_object_set_new(response, "streaming", json_string("list"));
825
                json_object_set_new(response, "list", list);
826
                char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
827
                json_decref(response);
828
                janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
829
                g_free(response_text);
830
                return result;
831
        } else if(!strcasecmp(request_text, "create")) {
832
                /* Create a new stream */
833
                json_t *type = json_object_get(root, "type");
834
                if(!type) {
835
                        JANUS_LOG(LOG_ERR, "Missing element (type)\n");
836
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
837
                        g_snprintf(error_cause, 512, "Missing element (type)");
838
                        goto error;
839
                }
840
                if(!json_is_string(type)) {
841
                        JANUS_LOG(LOG_ERR, "Invalid element (type should be a string)\n");
842
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
843
                        g_snprintf(error_cause, 512, "Invalid element (type should be a string)");
844
                        goto error;
845
                }
846
                const char *type_text = json_string_value(type);
847
                json_t *secret = json_object_get(root, "secret");
848
                if(secret && !json_is_string(secret)) {
849
                        JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
850
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
851
                        g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
852
                        goto error;
853
                }
854
                janus_streaming_mountpoint *mp = NULL;
855
                if(!strcasecmp(type_text, "rtp")) {
856
                        /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
857
                        json_t *id = json_object_get(root, "id");
858
                        if(id && !json_is_integer(id)) {
859
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
860
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
861
                                g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
862
                                goto error;
863
                        }
864
                        json_t *name = json_object_get(root, "name");
865
                        if(name && !json_is_string(name)) {
866
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
867
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
868
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
869
                                goto error;
870
                        }
871
                        json_t *desc = json_object_get(root, "description");
872
                        if(desc && !json_is_string(desc)) {
873
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
874
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
875
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
876
                                goto error;
877
                        }
878
                        json_t *is_private = json_object_get(root, "is_private");
879
                        if(is_private && !json_is_boolean(is_private)) {
880
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
881
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
882
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
883
                                goto error;
884
                        }
885
                        json_t *audio = json_object_get(root, "audio");
886
                        if(audio && !json_is_boolean(audio)) {
887
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
888
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
889
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
890
                                goto error;
891
                        }
892
                        json_t *video = json_object_get(root, "video");
893
                        if(video && !json_is_boolean(video)) {
894
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
895
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
896
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
897
                                goto error;
898
                        }
899
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
900
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
901
                        if(!doaudio && !dovideo) {
902
                                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
903
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
904
                                g_snprintf(error_cause, 512, "Can't add 'rtp' stream, no audio or video have to be streamed...");
905
                                goto error;
906
                        }
907
                        uint16_t aport = 0;
908
                        uint8_t acodec = 0;
909
                        char *artpmap = NULL, *afmtp = NULL;
910
                        if(doaudio) {
911
                                json_t *audioport = json_object_get(root, "audioport");
912
                                if(!audioport) {
913
                                        JANUS_LOG(LOG_ERR, "Missing element (audioport)\n");
914
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
915
                                        g_snprintf(error_cause, 512, "Missing element (audioport)");
916
                                        goto error;
917
                                }
918
                                if(!json_is_integer(audioport)) {
919
                                        JANUS_LOG(LOG_ERR, "Invalid element (audioport should be an integer)\n");
920
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
921
                                        g_snprintf(error_cause, 512, "Invalid element (audioport should be an integer)");
922
                                        goto error;
923
                                }
924
                                aport = json_integer_value(audioport);
925
                                json_t *audiopt = json_object_get(root, "audiopt");
926
                                if(!audiopt) {
927
                                        JANUS_LOG(LOG_ERR, "Missing element (audiopt)\n");
928
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
929
                                        g_snprintf(error_cause, 512, "Missing element (audiopt)");
930
                                        goto error;
931
                                }
932
                                if(!json_is_integer(audiopt)) {
933
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiopt should be an integer)\n");
934
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
935
                                        g_snprintf(error_cause, 512, "Invalid element (audiopt should be an integer)");
936
                                        goto error;
937
                                }
938
                                acodec = json_integer_value(audiopt);
939
                                json_t *audiortpmap = json_object_get(root, "audiortpmap");
940
                                if(!audiortpmap) {
941
                                        JANUS_LOG(LOG_ERR, "Missing element (audiortpmap)\n");
942
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
943
                                        g_snprintf(error_cause, 512, "Missing element (audiortpmap)");
944
                                        goto error;
945
                                }
946
                                if(!json_is_string(audiortpmap)) {
947
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiortpmap should be a string)\n");
948
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
949
                                        g_snprintf(error_cause, 512, "Invalid element (audiortpmap should be a string)");
950
                                        goto error;
951
                                }
952
                                artpmap = (char *)json_string_value(audiortpmap);
953
                                json_t *audiofmtp = json_object_get(root, "audiofmtp");
954
                                if(audiofmtp && !json_is_string(audiofmtp)) {
955
                                        JANUS_LOG(LOG_ERR, "Invalid element (audiofmtp should be a string)\n");
956
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
957
                                        g_snprintf(error_cause, 512, "Invalid element (audiofmtp should be a string)");
958
                                        goto error;
959
                                }
960
                                afmtp = (char *)json_string_value(audiofmtp);
961
                        }
962
                        uint16_t vport = 0;
963
                        uint8_t vcodec = 0;
964
                        char *vrtpmap = NULL, *vfmtp = NULL;
965
                        if(dovideo) {
966
                                json_t *videoport = json_object_get(root, "videoport");
967
                                if(!videoport) {
968
                                        JANUS_LOG(LOG_ERR, "Missing element (videoport)\n");
969
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
970
                                        g_snprintf(error_cause, 512, "Missing element (videoport)");
971
                                        goto error;
972
                                }
973
                                if(!json_is_integer(videoport)) {
974
                                        JANUS_LOG(LOG_ERR, "Invalid element (videoport should be an integer)\n");
975
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
976
                                        g_snprintf(error_cause, 512, "Invalid element (videoport should be an integer)");
977
                                        goto error;
978
                                }
979
                                vport = json_integer_value(videoport);
980
                                json_t *videopt = json_object_get(root, "videopt");
981
                                if(!videopt) {
982
                                        JANUS_LOG(LOG_ERR, "Missing element (videopt)\n");
983
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
984
                                        g_snprintf(error_cause, 512, "Missing element (videopt)");
985
                                        goto error;
986
                                }
987
                                if(!json_is_integer(videopt)) {
988
                                        JANUS_LOG(LOG_ERR, "Invalid element (videopt should be an integer)\n");
989
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
990
                                        g_snprintf(error_cause, 512, "Invalid element (videopt should be an integer)");
991
                                        goto error;
992
                                }
993
                                vcodec = json_integer_value(videopt);
994
                                json_t *videortpmap = json_object_get(root, "videortpmap");
995
                                if(!videortpmap) {
996
                                        JANUS_LOG(LOG_ERR, "Missing element (videortpmap)\n");
997
                                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
998
                                        g_snprintf(error_cause, 512, "Missing element (videortpmap)");
999
                                        goto error;
1000
                                }
1001
                                if(!json_is_string(videortpmap)) {
1002
                                        JANUS_LOG(LOG_ERR, "Invalid element (videortpmap should be a string)\n");
1003
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1004
                                        g_snprintf(error_cause, 512, "Invalid element (videortpmap should be a string)");
1005
                                        goto error;
1006
                                }
1007
                                vrtpmap = (char *)json_string_value(videortpmap);
1008
                                json_t *videofmtp = json_object_get(root, "videofmtp");
1009
                                if(videofmtp && !json_is_string(videofmtp)) {
1010
                                        JANUS_LOG(LOG_ERR, "Invalid element (videofmtp should be a string)\n");
1011
                                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1012
                                        g_snprintf(error_cause, 512, "Invalid element (videofmtp should be a string)");
1013
                                        goto error;
1014
                                }
1015
                                vfmtp = (char *)json_string_value(videofmtp);
1016
                        }
1017
                        if(id == NULL) {
1018
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1019
                        } else {
1020
                                janus_mutex_lock(&mountpoints_mutex);
1021
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1022
                                janus_mutex_unlock(&mountpoints_mutex);
1023
                                if(mp != NULL) {
1024
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1025
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1026
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1027
                                        goto error;
1028
                                }
1029
                        }
1030
                        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
1031
                        mp = janus_streaming_create_rtp_source(
1032
                                        id ? json_integer_value(id) : 0,
1033
                                        name ? (char *)json_string_value(name) : NULL,
1034
                                        desc ? (char *)json_string_value(desc) : NULL,
1035
                                        doaudio, aport, acodec, artpmap, afmtp,
1036
                                        dovideo, vport, vcodec, vrtpmap, vfmtp);
1037
                        if(mp == NULL) {
1038
                                JANUS_LOG(LOG_ERR, "Error creating 'rtp' stream...\n");
1039
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1040
                                g_snprintf(error_cause, 512, "Error creating 'rtp' stream");
1041
                                goto error;
1042
                        }
1043
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1044
                } else if(!strcasecmp(type_text, "live")) {
1045
                        /* File live source */
1046
                        json_t *id = json_object_get(root, "id");
1047
                        if(id && !json_is_integer(id)) {
1048
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1049
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1050
                                g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1051
                                goto error;
1052
                        }
1053
                        json_t *name = json_object_get(root, "name");
1054
                        if(name && !json_is_string(name)) {
1055
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1056
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1057
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1058
                                goto error;
1059
                        }
1060
                        json_t *desc = json_object_get(root, "description");
1061
                        if(desc && !json_is_string(desc)) {
1062
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1063
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1064
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1065
                                goto error;
1066
                        }
1067
                        json_t *is_private = json_object_get(root, "is_private");
1068
                        if(is_private && !json_is_boolean(is_private)) {
1069
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1070
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1071
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1072
                                goto error;
1073
                        }
1074
                        json_t *file = json_object_get(root, "file");
1075
                        if(file && !json_is_string(file)) {
1076
                                JANUS_LOG(LOG_ERR, "Invalid element (file should be a string)\n");
1077
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1078
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1079
                                goto error;
1080
                        }
1081
                        json_t *audio = json_object_get(root, "audio");
1082
                        if(audio && !json_is_boolean(audio)) {
1083
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1084
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1085
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1086
                                goto error;
1087
                        }
1088
                        json_t *video = json_object_get(root, "video");
1089
                        if(video && !json_is_boolean(video)) {
1090
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1091
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1092
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1093
                                goto error;
1094
                        }
1095
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1096
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1097
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1098
                        if(!doaudio || dovideo) {
1099
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, we only support audio file streaming right now...\n");
1100
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1101
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, we only support audio file streaming right now...");
1102
                                goto error;
1103
                        }
1104
                        char *filename = (char *)json_string_value(file);
1105
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1106
                                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1107
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1108
                                g_snprintf(error_cause, 512, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1109
                                goto error;
1110
                        }
1111
                        if(id == NULL) {
1112
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1113
                        } else {
1114
                                janus_mutex_lock(&mountpoints_mutex);
1115
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1116
                                janus_mutex_unlock(&mountpoints_mutex);
1117
                                if(mp != NULL) {
1118
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1119
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1120
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1121
                                        goto error;
1122
                                }
1123
                        }
1124
                        mp = janus_streaming_create_file_source(
1125
                                        id ? json_integer_value(id) : 0,
1126
                                        name ? (char *)json_string_value(name) : NULL,
1127
                                        desc ? (char *)json_string_value(desc) : NULL,
1128
                                        filename,
1129
                                        TRUE, doaudio, dovideo);
1130
                        if(mp == NULL) {
1131
                                JANUS_LOG(LOG_ERR, "Error creating 'live' stream...\n");
1132
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1133
                                g_snprintf(error_cause, 512, "Error creating 'live' stream");
1134
                                goto error;
1135
                        }
1136
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1137
                } else if(!strcasecmp(type_text, "ondemand")) {
1138
                        /* mu-Law file on demand source */
1139
                        json_t *id = json_object_get(root, "id");
1140
                        if(id && !json_is_integer(id)) {
1141
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1142
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1143
                                g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1144
                                goto error;
1145
                        }
1146
                        json_t *name = json_object_get(root, "name");
1147
                        if(name && !json_is_string(name)) {
1148
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1149
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1150
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1151
                                goto error;
1152
                        }
1153
                        json_t *desc = json_object_get(root, "description");
1154
                        if(desc && !json_is_string(desc)) {
1155
                                JANUS_LOG(LOG_ERR, "Invalid element (desc should be a string)\n");
1156
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1157
                                g_snprintf(error_cause, 512, "Invalid element (desc should be a string)");
1158
                                goto error;
1159
                        }
1160
                        json_t *is_private = json_object_get(root, "is_private");
1161
                        if(is_private && !json_is_boolean(is_private)) {
1162
                                JANUS_LOG(LOG_ERR, "Invalid element (is_private should be a boolean)\n");
1163
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1164
                                g_snprintf(error_cause, 512, "Invalid value (is_private should be a boolean)");
1165
                                goto error;
1166
                        }
1167
                        json_t *file = json_object_get(root, "file");
1168
                        if(file && !json_is_string(file)) {
1169
                                JANUS_LOG(LOG_ERR, "Invalid element (file should be a string)\n");
1170
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1171
                                g_snprintf(error_cause, 512, "Invalid element (file should be a string)");
1172
                                goto error;
1173
                        }
1174
                        json_t *audio = json_object_get(root, "audio");
1175
                        if(audio && !json_is_boolean(audio)) {
1176
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1177
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1178
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1179
                                goto error;
1180
                        }
1181
                        json_t *video = json_object_get(root, "video");
1182
                        if(video && !json_is_boolean(video)) {
1183
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1184
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1185
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1186
                                goto error;
1187
                        }
1188
                        gboolean doaudio = audio ? json_is_true(audio) : FALSE;
1189
                        gboolean dovideo = video ? json_is_true(video) : FALSE;
1190
                        /* TODO We should support something more than raw a-Law and mu-Law streams... */
1191
                        if(!doaudio || dovideo) {
1192
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, we only support audio file streaming right now...\n");
1193
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1194
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, we only support audio file streaming right now...");
1195
                                goto error;
1196
                        }
1197
                        char *filename = (char *)json_string_value(file);
1198
                        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
1199
                                JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
1200
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1201
                                g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)");
1202
                                goto error;
1203
                        }
1204
                        if(id == NULL) {
1205
                                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
1206
                        } else {
1207
                                janus_mutex_lock(&mountpoints_mutex);
1208
                                mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(json_integer_value(id)));
1209
                                janus_mutex_unlock(&mountpoints_mutex);
1210
                                if(mp != NULL) {
1211
                                        JANUS_LOG(LOG_ERR, "A stream with the provided ID already exists\n");
1212
                                        error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1213
                                        g_snprintf(error_cause, 512, "A stream with the provided ID already exists");
1214
                                        goto error;
1215
                                }
1216
                        }
1217
                        mp = janus_streaming_create_file_source(
1218
                                        id ? json_integer_value(id) : 0,
1219
                                        name ? (char *)json_string_value(name) : NULL,
1220
                                        desc ? (char *)json_string_value(desc) : NULL,
1221
                                        filename,
1222
                                        FALSE, doaudio, dovideo);
1223
                        if(mp == NULL) {
1224
                                JANUS_LOG(LOG_ERR, "Error creating 'ondemand' stream...\n");
1225
                                error_code = JANUS_STREAMING_ERROR_CANT_CREATE;
1226
                                g_snprintf(error_cause, 512, "Error creating 'ondemand' stream");
1227
                                goto error;
1228
                        }
1229
                        mp->is_private = is_private ? json_is_true(is_private) : FALSE;
1230
                } else {
1231
                        JANUS_LOG(LOG_ERR, "Unknown stream type '%s'...\n", type_text);
1232
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1233
                        g_snprintf(error_cause, 512, "Unknown stream type '%s'...\n", type_text);
1234
                        goto error;
1235
                }
1236
                /* Any secret? */
1237
                if(secret)
1238
                        mp->secret = g_strdup(json_string_value(secret));
1239
                /* Send info back */
1240
                json_t *response = json_object();
1241
                json_object_set_new(response, "streaming", json_string("created"));
1242
                json_object_set_new(response, "created", json_string(mp->name));
1243
                json_t *ml = json_object();
1244
                json_object_set_new(ml, "id", json_integer(mp->id));
1245
                json_object_set_new(ml, "description", json_string(mp->description));
1246
                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
1247
                json_object_set_new(ml, "is_private", json_string(mp->is_private ? "true" : "false"));
1248
                json_object_set_new(response, "stream", ml);
1249
                char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1250
                json_decref(response);
1251
                janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1252
                g_free(response_text);
1253
                return result;
1254
        } else if(!strcasecmp(request_text, "destroy")) {
1255
                /* Get rid of an existing stream (notice this doesn't remove it from the config file, though) */
1256
                json_t *id = json_object_get(root, "id");
1257
                if(!id) {
1258
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1259
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1260
                        g_snprintf(error_cause, 512, "Missing element (id)");
1261
                        goto error;
1262
                }
1263
                if(!json_is_integer(id)) {
1264
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1265
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1266
                        g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1267
                        goto error;
1268
                }
1269
                gint64 id_value = json_integer_value(id);
1270
                janus_mutex_lock(&mountpoints_mutex);
1271
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1272
                if(mp == NULL) {
1273
                        janus_mutex_unlock(&mountpoints_mutex);
1274
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1275
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1276
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1277
                        goto error;
1278
                }
1279
                if(mp->secret) {
1280
                        /* This action requires an authorized user */
1281
                        json_t *secret = json_object_get(root, "secret");
1282
                        if(!secret) {
1283
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1284
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1285
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1286
                                goto error;
1287
                        }
1288
                        if(!json_is_string(secret)) {
1289
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1290
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1291
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1292
                                goto error;
1293
                        }
1294
                        if(strcmp(mp->secret, json_string_value(secret))) {
1295
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1296
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1297
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1298
                                goto error;
1299
                        }
1300
                }
1301
                JANUS_LOG(LOG_VERB, "Request to unmount mountpoint/stream %"SCNu64"\n", id_value);
1302
                /* FIXME Should we kick the current viewers as well? */
1303
                g_hash_table_remove(mountpoints, GINT_TO_POINTER(id_value));
1304
                janus_mutex_unlock(&mountpoints_mutex);
1305
                /* Send info back */
1306
                json_t *response = json_object();
1307
                json_object_set_new(response, "streaming", json_string("destroyed"));
1308
                json_object_set_new(response, "destroyed", json_integer(id_value));
1309
                char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1310
                json_decref(response);
1311
                janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1312
                g_free(response_text);
1313
                return result;
1314
        } else if(!strcasecmp(request_text, "recording")) {
1315
                /* We can start/stop recording a live, RTP-based stream */
1316
                json_t *action = json_object_get(root, "action");
1317
                if(!action) {
1318
                        JANUS_LOG(LOG_ERR, "Missing element (action)\n");
1319
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1320
                        g_snprintf(error_cause, 512, "Missing element (action)");
1321
                        goto error;
1322
                }
1323
                if(!json_is_string(action)) {
1324
                        JANUS_LOG(LOG_ERR, "Invalid element (action should be a string)\n");
1325
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1326
                        g_snprintf(error_cause, 512, "Invalid element (action should be a string)");
1327
                        goto error;
1328
                }
1329
                const char *action_text = json_string_value(action);
1330
                if(strcasecmp(action_text, "start") && strcasecmp(action_text, "stop")) {
1331
                        JANUS_LOG(LOG_ERR, "Invalid action (should be start|stop)\n");
1332
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1333
                        g_snprintf(error_cause, 512, "Invalid action (should be start|stop)");
1334
                        goto error;
1335
                }
1336
                json_t *id = json_object_get(root, "id");
1337
                if(!id) {
1338
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1339
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1340
                        g_snprintf(error_cause, 512, "Missing element (id)");
1341
                        goto error;
1342
                }
1343
                if(!json_is_integer(id)) {
1344
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1345
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1346
                        g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1347
                        goto error;
1348
                }
1349
                gint64 id_value = json_integer_value(id);
1350
                janus_mutex_lock(&mountpoints_mutex);
1351
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1352
                if(mp == NULL) {
1353
                        janus_mutex_unlock(&mountpoints_mutex);
1354
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1355
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1356
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1357
                        goto error;
1358
                }
1359
                if(mp->streaming_type != janus_streaming_type_live || mp->streaming_source != janus_streaming_source_rtp) {
1360
                        janus_mutex_unlock(&mountpoints_mutex);
1361
                        JANUS_LOG(LOG_ERR, "Recording is only available on RTP-based live streams\n");
1362
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1363
                        g_snprintf(error_cause, 512, "Recording is only available on RTP-based live streams");
1364
                        goto error;
1365
                }
1366
                if(mp->secret) {
1367
                        /* This action requires an authorized user */
1368
                        json_t *secret = json_object_get(root, "secret");
1369
                        if(!secret) {
1370
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1371
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1372
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1373
                                goto error;
1374
                        }
1375
                        if(!json_is_string(secret)) {
1376
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1377
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1378
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1379
                                goto error;
1380
                        }
1381
                        if(strcmp(mp->secret, json_string_value(secret))) {
1382
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1383
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1384
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1385
                                goto error;
1386
                        }
1387
                }
1388
                janus_streaming_rtp_source *source = mp->source;
1389
                if(!strcasecmp(action_text, "start")) {
1390
                        /* Start a recording for audio and/or video */
1391
                        json_t *audio = json_object_get(root, "audio");
1392
                        if(audio && !json_is_string(audio)) {
1393
                                janus_mutex_unlock(&mountpoints_mutex);
1394
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a string)\n");
1395
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1396
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a string)");
1397
                                goto error;
1398
                        }
1399
                        json_t *video = json_object_get(root, "video");
1400
                        if(video && !json_is_string(video)) {
1401
                                janus_mutex_unlock(&mountpoints_mutex);
1402
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a string)\n");
1403
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1404
                                g_snprintf(error_cause, 512, "Invalid value (video should be a string)");
1405
                                goto error;
1406
                        }
1407
                        if((audio && source->arc) || (video && source->vrc)) {
1408
                                janus_mutex_unlock(&mountpoints_mutex);
1409
                                JANUS_LOG(LOG_ERR, "Recording for audio and/or video already started for this stream\n");
1410
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1411
                                g_snprintf(error_cause, 512, "Recording for audio and/or video already started for this stream");
1412
                                goto error;
1413
                        }
1414
                        if(!audio && !video) {
1415
                                janus_mutex_unlock(&mountpoints_mutex);
1416
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1417
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1418
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1419
                                goto error;
1420
                        }
1421
                        if(audio) {
1422
                                const char *audiofile = json_string_value(audio);
1423
                                source->arc = janus_recorder_create(NULL, 0, (char *)audiofile);
1424
                                if(source->arc == NULL) {
1425
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for audio\n");
1426
                                } else {
1427
                                        JANUS_LOG(LOG_INFO, "[%s] Audio recording started\n", mp->name);
1428
                                }
1429
                        }
1430
                        if(video) {
1431
                                const char *videofile = json_string_value(video);
1432
                                source->vrc = janus_recorder_create(NULL, 1, (char *)videofile);
1433
                                if(source->vrc == NULL) {
1434
                                        JANUS_LOG(LOG_ERR, "Error starting recorder for video\n");
1435
                                } else {
1436
                                        JANUS_LOG(LOG_INFO, "[%s] Video recording started\n", mp->name);
1437
                                }
1438
                        }
1439
                        janus_mutex_unlock(&mountpoints_mutex);
1440
                        /* Send a success response back */
1441
                        json_t *response = json_object();
1442
                        json_object_set_new(response, "streaming", json_string("ok"));
1443
                        char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1444
                        json_decref(response);
1445
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1446
                        g_free(response_text);
1447
                        return result;
1448
                } else if(!strcasecmp(action_text, "stop")) {
1449
                        /* Stop the recording */
1450
                        json_t *audio = json_object_get(root, "audio");
1451
                        if(audio && !json_is_boolean(audio)) {
1452
                                janus_mutex_unlock(&mountpoints_mutex);
1453
                                JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
1454
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1455
                                g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
1456
                                goto error;
1457
                        }
1458
                        json_t *video = json_object_get(root, "video");
1459
                        if(video && !json_is_boolean(video)) {
1460
                                janus_mutex_unlock(&mountpoints_mutex);
1461
                                JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
1462
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1463
                                g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
1464
                                goto error;
1465
                        }
1466
                        if(!audio && !video) {
1467
                                janus_mutex_unlock(&mountpoints_mutex);
1468
                                JANUS_LOG(LOG_ERR, "Missing audio and/or video\n");
1469
                                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1470
                                g_snprintf(error_cause, 512, "Missing audio and/or video");
1471
                                goto error;
1472
                        }
1473
                        if(audio && json_is_true(audio) && source->arc) {
1474
                                /* Close the audio recording */
1475
                                janus_recorder_close(source->arc);
1476
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1477
                                janus_recorder *tmp = source->arc;
1478
                                source->arc = NULL;
1479
                                janus_recorder_free(tmp);
1480
                        }
1481
                        if(video && json_is_true(video) && source->vrc) {
1482
                                /* Close the video recording */
1483
                                janus_recorder_close(source->vrc);
1484
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1485
                                janus_recorder *tmp = source->vrc;
1486
                                source->vrc = NULL;
1487
                                janus_recorder_free(tmp);
1488
                        }
1489
                        janus_mutex_unlock(&mountpoints_mutex);
1490
                        /* Send a success response back */
1491
                        json_t *response = json_object();
1492
                        json_object_set_new(response, "streaming", json_string("ok"));
1493
                        char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1494
                        json_decref(response);
1495
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1496
                        g_free(response_text);
1497
                        return result;
1498
                }
1499
        } else if(!strcasecmp(request_text, "enable") || !strcasecmp(request_text, "disable")) {
1500
                /* A request to enable/disable a mountpoint */
1501
                json_t *id = json_object_get(root, "id");
1502
                if(!id) {
1503
                        JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1504
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1505
                        g_snprintf(error_cause, 512, "Missing element (id)");
1506
                        goto error;
1507
                }
1508
                if(!json_is_integer(id)) {
1509
                        JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1510
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1511
                        g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1512
                        goto error;
1513
                }
1514
                gint64 id_value = json_integer_value(id);
1515
                janus_mutex_lock(&mountpoints_mutex);
1516
                janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1517
                if(mp == NULL) {
1518
                        janus_mutex_unlock(&mountpoints_mutex);
1519
                        JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1520
                        error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1521
                        g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1522
                        goto error;
1523
                }
1524
                if(mp->secret) {
1525
                        /* This action requires an authorized user */
1526
                        json_t *secret = json_object_get(root, "secret");
1527
                        if(!secret) {
1528
                                janus_mutex_unlock(&mountpoints_mutex);
1529
                                JANUS_LOG(LOG_ERR, "Missing element (secret)\n");
1530
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1531
                                g_snprintf(error_cause, 512, "Missing element (secret)");
1532
                                goto error;
1533
                        }
1534
                        if(!json_is_string(secret)) {
1535
                                janus_mutex_unlock(&mountpoints_mutex);
1536
                                JANUS_LOG(LOG_ERR, "Invalid element (secret should be a string)\n");
1537
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1538
                                g_snprintf(error_cause, 512, "Invalid element (secret should be a string)");
1539
                                goto error;
1540
                        }
1541
                        if(strcmp(mp->secret, json_string_value(secret))) {
1542
                                janus_mutex_unlock(&mountpoints_mutex);
1543
                                JANUS_LOG(LOG_ERR, "Unauthorized (wrong secret)\n");
1544
                                error_code = JANUS_STREAMING_ERROR_UNAUTHORIZED;
1545
                                g_snprintf(error_cause, 512, "Unauthorized (wrong secret)");
1546
                                goto error;
1547
                        }
1548
                }
1549
                if(!strcasecmp(request_text, "enable")) {
1550
                        /* Enable a previously disabled mountpoint */
1551
                        JANUS_LOG(LOG_INFO, "[%s] Stream enabled\n", mp->name);
1552
                        mp->enabled = TRUE;
1553
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1554
                } else {
1555
                        /* Disable a previously enabled mountpoint */
1556
                        JANUS_LOG(LOG_INFO, "[%s] Stream disabled\n", mp->name);
1557
                        mp->enabled = FALSE;
1558
                        /* Any recording to close? */
1559
                        janus_streaming_rtp_source *source = mp->source;
1560
                        if(source->arc) {
1561
                                janus_recorder_close(source->arc);
1562
                                JANUS_LOG(LOG_INFO, "[%s] Closed audio recording %s\n", mp->name, source->arc->filename ? source->arc->filename : "??");
1563
                                janus_recorder *tmp = source->arc;
1564
                                source->arc = NULL;
1565
                                janus_recorder_free(tmp);
1566
                        }
1567
                        if(source->vrc) {
1568
                                janus_recorder_close(source->vrc);
1569
                                JANUS_LOG(LOG_INFO, "[%s] Closed video recording %s\n", mp->name, source->vrc->filename ? source->vrc->filename : "??");
1570
                                janus_recorder *tmp = source->vrc;
1571
                                source->vrc = NULL;
1572
                                janus_recorder_free(tmp);
1573
                        }
1574
                        /* FIXME: Should we notify the listeners, or is this up to the controller application? */
1575
                }
1576
                janus_mutex_unlock(&mountpoints_mutex);
1577
                /* Send a success response back */
1578
                json_t *response = json_object();
1579
                json_object_set_new(response, "streaming", json_string("ok"));
1580
                char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1581
                json_decref(response);
1582
                janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
1583
                g_free(response_text);
1584
                return result;
1585
        } else if(!strcasecmp(request_text, "watch") || !strcasecmp(request_text, "start")
1586
                        || !strcasecmp(request_text, "pause") || !strcasecmp(request_text, "stop")
1587
                        || !strcasecmp(request_text, "switch")) {
1588
                /* These messages are handled asynchronously */
1589
                goto async;
1590
        } else {
1591
                JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
1592
                error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1593
                g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1594
                goto error;
1595
        }
1596

    
1597
error:
1598
                {
1599
                        if(root != NULL)
1600
                                json_decref(root);
1601
                        /* Prepare JSON error event */
1602
                        json_t *event = json_object();
1603
                        json_object_set_new(event, "videoroom", json_string("event"));
1604
                        json_object_set_new(event, "error_code", json_integer(error_code));
1605
                        json_object_set_new(event, "error", json_string(error_cause));
1606
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1607
                        json_decref(event);
1608
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, event_text);
1609
                        g_free(event_text);
1610
                        return result;
1611
                }
1612

    
1613
async:
1614
                {
1615
                        /* All the other requests to this plugin are handled asynchronously */
1616
                        msg->handle = handle;
1617
                        msg->transaction = transaction;
1618
                        msg->message = root;
1619
                        msg->sdp_type = sdp_type;
1620
                        msg->sdp = sdp;
1621

    
1622
                        g_async_queue_push(messages, msg);
1623

    
1624
                        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
1625
                }
1626
}
1627

    
1628
void janus_streaming_setup_media(janus_plugin_session *handle) {
1629
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
1630
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1631
                return;
1632
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1633
        if(!session) {
1634
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1635
                return;
1636
        }
1637
        if(session->destroyed)
1638
                return;
1639
        /* TODO Only start streaming when we get this event */
1640
        session->context.a_last_ssrc = 0;
1641
        session->context.a_last_ssrc = 0;
1642
        session->context.a_last_ts = 0;
1643
        session->context.a_base_ts = 0;
1644
        session->context.a_base_ts_prev = 0;
1645
        session->context.v_last_ssrc = 0;
1646
        session->context.v_last_ts = 0;
1647
        session->context.v_base_ts = 0;
1648
        session->context.v_base_ts_prev = 0;
1649
        session->context.a_last_seq = 0;
1650
        session->context.a_base_seq = 0;
1651
        session->context.a_base_seq_prev = 0;
1652
        session->context.v_last_seq = 0;
1653
        session->context.v_base_seq = 0;
1654
        session->context.v_base_seq_prev = 0;
1655
        session->started = TRUE;
1656
        /* Prepare JSON event */
1657
        json_t *event = json_object();
1658
        json_object_set_new(event, "streaming", json_string("event"));
1659
        json_t *result = json_object();
1660
        json_object_set_new(result, "status", json_string("started"));
1661
        json_object_set(event, "result", result);
1662
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1663
        json_decref(event);
1664
        json_decref(result);
1665
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
1666
        int ret = gateway->push_event(handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL);
1667
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
1668
        g_free(event_text);
1669
}
1670

    
1671
void janus_streaming_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
1672
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1673
                return;
1674
        /* FIXME We don't care about what the browser sends us, we're sendonly */
1675
}
1676

    
1677
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
1678
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1679
                return;
1680
        /* We might interested in the available bandwidth that the user advertizes */
1681
        uint64_t bw = janus_rtcp_get_remb(buf, len);
1682
        if(bw > 0) {
1683
                JANUS_LOG(LOG_HUGE, "REMB for this PeerConnection: %"SCNu64"\n", bw);
1684
                /* TODO Use this somehow (e.g., notification towards application?) */
1685
        }
1686
        /* FIXME Maybe we should care about RTCP, but not now */
1687
}
1688

    
1689
void janus_streaming_hangup_media(janus_plugin_session *handle) {
1690
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
1691
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
1692
                return;
1693
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
1694
        if(!session) {
1695
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1696
                return;
1697
        }
1698
        if(session->destroyed)
1699
                return;
1700
        /* FIXME Simulate a "stop" coming from the browser */
1701
        janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
1702
        if(msg == NULL) {
1703
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1704
                return;
1705
        }
1706
        msg->handle = handle;
1707
        msg->message = json_loads("{\"request\":\"stop\"}", 0, NULL);
1708
        msg->transaction = NULL;
1709
        msg->sdp_type = NULL;
1710
        msg->sdp = NULL;
1711
        g_async_queue_push(messages, msg);
1712
}
1713

    
1714
/* Thread to handle incoming messages */
1715
static void *janus_streaming_handler(void *data) {
1716
        JANUS_LOG(LOG_VERB, "Joining Streaming handler thread\n");
1717
        janus_streaming_message *msg = NULL;
1718
        int error_code = 0;
1719
        char *error_cause = calloc(1024, sizeof(char));
1720
        if(error_cause == NULL) {
1721
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1722
                return NULL;
1723
        }
1724
        json_t *root = NULL;
1725
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
1726
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
1727
                        usleep(50000);
1728
                        continue;
1729
                }
1730
                janus_streaming_session *session = (janus_streaming_session *)msg->handle->plugin_handle;
1731
                if(!session) {
1732
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1733
                        janus_streaming_message_free(msg);
1734
                        continue;
1735
                }
1736
                if(session->destroyed) {
1737
                        janus_streaming_message_free(msg);
1738
                        continue;
1739
                }
1740
                /* Handle request */
1741
                error_code = 0;
1742
                root = NULL;
1743
                if(msg->message == NULL) {
1744
                        JANUS_LOG(LOG_ERR, "No message??\n");
1745
                        error_code = JANUS_STREAMING_ERROR_NO_MESSAGE;
1746
                        g_snprintf(error_cause, 512, "%s", "No message??");
1747
                        goto error;
1748
                }
1749
                root = msg->message;
1750
                /* Get the request first */
1751
                json_t *request = json_object_get(root, "request");
1752
                if(!request) {
1753
                        JANUS_LOG(LOG_ERR, "Missing element (request)\n");
1754
                        error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1755
                        g_snprintf(error_cause, 512, "Missing element (request)");
1756
                        goto error;
1757
                }
1758
                if(!json_is_string(request)) {
1759
                        JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
1760
                        error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1761
                        g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
1762
                        goto error;
1763
                }
1764
                const char *request_text = json_string_value(request);
1765
                json_t *result = NULL;
1766
                const char *sdp_type = NULL;
1767
                char *sdp = NULL;
1768
                /* All these requests can only be handled asynchronously */
1769
                if(!strcasecmp(request_text, "watch")) {
1770
                        json_t *id = json_object_get(root, "id");
1771
                        if(!id) {
1772
                                JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1773
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1774
                                g_snprintf(error_cause, 512, "Missing element (id)");
1775
                                goto error;
1776
                        }
1777
                        if(!json_is_integer(id)) {
1778
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1779
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1780
                                g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1781
                                goto error;
1782
                        }
1783
                        gint64 id_value = json_integer_value(id);
1784
                        janus_mutex_lock(&mountpoints_mutex);
1785
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1786
                        if(mp == NULL) {
1787
                                janus_mutex_unlock(&mountpoints_mutex);
1788
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1789
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1790
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1791
                                goto error;
1792
                        }
1793
                        janus_mutex_unlock(&mountpoints_mutex);
1794
                        JANUS_LOG(LOG_VERB, "Request to watch mountpoint/stream %"SCNu64"\n", id_value);
1795
                        session->stopping = FALSE;
1796
                        session->mountpoint = mp;
1797
                        if(mp->streaming_type == janus_streaming_type_on_demand) {
1798
                                GError *error = NULL;
1799
                                g_thread_try_new(session->mountpoint->name, &janus_streaming_ondemand_thread, session, &error);
1800
                                if(error != NULL) {
1801
                                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the on-demand thread...\n", error->code, error->message ? error->message : "??");
1802
                                        error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
1803
                                        g_snprintf(error_cause, 512, "Got error %d (%s) trying to launch the on-demand thread", error->code, error->message ? error->message : "??");
1804
                                        goto error;
1805
                                }
1806
                        }
1807
                        /* TODO Check if user is already watching a stream, if the video is active, etc. */
1808
                        janus_mutex_lock(&mp->mutex);
1809
                        mp->listeners = g_list_append(mp->listeners, session);
1810
                        janus_mutex_unlock(&mp->mutex);
1811
                        sdp_type = "offer";        /* We're always going to do the offer ourselves, never answer */
1812
                        char sdptemp[2048];
1813
                        memset(sdptemp, 0, 2048);
1814
                        gchar buffer[512];
1815
                        memset(buffer, 0, 512);
1816
                        gint64 sessid = janus_get_monotonic_time();
1817
                        gint64 version = sessid;        /* FIXME This needs to be increased when it changes, so time should be ok */
1818
                        g_snprintf(buffer, 512,
1819
                                "v=0\r\no=%s %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n",
1820
                                        "-", sessid, version);
1821
                        g_strlcat(sdptemp, buffer, 2048);
1822
                        g_strlcat(sdptemp, "s=Streaming Test\r\nt=0 0\r\n", 2048);
1823
                        if(mp->codecs.audio_pt >= 0) {
1824
                                /* Add audio line */
1825
                                g_snprintf(buffer, 512,
1826
                                        "m=audio 1 RTP/SAVPF %d\r\n"
1827
                                        "c=IN IP4 1.1.1.1\r\n",
1828
                                        mp->codecs.audio_pt);
1829
                                g_strlcat(sdptemp, buffer, 2048);
1830
                                if(mp->codecs.audio_rtpmap) {
1831
                                        g_snprintf(buffer, 512,
1832
                                                "a=rtpmap:%d %s\r\n",
1833
                                                mp->codecs.audio_pt, mp->codecs.audio_rtpmap);
1834
                                        g_strlcat(sdptemp, buffer, 2048);
1835
                                }
1836
                                if(mp->codecs.audio_fmtp) {
1837
                                        g_snprintf(buffer, 512,
1838
                                                "a=fmtp:%d %s\r\n",
1839
                                                mp->codecs.audio_pt, mp->codecs.audio_fmtp);
1840
                                        g_strlcat(sdptemp, buffer, 2048);
1841
                                }
1842
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
1843
                        }
1844
                        if(mp->codecs.video_pt >= 0) {
1845
                                /* Add video line */
1846
                                g_snprintf(buffer, 512,
1847
                                        "m=video 1 RTP/SAVPF %d\r\n"
1848
                                        "c=IN IP4 1.1.1.1\r\n",
1849
                                        mp->codecs.video_pt);
1850
                                g_strlcat(sdptemp, buffer, 2048);
1851
                                if(mp->codecs.video_rtpmap) {
1852
                                        g_snprintf(buffer, 512,
1853
                                                "a=rtpmap:%d %s\r\n",
1854
                                                mp->codecs.video_pt, mp->codecs.video_rtpmap);
1855
                                        g_strlcat(sdptemp, buffer, 2048);
1856
                                }
1857
                                if(mp->codecs.video_fmtp) {
1858
                                        g_snprintf(buffer, 512,
1859
                                                "a=fmtp:%d %s\r\n",
1860
                                                mp->codecs.video_pt, mp->codecs.video_fmtp);
1861
                                        g_strlcat(sdptemp, buffer, 2048);
1862
                                }
1863
                                g_snprintf(buffer, 512,
1864
                                        "a=rtcp-fb:%d goog-remb\r\n",
1865
                                        mp->codecs.video_pt);
1866
                                g_strlcat(sdptemp, buffer, 2048);
1867
                                g_strlcat(sdptemp, "a=sendonly\r\n", 2048);
1868
                        }
1869
                        sdp = g_strdup(sdptemp);
1870
                        JANUS_LOG(LOG_VERB, "Going to offer this SDP:\n%s\n", sdp);
1871
                        result = json_object();
1872
                        json_object_set_new(result, "status", json_string("preparing"));
1873
                } else if(!strcasecmp(request_text, "start")) {
1874
                        if(session->mountpoint == NULL) {
1875
                                JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
1876
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1877
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
1878
                                goto error;
1879
                        }
1880
                        JANUS_LOG(LOG_VERB, "Starting the streaming\n");
1881
                        session->paused = FALSE;
1882
                        result = json_object();
1883
                        /* We wait for the setup_media event to start: on the other hand, it may have already arrived */
1884
                        json_object_set_new(result, "status", json_string(session->started ? "started" : "starting"));
1885
                } else if(!strcasecmp(request_text, "pause")) {
1886
                        if(session->mountpoint == NULL) {
1887
                                JANUS_LOG(LOG_VERB, "Can't pause: no mountpoint set\n");
1888
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1889
                                g_snprintf(error_cause, 512, "Can't start: no mountpoint set");
1890
                                goto error;
1891
                        }
1892
                        JANUS_LOG(LOG_VERB, "Pausing the streaming\n");
1893
                        session->paused = TRUE;
1894
                        result = json_object();
1895
                        json_object_set_new(result, "status", json_string("pausing"));
1896
                } else if(!strcasecmp(request_text, "switch")) {
1897
                        /* This listener wants to switch to a different mountpoint
1898
                         * NOTE: this only works for live RTP streams as of now: you
1899
                         * cannot, for instance, switch from a live RTP mountpoint to
1900
                         * an on demand one or viceversa (TBD.) */
1901
                        janus_streaming_mountpoint *oldmp = session->mountpoint;
1902
                        if(oldmp == NULL) {
1903
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a mountpoint\n");
1904
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1905
                                g_snprintf(error_cause, 512, "Can't switch: not on a mountpoint");
1906
                                goto error;
1907
                        }
1908
                        if(oldmp->streaming_type != janus_streaming_type_live || 
1909
                                        oldmp->streaming_source != janus_streaming_source_rtp) {
1910
                                JANUS_LOG(LOG_VERB, "Can't switch: not on a live RTP mountpoint\n");
1911
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
1912
                                g_snprintf(error_cause, 512, "Can't switch: not on a live RTP mountpoint");
1913
                                goto error;
1914
                        }
1915
                        json_t *id = json_object_get(root, "id");
1916
                        if(!id) {
1917
                                JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1918
                                error_code = JANUS_STREAMING_ERROR_MISSING_ELEMENT;
1919
                                g_snprintf(error_cause, 512, "Missing element (id)");
1920
                                goto error;
1921
                        }
1922
                        if(!json_is_integer(id)) {
1923
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1924
                                error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT;
1925
                                g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1926
                                goto error;
1927
                        }
1928
                        gint64 id_value = json_integer_value(id);
1929
                        janus_mutex_lock(&mountpoints_mutex);
1930
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
1931
                        if(mp == NULL) {
1932
                                janus_mutex_unlock(&mountpoints_mutex);
1933
                                JANUS_LOG(LOG_VERB, "No such mountpoint/stream %"SCNu64"\n", id_value);
1934
                                error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
1935
                                g_snprintf(error_cause, 512, "No such mountpoint/stream %"SCNu64"", id_value);
1936
                                goto error;
1937
                        }
1938
                        if(mp->streaming_type != janus_streaming_type_live || 
1939
                                        mp->streaming_source != janus_streaming_source_rtp) {
1940
                                janus_mutex_unlock(&mountpoints_mutex);
1941
                                JANUS_LOG(LOG_VERB, "Can't switch: target is not a live RTP mountpoint\n");
1942
                                error_code = JANUS_STREAMING_ERROR_CANT_SWITCH;
1943
                                g_snprintf(error_cause, 512, "Can't switch: target is not a live RTP mountpoint");
1944
                                goto error;
1945
                        }
1946
                        janus_mutex_unlock(&mountpoints_mutex);
1947
                        JANUS_LOG(LOG_VERB, "Request to switch to mountpoint/stream %"SCNu64" (old: %"SCNu64")\n", id_value, oldmp->id);
1948
                        session->paused = TRUE;
1949
                        /* Unsubscribe from the previous mountpoint and subscribe to the new one */
1950
                        janus_mutex_lock(&oldmp->mutex);
1951
                        oldmp->listeners = g_list_remove_all(oldmp->listeners, session);
1952
                        janus_mutex_unlock(&oldmp->mutex);
1953
                        /* Subscribe to the new one */
1954
                        janus_mutex_lock(&mp->mutex);
1955
                        mp->listeners = g_list_append(mp->listeners, session);
1956
                        janus_mutex_unlock(&mp->mutex);
1957
                        session->mountpoint = mp;
1958
                        session->paused = FALSE;
1959
                        /* Done */
1960
                        result = json_object();
1961
                        json_object_set_new(result, "streaming", json_string("event"));
1962
                        json_object_set_new(result, "switched", json_string("ok"));
1963
                        json_object_set_new(result, "id", json_integer(id_value));
1964
                } else if(!strcasecmp(request_text, "stop")) {
1965
                        if(session->stopping || !session->started) {
1966
                                /* Been there, done that: ignore */
1967
                                janus_streaming_message_free(msg);
1968
                                continue;
1969
                        }
1970
                        JANUS_LOG(LOG_VERB, "Stopping the streaming\n");
1971
                        session->stopping = TRUE;
1972
                        session->started = FALSE;
1973
                        session->paused = FALSE;
1974
                        result = json_object();
1975
                        json_object_set_new(result, "status", json_string("stopping"));
1976
                        if(session->mountpoint) {
1977
                                janus_mutex_lock(&session->mountpoint->mutex);
1978
                                JANUS_LOG(LOG_VERB, "  -- Removing the session from the mountpoint listeners\n");
1979
                                if(g_list_find(session->mountpoint->listeners, session) != NULL) {
1980
                                        JANUS_LOG(LOG_VERB, "  -- -- Found!\n");
1981
                                }
1982
                                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
1983
                                janus_mutex_unlock(&session->mountpoint->mutex);
1984
                        }
1985
                        session->mountpoint = NULL;
1986
                        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
1987
                        gateway->close_pc(session->handle);
1988
                } else {
1989
                        JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
1990
                        error_code = JANUS_STREAMING_ERROR_INVALID_REQUEST;
1991
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1992
                        goto error;
1993
                }
1994
                
1995
                /* Any SDP to handle? */
1996
                if(msg->sdp) {
1997
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well (but we really don't care):\n%s\n", msg->sdp_type, msg->sdp);
1998
                }
1999

    
2000
                /* Prepare JSON event */
2001
                json_t *event = json_object();
2002
                json_object_set_new(event, "streaming", json_string("event"));
2003
                if(result != NULL)
2004
                        json_object_set(event, "result", result);
2005
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2006
                json_decref(event);
2007
                if(result != NULL)
2008
                        json_decref(result);
2009
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2010
                int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, sdp_type, sdp);
2011
                JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2012
                g_free(event_text);
2013
                if(sdp)
2014
                        g_free(sdp);
2015
                janus_streaming_message_free(msg);
2016
                continue;
2017
                
2018
error:
2019
                {
2020
                        /* Prepare JSON error event */
2021
                        json_t *event = json_object();
2022
                        json_object_set_new(event, "streaming", json_string("event"));
2023
                        json_object_set_new(event, "error_code", json_integer(error_code));
2024
                        json_object_set_new(event, "error", json_string(error_cause));
2025
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2026
                        json_decref(event);
2027
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
2028
                        int ret = gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, NULL, NULL);
2029
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
2030
                        g_free(event_text);
2031
                        janus_streaming_message_free(msg);
2032
                }
2033
        }
2034
        g_free(error_cause);
2035
        JANUS_LOG(LOG_VERB, "Leaving Streaming handler thread\n");
2036
        return NULL;
2037
}
2038

    
2039

    
2040
/* Helpers to destroy a streaming mountpoint. */
2041
static void janus_streaming_rtp_source_free(janus_streaming_rtp_source *source)
2042
{
2043
        free(source);
2044
}
2045

    
2046
static void janus_streaming_file_source_free(janus_streaming_file_source *source) {
2047
        g_free(source->filename);
2048
        free(source);
2049
}
2050

    
2051
static void janus_streaming_mountpoint_free(janus_streaming_mountpoint *mp) {
2052
        mp->destroyed = janus_get_monotonic_time();
2053
        
2054
        g_free(mp->name);
2055
        g_free(mp->description);
2056
        janus_mutex_lock(&mp->mutex);
2057
        g_list_free(mp->listeners);
2058
        janus_mutex_unlock(&mp->mutex);
2059

    
2060
        if (mp->source != NULL && mp->source_destroy != NULL) {
2061
                mp->source_destroy(mp->source);
2062
        }
2063

    
2064
        g_free(mp->codecs.audio_rtpmap);
2065
        g_free(mp->codecs.audio_fmtp);
2066
        g_free(mp->codecs.video_rtpmap);
2067
        g_free(mp->codecs.video_fmtp);
2068

    
2069
        free(mp);
2070
}
2071

    
2072

    
2073
/* Helper to create an RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
2074
janus_streaming_mountpoint *janus_streaming_create_rtp_source(
2075
                uint64_t id, char *name, char *desc,
2076
                gboolean doaudio, uint16_t aport, uint8_t acodec, char *artpmap, char *afmtp,
2077
                gboolean dovideo, uint16_t vport, uint8_t vcodec, char *vrtpmap, char *vfmtp) {
2078
        if(name == NULL) {
2079
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2080
        }
2081
        if(id == 0) {
2082
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2083
        }
2084
        if(!doaudio && !dovideo) {
2085
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, no audio or video have to be streamed...\n");
2086
                return NULL;
2087
        }
2088
        if(doaudio && (aport == 0 || artpmap == NULL)) {
2089
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for audio...\n");
2090
                return NULL;
2091
        }
2092
        if(dovideo && (vport == 0 || vcodec == 0 || vrtpmap == NULL)) {
2093
                JANUS_LOG(LOG_ERR, "Can't add 'rtp' stream, missing mandatory information for video...\n");
2094
                return NULL;
2095
        }
2096
        JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
2097
        janus_streaming_mountpoint *live_rtp = calloc(1, sizeof(janus_streaming_mountpoint));
2098
        if(live_rtp == NULL) {
2099
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2100
                return NULL;
2101
        }
2102
        live_rtp->id = id ? id : g_random_int();
2103
        char tempname[255];
2104
        if(!name) {
2105
                memset(tempname, 0, 255);
2106
                g_snprintf(tempname, 255, "%"SCNu64, live_rtp->id);
2107
        }
2108
        live_rtp->name = g_strdup(name ? name : tempname);
2109
        char *description = NULL;
2110
        if(desc != NULL)
2111
                description = g_strdup(desc);
2112
        else
2113
                description = g_strdup(name ? name : tempname);
2114
        live_rtp->description = description;
2115
        live_rtp->enabled = TRUE;
2116
        live_rtp->active = FALSE;
2117
        live_rtp->streaming_type = janus_streaming_type_live;
2118
        live_rtp->streaming_source = janus_streaming_source_rtp;
2119
        janus_streaming_rtp_source *live_rtp_source = calloc(1, sizeof(janus_streaming_rtp_source));
2120
        if(live_rtp->name == NULL || description == NULL || live_rtp_source == NULL) {
2121
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2122
                if(live_rtp->name)
2123
                        g_free(live_rtp->name);
2124
                if(description)
2125
                        g_free(description);
2126
                if(live_rtp_source)
2127
                        g_free(live_rtp_source);
2128
                g_free(live_rtp);
2129
                return NULL;
2130
        }
2131
        live_rtp_source->audio_port = doaudio ? aport : -1;
2132
        live_rtp_source->video_port = dovideo ? vport : -1;
2133
        live_rtp_source->arc = NULL;
2134
        live_rtp_source->vrc = NULL;
2135
        live_rtp->source = live_rtp_source;
2136
        live_rtp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free;
2137
        live_rtp->codecs.audio_pt = doaudio ? acodec : -1;
2138
        live_rtp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL;
2139
        live_rtp->codecs.audio_fmtp = doaudio ? (afmtp ? g_strdup(afmtp) : NULL) : NULL;
2140
        live_rtp->codecs.video_pt = dovideo ? vcodec : -1;
2141
        live_rtp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap) : NULL;
2142
        live_rtp->codecs.video_fmtp = dovideo ? (vfmtp ? g_strdup(vfmtp) : NULL) : NULL;
2143
        live_rtp->listeners = NULL;
2144
        live_rtp->destroyed = 0;
2145
        janus_mutex_init(&live_rtp->mutex);
2146
        janus_mutex_lock(&mountpoints_mutex);
2147
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtp->id), live_rtp);
2148
        janus_mutex_unlock(&mountpoints_mutex);
2149
        GError *error = NULL;
2150
        g_thread_try_new(live_rtp->name, &janus_streaming_relay_thread, live_rtp, &error);
2151
        if(error != NULL) {
2152
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTP thread...\n", error->code, error->message ? error->message : "??");
2153
                if(live_rtp->name)
2154
                        g_free(live_rtp->name);
2155
                if(description)
2156
                        g_free(description);
2157
                if(live_rtp_source)
2158
                        g_free(live_rtp_source);
2159
                g_free(live_rtp);
2160
                return NULL;
2161
        }
2162
        return live_rtp;
2163
}
2164

    
2165
/* Helper to create a file/ondemand live source */
2166
janus_streaming_mountpoint *janus_streaming_create_file_source(
2167
                uint64_t id, char *name, char *desc, char *filename,
2168
                gboolean live, gboolean doaudio, gboolean dovideo) {
2169
        if(filename == NULL) {
2170
                JANUS_LOG(LOG_ERR, "Can't add 'live' stream, missing filename...\n");
2171
                return NULL;
2172
        }
2173
        if(name == NULL) {
2174
                JANUS_LOG(LOG_VERB, "Missing name, will generate a random one...\n");
2175
        }
2176
        if(id == 0) {
2177
                JANUS_LOG(LOG_VERB, "Missing id, will generate a random one...\n");
2178
        }
2179
        if(!doaudio && !dovideo) {
2180
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, no audio or video have to be streamed...\n");
2181
                return NULL;
2182
        }
2183
        /* FIXME We don't support video streaming from file yet */
2184
        if(!doaudio || dovideo) {
2185
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, we only support audio file streaming right now...\n");
2186
                return NULL;
2187
        }
2188
        /* TODO We should support something more than raw a-Law and mu-Law streams... */
2189
        if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) {
2190
                JANUS_LOG(LOG_ERR, "Can't add 'file' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
2191
                return NULL;
2192
        }
2193
        janus_streaming_mountpoint *file_source = calloc(1, sizeof(janus_streaming_mountpoint));
2194
        if(file_source == NULL) {
2195
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2196
                return NULL;
2197
        }
2198
        file_source->id = id ? id : g_random_int();
2199
        char tempname[255];
2200
        if(!name) {
2201
                memset(tempname, 0, 255);
2202
                g_snprintf(tempname, 255, "%"SCNu64, file_source->id);
2203
        }
2204
        file_source->name = g_strdup(name ? name : tempname);
2205
        char *description = NULL;
2206
        if(desc != NULL)
2207
                description = g_strdup(desc);
2208
        else
2209
                description = g_strdup(name ? name : tempname);
2210
        file_source->description = description;
2211
        file_source->enabled = TRUE;
2212
        file_source->active = FALSE;
2213
        file_source->streaming_type = live ? janus_streaming_type_live : janus_streaming_type_on_demand;
2214
        file_source->streaming_source = janus_streaming_source_file;
2215
        janus_streaming_file_source *file_source_source = calloc(1, sizeof(janus_streaming_file_source));
2216
        if(file_source->name == NULL || description == NULL || file_source_source == NULL) {
2217
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2218
                if(file_source->name)
2219
                        g_free(file_source->name);
2220
                if(description)
2221
                        g_free(description);
2222
                if(file_source_source)
2223
                        g_free(file_source_source);
2224
                g_free(file_source);
2225
                return NULL;
2226
        }
2227
        file_source_source->filename = g_strdup(filename);
2228
        file_source->source = file_source_source;
2229
        file_source->source_destroy = (GDestroyNotify) janus_streaming_file_source_free;
2230
        file_source->codecs.audio_pt = strstr(filename, ".alaw") ? 8 : 0;
2231
        file_source->codecs.audio_rtpmap = g_strdup(strstr(filename, ".alaw") ? "PCMA/8000" : "PCMU/8000");
2232
        file_source->codecs.video_pt = -1;        /* FIXME We don't support video for this type yet */
2233
        file_source->codecs.video_rtpmap = NULL;
2234
        file_source->listeners = NULL;
2235
        file_source->destroyed = 0;
2236
        janus_mutex_init(&file_source->mutex);
2237
        janus_mutex_lock(&mountpoints_mutex);
2238
        g_hash_table_insert(mountpoints, GINT_TO_POINTER(file_source->id), file_source);
2239
        janus_mutex_unlock(&mountpoints_mutex);
2240
        if(live) {
2241
                GError *error = NULL;
2242
                g_thread_try_new(file_source->name, &janus_streaming_filesource_thread, file_source, &error);
2243
                if(error != NULL) {
2244
                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the live filesource thread...\n", error->code, error->message ? error->message : "??");
2245
                        if(file_source->name)
2246
                                g_free(file_source->name);
2247
                        if(description)
2248
                                g_free(description);
2249
                        if(file_source_source)
2250
                                g_free(file_source_source);
2251
                        g_free(file_source);
2252
                        return NULL;
2253
                }
2254
        }
2255
        return file_source;
2256
}
2257

    
2258

    
2259
/* FIXME Thread to send RTP packets from a file (on demand) */
2260
static void *janus_streaming_ondemand_thread(void *data) {
2261
        JANUS_LOG(LOG_VERB, "Filesource (on demand) RTP thread starting...\n");
2262
        janus_streaming_session *session = (janus_streaming_session *)data;
2263
        if(!session) {
2264
                JANUS_LOG(LOG_ERR, "Invalid session!\n");
2265
                g_thread_unref(g_thread_self());
2266
                return NULL;
2267
        }
2268
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
2269
        if(!mountpoint) {
2270
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2271
                g_thread_unref(g_thread_self());
2272
                return NULL;
2273
        }
2274
        if(mountpoint->streaming_source != janus_streaming_source_file) {
2275
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
2276
                g_thread_unref(g_thread_self());
2277
                return NULL;
2278
        }
2279
        if(mountpoint->streaming_type != janus_streaming_type_on_demand) {
2280
                JANUS_LOG(LOG_ERR, "[%s] Not an on-demand file source mountpoint!\n", mountpoint->name);
2281
                g_thread_unref(g_thread_self());
2282
                return NULL;
2283
        }
2284
        janus_streaming_file_source *source = mountpoint->source;
2285
        if(source == NULL || source->filename == NULL) {
2286
                g_thread_unref(g_thread_self());
2287
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
2288
                return NULL;
2289
        }
2290
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
2291
        FILE *audio = fopen(source->filename, "rb");
2292
        if(!audio) {
2293
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
2294
                g_thread_unref(g_thread_self());
2295
                return NULL;
2296
        }
2297
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
2298
        /* Buffer */
2299
        char *buf = calloc(1024, sizeof(char));
2300
        if(buf == NULL) {
2301
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
2302
                g_thread_unref(g_thread_self());
2303
                return NULL;
2304
        }
2305
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2306
        /* Set up RTP */
2307
        gint16 seq = 1;
2308
        gint32 ts = 0;
2309
        rtp_header *header = (rtp_header *)buf;
2310
        header->version = 2;
2311
        header->markerbit = 1;
2312
        header->type = mountpoint->codecs.audio_pt;
2313
        header->seq_number = htons(seq);
2314
        header->timestamp = htonl(ts);
2315
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
2316
        /* Timer */
2317
        struct timeval now, before;
2318
        gettimeofday(&before, NULL);
2319
        now.tv_sec = before.tv_sec;
2320
        now.tv_usec = before.tv_usec;
2321
        time_t passed, d_s, d_us;
2322
        /* Loop */
2323
        gint read = 0;
2324
        janus_streaming_rtp_relay_packet packet;
2325
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed && !session->stopping && !session->destroyed) {
2326
                /* See if it's time to prepare a frame */
2327
                gettimeofday(&now, NULL);
2328
                d_s = now.tv_sec - before.tv_sec;
2329
                d_us = now.tv_usec - before.tv_usec;
2330
                if(d_us < 0) {
2331
                        d_us += 1000000;
2332
                        --d_s;
2333
                }
2334
                passed = d_s*1000000 + d_us;
2335
                if(passed < 18000) {        /* Let's wait about 18ms */
2336
                        usleep(1000);
2337
                        continue;
2338
                }
2339
                /* Update the reference time */
2340
                before.tv_usec += 20000;
2341
                if(before.tv_usec > 1000000) {
2342
                        before.tv_sec++;
2343
                        before.tv_usec -= 1000000;
2344
                }
2345
                /* If not started or paused, wait some more */
2346
                if(!session->started || session->paused || !mountpoint->enabled)
2347
                        continue;
2348
                /* Read frame from file... */
2349
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
2350
                if(feof(audio)) {
2351
                        /* FIXME We're doing this forever... should this be configurable? */
2352
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
2353
                        fseek(audio, 0, SEEK_SET);
2354
                        continue;
2355
                }
2356
                if(read < 0)
2357
                        break;
2358
                if(mountpoint->active == FALSE)
2359
                        mountpoint->active = TRUE;
2360
                //~ JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
2361
                        //~ header->type, ntohs(header->seq_number), ntohl(header->timestamp));
2362
                //~ JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
2363
                /* Relay on all sessions */
2364
                packet.data = header;
2365
                packet.length = RTP_HEADER_SIZE + read;
2366
                packet.is_video = 0;
2367
                janus_streaming_relay_rtp_packet(session, &packet);
2368
                /* Update header */
2369
                seq++;
2370
                header->seq_number = htons(seq);
2371
                ts += 160;
2372
                header->timestamp = htonl(ts);
2373
                header->markerbit = 0;
2374
        }
2375
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (ondemand) thread\n", name);
2376
        g_free(name);
2377
        g_free(buf);
2378
        fclose(audio);
2379
        g_thread_unref(g_thread_self());
2380
        return NULL;
2381
}
2382

    
2383
/* FIXME Thread to send RTP packets from a file (live) */
2384
static void *janus_streaming_filesource_thread(void *data) {
2385
        JANUS_LOG(LOG_VERB, "Filesource (live) thread starting...\n");
2386
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
2387
        if(!mountpoint) {
2388
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2389
                g_thread_unref(g_thread_self());
2390
                return NULL;
2391
        }
2392
        if(mountpoint->streaming_source != janus_streaming_source_file) {
2393
                JANUS_LOG(LOG_ERR, "[%s] Not an file source mountpoint!\n", mountpoint->name);
2394
                g_thread_unref(g_thread_self());
2395
                return NULL;
2396
        }
2397
        if(mountpoint->streaming_type != janus_streaming_type_live) {
2398
                JANUS_LOG(LOG_ERR, "[%s] Not a live file source mountpoint!\n", mountpoint->name);
2399
                g_thread_unref(g_thread_self());
2400
                return NULL;
2401
        }
2402
        janus_streaming_file_source *source = mountpoint->source;
2403
        if(source == NULL || source->filename == NULL) {
2404
                JANUS_LOG(LOG_ERR, "[%s] Invalid file source mountpoint!\n", mountpoint->name);
2405
                g_thread_unref(g_thread_self());
2406
                return NULL;
2407
        }
2408
        JANUS_LOG(LOG_VERB, "[%s] Opening file source %s...\n", mountpoint->name, source->filename);
2409
        FILE *audio = fopen(source->filename, "rb");
2410
        if(!audio) {
2411
                JANUS_LOG(LOG_ERR, "[%s] Ooops, audio file missing!\n", mountpoint->name);
2412
                g_thread_unref(g_thread_self());
2413
                return NULL;
2414
        }
2415
        JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename);
2416
        /* Buffer */
2417
        char *buf = calloc(1024, sizeof(char));
2418
        if(buf == NULL) {
2419
                JANUS_LOG(LOG_FATAL, "[%s] Memory error!\n", mountpoint->name);
2420
                g_thread_unref(g_thread_self());
2421
                return NULL;
2422
        }
2423
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2424
        /* Set up RTP */
2425
        gint16 seq = 1;
2426
        gint32 ts = 0;
2427
        rtp_header *header = (rtp_header *)buf;
2428
        header->version = 2;
2429
        header->markerbit = 1;
2430
        header->type = mountpoint->codecs.audio_pt;
2431
        header->seq_number = htons(seq);
2432
        header->timestamp = htonl(ts);
2433
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
2434
        /* Timer */
2435
        struct timeval now, before;
2436
        gettimeofday(&before, NULL);
2437
        now.tv_sec = before.tv_sec;
2438
        now.tv_usec = before.tv_usec;
2439
        time_t passed, d_s, d_us;
2440
        /* Loop */
2441
        gint read = 0;
2442
        janus_streaming_rtp_relay_packet packet;
2443
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
2444
                /* See if it's time to prepare a frame */
2445
                gettimeofday(&now, NULL);
2446
                d_s = now.tv_sec - before.tv_sec;
2447
                d_us = now.tv_usec - before.tv_usec;
2448
                if(d_us < 0) {
2449
                        d_us += 1000000;
2450
                        --d_s;
2451
                }
2452
                passed = d_s*1000000 + d_us;
2453
                if(passed < 18000) {        /* Let's wait about 18ms */
2454
                        usleep(1000);
2455
                        continue;
2456
                }
2457
                /* Update the reference time */
2458
                before.tv_usec += 20000;
2459
                if(before.tv_usec > 1000000) {
2460
                        before.tv_sec++;
2461
                        before.tv_usec -= 1000000;
2462
                }
2463
                /* If paused, wait some more */
2464
                if(!mountpoint->enabled)
2465
                        continue;
2466
                /* Read frame from file... */
2467
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
2468
                if(feof(audio)) {
2469
                        /* FIXME We're doing this forever... should this be configurable? */
2470
                        JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename);
2471
                        fseek(audio, 0, SEEK_SET);
2472
                        continue;
2473
                }
2474
                if(read < 0)
2475
                        break;
2476
                if(mountpoint->active == FALSE)
2477
                        mountpoint->active = TRUE;
2478
                // JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
2479
                        // header->type, ntohs(header->seq_number), ntohl(header->timestamp));
2480
                // JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read);
2481
                /* Relay on all sessions */
2482
                packet.data = header;
2483
                packet.length = RTP_HEADER_SIZE + read;
2484
                packet.is_video = 0;
2485
                janus_mutex_lock_nodebug(&mountpoint->mutex);
2486
                g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
2487
                janus_mutex_unlock_nodebug(&mountpoint->mutex);
2488
                /* Update header */
2489
                seq++;
2490
                header->seq_number = htons(seq);
2491
                ts += 160;
2492
                header->timestamp = htonl(ts);
2493
                header->markerbit = 0;
2494
        }
2495
        JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (live) thread\n", name);
2496
        g_free(name);
2497
        g_free(buf);
2498
        fclose(audio);
2499
        g_thread_unref(g_thread_self());
2500
        return NULL;
2501
}
2502

    
2503
/* FIXME Test thread to relay RTP frames coming from gstreamer/ffmpeg/others */
2504
static void *janus_streaming_relay_thread(void *data) {
2505
        JANUS_LOG(LOG_VERB, "Starting streaming relay thread\n");
2506
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
2507
        if(!mountpoint) {
2508
                JANUS_LOG(LOG_ERR, "Invalid mountpoint!\n");
2509
                g_thread_unref(g_thread_self());
2510
                return NULL;
2511
        }
2512
        if(mountpoint->streaming_source != janus_streaming_source_rtp) {
2513
                JANUS_LOG(LOG_ERR, "[%s] Not an RTP source mountpoint!\n", mountpoint->name);
2514
                g_thread_unref(g_thread_self());
2515
                return NULL;
2516
        }
2517
        janus_streaming_rtp_source *source = mountpoint->source;
2518
        if(source == NULL) {
2519
                JANUS_LOG(LOG_ERR, "[%s] Invalid RTP source mountpoint!\n", mountpoint->name);
2520
                g_thread_unref(g_thread_self());
2521
                return NULL;
2522
        }
2523
        gint audio_port = source->audio_port;
2524
        gint video_port = source->video_port;
2525
        /* Socket stuff */
2526
        struct sockaddr_in audio_address, video_address;
2527
        int audio_fd = 0;
2528
        if(audio_port >= 0) {
2529
                audio_fd = socket(AF_INET, SOCK_DGRAM, 0);
2530
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
2531
                setsockopt(audio_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
2532
                audio_address.sin_family = AF_INET;
2533
                audio_address.sin_port = htons(audio_port);
2534
                audio_address.sin_addr.s_addr = INADDR_ANY;
2535
                if(bind(audio_fd, (struct sockaddr *)(&audio_address), sizeof(struct sockaddr)) < 0) {
2536
                        JANUS_LOG(LOG_ERR, "[%s] Bind failed for audio (port %d)...\n", mountpoint->name, audio_port);
2537
                        g_thread_unref(g_thread_self());
2538
                        return NULL;
2539
                }
2540
                JANUS_LOG(LOG_VERB, "[%s] Audio listener bound to port %d\n", mountpoint->name, audio_port);
2541
        }
2542
        int video_fd = 0;
2543
        if(video_port >= 0) {
2544
                video_fd = socket(AF_INET, SOCK_DGRAM, 0);
2545
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
2546
                setsockopt(video_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
2547
                video_address.sin_family = AF_INET;
2548
                video_address.sin_port = htons(video_port);
2549
                video_address.sin_addr.s_addr = INADDR_ANY;
2550
                if(bind(video_fd, (struct sockaddr *)(&video_address), sizeof(struct sockaddr)) < 0) {
2551
                        JANUS_LOG(LOG_ERR, "[%s] Bind failed for video (%d)...\n", mountpoint->name, video_port);
2552
                        g_thread_unref(g_thread_self());
2553
                        return NULL;
2554
                }
2555
                JANUS_LOG(LOG_VERB, "[%s] Video listener bound to port %d\n", mountpoint->name, video_port);
2556
        }
2557
        char *name = g_strdup(mountpoint->name ? mountpoint->name : "??");
2558
        int maxfd = (audio_fd > video_fd) ? audio_fd : video_fd;
2559
        /* Needed to fix seq and ts */
2560
        uint32_t a_last_ssrc = 0, a_last_ts = 0, a_base_ts = 0, a_base_ts_prev = 0,
2561
                        v_last_ssrc = 0, v_last_ts = 0, v_base_ts = 0, v_base_ts_prev = 0;
2562
        uint16_t a_last_seq = 0, a_base_seq = 0, a_base_seq_prev = 0,
2563
                        v_last_seq = 0, v_base_seq = 0, v_base_seq_prev = 0;
2564
        /* Loop */
2565
        socklen_t addrlen;
2566
        struct sockaddr_in remote;
2567
        int resfd = 0, bytes = 0;
2568
        struct timeval timeout;
2569
        fd_set readfds;
2570
        FD_ZERO(&readfds);
2571
        char buffer[1500];
2572
        memset(buffer, 0, 1500);
2573
        janus_streaming_rtp_relay_packet packet;
2574
        while(!g_atomic_int_get(&stopping) && !mountpoint->destroyed) {
2575
                /* Wait for some data */
2576
                if(audio_fd > 0)
2577
                        FD_SET(audio_fd, &readfds);
2578
                if(video_fd > 0)
2579
                        FD_SET(video_fd, &readfds);
2580
                timeout.tv_sec = 1;
2581
                timeout.tv_usec = 0;
2582
                resfd = select(maxfd+1, &readfds, NULL, NULL, &timeout);
2583
                if(resfd < 0)
2584
                        break;
2585
                if(audio_fd > 0 && FD_ISSET(audio_fd, &readfds)) {
2586
                        if(mountpoint->active == FALSE)
2587
                                mountpoint->active = TRUE;
2588
                        /* Got something audio */
2589
                        addrlen = sizeof(remote);
2590
                        bytes = recvfrom(audio_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
2591
                        // JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the audio channel...\n", bytes);
2592
                        /* If paused, ignore this packet */
2593
                        if(!mountpoint->enabled)
2594
                                continue;
2595
                        rtp_header *rtp = (rtp_header *)buffer;
2596
                        // JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
2597
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
2598
                        /* Relay on all sessions */
2599
                        packet.data = rtp;
2600
                        packet.length = bytes;
2601
                        packet.is_video = 0;
2602
                        /* Do we have a new stream? */
2603
                        if(ntohl(packet.data->ssrc) != a_last_ssrc) {
2604
                                a_last_ssrc = ntohl(packet.data->ssrc);
2605
                                JANUS_LOG(LOG_INFO, "[%s] New audio stream! (ssrc=%u)\n", name, a_last_ssrc);
2606
                                a_base_ts_prev = a_last_ts;
2607
                                a_base_ts = ntohl(packet.data->timestamp);
2608
                                a_base_seq_prev = a_last_seq;
2609
                                a_base_seq = ntohs(packet.data->seq_number);
2610
                        }
2611
                        a_last_ts = (ntohl(packet.data->timestamp)-a_base_ts)+a_base_ts_prev+960;        /* FIXME We're assuming Opus here... */
2612
                        packet.data->timestamp = htonl(a_last_ts);
2613
                        a_last_seq = (ntohs(packet.data->seq_number)-a_base_seq)+a_base_seq_prev+1;
2614
                        packet.data->seq_number = htons(a_last_seq);
2615
                        // JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
2616
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
2617
                        packet.data->type = mountpoint->codecs.audio_pt;
2618
                        /* Is there a recorder? */
2619
                        if(source->arc) {
2620
                                JANUS_LOG(LOG_HUGE, "[%s] Saving audio frame (%d bytes)\n", name, bytes);
2621
                                janus_recorder_save_frame(source->arc, buffer, bytes);
2622
                        }
2623
                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
2624
                        packet.timestamp = ntohl(packet.data->timestamp);
2625
                        packet.seq_number = ntohs(packet.data->seq_number);
2626
                        /* Go! */
2627
                        janus_mutex_lock(&mountpoint->mutex);
2628
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
2629
                        janus_mutex_unlock(&mountpoint->mutex);
2630
                        continue;
2631
                }
2632
                if(video_fd > 0 && FD_ISSET(video_fd, &readfds)) {
2633
                        if(mountpoint->active == FALSE)
2634
                                mountpoint->active = TRUE;
2635
                        /* Got something video */
2636
                        addrlen = sizeof(remote);
2637
                        bytes = recvfrom(video_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
2638
                        //~ JANUS_LOG(LOG_VERB, "************************\nGot %d bytes on the video channel...\n", bytes);
2639
                        /* If paused, ignore this packet */
2640
                        if(!mountpoint->enabled)
2641
                                continue;
2642
                        rtp_header *rtp = (rtp_header *)buffer;
2643
                        //~ JANUS_LOG(LOG_VERB, " ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
2644
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
2645
                        /* Relay on all sessions */
2646
                        packet.data = rtp;
2647
                        packet.length = bytes;
2648
                        packet.is_video = 1;
2649
                        /* Do we have a new stream? */
2650
                        if(ntohl(packet.data->ssrc) != v_last_ssrc) {
2651
                                v_last_ssrc = ntohl(packet.data->ssrc);
2652
                                JANUS_LOG(LOG_INFO, "[%s] New video stream! (ssrc=%u)\n", name, v_last_ssrc);
2653
                                v_base_ts_prev = v_last_ts;
2654
                                v_base_ts = ntohl(packet.data->timestamp);
2655
                                v_base_seq_prev = v_last_seq;
2656
                                v_base_seq = ntohs(packet.data->seq_number);
2657
                        }
2658
                        v_last_ts = (ntohl(packet.data->timestamp)-v_base_ts)+v_base_ts_prev+4500;        /* FIXME We're assuming 15fps here... */
2659
                        packet.data->timestamp = htonl(v_last_ts);
2660
                        v_last_seq = (ntohs(packet.data->seq_number)-v_base_seq)+v_base_seq_prev+1;
2661
                        packet.data->seq_number = htons(v_last_seq);
2662
                        //~ JANUS_LOG(LOG_VERB, " ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
2663
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
2664
                        packet.data->type = mountpoint->codecs.video_pt;
2665
                        /* Is there a recorder? */
2666
                        if(source->vrc) {
2667
                                JANUS_LOG(LOG_HUGE, "[%s] Saving video frame (%d bytes)\n", name, bytes);
2668
                                janus_recorder_save_frame(source->vrc, buffer, bytes);
2669
                        }
2670
                        /* Backup the actual timestamp and sequence number set by the restreamer, in case switching is involved */
2671
                        packet.timestamp = ntohl(packet.data->timestamp);
2672
                        packet.seq_number = ntohs(packet.data->seq_number);
2673
                        /* Go! */
2674
                        janus_mutex_lock(&mountpoint->mutex);
2675
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
2676
                        janus_mutex_unlock(&mountpoint->mutex);
2677
                        continue;
2678
                }
2679
        }
2680
        JANUS_LOG(LOG_VERB, "[%s] Leaving streaming relay thread\n", name);
2681
        g_free(name);
2682
        g_thread_unref(g_thread_self());
2683
        return NULL;
2684
}
2685

    
2686
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) {
2687
        janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data;
2688
        if(!packet || !packet->data || packet->length < 1) {
2689
                JANUS_LOG(LOG_ERR, "Invalid packet...\n");
2690
                return;
2691
        }
2692
        janus_streaming_session *session = (janus_streaming_session *)data;
2693
        if(!session || !session->handle) {
2694
                // JANUS_LOG(LOG_ERR, "Invalid session...\n");
2695
                return;
2696
        }
2697
        if(!session->started || session->paused) {
2698
                // JANUS_LOG(LOG_ERR, "Streaming not started yet for this session...\n");
2699
                return;
2700
        }
2701

    
2702
        /* Make sure there hasn't been a publisher switch by checking the SSRC */
2703
        if(packet->is_video) {
2704
                if(ntohl(packet->data->ssrc) != session->context.v_last_ssrc) {
2705
                        session->context.v_last_ssrc = ntohl(packet->data->ssrc);
2706
                        session->context.v_base_ts_prev = session->context.v_last_ts;
2707
                        session->context.v_base_ts = packet->timestamp;
2708
                        session->context.v_base_seq_prev = session->context.v_last_seq;
2709
                        session->context.v_base_seq = packet->seq_number;
2710
                }
2711
                /* Compute a coherent timestamp and sequence number */
2712
                session->context.v_last_ts = (packet->timestamp-session->context.v_base_ts)
2713
                        + session->context.v_base_ts_prev+4500;        /* FIXME When switching, we assume 15fps */
2714
                session->context.v_last_seq = (packet->seq_number-session->context.v_base_seq)+session->context.v_base_seq_prev+1;
2715
                /* Update the timestamp and sequence number in the RTP packet, and send it */
2716
                packet->data->timestamp = htonl(session->context.v_last_ts);
2717
                packet->data->seq_number = htons(session->context.v_last_seq);
2718
                if(gateway != NULL)
2719
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
2720
                /* Restore the timestamp and sequence number to what the publisher set them to */
2721
                packet->data->timestamp = htonl(packet->timestamp);
2722
                packet->data->seq_number = htons(packet->seq_number);
2723
        } else {
2724
                if(ntohl(packet->data->ssrc) != session->context.a_last_ssrc) {
2725
                        session->context.a_last_ssrc = ntohl(packet->data->ssrc);
2726
                        session->context.a_base_ts_prev = session->context.a_last_ts;
2727
                        session->context.a_base_ts = packet->timestamp;
2728
                        session->context.a_base_seq_prev = session->context.a_last_seq;
2729
                        session->context.a_base_seq = packet->seq_number;
2730
                }
2731
                /* Compute a coherent timestamp and sequence number */
2732
                session->context.a_last_ts = (packet->timestamp-session->context.a_base_ts)
2733
                        + session->context.a_base_ts_prev+960;        /* FIXME When switching, we assume Opus and so a 960 ts step */
2734
                session->context.a_last_seq = (packet->seq_number-session->context.a_base_seq)+session->context.a_base_seq_prev+1;
2735
                /* Update the timestamp and sequence number in the RTP packet, and send it */
2736
                packet->data->timestamp = htonl(session->context.a_last_ts);
2737
                packet->data->seq_number = htons(session->context.a_last_seq);
2738
                if(gateway != NULL)
2739
                        gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
2740
                /* Restore the timestamp and sequence number to what the publisher set them to */
2741
                packet->data->timestamp = htonl(packet->timestamp);
2742
                packet->data->seq_number = htons(packet->seq_number);
2743
        }
2744

    
2745
        return;
2746
}