Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_recordplay.c @ d7299f20

History | View | Annotate | Download (63.6 KB)

1
/*! \file   janus_recordplay.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus Record&Play plugin
5
 * \details  This is a simple application that implements two different
6
 * features: it allows you to record a message you send with WebRTC in
7
 * the format defined in recorded.c (MJR recording) and subsequently
8
 * replay this recording (or other previously recorded) through WebRTC
9
 * as well.
10
 * 
11
 * This application aims at showing how easy recording frames sent by
12
 * a peer is, and how this recording can be re-used directly, without
13
 * necessarily involving a post-processing process (e.g., through the
14
 * tool we provide in janus-pp-rec.c).
15
 * 
16
 * The configuration process is quite easy: just choose where the
17
 * recordings should be saved. The same folder will also be used to list
18
 * the available recordings that can be replayed.
19
 * 
20
 * \note The application creates a special file in INI format with
21
 * \c .nfo extension for each recording that is saved. This is necessary
22
 * to map a specific audio .mjr file to a different video .mjr one, as
23
 * they always get saved in different files. If you want to replay
24
 * recordings you took in a different application (e.g., the streaming
25
 * or videoroom plugins) just copy the related files in the folder you
26
 * configured this plugin to use and create a .nfo file in the same
27
 * folder to create a mapping, e.g.:
28
 * 
29
 *                 [12345678]
30
 *                 name = My videoroom recording
31
 *                 date = 2014-10-14 17:11:26
32
 *                 audio = mcu-audio.mjr
33
 *                 video = mcu-video.mjr
34
 * 
35
 * \section recplayapi Record&Play API
36
 * 
37
 * The Record&Play API supports several requests, some of which are
38
 * synchronous and some asynchronous. There are some situations, though,
39
 * (invalid JSON, invalid request) which will always result in a
40
 * synchronous error response even for asynchronous requests. 
41
 * 
42
 * \c list and \c update are synchronous requests, which means you'll
43
 * get a response directly within the context of the transaction. \c list
44
 * lists all the available recordings, while \c update forces the plugin
45
 * to scan the folder of recordings again in case some were added manually
46
 * and not indexed in the meanwhile.
47
 * 
48
 * The \c record , \c play , \c start and \c stop requests instead are
49
 * all asynchronous, which means you'll get a notification about their
50
 * success or failure in an event. \c record asks the plugin to start
51
 * recording a session; \c play asks the plugin to prepare the playout
52
 * of one of the previously recorded sessions; \c start starts the
53
 * actual playout, and \c stop stops whatever the session was for, i.e.,
54
 * recording or replaying.
55
 * 
56
 * The \c list request has to be formatted as follows:
57
 *
58
\verbatim
59
{
60
        "request" : "list"
61
}
62
\endverbatim
63
 *
64
 * A successful request will result in an array of recordings:
65
 * 
66
\verbatim
67
{
68
        "recordplay" : "list",
69
        "list": [        // Array of recording objects
70
                {                        // Recording #1
71
                        "id": <numeric ID>,
72
                        "name": "<Name of the recording>",
73
                        "date": "<Date of the recording>",
74
                        "audio": "<Audio rec file, if any; optional>",
75
                        "video": "<Video rec file, if any; optional>"
76
                },
77
                <other recordings>
78
        ]
79
}
80
\endverbatim
81
 * 
82
 * An error instead (and the same applies to all other requests, so this
83
 * won't be repeated) would provide both an error code and a more verbose
84
 * description of the cause of the issue:
85
 * 
86
\verbatim
87
{
88
        "recordplay" : "event",
89
        "error_code" : <numeric ID, check Macros below>,
90
        "error" : "<error description as a string>"
91
}
92
\endverbatim
93
 * 
94
 * The \c update request instead has to be formatted as follows:
95
 *
96
\verbatim
97
{
98
        "request" : "update"
99
}
100
\endverbatim
101
 *
102
 * which will always result in an immediate ack ( \c ok ):
103
 * 
104
\verbatim
105
{
106
        "recordplay" : "ok",
107
}
108
\endverbatim
109
 *
110
 * Coming to the asynchronous requests, \c record has to be attached to
111
 * a JSEP offer (failure to do so will result in an error) and has to be
112
 * formatted as follows:
113
 *
114
\verbatim
115
{
116
        "request" : "record",
117
        "name" : "<Pretty name for the recording>"
118
}
119
\endverbatim
120
 *
121
 * A successful management of this request will result in a \c recording
122
 * event which will include the unique ID of the recording and a JSEP
123
 * answer to complete the setup of the associated PeerConnection to record:
124
 * 
125
\verbatim
126
{
127
        "recordplay" : "event",
128
        "result": {
129
                "status" : "recording",
130
                "id" : <unique numeric ID>
131
        }
132
}
133
\endverbatim
134
 *
135
 * A \c stop request can interrupt the recording process and tear the
136
 * associated PeerConnection down:
137
 * 
138
\verbatim
139
{
140
        "request" : "stop",
141
}
142
\endverbatim
143
 * 
144
 * This will result in a \c stopped status:
145
 * 
146
\verbatim
147
{
148
        "recordplay" : "event",
149
        "result": {
150
                "status" : "stopped",
151
                "id" : <unique numeric ID of the interrupted recording>
152
        }
153
}
154
\endverbatim
155
 * 
156
 * For what concerns the playout, instead, the process is slightly
157
 * different: you first choose a recording to replay, using \c play ,
158
 * and then start its playout using a \c start request. Just as before,
159
 * a \c stop request will interrupt the playout and tear the PeerConnection
160
 * down. It's very important to point out that no JSEP offer must be
161
 * sent for replaying a recording: in this case, it will always be the
162
 * plugin to generate a JSON offer (in response to a \c play request),
163
 * which means you'll then have to provide a JSEP answer within the
164
 * context of the following \c start request which will close the circle.
165
 * 
166
 * A \c play request has to be formatted as follows:
167
 * 
168
\verbatim
169
{
170
        "request" : "play",
171
        "id" : <unique numeric ID of the recording to replay>
172
}
173
\endverbatim
174
 * 
175
 * This will result in a \c preparing status notification which will be
176
 * attached to the JSEP offer originated by the plugin in order to
177
 * match the media available in the recording:
178
 * 
179
\verbatim
180
{
181
        "recordplay" : "event",
182
        "result": {
183
                "status" : "preparing",
184
                "id" : <unique numeric ID of the recording>
185
        }
186
}
187
\endverbatim
188
 * 
189
 * A \c start request, which as anticipated must be attached to the JSEP
190
 * answer to the previous offer sent by the plugin, has to be formatted
191
 * as follows:
192
 * 
193
\verbatim
194
{
195
        "request" : "start",
196
}
197
\endverbatim
198
 * 
199
 * This will result in a \c playing status notification:
200
 * 
201
\verbatim
202
{
203
        "recordplay" : "event",
204
        "result": {
205
                "status" : "playing"
206
        }
207
}
208
\endverbatim
209
 * 
210
 * Just as before, a \c stop request can interrupt the playout process at
211
 * any time, and tear the associated PeerConnection down:
212
 * 
213
\verbatim
214
{
215
        "request" : "stop",
216
}
217
\endverbatim
218
 * 
219
 * This will result in a \c stopped status:
220
 * 
221
\verbatim
222
{
223
        "recordplay" : "event",
224
        "result": {
225
                "status" : "stopped"
226
        }
227
}
228
\endverbatim
229
 * 
230
 * If the plugin detects a loss of the associated PeerConnection, whether
231
 * as a result of a \c stop request or because the 10 seconds passed, a
232
 * \c done result notification is triggered to inform the application
233
 * the recording/playout session is over:
234
 * 
235
\verbatim
236
{
237
        "recordplay" : "event",
238
        "result": "done"
239
}
240
\endverbatim
241
 *
242
 * \ingroup plugins
243
 * \ref plugins
244
 */
245

    
246
#include "plugin.h"
247

    
248
#include <dirent.h>
249
#include <arpa/inet.h>
250
#include <sys/time.h>
251
#include <jansson.h>
252

    
253
#include "../debug.h"
254
#include "../apierror.h"
255
#include "../config.h"
256
#include "../mutex.h"
257
#include "../record.h"
258
#include "../rtp.h"
259
#include "../rtcp.h"
260
#include "../utils.h"
261

    
262

    
263
/* Plugin information */
264
#define JANUS_RECORDPLAY_VERSION                        3
265
#define JANUS_RECORDPLAY_VERSION_STRING                "0.0.3"
266
#define JANUS_RECORDPLAY_DESCRIPTION                "This is a trivial Record&Play plugin for Janus, to record WebRTC sessions and replay them."
267
#define JANUS_RECORDPLAY_NAME                                "JANUS Record&Play plugin"
268
#define JANUS_RECORDPLAY_AUTHOR                                "Meetecho s.r.l."
269
#define JANUS_RECORDPLAY_PACKAGE                        "janus.plugin.recordplay"
270

    
271
/* Plugin methods */
272
janus_plugin *create(void);
273
int janus_recordplay_init(janus_callbacks *callback, const char *onfig_path);
274
void janus_recordplay_destroy(void);
275
int janus_recordplay_get_api_compatibility(void);
276
int janus_recordplay_get_version(void);
277
const char *janus_recordplay_get_version_string(void);
278
const char *janus_recordplay_get_description(void);
279
const char *janus_recordplay_get_name(void);
280
const char *janus_recordplay_get_author(void);
281
const char *janus_recordplay_get_package(void);
282
void janus_recordplay_create_session(janus_plugin_session *handle, int *error);
283
struct janus_plugin_result *janus_recordplay_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
284
void janus_recordplay_setup_media(janus_plugin_session *handle);
285
void janus_recordplay_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
286
void janus_recordplay_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
287
void janus_recordplay_incoming_data(janus_plugin_session *handle, char *buf, int len);
288
void janus_recordplay_hangup_media(janus_plugin_session *handle);
289
void janus_recordplay_destroy_session(janus_plugin_session *handle, int *error);
290
char *janus_recordplay_query_session(janus_plugin_session *handle);
291

    
292
/* Plugin setup */
293
static janus_plugin janus_recordplay_plugin =
294
        JANUS_PLUGIN_INIT (
295
                .init = janus_recordplay_init,
296
                .destroy = janus_recordplay_destroy,
297

    
298
                .get_api_compatibility = janus_recordplay_get_api_compatibility,
299
                .get_version = janus_recordplay_get_version,
300
                .get_version_string = janus_recordplay_get_version_string,
301
                .get_description = janus_recordplay_get_description,
302
                .get_name = janus_recordplay_get_name,
303
                .get_author = janus_recordplay_get_author,
304
                .get_package = janus_recordplay_get_package,
305
                
306
                .create_session = janus_recordplay_create_session,
307
                .handle_message = janus_recordplay_handle_message,
308
                .setup_media = janus_recordplay_setup_media,
309
                .incoming_rtp = janus_recordplay_incoming_rtp,
310
                .incoming_rtcp = janus_recordplay_incoming_rtcp,
311
                .incoming_data = janus_recordplay_incoming_data,
312
                .hangup_media = janus_recordplay_hangup_media,
313
                .destroy_session = janus_recordplay_destroy_session,
314
                .query_session = janus_recordplay_query_session,
315
        );
316

    
317
/* Plugin creator */
318
janus_plugin *create(void) {
319
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_RECORDPLAY_NAME);
320
        return &janus_recordplay_plugin;
321
}
322

    
323

    
324
/* Useful stuff */
325
static gint initialized = 0, stopping = 0;
326
static janus_callbacks *gateway = NULL;
327
static GThread *handler_thread;
328
static GThread *watchdog;
329
static void *janus_recordplay_handler(void *data);
330

    
331
typedef struct janus_recordplay_message {
332
        janus_plugin_session *handle;
333
        char *transaction;
334
        json_t *message;
335
        char *sdp_type;
336
        char *sdp;
337
} janus_recordplay_message;
338
static GAsyncQueue *messages = NULL;
339

    
340
typedef struct janus_recordplay_rtp_header_extension {
341
        uint16_t type;
342
        uint16_t length;
343
} janus_recordplay_rtp_header_extension;
344

    
345
typedef struct janus_recordplay_frame_packet {
346
        uint16_t seq;        /* RTP Sequence number */
347
        uint64_t ts;        /* RTP Timestamp */
348
        int len;                /* Length of the data */
349
        long offset;        /* Offset of the data in the file */
350
        struct janus_recordplay_frame_packet *next;
351
        struct janus_recordplay_frame_packet *prev;
352
} janus_recordplay_frame_packet;
353
janus_recordplay_frame_packet *janus_recordplay_get_frames(const char *dir, const char *filename);
354

    
355
typedef struct janus_recordplay_recording {
356
        guint64 id;                        /* Recording unique ID */
357
        char *name;                        /* Name of the recording */
358
        char *date;                        /* Time of the recording */
359
        char *arc_file;                /* Audio file name */
360
        char *vrc_file;                /* Video file name */
361
} janus_recordplay_recording;
362
static GHashTable *recordings;
363
static janus_mutex recordings_mutex;
364

    
365
typedef struct janus_recordplay_session {
366
        janus_plugin_session *handle;
367
        gboolean active;
368
        gboolean recorder;                /* Whether this session is used to record or to replay a WebRTC session */
369
        gboolean firefox;        /* We send Firefox users a different kind of FIR */
370
        janus_recordplay_recording *recording;
371
        janus_recorder *arc;        /* Audio recorder */
372
        janus_recorder *vrc;        /* Video recorder */
373
        janus_recordplay_frame_packet *aframes;        /* Audio frames (for playout) */
374
        janus_recordplay_frame_packet *vframes;        /* Video frames (for playout) */
375
        guint video_remb_startup;
376
        guint64 video_remb_last;
377
        guint64 video_bitrate;
378
        guint64 video_rtp_seq_number;
379
        gint video_fir_seq;
380
        guint64 destroyed;        /* Time at which this session was marked as destroyed */
381
} janus_recordplay_session;
382
static GHashTable *sessions;
383
static GList *old_sessions;
384
static janus_mutex sessions_mutex;
385

    
386

    
387
static char *recordings_path = NULL;
388
void janus_recordplay_update_recordings_list(void);
389
static void *janus_recordplay_playout_thread(void *data);
390

    
391

    
392
/* SDP offer/answer templates for the playout */
393
#define OPUS_PT                111
394
#define VP8_PT                100
395
#define sdp_template \
396
                "v=0\r\n" \
397
                "o=- %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n"        /* We need current time here */ \
398
                "s=%s\r\n"                                                        /* Recording playout id */ \
399
                "t=0 0\r\n" \
400
                "%s%s"                                                                /* Audio and/or video m-lines */
401
#define sdp_a_template \
402
                "m=audio 1 RTP/SAVPF %d\r\n"                /* Opus payload type */ \
403
                "c=IN IP4 1.1.1.1\r\n" \
404
                "a=%s\r\n"                                                        /* Media direction */ \
405
                "a=rtpmap:%d opus/48000/2\r\n"                /* Opus payload type */
406
#define sdp_v_template \
407
                "m=video 1 RTP/SAVPF %d\r\n"                /* VP8 payload type */ \
408
                "c=IN IP4 1.1.1.1\r\n" \
409
                "a=%s\r\n"                                                        /* Media direction */ \
410
                "a=rtpmap:%d VP8/90000\r\n"                        /* VP8 payload type */ \
411
                "a=rtcp-fb:%d ccm fir\r\n"                        /* VP8 payload type */ \
412
                "a=rtcp-fb:%d nack\r\n"                                /* VP8 payload type */ \
413
                "a=rtcp-fb:%d nack pli\r\n"                        /* VP8 payload type */ \
414
                "a=rtcp-fb:%d goog-remb\r\n"                /* VP8 payload type */
415

    
416

    
417
void janus_recordplay_message_free(janus_recordplay_message *msg);
418
void janus_recordplay_message_free(janus_recordplay_message *msg) {
419
        if(!msg)
420
                return;
421

    
422
        msg->handle = NULL;
423

    
424
        g_free(msg->transaction);
425
        msg->transaction = NULL;
426
        g_free(msg->message);
427
        msg->message = NULL;
428
        g_free(msg->sdp_type);
429
        msg->sdp_type = NULL;
430
        g_free(msg->sdp);
431
        msg->sdp = NULL;
432

    
433
        g_free(msg);
434
}
435

    
436

    
437
/* Error codes */
438
#define JANUS_RECORDPLAY_ERROR_NO_MESSAGE                        411
439
#define JANUS_RECORDPLAY_ERROR_INVALID_JSON                412
440
#define JANUS_RECORDPLAY_ERROR_INVALID_REQUEST        413
441
#define JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT        414
442
#define JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT        415
443
#define JANUS_RECORDPLAY_ERROR_NOT_FOUND        416
444
#define JANUS_RECORDPLAY_ERROR_INVALID_RECORDING        417
445
#define JANUS_RECORDPLAY_ERROR_INVALID_STATE        418
446
#define JANUS_RECORDPLAY_ERROR_UNKNOWN_ERROR        499
447

    
448

    
449
/* Record&Play watchdog/garbage collector (sort of) */
450
void *janus_recordplay_watchdog(void *data);
451
void *janus_recordplay_watchdog(void *data) {
452
        JANUS_LOG(LOG_INFO, "Record&Play watchdog started\n");
453
        gint64 now = 0;
454
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
455
                janus_mutex_lock(&sessions_mutex);
456
                /* Iterate on all the sessions */
457
                now = janus_get_monotonic_time();
458
                if(old_sessions != NULL) {
459
                        GList *sl = old_sessions;
460
                        JANUS_LOG(LOG_HUGE, "Checking %d old Record&Play sessions...\n", g_list_length(old_sessions));
461
                        while(sl) {
462
                                janus_recordplay_session *session = (janus_recordplay_session *)sl->data;
463
                                if(!session) {
464
                                        sl = sl->next;
465
                                        continue;
466
                                }
467
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
468
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
469
                                        JANUS_LOG(LOG_VERB, "Freeing old Record&Play session\n");
470
                                        GList *rm = sl->next;
471
                                        old_sessions = g_list_delete_link(old_sessions, sl);
472
                                        sl = rm;
473
                                        session->handle = NULL;
474
                                        g_free(session);
475
                                        session = NULL;
476
                                        continue;
477
                                }
478
                                sl = sl->next;
479
                        }
480
                }
481
                janus_mutex_unlock(&sessions_mutex);
482
                g_usleep(500000);
483
        }
484
        JANUS_LOG(LOG_INFO, "Record&Play watchdog stopped\n");
485
        return NULL;
486
}
487

    
488

    
489
/* Plugin implementation */
490
int janus_recordplay_init(janus_callbacks *callback, const char *config_path) {
491
        if(g_atomic_int_get(&stopping)) {
492
                /* Still stopping from before */
493
                return -1;
494
        }
495
        if(callback == NULL || config_path == NULL) {
496
                /* Invalid arguments */
497
                return -1;
498
        }
499

    
500
        /* Read configuration */
501
        char filename[255];
502
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_RECORDPLAY_PACKAGE);
503
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
504
        janus_config *config = janus_config_parse(filename);
505
        if(config != NULL)
506
                janus_config_print(config);
507
        /* Parse configuration */
508
        if(config != NULL) {
509
                janus_config_item *path = janus_config_get_item_drilldown(config, "general", "path");
510
                if(path && path->value)
511
                        recordings_path = g_strdup(path->value);
512
                /* Done */
513
                janus_config_destroy(config);
514
                config = NULL;
515
        }
516
        if(recordings_path == NULL) {
517
                recordings_path = g_strdup("/tmp");
518
                JANUS_LOG(LOG_WARN, "No recordings path specified, using /tmp...\n");
519
        }
520
        /* Create the folder, if needed */
521
        struct stat st = {0};
522
        if(stat(recordings_path, &st) == -1) {
523
                int res = janus_mkdir(recordings_path, 0755);
524
                JANUS_LOG(LOG_VERB, "Creating folder: %d\n", res);
525
                if(res != 0) {
526
                        JANUS_LOG(LOG_ERR, "%s", strerror(res));
527
                        return -1;        /* No point going on... */
528
                }
529
        }
530
        recordings = g_hash_table_new(NULL, NULL);
531
        janus_mutex_init(&recordings_mutex);
532
        janus_recordplay_update_recordings_list();
533
        
534
        sessions = g_hash_table_new(NULL, NULL);
535
        janus_mutex_init(&sessions_mutex);
536
        messages = g_async_queue_new_full((GDestroyNotify) janus_recordplay_message_free);
537
        /* This is the callback we'll need to invoke to contact the gateway */
538
        gateway = callback;
539

    
540
        g_atomic_int_set(&initialized, 1);
541

    
542
        GError *error = NULL;
543
        /* Start the sessions watchdog */
544
        watchdog = g_thread_try_new("rplay watchdog", &janus_recordplay_watchdog, NULL, &error);
545
        if(error != NULL) {
546
                g_atomic_int_set(&initialized, 0);
547
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Record&Play watchdog thread...\n", error->code, error->message ? error->message : "??");
548
                return -1;
549
        }
550
        /* Launch the thread that will handle incoming messages */
551
        handler_thread = g_thread_try_new("janus recordplay handler", janus_recordplay_handler, NULL, &error);
552
        if(error != NULL) {
553
                g_atomic_int_set(&initialized, 0);
554
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Record&Play handler thread...\n", error->code, error->message ? error->message : "??");
555
                return -1;
556
        }
557
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_RECORDPLAY_NAME);
558
        return 0;
559
}
560

    
561
void janus_recordplay_destroy(void) {
562
        if(!g_atomic_int_get(&initialized))
563
                return;
564
        g_atomic_int_set(&stopping, 1);
565
        if(handler_thread != NULL) {
566
                g_thread_join(handler_thread);
567
                handler_thread = NULL;
568
        }
569
        if(watchdog != NULL) {
570
                g_thread_join(watchdog);
571
                watchdog = NULL;
572
        }
573
        /* FIXME We should destroy the sessions cleanly */
574
        janus_mutex_lock(&sessions_mutex);
575
        g_hash_table_destroy(sessions);
576
        janus_mutex_unlock(&sessions_mutex);
577
        g_async_queue_unref(messages);
578
        messages = NULL;
579
        sessions = NULL;
580
        g_atomic_int_set(&initialized, 0);
581
        g_atomic_int_set(&stopping, 0);
582
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_RECORDPLAY_NAME);
583
}
584

    
585
int janus_recordplay_get_api_compatibility(void) {
586
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
587
        return JANUS_PLUGIN_API_VERSION;
588
}
589

    
590
int janus_recordplay_get_version(void) {
591
        return JANUS_RECORDPLAY_VERSION;
592
}
593

    
594
const char *janus_recordplay_get_version_string(void) {
595
        return JANUS_RECORDPLAY_VERSION_STRING;
596
}
597

    
598
const char *janus_recordplay_get_description(void) {
599
        return JANUS_RECORDPLAY_DESCRIPTION;
600
}
601

    
602
const char *janus_recordplay_get_name(void) {
603
        return JANUS_RECORDPLAY_NAME;
604
}
605

    
606
const char *janus_recordplay_get_author(void) {
607
        return JANUS_RECORDPLAY_AUTHOR;
608
}
609

    
610
const char *janus_recordplay_get_package(void) {
611
        return JANUS_RECORDPLAY_PACKAGE;
612
}
613

    
614
void janus_recordplay_create_session(janus_plugin_session *handle, int *error) {
615
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
616
                *error = -1;
617
                return;
618
        }        
619
        janus_recordplay_session *session = (janus_recordplay_session *)calloc(1, sizeof(janus_recordplay_session));
620
        if(session == NULL) {
621
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
622
                *error = -2;
623
                return;
624
        }
625
        session->handle = handle;
626
        session->active = FALSE;
627
        session->recorder = FALSE;
628
        session->firefox = FALSE;
629
        session->arc = NULL;
630
        session->vrc = NULL;
631
        session->destroyed = 0;
632
        session->video_remb_startup = 4;
633
        session->video_remb_last = janus_get_monotonic_time();
634
        session->video_bitrate = 1024 * 1024; // 1 megabit
635
        session->video_rtp_seq_number = 0;
636
        session->video_fir_seq = 0;
637
        handle->plugin_handle = session;
638
        janus_mutex_lock(&sessions_mutex);
639
        g_hash_table_insert(sessions, handle, session);
640
        janus_mutex_unlock(&sessions_mutex);
641

    
642
        return;
643
}
644

    
645
void janus_recordplay_destroy_session(janus_plugin_session *handle, int *error) {
646
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
647
                *error = -1;
648
                return;
649
        }        
650
        janus_recordplay_session *session = (janus_recordplay_session *)handle->plugin_handle;
651
        if(!session) {
652
                JANUS_LOG(LOG_ERR, "No Record&Play session associated with this handle...\n");
653
                *error = -2;
654
                return;
655
        }
656
        if(session->destroyed) {
657
                JANUS_LOG(LOG_WARN, "Record&Play session already destroyed...\n");
658
                return;
659
        }
660
        JANUS_LOG(LOG_VERB, "Removing Record&Play session...\n");
661
        janus_mutex_lock(&sessions_mutex);
662
        g_hash_table_remove(sessions, handle);
663
        janus_mutex_unlock(&sessions_mutex);
664
        /* Cleaning up and removing the session is done in a lazy way */
665
        session->destroyed = janus_get_monotonic_time();
666
        janus_mutex_lock(&sessions_mutex);
667
        old_sessions = g_list_append(old_sessions, session);
668
        janus_mutex_unlock(&sessions_mutex);
669
        return;
670
}
671

    
672
char *janus_recordplay_query_session(janus_plugin_session *handle) {
673
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
674
                return NULL;
675
        }        
676
        janus_recordplay_session *session = (janus_recordplay_session *)handle->plugin_handle;
677
        if(!session) {
678
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
679
                return NULL;
680
        }
681
        /* In the echo test, every session is the same: we just provide some configure info */
682
        json_t *info = json_object();
683
        json_object_set_new(info, "type", json_string(session->recorder ? "recorder" : (session->recording ? "player" : "none")));
684
        if(session->recording) {
685
                json_object_set_new(info, "recording_id", json_integer(session->recording->id));
686
                json_object_set_new(info, "recording_name", json_string(session->recording->name));
687
        }
688
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
689
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
690
        json_decref(info);
691
        return info_text;
692
}
693

    
694
struct janus_plugin_result *janus_recordplay_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
695
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
696
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
697
        JANUS_LOG(LOG_VERB, "%s\n", message);
698

    
699
        /* Pre-parse the message */
700
        int error_code = 0;
701
        char error_cause[512];        /* FIXME 512 should be enough, but anyway... */
702
        json_t *root = NULL;
703
        janus_recordplay_session *session = (janus_recordplay_session *)handle->plugin_handle;        
704
        if(!session) {
705
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
706
                error_code = JANUS_RECORDPLAY_ERROR_UNKNOWN_ERROR;
707
                g_snprintf(error_cause, 512, "%s", "session associated with this handle...");
708
                goto error;
709
        }
710
        if(session->destroyed) {
711
                JANUS_LOG(LOG_ERR, "Session has already been destroyed...\n");
712
                error_code = JANUS_RECORDPLAY_ERROR_UNKNOWN_ERROR;
713
                g_snprintf(error_cause, 512, "%s", "Session has already been destroyed...");
714
                goto error;
715
        }
716
        error_code = 0;
717
        JANUS_LOG(LOG_VERB, "Handling message: %s\n", message);
718
        if(message == NULL) {
719
                JANUS_LOG(LOG_ERR, "No message??\n");
720
                error_code = JANUS_RECORDPLAY_ERROR_NO_MESSAGE;
721
                g_snprintf(error_cause, 512, "%s", "No message??");
722
                goto error;
723
        }
724
        json_error_t error;
725
        root = json_loads(message, 0, &error);
726
        if(!root) {
727
                JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
728
                error_code = JANUS_RECORDPLAY_ERROR_INVALID_JSON;
729
                g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
730
                goto error;
731
        }
732
        if(!json_is_object(root)) {
733
                JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
734
                error_code = JANUS_RECORDPLAY_ERROR_INVALID_JSON;
735
                g_snprintf(error_cause, 512, "JSON error: not an object");
736
                goto error;
737
        }
738
        /* Get the request first */
739
        json_t *request = json_object_get(root, "request");
740
        if(!request) {
741
                JANUS_LOG(LOG_ERR, "Missing element (request)\n");
742
                error_code = JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT;
743
                g_snprintf(error_cause, 512, "Missing element (request)");
744
                goto error;
745
        }
746
        if(!json_is_string(request)) {
747
                JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
748
                error_code = JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT;
749
                g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
750
                goto error;
751
        }
752
        /* Some requests ('create' and 'destroy') can be handled synchronously */
753
        const char *request_text = json_string_value(request);
754
        if(!strcasecmp(request_text, "update")) {
755
                /* Update list of available recordings, scanning the folder again */
756
                janus_recordplay_update_recordings_list();
757
                /* Send info back */
758
                json_t *response = json_object();
759
                json_object_set_new(response, "recordplay", json_string("ok"));
760
                char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
761
                json_decref(response);
762
                janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
763
                g_free(response_text);
764
                return result;
765
        } else if(!strcasecmp(request_text, "list")) {
766
                json_t *list = json_array();
767
                JANUS_LOG(LOG_VERB, "Request for the list of recordings\n");
768
                /* Return a list of all available recordings */
769
                janus_mutex_lock(&recordings_mutex);
770
                GHashTableIter iter;
771
                gpointer value;
772
                g_hash_table_iter_init(&iter, recordings);
773
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
774
                        janus_recordplay_recording *rec = value;
775
                        json_t *ml = json_object();
776
                        json_object_set_new(ml, "id", json_integer(rec->id));
777
                        json_object_set_new(ml, "name", json_string(rec->name));
778
                        json_object_set_new(ml, "date", json_string(rec->date));
779
                        json_object_set_new(ml, "audio", json_string(rec->arc_file ? "false" : "true"));
780
                        json_object_set_new(ml, "video", json_string(rec->vrc_file ? "false" : "true"));
781
                        json_array_append_new(list, ml);
782
                }
783
                janus_mutex_unlock(&recordings_mutex);
784
                /* Send info back */
785
                json_t *response = json_object();
786
                json_object_set_new(response, "recordplay", json_string("list"));
787
                json_object_set_new(response, "list", list);
788
                char *response_text = json_dumps(response, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
789
                json_decref(response);
790
                janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
791
                g_free(response_text);
792
                return result;
793
        } else if(!strcasecmp(request_text, "record") || !strcasecmp(request_text, "play")
794
                        || !strcasecmp(request_text, "start") || !strcasecmp(request_text, "stop")) {
795
                /* These messages are handled asynchronously */
796
                goto async;
797
        } else {
798
                JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
799
                error_code = JANUS_RECORDPLAY_ERROR_INVALID_REQUEST;
800
                g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
801
                goto error;
802
        }
803

    
804
error:
805
                {
806
                        if(root != NULL)
807
                                json_decref(root);
808
                        /* Prepare JSON error event */
809
                        json_t *event = json_object();
810
                        json_object_set_new(event, "recordplay", json_string("event"));
811
                        json_object_set_new(event, "error_code", json_integer(error_code));
812
                        json_object_set_new(event, "error", json_string(error_cause));
813
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
814
                        json_decref(event);
815
                        janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, event_text);
816
                        g_free(event_text);
817
                        return result;
818
                }
819

    
820
async:
821
                {
822
                        /* All the other requests to this plugin are handled asynchronously */
823
                        janus_recordplay_message *msg = calloc(1, sizeof(janus_recordplay_message));
824
                        if(msg == NULL) {
825
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
826
                                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
827
                        }
828
                        msg->handle = handle;
829
                        msg->transaction = transaction;
830
                        msg->message = root;
831
                        msg->sdp_type = sdp_type;
832
                        msg->sdp = sdp;
833

    
834
                        g_async_queue_push(messages, msg);
835

    
836
                        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
837
                }
838
}
839

    
840
void janus_recordplay_setup_media(janus_plugin_session *handle) {
841
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
842
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
843
                return;
844
        janus_recordplay_session *session = (janus_recordplay_session *)handle->plugin_handle;        
845
        if(!session) {
846
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
847
                return;
848
        }
849
        if(session->destroyed)
850
                return;
851
        /* Take note of the fact that the session is now active */
852
        session->active = TRUE;
853
        if(!session->recorder) {
854
                GError *error = NULL;
855
                g_thread_try_new("recordplay playout thread", &janus_recordplay_playout_thread, session, &error);
856
                if(error != NULL) {
857
                        /* FIXME Should we notify this back to the user somehow? */
858
                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Record&Play playout thread...\n", error->code, error->message ? error->message : "??");
859
                }
860
        }
861
}
862

    
863
void janus_recordplay_send_rtcp_feedback(janus_plugin_session *handle, int video, char *buf, int len) {
864
        if (video != 1) { return; } // we just do this for video, for now
865
        janus_recordplay_session *session = (janus_recordplay_session *)handle->plugin_handle;
866
        char rtcpbuf[200];
867

    
868
        rtp_header *rtp = (rtp_header *)buf;
869
        guint64 rtp_lost_count = (ntohs(rtp->seq_number) - session->video_rtp_seq_number) - 1;
870
        session->video_rtp_seq_number = ntohs(rtp->seq_number);
871

    
872
        if (rtp_lost_count > 0) {
873
                JANUS_LOG(LOG_ERR, "We lost %"SCNu64" video packets\n", rtp_lost_count);
874
                // send fir, pli, and a wtf
875
                memset(rtcpbuf, 0, 20);
876
                janus_rtcp_fir((char *)&rtcpbuf, 20, &session->video_fir_seq);
877
                gateway->relay_rtcp(handle, video, rtcpbuf, 20);
878
                memset(rtcpbuf, 0, 12);
879
                janus_rtcp_pli((char *)&rtcpbuf, 12);
880
                gateway->relay_rtcp(handle, video, rtcpbuf, 12);
881
        }
882

    
883
        // send REMB every second
884
        gint64 now = janus_get_monotonic_time();
885
        guint64 elapsed = now - session->video_remb_last;
886

    
887
        if (elapsed < G_USEC_PER_SEC) { return; }
888

    
889
        session->video_remb_last = now;
890
        uint64_t bitrate = session->video_bitrate;
891

    
892
        // Turns out ramp-up is not doing anything (Chrome)
893
        if (session->video_remb_startup > 0) {
894
                bitrate = bitrate / session->video_remb_startup;
895
                session->video_remb_startup--;
896
        }
897

    
898
        memset(rtcpbuf, 0, 200);
899
        /* FIXME First put a RR (fake)... */
900
        int rrlen = 32;
901
        rtcp_rr *rr = (rtcp_rr *)&rtcpbuf;
902
        rr->header.version = 2;
903
        rr->header.type = RTCP_RR;
904
        rr->header.rc = 1;
905
        rr->header.length = htons((rrlen/4)-1);
906
        /* ... then put a SDES... */
907
        int sdeslen = janus_rtcp_sdes((char *)(&rtcpbuf)+rrlen, 200-rrlen, "janusvideo", 10);
908

    
909
        if(sdeslen > 0) {
910
                /* ... and then finally a REMB */
911
                janus_rtcp_remb((char *)(&rtcpbuf)+rrlen+sdeslen, 24, bitrate);
912
                gateway->relay_rtcp(handle, video, rtcpbuf, rrlen+sdeslen+24);
913
        }
914
}
915

    
916
void janus_recordplay_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
917
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
918
                return;
919
        if(gateway) {
920
                janus_recordplay_session *session = (janus_recordplay_session *)handle->plugin_handle;        
921
                if(!session) {
922
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
923
                        return;
924
                }
925
                if(session->destroyed)
926
                        return;
927
                /* Are we recording? */
928
                if(session->recorder) {
929
                        if(video && session->vrc)
930
                                janus_recorder_save_frame(session->vrc, buf, len);
931
                        else if(!video && session->arc)
932
                                janus_recorder_save_frame(session->arc, buf, len);
933
                }
934

    
935
                janus_recordplay_send_rtcp_feedback(handle, video, buf, len);
936
        }
937
}
938

    
939
void janus_recordplay_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
940
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
941
                return;
942
}
943

    
944
void janus_recordplay_incoming_data(janus_plugin_session *handle, char *buf, int len) {
945
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
946
                return;
947
        /* FIXME We don't care */
948
}
949

    
950
void janus_recordplay_hangup_media(janus_plugin_session *handle) {
951
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
952
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
953
                return;
954
        janus_recordplay_session *session = (janus_recordplay_session *)handle->plugin_handle;
955
        if(!session) {
956
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
957
                return;
958
        }
959
        if(session->destroyed)
960
                return;
961
        session->active = FALSE;
962
        if(!session->recorder)
963
                return;
964

    
965
        /* Send an event to the browser and tell it's over */
966
        json_t *event = json_object();
967
        json_object_set_new(event, "recordplay", json_string("event"));
968
        json_object_set_new(event, "result", json_string("done"));
969
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
970
        json_decref(event);
971
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
972
        int ret = gateway->push_event(handle, &janus_recordplay_plugin, NULL, event_text, NULL, NULL);
973
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
974
        g_free(event_text);
975

    
976
        /* FIXME Simulate a "stop" coming from the browser */
977
        janus_recordplay_message *msg = calloc(1, sizeof(janus_recordplay_message));
978
        msg->handle = handle;
979
        msg->message = json_loads("{\"request\":\"stop\"}", 0, NULL);
980
        msg->transaction = NULL;
981
        msg->sdp_type = NULL;
982
        msg->sdp = NULL;
983
        g_async_queue_push(messages, msg);
984
}
985

    
986
/* Thread to handle incoming messages */
987
static void *janus_recordplay_handler(void *data) {
988
        JANUS_LOG(LOG_VERB, "Joining Record&Play handler thread\n");
989
        janus_recordplay_message *msg = NULL;
990
        int error_code = 0;
991
        char *error_cause = calloc(512, sizeof(char));        /* FIXME 512 should be enough, but anyway... */
992
        if(error_cause == NULL) {
993
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
994
                return NULL;
995
        }
996
        json_t *root = NULL;
997
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
998
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
999
                        usleep(50000);
1000
                        continue;
1001
                }
1002
                janus_recordplay_session *session = (janus_recordplay_session *)msg->handle->plugin_handle;
1003
                if(!session) {
1004
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
1005
                        janus_recordplay_message_free(msg);
1006
                        continue;
1007
                }
1008
                if(session->destroyed) {
1009
                        janus_recordplay_message_free(msg);
1010
                        continue;
1011
                }
1012
                /* Handle request */
1013
                error_code = 0;
1014
                root = NULL;
1015
                if(msg->message == NULL) {
1016
                        JANUS_LOG(LOG_ERR, "No message??\n");
1017
                        error_code = JANUS_RECORDPLAY_ERROR_NO_MESSAGE;
1018
                        g_snprintf(error_cause, 512, "%s", "No message??");
1019
                        goto error;
1020
                }
1021
                root = msg->message;
1022
                /* Get the request first */
1023
                json_t *request = json_object_get(root, "request");
1024
                if(!request) {
1025
                        JANUS_LOG(LOG_ERR, "Missing element (request)\n");
1026
                        error_code = JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT;
1027
                        g_snprintf(error_cause, 512, "Missing element (request)");
1028
                        goto error;
1029
                }
1030
                if(!json_is_string(request)) {
1031
                        JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
1032
                        error_code = JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT;
1033
                        g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
1034
                        goto error;
1035
                }
1036
                const char *request_text = json_string_value(request);
1037
                json_t *event = NULL;
1038
                json_t *result = NULL;
1039
                char *sdp = NULL;
1040
                const char *filename_text = NULL;
1041
                if(!strcasecmp(request_text, "record")) {
1042
                        if(!msg->sdp) {
1043
                                JANUS_LOG(LOG_ERR, "Missing SDP offer\n");
1044
                                error_code = JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT;
1045
                                g_snprintf(error_cause, 512, "Missing SDP offer");
1046
                                goto error;
1047
                        }
1048
                        json_t *name = json_object_get(root, "name");
1049
                        if(!name) {
1050
                                JANUS_LOG(LOG_ERR, "Missing element (name)\n");
1051
                                error_code = JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT;
1052
                                g_snprintf(error_cause, 512, "Missing element (name)");
1053
                                goto error;
1054
                        }
1055
                        if(!json_is_string(name)) {
1056
                                JANUS_LOG(LOG_ERR, "Invalid element (name should be a string)\n");
1057
                                error_code = JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT;
1058
                                g_snprintf(error_cause, 512, "Invalid element (name should be a string)");
1059
                                goto error;
1060
                        }
1061
                        json_t *filename = json_object_get(root, "filename");
1062
                        if (filename) {
1063
                                if(!json_is_string(name)) {
1064
                                        JANUS_LOG(LOG_ERR, "Invalid element (filename should be a string)\n");
1065
                                        error_code = JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT;
1066
                                        g_snprintf(error_cause, 512, "Invalid element (filename should be a string)");
1067
                                        goto error;
1068
                                }
1069
                                filename_text = json_string_value(filename);
1070
                        }
1071
                        const char *name_text = json_string_value(name);
1072
                        guint64 id = 0;
1073
                        while(id == 0) {
1074
                                id = g_random_int();
1075
                                if(g_hash_table_lookup(recordings, GUINT_TO_POINTER(id)) != NULL) {
1076
                                        /* Room ID already taken, try another one */
1077
                                        id = 0;
1078
                                }
1079
                        }
1080
                        JANUS_LOG(LOG_VERB, "Starting new recording with ID %"SCNu64"\n", id);
1081
                        janus_recordplay_recording *rec = (janus_recordplay_recording *)calloc(1, sizeof(janus_recordplay_recording));
1082
                        rec->id = id;
1083
                        rec->name = g_strdup(name_text);
1084
                        /* Create a date string */
1085
                        time_t t = time(NULL);
1086
                        struct tm *tmv = localtime(&t);
1087
                        char outstr[200];
1088
                        strftime(outstr, sizeof(outstr), "%Y-%m-%d %H:%M:%S", tmv);
1089
                        rec->date = g_strdup(outstr);
1090
                        if(strstr(msg->sdp, "m=audio")) {
1091
                                char filename[256];
1092
                                if (filename_text != NULL) {
1093
                                        g_snprintf(filename, 256, "%s-audio", filename_text);
1094
                                } else {
1095
                                        g_snprintf(filename, 256, "rec-%"SCNu64"-audio", id);
1096
                                }
1097
                                rec->arc_file = g_strdup(filename);
1098
                                session->arc = janus_recorder_create(recordings_path, 0, rec->arc_file);
1099
                        }
1100
                        if(strstr(msg->sdp, "m=video")) {
1101
                                char filename[256];
1102
                                if (filename_text != NULL) {
1103
                                        g_snprintf(filename, 256, "%s-video", filename_text);
1104
                                } else {
1105
                                        g_snprintf(filename, 256, "rec-%"SCNu64"-video", id);
1106
                                }
1107
                                rec->vrc_file = g_strdup(filename);
1108
                                session->vrc = janus_recorder_create(recordings_path, 1, rec->vrc_file);
1109
                        }
1110
                        session->recorder = TRUE;
1111
                        session->recording = rec;
1112
                        janus_mutex_lock(&recordings_mutex);
1113
                        g_hash_table_insert(recordings, GINT_TO_POINTER(rec->id), rec);
1114
                        janus_mutex_unlock(&recordings_mutex);
1115
                        /* We need to prepare an answer */
1116
                        int opus_pt = 0, vp8_pt = 0;
1117
                        opus_pt = janus_get_opus_pt(msg->sdp);
1118
                        JANUS_LOG(LOG_VERB, "Opus payload type is %d\n", opus_pt);
1119
                        vp8_pt = janus_get_vp8_pt(msg->sdp);
1120
                        JANUS_LOG(LOG_VERB, "VP8 payload type is %d\n", vp8_pt);
1121
                        char sdptemp[1024], audio_mline[256], video_mline[512];
1122
                        if(opus_pt > 0) {
1123
                                g_snprintf(audio_mline, 256, sdp_a_template,
1124
                                        opus_pt,                                                /* Opus payload type */
1125
                                        "recvonly",                                                /* Recording is recvonly */
1126
                                        opus_pt);                                                 /* Opus payload type */
1127
                        } else {
1128
                                audio_mline[0] = '\0';
1129
                        }
1130
                        if(vp8_pt > 0) {
1131
                                g_snprintf(video_mline, 512, sdp_v_template,
1132
                                        vp8_pt,                                                        /* VP8 payload type */
1133
                                        "recvonly",                                                /* Recording is recvonly */
1134
                                        vp8_pt,                                                 /* VP8 payload type */
1135
                                        vp8_pt,                                                 /* VP8 payload type */
1136
                                        vp8_pt,                                                 /* VP8 payload type */
1137
                                        vp8_pt,                                                 /* VP8 payload type */
1138
                                        vp8_pt);                                                 /* VP8 payload type */
1139
                        } else {
1140
                                video_mline[0] = '\0';
1141
                        }
1142
                        g_snprintf(sdptemp, 1024, sdp_template,
1143
                                janus_get_monotonic_time(),                /* We need current time here */
1144
                                janus_get_monotonic_time(),                /* We need current time here */
1145
                                session->recording->name,                /* Playout session */
1146
                                audio_mline,                                        /* Audio m-line, if any */
1147
                                video_mline);                                        /* Video m-line, if any */
1148
                        sdp = g_strdup(sdptemp);
1149
                        JANUS_LOG(LOG_VERB, "Going to answer this SDP:\n%s\n", sdp);
1150
                        /* Done! */
1151
                        result = json_object();
1152
                        json_object_set_new(result, "status", json_string("recording"));
1153
                        json_object_set_new(result, "id", json_integer(id));
1154
                } else if(!strcasecmp(request_text, "play")) {
1155
                        if(msg->sdp) {
1156
                                JANUS_LOG(LOG_ERR, "A play request can't contain an SDP\n");
1157
                                error_code = JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT;
1158
                                g_snprintf(error_cause, 512, "A play request can't contain an SDP");
1159
                                goto error;
1160
                        }
1161
                        JANUS_LOG(LOG_VERB, "Replaying a recording\n");
1162
                        json_t *id = json_object_get(root, "id");
1163
                        if(!id) {
1164
                                JANUS_LOG(LOG_ERR, "Missing element (id)\n");
1165
                                error_code = JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT;
1166
                                g_snprintf(error_cause, 512, "Missing element (id)");
1167
                                goto error;
1168
                        }
1169
                        if(!json_is_integer(id)) {
1170
                                JANUS_LOG(LOG_ERR, "Invalid element (id should be an integer)\n");
1171
                                error_code = JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT;
1172
                                g_snprintf(error_cause, 512, "Invalid element (id should be an integer)");
1173
                                goto error;
1174
                        }
1175
                        guint64 id_value = json_integer_value(id);
1176
                        /* Look for this recording */
1177
                        janus_mutex_lock(&recordings_mutex);
1178
                        janus_recordplay_recording *rec = g_hash_table_lookup(recordings, GINT_TO_POINTER(id_value));
1179
                        janus_mutex_unlock(&recordings_mutex);
1180
                        if(rec == NULL) {
1181
                                JANUS_LOG(LOG_ERR, "No such recording\n");
1182
                                error_code = JANUS_RECORDPLAY_ERROR_NOT_FOUND;
1183
                                g_snprintf(error_cause, 512, "No such recording");
1184
                                goto error;
1185
                        }
1186
                        /* Access the frames */
1187
                        if(rec->arc_file) {
1188
                                session->aframes = janus_recordplay_get_frames(recordings_path, rec->arc_file);
1189
                                if(session->aframes == NULL) {
1190
                                        JANUS_LOG(LOG_ERR, "Error opening audio recording\n");
1191
                                        error_code = JANUS_RECORDPLAY_ERROR_INVALID_RECORDING;
1192
                                        g_snprintf(error_cause, 512, "Error opening audio recording");
1193
                                        goto error;
1194
                                }
1195
                        }
1196
                        if(rec->vrc_file) {
1197
                                session->vframes = janus_recordplay_get_frames(recordings_path, rec->vrc_file);
1198
                                if(session->vframes == NULL) {
1199
                                        JANUS_LOG(LOG_ERR, "Error opening video recording\n");
1200
                                        error_code = JANUS_RECORDPLAY_ERROR_INVALID_RECORDING;
1201
                                        g_snprintf(error_cause, 512, "Error opening video recording");
1202
                                        goto error;
1203
                                }
1204
                        }
1205
                        session->recording = rec;
1206
                        session->recorder = FALSE;
1207
                        /* We need to prepare an offer */
1208
                        char sdptemp[1024], audio_mline[256], video_mline[512];
1209
                        if(session->recording->arc_file) {
1210
                                g_snprintf(audio_mline, 256, sdp_a_template,
1211
                                        OPUS_PT,                                                /* Opus payload type */
1212
                                        "sendonly",                                                /* Playout is sendonly */
1213
                                        OPUS_PT);                                                 /* Opus payload type */
1214
                        } else {
1215
                                audio_mline[0] = '\0';
1216
                        }
1217
                        if(session->recording->vrc_file) {
1218
                                g_snprintf(video_mline, 512, sdp_v_template,
1219
                                        VP8_PT,                                                        /* VP8 payload type */
1220
                                        "sendonly",                                                /* Playout is sendonly */
1221
                                        VP8_PT,                                                 /* VP8 payload type */
1222
                                        VP8_PT,                                                 /* VP8 payload type */
1223
                                        VP8_PT,                                                 /* VP8 payload type */
1224
                                        VP8_PT,                                                 /* VP8 payload type */
1225
                                        VP8_PT);                                                 /* VP8 payload type */
1226
                        } else {
1227
                                video_mline[0] = '\0';
1228
                        }
1229
                        g_snprintf(sdptemp, 1024, sdp_template,
1230
                                janus_get_monotonic_time(),                /* We need current time here */
1231
                                janus_get_monotonic_time(),                /* We need current time here */
1232
                                session->recording->name,                /* Playout session */
1233
                                audio_mline,                                        /* Audio m-line, if any */
1234
                                video_mline);                                        /* Video m-line, if any */
1235
                        sdp = g_strdup(sdptemp);
1236
                        JANUS_LOG(LOG_VERB, "Going to offer this SDP:\n%s\n", sdp);
1237
                        /* Done! */
1238
                        result = json_object();
1239
                        json_object_set_new(result, "status", json_string("preparing"));
1240
                        json_object_set_new(result, "id", json_integer(id_value));
1241
                } else if(!strcasecmp(request_text, "start")) {
1242
                        if(!session->aframes && !session->vframes) {
1243
                                JANUS_LOG(LOG_ERR, "Not a playout session, can't start\n");
1244
                                error_code = JANUS_RECORDPLAY_ERROR_INVALID_STATE;
1245
                                g_snprintf(error_cause, 512, "Not a playout session, can't start");
1246
                                goto error;
1247
                        }
1248
                        /* Just a final message we make use of, e.g., to receive an ANSWER to our OFFER for a playout */
1249
                        if(!msg->sdp) {
1250
                                JANUS_LOG(LOG_ERR, "Missing SDP answer\n");
1251
                                error_code = JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT;
1252
                                g_snprintf(error_cause, 512, "Missing SDP answer");
1253
                                goto error;
1254
                        }
1255
                        /* Done! */
1256
                        result = json_object();
1257
                        json_object_set_new(result, "status", json_string("playing"));
1258
                } else if(!strcasecmp(request_text, "stop")) {
1259
                        /* Stop the recording/playout */
1260
                        session->active = FALSE;
1261
                        if(session->arc) {
1262
                                janus_recorder_close(session->arc);
1263
                                JANUS_LOG(LOG_INFO, "Closed audio recording %s\n", session->arc->filename ? session->arc->filename : "??");
1264
                        }
1265
                        session->arc = NULL;
1266
                        if(session->vrc) {
1267
                                janus_recorder_close(session->vrc);
1268
                                JANUS_LOG(LOG_INFO, "Closed video recording %s\n", session->vrc->filename ? session->vrc->filename : "??");
1269
                        }
1270
                        session->vrc = NULL;
1271
                        if(session->recorder) {
1272
                                /* Create a .nfo file for this recording */
1273
                                char nfofile[1024], nfo[1024];
1274
                                g_snprintf(nfofile, 1024, "%s/%"SCNu64".nfo", recordings_path, session->recording->id);
1275
                                FILE *file = fopen(nfofile, "wt");
1276
                                if(file == NULL) {
1277
                                        JANUS_LOG(LOG_ERR, "Error creating file %s...\n", nfofile);
1278
                                } else {
1279
                                        if(session->recording->arc_file && session->recording->vrc_file) {
1280
                                                g_snprintf(nfo, 1024,
1281
                                                        "[%"SCNu64"]\r\n"
1282
                                                        "name = %s\r\n"
1283
                                                        "date = %s\r\n"
1284
                                                        "audio = %s.mjr\r\n"
1285
                                                        "video = %s.mjr\r\n",
1286
                                                                session->recording->id, session->recording->name, session->recording->date,
1287
                                                                session->recording->arc_file, session->recording->vrc_file);
1288
                                        } else if(session->recording->arc_file) {
1289
                                                g_snprintf(nfo, 1024,
1290
                                                        "[%"SCNu64"]\r\n"
1291
                                                        "name = %s\r\n"
1292
                                                        "date = %s\r\n"
1293
                                                        "audio = %s.mjr\r\n",
1294
                                                                session->recording->id, session->recording->name, session->recording->date,
1295
                                                                session->recording->arc_file);
1296
                                        } else if(session->recording->vrc_file) {
1297
                                                g_snprintf(nfo, 1024,
1298
                                                        "[%"SCNu64"]\r\n"
1299
                                                        "name = %s\r\n"
1300
                                                        "date = %s\r\n"
1301
                                                        "video = %s.mjr\r\n",
1302
                                                                session->recording->id, session->recording->name, session->recording->date,
1303
                                                                session->recording->vrc_file);
1304
                                        }
1305
                                        /* Write to the file now */
1306
                                        fwrite(nfo, strlen(nfo), sizeof(char), file);
1307
                                        fclose(file);
1308
                                }
1309
                        }
1310
                        /* Done! */
1311
                        result = json_object();
1312
                        json_object_set_new(result, "status", json_string("stopped"));
1313
                        if(session->recording)
1314
                                json_object_set_new(result, "id", json_integer(session->recording->id));
1315
                } else {
1316
                        JANUS_LOG(LOG_ERR, "Unknown request '%s'\n", request_text);
1317
                        error_code = JANUS_RECORDPLAY_ERROR_INVALID_REQUEST;
1318
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1319
                        goto error;
1320
                }
1321

    
1322
                /* Any SDP to handle? */
1323
                if(msg->sdp) {
1324
                        session->firefox = strstr(msg->sdp, "Mozilla") ? TRUE : FALSE;
1325
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg->sdp_type, msg->sdp);
1326
                }
1327

    
1328
                /* Prepare JSON event */
1329
                event = json_object();
1330
                json_object_set_new(event, "recordplay", json_string("event"));
1331
                if(result != NULL)
1332
                        json_object_set(event, "result", result);
1333
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1334
                json_decref(event);
1335
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
1336
                if(!sdp) {
1337
                        int ret = gateway->push_event(msg->handle, &janus_recordplay_plugin, msg->transaction, event_text, NULL, NULL);
1338
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
1339
                } else {
1340
                        const char *type = session->recorder ? "answer" : "offer";
1341
                        /* How long will the gateway take to push the event? */
1342
                        gint64 start = janus_get_monotonic_time();
1343
                        int res = gateway->push_event(msg->handle, &janus_recordplay_plugin, msg->transaction, event_text, type, sdp);
1344
                        JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n",
1345
                                res, janus_get_monotonic_time()-start);
1346
                }
1347
                g_free(event_text);
1348
                janus_recordplay_message_free(msg);
1349
                continue;
1350
                
1351
error:
1352
                {
1353
                        /* Prepare JSON error event */
1354
                        json_t *event = json_object();
1355
                        json_object_set_new(event, "recordplay", json_string("event"));
1356
                        json_object_set_new(event, "error_code", json_integer(error_code));
1357
                        json_object_set_new(event, "error", json_string(error_cause));
1358
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1359
                        json_decref(event);
1360
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
1361
                        int ret = gateway->push_event(msg->handle, &janus_recordplay_plugin, msg->transaction, event_text, NULL, NULL);
1362
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
1363
                        g_free(event_text);
1364
                        janus_recordplay_message_free(msg);
1365
                }
1366
        }
1367
        g_free(error_cause);
1368
        JANUS_LOG(LOG_VERB, "LeavingRecord&Play handler thread\n");
1369
        return NULL;
1370
}
1371

    
1372
void janus_recordplay_update_recordings_list(void) {
1373
        if(recordings_path == NULL)
1374
                return;
1375
        JANUS_LOG(LOG_VERB, "Updating recordings list in %s\n", recordings_path);
1376
        janus_mutex_lock(&recordings_mutex);
1377
        /* Open dir */
1378
        DIR *dir = opendir(recordings_path);
1379
        if(!dir) {
1380
                JANUS_LOG(LOG_ERR, "Couldn't open folder...\n");
1381
                return;
1382
        }
1383
        struct dirent *recent = NULL;
1384
        char recpath[1024];
1385
        while((recent = readdir(dir))) {
1386
                int len = strlen(recent->d_name);
1387
                if(len < 4)
1388
                        continue;
1389
                if(strcasecmp(recent->d_name+len-4, ".nfo"))
1390
                        continue;
1391
                JANUS_LOG(LOG_VERB, "Importing recording '%s'...\n", recent->d_name);
1392
                memset(recpath, 0, 1024);
1393
                g_snprintf(recpath, 1024, "%s/%s", recordings_path, recent->d_name);
1394
                janus_config *nfo = janus_config_parse(recpath);
1395
                if(nfo == NULL) { 
1396
                        JANUS_LOG(LOG_ERR, "Invalid recording '%s'...\n", recent->d_name);
1397
                        continue;
1398
                }
1399
                janus_config_category *cat = janus_config_get_categories(nfo);
1400
                if(cat == NULL) {
1401
                        JANUS_LOG(LOG_WARN, "No recording info in '%s', skipping...\n", recent->d_name);
1402
                        janus_config_destroy(nfo);
1403
                        continue;
1404
                }
1405
                guint64 id = atol(cat->name);
1406
                if(id == 0) {
1407
                        JANUS_LOG(LOG_WARN, "Invalid ID, skipping...\n");
1408
                        janus_config_destroy(nfo);
1409
                        continue;
1410
                }
1411
                janus_config_item *name = janus_config_get_item(cat, "name");
1412
                janus_config_item *date = janus_config_get_item(cat, "date");
1413
                janus_config_item *audio = janus_config_get_item(cat, "audio");
1414
                janus_config_item *video = janus_config_get_item(cat, "video");
1415
                if(!name || !name->value || !date || !date->value) {
1416
                        JANUS_LOG(LOG_WARN, "Invalid info, skipping...\n");
1417
                        janus_config_destroy(nfo);
1418
                        continue;
1419
                }
1420
                if((!audio || !audio->value) && (!video || !video->value)) {
1421
                        JANUS_LOG(LOG_WARN, "No audio and no video, skipping...\n");
1422
                        janus_config_destroy(nfo);
1423
                        continue;
1424
                }
1425
                janus_recordplay_recording *rec = (janus_recordplay_recording *)calloc(1, sizeof(janus_recordplay_recording));
1426
                rec->id = id;
1427
                rec->name = g_strdup(name->value);
1428
                rec->date = g_strdup(date->value);
1429
                rec->arc_file = g_strdup(audio->value);
1430
                if(strstr(rec->arc_file, ".mjr")) {
1431
                        char *ext = strstr(rec->arc_file, ".mjr");
1432
                        *ext = '\0';
1433
                }
1434
                rec->vrc_file = g_strdup(video->value);
1435
                if(strstr(rec->vrc_file, ".mjr")) {
1436
                        char *ext = strstr(rec->vrc_file, ".mjr");
1437
                        *ext = '\0';
1438
                }
1439

    
1440
                janus_config_destroy(nfo);
1441

    
1442
                /* FIXME We should clean previous recordings with the same ID */
1443
                g_hash_table_remove(recordings, GUINT_TO_POINTER(id));
1444
                g_hash_table_insert(recordings, GUINT_TO_POINTER(id), rec);
1445
        }
1446
        closedir(dir);
1447
        janus_mutex_unlock(&recordings_mutex);
1448
}
1449

    
1450
janus_recordplay_frame_packet *janus_recordplay_get_frames(const char *dir, const char *filename) {
1451
        if(!dir || !filename)
1452
                return NULL;
1453
        /* Open the file */
1454
        char source[1024];
1455
        if(strstr(filename, ".mjr"))
1456
                g_snprintf(source, 1024, "%s/%s", dir, filename);
1457
        else
1458
                g_snprintf(source, 1024, "%s/%s.mjr", dir, filename);
1459
        FILE *file = fopen(source, "rb");
1460
        if(file == NULL) {
1461
                JANUS_LOG(LOG_ERR, "Could not open file %s\n", source);
1462
                return NULL;
1463
        }
1464
        fseek(file, 0L, SEEK_END);
1465
        long fsize = ftell(file);
1466
        fseek(file, 0L, SEEK_SET);
1467
        JANUS_LOG(LOG_VERB, "File is %zu bytes\n", fsize);
1468

    
1469
        /* Pre-parse */
1470
        JANUS_LOG(LOG_VERB, "Pre-parsing file %s to generate ordered index...\n", source);
1471
        int bytes = 0;
1472
        long offset = 0;
1473
        uint16_t len = 0, count = 0;
1474
        uint32_t first_ts = 0, last_ts = 0, reset = 0;        /* To handle whether there's a timestamp reset in the recording */
1475
        char prebuffer[1500];
1476
        memset(prebuffer, 0, 1500);
1477
        /* Let's look for timestamp resets first */
1478
        while(offset < fsize) {
1479
                /* Read frame header */
1480
                fseek(file, offset, SEEK_SET);
1481
                bytes = fread(prebuffer, sizeof(char), 8, file);
1482
                if(bytes != 8 || prebuffer[0] != 'M') {
1483
                        JANUS_LOG(LOG_ERR, "Invalid header...\n");
1484
                        fclose(file);
1485
                        return NULL;
1486
                }
1487
                offset += 8;
1488
                bytes = fread(&len, sizeof(uint16_t), 1, file);
1489
                len = ntohs(len);
1490
                offset += 2;
1491
                if(len == 5) {
1492
                        /* This is the main header */
1493
                        bytes = fread(prebuffer, sizeof(char), 5, file);
1494
                        if(prebuffer[0] == 'v') {
1495
                                JANUS_LOG(LOG_VERB, "This is a video recording, assuming VP8\n");
1496
                        } else if(prebuffer[0] == 'a') {
1497
                                JANUS_LOG(LOG_VERB, "This is an audio recording, assuming Opus\n");
1498
                        } else {
1499
                                JANUS_LOG(LOG_ERR, "Unsupported recording media type...\n");
1500
                                fclose(file);
1501
                                return NULL;
1502
                        }
1503
                        offset += len;
1504
                        continue;
1505
                } else if(len < 12) {
1506
                        /* Not RTP, skip */
1507
                        offset += len;
1508
                        continue;
1509
                }
1510
                /* Only read RTP header */
1511
                bytes = fread(prebuffer, sizeof(char), 16, file);
1512
                rtp_header *rtp = (rtp_header *)prebuffer;
1513
                if(last_ts == 0) {
1514
                        first_ts = ntohl(rtp->timestamp);
1515
                        if(first_ts > 1000*1000)        /* Just used to check whether a packet is pre- or post-reset */
1516
                                first_ts -= 1000*1000;
1517
                } else {
1518
                        if(ntohl(rtp->timestamp) < last_ts) {
1519
                                /* The new timestamp is smaller than the next one, is it a timestamp reset or simply out of order? */
1520
                                if(last_ts-ntohl(rtp->timestamp) > 2*1000*1000*1000) {
1521
                                        reset = ntohl(rtp->timestamp);
1522
                                        JANUS_LOG(LOG_VERB, "Timestamp reset: %"SCNu32"\n", reset);
1523
                                }
1524
                        } else if(ntohl(rtp->timestamp) < reset) {
1525
                                JANUS_LOG(LOG_VERB, "Updating timestamp reset: %"SCNu32" (was %"SCNu32")\n", ntohl(rtp->timestamp), reset);
1526
                                reset = ntohl(rtp->timestamp);
1527
                        }
1528
                }
1529
                last_ts = ntohl(rtp->timestamp);
1530
                /* Skip data for now */
1531
                offset += len;
1532
        }
1533
        /* Now let's parse the frames and order them */
1534
        offset = 0;
1535
        janus_recordplay_frame_packet *list = NULL, *last = NULL;
1536
        while(offset < fsize) {
1537
                /* Read frame header */
1538
                fseek(file, offset, SEEK_SET);
1539
                bytes = fread(prebuffer, sizeof(char), 8, file);
1540
                prebuffer[8] = '\0';
1541
                JANUS_LOG(LOG_HUGE, "Header: %s\n", prebuffer);
1542
                offset += 8;
1543
                bytes = fread(&len, sizeof(uint16_t), 1, file);
1544
                len = ntohs(len);
1545
                JANUS_LOG(LOG_HUGE, "  -- Length: %"SCNu16"\n", len);
1546
                offset += 2;
1547
                if(len < 12) {
1548
                        /* Not RTP, skip */
1549
                        JANUS_LOG(LOG_HUGE, "  -- Not RTP, skipping\n");
1550
                        offset += len;
1551
                        continue;
1552
                }
1553
                /* Only read RTP header */
1554
                bytes = fread(prebuffer, sizeof(char), 16, file);
1555
                rtp_header *rtp = (rtp_header *)prebuffer;
1556
                JANUS_LOG(LOG_HUGE, "  -- RTP packet (ssrc=%"SCNu32", pt=%"SCNu16", ext=%"SCNu16", seq=%"SCNu16", ts=%"SCNu32")\n",
1557
                                ntohl(rtp->ssrc), rtp->type, rtp->extension, ntohs(rtp->seq_number), ntohl(rtp->timestamp));
1558
                /* Generate frame packet and insert in the ordered list */
1559
                janus_recordplay_frame_packet *p = calloc(1, sizeof(janus_recordplay_frame_packet));
1560
                if(p == NULL) {
1561
                        JANUS_LOG(LOG_ERR, "Memory error!\n");
1562
                        fclose(file);
1563
                        return NULL;
1564
                }
1565
                p->seq = ntohs(rtp->seq_number);
1566
                if(reset == 0) {
1567
                        /* Simple enough... */
1568
                        p->ts = ntohl(rtp->timestamp);
1569
                } else {
1570
                        /* Is this packet pre- or post-reset? */
1571
                        if(ntohl(rtp->timestamp) > first_ts) {
1572
                                /* Pre-reset... */
1573
                                p->ts = ntohl(rtp->timestamp);
1574
                        } else {
1575
                                /* Post-reset... */
1576
                                uint64_t max32 = UINT32_MAX;
1577
                                max32++;
1578
                                p->ts = max32+ntohl(rtp->timestamp);
1579
                        }
1580
                }
1581
                p->len = len;
1582
                p->offset = offset;
1583
                p->next = NULL;
1584
                p->prev = NULL;
1585
                if(list == NULL) {
1586
                        /* First element becomes the list itself (and the last item), at least for now */
1587
                        list = p;
1588
                        last = p;
1589
                } else {
1590
                        /* Check where we should insert this, starting from the end */
1591
                        int added = 0;
1592
                        janus_recordplay_frame_packet *tmp = last;
1593
                        while(tmp) {
1594
                                if(tmp->ts < p->ts) {
1595
                                        /* The new timestamp is greater than the last one we have, append */
1596
                                        added = 1;
1597
                                        if(tmp->next != NULL) {
1598
                                                /* We're inserting */
1599
                                                tmp->next->prev = p;
1600
                                                p->next = tmp->next;
1601
                                        } else {
1602
                                                /* Update the last packet */
1603
                                                last = p;
1604
                                        }
1605
                                        tmp->next = p;
1606
                                        p->prev = tmp;
1607
                                        break;
1608
                                } else if(tmp->ts == p->ts) {
1609
                                        /* Same timestamp, check the sequence number */
1610
                                        if(tmp->seq < p->seq && (abs(tmp->seq - p->seq) < 10000)) {
1611
                                                /* The new sequence number is greater than the last one we have, append */
1612
                                                added = 1;
1613
                                                if(tmp->next != NULL) {
1614
                                                        /* We're inserting */
1615
                                                        tmp->next->prev = p;
1616
                                                        p->next = tmp->next;
1617
                                                } else {
1618
                                                        /* Update the last packet */
1619
                                                        last = p;
1620
                                                }
1621
                                                tmp->next = p;
1622
                                                p->prev = tmp;
1623
                                                break;
1624
                                        } else if(tmp->seq > p->seq && (abs(tmp->seq - p->seq) > 10000)) {
1625
                                                /* The new sequence number (resetted) is greater than the last one we have, append */
1626
                                                added = 1;
1627
                                                if(tmp->next != NULL) {
1628
                                                        /* We're inserting */
1629
                                                        tmp->next->prev = p;
1630
                                                        p->next = tmp->next;
1631
                                                } else {
1632
                                                        /* Update the last packet */
1633
                                                        last = p;
1634
                                                }
1635
                                                tmp->next = p;
1636
                                                p->prev = tmp;
1637
                                                break;
1638
                                        }
1639
                                }
1640
                                /* If either the timestamp ot the sequence number we just got is smaller, keep going back */
1641
                                tmp = tmp->prev;
1642
                        }
1643
                        if(!added) {
1644
                                /* We reached the start */
1645
                                p->next = list;
1646
                                list->prev = p;
1647
                                list = p;
1648
                        }
1649
                }
1650
                /* Skip data for now */
1651
                offset += len;
1652
                count++;
1653
        }
1654
        
1655
        JANUS_LOG(LOG_VERB, "Counted %"SCNu16" RTP packets\n", count);
1656
        janus_recordplay_frame_packet *tmp = list;
1657
        count = 0;
1658
        while(tmp) {
1659
                count++;
1660
                JANUS_LOG(LOG_HUGE, "[%10lu][%4d] seq=%"SCNu16", ts=%"SCNu64"\n", tmp->offset, tmp->len, tmp->seq, tmp->ts);
1661
                tmp = tmp->next;
1662
        }
1663
        JANUS_LOG(LOG_VERB, "Counted %"SCNu16" frame packets\n", count);
1664
        
1665
        /* Done! */
1666
        fclose(file);
1667
        return list;
1668
}
1669

    
1670
static void *janus_recordplay_playout_thread(void *data) {
1671
        janus_recordplay_session *session = (janus_recordplay_session *)data;
1672
        if(!session) {
1673
                JANUS_LOG(LOG_ERR, "Invalid session, can't start playout thread...\n");
1674
                g_thread_unref(g_thread_self());
1675
                return NULL;
1676
        }
1677
        if(session->recorder) {
1678
                JANUS_LOG(LOG_ERR, "This is a recorder, can't start playout thread...\n");
1679
                g_thread_unref(g_thread_self());
1680
                return NULL;
1681
        }
1682
        if(!session->aframes || !session->vframes) {
1683
                JANUS_LOG(LOG_ERR, "No audio and no video frames, can't start playout thread...\n");
1684
                g_thread_unref(g_thread_self());
1685
                return NULL;
1686
        }
1687
        JANUS_LOG(LOG_INFO, "Joining playout thread\n");
1688
        /* Open the files */
1689
        FILE *afile = NULL, *vfile = NULL;
1690
        if(session->aframes) {
1691
                char source[1024];
1692
                if(strstr(session->recording->arc_file, ".mjr"))
1693
                        g_snprintf(source, 1024, "%s/%s", recordings_path, session->recording->arc_file);
1694
                else
1695
                        g_snprintf(source, 1024, "%s/%s.mjr", recordings_path, session->recording->arc_file);
1696
                afile = fopen(source, "rb");
1697
                if(afile == NULL) {
1698
                        JANUS_LOG(LOG_ERR, "Could not open audio file %s, can't start playout thread...\n", source);
1699
                        g_thread_unref(g_thread_self());
1700
                        return NULL;
1701
                }
1702
        }
1703
        if(session->vframes) {
1704
                char source[1024];
1705
                if(strstr(session->recording->vrc_file, ".mjr"))
1706
                        g_snprintf(source, 1024, "%s/%s", recordings_path, session->recording->vrc_file);
1707
                else
1708
                        g_snprintf(source, 1024, "%s/%s.mjr", recordings_path, session->recording->vrc_file);
1709
                vfile = fopen(source, "rb");
1710
                if(vfile == NULL) {
1711
                        JANUS_LOG(LOG_ERR, "Could not open video file %s, can't start playout thread...\n", source);
1712
                        if(afile)
1713
                                fclose(afile);
1714
                        afile = NULL;
1715
                        g_thread_unref(g_thread_self());
1716
                        return NULL;
1717
                }
1718
        }
1719
        
1720
        /* Timer */
1721
        gboolean asent = FALSE, vsent = FALSE;
1722
        struct timeval now, abefore, vbefore;
1723
        time_t d_s, d_us;
1724
        gettimeofday(&now, NULL);
1725
        gettimeofday(&abefore, NULL);
1726
        gettimeofday(&vbefore, NULL);
1727

    
1728
        janus_recordplay_frame_packet *audio = session->aframes, *video = session->vframes;
1729
        char *buffer = (char *)calloc(1500, sizeof(char));
1730
        memset(buffer, 0, 1500);
1731
        int bytes = 0;
1732
        int64_t ts_diff = 0, passed = 0;
1733
        
1734
        while(!session->destroyed && session->active && (audio || video)) {
1735
                if(!asent && !vsent) {
1736
                        /* We skipped the last round, so sleep a bit (5ms) */
1737
                        usleep(5000);
1738
                }
1739
                asent = FALSE;
1740
                vsent = FALSE;
1741
                if(audio) {
1742
                        if(audio == session->aframes) {
1743
                                /* First packet, send now */
1744
                                fseek(afile, audio->offset, SEEK_SET);
1745
                                bytes = fread(buffer, sizeof(char), audio->len, afile);
1746
                                if(bytes != audio->len)
1747
                                        JANUS_LOG(LOG_WARN, "Didn't manage to read all the bytes we needed (%d < %d)...\n", bytes, audio->len);
1748
                                /* Update payload type */
1749
                                rtp_header *rtp = (rtp_header *)buffer;
1750
                                rtp->type = OPUS_PT;        /* FIXME We assume it's Opus */
1751
                                if(gateway != NULL)
1752
                                        gateway->relay_rtp(session->handle, 0, (char *)buffer, bytes);
1753
                                gettimeofday(&now, NULL);
1754
                                abefore.tv_sec = now.tv_sec;
1755
                                abefore.tv_usec = now.tv_usec;
1756
                                asent = TRUE;
1757
                                audio = audio->next;
1758
                        } else {
1759
                                /* What's the timestamp skip from the previous packet? */
1760
                                ts_diff = audio->ts - audio->prev->ts;
1761
                                ts_diff = (ts_diff*1000)/48;        /* FIXME Again, we're assuming Opus and it's 48khz */
1762
                                /* Check if it's time to send */
1763
                                gettimeofday(&now, NULL);
1764
                                d_s = now.tv_sec - abefore.tv_sec;
1765
                                d_us = now.tv_usec - abefore.tv_usec;
1766
                                if(d_us < 0) {
1767
                                        d_us += 1000000;
1768
                                        --d_s;
1769
                                }
1770
                                passed = d_s*1000000 + d_us;
1771
                                if(passed < (ts_diff-5000)) {
1772
                                        asent = FALSE;
1773
                                } else {
1774
                                        /* Update the reference time */
1775
                                        abefore.tv_usec += ts_diff%1000000;
1776
                                        if(abefore.tv_usec > 1000000) {
1777
                                                abefore.tv_sec++;
1778
                                                abefore.tv_usec -= 1000000;
1779
                                        }
1780
                                        if(ts_diff/1000000 > 0) {
1781
                                                abefore.tv_sec += ts_diff/1000000;
1782
                                                abefore.tv_usec -= ts_diff/1000000;
1783
                                        }
1784
                                        /* Send now */
1785
                                        fseek(afile, audio->offset, SEEK_SET);
1786
                                        bytes = fread(buffer, sizeof(char), audio->len, afile);
1787
                                        if(bytes != audio->len)
1788
                                                JANUS_LOG(LOG_WARN, "Didn't manage to read all the bytes we needed (%d < %d)...\n", bytes, audio->len);
1789
                                        /* Update payload type */
1790
                                        rtp_header *rtp = (rtp_header *)buffer;
1791
                                        rtp->type = OPUS_PT;        /* FIXME We assume it's Opus */
1792
                                        if(gateway != NULL)
1793
                                                gateway->relay_rtp(session->handle, 0, (char *)buffer, bytes);
1794
                                        asent = TRUE;
1795
                                        audio = audio->next;
1796
                                }
1797
                        }
1798
                }
1799
                if(video) {
1800
                        if(video == session->vframes) {
1801
                                /* First packets: there may be many of them with the same timestamp, send them all */
1802
                                uint64_t ts = video->ts;
1803
                                while(video && video->ts == ts) {
1804
                                        fseek(vfile, video->offset, SEEK_SET);
1805
                                        bytes = fread(buffer, sizeof(char), video->len, vfile);
1806
                                        if(bytes != video->len)
1807
                                                JANUS_LOG(LOG_WARN, "Didn't manage to read all the bytes we needed (%d < %d)...\n", bytes, video->len);
1808
                                        /* Update payload type */
1809
                                        rtp_header *rtp = (rtp_header *)buffer;
1810
                                        rtp->type = VP8_PT;        /* FIXME We assume it's VP8 */
1811
                                        if(gateway != NULL)
1812
                                                gateway->relay_rtp(session->handle, 1, (char *)buffer, bytes);
1813
                                        video = video->next;
1814
                                }
1815
                                vsent = TRUE;
1816
                                gettimeofday(&now, NULL);
1817
                                vbefore.tv_sec = now.tv_sec;
1818
                                vbefore.tv_usec = now.tv_usec;
1819
                        } else {
1820
                                /* What's the timestamp skip from the previous packet? */
1821
                                ts_diff = video->ts - video->prev->ts;
1822
                                ts_diff = (ts_diff*1000)/90;
1823
                                /* Check if it's time to send */
1824
                                gettimeofday(&now, NULL);
1825
                                d_s = now.tv_sec - vbefore.tv_sec;
1826
                                d_us = now.tv_usec - vbefore.tv_usec;
1827
                                if(d_us < 0) {
1828
                                        d_us += 1000000;
1829
                                        --d_s;
1830
                                }
1831
                                passed = d_s*1000000 + d_us;
1832
                                if(passed < (ts_diff-5000)) {
1833
                                        vsent = FALSE;
1834
                                } else {
1835
                                        /* Update the reference time */
1836
                                        vbefore.tv_usec += ts_diff%1000000;
1837
                                        if(vbefore.tv_usec > 1000000) {
1838
                                                vbefore.tv_sec++;
1839
                                                vbefore.tv_usec -= 1000000;
1840
                                        }
1841
                                        if(ts_diff/1000000 > 0) {
1842
                                                vbefore.tv_sec += ts_diff/1000000;
1843
                                                vbefore.tv_usec -= ts_diff/1000000;
1844
                                        }
1845
                                        /* There may be multiple packets with the same timestamp, send them all */
1846
                                        uint64_t ts = video->ts;
1847
                                        while(video && video->ts == ts) {
1848
                                                /* Send now */
1849
                                                fseek(vfile, video->offset, SEEK_SET);
1850
                                                bytes = fread(buffer, sizeof(char), video->len, vfile);
1851
                                                if(bytes != video->len)
1852
                                                        JANUS_LOG(LOG_WARN, "Didn't manage to read all the bytes we needed (%d < %d)...\n", bytes, video->len);
1853
                                                /* Update payload type */
1854
                                                rtp_header *rtp = (rtp_header *)buffer;
1855
                                                rtp->type = VP8_PT;        /* FIXME We assume it's VP8 */
1856
                                                if(gateway != NULL)
1857
                                                        gateway->relay_rtp(session->handle, 1, (char *)buffer, bytes);
1858
                                                video = video->next;
1859
                                        }
1860
                                        vsent = TRUE;
1861
                                }
1862
                        }
1863
                }
1864
        }
1865

    
1866
        /* Get rid of the indexes */
1867
        janus_recordplay_frame_packet *tmp = NULL;
1868
        audio = session->aframes;
1869
        while(audio) {
1870
                tmp = audio->next;
1871
                g_free(audio);
1872
                audio = tmp;
1873
        }
1874
        session->aframes = NULL;
1875
        video = session->vframes;
1876
        while(video) {
1877
                tmp = video->next;
1878
                g_free(video);
1879
                video = tmp;
1880
        }
1881
        session->vframes = NULL;
1882

    
1883
        if(afile)
1884
                fclose(afile);
1885
        afile = NULL;
1886
        if(vfile)
1887
                fclose(vfile);
1888
        vfile = NULL;
1889

    
1890
        /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */
1891
        gateway->close_pc(session->handle);
1892
        
1893
        JANUS_LOG(LOG_INFO, "Leaving playout thread\n");
1894
        g_thread_unref(g_thread_self());
1895
        return NULL;
1896
}