Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_voicemail.c @ 793d18b1

History | View | Annotate | Download (31.8 KB)

1
/*! \file   janus_voicemail.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus VoiceMail plugin
5
 * \details  This is a plugin implementing a very simple VoiceMail service
6
 * for Janus, specifically recording Opus streams. This means that it replies
7
 * by providing in the SDP only support for Opus, and disabling video.
8
 * When a peer contacts the plugin, the plugin starts recording the audio
9
 * frames it receives and, after 10 seconds, it shuts the PeerConnection
10
 * down and returns an URL to the recorded file.
11
 * 
12
 * Since an URL is returned, the plugin allows you to configure where the
13
 * recordings whould be stored (e.g., a folder in your web server, writable
14
 * by the plugin) and the base path to use when returning URLs (e.g.,
15
 * /my/recordings/ or http://www.example.com/my/recordings).
16
 * 
17
 * By default the plugin saves the recordings in the \c html folder of
18
 * this project, meaning that it can work out of the box with the VoiceMail
19
 * demo we provide in the same folder.
20
 *
21
 * \section vmailapi VoiceMail API
22
 * 
23
 * The VoiceMail API supports just two requests, \c record and \c stop 
24
 * and they're both asynchronous, which means all responses (successes
25
 * and errors) will be delivered as events with the same transaction. 
26
 * 
27
 * \c record will instruct the plugin to start recording, while \c stop
28
 * will make the recording stop before the 10 seconds have passed.
29
 * Never send a JSEP offer with any of these requests: it's always the
30
 * VoiceMail plugin that originates a JSEP offer, in response to a
31
 * \c record request, which means your application will only have to
32
 * send a JSEP answer when that happens.
33
 * 
34
 * The \c record request has to be formatted as follows:
35
 *
36
\verbatim
37
{
38
        "request" : "record"
39
}
40
\endverbatim
41
 *
42
 * A successful request will result in an \c starting status event:
43
 * 
44
\verbatim
45
{
46
        "voicemail" : "event",
47
        "status": "starting"
48
}
49
\endverbatim
50
 * 
51
 * which will be followed by a \c started as soon as the associated
52
 * PeerConnection has been made available to the plugin: 
53
 * 
54
\verbatim
55
{
56
        "voicemail" : "event",
57
        "status": "started"
58
}
59
\endverbatim
60
 * 
61
 * An error instead would provide both an error code and a more verbose
62
 * description of the cause of the issue:
63
 * 
64
\verbatim
65
{
66
        "voicemail" : "event",
67
        "error_code" : <numeric ID, check Macros below>,
68
        "error" : "<error description as a string>"
69
}
70
\endverbatim
71
 * 
72
 * The \c stop request instead has to be formatted as follows:
73
 *
74
\verbatim
75
{
76
        "request" : "stop"
77
}
78
\endverbatim
79
 *
80
 * If the plugin detects a loss of the associated PeerConnection, whether
81
 * as a result of a \c stop request or because the 10 seconds passed, a
82
 * \c done status notification is triggered to inform the application
83
 * the recording session is over, together with the path to the
84
 * recording file itself:
85
 * 
86
\verbatim
87
{
88
        "voicemail" : "event",
89
        "status" : "done",
90
        "recording : "<path to the .opus file>"
91
}
92
\endverbatim
93
 *
94
 * \ingroup plugins
95
 * \ref plugins
96
 */
97

    
98
#include "plugin.h"
99

    
100
#include <jansson.h>
101
#include <sys/stat.h>
102
#include <sys/time.h>
103

    
104
#include <ogg/ogg.h>
105

    
106
#include "../debug.h"
107
#include "../apierror.h"
108
#include "../config.h"
109
#include "../mutex.h"
110
#include "../rtp.h"
111
#include "../utils.h"
112

    
113

    
114
/* Plugin information */
115
#define JANUS_VOICEMAIL_VERSION                        6
116
#define JANUS_VOICEMAIL_VERSION_STRING        "0.0.6"
117
#define JANUS_VOICEMAIL_DESCRIPTION                "This is a plugin implementing a very simple VoiceMail service for Janus, recording Opus streams."
118
#define JANUS_VOICEMAIL_NAME                        "JANUS VoiceMail plugin"
119
#define JANUS_VOICEMAIL_AUTHOR                        "Meetecho s.r.l."
120
#define JANUS_VOICEMAIL_PACKAGE                        "janus.plugin.voicemail"
121

    
122
/* Plugin methods */
123
janus_plugin *create(void);
124
int janus_voicemail_init(janus_callbacks *callback, const char *config_path);
125
void janus_voicemail_destroy(void);
126
int janus_voicemail_get_api_compatibility(void);
127
int janus_voicemail_get_version(void);
128
const char *janus_voicemail_get_version_string(void);
129
const char *janus_voicemail_get_description(void);
130
const char *janus_voicemail_get_name(void);
131
const char *janus_voicemail_get_author(void);
132
const char *janus_voicemail_get_package(void);
133
void janus_voicemail_create_session(janus_plugin_session *handle, int *error);
134
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
135
void janus_voicemail_setup_media(janus_plugin_session *handle);
136
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
137
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
138
void janus_voicemail_hangup_media(janus_plugin_session *handle);
139
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error);
140
char *janus_voicemail_query_session(janus_plugin_session *handle);
141

    
142
/* Plugin setup */
143
static janus_plugin janus_voicemail_plugin =
144
        JANUS_PLUGIN_INIT (
145
                .init = janus_voicemail_init,
146
                .destroy = janus_voicemail_destroy,
147

    
148
                .get_api_compatibility = janus_voicemail_get_api_compatibility,
149
                .get_version = janus_voicemail_get_version,
150
                .get_version_string = janus_voicemail_get_version_string,
151
                .get_description = janus_voicemail_get_description,
152
                .get_name = janus_voicemail_get_name,
153
                .get_author = janus_voicemail_get_author,
154
                .get_package = janus_voicemail_get_package,
155
                
156
                .create_session = janus_voicemail_create_session,
157
                .handle_message = janus_voicemail_handle_message,
158
                .setup_media = janus_voicemail_setup_media,
159
                .incoming_rtp = janus_voicemail_incoming_rtp,
160
                .incoming_rtcp = janus_voicemail_incoming_rtcp,
161
                .hangup_media = janus_voicemail_hangup_media,
162
                .destroy_session = janus_voicemail_destroy_session,
163
                .query_session = janus_voicemail_query_session,
164
        );
165

    
166
/* Plugin creator */
167
janus_plugin *create(void) {
168
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_VOICEMAIL_NAME);
169
        return &janus_voicemail_plugin;
170
}
171

    
172

    
173
/* Useful stuff */
174
static gint initialized = 0, stopping = 0;
175
static janus_callbacks *gateway = NULL;
176
static GThread *handler_thread;
177
static GThread *watchdog;
178
static void *janus_voicemail_handler(void *data);
179

    
180
typedef struct janus_voicemail_message {
181
        janus_plugin_session *handle;
182
        char *transaction;
183
        char *message;
184
        char *sdp_type;
185
        char *sdp;
186
} janus_voicemail_message;
187
static GAsyncQueue *messages = NULL;
188

    
189
void janus_voicemail_message_free(janus_voicemail_message *msg);
190
void janus_voicemail_message_free(janus_voicemail_message *msg) {
191
        if(!msg)
192
                return;
193

    
194
        msg->handle = NULL;
195

    
196
        g_free(msg->transaction);
197
        msg->transaction = NULL;
198
        g_free(msg->message);
199
        msg->message = NULL;
200
        g_free(msg->sdp_type);
201
        msg->sdp_type = NULL;
202
        g_free(msg->sdp);
203
        msg->sdp = NULL;
204

    
205
        g_free(msg);
206
}
207

    
208

    
209
typedef struct janus_voicemail_session {
210
        janus_plugin_session *handle;
211
        guint64 recording_id;
212
        gint64 start_time;
213
        char *filename;
214
        FILE *file;
215
        ogg_stream_state *stream;
216
        int seq;
217
        gboolean started;
218
        gboolean stopping;
219
        guint64 destroyed;        /* Time at which this session was marked as destroyed */
220
} janus_voicemail_session;
221
static GHashTable *sessions;
222
static GList *old_sessions;
223
static janus_mutex sessions_mutex;
224

    
225
static char *recordings_path = NULL;
226
static char *recordings_base = NULL;
227

    
228
/* SDP offer/answer template */
229
#define sdp_template \
230
                "v=0\r\n" \
231
                "o=- %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n"        /* We need current time here */ \
232
                "s=VoiceMail %"SCNu64"\r\n"                                                /* VoiceMail recording ID */ \
233
                "t=0 0\r\n" \
234
                "m=audio 1 RTP/SAVPF %d\r\n"                /* Opus payload type */ \
235
                "c=IN IP4 1.1.1.1\r\n" \
236
                "a=rtpmap:%d opus/48000/2\r\n"                /* Opus payload type */ \
237
                "a=recvonly\r\n"                                        /* This plugin doesn't send any frames */
238

    
239

    
240
/* OGG/Opus helpers */
241
void le32(unsigned char *p, int v);
242
void le16(unsigned char *p, int v);
243
ogg_packet *op_opushead(void);
244
ogg_packet *op_opustags(void);
245
ogg_packet *op_from_pkt(const unsigned char *pkt, int len);
246
void op_free(ogg_packet *op);
247
int ogg_write(janus_voicemail_session *session);
248
int ogg_flush(janus_voicemail_session *session);
249

    
250

    
251
/* Error codes */
252
#define JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR                499
253
#define JANUS_VOICEMAIL_ERROR_NO_MESSAGE                460
254
#define JANUS_VOICEMAIL_ERROR_INVALID_JSON                461
255
#define JANUS_VOICEMAIL_ERROR_INVALID_REQUEST        462
256
#define JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT        463
257
#define JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT        464
258
#define JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING        465
259
#define JANUS_VOICEMAIL_ERROR_IO_ERROR                        466
260
#define JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR                467
261

    
262

    
263
/* VoiceMail watchdog/garbage collector (sort of) */
264
void *janus_voicemail_watchdog(void *data);
265
void *janus_voicemail_watchdog(void *data) {
266
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog started\n");
267
        gint64 now = 0;
268
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
269
                janus_mutex_lock(&sessions_mutex);
270
                /* Iterate on all the sessions */
271
                now = janus_get_monotonic_time();
272
                if(old_sessions != NULL) {
273
                        GList *sl = old_sessions;
274
                        JANUS_LOG(LOG_HUGE, "Checking %d old VoiceMail sessions...\n", g_list_length(old_sessions));
275
                        while(sl) {
276
                                janus_voicemail_session *session = (janus_voicemail_session *)sl->data;
277
                                if(!session) {
278
                                        sl = sl->next;
279
                                        continue;
280
                                }
281
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
282
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
283
                                        JANUS_LOG(LOG_VERB, "Freeing old VoiceMail session\n");
284
                                        GList *rm = sl->next;
285
                                        old_sessions = g_list_delete_link(old_sessions, sl);
286
                                        sl = rm;
287
                                        session->handle = NULL;
288
                                        g_free(session);
289
                                        session = NULL;
290
                                        continue;
291
                                }
292
                                sl = sl->next;
293
                        }
294
                }
295
                janus_mutex_unlock(&sessions_mutex);
296
                g_usleep(500000);
297
        }
298
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog stopped\n");
299
        return NULL;
300
}
301

    
302

    
303
/* Plugin implementation */
304
int janus_voicemail_init(janus_callbacks *callback, const char *config_path) {
305
        if(g_atomic_int_get(&stopping)) {
306
                /* Still stopping from before */
307
                return -1;
308
        }
309
        if(callback == NULL || config_path == NULL) {
310
                /* Invalid arguments */
311
                return -1;
312
        }
313

    
314
        /* Read configuration */
315
        char filename[255];
316
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_VOICEMAIL_PACKAGE);
317
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
318
        janus_config *config = janus_config_parse(filename);
319
        if(config != NULL)
320
                janus_config_print(config);
321
        
322
        sessions = g_hash_table_new(NULL, NULL);
323
        janus_mutex_init(&sessions_mutex);
324
        messages = g_async_queue_new_full((GDestroyNotify) janus_voicemail_message_free);
325
        /* This is the callback we'll need to invoke to contact the gateway */
326
        gateway = callback;
327

    
328
        /* Parse configuration */
329
        if(config != NULL) {
330
                janus_config_item *path = janus_config_get_item_drilldown(config, "general", "path");
331
                if(path && path->value)
332
                        recordings_path = g_strdup(path->value);
333
                janus_config_item *base = janus_config_get_item_drilldown(config, "general", "base");
334
                if(base && base->value)
335
                        recordings_base = g_strdup(base->value);
336
                /* Done */
337
                janus_config_destroy(config);
338
                config = NULL;
339
        }
340
        if(recordings_path == NULL)
341
                recordings_path = g_strdup("./html/recordings/");
342
        if(recordings_base == NULL)
343
                recordings_base = g_strdup("/recordings/");
344
        JANUS_LOG(LOG_VERB, "Recordings path: %s\n", recordings_path);
345
        JANUS_LOG(LOG_VERB, "Recordings base: %s\n", recordings_base);
346
        /* Create the folder, if needed */
347
        struct stat st = {0};
348
        if(stat(recordings_path, &st) == -1) {
349
                int res = janus_mkdir(recordings_path, 0755);
350
                JANUS_LOG(LOG_VERB, "Creating folder: %d\n", res);
351
                if(res != 0) {
352
                        JANUS_LOG(LOG_ERR, "%s", strerror(res));
353
                        return -1;        /* No point going on... */
354
                }
355
        }
356
        
357
        g_atomic_int_set(&initialized, 1);
358

    
359
        GError *error = NULL;
360
        /* Start the sessions watchdog */
361
        watchdog = g_thread_try_new("vmail watchdog", &janus_voicemail_watchdog, NULL, &error);
362
        if(error != NULL) {
363
                g_atomic_int_set(&initialized, 0);
364
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail watchdog thread...\n", error->code, error->message ? error->message : "??");
365
                return -1;
366
        }
367
        /* Launch the thread that will handle incoming messages */
368
        handler_thread = g_thread_try_new("janus voicemail handler", janus_voicemail_handler, NULL, &error);
369
        if(error != NULL) {
370
                g_atomic_int_set(&initialized, 0);
371
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail handler thread...\n", error->code, error->message ? error->message : "??");
372
                return -1;
373
        }
374
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_VOICEMAIL_NAME);
375
        return 0;
376
}
377

    
378
void janus_voicemail_destroy(void) {
379
        if(!g_atomic_int_get(&initialized))
380
                return;
381
        g_atomic_int_set(&stopping, 1);
382
        if(handler_thread != NULL) {
383
                g_thread_join(handler_thread);
384
                handler_thread = NULL;
385
        }
386
        if(watchdog != NULL) {
387
                g_thread_join(watchdog);
388
                watchdog = NULL;
389
        }
390
        /* FIXME We should destroy the sessions cleanly */
391
        janus_mutex_lock(&sessions_mutex);
392
        g_hash_table_destroy(sessions);
393
        janus_mutex_unlock(&sessions_mutex);
394
        g_async_queue_unref(messages);
395
        messages = NULL;
396
        sessions = NULL;
397
        g_atomic_int_set(&initialized, 0);
398
        g_atomic_int_set(&stopping, 0);
399
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_VOICEMAIL_NAME);
400
}
401

    
402
int janus_voicemail_get_api_compatibility(void) {
403
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
404
        return JANUS_PLUGIN_API_VERSION;
405
}
406

    
407
int janus_voicemail_get_version(void) {
408
        return JANUS_VOICEMAIL_VERSION;
409
}
410

    
411
const char *janus_voicemail_get_version_string(void) {
412
        return JANUS_VOICEMAIL_VERSION_STRING;
413
}
414

    
415
const char *janus_voicemail_get_description(void) {
416
        return JANUS_VOICEMAIL_DESCRIPTION;
417
}
418

    
419
const char *janus_voicemail_get_name(void) {
420
        return JANUS_VOICEMAIL_NAME;
421
}
422

    
423
const char *janus_voicemail_get_author(void) {
424
        return JANUS_VOICEMAIL_AUTHOR;
425
}
426

    
427
const char *janus_voicemail_get_package(void) {
428
        return JANUS_VOICEMAIL_PACKAGE;
429
}
430

    
431
void janus_voicemail_create_session(janus_plugin_session *handle, int *error) {
432
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
433
                *error = -1;
434
                return;
435
        }        
436
        janus_voicemail_session *session = (janus_voicemail_session *)calloc(1, sizeof(janus_voicemail_session));
437
        if(session == NULL) {
438
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
439
                *error = -2;
440
                return;
441
        }
442
        session->handle = handle;
443
        session->recording_id = g_random_int();
444
        session->start_time = 0;
445
        session->stream = NULL;
446
        char f[255];
447
        g_snprintf(f, 255, "%s/janus-voicemail-%"SCNu64".opus", recordings_path, session->recording_id);
448
        session->filename = g_strdup(f);
449
        if(session->filename == NULL) {
450
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
451
                *error = -2;
452
                return;
453
        }
454
        session->file = NULL;
455
        session->seq = 0;
456
        session->started = FALSE;
457
        session->stopping = FALSE;
458
        session->destroyed = 0;
459
        handle->plugin_handle = session;
460
        janus_mutex_lock(&sessions_mutex);
461
        g_hash_table_insert(sessions, handle, session);
462
        janus_mutex_unlock(&sessions_mutex);
463

    
464
        return;
465
}
466

    
467
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error) {
468
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
469
                *error = -1;
470
                return;
471
        }        
472
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle; 
473
        if(!session) {
474
                JANUS_LOG(LOG_ERR, "No VoiceMail session associated with this handle...\n");
475
                *error = -2;
476
                return;
477
        }
478
        janus_mutex_lock(&sessions_mutex);
479
        if(!session->destroyed) {
480
                session->destroyed = janus_get_monotonic_time();
481
                JANUS_LOG(LOG_VERB, "Removing VoiceMail session...\n");
482
                g_hash_table_remove(sessions, handle);
483
                janus_voicemail_hangup_media(handle);
484
                /* Cleaning up and removing the session is done in a lazy way */
485
                old_sessions = g_list_append(old_sessions, session);
486
        }
487
        janus_mutex_unlock(&sessions_mutex);
488

    
489
        return;
490
}
491

    
492
char *janus_voicemail_query_session(janus_plugin_session *handle) {
493
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
494
                return NULL;
495
        }        
496
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
497
        if(!session) {
498
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
499
                return NULL;
500
        }
501
        /* In the echo test, every session is the same: we just provide some configure info */
502
        json_t *info = json_object();
503
        json_object_set_new(info, "state", json_string(session->stream ? "recording" : "idle"));
504
        if(session->stream) {
505
                json_object_set_new(info, "id", json_integer(session->recording_id));
506
                json_object_set_new(info, "start_time", json_integer(session->start_time));
507
                json_object_set_new(info, "filename", session->filename ? json_string(session->filename) : NULL);
508
        }
509
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
510
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
511
        json_decref(info);
512
        return info_text;
513
}
514

    
515
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
516
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
517
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
518
        janus_voicemail_message *msg = calloc(1, sizeof(janus_voicemail_message));
519
        if(msg == NULL) {
520
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
521
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
522
        }
523
        msg->handle = handle;
524
        msg->transaction = transaction;
525
        msg->message = message;
526
        msg->sdp_type = sdp_type;
527
        msg->sdp = sdp;
528
        g_async_queue_push(messages, msg);
529

    
530
        /* All the requests to this plugin are handled asynchronously */
531
        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
532
}
533

    
534
void janus_voicemail_setup_media(janus_plugin_session *handle) {
535
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
536
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
537
                return;
538
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
539
        if(!session) {
540
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
541
                return;
542
        }
543
        if(session->destroyed)
544
                return;
545
        /* Only start recording this peer when we get this event */
546
        session->start_time = janus_get_monotonic_time();
547
        session->started = TRUE;
548
        /* Prepare JSON event */
549
        json_t *event = json_object();
550
        json_object_set_new(event, "voicemail", json_string("event"));
551
        json_object_set_new(event, "status", json_string("started"));
552
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
553
        json_decref(event);
554
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
555
        int ret = gateway->push_event(handle, &janus_voicemail_plugin, NULL, event_text, NULL, NULL);
556
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
557
        g_free(event_text);
558
}
559

    
560
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
561
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
562
                return;
563
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
564
        if(!session || session->destroyed || session->stopping || !session->started || session->start_time == 0)
565
                return;
566
        gint64 now = janus_get_monotonic_time();
567
        /* Have 10 seconds passed? */
568
        if((now-session->start_time) >= 10*G_USEC_PER_SEC) {
569
                /* FIXME Simulate a "stop" coming from the browser */
570
                session->started = FALSE;
571
                janus_voicemail_message *msg = calloc(1, sizeof(janus_voicemail_message));
572
                if(msg == NULL) {
573
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
574
                        return;
575
                }
576
                msg->handle = handle;
577
                msg->message = g_strdup("{\"request\":\"stop\"}");
578
                msg->transaction = NULL;
579
                msg->sdp_type = NULL;
580
                msg->sdp = NULL;
581
                g_async_queue_push(messages, msg);
582
                return;
583
        }
584
        /* Save the frame */
585
        rtp_header *rtp = (rtp_header *)buf;
586
        uint16_t seq = ntohs(rtp->seq_number);
587
        if(session->seq == 0)
588
                session->seq = seq;
589
        ogg_packet *op = op_from_pkt((const unsigned char *)(buf+12), len-12);        /* TODO Check RTP extensions... */
590
        //~ JANUS_LOG(LOG_VERB, "\tWriting at position %d (%d)\n", seq-session->seq+1, 960*(seq-session->seq+1));
591
        op->granulepos = 960*(seq-session->seq+1); // FIXME: get this from the toc byte
592
        ogg_stream_packetin(session->stream, op);
593
        free(op);
594
        ogg_write(session);
595
}
596

    
597
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
598
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
599
                return;
600
        /* FIXME Should we care? */
601
}
602

    
603
void janus_voicemail_hangup_media(janus_plugin_session *handle) {
604
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
605
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
606
                return;
607
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
608
        if(!session) {
609
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
610
                return;
611
        }
612
        if(session->destroyed)
613
                return;
614
        session->started = FALSE;
615
        /* Close and reset stuff */
616
        if(session->file)
617
                fclose(session->file);
618
        session->file = NULL;
619
        if(session->stream)
620
                ogg_stream_destroy(session->stream);
621
        session->stream = NULL;
622
}
623

    
624
/* Thread to handle incoming messages */
625
static void *janus_voicemail_handler(void *data) {
626
        JANUS_LOG(LOG_VERB, "Joining VoiceMail handler thread\n");
627
        janus_voicemail_message *msg = NULL;
628
        int error_code = 0;
629
        char *error_cause = calloc(512, sizeof(char));
630
        if(error_cause == NULL) {
631
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
632
                return NULL;
633
        }
634
        json_t *root = NULL;
635
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
636
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
637
                        usleep(50000);
638
                        continue;
639
                }
640
                janus_voicemail_session *session = (janus_voicemail_session *)msg->handle->plugin_handle;        
641
                if(!session) {
642
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
643
                        janus_voicemail_message_free(msg);
644
                        continue;
645
                }
646
                if(session->destroyed) {
647
                        janus_voicemail_message_free(msg);
648
                        continue;
649
                }
650
                /* Handle request */
651
                error_code = 0;
652
                root = NULL;
653
                JANUS_LOG(LOG_VERB, "Handling message: %s\n", msg->message);
654
                if(msg->message == NULL) {
655
                        JANUS_LOG(LOG_ERR, "No message??\n");
656
                        error_code = JANUS_VOICEMAIL_ERROR_NO_MESSAGE;
657
                        g_snprintf(error_cause, 512, "%s", "No message??");
658
                        goto error;
659
                }
660
                json_error_t error;
661
                root = json_loads(msg->message, 0, &error);
662
                if(!root) {
663
                        JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
664
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
665
                        g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
666
                        goto error;
667
                }
668
                if(!json_is_object(root)) {
669
                        JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
670
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
671
                        g_snprintf(error_cause, 512, "JSON error: not an object");
672
                        goto error;
673
                }
674
                /* Get the request first */
675
                json_t *request = json_object_get(root, "request");
676
                if(!request) {
677
                        JANUS_LOG(LOG_ERR, "Missing element (request)\n");
678
                        error_code = JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT;
679
                        g_snprintf(error_cause, 512, "Missing element (request)");
680
                        goto error;
681
                }
682
                if(!json_is_string(request)) {
683
                        JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
684
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT;
685
                        g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
686
                        goto error;
687
                }
688
                const char *request_text = json_string_value(request);
689
                json_t *event = NULL;
690
                if(!strcasecmp(request_text, "record")) {
691
                        JANUS_LOG(LOG_VERB, "Starting new recording\n");
692
                        if(session->file != NULL) {
693
                                JANUS_LOG(LOG_ERR, "Already recording (%s)\n", session->filename ? session->filename : "??");
694
                                error_code = JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING;
695
                                g_snprintf(error_cause, 512, "Already recording");
696
                                goto error;
697
                        }
698
                        session->stream = malloc(sizeof(ogg_stream_state));
699
                        if(session->stream == NULL) {
700
                                JANUS_LOG(LOG_ERR, "Couldn't allocate stream struct\n");
701
                                error_code = JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR;
702
                                g_snprintf(error_cause, 512, "Couldn't allocate stream struct");
703
                                goto error;
704
                        }
705
                        if(ogg_stream_init(session->stream, rand()) < 0) {
706
                                JANUS_LOG(LOG_ERR, "Couldn't initialize Ogg stream state\n");
707
                                error_code = JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR;
708
                                g_snprintf(error_cause, 512, "Couldn't initialize Ogg stream state\n");
709
                                goto error;
710
                        }
711
                        session->file = fopen(session->filename, "wb");
712
                        if(session->file == NULL) {
713
                                JANUS_LOG(LOG_ERR, "Couldn't open output file\n");
714
                                error_code = JANUS_VOICEMAIL_ERROR_IO_ERROR;
715
                                g_snprintf(error_cause, 512, "Couldn't open output file");
716
                                goto error;
717
                        }
718
                        session->seq = 0;
719
                        /* Write stream headers */
720
                        ogg_packet *op = op_opushead();
721
                        ogg_stream_packetin(session->stream, op);
722
                        op_free(op);
723
                        op = op_opustags();
724
                        ogg_stream_packetin(session->stream, op);
725
                        op_free(op);
726
                        ogg_flush(session);
727
                        /* Done: now wait for the setup_media callback to be called */
728
                        event = json_object();
729
                        json_object_set_new(event, "voicemail", json_string("event"));
730
                        json_object_set_new(event, "status", json_string(session->started ? "started" : "starting"));
731
                } else if(!strcasecmp(request_text, "stop")) {
732
                        /* Stop the recording */
733
                        session->started = FALSE;
734
                        session->stopping = TRUE;
735
                        if(session->file)
736
                                fclose(session->file);
737
                        session->file = NULL;
738
                        if(session->stream)
739
                                ogg_stream_destroy(session->stream);
740
                        session->stream = NULL;
741
                        /* Done: send the event and close the handle */
742
                        event = json_object();
743
                        json_object_set_new(event, "voicemail", json_string("event"));
744
                        json_object_set_new(event, "status", json_string("done"));
745
                        char url[1024];
746
                        g_snprintf(url, 1024, "%s/janus-voicemail-%"SCNu64".opus", recordings_base, session->recording_id);
747
                        json_object_set_new(event, "recording", json_string(url));
748
                } else {
749
                        JANUS_LOG(LOG_ERR, "Unknown request '%s'\n", request_text);
750
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_REQUEST;
751
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
752
                        goto error;
753
                }
754

    
755
                json_decref(root);
756
                /* Prepare JSON event */
757
                JANUS_LOG(LOG_VERB, "Preparing JSON event as a reply\n");
758
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
759
                json_decref(event);
760
                /* Any SDP to handle? */
761
                if(!msg->sdp) {
762
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
763
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
764
                } else {
765
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg->sdp_type, msg->sdp);
766
                        const char *type = NULL;
767
                        if(!strcasecmp(msg->sdp_type, "offer"))
768
                                type = "answer";
769
                        if(!strcasecmp(msg->sdp_type, "answer"))
770
                                type = "offer";
771
                        /* Fill the SDP template and use that as our answer */
772
                        char sdp[1024];
773
                        /* What is the Opus payload type? */
774
                        int opus_pt = 0;
775
                        char *fmtp = strstr(msg->sdp, "opus/48000");
776
                        if(fmtp != NULL) {
777
                                fmtp -= 5;
778
                                fmtp = strstr(fmtp, ":");
779
                                if(fmtp)
780
                                        fmtp++;
781
                                opus_pt = atoi(fmtp);
782
                        }
783
                        JANUS_LOG(LOG_VERB, "Opus payload type is %d\n", opus_pt);
784
                        g_snprintf(sdp, 1024, sdp_template,
785
                                janus_get_monotonic_time(),                /* We need current time here */
786
                                janus_get_monotonic_time(),                /* We need current time here */
787
                                session->recording_id,                        /* Recording ID */
788
                                opus_pt,                                                /* Opus payload type */
789
                                opus_pt                                                        /* Opus payload type */);
790
                        /* Did the peer negotiate video? */
791
                        if(strstr(msg->sdp, "m=video") != NULL) {
792
                                /* If so, reject it */
793
                                g_strlcat(sdp, "m=video 0 RTP/SAVPF 0\r\n", 1024);                                
794
                        }
795
                        /* How long will the gateway take to push the event? */
796
                        gint64 start = janus_get_monotonic_time();
797
                        int res = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, type, sdp);
798
                        JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start);
799
                        if(res != JANUS_OK) {
800
                                /* TODO Failed to negotiate? We should remove this participant */
801
                        }
802
                }
803
                g_free(event_text);
804
                janus_voicemail_message_free(msg);
805
                
806
                if(session->stopping) {
807
                        gateway->end_session(session->handle);
808
                }
809

    
810
                continue;
811
                
812
error:
813
                {
814
                        if(root != NULL)
815
                                json_decref(root);
816
                        /* Prepare JSON error event */
817
                        json_t *event = json_object();
818
                        json_object_set_new(event, "voicemail", json_string("event"));
819
                        json_object_set_new(event, "error_code", json_integer(error_code));
820
                        json_object_set_new(event, "error", json_string(error_cause));
821
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
822
                        json_decref(event);
823
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
824
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
825
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
826
                        g_free(event_text);
827
                        janus_voicemail_message_free(msg);
828
                }
829
        }
830
        g_free(error_cause);
831
        JANUS_LOG(LOG_VERB, "Leaving VoiceMail handler thread\n");
832
        return NULL;
833
}
834

    
835

    
836
/* OGG/Opus helpers */
837
/* Write a little-endian 32 bit int to memory */
838
void le32(unsigned char *p, int v) {
839
        p[0] = v & 0xff;
840
        p[1] = (v >> 8) & 0xff;
841
        p[2] = (v >> 16) & 0xff;
842
        p[3] = (v >> 24) & 0xff;
843
}
844

    
845

    
846
/* Write a little-endian 16 bit int to memory */
847
void le16(unsigned char *p, int v) {
848
        p[0] = v & 0xff;
849
        p[1] = (v >> 8) & 0xff;
850
}
851

    
852
/* ;anufacture a generic OpusHead packet */
853
ogg_packet *op_opushead(void) {
854
        int size = 19;
855
        unsigned char *data = malloc(size);
856
        ogg_packet *op = malloc(sizeof(*op));
857

    
858
        if(!data) {
859
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
860
                return NULL;
861
        }
862
        if(!op) {
863
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
864
                return NULL;
865
        }
866

    
867
        memcpy(data, "OpusHead", 8);  /* identifier */
868
        data[8] = 1;                  /* version */
869
        data[9] = 2;                  /* channels */
870
        le16(data+10, 0);             /* pre-skip */
871
        le32(data + 12, 48000);       /* original sample rate */
872
        le16(data + 16, 0);           /* gain */
873
        data[18] = 0;                 /* channel mapping family */
874

    
875
        op->packet = data;
876
        op->bytes = size;
877
        op->b_o_s = 1;
878
        op->e_o_s = 0;
879
        op->granulepos = 0;
880
        op->packetno = 0;
881

    
882
        return op;
883
}
884

    
885
/* Manufacture a generic OpusTags packet */
886
ogg_packet *op_opustags(void) {
887
        const char *identifier = "OpusTags";
888
        const char *vendor = "Janus VoiceMail plugin";
889
        int size = strlen(identifier) + 4 + strlen(vendor) + 4;
890
        unsigned char *data = malloc(size);
891
        ogg_packet *op = malloc(sizeof(*op));
892

    
893
        if(!data) {
894
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
895
                return NULL;
896
        }
897
        if(!op) {
898
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
899
                return NULL;
900
        }
901

    
902
        memcpy(data, identifier, 8);
903
        le32(data + 8, strlen(vendor));
904
        memcpy(data + 12, vendor, strlen(vendor));
905
        le32(data + 12 + strlen(vendor), 0);
906

    
907
        op->packet = data;
908
        op->bytes = size;
909
        op->b_o_s = 0;
910
        op->e_o_s = 0;
911
        op->granulepos = 0;
912
        op->packetno = 1;
913

    
914
        return op;
915
}
916

    
917
/* Allocate an ogg_packet */
918
ogg_packet *op_from_pkt(const unsigned char *pkt, int len) {
919
        ogg_packet *op = malloc(sizeof(*op));
920
        if(!op) {
921
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet.\n");
922
                return NULL;
923
        }
924

    
925
        op->packet = (unsigned char *)pkt;
926
        op->bytes = len;
927
        op->b_o_s = 0;
928
        op->e_o_s = 0;
929

    
930
        return op;
931
}
932

    
933
/* Free a packet and its contents */
934
void op_free(ogg_packet *op) {
935
        if(op) {
936
                if(op->packet) {
937
                        free(op->packet);
938
                }
939
                free(op);
940
        }
941
}
942

    
943
/* Write out available ogg pages */
944
int ogg_write(janus_voicemail_session *session) {
945
        ogg_page page;
946
        size_t written;
947

    
948
        if(!session || !session->stream || !session->file) {
949
                return -1;
950
        }
951

    
952
        while (ogg_stream_pageout(session->stream, &page)) {
953
                written = fwrite(page.header, 1, page.header_len, session->file);
954
                if(written != (size_t)page.header_len) {
955
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
956
                        return -2;
957
                }
958
                written = fwrite(page.body, 1, page.body_len, session->file);
959
                if(written != (size_t)page.body_len) {
960
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
961
                        return -3;
962
                }
963
        }
964
        return 0;
965
}
966

    
967
/* Flush remaining ogg data */
968
int ogg_flush(janus_voicemail_session *session) {
969
        ogg_page page;
970
        size_t written;
971

    
972
        if(!session || !session->stream || !session->file) {
973
                return -1;
974
        }
975

    
976
        while (ogg_stream_flush(session->stream, &page)) {
977
                written = fwrite(page.header, 1, page.header_len, session->file);
978
                if(written != (size_t)page.header_len) {
979
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
980
                        return -2;
981
                }
982
                written = fwrite(page.body, 1, page.body_len, session->file);
983
                if(written != (size_t)page.body_len) {
984
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
985
                        return -3;
986
                }
987
        }
988
        return 0;
989
}