Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_voicemail.c @ 71a04f89

History | View | Annotate | Download (32.8 KB)

1
/*! \file   janus_voicemail.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus VoiceMail plugin
5
 * \details  This is a plugin implementing a very simple VoiceMail service
6
 * for Janus, specifically recording Opus streams. This means that it replies
7
 * by providing in the SDP only support for Opus, and disabling video.
8
 * When a peer contacts the plugin, the plugin starts recording the audio
9
 * frames it receives and, after 10 seconds, it shuts the PeerConnection
10
 * down and returns an URL to the recorded file.
11
 * 
12
 * Since an URL is returned, the plugin allows you to configure where the
13
 * recordings whould be stored (e.g., a folder in your web server, writable
14
 * by the plugin) and the base path to use when returning URLs (e.g.,
15
 * /my/recordings/ or http://www.example.com/my/recordings).
16
 * 
17
 * By default the plugin saves the recordings in the \c html folder of
18
 * this project, meaning that it can work out of the box with the VoiceMail
19
 * demo we provide in the same folder.
20
 *
21
 * \section vmailapi VoiceMail API
22
 * 
23
 * The VoiceMail API supports just two requests, \c record and \c stop 
24
 * and they're both asynchronous, which means all responses (successes
25
 * and errors) will be delivered as events with the same transaction. 
26
 * 
27
 * \c record will instruct the plugin to start recording, while \c stop
28
 * will make the recording stop before the 10 seconds have passed.
29
 * Never send a JSEP offer with any of these requests: it's always the
30
 * VoiceMail plugin that originates a JSEP offer, in response to a
31
 * \c record request, which means your application will only have to
32
 * send a JSEP answer when that happens.
33
 * 
34
 * The \c record request has to be formatted as follows:
35
 *
36
\verbatim
37
{
38
        "request" : "record"
39
}
40
\endverbatim
41
 *
42
 * A successful request will result in an \c starting status event:
43
 * 
44
\verbatim
45
{
46
        "voicemail" : "event",
47
        "status": "starting"
48
}
49
\endverbatim
50
 * 
51
 * which will be followed by a \c started as soon as the associated
52
 * PeerConnection has been made available to the plugin: 
53
 * 
54
\verbatim
55
{
56
        "voicemail" : "event",
57
        "status": "started"
58
}
59
\endverbatim
60
 * 
61
 * An error instead would provide both an error code and a more verbose
62
 * description of the cause of the issue:
63
 * 
64
\verbatim
65
{
66
        "voicemail" : "event",
67
        "error_code" : <numeric ID, check Macros below>,
68
        "error" : "<error description as a string>"
69
}
70
\endverbatim
71
 * 
72
 * The \c stop request instead has to be formatted as follows:
73
 *
74
\verbatim
75
{
76
        "request" : "stop"
77
}
78
\endverbatim
79
 *
80
 * If the plugin detects a loss of the associated PeerConnection, whether
81
 * as a result of a \c stop request or because the 10 seconds passed, a
82
 * \c done status notification is triggered to inform the application
83
 * the recording session is over, together with the path to the
84
 * recording file itself:
85
 * 
86
\verbatim
87
{
88
        "voicemail" : "event",
89
        "status" : "done",
90
        "recording : "<path to the .opus file>"
91
}
92
\endverbatim
93
 *
94
 * \ingroup plugins
95
 * \ref plugins
96
 */
97

    
98
#include "plugin.h"
99

    
100
#include <jansson.h>
101
#include <sys/stat.h>
102
#include <sys/time.h>
103

    
104
#include <ogg/ogg.h>
105

    
106
#include "../debug.h"
107
#include "../apierror.h"
108
#include "../config.h"
109
#include "../mutex.h"
110
#include "../rtp.h"
111
#include "../utils.h"
112

    
113

    
114
/* Plugin information */
115
#define JANUS_VOICEMAIL_VERSION                        6
116
#define JANUS_VOICEMAIL_VERSION_STRING        "0.0.6"
117
#define JANUS_VOICEMAIL_DESCRIPTION                "This is a plugin implementing a very simple VoiceMail service for Janus, recording Opus streams."
118
#define JANUS_VOICEMAIL_NAME                        "JANUS VoiceMail plugin"
119
#define JANUS_VOICEMAIL_AUTHOR                        "Meetecho s.r.l."
120
#define JANUS_VOICEMAIL_PACKAGE                        "janus.plugin.voicemail"
121

    
122
/* Plugin methods */
123
janus_plugin *create(void);
124
int janus_voicemail_init(janus_callbacks *callback, const char *config_path);
125
void janus_voicemail_destroy(void);
126
int janus_voicemail_get_api_compatibility(void);
127
int janus_voicemail_get_version(void);
128
const char *janus_voicemail_get_version_string(void);
129
const char *janus_voicemail_get_description(void);
130
const char *janus_voicemail_get_name(void);
131
const char *janus_voicemail_get_author(void);
132
const char *janus_voicemail_get_package(void);
133
void janus_voicemail_create_session(janus_plugin_session *handle, int *error);
134
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
135
void janus_voicemail_setup_media(janus_plugin_session *handle);
136
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
137
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
138
void janus_voicemail_hangup_media(janus_plugin_session *handle);
139
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error);
140
char *janus_voicemail_query_session(janus_plugin_session *handle);
141

    
142
/* Plugin setup */
143
static janus_plugin janus_voicemail_plugin =
144
        JANUS_PLUGIN_INIT (
145
                .init = janus_voicemail_init,
146
                .destroy = janus_voicemail_destroy,
147

    
148
                .get_api_compatibility = janus_voicemail_get_api_compatibility,
149
                .get_version = janus_voicemail_get_version,
150
                .get_version_string = janus_voicemail_get_version_string,
151
                .get_description = janus_voicemail_get_description,
152
                .get_name = janus_voicemail_get_name,
153
                .get_author = janus_voicemail_get_author,
154
                .get_package = janus_voicemail_get_package,
155
                
156
                .create_session = janus_voicemail_create_session,
157
                .handle_message = janus_voicemail_handle_message,
158
                .setup_media = janus_voicemail_setup_media,
159
                .incoming_rtp = janus_voicemail_incoming_rtp,
160
                .incoming_rtcp = janus_voicemail_incoming_rtcp,
161
                .hangup_media = janus_voicemail_hangup_media,
162
                .destroy_session = janus_voicemail_destroy_session,
163
                .query_session = janus_voicemail_query_session,
164
        );
165

    
166
/* Plugin creator */
167
janus_plugin *create(void) {
168
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_VOICEMAIL_NAME);
169
        return &janus_voicemail_plugin;
170
}
171

    
172
/* Parameter validation */
173
static struct janus_json_parameter request_parameters[] = {
174
        {"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
175
};
176

    
177
/* Useful stuff */
178
static volatile gint initialized = 0, stopping = 0;
179
static gboolean notify_events = TRUE;
180
static janus_callbacks *gateway = NULL;
181
static GThread *handler_thread;
182
static GThread *watchdog;
183
static void *janus_voicemail_handler(void *data);
184

    
185
typedef struct janus_voicemail_message {
186
        janus_plugin_session *handle;
187
        char *transaction;
188
        char *message;
189
        char *sdp_type;
190
        char *sdp;
191
} janus_voicemail_message;
192
static GAsyncQueue *messages = NULL;
193
static janus_voicemail_message exit_message;
194

    
195
static void janus_voicemail_message_free(janus_voicemail_message *msg) {
196
        if(!msg || msg == &exit_message)
197
                return;
198

    
199
        msg->handle = NULL;
200

    
201
        g_free(msg->transaction);
202
        msg->transaction = NULL;
203
        g_free(msg->message);
204
        msg->message = NULL;
205
        g_free(msg->sdp_type);
206
        msg->sdp_type = NULL;
207
        g_free(msg->sdp);
208
        msg->sdp = NULL;
209

    
210
        g_free(msg);
211
}
212

    
213

    
214
typedef struct janus_voicemail_session {
215
        janus_plugin_session *handle;
216
        guint64 recording_id;
217
        gint64 start_time;
218
        char *filename;
219
        FILE *file;
220
        ogg_stream_state *stream;
221
        int seq;
222
        gboolean started;
223
        gboolean stopping;
224
        volatile gint hangingup;
225
        gint64 destroyed;        /* Time at which this session was marked as destroyed */
226
} janus_voicemail_session;
227
static GHashTable *sessions;
228
static GList *old_sessions;
229
static janus_mutex sessions_mutex;
230

    
231
static char *recordings_path = NULL;
232
static char *recordings_base = NULL;
233

    
234
/* SDP offer/answer template */
235
#define sdp_template \
236
                "v=0\r\n" \
237
                "o=- %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n"        /* We need current time here */ \
238
                "s=VoiceMail %"SCNu64"\r\n"                                                /* VoiceMail recording ID */ \
239
                "t=0 0\r\n" \
240
                "m=audio 1 RTP/SAVPF %d\r\n"                /* Opus payload type */ \
241
                "c=IN IP4 1.1.1.1\r\n" \
242
                "a=rtpmap:%d opus/48000/2\r\n"                /* Opus payload type */ \
243
                "a=recvonly\r\n"                                        /* This plugin doesn't send any frames */
244

    
245

    
246
/* OGG/Opus helpers */
247
void le32(unsigned char *p, int v);
248
void le16(unsigned char *p, int v);
249
ogg_packet *op_opushead(void);
250
ogg_packet *op_opustags(void);
251
ogg_packet *op_from_pkt(const unsigned char *pkt, int len);
252
void op_free(ogg_packet *op);
253
int ogg_write(janus_voicemail_session *session);
254
int ogg_flush(janus_voicemail_session *session);
255

    
256

    
257
/* Error codes */
258
#define JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR                499
259
#define JANUS_VOICEMAIL_ERROR_NO_MESSAGE                460
260
#define JANUS_VOICEMAIL_ERROR_INVALID_JSON                461
261
#define JANUS_VOICEMAIL_ERROR_INVALID_REQUEST        462
262
#define JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT        463
263
#define JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT        464
264
#define JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING        465
265
#define JANUS_VOICEMAIL_ERROR_IO_ERROR                        466
266
#define JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR                467
267

    
268

    
269
/* VoiceMail watchdog/garbage collector (sort of) */
270
void *janus_voicemail_watchdog(void *data);
271
void *janus_voicemail_watchdog(void *data) {
272
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog started\n");
273
        gint64 now = 0;
274
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
275
                janus_mutex_lock(&sessions_mutex);
276
                /* Iterate on all the sessions */
277
                now = janus_get_monotonic_time();
278
                if(old_sessions != NULL) {
279
                        GList *sl = old_sessions;
280
                        JANUS_LOG(LOG_HUGE, "Checking %d old VoiceMail sessions...\n", g_list_length(old_sessions));
281
                        while(sl) {
282
                                janus_voicemail_session *session = (janus_voicemail_session *)sl->data;
283
                                if(!session) {
284
                                        sl = sl->next;
285
                                        continue;
286
                                }
287
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
288
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
289
                                        JANUS_LOG(LOG_VERB, "Freeing old VoiceMail session\n");
290
                                        GList *rm = sl->next;
291
                                        old_sessions = g_list_delete_link(old_sessions, sl);
292
                                        sl = rm;
293
                                        session->handle = NULL;
294
                                        g_free(session);
295
                                        session = NULL;
296
                                        continue;
297
                                }
298
                                sl = sl->next;
299
                        }
300
                }
301
                janus_mutex_unlock(&sessions_mutex);
302
                g_usleep(500000);
303
        }
304
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog stopped\n");
305
        return NULL;
306
}
307

    
308

    
309
/* Plugin implementation */
310
int janus_voicemail_init(janus_callbacks *callback, const char *config_path) {
311
        if(g_atomic_int_get(&stopping)) {
312
                /* Still stopping from before */
313
                return -1;
314
        }
315
        if(callback == NULL || config_path == NULL) {
316
                /* Invalid arguments */
317
                return -1;
318
        }
319

    
320
        /* Read configuration */
321
        char filename[255];
322
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_VOICEMAIL_PACKAGE);
323
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
324
        janus_config *config = janus_config_parse(filename);
325
        if(config != NULL)
326
                janus_config_print(config);
327
        
328
        sessions = g_hash_table_new(NULL, NULL);
329
        janus_mutex_init(&sessions_mutex);
330
        messages = g_async_queue_new_full((GDestroyNotify) janus_voicemail_message_free);
331
        /* This is the callback we'll need to invoke to contact the gateway */
332
        gateway = callback;
333

    
334
        /* Parse configuration */
335
        if(config != NULL) {
336
                janus_config_item *path = janus_config_get_item_drilldown(config, "general", "path");
337
                if(path && path->value)
338
                        recordings_path = g_strdup(path->value);
339
                janus_config_item *base = janus_config_get_item_drilldown(config, "general", "base");
340
                if(base && base->value)
341
                        recordings_base = g_strdup(base->value);
342
                janus_config_item *events = janus_config_get_item_drilldown(config, "general", "events");
343
                if(events != NULL && events->value != NULL)
344
                        notify_events = janus_is_true(events->value);
345
                if(!notify_events && callback->events_is_enabled()) {
346
                        JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_VOICEMAIL_NAME);
347
                }
348
                /* Done */
349
                janus_config_destroy(config);
350
                config = NULL;
351
        }
352
        if(recordings_path == NULL)
353
                recordings_path = g_strdup("./html/recordings/");
354
        if(recordings_base == NULL)
355
                recordings_base = g_strdup("/recordings/");
356
        JANUS_LOG(LOG_VERB, "Recordings path: %s\n", recordings_path);
357
        JANUS_LOG(LOG_VERB, "Recordings base: %s\n", recordings_base);
358
        /* Create the folder, if needed */
359
        struct stat st = {0};
360
        if(stat(recordings_path, &st) == -1) {
361
                int res = janus_mkdir(recordings_path, 0755);
362
                JANUS_LOG(LOG_VERB, "Creating folder: %d\n", res);
363
                if(res != 0) {
364
                        JANUS_LOG(LOG_ERR, "%s", strerror(res));
365
                        return -1;        /* No point going on... */
366
                }
367
        }
368
        
369
        g_atomic_int_set(&initialized, 1);
370

    
371
        GError *error = NULL;
372
        /* Start the sessions watchdog */
373
        watchdog = g_thread_try_new("vmail watchdog", &janus_voicemail_watchdog, NULL, &error);
374
        if(error != NULL) {
375
                g_atomic_int_set(&initialized, 0);
376
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail watchdog thread...\n", error->code, error->message ? error->message : "??");
377
                return -1;
378
        }
379
        /* Launch the thread that will handle incoming messages */
380
        handler_thread = g_thread_try_new("janus voicemail handler", janus_voicemail_handler, NULL, &error);
381
        if(error != NULL) {
382
                g_atomic_int_set(&initialized, 0);
383
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail handler thread...\n", error->code, error->message ? error->message : "??");
384
                return -1;
385
        }
386
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_VOICEMAIL_NAME);
387
        return 0;
388
}
389

    
390
void janus_voicemail_destroy(void) {
391
        if(!g_atomic_int_get(&initialized))
392
                return;
393
        g_atomic_int_set(&stopping, 1);
394

    
395
        g_async_queue_push(messages, &exit_message);
396
        if(handler_thread != NULL) {
397
                g_thread_join(handler_thread);
398
                handler_thread = NULL;
399
        }
400
        if(watchdog != NULL) {
401
                g_thread_join(watchdog);
402
                watchdog = NULL;
403
        }
404
        /* FIXME We should destroy the sessions cleanly */
405
        janus_mutex_lock(&sessions_mutex);
406
        g_hash_table_destroy(sessions);
407
        janus_mutex_unlock(&sessions_mutex);
408
        g_async_queue_unref(messages);
409
        messages = NULL;
410
        sessions = NULL;
411
        g_atomic_int_set(&initialized, 0);
412
        g_atomic_int_set(&stopping, 0);
413
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_VOICEMAIL_NAME);
414
}
415

    
416
int janus_voicemail_get_api_compatibility(void) {
417
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
418
        return JANUS_PLUGIN_API_VERSION;
419
}
420

    
421
int janus_voicemail_get_version(void) {
422
        return JANUS_VOICEMAIL_VERSION;
423
}
424

    
425
const char *janus_voicemail_get_version_string(void) {
426
        return JANUS_VOICEMAIL_VERSION_STRING;
427
}
428

    
429
const char *janus_voicemail_get_description(void) {
430
        return JANUS_VOICEMAIL_DESCRIPTION;
431
}
432

    
433
const char *janus_voicemail_get_name(void) {
434
        return JANUS_VOICEMAIL_NAME;
435
}
436

    
437
const char *janus_voicemail_get_author(void) {
438
        return JANUS_VOICEMAIL_AUTHOR;
439
}
440

    
441
const char *janus_voicemail_get_package(void) {
442
        return JANUS_VOICEMAIL_PACKAGE;
443
}
444

    
445
void janus_voicemail_create_session(janus_plugin_session *handle, int *error) {
446
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
447
                *error = -1;
448
                return;
449
        }        
450
        janus_voicemail_session *session = (janus_voicemail_session *)g_malloc0(sizeof(janus_voicemail_session));
451
        if(session == NULL) {
452
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
453
                *error = -2;
454
                return;
455
        }
456
        session->handle = handle;
457
        session->recording_id = janus_random_uint64();
458
        session->start_time = 0;
459
        session->stream = NULL;
460
        char f[255];
461
        g_snprintf(f, 255, "%s/janus-voicemail-%"SCNu64".opus", recordings_path, session->recording_id);
462
        session->filename = g_strdup(f);
463
        if(session->filename == NULL) {
464
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
465
                *error = -2;
466
                return;
467
        }
468
        session->file = NULL;
469
        session->seq = 0;
470
        session->started = FALSE;
471
        session->stopping = FALSE;
472
        session->destroyed = 0;
473
        g_atomic_int_set(&session->hangingup, 0);
474
        handle->plugin_handle = session;
475
        janus_mutex_lock(&sessions_mutex);
476
        g_hash_table_insert(sessions, handle, session);
477
        janus_mutex_unlock(&sessions_mutex);
478

    
479
        return;
480
}
481

    
482
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error) {
483
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
484
                *error = -1;
485
                return;
486
        }        
487
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle; 
488
        if(!session) {
489
                JANUS_LOG(LOG_ERR, "No VoiceMail session associated with this handle...\n");
490
                *error = -2;
491
                return;
492
        }
493
        janus_mutex_lock(&sessions_mutex);
494
        if(!session->destroyed) {
495
                JANUS_LOG(LOG_VERB, "Removing VoiceMail session...\n");
496
                g_hash_table_remove(sessions, handle);
497
                janus_voicemail_hangup_media(handle);
498
                session->destroyed = janus_get_monotonic_time();
499
                /* Cleaning up and removing the session is done in a lazy way */
500
                old_sessions = g_list_append(old_sessions, session);
501
        }
502
        janus_mutex_unlock(&sessions_mutex);
503

    
504
        return;
505
}
506

    
507
char *janus_voicemail_query_session(janus_plugin_session *handle) {
508
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
509
                return NULL;
510
        }        
511
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
512
        if(!session) {
513
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
514
                return NULL;
515
        }
516
        /* In the echo test, every session is the same: we just provide some configure info */
517
        json_t *info = json_object();
518
        json_object_set_new(info, "state", json_string(session->stream ? "recording" : "idle"));
519
        if(session->stream) {
520
                json_object_set_new(info, "id", json_integer(session->recording_id));
521
                json_object_set_new(info, "start_time", json_integer(session->start_time));
522
                json_object_set_new(info, "filename", session->filename ? json_string(session->filename) : NULL);
523
        }
524
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
525
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
526
        json_decref(info);
527
        return info_text;
528
}
529

    
530
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
531
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
532
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
533
        janus_voicemail_message *msg = g_malloc0(sizeof(janus_voicemail_message));
534
        if(msg == NULL) {
535
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
536
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
537
        }
538
        msg->handle = handle;
539
        msg->transaction = transaction;
540
        msg->message = message;
541
        msg->sdp_type = sdp_type;
542
        msg->sdp = sdp;
543
        g_async_queue_push(messages, msg);
544

    
545
        /* All the requests to this plugin are handled asynchronously */
546
        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
547
}
548

    
549
void janus_voicemail_setup_media(janus_plugin_session *handle) {
550
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
551
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
552
                return;
553
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
554
        if(!session) {
555
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
556
                return;
557
        }
558
        if(session->destroyed)
559
                return;
560
        g_atomic_int_set(&session->hangingup, 0);
561
        /* Only start recording this peer when we get this event */
562
        session->start_time = janus_get_monotonic_time();
563
        session->started = TRUE;
564
        /* Prepare JSON event */
565
        json_t *event = json_object();
566
        json_object_set_new(event, "voicemail", json_string("event"));
567
        json_object_set_new(event, "status", json_string("started"));
568
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
569
        json_decref(event);
570
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
571
        int ret = gateway->push_event(handle, &janus_voicemail_plugin, NULL, event_text, NULL, NULL);
572
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
573
        g_free(event_text);
574
}
575

    
576
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
577
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
578
                return;
579
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
580
        if(!session || session->destroyed || session->stopping || !session->started || session->start_time == 0)
581
                return;
582
        gint64 now = janus_get_monotonic_time();
583
        /* Have 10 seconds passed? */
584
        if((now-session->start_time) >= 10*G_USEC_PER_SEC) {
585
                /* FIXME Simulate a "stop" coming from the browser */
586
                session->started = FALSE;
587
                janus_voicemail_message *msg = g_malloc0(sizeof(janus_voicemail_message));
588
                if(msg == NULL) {
589
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
590
                        return;
591
                }
592
                msg->handle = handle;
593
                msg->message = g_strdup("{\"request\":\"stop\"}");
594
                msg->transaction = NULL;
595
                msg->sdp_type = NULL;
596
                msg->sdp = NULL;
597
                g_async_queue_push(messages, msg);
598
                return;
599
        }
600
        /* Save the frame */
601
        rtp_header *rtp = (rtp_header *)buf;
602
        uint16_t seq = ntohs(rtp->seq_number);
603
        if(session->seq == 0)
604
                session->seq = seq;
605
        ogg_packet *op = op_from_pkt((const unsigned char *)(buf+12), len-12);        /* TODO Check RTP extensions... */
606
        //~ JANUS_LOG(LOG_VERB, "\tWriting at position %d (%d)\n", seq-session->seq+1, 960*(seq-session->seq+1));
607
        op->granulepos = 960*(seq-session->seq+1); // FIXME: get this from the toc byte
608
        ogg_stream_packetin(session->stream, op);
609
        g_free(op);
610
        ogg_write(session);
611
}
612

    
613
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
614
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
615
                return;
616
        /* FIXME Should we care? */
617
}
618

    
619
void janus_voicemail_hangup_media(janus_plugin_session *handle) {
620
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
621
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
622
                return;
623
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
624
        if(!session) {
625
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
626
                return;
627
        }
628
        session->started = FALSE;
629
        if(session->destroyed)
630
                return;
631
        if(g_atomic_int_add(&session->hangingup, 1))
632
                return;
633
        /* Close and reset stuff */
634
        if(session->file)
635
                fclose(session->file);
636
        session->file = NULL;
637
        if(session->stream)
638
                ogg_stream_destroy(session->stream);
639
        session->stream = NULL;
640
}
641

    
642
/* Thread to handle incoming messages */
643
static void *janus_voicemail_handler(void *data) {
644
        JANUS_LOG(LOG_VERB, "Joining VoiceMail handler thread\n");
645
        janus_voicemail_message *msg = NULL;
646
        int error_code = 0;
647
        char error_cause[512];
648
        json_t *root = NULL;
649
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
650
                msg = g_async_queue_pop(messages);
651
                if(msg == NULL)
652
                        continue;
653
                if(msg == &exit_message)
654
                        break;
655
                if(msg->handle == NULL) {
656
                        janus_voicemail_message_free(msg);
657
                        continue;
658
                }
659
                janus_voicemail_session *session = NULL;
660
                janus_mutex_lock(&sessions_mutex);
661
                if(g_hash_table_lookup(sessions, msg->handle) != NULL ) {
662
                        session = (janus_voicemail_session *)msg->handle->plugin_handle;
663
                }
664
                janus_mutex_unlock(&sessions_mutex);
665
                if(!session) {
666
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
667
                        janus_voicemail_message_free(msg);
668
                        continue;
669
                }
670
                if(session->destroyed) {
671
                        janus_voicemail_message_free(msg);
672
                        continue;
673
                }
674
                /* Handle request */
675
                error_code = 0;
676
                root = NULL;
677
                JANUS_LOG(LOG_VERB, "Handling message: %s\n", msg->message);
678
                if(msg->message == NULL) {
679
                        JANUS_LOG(LOG_ERR, "No message??\n");
680
                        error_code = JANUS_VOICEMAIL_ERROR_NO_MESSAGE;
681
                        g_snprintf(error_cause, 512, "%s", "No message??");
682
                        goto error;
683
                }
684
                json_error_t error;
685
                root = json_loads(msg->message, 0, &error);
686
                if(!root) {
687
                        JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
688
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
689
                        g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
690
                        goto error;
691
                }
692
                if(!json_is_object(root)) {
693
                        JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
694
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
695
                        g_snprintf(error_cause, 512, "JSON error: not an object");
696
                        goto error;
697
                }
698
                /* Get the request first */
699
                JANUS_VALIDATE_JSON_OBJECT(root, request_parameters,
700
                        error_code, error_cause, TRUE,
701
                        JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT, JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT);
702
                if(error_code != 0)
703
                        goto error;
704
                json_t *request = json_object_get(root, "request");
705
                const char *request_text = json_string_value(request);
706
                json_t *event = NULL;
707
                if(!strcasecmp(request_text, "record")) {
708
                        JANUS_LOG(LOG_VERB, "Starting new recording\n");
709
                        if(session->file != NULL) {
710
                                JANUS_LOG(LOG_ERR, "Already recording (%s)\n", session->filename ? session->filename : "??");
711
                                error_code = JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING;
712
                                g_snprintf(error_cause, 512, "Already recording");
713
                                goto error;
714
                        }
715
                        session->stream = g_malloc0(sizeof(ogg_stream_state));
716
                        if(session->stream == NULL) {
717
                                JANUS_LOG(LOG_ERR, "Couldn't allocate stream struct\n");
718
                                error_code = JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR;
719
                                g_snprintf(error_cause, 512, "Couldn't allocate stream struct");
720
                                goto error;
721
                        }
722
                        if(ogg_stream_init(session->stream, rand()) < 0) {
723
                                JANUS_LOG(LOG_ERR, "Couldn't initialize Ogg stream state\n");
724
                                error_code = JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR;
725
                                g_snprintf(error_cause, 512, "Couldn't initialize Ogg stream state\n");
726
                                goto error;
727
                        }
728
                        session->file = fopen(session->filename, "wb");
729
                        if(session->file == NULL) {
730
                                JANUS_LOG(LOG_ERR, "Couldn't open output file\n");
731
                                error_code = JANUS_VOICEMAIL_ERROR_IO_ERROR;
732
                                g_snprintf(error_cause, 512, "Couldn't open output file");
733
                                goto error;
734
                        }
735
                        session->seq = 0;
736
                        /* Write stream headers */
737
                        ogg_packet *op = op_opushead();
738
                        ogg_stream_packetin(session->stream, op);
739
                        op_free(op);
740
                        op = op_opustags();
741
                        ogg_stream_packetin(session->stream, op);
742
                        op_free(op);
743
                        ogg_flush(session);
744
                        /* Done: now wait for the setup_media callback to be called */
745
                        event = json_object();
746
                        json_object_set_new(event, "voicemail", json_string("event"));
747
                        json_object_set_new(event, "status", json_string(session->started ? "started" : "starting"));
748
                        /* Also notify event handlers */
749
                        if(notify_events && gateway->events_is_enabled()) {
750
                                json_t *info = json_object();
751
                                json_object_set_new(info, "event", json_string("starting"));
752
                                gateway->notify_event(session->handle, info);
753
                        }
754
                } else if(!strcasecmp(request_text, "stop")) {
755
                        /* Stop the recording */
756
                        session->started = FALSE;
757
                        session->stopping = TRUE;
758
                        if(session->file)
759
                                fclose(session->file);
760
                        session->file = NULL;
761
                        if(session->stream)
762
                                ogg_stream_destroy(session->stream);
763
                        session->stream = NULL;
764
                        /* Done: send the event and close the handle */
765
                        event = json_object();
766
                        json_object_set_new(event, "voicemail", json_string("event"));
767
                        json_object_set_new(event, "status", json_string("done"));
768
                        char url[1024];
769
                        g_snprintf(url, 1024, "%s/janus-voicemail-%"SCNu64".opus", recordings_base, session->recording_id);
770
                        json_object_set_new(event, "recording", json_string(url));
771
                        /* Also notify event handlers */
772
                        if(notify_events && gateway->events_is_enabled()) {
773
                                json_t *info = json_object();
774
                                json_object_set_new(info, "event", json_string("done"));
775
                                gateway->notify_event(session->handle, info);
776
                        }
777
                } else {
778
                        JANUS_LOG(LOG_ERR, "Unknown request '%s'\n", request_text);
779
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_REQUEST;
780
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
781
                        goto error;
782
                }
783

    
784
                json_decref(root);
785
                /* Prepare JSON event */
786
                JANUS_LOG(LOG_VERB, "Preparing JSON event as a reply\n");
787
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
788
                json_decref(event);
789
                /* Any SDP to handle? */
790
                if(!msg->sdp) {
791
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
792
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
793
                } else {
794
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg->sdp_type, msg->sdp);
795
                        const char *type = NULL;
796
                        if(!strcasecmp(msg->sdp_type, "offer"))
797
                                type = "answer";
798
                        if(!strcasecmp(msg->sdp_type, "answer"))
799
                                type = "offer";
800
                        /* Fill the SDP template and use that as our answer */
801
                        char sdp[1024];
802
                        /* What is the Opus payload type? */
803
                        int opus_pt = janus_get_codec_pt(msg->sdp, "opus");
804
                        JANUS_LOG(LOG_VERB, "Opus payload type is %d\n", opus_pt);
805
                        g_snprintf(sdp, 1024, sdp_template,
806
                                janus_get_real_time(),                        /* We need current time here */
807
                                janus_get_real_time(),                        /* We need current time here */
808
                                session->recording_id,                        /* Recording ID */
809
                                opus_pt,                                                /* Opus payload type */
810
                                opus_pt                                                        /* Opus payload type */);
811
                        /* Did the peer negotiate video? */
812
                        if(strstr(msg->sdp, "m=video") != NULL) {
813
                                /* If so, reject it */
814
                                g_strlcat(sdp, "m=video 0 RTP/SAVPF 0\r\n", 1024);                                
815
                        }
816
                        /* How long will the gateway take to push the event? */
817
                        g_atomic_int_set(&session->hangingup, 0);
818
                        gint64 start = janus_get_monotonic_time();
819
                        int res = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, type, sdp);
820
                        JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start);
821
                        if(res != JANUS_OK) {
822
                                /* TODO Failed to negotiate? We should remove this participant */
823
                        }
824
                }
825
                g_free(event_text);
826
                janus_voicemail_message_free(msg);
827
                
828
                if(session->stopping) {
829
                        gateway->end_session(session->handle);
830
                }
831

    
832
                continue;
833
                
834
error:
835
                {
836
                        if(root != NULL)
837
                                json_decref(root);
838
                        /* Prepare JSON error event */
839
                        json_t *event = json_object();
840
                        json_object_set_new(event, "voicemail", json_string("event"));
841
                        json_object_set_new(event, "error_code", json_integer(error_code));
842
                        json_object_set_new(event, "error", json_string(error_cause));
843
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
844
                        json_decref(event);
845
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
846
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
847
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
848
                        g_free(event_text);
849
                        janus_voicemail_message_free(msg);
850
                }
851
        }
852
        JANUS_LOG(LOG_VERB, "Leaving VoiceMail handler thread\n");
853
        return NULL;
854
}
855

    
856

    
857
/* OGG/Opus helpers */
858
/* Write a little-endian 32 bit int to memory */
859
void le32(unsigned char *p, int v) {
860
        p[0] = v & 0xff;
861
        p[1] = (v >> 8) & 0xff;
862
        p[2] = (v >> 16) & 0xff;
863
        p[3] = (v >> 24) & 0xff;
864
}
865

    
866

    
867
/* Write a little-endian 16 bit int to memory */
868
void le16(unsigned char *p, int v) {
869
        p[0] = v & 0xff;
870
        p[1] = (v >> 8) & 0xff;
871
}
872

    
873
/* ;anufacture a generic OpusHead packet */
874
ogg_packet *op_opushead(void) {
875
        int size = 19;
876
        unsigned char *data = g_malloc0(size);
877
        ogg_packet *op = g_malloc0(sizeof(*op));
878

    
879
        if(!data) {
880
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
881
                return NULL;
882
        }
883
        if(!op) {
884
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
885
                return NULL;
886
        }
887

    
888
        memcpy(data, "OpusHead", 8);  /* identifier */
889
        data[8] = 1;                  /* version */
890
        data[9] = 2;                  /* channels */
891
        le16(data+10, 0);             /* pre-skip */
892
        le32(data + 12, 48000);       /* original sample rate */
893
        le16(data + 16, 0);           /* gain */
894
        data[18] = 0;                 /* channel mapping family */
895

    
896
        op->packet = data;
897
        op->bytes = size;
898
        op->b_o_s = 1;
899
        op->e_o_s = 0;
900
        op->granulepos = 0;
901
        op->packetno = 0;
902

    
903
        return op;
904
}
905

    
906
/* Manufacture a generic OpusTags packet */
907
ogg_packet *op_opustags(void) {
908
        const char *identifier = "OpusTags";
909
        const char *vendor = "Janus VoiceMail plugin";
910
        int size = strlen(identifier) + 4 + strlen(vendor) + 4;
911
        unsigned char *data = g_malloc0(size);
912
        ogg_packet *op = g_malloc0(sizeof(*op));
913

    
914
        if(!data) {
915
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
916
                return NULL;
917
        }
918
        if(!op) {
919
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
920
                return NULL;
921
        }
922

    
923
        memcpy(data, identifier, 8);
924
        le32(data + 8, strlen(vendor));
925
        memcpy(data + 12, vendor, strlen(vendor));
926
        le32(data + 12 + strlen(vendor), 0);
927

    
928
        op->packet = data;
929
        op->bytes = size;
930
        op->b_o_s = 0;
931
        op->e_o_s = 0;
932
        op->granulepos = 0;
933
        op->packetno = 1;
934

    
935
        return op;
936
}
937

    
938
/* Allocate an ogg_packet */
939
ogg_packet *op_from_pkt(const unsigned char *pkt, int len) {
940
        ogg_packet *op = g_malloc0(sizeof(*op));
941
        if(!op) {
942
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet.\n");
943
                return NULL;
944
        }
945

    
946
        op->packet = (unsigned char *)pkt;
947
        op->bytes = len;
948
        op->b_o_s = 0;
949
        op->e_o_s = 0;
950

    
951
        return op;
952
}
953

    
954
/* Free a packet and its contents */
955
void op_free(ogg_packet *op) {
956
        if(op) {
957
                if(op->packet) {
958
                        g_free(op->packet);
959
                }
960
                g_free(op);
961
        }
962
}
963

    
964
/* Write out available ogg pages */
965
int ogg_write(janus_voicemail_session *session) {
966
        ogg_page page;
967
        size_t written;
968

    
969
        if(!session || !session->stream || !session->file) {
970
                return -1;
971
        }
972

    
973
        while (ogg_stream_pageout(session->stream, &page)) {
974
                written = fwrite(page.header, 1, page.header_len, session->file);
975
                if(written != (size_t)page.header_len) {
976
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
977
                        return -2;
978
                }
979
                written = fwrite(page.body, 1, page.body_len, session->file);
980
                if(written != (size_t)page.body_len) {
981
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
982
                        return -3;
983
                }
984
        }
985
        return 0;
986
}
987

    
988
/* Flush remaining ogg data */
989
int ogg_flush(janus_voicemail_session *session) {
990
        ogg_page page;
991
        size_t written;
992

    
993
        if(!session || !session->stream || !session->file) {
994
                return -1;
995
        }
996

    
997
        while (ogg_stream_flush(session->stream, &page)) {
998
                written = fwrite(page.header, 1, page.header_len, session->file);
999
                if(written != (size_t)page.header_len) {
1000
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
1001
                        return -2;
1002
                }
1003
                written = fwrite(page.body, 1, page.body_len, session->file);
1004
                if(written != (size_t)page.body_len) {
1005
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
1006
                        return -3;
1007
                }
1008
        }
1009
        return 0;
1010
}