Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_voicemail.c @ 78955474

History | View | Annotate | Download (31.9 KB)

1
/*! \file   janus_voicemail.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus VoiceMail plugin
5
 * \details  This is a plugin implementing a very simple VoiceMail service
6
 * for Janus, specifically recording Opus streams. This means that it replies
7
 * by providing in the SDP only support for Opus, and disabling video.
8
 * When a peer contacts the plugin, the plugin starts recording the audio
9
 * frames it receives and, after 10 seconds, it shuts the PeerConnection
10
 * down and returns an URL to the recorded file.
11
 * 
12
 * Since an URL is returned, the plugin allows you to configure where the
13
 * recordings whould be stored (e.g., a folder in your web server, writable
14
 * by the plugin) and the base path to use when returning URLs (e.g.,
15
 * /my/recordings/ or http://www.example.com/my/recordings).
16
 * 
17
 * By default the plugin saves the recordings in the \c html folder of
18
 * this project, meaning that it can work out of the box with the VoiceMail
19
 * demo we provide in the same folder.
20
 *
21
 * \section vmailapi VoiceMail API
22
 * 
23
 * The VoiceMail API supports just two requests, \c record and \c stop 
24
 * and they're both asynchronous, which means all responses (successes
25
 * and errors) will be delivered as events with the same transaction. 
26
 * 
27
 * \c record will instruct the plugin to start recording, while \c stop
28
 * will make the recording stop before the 10 seconds have passed.
29
 * Never send a JSEP offer with any of these requests: it's always the
30
 * VoiceMail plugin that originates a JSEP offer, in response to a
31
 * \c record request, which means your application will only have to
32
 * send a JSEP answer when that happens.
33
 * 
34
 * The \c record request has to be formatted as follows:
35
 *
36
\verbatim
37
{
38
        "request" : "record"
39
}
40
\endverbatim
41
 *
42
 * A successful request will result in an \c starting status event:
43
 * 
44
\verbatim
45
{
46
        "voicemail" : "event",
47
        "status": "starting"
48
}
49
\endverbatim
50
 * 
51
 * which will be followed by a \c started as soon as the associated
52
 * PeerConnection has been made available to the plugin: 
53
 * 
54
\verbatim
55
{
56
        "voicemail" : "event",
57
        "status": "started"
58
}
59
\endverbatim
60
 * 
61
 * An error instead would provide both an error code and a more verbose
62
 * description of the cause of the issue:
63
 * 
64
\verbatim
65
{
66
        "voicemail" : "event",
67
        "error_code" : <numeric ID, check Macros below>,
68
        "error" : "<error description as a string>"
69
}
70
\endverbatim
71
 * 
72
 * The \c stop request instead has to be formatted as follows:
73
 *
74
\verbatim
75
{
76
        "request" : "stop"
77
}
78
\endverbatim
79
 *
80
 * If the plugin detects a loss of the associated PeerConnection, whether
81
 * as a result of a \c stop request or because the 10 seconds passed, a
82
 * \c done status notification is triggered to inform the application
83
 * the recording session is over, together with the path to the
84
 * recording file itself:
85
 * 
86
\verbatim
87
{
88
        "voicemail" : "event",
89
        "status" : "done",
90
        "recording : "<path to the .opus file>"
91
}
92
\endverbatim
93
 *
94
 * \ingroup plugins
95
 * \ref plugins
96
 */
97

    
98
#include "plugin.h"
99

    
100
#include <jansson.h>
101
#include <sys/stat.h>
102
#include <sys/time.h>
103

    
104
#include <ogg/ogg.h>
105

    
106
#include "../debug.h"
107
#include "../apierror.h"
108
#include "../config.h"
109
#include "../mutex.h"
110
#include "../rtp.h"
111
#include "../utils.h"
112

    
113

    
114
/* Plugin information */
115
#define JANUS_VOICEMAIL_VERSION                        6
116
#define JANUS_VOICEMAIL_VERSION_STRING        "0.0.6"
117
#define JANUS_VOICEMAIL_DESCRIPTION                "This is a plugin implementing a very simple VoiceMail service for Janus, recording Opus streams."
118
#define JANUS_VOICEMAIL_NAME                        "JANUS VoiceMail plugin"
119
#define JANUS_VOICEMAIL_AUTHOR                        "Meetecho s.r.l."
120
#define JANUS_VOICEMAIL_PACKAGE                        "janus.plugin.voicemail"
121

    
122
/* Plugin methods */
123
janus_plugin *create(void);
124
int janus_voicemail_init(janus_callbacks *callback, const char *config_path);
125
void janus_voicemail_destroy(void);
126
int janus_voicemail_get_api_compatibility(void);
127
int janus_voicemail_get_version(void);
128
const char *janus_voicemail_get_version_string(void);
129
const char *janus_voicemail_get_description(void);
130
const char *janus_voicemail_get_name(void);
131
const char *janus_voicemail_get_author(void);
132
const char *janus_voicemail_get_package(void);
133
void janus_voicemail_create_session(janus_plugin_session *handle, int *error);
134
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
135
void janus_voicemail_setup_media(janus_plugin_session *handle);
136
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
137
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
138
void janus_voicemail_hangup_media(janus_plugin_session *handle);
139
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error);
140
char *janus_voicemail_query_session(janus_plugin_session *handle);
141

    
142
/* Plugin setup */
143
static janus_plugin janus_voicemail_plugin =
144
        JANUS_PLUGIN_INIT (
145
                .init = janus_voicemail_init,
146
                .destroy = janus_voicemail_destroy,
147

    
148
                .get_api_compatibility = janus_voicemail_get_api_compatibility,
149
                .get_version = janus_voicemail_get_version,
150
                .get_version_string = janus_voicemail_get_version_string,
151
                .get_description = janus_voicemail_get_description,
152
                .get_name = janus_voicemail_get_name,
153
                .get_author = janus_voicemail_get_author,
154
                .get_package = janus_voicemail_get_package,
155
                
156
                .create_session = janus_voicemail_create_session,
157
                .handle_message = janus_voicemail_handle_message,
158
                .setup_media = janus_voicemail_setup_media,
159
                .incoming_rtp = janus_voicemail_incoming_rtp,
160
                .incoming_rtcp = janus_voicemail_incoming_rtcp,
161
                .hangup_media = janus_voicemail_hangup_media,
162
                .destroy_session = janus_voicemail_destroy_session,
163
                .query_session = janus_voicemail_query_session,
164
        );
165

    
166
/* Plugin creator */
167
janus_plugin *create(void) {
168
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_VOICEMAIL_NAME);
169
        return &janus_voicemail_plugin;
170
}
171

    
172

    
173
/* Useful stuff */
174
static gint initialized = 0, stopping = 0;
175
static janus_callbacks *gateway = NULL;
176
static GThread *handler_thread;
177
static GThread *watchdog;
178
static void *janus_voicemail_handler(void *data);
179

    
180
typedef struct janus_voicemail_message {
181
        janus_plugin_session *handle;
182
        char *transaction;
183
        char *message;
184
        char *sdp_type;
185
        char *sdp;
186
} janus_voicemail_message;
187
static GAsyncQueue *messages = NULL;
188

    
189
void janus_voicemail_message_free(janus_voicemail_message *msg);
190
void janus_voicemail_message_free(janus_voicemail_message *msg) {
191
        if(!msg)
192
                return;
193

    
194
        msg->handle = NULL;
195

    
196
        g_free(msg->transaction);
197
        msg->transaction = NULL;
198
        g_free(msg->message);
199
        msg->message = NULL;
200
        g_free(msg->sdp_type);
201
        msg->sdp_type = NULL;
202
        g_free(msg->sdp);
203
        msg->sdp = NULL;
204

    
205
        g_free(msg);
206
}
207

    
208

    
209
typedef struct janus_voicemail_session {
210
        janus_plugin_session *handle;
211
        guint64 recording_id;
212
        gint64 start_time;
213
        char *filename;
214
        FILE *file;
215
        ogg_stream_state *stream;
216
        int seq;
217
        gboolean started;
218
        gboolean stopping;
219
        gboolean hangingup;
220
        gint64 destroyed;        /* Time at which this session was marked as destroyed */
221
} janus_voicemail_session;
222
static GHashTable *sessions;
223
static GList *old_sessions;
224
static janus_mutex sessions_mutex;
225

    
226
static char *recordings_path = NULL;
227
static char *recordings_base = NULL;
228

    
229
/* SDP offer/answer template */
230
#define sdp_template \
231
                "v=0\r\n" \
232
                "o=- %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n"        /* We need current time here */ \
233
                "s=VoiceMail %"SCNu64"\r\n"                                                /* VoiceMail recording ID */ \
234
                "t=0 0\r\n" \
235
                "m=audio 1 RTP/SAVPF %d\r\n"                /* Opus payload type */ \
236
                "c=IN IP4 1.1.1.1\r\n" \
237
                "a=rtpmap:%d opus/48000/2\r\n"                /* Opus payload type */ \
238
                "a=recvonly\r\n"                                        /* This plugin doesn't send any frames */
239

    
240

    
241
/* OGG/Opus helpers */
242
void le32(unsigned char *p, int v);
243
void le16(unsigned char *p, int v);
244
ogg_packet *op_opushead(void);
245
ogg_packet *op_opustags(void);
246
ogg_packet *op_from_pkt(const unsigned char *pkt, int len);
247
void op_free(ogg_packet *op);
248
int ogg_write(janus_voicemail_session *session);
249
int ogg_flush(janus_voicemail_session *session);
250

    
251

    
252
/* Error codes */
253
#define JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR                499
254
#define JANUS_VOICEMAIL_ERROR_NO_MESSAGE                460
255
#define JANUS_VOICEMAIL_ERROR_INVALID_JSON                461
256
#define JANUS_VOICEMAIL_ERROR_INVALID_REQUEST        462
257
#define JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT        463
258
#define JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT        464
259
#define JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING        465
260
#define JANUS_VOICEMAIL_ERROR_IO_ERROR                        466
261
#define JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR                467
262

    
263

    
264
/* VoiceMail watchdog/garbage collector (sort of) */
265
void *janus_voicemail_watchdog(void *data);
266
void *janus_voicemail_watchdog(void *data) {
267
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog started\n");
268
        gint64 now = 0;
269
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
270
                janus_mutex_lock(&sessions_mutex);
271
                /* Iterate on all the sessions */
272
                now = janus_get_monotonic_time();
273
                if(old_sessions != NULL) {
274
                        GList *sl = old_sessions;
275
                        JANUS_LOG(LOG_HUGE, "Checking %d old VoiceMail sessions...\n", g_list_length(old_sessions));
276
                        while(sl) {
277
                                janus_voicemail_session *session = (janus_voicemail_session *)sl->data;
278
                                if(!session) {
279
                                        sl = sl->next;
280
                                        continue;
281
                                }
282
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
283
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
284
                                        JANUS_LOG(LOG_VERB, "Freeing old VoiceMail session\n");
285
                                        GList *rm = sl->next;
286
                                        old_sessions = g_list_delete_link(old_sessions, sl);
287
                                        sl = rm;
288
                                        session->handle = NULL;
289
                                        g_free(session);
290
                                        session = NULL;
291
                                        continue;
292
                                }
293
                                sl = sl->next;
294
                        }
295
                }
296
                janus_mutex_unlock(&sessions_mutex);
297
                g_usleep(500000);
298
        }
299
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog stopped\n");
300
        return NULL;
301
}
302

    
303

    
304
/* Plugin implementation */
305
int janus_voicemail_init(janus_callbacks *callback, const char *config_path) {
306
        if(g_atomic_int_get(&stopping)) {
307
                /* Still stopping from before */
308
                return -1;
309
        }
310
        if(callback == NULL || config_path == NULL) {
311
                /* Invalid arguments */
312
                return -1;
313
        }
314

    
315
        /* Read configuration */
316
        char filename[255];
317
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_VOICEMAIL_PACKAGE);
318
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
319
        janus_config *config = janus_config_parse(filename);
320
        if(config != NULL)
321
                janus_config_print(config);
322
        
323
        sessions = g_hash_table_new(NULL, NULL);
324
        janus_mutex_init(&sessions_mutex);
325
        messages = g_async_queue_new_full((GDestroyNotify) janus_voicemail_message_free);
326
        /* This is the callback we'll need to invoke to contact the gateway */
327
        gateway = callback;
328

    
329
        /* Parse configuration */
330
        if(config != NULL) {
331
                janus_config_item *path = janus_config_get_item_drilldown(config, "general", "path");
332
                if(path && path->value)
333
                        recordings_path = g_strdup(path->value);
334
                janus_config_item *base = janus_config_get_item_drilldown(config, "general", "base");
335
                if(base && base->value)
336
                        recordings_base = g_strdup(base->value);
337
                /* Done */
338
                janus_config_destroy(config);
339
                config = NULL;
340
        }
341
        if(recordings_path == NULL)
342
                recordings_path = g_strdup("./html/recordings/");
343
        if(recordings_base == NULL)
344
                recordings_base = g_strdup("/recordings/");
345
        JANUS_LOG(LOG_VERB, "Recordings path: %s\n", recordings_path);
346
        JANUS_LOG(LOG_VERB, "Recordings base: %s\n", recordings_base);
347
        /* Create the folder, if needed */
348
        struct stat st = {0};
349
        if(stat(recordings_path, &st) == -1) {
350
                int res = janus_mkdir(recordings_path, 0755);
351
                JANUS_LOG(LOG_VERB, "Creating folder: %d\n", res);
352
                if(res != 0) {
353
                        JANUS_LOG(LOG_ERR, "%s", strerror(res));
354
                        return -1;        /* No point going on... */
355
                }
356
        }
357
        
358
        g_atomic_int_set(&initialized, 1);
359

    
360
        GError *error = NULL;
361
        /* Start the sessions watchdog */
362
        watchdog = g_thread_try_new("vmail watchdog", &janus_voicemail_watchdog, NULL, &error);
363
        if(error != NULL) {
364
                g_atomic_int_set(&initialized, 0);
365
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail watchdog thread...\n", error->code, error->message ? error->message : "??");
366
                return -1;
367
        }
368
        /* Launch the thread that will handle incoming messages */
369
        handler_thread = g_thread_try_new("janus voicemail handler", janus_voicemail_handler, NULL, &error);
370
        if(error != NULL) {
371
                g_atomic_int_set(&initialized, 0);
372
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail handler thread...\n", error->code, error->message ? error->message : "??");
373
                return -1;
374
        }
375
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_VOICEMAIL_NAME);
376
        return 0;
377
}
378

    
379
void janus_voicemail_destroy(void) {
380
        if(!g_atomic_int_get(&initialized))
381
                return;
382
        g_atomic_int_set(&stopping, 1);
383
        if(handler_thread != NULL) {
384
                g_thread_join(handler_thread);
385
                handler_thread = NULL;
386
        }
387
        if(watchdog != NULL) {
388
                g_thread_join(watchdog);
389
                watchdog = NULL;
390
        }
391
        /* FIXME We should destroy the sessions cleanly */
392
        janus_mutex_lock(&sessions_mutex);
393
        g_hash_table_destroy(sessions);
394
        janus_mutex_unlock(&sessions_mutex);
395
        g_async_queue_unref(messages);
396
        messages = NULL;
397
        sessions = NULL;
398
        g_atomic_int_set(&initialized, 0);
399
        g_atomic_int_set(&stopping, 0);
400
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_VOICEMAIL_NAME);
401
}
402

    
403
int janus_voicemail_get_api_compatibility(void) {
404
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
405
        return JANUS_PLUGIN_API_VERSION;
406
}
407

    
408
int janus_voicemail_get_version(void) {
409
        return JANUS_VOICEMAIL_VERSION;
410
}
411

    
412
const char *janus_voicemail_get_version_string(void) {
413
        return JANUS_VOICEMAIL_VERSION_STRING;
414
}
415

    
416
const char *janus_voicemail_get_description(void) {
417
        return JANUS_VOICEMAIL_DESCRIPTION;
418
}
419

    
420
const char *janus_voicemail_get_name(void) {
421
        return JANUS_VOICEMAIL_NAME;
422
}
423

    
424
const char *janus_voicemail_get_author(void) {
425
        return JANUS_VOICEMAIL_AUTHOR;
426
}
427

    
428
const char *janus_voicemail_get_package(void) {
429
        return JANUS_VOICEMAIL_PACKAGE;
430
}
431

    
432
void janus_voicemail_create_session(janus_plugin_session *handle, int *error) {
433
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
434
                *error = -1;
435
                return;
436
        }        
437
        janus_voicemail_session *session = (janus_voicemail_session *)calloc(1, sizeof(janus_voicemail_session));
438
        if(session == NULL) {
439
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
440
                *error = -2;
441
                return;
442
        }
443
        session->handle = handle;
444
        session->recording_id = g_random_int();
445
        session->start_time = 0;
446
        session->stream = NULL;
447
        char f[255];
448
        g_snprintf(f, 255, "%s/janus-voicemail-%"SCNu64".opus", recordings_path, session->recording_id);
449
        session->filename = g_strdup(f);
450
        if(session->filename == NULL) {
451
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
452
                *error = -2;
453
                return;
454
        }
455
        session->file = NULL;
456
        session->seq = 0;
457
        session->started = FALSE;
458
        session->stopping = FALSE;
459
        session->destroyed = 0;
460
        handle->plugin_handle = session;
461
        janus_mutex_lock(&sessions_mutex);
462
        g_hash_table_insert(sessions, handle, session);
463
        janus_mutex_unlock(&sessions_mutex);
464

    
465
        return;
466
}
467

    
468
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error) {
469
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
470
                *error = -1;
471
                return;
472
        }        
473
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle; 
474
        if(!session) {
475
                JANUS_LOG(LOG_ERR, "No VoiceMail session associated with this handle...\n");
476
                *error = -2;
477
                return;
478
        }
479
        janus_mutex_lock(&sessions_mutex);
480
        if(!session->destroyed) {
481
                JANUS_LOG(LOG_VERB, "Removing VoiceMail session...\n");
482
                g_hash_table_remove(sessions, handle);
483
                janus_voicemail_hangup_media(handle);
484
                session->destroyed = janus_get_monotonic_time();
485
                /* Cleaning up and removing the session is done in a lazy way */
486
                old_sessions = g_list_append(old_sessions, session);
487
        }
488
        janus_mutex_unlock(&sessions_mutex);
489

    
490
        return;
491
}
492

    
493
char *janus_voicemail_query_session(janus_plugin_session *handle) {
494
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
495
                return NULL;
496
        }        
497
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
498
        if(!session) {
499
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
500
                return NULL;
501
        }
502
        /* In the echo test, every session is the same: we just provide some configure info */
503
        json_t *info = json_object();
504
        json_object_set_new(info, "state", json_string(session->stream ? "recording" : "idle"));
505
        if(session->stream) {
506
                json_object_set_new(info, "id", json_integer(session->recording_id));
507
                json_object_set_new(info, "start_time", json_integer(session->start_time));
508
                json_object_set_new(info, "filename", session->filename ? json_string(session->filename) : NULL);
509
        }
510
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
511
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
512
        json_decref(info);
513
        return info_text;
514
}
515

    
516
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
517
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
518
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
519
        janus_voicemail_message *msg = calloc(1, sizeof(janus_voicemail_message));
520
        if(msg == NULL) {
521
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
522
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
523
        }
524
        msg->handle = handle;
525
        msg->transaction = transaction;
526
        msg->message = message;
527
        msg->sdp_type = sdp_type;
528
        msg->sdp = sdp;
529
        g_async_queue_push(messages, msg);
530

    
531
        /* All the requests to this plugin are handled asynchronously */
532
        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
533
}
534

    
535
void janus_voicemail_setup_media(janus_plugin_session *handle) {
536
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
537
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
538
                return;
539
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
540
        if(!session) {
541
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
542
                return;
543
        }
544
        if(session->destroyed)
545
                return;
546
        /* Only start recording this peer when we get this event */
547
        session->start_time = janus_get_monotonic_time();
548
        session->started = TRUE;
549
        /* Prepare JSON event */
550
        json_t *event = json_object();
551
        json_object_set_new(event, "voicemail", json_string("event"));
552
        json_object_set_new(event, "status", json_string("started"));
553
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
554
        json_decref(event);
555
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
556
        int ret = gateway->push_event(handle, &janus_voicemail_plugin, NULL, event_text, NULL, NULL);
557
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
558
        g_free(event_text);
559
}
560

    
561
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
562
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
563
                return;
564
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
565
        if(!session || session->destroyed || session->stopping || !session->started || session->start_time == 0)
566
                return;
567
        gint64 now = janus_get_monotonic_time();
568
        /* Have 10 seconds passed? */
569
        if((now-session->start_time) >= 10*G_USEC_PER_SEC) {
570
                /* FIXME Simulate a "stop" coming from the browser */
571
                session->started = FALSE;
572
                janus_voicemail_message *msg = calloc(1, sizeof(janus_voicemail_message));
573
                if(msg == NULL) {
574
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
575
                        return;
576
                }
577
                msg->handle = handle;
578
                msg->message = g_strdup("{\"request\":\"stop\"}");
579
                msg->transaction = NULL;
580
                msg->sdp_type = NULL;
581
                msg->sdp = NULL;
582
                g_async_queue_push(messages, msg);
583
                return;
584
        }
585
        /* Save the frame */
586
        rtp_header *rtp = (rtp_header *)buf;
587
        uint16_t seq = ntohs(rtp->seq_number);
588
        if(session->seq == 0)
589
                session->seq = seq;
590
        ogg_packet *op = op_from_pkt((const unsigned char *)(buf+12), len-12);        /* TODO Check RTP extensions... */
591
        //~ JANUS_LOG(LOG_VERB, "\tWriting at position %d (%d)\n", seq-session->seq+1, 960*(seq-session->seq+1));
592
        op->granulepos = 960*(seq-session->seq+1); // FIXME: get this from the toc byte
593
        ogg_stream_packetin(session->stream, op);
594
        free(op);
595
        ogg_write(session);
596
}
597

    
598
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
599
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
600
                return;
601
        /* FIXME Should we care? */
602
}
603

    
604
void janus_voicemail_hangup_media(janus_plugin_session *handle) {
605
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
606
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
607
                return;
608
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
609
        if(!session) {
610
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
611
                return;
612
        }
613
        session->started = FALSE;
614
        if(session->destroyed || session->hangingup)
615
                return;
616
        session->hangingup = TRUE;
617
        /* Close and reset stuff */
618
        if(session->file)
619
                fclose(session->file);
620
        session->file = NULL;
621
        if(session->stream)
622
                ogg_stream_destroy(session->stream);
623
        session->stream = NULL;
624
        /* Done */
625
        session->hangingup = FALSE;
626
}
627

    
628
/* Thread to handle incoming messages */
629
static void *janus_voicemail_handler(void *data) {
630
        JANUS_LOG(LOG_VERB, "Joining VoiceMail handler thread\n");
631
        janus_voicemail_message *msg = NULL;
632
        int error_code = 0;
633
        char *error_cause = calloc(512, sizeof(char));
634
        if(error_cause == NULL) {
635
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
636
                return NULL;
637
        }
638
        json_t *root = NULL;
639
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
640
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
641
                        usleep(50000);
642
                        continue;
643
                }
644
                janus_voicemail_session *session = (janus_voicemail_session *)msg->handle->plugin_handle;        
645
                if(!session) {
646
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
647
                        janus_voicemail_message_free(msg);
648
                        continue;
649
                }
650
                if(session->destroyed) {
651
                        janus_voicemail_message_free(msg);
652
                        continue;
653
                }
654
                /* Handle request */
655
                error_code = 0;
656
                root = NULL;
657
                JANUS_LOG(LOG_VERB, "Handling message: %s\n", msg->message);
658
                if(msg->message == NULL) {
659
                        JANUS_LOG(LOG_ERR, "No message??\n");
660
                        error_code = JANUS_VOICEMAIL_ERROR_NO_MESSAGE;
661
                        g_snprintf(error_cause, 512, "%s", "No message??");
662
                        goto error;
663
                }
664
                json_error_t error;
665
                root = json_loads(msg->message, 0, &error);
666
                if(!root) {
667
                        JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
668
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
669
                        g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
670
                        goto error;
671
                }
672
                if(!json_is_object(root)) {
673
                        JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
674
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
675
                        g_snprintf(error_cause, 512, "JSON error: not an object");
676
                        goto error;
677
                }
678
                /* Get the request first */
679
                json_t *request = json_object_get(root, "request");
680
                if(!request) {
681
                        JANUS_LOG(LOG_ERR, "Missing element (request)\n");
682
                        error_code = JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT;
683
                        g_snprintf(error_cause, 512, "Missing element (request)");
684
                        goto error;
685
                }
686
                if(!json_is_string(request)) {
687
                        JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
688
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT;
689
                        g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
690
                        goto error;
691
                }
692
                const char *request_text = json_string_value(request);
693
                json_t *event = NULL;
694
                if(!strcasecmp(request_text, "record")) {
695
                        JANUS_LOG(LOG_VERB, "Starting new recording\n");
696
                        if(session->file != NULL) {
697
                                JANUS_LOG(LOG_ERR, "Already recording (%s)\n", session->filename ? session->filename : "??");
698
                                error_code = JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING;
699
                                g_snprintf(error_cause, 512, "Already recording");
700
                                goto error;
701
                        }
702
                        session->stream = malloc(sizeof(ogg_stream_state));
703
                        if(session->stream == NULL) {
704
                                JANUS_LOG(LOG_ERR, "Couldn't allocate stream struct\n");
705
                                error_code = JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR;
706
                                g_snprintf(error_cause, 512, "Couldn't allocate stream struct");
707
                                goto error;
708
                        }
709
                        if(ogg_stream_init(session->stream, rand()) < 0) {
710
                                JANUS_LOG(LOG_ERR, "Couldn't initialize Ogg stream state\n");
711
                                error_code = JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR;
712
                                g_snprintf(error_cause, 512, "Couldn't initialize Ogg stream state\n");
713
                                goto error;
714
                        }
715
                        session->file = fopen(session->filename, "wb");
716
                        if(session->file == NULL) {
717
                                JANUS_LOG(LOG_ERR, "Couldn't open output file\n");
718
                                error_code = JANUS_VOICEMAIL_ERROR_IO_ERROR;
719
                                g_snprintf(error_cause, 512, "Couldn't open output file");
720
                                goto error;
721
                        }
722
                        session->seq = 0;
723
                        /* Write stream headers */
724
                        ogg_packet *op = op_opushead();
725
                        ogg_stream_packetin(session->stream, op);
726
                        op_free(op);
727
                        op = op_opustags();
728
                        ogg_stream_packetin(session->stream, op);
729
                        op_free(op);
730
                        ogg_flush(session);
731
                        /* Done: now wait for the setup_media callback to be called */
732
                        event = json_object();
733
                        json_object_set_new(event, "voicemail", json_string("event"));
734
                        json_object_set_new(event, "status", json_string(session->started ? "started" : "starting"));
735
                } else if(!strcasecmp(request_text, "stop")) {
736
                        /* Stop the recording */
737
                        session->started = FALSE;
738
                        session->stopping = TRUE;
739
                        if(session->file)
740
                                fclose(session->file);
741
                        session->file = NULL;
742
                        if(session->stream)
743
                                ogg_stream_destroy(session->stream);
744
                        session->stream = NULL;
745
                        /* Done: send the event and close the handle */
746
                        event = json_object();
747
                        json_object_set_new(event, "voicemail", json_string("event"));
748
                        json_object_set_new(event, "status", json_string("done"));
749
                        char url[1024];
750
                        g_snprintf(url, 1024, "%s/janus-voicemail-%"SCNu64".opus", recordings_base, session->recording_id);
751
                        json_object_set_new(event, "recording", json_string(url));
752
                } else {
753
                        JANUS_LOG(LOG_ERR, "Unknown request '%s'\n", request_text);
754
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_REQUEST;
755
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
756
                        goto error;
757
                }
758

    
759
                json_decref(root);
760
                /* Prepare JSON event */
761
                JANUS_LOG(LOG_VERB, "Preparing JSON event as a reply\n");
762
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
763
                json_decref(event);
764
                /* Any SDP to handle? */
765
                if(!msg->sdp) {
766
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
767
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
768
                } else {
769
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg->sdp_type, msg->sdp);
770
                        const char *type = NULL;
771
                        if(!strcasecmp(msg->sdp_type, "offer"))
772
                                type = "answer";
773
                        if(!strcasecmp(msg->sdp_type, "answer"))
774
                                type = "offer";
775
                        /* Fill the SDP template and use that as our answer */
776
                        char sdp[1024];
777
                        /* What is the Opus payload type? */
778
                        int opus_pt = 0;
779
                        char *fmtp = strstr(msg->sdp, "opus/48000");
780
                        if(fmtp != NULL) {
781
                                fmtp -= 5;
782
                                fmtp = strstr(fmtp, ":");
783
                                if(fmtp)
784
                                        fmtp++;
785
                                opus_pt = atoi(fmtp);
786
                        }
787
                        JANUS_LOG(LOG_VERB, "Opus payload type is %d\n", opus_pt);
788
                        g_snprintf(sdp, 1024, sdp_template,
789
                                janus_get_monotonic_time(),                /* We need current time here */
790
                                janus_get_monotonic_time(),                /* We need current time here */
791
                                session->recording_id,                        /* Recording ID */
792
                                opus_pt,                                                /* Opus payload type */
793
                                opus_pt                                                        /* Opus payload type */);
794
                        /* Did the peer negotiate video? */
795
                        if(strstr(msg->sdp, "m=video") != NULL) {
796
                                /* If so, reject it */
797
                                g_strlcat(sdp, "m=video 0 RTP/SAVPF 0\r\n", 1024);                                
798
                        }
799
                        /* How long will the gateway take to push the event? */
800
                        gint64 start = janus_get_monotonic_time();
801
                        int res = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, type, sdp);
802
                        JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start);
803
                        if(res != JANUS_OK) {
804
                                /* TODO Failed to negotiate? We should remove this participant */
805
                        }
806
                }
807
                g_free(event_text);
808
                janus_voicemail_message_free(msg);
809
                
810
                if(session->stopping) {
811
                        gateway->end_session(session->handle);
812
                }
813

    
814
                continue;
815
                
816
error:
817
                {
818
                        if(root != NULL)
819
                                json_decref(root);
820
                        /* Prepare JSON error event */
821
                        json_t *event = json_object();
822
                        json_object_set_new(event, "voicemail", json_string("event"));
823
                        json_object_set_new(event, "error_code", json_integer(error_code));
824
                        json_object_set_new(event, "error", json_string(error_cause));
825
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
826
                        json_decref(event);
827
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
828
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
829
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
830
                        g_free(event_text);
831
                        janus_voicemail_message_free(msg);
832
                }
833
        }
834
        g_free(error_cause);
835
        JANUS_LOG(LOG_VERB, "Leaving VoiceMail handler thread\n");
836
        return NULL;
837
}
838

    
839

    
840
/* OGG/Opus helpers */
841
/* Write a little-endian 32 bit int to memory */
842
void le32(unsigned char *p, int v) {
843
        p[0] = v & 0xff;
844
        p[1] = (v >> 8) & 0xff;
845
        p[2] = (v >> 16) & 0xff;
846
        p[3] = (v >> 24) & 0xff;
847
}
848

    
849

    
850
/* Write a little-endian 16 bit int to memory */
851
void le16(unsigned char *p, int v) {
852
        p[0] = v & 0xff;
853
        p[1] = (v >> 8) & 0xff;
854
}
855

    
856
/* ;anufacture a generic OpusHead packet */
857
ogg_packet *op_opushead(void) {
858
        int size = 19;
859
        unsigned char *data = malloc(size);
860
        ogg_packet *op = malloc(sizeof(*op));
861

    
862
        if(!data) {
863
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
864
                return NULL;
865
        }
866
        if(!op) {
867
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
868
                return NULL;
869
        }
870

    
871
        memcpy(data, "OpusHead", 8);  /* identifier */
872
        data[8] = 1;                  /* version */
873
        data[9] = 2;                  /* channels */
874
        le16(data+10, 0);             /* pre-skip */
875
        le32(data + 12, 48000);       /* original sample rate */
876
        le16(data + 16, 0);           /* gain */
877
        data[18] = 0;                 /* channel mapping family */
878

    
879
        op->packet = data;
880
        op->bytes = size;
881
        op->b_o_s = 1;
882
        op->e_o_s = 0;
883
        op->granulepos = 0;
884
        op->packetno = 0;
885

    
886
        return op;
887
}
888

    
889
/* Manufacture a generic OpusTags packet */
890
ogg_packet *op_opustags(void) {
891
        const char *identifier = "OpusTags";
892
        const char *vendor = "Janus VoiceMail plugin";
893
        int size = strlen(identifier) + 4 + strlen(vendor) + 4;
894
        unsigned char *data = malloc(size);
895
        ogg_packet *op = malloc(sizeof(*op));
896

    
897
        if(!data) {
898
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
899
                return NULL;
900
        }
901
        if(!op) {
902
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
903
                return NULL;
904
        }
905

    
906
        memcpy(data, identifier, 8);
907
        le32(data + 8, strlen(vendor));
908
        memcpy(data + 12, vendor, strlen(vendor));
909
        le32(data + 12 + strlen(vendor), 0);
910

    
911
        op->packet = data;
912
        op->bytes = size;
913
        op->b_o_s = 0;
914
        op->e_o_s = 0;
915
        op->granulepos = 0;
916
        op->packetno = 1;
917

    
918
        return op;
919
}
920

    
921
/* Allocate an ogg_packet */
922
ogg_packet *op_from_pkt(const unsigned char *pkt, int len) {
923
        ogg_packet *op = malloc(sizeof(*op));
924
        if(!op) {
925
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet.\n");
926
                return NULL;
927
        }
928

    
929
        op->packet = (unsigned char *)pkt;
930
        op->bytes = len;
931
        op->b_o_s = 0;
932
        op->e_o_s = 0;
933

    
934
        return op;
935
}
936

    
937
/* Free a packet and its contents */
938
void op_free(ogg_packet *op) {
939
        if(op) {
940
                if(op->packet) {
941
                        free(op->packet);
942
                }
943
                free(op);
944
        }
945
}
946

    
947
/* Write out available ogg pages */
948
int ogg_write(janus_voicemail_session *session) {
949
        ogg_page page;
950
        size_t written;
951

    
952
        if(!session || !session->stream || !session->file) {
953
                return -1;
954
        }
955

    
956
        while (ogg_stream_pageout(session->stream, &page)) {
957
                written = fwrite(page.header, 1, page.header_len, session->file);
958
                if(written != (size_t)page.header_len) {
959
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
960
                        return -2;
961
                }
962
                written = fwrite(page.body, 1, page.body_len, session->file);
963
                if(written != (size_t)page.body_len) {
964
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
965
                        return -3;
966
                }
967
        }
968
        return 0;
969
}
970

    
971
/* Flush remaining ogg data */
972
int ogg_flush(janus_voicemail_session *session) {
973
        ogg_page page;
974
        size_t written;
975

    
976
        if(!session || !session->stream || !session->file) {
977
                return -1;
978
        }
979

    
980
        while (ogg_stream_flush(session->stream, &page)) {
981
                written = fwrite(page.header, 1, page.header_len, session->file);
982
                if(written != (size_t)page.header_len) {
983
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
984
                        return -2;
985
                }
986
                written = fwrite(page.body, 1, page.body_len, session->file);
987
                if(written != (size_t)page.body_len) {
988
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
989
                        return -3;
990
                }
991
        }
992
        return 0;
993
}