Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_streaming.c @ 5e9e29e0

History | View | Annotate | Download (44.8 KB)

1
/*! \file   janus_streaming.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU Affero General Public License v3
4
 * \brief  Janus Streaming plugin
5
 * \details  This is a streaming plugin for Janus, allowing WebRTC peers
6
 * to watch/listen to pre-recorded files or media generated by another tool.
7
 * Specifically, the plugin currently supports three different type of streams:
8
 * 
9
 * -# on-demand streaming of pre-recorded media files (different
10
 * streaming context for each peer);
11
 * -# live streaming of pre-recorded media files (shared streaming
12
 * context for all peers attached to the stream);
13
 * -# live streaming of media generated by another tool (shared
14
 * streaming context for all peers attached to the stream).
15
 * 
16
 * For what concerns types 1. and 2., considering the proof of concept
17
 * nature of the implementation the only pre-recorded media files
18
 * that the plugins supports right now are raw mu-Law and a-Law files:
19
 * support is of course planned for other additional widespread formats
20
 * as well.
21
 * 
22
 * For what concerns type 3., instead, the plugin is configured
23
 * to listen on a couple of ports for RTP: this means that the plugin
24
 * is implemented to receive RTP on those ports and relay them to all
25
 * peers attached to that stream. Any tool that can generate audio/video
26
 * RTP streams and specify a destination is good for the purpose: the
27
 * examples section contains samples that make use of GStreamer (http://gstreamer.freedesktop.org/)
28
 * but other tools like FFmpeg (http://www.ffmpeg.org/), LibAV (http://libav.org/)
29
 * or others are fine as well. This makes it really easy to capture and
30
 * encode whatever you want using your favourite tool, and then have it
31
 * transparently broadcasted via WebRTC using Janus.
32
 * 
33
 * Streams to make available are listed in the plugin configuration file.
34
 * A pre-filled configuration file is provided in \c conf/janus.plugin.streaming.cfg
35
 * and includes a stream of every type.
36
 * 
37
 * To add more streams or modify the existing ones, you can use the following
38
 * syntax:
39
 * 
40
 * \verbatim
41
[stream-name]
42
type = rtp|live|ondemand
43
       rtp = stream originated by an external tool (e.g., gstreamer or
44
             ffmpeg) and sent to the plugin via RTP
45
       live = local file streamed live to multiple listeners
46
              (multiple listeners = same streaming context)
47
       ondemand = local file streamed on-demand to a single listener
48
                  (multiple listeners = different streaming contexts)
49
id = <unique numeric ID>
50
description = This is my awesome stream
51
filename = path to the local file to stream (only for live/ondemand)
52
audio = yes|no (do/don't stream audio)
53
video = yes|no (do/don't stream video)
54
   The following options are only valid for the 'rtp' type:
55
audioport = local port for receiving audio frames
56
audiopt = <audio RTP payload type> (e.g., 111)
57
audiortpmap = RTP map of the audio codec (e.g., opus/48000/2)
58
videoport = local port for receiving video frames (only for rtp)
59
videopt = <video RTP payload type> (e.g., 100)
60
videortpmap = RTP map of the video codec (e.g., VP8/90000)
61
\endverbatim
62
 *
63
 * \ingroup plugins
64
 * \ref plugins
65
 */
66

    
67
#include "plugin.h"
68

    
69
#include <jansson.h>
70
#include <sys/time.h>
71

    
72
#include "../config.h"
73
#include "../rtp.h"
74
#include "../utils.h"
75

    
76

    
77
/* Plugin information */
78
#define JANUS_STREAMING_VERSION                        1
79
#define JANUS_STREAMING_VERSION_STRING        "0.0.1"
80
#define JANUS_STREAMING_DESCRIPTION                "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by gstreamer."
81
#define JANUS_STREAMING_NAME                        "JANUS Streaming plugin"
82
#define JANUS_STREAMING_PACKAGE                        "janus.plugin.streaming"
83

    
84
/* Plugin methods */
85
janus_plugin *create(void);
86
int janus_streaming_init(janus_callbacks *callback, const char *config_path);
87
void janus_streaming_destroy(void);
88
int janus_streaming_get_version(void);
89
const char *janus_streaming_get_version_string(void);
90
const char *janus_streaming_get_description(void);
91
const char *janus_streaming_get_name(void);
92
const char *janus_streaming_get_package(void);
93
void janus_streaming_create_session(janus_pluginession *handle, int *error);
94
void janus_streaming_handle_message(janus_pluginession *handle, char *transaction, char *message, char *sdp_type, char *sdp);
95
void janus_streaming_setup_media(janus_pluginession *handle);
96
void janus_streaming_incoming_rtp(janus_pluginession *handle, int video, char *buf, int len);
97
void janus_streaming_incoming_rtcp(janus_pluginession *handle, int video, char *buf, int len);
98
void janus_streaming_hangup_media(janus_pluginession *handle);
99
void janus_streaming_destroy_session(janus_pluginession *handle, int *error);
100

    
101
/* Plugin setup */
102
static janus_plugin janus_streaming_plugin =
103
        {
104
                .init = janus_streaming_init,
105
                .destroy = janus_streaming_destroy,
106

    
107
                .get_version = janus_streaming_get_version,
108
                .get_version_string = janus_streaming_get_version_string,
109
                .get_description = janus_streaming_get_description,
110
                .get_name = janus_streaming_get_name,
111
                .get_package = janus_streaming_get_package,
112
                
113
                .create_session = janus_streaming_create_session,
114
                .handle_message = janus_streaming_handle_message,
115
                .setup_media = janus_streaming_setup_media,
116
                .incoming_rtp = janus_streaming_incoming_rtp,
117
                .incoming_rtcp = janus_streaming_incoming_rtcp,
118
                .hangup_media = janus_streaming_hangup_media,
119
                .destroy_session = janus_streaming_destroy_session,
120
        }; 
121

    
122
/* Plugin creator */
123
janus_plugin *create(void) {
124
        JANUS_PRINT("%s created!\n", JANUS_STREAMING_NAME);
125
        return &janus_streaming_plugin;
126
}
127

    
128

    
129
/* Useful stuff */
130
static int initialized = 0, stopping = 0;
131
static janus_callbacks *gateway = NULL;
132
static GThread *handler_thread;
133
static void *janus_streaming_handler(void *data);
134
static void *janus_streaming_ondemand_thread(void *data);
135
static void *janus_streaming_filesource_thread(void *data);
136
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data);
137
static void *janus_streaming_relay_thread(void *data);
138

    
139
typedef enum janus_streaming_type {
140
        janus_streaming_type_none = 0,
141
        janus_streaming_type_live,
142
        janus_streaming_type_on_demand,
143
} janus_streaming_type;
144

    
145
typedef enum janus_streaming_source {
146
        janus_streaming_source_none = 0,
147
        janus_streaming_source_file,
148
        janus_streaming_source_rtp,
149
} janus_streaming_source;
150

    
151
typedef struct janus_streaming_rtp_source {
152
        gint audio_port;
153
        gint video_port;
154
} janus_streaming_rtp_source;
155

    
156
typedef struct janus_streaming_file_source {
157
        char *filename;
158
} janus_streaming_file_source;
159

    
160
typedef struct janus_streaming_codecs {
161
        gint audio_pt;
162
        char *audio_rtpmap;
163
        gint video_pt;
164
        char *video_rtpmap;        /* FIXME We need better ways to describe a codec in SDP */        
165
} janus_streaming_codecs;
166

    
167
typedef struct janus_streaming_mountpoint {
168
        gint64 id;
169
        char *name;
170
        char *description;
171
        gboolean active;
172
        janus_streaming_type streaming_type;
173
        janus_streaming_source streaming_source;
174
        void *source;        /* Can differ according to the source type */
175
        janus_streaming_codecs codecs;
176
        GList *listeners;
177
} janus_streaming_mountpoint;
178
GHashTable *mountpoints;
179

    
180
typedef struct janus_streaming_message {
181
        janus_pluginession *handle;
182
        char *transaction;
183
        char *message;
184
        char *sdp_type;
185
        char *sdp;
186
} janus_streaming_message;
187
GQueue *messages;
188

    
189
typedef struct janus_streaming_session {
190
        janus_pluginession *handle;
191
        janus_streaming_mountpoint *mountpoint;
192
        gboolean started;
193
        gboolean stopping;
194
        gboolean destroy;
195
} janus_streaming_session;
196
GHashTable *sessions;
197

    
198
/* Packets we get from gstreamer and relay */
199
typedef struct janus_streaming_rtp_relay_packet {
200
        rtp_header *data;
201
        gint length;
202
        gint is_video;
203
} janus_streaming_rtp_relay_packet;
204

    
205

    
206
/* Plugin implementation */
207
int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
208
        if(stopping) {
209
                /* Still stopping from before */
210
                return -1;
211
        }
212
        if(callback == NULL || config_path == NULL) {
213
                /* Invalid arguments */
214
                return -1;
215
        }
216

    
217
        /* Read configuration */
218
        char filename[255];
219
        sprintf(filename, "%s/%s.cfg", config_path, JANUS_STREAMING_PACKAGE);
220
        JANUS_PRINT("Configuration file: %s\n", filename);
221
        janus_config *config = janus_config_parse(filename);
222
        if(config != NULL)
223
                janus_config_print(config);
224
        
225
        mountpoints = g_hash_table_new(NULL, NULL);
226
        /* Parse configuration to populate the mountpoints */
227
        if(config != NULL) {
228
                janus_config_category *cat = janus_config_get_categories(config);
229
                while(cat != NULL) {
230
                        if(cat->name == NULL) {
231
                                cat = cat->next;
232
                                continue;
233
                        }
234
                        JANUS_PRINT("Adding stream '%s'\n", cat->name);
235
                        janus_config_item *type = janus_config_get_item(cat, "type");
236
                        if(type == NULL || type->value == NULL) {
237
                                JANUS_PRINT("  -- Invalid type, skipping stream...\n");
238
                                cat = cat->next;
239
                                continue;
240
                        }
241
                        if(!strcasecmp(type->value, "rtp")) {
242
                                /* RTP live source (e.g., from gstreamer/ffmpeg/vlc/etc.) */
243
                                janus_config_item *id = janus_config_get_item(cat, "id");
244
                                janus_config_item *desc = janus_config_get_item(cat, "description");
245
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
246
                                janus_config_item *video = janus_config_get_item(cat, "video");
247
                                janus_config_item *aport = janus_config_get_item(cat, "audioport");
248
                                janus_config_item *acodec = janus_config_get_item(cat, "audiopt");
249
                                janus_config_item *artpmap = janus_config_get_item(cat, "audiortpmap");
250
                                janus_config_item *vport = janus_config_get_item(cat, "videoport");
251
                                janus_config_item *vcodec = janus_config_get_item(cat, "videopt");
252
                                janus_config_item *vrtpmap = janus_config_get_item(cat, "videortpmap");
253
                                janus_streaming_mountpoint *live_rtp = calloc(1, sizeof(janus_streaming_mountpoint));
254
                                if(live_rtp == NULL) {
255
                                        JANUS_DEBUG("Memory error!\n");
256
                                        continue;
257
                                }
258
                                if(id == NULL || id->value == NULL) {
259
                                        JANUS_DEBUG("Can't add 'rtp' stream, missing mandatory information...\n");
260
                                        cat = cat->next;
261
                                        continue;
262
                                }
263
                                gboolean doaudio = audio && audio->value && !strcasecmp(audio->value, "yes");
264
                                gboolean dovideo = video && video->value && !strcasecmp(video->value, "yes");
265
                                if(!doaudio && !dovideo) {
266
                                        JANUS_DEBUG("Can't add 'rtp' stream, no audio or video have to be streamed...\n");
267
                                        g_free(live_rtp);
268
                                        cat = cat->next;
269
                                        continue;
270
                                }
271
                                if(doaudio &&
272
                                                (aport == NULL || aport->value == NULL ||
273
                                                acodec == NULL || acodec->value == NULL ||
274
                                                artpmap == NULL || artpmap->value == NULL)) {
275
                                        JANUS_DEBUG("Can't add 'rtp' stream, missing mandatory information for audio...\n");
276
                                        cat = cat->next;
277
                                        continue;
278
                                }
279
                                if(dovideo &&
280
                                                (vport == NULL || vport->value == NULL ||
281
                                                vcodec == NULL || vcodec->value == NULL ||
282
                                                vrtpmap == NULL || vrtpmap->value == NULL)) {
283
                                        JANUS_DEBUG("Can't add 'rtp' stream, missing mandatory information for video...\n");
284
                                        cat = cat->next;
285
                                        continue;
286
                                }
287
                                JANUS_PRINT("Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled");
288
                                live_rtp->name = g_strdup(cat->name);
289
                                live_rtp->id = atoi(id->value);
290
                                char *description = NULL;
291
                                if(desc != NULL && desc->value != NULL)
292
                                        description = g_strdup(desc->value);
293
                                else
294
                                        description = g_strdup(cat->name);
295
                                live_rtp->description = description;
296
                                live_rtp->active = FALSE;
297
                                live_rtp->streaming_type = janus_streaming_type_live;
298
                                live_rtp->streaming_source = janus_streaming_source_rtp;
299
                                janus_streaming_rtp_source *live_rtp_source = calloc(1, sizeof(janus_streaming_rtp_source));
300
                                if(live_rtp->name == NULL || description == NULL || live_rtp_source == NULL) {
301
                                        JANUS_DEBUG("Memory error!\n");
302
                                        if(live_rtp->name)
303
                                                g_free(live_rtp->name);
304
                                        if(description)
305
                                                g_free(description);
306
                                        if(live_rtp_source);
307
                                                g_free(live_rtp_source);
308
                                        g_free(live_rtp);
309
                                        continue;
310
                                }
311
                                live_rtp_source->audio_port = doaudio ? atoi(aport->value) : -1;
312
                                live_rtp_source->video_port = dovideo ? atoi(vport->value) : -1;
313
                                live_rtp->source = live_rtp_source;
314
                                live_rtp->codecs.audio_pt = doaudio ? atoi(acodec->value) : -1;
315
                                live_rtp->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap->value) : NULL;
316
                                live_rtp->codecs.video_pt = dovideo ? atoi(vcodec->value) : -1;
317
                                live_rtp->codecs.video_rtpmap = dovideo ? g_strdup(vrtpmap->value) : NULL;
318
                                live_rtp->listeners = NULL;
319
                                g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_rtp->id), live_rtp);
320
                                g_thread_new(live_rtp->name, &janus_streaming_relay_thread, live_rtp);
321
                        } else if(!strcasecmp(type->value, "live")) {
322
                                /* a-Law file live source */
323
                                janus_config_item *id = janus_config_get_item(cat, "id");
324
                                janus_config_item *desc = janus_config_get_item(cat, "description");
325
                                janus_config_item *file = janus_config_get_item(cat, "filename");
326
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
327
                                janus_config_item *video = janus_config_get_item(cat, "video");
328
                                if(id == NULL || id->value == NULL || file == NULL || file->value == NULL) {
329
                                        JANUS_DEBUG("Can't add 'live' stream, missing mandatory information...\n");
330
                                        cat = cat->next;
331
                                        continue;
332
                                }
333
                                gboolean doaudio = audio && audio->value && !strcasecmp(audio->value, "yes");
334
                                gboolean dovideo = video && video->value && !strcasecmp(video->value, "yes");
335
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
336
                                if(!doaudio || dovideo) {
337
                                        JANUS_DEBUG("Can't add 'live' stream, we only support audio file streaming right now...\n");
338
                                        cat = cat->next;
339
                                        continue;
340
                                }
341
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
342
                                        JANUS_DEBUG("Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
343
                                        cat = cat->next;
344
                                        continue;
345
                                }
346
                                janus_streaming_mountpoint *live_file = calloc(1, sizeof(janus_streaming_mountpoint));
347
                                if(live_file == NULL) {
348
                                        JANUS_DEBUG("Memory error!\n");
349
                                        continue;
350
                                }
351
                                live_file->name = g_strdup(cat->name);
352
                                live_file->id = atoi(id->value);
353
                                char *description = NULL;
354
                                if(desc != NULL && desc->value != NULL)
355
                                        description = g_strdup(desc->value);
356
                                else
357
                                        description = g_strdup(cat->name);
358
                                live_file->description = description;
359
                                live_file->active = FALSE;
360
                                live_file->streaming_type = janus_streaming_type_live;
361
                                live_file->streaming_source = janus_streaming_source_file;
362
                                janus_streaming_file_source *live_file_source = calloc(1, sizeof(janus_streaming_file_source));
363
                                if(live_file->name == NULL || description == NULL || live_file_source == NULL) {
364
                                        JANUS_DEBUG("Memory error!\n");
365
                                        if(live_file->name)
366
                                                g_free(live_file->name);
367
                                        if(description)
368
                                                g_free(description);
369
                                        if(live_file_source);
370
                                                g_free(live_file_source);
371
                                        g_free(live_file);
372
                                        continue;
373
                                }
374
                                live_file_source->filename = g_strdup(file->value);
375
                                live_file->source = live_file_source;
376
                                live_file->codecs.audio_pt = strstr(file->value, ".alaw") ? 8 : 0;
377
                                live_file->codecs.audio_rtpmap = strstr(file->value, ".alaw") ? "PCMA/8000" : "PCMU/8000";
378
                                live_file->codecs.video_pt = -1;        /* FIXME We don't support video for this type yet */
379
                                live_file->codecs.video_rtpmap = NULL;
380
                                live_file->listeners = NULL;
381
                                g_hash_table_insert(mountpoints, GINT_TO_POINTER(live_file->id), live_file);
382
                                g_thread_new(live_file->name, &janus_streaming_filesource_thread, live_file);
383
                        } else if(!strcasecmp(type->value, "ondemand")) {
384
                                /* mu-Law file on demand source */
385
                                janus_config_item *id = janus_config_get_item(cat, "id");
386
                                janus_config_item *desc = janus_config_get_item(cat, "description");
387
                                janus_config_item *file = janus_config_get_item(cat, "filename");
388
                                janus_config_item *audio = janus_config_get_item(cat, "audio");
389
                                janus_config_item *video = janus_config_get_item(cat, "video");
390
                                if(id == NULL || id->value == NULL || file == NULL || file->value == NULL) {
391
                                        JANUS_DEBUG("Can't add 'ondemand' stream, missing mandatory information...\n");
392
                                        cat = cat->next;
393
                                        continue;
394
                                }
395
                                gboolean doaudio = audio && audio->value && !strcasecmp(audio->value, "yes");
396
                                gboolean dovideo = video && video->value && !strcasecmp(video->value, "yes");
397
                                /* TODO We should support something more than raw a-Law and mu-Law streams... */
398
                                if(!doaudio || dovideo) {
399
                                        JANUS_DEBUG("Can't add 'ondemand' stream, we only support audio file streaming right now...\n");
400
                                        cat = cat->next;
401
                                        continue;
402
                                }
403
                                if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) {
404
                                        JANUS_DEBUG("Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n");
405
                                        cat = cat->next;
406
                                        continue;
407
                                }
408
                                janus_streaming_mountpoint *ondemand_file = calloc(1, sizeof(janus_streaming_mountpoint));
409
                                if(ondemand_file == NULL) {
410
                                        JANUS_DEBUG("Memory error!\n");
411
                                        continue;
412
                                }
413
                                ondemand_file->name = g_strdup(cat->name);
414
                                ondemand_file->id = atoi(id->value);
415
                                char *description = NULL;
416
                                if(desc != NULL && desc->value != NULL)
417
                                        description = g_strdup(desc->value);
418
                                else
419
                                        description = g_strdup(cat->name);
420
                                ondemand_file->description = description;
421
                                ondemand_file->active = FALSE;
422
                                ondemand_file->streaming_type = janus_streaming_type_on_demand;
423
                                ondemand_file->streaming_source = janus_streaming_source_file;
424
                                janus_streaming_file_source *ondemand_file_source = calloc(1, sizeof(janus_streaming_file_source));
425
                                if(ondemand_file->name == NULL || description == NULL || ondemand_file_source == NULL) {
426
                                        JANUS_DEBUG("Memory error!\n");
427
                                        if(ondemand_file->name)
428
                                                g_free(ondemand_file->name);
429
                                        if(description)
430
                                                g_free(description);
431
                                        if(ondemand_file_source);
432
                                                g_free(ondemand_file_source);
433
                                        g_free(ondemand_file);
434
                                        continue;
435
                                }
436
                                ondemand_file_source->filename = g_strdup(file->value);
437
                                ondemand_file->source = ondemand_file_source;
438
                                ondemand_file->codecs.audio_pt = strstr(file->value, ".alaw") ? 8 : 0;
439
                                ondemand_file->codecs.audio_rtpmap = strstr(file->value, ".alaw") ? "PCMA/8000" : "PCMU/8000";
440
                                ondemand_file->codecs.video_pt = -1;        /* FIXME We don't support video for this type yet */
441
                                ondemand_file->codecs.video_rtpmap = NULL;
442
                                ondemand_file->listeners = NULL;
443
                                g_hash_table_insert(mountpoints, GINT_TO_POINTER(ondemand_file->id), ondemand_file);
444
                        }
445
                        cat = cat->next;
446
                }
447
                /* Done */
448
                janus_config_destroy(config);
449
                config = NULL;
450
        }
451
        /* Show available mountpoints */
452
        GList *mountpoints_list = g_hash_table_get_values(mountpoints);
453
        GList *m = mountpoints_list;
454
        while(m) {
455
                janus_streaming_mountpoint *mp = (janus_streaming_mountpoint *)m->data;
456
                JANUS_PRINT("  ::: [%"SCNu64"][%s] %s (%s, %s)\n", mp->id, mp->name, mp->description,
457
                        mp->streaming_type == janus_streaming_type_live ? "live" : "on demand",
458
                        mp->streaming_source == janus_streaming_source_rtp ? "RTP source" : "file source");
459
                m = m->next;
460
        }
461
        g_list_free(mountpoints_list);
462

    
463
        sessions = g_hash_table_new(NULL, NULL);
464
        messages = g_queue_new();
465
        /* This is the callback we'll need to invoke to contact the gateway */
466
        gateway = callback;
467

    
468
        initialized = 1;
469
        /* Launch the thread that will handle incoming messages */
470
        GError *error = NULL;
471
        handler_thread = g_thread_try_new("janus streaming handler", janus_streaming_handler, NULL, &error);
472
        if(error != NULL) {
473
                initialized = 0;
474
                /* Something went wrong... */
475
                JANUS_DEBUG("Got error %d (%s) trying to launch thread...\n", error->code, error->message ? error->message : "??");
476
                return -1;
477
        }
478
        JANUS_PRINT("%s initialized!\n", JANUS_STREAMING_NAME);
479
        return 0;
480
}
481

    
482
void janus_streaming_destroy() {
483
        if(!initialized)
484
                return;
485
        stopping = 1;
486
        if(handler_thread != NULL) {
487
                g_thread_join(handler_thread);
488
        }
489
        handler_thread = NULL;
490
        /* TODO Actually clean up and remove ongoing sessions (and free the mountpoint resources) */
491
        g_hash_table_destroy(mountpoints);
492
        g_hash_table_destroy(sessions);
493
        g_queue_free(messages);
494
        sessions = NULL;
495
        initialized = 0;
496
        stopping = 0;
497
        JANUS_PRINT("%s destroyed!\n", JANUS_STREAMING_NAME);
498
}
499

    
500
int janus_streaming_get_version() {
501
        return JANUS_STREAMING_VERSION;
502
}
503

    
504
const char *janus_streaming_get_version_string() {
505
        return JANUS_STREAMING_VERSION_STRING;
506
}
507

    
508
const char *janus_streaming_get_description() {
509
        return JANUS_STREAMING_DESCRIPTION;
510
}
511

    
512
const char *janus_streaming_get_name() {
513
        return JANUS_STREAMING_NAME;
514
}
515

    
516
const char *janus_streaming_get_package() {
517
        return JANUS_STREAMING_PACKAGE;
518
}
519

    
520
void janus_streaming_create_session(janus_pluginession *handle, int *error) {
521
        if(stopping || !initialized) {
522
                *error = -1;
523
                return;
524
        }        
525
        janus_streaming_session *session = (janus_streaming_session *)calloc(1, sizeof(janus_streaming_session));
526
        if(session == NULL) {
527
                JANUS_DEBUG("Memory error!\n");
528
                *error = -2;
529
                return;
530
        }
531
        session->handle = handle;
532
        session->mountpoint = NULL;        /* This will happen later */
533
        session->started = FALSE;        /* This will happen later */
534
        handle->plugin_handle = session;
535
        g_hash_table_insert(sessions, handle, session);
536

    
537
        return;
538
}
539

    
540
void janus_streaming_destroy_session(janus_pluginession *handle, int *error) {
541
        if(stopping || !initialized) {
542
                *error = -1;
543
                return;
544
        }        
545
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; 
546
        if(!session) {
547
                JANUS_DEBUG("No session associated with this handle...\n");
548
                *error = -2;
549
                return;
550
        }
551
        JANUS_PRINT("Removing streaming session...\n");
552
        /* TODO Actually clean up and remove session */
553
        if(session->mountpoint) {
554
                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
555
        }
556
        g_hash_table_remove(sessions, handle);
557
        session->destroy = TRUE;
558
        g_free(session);
559
        return;
560
}
561

    
562
void janus_streaming_handle_message(janus_pluginession *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
563
        if(stopping || !initialized)
564
                return;
565
        JANUS_PRINT("%s\n", message);
566
        janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
567
        if(msg == NULL) {
568
                JANUS_DEBUG("Memory error!\n");
569
                return;
570
        }
571
        msg->handle = handle;
572
        msg->message = message;
573
        msg->transaction = transaction ? g_strdup(transaction) : NULL;
574
        msg->sdp_type = sdp_type;
575
        msg->sdp = sdp;
576
        g_queue_push_tail(messages, msg);
577
}
578

    
579
void janus_streaming_setup_media(janus_pluginession *handle) {
580
        JANUS_DEBUG("WebRTC media is now available\n");
581
        if(stopping || !initialized)
582
                return;
583
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
584
        if(!session) {
585
                JANUS_DEBUG("No session associated with this handle...\n");
586
                return;
587
        }
588
        if(session->destroy)
589
                return;
590
        /* TODO Only start streaming when we get this event */
591
        session->started = TRUE;
592
        /* Prepare JSON event */
593
        json_t *event = json_object();
594
        json_object_set(event, "streaming", json_string("event"));
595
        json_t *result = json_object();
596
        json_object_set_new(result, "status", json_string("started"));
597
        json_object_set(event, "result", result);
598
        char *event_text = json_dumps(event, JSON_INDENT(3));
599
        json_decref(event);
600
        json_decref(result);
601
        JANUS_PRINT("Pushing event: %s\n", event_text);
602
        JANUS_PRINT("  >> %d\n", gateway->push_event(handle, &janus_streaming_plugin, NULL, event_text, NULL, NULL));
603
}
604

    
605
void janus_streaming_incoming_rtp(janus_pluginession *handle, int video, char *buf, int len) {
606
        if(stopping || !initialized)
607
                return;
608
        /* FIXME We don't care about what the browser sends us, we're sendonly */
609
}
610

    
611
void janus_streaming_incoming_rtcp(janus_pluginession *handle, int video, char *buf, int len) {
612
        if(stopping || !initialized)
613
                return;
614
        /* FIXME Maybe we should care about RTCP, but not now */
615
}
616

    
617
void janus_streaming_hangup_media(janus_pluginession *handle) {
618
        JANUS_PRINT("No WebRTC media anymore\n");
619
        if(stopping || !initialized)
620
                return;
621
        janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;        
622
        if(!session) {
623
                JANUS_DEBUG("No session associated with this handle...\n");
624
                return;
625
        }
626
        if(session->destroy)
627
                return;
628
        /* FIXME Simulate a "stop" coming from the browser */
629
        janus_streaming_message *msg = calloc(1, sizeof(janus_streaming_message));
630
        if(msg == NULL) {
631
                JANUS_DEBUG("Memory error!\n");
632
                return;
633
        }
634
        msg->handle = handle;
635
        msg->message = "{\"request\":\"stop\"}";
636
        msg->transaction = NULL;
637
        msg->sdp_type = NULL;
638
        msg->sdp = NULL;
639
        g_queue_push_tail(messages, msg);
640
}
641

    
642
/* Thread to handle incoming messages */
643
static void *janus_streaming_handler(void *data) {
644
        JANUS_DEBUG("Joining thread\n");
645
        janus_streaming_message *msg = NULL;
646
        char *error_cause = calloc(1024, sizeof(char));
647
        if(error_cause == NULL) {
648
                JANUS_DEBUG("Memory error!\n");
649
                return NULL;
650
        }
651
        while(initialized && !stopping) {
652
                if(!messages || (msg = g_queue_pop_head(messages)) == NULL) {
653
                        usleep(50000);
654
                        continue;
655
                }
656
                janus_streaming_session *session = (janus_streaming_session *)msg->handle->plugin_handle;
657
                if(!session) {
658
                        JANUS_DEBUG("No session associated with this handle...\n");
659
                        continue;
660
                }
661
                if(session->destroy)
662
                        continue;
663
                /* Handle request */
664
                JANUS_PRINT("Handling message: %s\n", msg->message);
665
                if(msg->message == NULL) {
666
                        JANUS_DEBUG("No message??\n");
667
                        sprintf(error_cause, "%s", "No message??");
668
                        goto error;
669
                }
670
                json_error_t error;
671
                json_t *root = json_loads(msg->message, 0, &error);
672
                if(!root) {
673
                        JANUS_DEBUG("JSON error: on line %d: %s\n", error.line, error.text);
674
                        sprintf(error_cause, "JSON error: on line %d: %s", error.line, error.text);
675
                        goto error;
676
                }
677
                if(!json_is_object(root)) {
678
                        JANUS_DEBUG("JSON error: not an object\n");
679
                        sprintf(error_cause, "JSON error: not an object");
680
                        goto error;
681
                }
682
                json_t *request = json_object_get(root, "request");
683
                if(!request || !json_is_string(request)) {
684
                        JANUS_DEBUG("JSON error: invalid element (request)\n");
685
                        sprintf(error_cause, "JSON error: invalid element (request)");
686
                        goto error;
687
                }
688
                const char *request_text = json_string_value(request);
689
                json_t *result = NULL;
690
                char *sdp_type = NULL, *sdp = NULL;
691
                if(!strcasecmp(request_text, "list")) {
692
                        result = json_object();
693
                        json_t *list = json_array();
694
                        JANUS_PRINT("Request for the list of mountpoints\n");
695
                        /* Return a list of all available mountpoints */
696
                        GList *mountpoints_list = g_hash_table_get_values(mountpoints);
697
                        GList *m = mountpoints_list;
698
                        while(m) {
699
                                janus_streaming_mountpoint *mp = (janus_streaming_mountpoint *)m->data;
700
                                json_t *ml = json_object();
701
                                json_object_set_new(ml, "id", json_integer(mp->id));
702
                                json_object_set_new(ml, "description", json_string(mp->description));
703
                                json_object_set_new(ml, "type", json_string(mp->streaming_type == janus_streaming_type_live ? "live" : "on demand"));
704
                                json_array_append_new(list, ml);
705
                                m = m->next;
706
                        }
707
                        json_object_set_new(result, "list", list);
708
                        g_list_free(mountpoints_list);
709
                } else if(!strcasecmp(request_text, "watch")) {
710
                        json_t *id = json_object_get(root, "id");
711
                        if(id && !json_is_integer(id)) {
712
                                JANUS_DEBUG("JSON error: invalid element (id)\n");
713
                                sprintf(error_cause, "JSON error: invalid element (id)");
714
                                goto error;
715
                        }
716
                        gint64 id_value = json_integer_value(id);
717
                        janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, GINT_TO_POINTER(id_value));
718
                        if(mp == NULL) {
719
                                JANUS_PRINT("No such mountpoint/stream %"SCNu64"\n", id_value);
720
                                sprintf(error_cause, "No such mountpoint/stream %"SCNu64"", id_value);
721
                                goto error;
722
                        }
723
                        JANUS_PRINT("Request to watch mountpoint/stream %"SCNu64"\n", id_value);
724
                        session->stopping = FALSE;
725
                        session->mountpoint = mp;
726
                        if(mp->streaming_type == janus_streaming_type_on_demand) {
727
                                g_thread_new(session->mountpoint->name, &janus_streaming_ondemand_thread, session);
728
                        }
729
                        /* TODO Check if user is already watching a stream, if the video is active, etc. */
730
                        mp->listeners = g_list_append(mp->listeners, session);
731
                        sdp_type = "offer";        /* We're always going to do the offer ourselves, never answer */
732
                        char sdptemp[1024];
733
                        memset(sdptemp, 0, 1024);
734
                        gchar buffer[100];
735
                        memset(buffer, 0, 100);
736
                        gint64 sessid = janus_get_monotonic_time();
737
                        gint64 version = sessid;        /* FIXME This needs to be increased when it changes, so time should be ok */
738
                        g_sprintf(buffer,
739
                                "v=0\r\no=%s %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n",
740
                                        "-", sessid, version);
741
                        g_strlcat(sdptemp, buffer, 1024);
742
                        g_strlcat(sdptemp, "s=Streaming Test\r\nt=0 0\r\n", 1024);
743
                        if(mp->codecs.audio_pt >= 0) {
744
                                /* Add audio line */
745
                                g_sprintf(buffer,
746
                                        "m=audio 1 RTP/SAVPF %d\r\n"
747
                                        "c=IN IP4 1.1.1.1\r\n",
748
                                        mp->codecs.audio_pt);
749
                                g_strlcat(sdptemp, buffer, 1024);
750
                                if(mp->codecs.audio_rtpmap) {
751
                                        g_sprintf(buffer,
752
                                                "a=rtpmap:%d %s\r\n",
753
                                                mp->codecs.audio_pt, mp->codecs.audio_rtpmap);
754
                                        g_strlcat(sdptemp, buffer, 1024);
755
                                }
756
                                g_strlcat(sdptemp, "a=mid:audio\r\na=sendonly\r\n", 1024);
757
                        }
758
                        if(mp->codecs.video_pt >= 0) {
759
                                /* Add video line */
760
                                g_sprintf(buffer,
761
                                        "m=video 1 RTP/SAVPF %d\r\n"
762
                                        "c=IN IP4 1.1.1.1\r\n",
763
                                        mp->codecs.video_pt);
764
                                g_strlcat(sdptemp, buffer, 1024);
765
                                if(mp->codecs.video_rtpmap) {
766
                                        g_sprintf(buffer,
767
                                                "a=rtpmap:%d %s\r\n",
768
                                                mp->codecs.video_pt, mp->codecs.video_rtpmap);
769
                                        g_strlcat(sdptemp, buffer, 1024);
770
                                }
771
                                g_strlcat(sdptemp, "a=mid:video\r\na=sendonly\r\n", 1024);
772
                        }
773
                        sdp = g_strdup(sdptemp);
774
                        JANUS_PRINT("Going to offer this SDP:\n%s\n", sdp);
775
                        result = json_object();
776
                        json_object_set_new(result, "status", json_string("preparing"));
777
                } else if(!strcasecmp(request_text, "start")) {
778
                        JANUS_PRINT("Starting the streaming\n");
779
                        result = json_object();
780
                        /* We wait for the setup_media event to start: on the other hand, it may have already arrived */
781
                        json_object_set_new(result, "status", json_string(session->started ? "started" : "starting"));
782
                } else if(!strcasecmp(request_text, "pause")) {
783
                        JANUS_PRINT("Pausing the streaming\n");
784
                        session->started = FALSE;
785
                        result = json_object();
786
                        json_object_set_new(result, "status", json_string("pausing"));
787
                } else if(!strcasecmp(request_text, "stop")) {
788
                        JANUS_PRINT("Stopping the streaming\n");
789
                        session->stopping = TRUE;
790
                        session->started = FALSE;
791
                        result = json_object();
792
                        json_object_set_new(result, "status", json_string("stopping"));
793
                        if(session->mountpoint) {
794
                                JANUS_PRINT("  -- Removing the session from the mountpoint listeners\n");
795
                                if(g_list_find(session->mountpoint->listeners, session) != NULL)
796
                                        JANUS_PRINT("  -- -- Found!\n");
797
                                session->mountpoint->listeners = g_list_remove_all(session->mountpoint->listeners, session);
798
                        }
799
                        session->mountpoint = NULL;
800
                } else {
801
                        JANUS_PRINT("Unknown request '%s'\n", request_text);
802
                        sprintf(error_cause, "Unknown request '%s'", request_text);
803
                        goto error;
804
                }
805
                
806
                /* Any SDP to handle? */
807
                if(msg->sdp) {
808
                        JANUS_PRINT("This is involving a negotiation (%s) as well (but we really don't care):\n%s\n", msg->sdp_type, msg->sdp);
809
                }
810

    
811
                json_decref(root);
812
                /* Prepare JSON event */
813
                json_t *event = json_object();
814
                json_object_set(event, "streaming", json_string("event"));
815
                if(result != NULL)
816
                        json_object_set(event, "result", result);
817
                char *event_text = json_dumps(event, JSON_INDENT(3));
818
                json_decref(event);
819
                if(result != NULL)
820
                        json_decref(result);
821
                JANUS_PRINT("Pushing event: %s\n", event_text);
822
                JANUS_PRINT("  >> %d\n", gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, sdp_type, sdp));
823
                if(sdp)
824
                        g_free(sdp);
825
                continue;
826
                
827
error:
828
                {
829
                        if(root != NULL)
830
                                json_decref(root);
831
                        /* Prepare JSON error event */
832
                        json_t *event = json_object();
833
                        json_object_set(event, "streaming", json_string("event"));
834
                        json_object_set(event, "error", json_string(error_cause));
835
                        char *event_text = json_dumps(event, JSON_INDENT(3));
836
                        json_decref(event);
837
                        JANUS_PRINT("Pushing event: %s\n", event_text);
838
                        JANUS_PRINT("  >> %d\n", gateway->push_event(msg->handle, &janus_streaming_plugin, msg->transaction, event_text, NULL, NULL));
839
                }
840
        }
841
        JANUS_DEBUG("Leaving thread\n");
842
        return NULL;
843
}
844

    
845
/* FIXME Thread to send RTP packets from a file (on demand) */
846
static void *janus_streaming_ondemand_thread(void *data) {
847
        JANUS_DEBUG("Filesource (on demand) RTP thread starting...\n");
848
        janus_streaming_session *session = (janus_streaming_session *)data;
849
        if(!session) {
850
                JANUS_DEBUG("Invalid session!\n");
851
                return NULL;
852
        }
853
        janus_streaming_mountpoint *mountpoint = session->mountpoint;
854
        if(!mountpoint) {
855
                JANUS_DEBUG("Invalid mountpoint!\n");
856
                return NULL;
857
        }
858
        if(mountpoint->streaming_source != janus_streaming_source_file) {
859
                JANUS_DEBUG("Not an file source mountpoint!\n");
860
                return NULL;
861
        }
862
        if(mountpoint->streaming_type != janus_streaming_type_on_demand) {
863
                JANUS_PRINT("Not an on-demand file source mountpoint!\n");
864
                return NULL;
865
        }
866
        janus_streaming_file_source *source = mountpoint->source;
867
        if(source == NULL || source->filename == NULL) {
868
                JANUS_PRINT("Invalid file source mountpoint!\n");
869
                return NULL;
870
        }
871
        JANUS_PRINT("Opening file source %s...\n", source->filename);
872
        FILE *audio = fopen(source->filename, "rb");
873
        if(!audio) {
874
                JANUS_PRINT("Ooops, audio file missing!\n");
875
                return NULL;
876
        }
877
        JANUS_PRINT("Streaming audio file: %s\n", source->filename);
878
        /* Buffer */
879
        char *buf = calloc(1024, sizeof(char));
880
        if(buf == NULL) {
881
                JANUS_DEBUG("Memory error!\n");
882
                return NULL;
883
        }
884
        /* Set up RTP */
885
        gint16 seq = 1;
886
        gint32 ts = 0;
887
        rtp_header *header = (rtp_header *)buf;
888
        header->version = 2;
889
        header->markerbit = 1;
890
        header->type = mountpoint->codecs.audio_pt;
891
        header->seq_number = htons(seq);
892
        header->timestamp = htonl(ts);
893
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
894
        /* Timer */
895
        struct timeval now, before;
896
        gettimeofday(&before, NULL);
897
        now.tv_sec = before.tv_sec;
898
        now.tv_usec = before.tv_usec;
899
        time_t passed, d_s, d_us;
900
        /* Loop */
901
        gint read = 0;
902
        janus_streaming_rtp_relay_packet packet;
903
        while(!stopping && !session->stopping && !session->destroy) {
904
                /* See if it's time to prepare a frame */
905
                gettimeofday(&now, NULL);
906
                d_s = now.tv_sec - before.tv_sec;
907
                d_us = now.tv_usec - before.tv_usec;
908
                if(d_us < 0) {
909
                        d_us += 1000000;
910
                        --d_s;
911
                }
912
                passed = d_s*1000000 + d_us;
913
                if(passed < 18000) {        /* Let's wait about 18ms */
914
                        usleep(1000);
915
                        continue;
916
                }
917
                /* Update the reference time */
918
                before.tv_usec += 20000;
919
                if(before.tv_usec > 1000000) {
920
                        before.tv_sec++;
921
                        before.tv_usec -= 1000000;
922
                }
923
                /* If not started or paused, wait some more */
924
                if(!session->started)
925
                        continue;
926
                /* Read frame from file... */
927
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
928
                if(feof(audio)) {
929
                        /* FIXME We're doing this forever... should this be configurable? */
930
                        JANUS_PRINT("Rewind! (%s)\n", source->filename);
931
                        fseek(audio, 0, SEEK_SET);
932
                        continue;
933
                }
934
                if(read < 0)
935
                        break;
936
                if(mountpoint->active == FALSE)
937
                        mountpoint->active = TRUE;
938
                //~ JANUS_PRINT(" ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
939
                        //~ header->type, ntohs(header->seq_number), ntohl(header->timestamp));
940
                //~ JANUS_PRINT(" ... Read %d bytes from the audio file...\n", read);
941
                /* Relay on all sessions */
942
                packet.data = header;
943
                packet.length = RTP_HEADER_SIZE + read;
944
                packet.is_video = 0;
945
                janus_streaming_relay_rtp_packet(session, &packet);
946
                /* Update header */
947
                seq++;
948
                header->seq_number = htons(seq);
949
                ts += 160;
950
                header->timestamp = htonl(ts);
951
                header->markerbit = 0;
952
        }
953
        JANUS_DEBUG("Leaving filesource thread\n");
954
        fclose(audio);
955
        return NULL;
956
}
957

    
958
/* FIXME Thread to send RTP packets from a file (live) */
959
static void *janus_streaming_filesource_thread(void *data) {
960
        JANUS_DEBUG("Filesource RTP thread starting...\n");
961
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
962
        if(!mountpoint) {
963
                JANUS_DEBUG("Invalid mountpoint!\n");
964
                return NULL;
965
        }
966
        if(mountpoint->streaming_source != janus_streaming_source_file) {
967
                JANUS_DEBUG("Not an file source mountpoint!\n");
968
                return NULL;
969
        }
970
        if(mountpoint->streaming_type != janus_streaming_type_live) {
971
                JANUS_DEBUG("Not a live file source mountpoint!\n");
972
                return NULL;
973
        }
974
        janus_streaming_file_source *source = mountpoint->source;
975
        if(source == NULL || source->filename == NULL) {
976
                JANUS_DEBUG("Invalid file source mountpoint!\n");
977
                return NULL;
978
        }
979
        JANUS_PRINT("Opening file source %s...\n", source->filename);
980
        FILE *audio = fopen(source->filename, "rb");
981
        if(!audio) {
982
                JANUS_DEBUG("Ooops, audio file missing!\n");
983
                return NULL;
984
        }
985
        JANUS_PRINT("Streaming audio file: %s\n", source->filename);
986
        /* Buffer */
987
        char *buf = calloc(1024, sizeof(char));
988
        if(buf == NULL) {
989
                JANUS_DEBUG("Memory error!\n");
990
                return NULL;
991
        }
992
        /* Set up RTP */
993
        gint16 seq = 1;
994
        gint32 ts = 0;
995
        rtp_header *header = (rtp_header *)buf;
996
        header->version = 2;
997
        header->markerbit = 1;
998
        header->type = mountpoint->codecs.audio_pt;
999
        header->seq_number = htons(seq);
1000
        header->timestamp = htonl(ts);
1001
        header->ssrc = htonl(1);        /* The gateway will fix this anyway */
1002
        /* Timer */
1003
        struct timeval now, before;
1004
        gettimeofday(&before, NULL);
1005
        now.tv_sec = before.tv_sec;
1006
        now.tv_usec = before.tv_usec;
1007
        time_t passed, d_s, d_us;
1008
        /* Loop */
1009
        gint read = 0;
1010
        janus_streaming_rtp_relay_packet packet;
1011
        while(!stopping) {        /* FIXME We need a per-mountpoint watchdog as well */
1012
                /* See if it's time to prepare a frame */
1013
                gettimeofday(&now, NULL);
1014
                d_s = now.tv_sec - before.tv_sec;
1015
                d_us = now.tv_usec - before.tv_usec;
1016
                if(d_us < 0) {
1017
                        d_us += 1000000;
1018
                        --d_s;
1019
                }
1020
                passed = d_s*1000000 + d_us;
1021
                if(passed < 18000) {        /* Let's wait about 18ms */
1022
                        usleep(1000);
1023
                        continue;
1024
                }
1025
                /* Update the reference time */
1026
                before.tv_usec += 20000;
1027
                if(before.tv_usec > 1000000) {
1028
                        before.tv_sec++;
1029
                        before.tv_usec -= 1000000;
1030
                }
1031
                /* Read frame from file... */
1032
                read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio);
1033
                if(feof(audio)) {
1034
                        /* FIXME We're doing this forever... should this be configurable? */
1035
                        JANUS_PRINT("Rewind! (%s)\n", source->filename);
1036
                        fseek(audio, 0, SEEK_SET);
1037
                        continue;
1038
                }
1039
                if(read < 0)
1040
                        break;
1041
                if(mountpoint->active == FALSE)
1042
                        mountpoint->active = TRUE;
1043
                // JANUS_PRINT(" ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n",
1044
                        // header->type, ntohs(header->seq_number), ntohl(header->timestamp));
1045
                // JANUS_PRINT(" ... Read %d bytes from the audio file...\n", read);
1046
                /* Relay on all sessions */
1047
                packet.data = header;
1048
                packet.length = RTP_HEADER_SIZE + read;
1049
                packet.is_video = 0;
1050
                g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
1051
                /* Update header */
1052
                seq++;
1053
                header->seq_number = htons(seq);
1054
                ts += 160;
1055
                header->timestamp = htonl(ts);
1056
                header->markerbit = 0;
1057
        }
1058
        JANUS_DEBUG("Leaving filesource thread\n");
1059
        fclose(audio);
1060
        return NULL;
1061
}
1062

    
1063
/* FIXME Test thread to relay RTP frames coming from gstreamer */
1064
static void *janus_streaming_relay_thread(void *data) {
1065
        JANUS_DEBUG("Starting relay thread\n");
1066
        janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data;
1067
        if(!mountpoint) {
1068
                JANUS_DEBUG("Invalid mountpoint!\n");
1069
                return NULL;
1070
        }
1071
        if(mountpoint->streaming_source != janus_streaming_source_rtp) {
1072
                JANUS_DEBUG("[%s] Not an RTP source mountpoint!\n", mountpoint->name);
1073
                return NULL;
1074
        }
1075
        janus_streaming_rtp_source *source = mountpoint->source;
1076
        if(source == NULL) {
1077
                JANUS_DEBUG("[%s] Invalid RTP source mountpoint!\n", mountpoint->name);
1078
                return NULL;
1079
        }
1080
        gint audio_port = source->audio_port;
1081
        gint video_port = source->video_port;
1082
        /* Socket stuff */
1083
        struct sockaddr_in audio_address, video_address;
1084
        int audio_fd = 0;
1085
        if(audio_port >= 0) {
1086
                audio_fd = socket(AF_INET, SOCK_DGRAM, 0);
1087
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
1088
                setsockopt(audio_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
1089
                audio_address.sin_family = AF_INET;
1090
                audio_address.sin_port = htons(audio_port);
1091
                audio_address.sin_addr.s_addr = INADDR_ANY;
1092
                if(bind(audio_fd, (struct sockaddr *)(&audio_address), sizeof(struct sockaddr)) < 0) {
1093
                        JANUS_DEBUG("[%s] Bind failed for audio (port %d)...\n", mountpoint->name, audio_port);
1094
                        return NULL;
1095
                }
1096
                JANUS_PRINT("[%s] Audio listener bound to port %d\n", mountpoint->name, audio_port);
1097
        }
1098
        int video_fd = 0;
1099
        if(video_port >= 0) {
1100
                video_fd = socket(AF_INET, SOCK_DGRAM, 0);
1101
                int yes = 1;        /* For setsockopt() SO_REUSEADDR */
1102
                setsockopt(video_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
1103
                video_address.sin_family = AF_INET;
1104
                video_address.sin_port = htons(video_port);
1105
                video_address.sin_addr.s_addr = INADDR_ANY;
1106
                if(bind(video_fd, (struct sockaddr *)(&video_address), sizeof(struct sockaddr)) < 0) {
1107
                        JANUS_DEBUG("[%s] Bind failed for video (%d)...\n", mountpoint->name, video_port);
1108
                        return NULL;
1109
                }
1110
                JANUS_PRINT("[%s] Video listener bound to port %d\n", mountpoint->name, video_port);
1111
        }
1112
        int maxfd = (audio_fd > video_fd) ? audio_fd : video_fd;
1113
        /* Needed to fix seq and ts */
1114
        uint32_t a_last_ssrc = 0, a_last_ts = 0, a_base_ts = 0, a_base_ts_prev = 0,
1115
                        v_last_ssrc = 0, v_last_ts = 0, v_base_ts = 0, v_base_ts_prev = 0;
1116
        uint16_t a_last_seq = 0, a_base_seq = 0, a_base_seq_prev = 0,
1117
                        v_last_seq = 0, v_base_seq = 0, v_base_seq_prev = 0;
1118
        /* Loop */
1119
        socklen_t addrlen;
1120
        struct sockaddr_in remote;
1121
        int resfd = 0, bytes = 0;
1122
        struct timeval timeout;
1123
        fd_set readfds;
1124
        FD_ZERO(&readfds);
1125
        char buffer[1500];
1126
        memset(buffer, 0, 1500);
1127
        janus_streaming_rtp_relay_packet packet;
1128
        while(!stopping) {        /* FIXME We need a per-mountpoint watchdog as well */
1129
                /* Wait for some data */
1130
                if(audio_fd > 0)
1131
                        FD_SET(audio_fd, &readfds);
1132
                if(video_fd > 0)
1133
                        FD_SET(video_fd, &readfds);
1134
                timeout.tv_sec = 1;
1135
                timeout.tv_usec = 0;
1136
                resfd = select(maxfd+1, &readfds, NULL, NULL, &timeout);
1137
                if(resfd < 0)
1138
                        break;
1139
                if(audio_fd > 0 && FD_ISSET(audio_fd, &readfds)) {
1140
                        if(mountpoint->active == FALSE)
1141
                                mountpoint->active = TRUE;
1142
                        /* Got something audio */
1143
                        addrlen = sizeof(remote);
1144
                        bytes = recvfrom(audio_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
1145
                        // JANUS_PRINT("************************\nGot %d bytes on the audio channel...\n", bytes);
1146
                        rtp_header *rtp = (rtp_header *)buffer;
1147
                        // JANUS_PRINT(" ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
1148
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
1149
                        /* Relay on all sessions */
1150
                        packet.data = rtp;
1151
                        packet.length = bytes;
1152
                        packet.is_video = 0;
1153
                        /* Do we have a new stream? */
1154
                        if(ntohl(packet.data->ssrc) != a_last_ssrc) {
1155
                                a_last_ssrc = ntohl(packet.data->ssrc);
1156
                                JANUS_PRINT("[%s] New audio stream! (ssrc=%u)\n", mountpoint->name, a_last_ssrc);
1157
                                a_base_ts_prev = a_last_ts;
1158
                                a_base_ts = ntohl(packet.data->timestamp);
1159
                                a_base_seq_prev = a_last_seq;
1160
                                a_base_seq = ntohs(packet.data->seq_number);
1161
                        }
1162
                        a_last_ts = (ntohl(packet.data->timestamp)-a_base_ts)+a_base_ts_prev+960;        /* FIXME We're assuming Opus here... */
1163
                        packet.data->timestamp = htonl(a_last_ts);
1164
                        a_last_seq = (ntohs(packet.data->seq_number)-a_base_seq)+a_base_seq_prev+1;
1165
                        packet.data->seq_number = htons(a_last_seq);
1166
                        // JANUS_PRINT(" ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
1167
                                // ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
1168
                        packet.data->type = mountpoint->codecs.audio_pt;
1169
                        /* Go! */
1170
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
1171
                        continue;
1172
                }
1173
                if(video_fd > 0 && FD_ISSET(video_fd, &readfds)) {
1174
                        if(mountpoint->active == FALSE)
1175
                                mountpoint->active = TRUE;
1176
                        /* Got something video */
1177
                        addrlen = sizeof(remote);
1178
                        bytes = recvfrom(video_fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen);
1179
                        //~ JANUS_PRINT("************************\nGot %d bytes on the video channel...\n", bytes);
1180
                        rtp_header *rtp = (rtp_header *)buffer;
1181
                        //~ JANUS_PRINT(" ... parsed RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
1182
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
1183
                        /* Relay on all sessions */
1184
                        packet.data = rtp;
1185
                        packet.length = bytes;
1186
                        packet.is_video = 1;
1187
                        /* Do we have a new stream? */
1188
                        if(ntohl(packet.data->ssrc) != v_last_ssrc) {
1189
                                v_last_ssrc = ntohl(packet.data->ssrc);
1190
                                JANUS_PRINT("[%s] New video stream! (ssrc=%u)\n", mountpoint->name, v_last_ssrc);
1191
                                v_base_ts_prev = v_last_ts;
1192
                                v_base_ts = ntohl(packet.data->timestamp);
1193
                                v_base_seq_prev = v_last_seq;
1194
                                v_base_seq = ntohs(packet.data->seq_number);
1195
                        }
1196
                        v_last_ts = (ntohl(packet.data->timestamp)-v_base_ts)+v_base_ts_prev+4500;        /* FIXME We're assuming 15fps here... */
1197
                        packet.data->timestamp = htonl(v_last_ts);
1198
                        v_last_seq = (ntohs(packet.data->seq_number)-v_base_seq)+v_base_seq_prev+1;
1199
                        packet.data->seq_number = htons(v_last_seq);
1200
                        //~ JANUS_PRINT(" ... updated RTP packet (ssrc=%u, pt=%u, seq=%u, ts=%u)...\n",
1201
                                //~ ntohl(rtp->ssrc), rtp->type, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
1202
                        packet.data->type = mountpoint->codecs.video_pt;
1203
                        /* Go! */
1204
                        g_list_foreach(mountpoint->listeners, janus_streaming_relay_rtp_packet, &packet);
1205
                        continue;
1206
                }
1207
        }
1208
        JANUS_DEBUG("Leaving relay thread\n");
1209
        return NULL;
1210
}
1211

    
1212
static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) {
1213
        janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data;
1214
        if(!packet || !packet->data || packet->length < 1) {
1215
                JANUS_DEBUG("Invalid packet...\n");
1216
                return;
1217
        }
1218
        janus_streaming_session *session = (janus_streaming_session *)data;
1219
        if(!session || !session->handle) {
1220
                // JANUS_DEBUG("Invalid session...\n");
1221
                return;
1222
        }
1223
        if(!session->started) {
1224
                // JANUS_DEBUG("Streaming not started yet for this session...\n");
1225
                return;
1226
        }
1227
        if(gateway != NULL)        /* FIXME What about RTCP? */
1228
                gateway->relay_rtp(session->handle, packet->is_video, (char *)packet->data, packet->length);
1229
        return;
1230
}