Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_voicemail.c @ febef1ea

History | View | Annotate | Download (30 KB)

1
/*! \file   janus_voicemail.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus VoiceMail plugin
5
 * \details  This is a plugin implementing a very simple VoiceMail service
6
 * for Janus, specifically recording Opus streams. This means that it replies
7
 * by providing in the SDP only support for Opus, and disabling video.
8
 * When a peer contacts the plugin, the plugin starts recording the audio
9
 * frames it receives and, after 10 seconds, it shuts the PeerConnection
10
 * down and returns an URL to the recorded file.
11
 * 
12
 * Since an URL is returned, the plugin allows you to configure where the
13
 * recordings whould be stored (e.g., a folder in your web server, writable
14
 * by the plugin) and the base path to use when returning URLs (e.g.,
15
 * /my/recordings/ or http://www.example.com/my/recordings).
16
 * 
17
 * By default the plugin saves the recordings in the \c html folder of
18
 * this project, meaning that it can work out of the box with the VoiceMail
19
 * demo we provide in the same folder.
20
 *
21
 * \ingroup plugins
22
 * \ref plugins
23
 */
24

    
25
#include "plugin.h"
26

    
27
#include <jansson.h>
28
#include <sys/stat.h>
29
#include <sys/time.h>
30

    
31
#include <ogg/ogg.h>
32

    
33
#include "../debug.h"
34
#include "../apierror.h"
35
#include "../config.h"
36
#include "../mutex.h"
37
#include "../rtp.h"
38
#include "../utils.h"
39

    
40

    
41
/* Plugin information */
42
#define JANUS_VOICEMAIL_VERSION                        5
43
#define JANUS_VOICEMAIL_VERSION_STRING        "0.0.5"
44
#define JANUS_VOICEMAIL_DESCRIPTION                "This is a plugin implementing a very simple VoiceMail service for Janus, recording Opus streams."
45
#define JANUS_VOICEMAIL_NAME                        "JANUS VoiceMail plugin"
46
#define JANUS_VOICEMAIL_AUTHOR                        "Meetecho s.r.l."
47
#define JANUS_VOICEMAIL_PACKAGE                        "janus.plugin.voicemail"
48

    
49
/* Plugin methods */
50
janus_plugin *create(void);
51
int janus_voicemail_init(janus_callbacks *callback, const char *config_path);
52
void janus_voicemail_destroy(void);
53
int janus_voicemail_get_api_compatibility(void);
54
int janus_voicemail_get_version(void);
55
const char *janus_voicemail_get_version_string(void);
56
const char *janus_voicemail_get_description(void);
57
const char *janus_voicemail_get_name(void);
58
const char *janus_voicemail_get_author(void);
59
const char *janus_voicemail_get_package(void);
60
void janus_voicemail_create_session(janus_plugin_session *handle, int *error);
61
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
62
void janus_voicemail_setup_media(janus_plugin_session *handle);
63
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
64
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
65
void janus_voicemail_hangup_media(janus_plugin_session *handle);
66
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error);
67
char *janus_voicemail_query_session(janus_plugin_session *handle);
68

    
69
/* Plugin setup */
70
static janus_plugin janus_voicemail_plugin =
71
        {
72
                .init = janus_voicemail_init,
73
                .destroy = janus_voicemail_destroy,
74

    
75
                .get_api_compatibility = janus_voicemail_get_api_compatibility,
76
                .get_version = janus_voicemail_get_version,
77
                .get_version_string = janus_voicemail_get_version_string,
78
                .get_description = janus_voicemail_get_description,
79
                .get_name = janus_voicemail_get_name,
80
                .get_author = janus_voicemail_get_author,
81
                .get_package = janus_voicemail_get_package,
82
                
83
                .create_session = janus_voicemail_create_session,
84
                .handle_message = janus_voicemail_handle_message,
85
                .setup_media = janus_voicemail_setup_media,
86
                .incoming_rtp = janus_voicemail_incoming_rtp,
87
                .incoming_rtcp = janus_voicemail_incoming_rtcp,
88
                .hangup_media = janus_voicemail_hangup_media,
89
                .destroy_session = janus_voicemail_destroy_session,
90
                .query_session = janus_voicemail_query_session,
91
        }; 
92

    
93
/* Plugin creator */
94
janus_plugin *create(void) {
95
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_VOICEMAIL_NAME);
96
        return &janus_voicemail_plugin;
97
}
98

    
99

    
100
/* Useful stuff */
101
static gint initialized = 0, stopping = 0;
102
static janus_callbacks *gateway = NULL;
103
static GThread *handler_thread;
104
static GThread *watchdog;
105
static void *janus_voicemail_handler(void *data);
106

    
107
typedef struct janus_voicemail_message {
108
        janus_plugin_session *handle;
109
        char *transaction;
110
        char *message;
111
        char *sdp_type;
112
        char *sdp;
113
} janus_voicemail_message;
114
static GAsyncQueue *messages = NULL;
115

    
116
void janus_voicemail_message_free(janus_voicemail_message *msg);
117
void janus_voicemail_message_free(janus_voicemail_message *msg) {
118
        if(!msg)
119
                return;
120

    
121
        msg->handle = NULL;
122

    
123
        g_free(msg->transaction);
124
        msg->transaction = NULL;
125
        g_free(msg->message);
126
        msg->message = NULL;
127
        g_free(msg->sdp_type);
128
        msg->sdp_type = NULL;
129
        g_free(msg->sdp);
130
        msg->sdp = NULL;
131

    
132
        g_free(msg);
133
}
134

    
135

    
136
typedef struct janus_voicemail_session {
137
        janus_plugin_session *handle;
138
        guint64 recording_id;
139
        gint64 start_time;
140
        char *filename;
141
        FILE *file;
142
        ogg_stream_state *stream;
143
        int seq;
144
        gboolean started;
145
        gboolean stopping;
146
        guint64 destroyed;        /* Time at which this session was marked as destroyed */
147
} janus_voicemail_session;
148
static GHashTable *sessions;
149
static GList *old_sessions;
150
static janus_mutex sessions_mutex;
151

    
152
static char *recordings_path = NULL;
153
static char *recordings_base = NULL;
154

    
155
/* SDP offer/answer template */
156
#define sdp_template \
157
                "v=0\r\n" \
158
                "o=- %"SCNu64" %"SCNu64" IN IP4 127.0.0.1\r\n"        /* We need current time here */ \
159
                "s=VoiceMail %"SCNu64"\r\n"                                                /* VoiceMail recording ID */ \
160
                "t=0 0\r\n" \
161
                "m=audio 1 RTP/SAVPF %d\r\n"                /* Opus payload type */ \
162
                "c=IN IP4 1.1.1.1\r\n" \
163
                "a=rtpmap:%d opus/48000/2\r\n"                /* Opus payload type */ \
164
                "a=recvonly\r\n"                                        /* This plugin doesn't send any frames */
165

    
166

    
167
/* OGG/Opus helpers */
168
void le32(unsigned char *p, int v);
169
void le16(unsigned char *p, int v);
170
ogg_packet *op_opushead(void);
171
ogg_packet *op_opustags(void);
172
ogg_packet *op_from_pkt(const unsigned char *pkt, int len);
173
void op_free(ogg_packet *op);
174
int ogg_write(janus_voicemail_session *session);
175
int ogg_flush(janus_voicemail_session *session);
176

    
177

    
178
/* Error codes */
179
#define JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR                499
180
#define JANUS_VOICEMAIL_ERROR_NO_MESSAGE                460
181
#define JANUS_VOICEMAIL_ERROR_INVALID_JSON                461
182
#define JANUS_VOICEMAIL_ERROR_INVALID_REQUEST        462
183
#define JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT        463
184
#define JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT        464
185
#define JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING        465
186
#define JANUS_VOICEMAIL_ERROR_IO_ERROR                        466
187
#define JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR                467
188

    
189

    
190
/* VoiceMail watchdog/garbage collector (sort of) */
191
void *janus_voicemail_watchdog(void *data);
192
void *janus_voicemail_watchdog(void *data) {
193
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog started\n");
194
        gint64 now = 0;
195
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
196
                janus_mutex_lock(&sessions_mutex);
197
                /* Iterate on all the sessions */
198
                now = janus_get_monotonic_time();
199
                if(old_sessions != NULL) {
200
                        GList *sl = old_sessions;
201
                        JANUS_LOG(LOG_VERB, "Checking %d old sessions\n", g_list_length(old_sessions));
202
                        while(sl) {
203
                                janus_voicemail_session *session = (janus_voicemail_session *)sl->data;
204
                                if(!session) {
205
                                        sl = sl->next;
206
                                        continue;
207
                                }
208
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
209
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
210
                                        GList *rm = sl->next;
211
                                        old_sessions = g_list_delete_link(old_sessions, sl);
212
                                        sl = rm;
213
                                        session->handle = NULL;
214
                                        g_free(session);
215
                                        session = NULL;
216
                                        continue;
217
                                }
218
                                sl = sl->next;
219
                        }
220
                }
221
                janus_mutex_unlock(&sessions_mutex);
222
                g_usleep(2000000);
223
        }
224
        JANUS_LOG(LOG_INFO, "VoiceMail watchdog stopped\n");
225
        return NULL;
226
}
227

    
228

    
229
/* Plugin implementation */
230
int janus_voicemail_init(janus_callbacks *callback, const char *config_path) {
231
        if(g_atomic_int_get(&stopping)) {
232
                /* Still stopping from before */
233
                return -1;
234
        }
235
        if(callback == NULL || config_path == NULL) {
236
                /* Invalid arguments */
237
                return -1;
238
        }
239

    
240
        /* Read configuration */
241
        char filename[255];
242
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_VOICEMAIL_PACKAGE);
243
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
244
        janus_config *config = janus_config_parse(filename);
245
        if(config != NULL)
246
                janus_config_print(config);
247
        
248
        sessions = g_hash_table_new(NULL, NULL);
249
        janus_mutex_init(&sessions_mutex);
250
        messages = g_async_queue_new_full((GDestroyNotify) janus_voicemail_message_free);
251
        /* This is the callback we'll need to invoke to contact the gateway */
252
        gateway = callback;
253

    
254
        /* Parse configuration */
255
        if(config != NULL) {
256
                janus_config_item *path = janus_config_get_item_drilldown(config, "general", "path");
257
                if(path && path->value)
258
                        recordings_path = g_strdup(path->value);
259
                janus_config_item *base = janus_config_get_item_drilldown(config, "general", "base");
260
                if(base && base->value)
261
                        recordings_base = g_strdup(base->value);
262
                /* Done */
263
                janus_config_destroy(config);
264
                config = NULL;
265
        }
266
        if(recordings_path == NULL)
267
                recordings_path = g_strdup("./html/recordings/");
268
        if(recordings_base == NULL)
269
                recordings_base = g_strdup("/recordings/");
270
        JANUS_LOG(LOG_VERB, "Recordings path: %s\n", recordings_path);
271
        JANUS_LOG(LOG_VERB, "Recordings base: %s\n", recordings_base);
272
        /* Create the folder, if needed */
273
        struct stat st = {0};
274
        if(stat(recordings_path, &st) == -1) {
275
                int res = janus_mkdir(recordings_path, 0755);
276
                JANUS_LOG(LOG_VERB, "Creating folder: %d\n", res);
277
                if(res != 0) {
278
                        JANUS_LOG(LOG_ERR, "%s", strerror(res));
279
                        return -1;        /* No point going on... */
280
                }
281
        }
282
        
283
        g_atomic_int_set(&initialized, 1);
284

    
285
        GError *error = NULL;
286
        /* Start the sessions watchdog */
287
        watchdog = g_thread_try_new("vmail watchdog", &janus_voicemail_watchdog, NULL, &error);
288
        if(error != NULL) {
289
                g_atomic_int_set(&initialized, 0);
290
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail watchdog thread...\n", error->code, error->message ? error->message : "??");
291
                return -1;
292
        }
293
        /* Launch the thread that will handle incoming messages */
294
        handler_thread = g_thread_try_new("janus voicemail handler", janus_voicemail_handler, NULL, &error);
295
        if(error != NULL) {
296
                g_atomic_int_set(&initialized, 0);
297
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VoiceMail handler thread...\n", error->code, error->message ? error->message : "??");
298
                return -1;
299
        }
300
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_VOICEMAIL_NAME);
301
        return 0;
302
}
303

    
304
void janus_voicemail_destroy(void) {
305
        if(!g_atomic_int_get(&initialized))
306
                return;
307
        g_atomic_int_set(&stopping, 1);
308
        if(handler_thread != NULL) {
309
                g_thread_join(handler_thread);
310
                handler_thread = NULL;
311
        }
312
        if(watchdog != NULL) {
313
                g_thread_join(watchdog);
314
                watchdog = NULL;
315
        }
316
        /* FIXME We should destroy the sessions cleanly */
317
        janus_mutex_lock(&sessions_mutex);
318
        g_hash_table_destroy(sessions);
319
        janus_mutex_unlock(&sessions_mutex);
320
        g_async_queue_unref(messages);
321
        messages = NULL;
322
        sessions = NULL;
323
        g_atomic_int_set(&initialized, 0);
324
        g_atomic_int_set(&stopping, 0);
325
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_VOICEMAIL_NAME);
326
}
327

    
328
int janus_voicemail_get_api_compatibility(void) {
329
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
330
        return JANUS_PLUGIN_API_VERSION;
331
}
332

    
333
int janus_voicemail_get_version(void) {
334
        return JANUS_VOICEMAIL_VERSION;
335
}
336

    
337
const char *janus_voicemail_get_version_string(void) {
338
        return JANUS_VOICEMAIL_VERSION_STRING;
339
}
340

    
341
const char *janus_voicemail_get_description(void) {
342
        return JANUS_VOICEMAIL_DESCRIPTION;
343
}
344

    
345
const char *janus_voicemail_get_name(void) {
346
        return JANUS_VOICEMAIL_NAME;
347
}
348

    
349
const char *janus_voicemail_get_author(void) {
350
        return JANUS_VOICEMAIL_AUTHOR;
351
}
352

    
353
const char *janus_voicemail_get_package(void) {
354
        return JANUS_VOICEMAIL_PACKAGE;
355
}
356

    
357
void janus_voicemail_create_session(janus_plugin_session *handle, int *error) {
358
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
359
                *error = -1;
360
                return;
361
        }        
362
        janus_voicemail_session *session = (janus_voicemail_session *)calloc(1, sizeof(janus_voicemail_session));
363
        if(session == NULL) {
364
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
365
                *error = -2;
366
                return;
367
        }
368
        session->handle = handle;
369
        session->recording_id = g_random_int();
370
        session->start_time = 0;
371
        session->stream = NULL;
372
        char f[255];
373
        g_snprintf(f, 255, "%s/janus-voicemail-%"SCNu64".opus", recordings_path, session->recording_id);
374
        session->filename = g_strdup(f);
375
        if(session->filename == NULL) {
376
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
377
                *error = -2;
378
                return;
379
        }
380
        session->file = NULL;
381
        session->seq = 0;
382
        session->started = FALSE;
383
        session->stopping = FALSE;
384
        session->destroyed = 0;
385
        handle->plugin_handle = session;
386
        janus_mutex_lock(&sessions_mutex);
387
        g_hash_table_insert(sessions, handle, session);
388
        janus_mutex_unlock(&sessions_mutex);
389

    
390
        return;
391
}
392

    
393
void janus_voicemail_destroy_session(janus_plugin_session *handle, int *error) {
394
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
395
                *error = -1;
396
                return;
397
        }        
398
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle; 
399
        if(!session) {
400
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
401
                *error = -2;
402
                return;
403
        }
404
        if(session->destroyed) {
405
                JANUS_LOG(LOG_WARN, "Session already destroyed...\n");
406
                return;
407
        }
408
        JANUS_LOG(LOG_VERB, "Removing VoiceMail session...\n");
409
        janus_mutex_lock(&sessions_mutex);
410
        g_hash_table_remove(sessions, handle);
411
        janus_mutex_unlock(&sessions_mutex);
412
        janus_voicemail_hangup_media(handle);
413
        /* Cleaning up and removing the session is done in a lazy way */
414
        session->destroyed = janus_get_monotonic_time();
415
        janus_mutex_lock(&sessions_mutex);
416
        old_sessions = g_list_append(old_sessions, session);
417
        janus_mutex_unlock(&sessions_mutex);
418

    
419
        return;
420
}
421

    
422
char *janus_voicemail_query_session(janus_plugin_session *handle) {
423
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
424
                return NULL;
425
        }        
426
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
427
        if(!session) {
428
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
429
                return NULL;
430
        }
431
        /* In the echo test, every session is the same: we just provide some configure info */
432
        json_t *info = json_object();
433
        json_object_set_new(info, "state", json_string(session->stream ? "recording" : "idle"));
434
        if(session->stream) {
435
                json_object_set_new(info, "id", json_integer(session->recording_id));
436
                json_object_set_new(info, "start_time", json_integer(session->start_time));
437
                json_object_set_new(info, "filename", session->filename ? json_string(session->filename) : NULL);
438
        }
439
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
440
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
441
        json_decref(info);
442
        return info_text;
443
}
444

    
445
struct janus_plugin_result *janus_voicemail_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
446
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
447
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
448
        JANUS_LOG(LOG_VERB, "%s\n", message);
449
        janus_voicemail_message *msg = calloc(1, sizeof(janus_voicemail_message));
450
        if(msg == NULL) {
451
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
452
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
453
        }
454
        msg->handle = handle;
455
        msg->transaction = transaction;
456
        msg->message = message;
457
        msg->sdp_type = sdp_type;
458
        msg->sdp = sdp;
459
        g_async_queue_push(messages, msg);
460

    
461
        /* All the requests to this plugin are handled asynchronously */
462
        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL);
463
}
464

    
465
void janus_voicemail_setup_media(janus_plugin_session *handle) {
466
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
467
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
468
                return;
469
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
470
        if(!session) {
471
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
472
                return;
473
        }
474
        if(session->destroyed)
475
                return;
476
        /* Only start recording this peer when we get this event */
477
        session->start_time = janus_get_monotonic_time();
478
        session->started = TRUE;
479
        /* Prepare JSON event */
480
        json_t *event = json_object();
481
        json_object_set_new(event, "voicemail", json_string("event"));
482
        json_object_set_new(event, "status", json_string("started"));
483
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
484
        json_decref(event);
485
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
486
        int ret = gateway->push_event(handle, &janus_voicemail_plugin, NULL, event_text, NULL, NULL);
487
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
488
        g_free(event_text);
489
}
490

    
491
void janus_voicemail_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
492
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
493
                return;
494
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;        
495
        if(!session || session->destroyed || session->stopping || !session->started || session->start_time == 0)
496
                return;
497
        gint64 now = janus_get_monotonic_time();
498
        /* Have 10 seconds passed? */
499
        if((now-session->start_time) >= 10*G_USEC_PER_SEC) {
500
                /* FIXME Simulate a "stop" coming from the browser */
501
                session->started = FALSE;
502
                janus_voicemail_message *msg = calloc(1, sizeof(janus_voicemail_message));
503
                if(msg == NULL) {
504
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
505
                        return;
506
                }
507
                msg->handle = handle;
508
                msg->message = g_strdup("{\"request\":\"stop\"}");
509
                msg->transaction = NULL;
510
                msg->sdp_type = NULL;
511
                msg->sdp = NULL;
512
                g_async_queue_push(messages, msg);
513
                return;
514
        }
515
        /* Save the frame */
516
        rtp_header *rtp = (rtp_header *)buf;
517
        uint16_t seq = ntohs(rtp->seq_number);
518
        if(session->seq == 0)
519
                session->seq = seq;
520
        ogg_packet *op = op_from_pkt((const unsigned char *)(buf+12), len-12);        /* TODO Check RTP extensions... */
521
        //~ JANUS_LOG(LOG_VERB, "\tWriting at position %d (%d)\n", seq-session->seq+1, 960*(seq-session->seq+1));
522
        op->granulepos = 960*(seq-session->seq+1); // FIXME: get this from the toc byte
523
        ogg_stream_packetin(session->stream, op);
524
        free(op);
525
        ogg_write(session);
526
}
527

    
528
void janus_voicemail_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
529
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
530
                return;
531
        /* FIXME Should we care? */
532
}
533

    
534
void janus_voicemail_hangup_media(janus_plugin_session *handle) {
535
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
536
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
537
                return;
538
        janus_voicemail_session *session = (janus_voicemail_session *)handle->plugin_handle;
539
        if(!session) {
540
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
541
                return;
542
        }
543
        if(session->destroyed)
544
                return;
545
        session->started = FALSE;
546
        /* Close and reset stuff */
547
        if(session->file)
548
                fclose(session->file);
549
        session->file = NULL;
550
        if(session->stream)
551
                ogg_stream_destroy(session->stream);
552
        session->stream = NULL;
553
}
554

    
555
/* Thread to handle incoming messages */
556
static void *janus_voicemail_handler(void *data) {
557
        JANUS_LOG(LOG_VERB, "Joining VoiceMail handler thread\n");
558
        janus_voicemail_message *msg = NULL;
559
        int error_code = 0;
560
        char *error_cause = calloc(512, sizeof(char));        /* FIXME 512 should be enough, but anyway... */
561
        if(error_cause == NULL) {
562
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
563
                return NULL;
564
        }
565
        json_t *root = NULL;
566
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
567
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
568
                        usleep(50000);
569
                        continue;
570
                }
571
                janus_voicemail_session *session = (janus_voicemail_session *)msg->handle->plugin_handle;        
572
                if(!session) {
573
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
574
                        janus_voicemail_message_free(msg);
575
                        continue;
576
                }
577
                if(session->destroyed) {
578
                        janus_voicemail_message_free(msg);
579
                        continue;
580
                }
581
                /* Handle request */
582
                error_code = 0;
583
                root = NULL;
584
                JANUS_LOG(LOG_VERB, "Handling message: %s\n", msg->message);
585
                if(msg->message == NULL) {
586
                        JANUS_LOG(LOG_ERR, "No message??\n");
587
                        error_code = JANUS_VOICEMAIL_ERROR_NO_MESSAGE;
588
                        g_snprintf(error_cause, 512, "%s", "No message??");
589
                        goto error;
590
                }
591
                json_error_t error;
592
                root = json_loads(msg->message, 0, &error);
593
                if(!root) {
594
                        JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
595
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
596
                        g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
597
                        goto error;
598
                }
599
                if(!json_is_object(root)) {
600
                        JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
601
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_JSON;
602
                        g_snprintf(error_cause, 512, "JSON error: not an object");
603
                        goto error;
604
                }
605
                /* Get the request first */
606
                json_t *request = json_object_get(root, "request");
607
                if(!request) {
608
                        JANUS_LOG(LOG_ERR, "Missing element (request)\n");
609
                        error_code = JANUS_VOICEMAIL_ERROR_MISSING_ELEMENT;
610
                        g_snprintf(error_cause, 512, "Missing element (request)");
611
                        goto error;
612
                }
613
                if(!json_is_string(request)) {
614
                        JANUS_LOG(LOG_ERR, "Invalid element (request should be a string)\n");
615
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_ELEMENT;
616
                        g_snprintf(error_cause, 512, "Invalid element (request should be a string)");
617
                        goto error;
618
                }
619
                const char *request_text = json_string_value(request);
620
                json_t *event = NULL;
621
                if(!strcasecmp(request_text, "record")) {
622
                        JANUS_LOG(LOG_VERB, "Starting new recording\n");
623
                        if(session->file != NULL) {
624
                                JANUS_LOG(LOG_ERR, "Already recording (%s)\n", session->filename ? session->filename : "??");
625
                                error_code = JANUS_VOICEMAIL_ERROR_ALREADY_RECORDING;
626
                                g_snprintf(error_cause, 512, "Already recording");
627
                                goto error;
628
                        }
629
                        session->stream = malloc(sizeof(ogg_stream_state));
630
                        if(session->stream == NULL) {
631
                                JANUS_LOG(LOG_ERR, "Couldn't allocate stream struct\n");
632
                                error_code = JANUS_VOICEMAIL_ERROR_UNKNOWN_ERROR;
633
                                g_snprintf(error_cause, 512, "Couldn't allocate stream struct");
634
                                goto error;
635
                        }
636
                        if(ogg_stream_init(session->stream, rand()) < 0) {
637
                                JANUS_LOG(LOG_ERR, "Couldn't initialize Ogg stream state\n");
638
                                error_code = JANUS_VOICEMAIL_ERROR_LIBOGG_ERROR;
639
                                g_snprintf(error_cause, 512, "Couldn't initialize Ogg stream state\n");
640
                                goto error;
641
                        }
642
                        session->file = fopen(session->filename, "wb");
643
                        if(session->file == NULL) {
644
                                JANUS_LOG(LOG_ERR, "Couldn't open output file\n");
645
                                error_code = JANUS_VOICEMAIL_ERROR_IO_ERROR;
646
                                g_snprintf(error_cause, 512, "Couldn't open output file");
647
                                goto error;
648
                        }
649
                        session->seq = 0;
650
                        /* Write stream headers */
651
                        ogg_packet *op = op_opushead();
652
                        ogg_stream_packetin(session->stream, op);
653
                        op_free(op);
654
                        op = op_opustags();
655
                        ogg_stream_packetin(session->stream, op);
656
                        op_free(op);
657
                        ogg_flush(session);
658
                        /* Done: now wait for the setup_media callback to be called */
659
                        event = json_object();
660
                        json_object_set_new(event, "voicemail", json_string("event"));
661
                        json_object_set_new(event, "status", json_string(session->started ? "started" : "starting"));
662
                } else if(!strcasecmp(request_text, "stop")) {
663
                        /* Stop the recording */
664
                        session->started = FALSE;
665
                        session->stopping = TRUE;
666
                        if(session->file)
667
                                fclose(session->file);
668
                        session->file = NULL;
669
                        if(session->stream)
670
                                ogg_stream_destroy(session->stream);
671
                        session->stream = NULL;
672
                        /* Done: send the event and close the handle */
673
                        event = json_object();
674
                        json_object_set_new(event, "voicemail", json_string("event"));
675
                        json_object_set_new(event, "status", json_string("done"));
676
                        char url[1024];
677
                        g_snprintf(url, 1024, "%s/janus-voicemail-%"SCNu64".opus", recordings_base, session->recording_id);
678
                        json_object_set_new(event, "recording", json_string(url));
679
                } else {
680
                        JANUS_LOG(LOG_ERR, "Unknown request '%s'\n", request_text);
681
                        error_code = JANUS_VOICEMAIL_ERROR_INVALID_REQUEST;
682
                        g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
683
                        goto error;
684
                }
685

    
686
                json_decref(root);
687
                /* Prepare JSON event */
688
                JANUS_LOG(LOG_VERB, "Preparing JSON event as a reply\n");
689
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
690
                json_decref(event);
691
                /* Any SDP to handle? */
692
                if(!msg->sdp) {
693
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
694
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
695
                } else {
696
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg->sdp_type, msg->sdp);
697
                        const char *type = NULL;
698
                        if(!strcasecmp(msg->sdp_type, "offer"))
699
                                type = "answer";
700
                        if(!strcasecmp(msg->sdp_type, "answer"))
701
                                type = "offer";
702
                        /* Fill the SDP template and use that as our answer */
703
                        char sdp[1024];
704
                        /* What is the Opus payload type? */
705
                        int opus_pt = 0;
706
                        char *fmtp = strstr(msg->sdp, "opus/48000");
707
                        if(fmtp != NULL) {
708
                                fmtp -= 5;
709
                                fmtp = strstr(fmtp, ":");
710
                                if(fmtp)
711
                                        fmtp++;
712
                                opus_pt = atoi(fmtp);
713
                        }
714
                        JANUS_LOG(LOG_VERB, "Opus payload type is %d\n", opus_pt);
715
                        g_snprintf(sdp, 1024, sdp_template,
716
                                janus_get_monotonic_time(),                /* We need current time here */
717
                                janus_get_monotonic_time(),                /* We need current time here */
718
                                session->recording_id,                        /* Recording ID */
719
                                opus_pt,                                                /* Opus payload type */
720
                                opus_pt                                                        /* Opus payload type */);
721
                        /* Did the peer negotiate video? */
722
                        if(strstr(msg->sdp, "m=video") != NULL) {
723
                                /* If so, reject it */
724
                                g_strlcat(sdp, "m=video 0 RTP/SAVPF 0\r\n", 1024);                                
725
                        }
726
                        /* How long will the gateway take to push the event? */
727
                        gint64 start = janus_get_monotonic_time();
728
                        int res = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, type, sdp);
729
                        JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start);
730
                        if(res != JANUS_OK) {
731
                                /* TODO Failed to negotiate? We should remove this participant */
732
                        }
733
                }
734
                g_free(event_text);
735
                janus_voicemail_message_free(msg);
736
                
737
                if(session->stopping) {
738
                        gateway->end_session(session->handle);
739
                }
740

    
741
                continue;
742
                
743
error:
744
                {
745
                        if(root != NULL)
746
                                json_decref(root);
747
                        /* Prepare JSON error event */
748
                        json_t *event = json_object();
749
                        json_object_set_new(event, "voicemail", json_string("event"));
750
                        json_object_set_new(event, "error_code", json_integer(error_code));
751
                        json_object_set_new(event, "error", json_string(error_cause));
752
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
753
                        json_decref(event);
754
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
755
                        int ret = gateway->push_event(msg->handle, &janus_voicemail_plugin, msg->transaction, event_text, NULL, NULL);
756
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
757
                        g_free(event_text);
758
                        janus_voicemail_message_free(msg);
759
                }
760
        }
761
        g_free(error_cause);
762
        JANUS_LOG(LOG_VERB, "Leaving VoiceMail handler thread\n");
763
        return NULL;
764
}
765

    
766

    
767
/* OGG/Opus helpers */
768
/* Write a little-endian 32 bit int to memory */
769
void le32(unsigned char *p, int v) {
770
        p[0] = v & 0xff;
771
        p[1] = (v >> 8) & 0xff;
772
        p[2] = (v >> 16) & 0xff;
773
        p[3] = (v >> 24) & 0xff;
774
}
775

    
776

    
777
/* Write a little-endian 16 bit int to memory */
778
void le16(unsigned char *p, int v) {
779
        p[0] = v & 0xff;
780
        p[1] = (v >> 8) & 0xff;
781
}
782

    
783
/* ;anufacture a generic OpusHead packet */
784
ogg_packet *op_opushead(void) {
785
        int size = 19;
786
        unsigned char *data = malloc(size);
787
        ogg_packet *op = malloc(sizeof(*op));
788

    
789
        if(!data) {
790
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
791
                return NULL;
792
        }
793
        if(!op) {
794
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
795
                return NULL;
796
        }
797

    
798
        memcpy(data, "OpusHead", 8);  /* identifier */
799
        data[8] = 1;                  /* version */
800
        data[9] = 2;                  /* channels */
801
        le16(data+10, 0);             /* pre-skip */
802
        le32(data + 12, 48000);       /* original sample rate */
803
        le16(data + 16, 0);           /* gain */
804
        data[18] = 0;                 /* channel mapping family */
805

    
806
        op->packet = data;
807
        op->bytes = size;
808
        op->b_o_s = 1;
809
        op->e_o_s = 0;
810
        op->granulepos = 0;
811
        op->packetno = 0;
812

    
813
        return op;
814
}
815

    
816
/* Manufacture a generic OpusTags packet */
817
ogg_packet *op_opustags(void) {
818
        const char *identifier = "OpusTags";
819
        const char *vendor = "Janus VoiceMail plugin";
820
        int size = strlen(identifier) + 4 + strlen(vendor) + 4;
821
        unsigned char *data = malloc(size);
822
        ogg_packet *op = malloc(sizeof(*op));
823

    
824
        if(!data) {
825
                JANUS_LOG(LOG_ERR, "Couldn't allocate data buffer...\n");
826
                return NULL;
827
        }
828
        if(!op) {
829
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet...\n");
830
                return NULL;
831
        }
832

    
833
        memcpy(data, identifier, 8);
834
        le32(data + 8, strlen(vendor));
835
        memcpy(data + 12, vendor, strlen(vendor));
836
        le32(data + 12 + strlen(vendor), 0);
837

    
838
        op->packet = data;
839
        op->bytes = size;
840
        op->b_o_s = 0;
841
        op->e_o_s = 0;
842
        op->granulepos = 0;
843
        op->packetno = 1;
844

    
845
        return op;
846
}
847

    
848
/* Allocate an ogg_packet */
849
ogg_packet *op_from_pkt(const unsigned char *pkt, int len) {
850
        ogg_packet *op = malloc(sizeof(*op));
851
        if(!op) {
852
                JANUS_LOG(LOG_ERR, "Couldn't allocate Ogg packet.\n");
853
                return NULL;
854
        }
855

    
856
        op->packet = (unsigned char *)pkt;
857
        op->bytes = len;
858
        op->b_o_s = 0;
859
        op->e_o_s = 0;
860

    
861
        return op;
862
}
863

    
864
/* Free a packet and its contents */
865
void op_free(ogg_packet *op) {
866
        if(op) {
867
                if(op->packet) {
868
                        free(op->packet);
869
                }
870
                free(op);
871
        }
872
}
873

    
874
/* Write out available ogg pages */
875
int ogg_write(janus_voicemail_session *session) {
876
        ogg_page page;
877
        size_t written;
878

    
879
        if(!session || !session->stream || !session->file) {
880
                return -1;
881
        }
882

    
883
        while (ogg_stream_pageout(session->stream, &page)) {
884
                written = fwrite(page.header, 1, page.header_len, session->file);
885
                if(written != (size_t)page.header_len) {
886
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
887
                        return -2;
888
                }
889
                written = fwrite(page.body, 1, page.body_len, session->file);
890
                if(written != (size_t)page.body_len) {
891
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
892
                        return -3;
893
                }
894
        }
895
        return 0;
896
}
897

    
898
/* Flush remaining ogg data */
899
int ogg_flush(janus_voicemail_session *session) {
900
        ogg_page page;
901
        size_t written;
902

    
903
        if(!session || !session->stream || !session->file) {
904
                return -1;
905
        }
906

    
907
        while (ogg_stream_flush(session->stream, &page)) {
908
                written = fwrite(page.header, 1, page.header_len, session->file);
909
                if(written != (size_t)page.header_len) {
910
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page header\n");
911
                        return -2;
912
                }
913
                written = fwrite(page.body, 1, page.body_len, session->file);
914
                if(written != (size_t)page.body_len) {
915
                        JANUS_LOG(LOG_ERR, "Error writing Ogg page body\n");
916
                        return -3;
917
                }
918
        }
919
        return 0;
920
}