Statistics
| Branch: | Revision:

janus-gateway / transports / janus_rabbitmq.c @ b9d3ca04

History | View | Annotate | Download (31 KB)

1
/*! \file   janus_rabbitmq.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus RabbitMQ transport plugin
5
 * \details  This is an implementation of a RabbitMQ transport for the
6
 * Janus API, using the rabbitmq-c library (https://github.com/alanxz/rabbitmq-c).
7
 * This means that this module adds support for RabbitMQ based messaging as
8
 * an alternative "transport" for API requests, responses and notifications.
9
 * This is only useful when you're wrapping Janus requests in your server
10
 * application, and handling the communication with clients your own way.
11
 * At the moment, only a single "application" can be handled at the same
12
 * time, meaning that Janus won't implement multiple queues to handle
13
 * multiple concurrent "application servers" taking advantage of its
14
 * features. Support for this is planned, though (e.g., through some kind
15
 * of negotiation to create queues on the fly). Right now, you can only
16
 * configure the address of the RabbitMQ server to use, and the queues to
17
 * make use of to receive (to-janus) and send (from-janus) messages
18
 * from/to an external application. As with WebSockets, considering that
19
 * requests wouldn't include a path to address some mandatory information,
20
 * these requests addressed to Janus should include as part of their payload,
21
 * when needed, additional pieces of information like \c session_id and
22
 * \c handle_id. That is, where you'd send a Janus request related to a
23
 * specific session to the \c /janus/<session> path, with RabbitMQ
24
 * you'd have to send the same request with an additional \c session_id
25
 * field in the JSON payload.
26
 * \note When you create a session using RabbitMQ, a subscription to the
27
 * events related to it is done automatically through the outgoing queue,
28
 * so no need for an explicit request as the GET in the plain HTTP API.
29
 *
30
 * \ingroup transports
31
 * \ref transports
32
 */
33

    
34
#include "transport.h"
35

    
36
#include <amqp.h>
37
#include <amqp_framing.h>
38
#include <amqp_tcp_socket.h>
39
#include <amqp_ssl_socket.h>
40

    
41
#include "../debug.h"
42
#include "../apierror.h"
43
#include "../config.h"
44
#include "../mutex.h"
45
#include "../utils.h"
46

    
47

    
48
/* Transport plugin information */
49
#define JANUS_RABBITMQ_VERSION                        1
50
#define JANUS_RABBITMQ_VERSION_STRING        "0.0.1"
51
#define JANUS_RABBITMQ_DESCRIPTION                "This transport plugin adds RabbitMQ support to the Janus API via rabbitmq-c."
52
#define JANUS_RABBITMQ_NAME                                "JANUS RabbitMQ transport plugin"
53
#define JANUS_RABBITMQ_AUTHOR                        "Meetecho s.r.l."
54
#define JANUS_RABBITMQ_PACKAGE                        "janus.transport.rabbitmq"
55

    
56
/* Transport methods */
57
janus_transport *create(void);
58
int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path);
59
void janus_rabbitmq_destroy(void);
60
int janus_rabbitmq_get_api_compatibility(void);
61
int janus_rabbitmq_get_version(void);
62
const char *janus_rabbitmq_get_version_string(void);
63
const char *janus_rabbitmq_get_description(void);
64
const char *janus_rabbitmq_get_name(void);
65
const char *janus_rabbitmq_get_author(void);
66
const char *janus_rabbitmq_get_package(void);
67
gboolean janus_rabbitmq_is_janus_api_enabled(void);
68
gboolean janus_rabbitmq_is_admin_api_enabled(void);
69
int janus_rabbitmq_send_message(void *transport, void *request_id, gboolean admin, json_t *message);
70
void janus_rabbitmq_session_created(void *transport, guint64 session_id);
71
void janus_rabbitmq_session_over(void *transport, guint64 session_id, gboolean timeout);
72

    
73

    
74
/* Transport setup */
75
static janus_transport janus_rabbitmq_transport =
76
        JANUS_TRANSPORT_INIT (
77
                .init = janus_rabbitmq_init,
78
                .destroy = janus_rabbitmq_destroy,
79

    
80
                .get_api_compatibility = janus_rabbitmq_get_api_compatibility,
81
                .get_version = janus_rabbitmq_get_version,
82
                .get_version_string = janus_rabbitmq_get_version_string,
83
                .get_description = janus_rabbitmq_get_description,
84
                .get_name = janus_rabbitmq_get_name,
85
                .get_author = janus_rabbitmq_get_author,
86
                .get_package = janus_rabbitmq_get_package,
87

    
88
                .is_janus_api_enabled = janus_rabbitmq_is_janus_api_enabled,
89
                .is_admin_api_enabled = janus_rabbitmq_is_admin_api_enabled,
90

    
91
                .send_message = janus_rabbitmq_send_message,
92
                .session_created = janus_rabbitmq_session_created,
93
                .session_over = janus_rabbitmq_session_over,
94
        );
95

    
96
/* Transport creator */
97
janus_transport *create(void) {
98
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_RABBITMQ_NAME);
99
        return &janus_rabbitmq_transport;
100
}
101

    
102

    
103
/* Useful stuff */
104
static gint initialized = 0, stopping = 0;
105
static janus_transport_callbacks *gateway = NULL;
106
static gboolean rmq_janus_api_enabled = FALSE;
107
static gboolean rmq_admin_api_enabled = FALSE;
108
static gboolean notify_events = TRUE;
109

    
110
/* FIXME: Should it be configable? */
111
#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
112

    
113
/* JSON serialization options */
114
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
115

    
116

    
117
/* RabbitMQ client session: we only create a single one as of now */
118
typedef struct janus_rabbitmq_client {
119
        amqp_connection_state_t rmq_conn;                /* AMQP connection state */
120
        amqp_channel_t rmq_channel;                                /* AMQP channel */
121
        gboolean janus_api_enabled;                                /* Whether the Janus API via RabbitMQ is enabled */
122
        amqp_bytes_t janus_exchange;                        /* AMQP exchange for outgoing messages */
123
        amqp_bytes_t to_janus_queue;                        /* AMQP outgoing messages queue (Janus API) */
124
        amqp_bytes_t from_janus_queue;                        /* AMQP incoming messages queue (Janus API) */
125
        gboolean admin_api_enabled;                                /* Whether the Janus API via RabbitMQ is enabled */
126
        amqp_bytes_t to_janus_admin_queue;                /* AMQP outgoing messages queue (Admin API) */
127
        amqp_bytes_t from_janus_admin_queue;        /* AMQP incoming messages queue (Admin API) */
128
        GThread *in_thread, *out_thread;                /* Threads to handle incoming and outgoing queues */
129
        GAsyncQueue *messages;                                        /* Queue of outgoing messages to push */
130
        janus_mutex mutex;                                                /* Mutex to lock/unlock this session */
131
        gint session_timeout:1;                                        /* Whether a Janus session timeout occurred in the core */
132
        gint destroy:1;                                                        /* Flag to trigger a lazy session destruction */
133
} janus_rabbitmq_client;
134

    
135
/* RabbitMQ response */
136
typedef struct janus_rabbitmq_response {
137
        gboolean admin;                        /* Whether this is a Janus or Admin API response */
138
        gchar *correlation_id;        /* Correlation ID, if any */
139
        json_t *payload;                /* Payload to send to the client */
140
} janus_rabbitmq_response;
141
static janus_rabbitmq_response exit_message;
142

    
143
/* Threads */
144
void *janus_rmq_in_thread(void *data);
145
void *janus_rmq_out_thread(void *data);
146

    
147

    
148
/* We only handle a single client per time, as the queues are fixed */
149
static janus_rabbitmq_client *rmq_client = NULL;
150

    
151

    
152
/* Transport implementation */
153
int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) {
154
        if(g_atomic_int_get(&stopping)) {
155
                /* Still stopping from before */
156
                return -1;
157
        }
158
        if(callback == NULL || config_path == NULL) {
159
                /* Invalid arguments */
160
                return -1;
161
        }
162

    
163
        /* This is the callback we'll need to invoke to contact the gateway */
164
        gateway = callback;
165

    
166
        /* Read configuration */
167
        char filename[255];
168
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_RABBITMQ_PACKAGE);
169
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
170
        janus_config *config = janus_config_parse(filename);
171
        if(config != NULL)
172
                janus_config_print(config);
173

    
174
        janus_config_item *item = janus_config_get_item_drilldown(config, "general", "json");
175
        if(item && item->value) {
176
                /* Check how we need to format/serialize the JSON output */
177
                if(!strcasecmp(item->value, "indented")) {
178
                        /* Default: indented, we use three spaces for that */
179
                        json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
180
                } else if(!strcasecmp(item->value, "plain")) {
181
                        /* Not indented and no new lines, but still readable */
182
                        json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
183
                } else if(!strcasecmp(item->value, "compact")) {
184
                        /* Compact, so no spaces between separators */
185
                        json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
186
                } else {
187
                        JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
188
                        json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
189
                }
190
        }
191

    
192
        /* Check if we need to send events to handlers */
193
        janus_config_item *events = janus_config_get_item_drilldown(config, "general", "events");
194
        if(events != NULL && events->value != NULL)
195
                notify_events = janus_is_true(events->value);
196
        if(!notify_events && callback->events_is_enabled()) {
197
                JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_RABBITMQ_NAME);
198
        }
199

    
200
        /* Handle configuration, starting from the server details */
201
        char *rmqhost = NULL;
202
        item = janus_config_get_item_drilldown(config, "general", "host");
203
        if(item && item->value)
204
                rmqhost = g_strdup(item->value);
205
        else
206
                rmqhost = g_strdup("localhost");
207
        int rmqport = AMQP_PROTOCOL_PORT;
208
        item = janus_config_get_item_drilldown(config, "general", "port");
209
        if(item && item->value)
210
                rmqport = atoi(item->value);
211

    
212
        /* Credentials and Virtual Host */
213
        const char *vhost = NULL, *username = NULL, *password = NULL;
214
        item = janus_config_get_item_drilldown(config, "general", "vhost");
215
        if(item && item->value)
216
                vhost = g_strdup(item->value);
217
        else
218
        vhost = g_strdup("/");
219
        item = janus_config_get_item_drilldown(config, "general", "username");
220
        if(item && item->value)
221
                username = g_strdup(item->value);
222
        else
223
                username = g_strdup("guest");
224
        item = janus_config_get_item_drilldown(config, "general", "password");
225
        if(item && item->value)
226
                password = g_strdup(item->value);
227
        else
228
                password = g_strdup("guest");
229

    
230
        /* SSL config*/
231
        const char *ssl_cacert_file = NULL;
232
        const char *ssl_cert_file = NULL;
233
        const char *ssl_key_file = NULL;
234
        gboolean ssl_enable = FALSE;
235
        gboolean ssl_verify_peer = FALSE;
236
        gboolean ssl_verify_hostname = FALSE;
237
        item = janus_config_get_item_drilldown(config, "general", "ssl_enable");
238
        if(!item || !item->value || !janus_is_true(item->value)) {
239
                JANUS_LOG(LOG_INFO, "RabbitMQ SSL support disabled\n");
240
        } else {
241
                ssl_enable = TRUE;
242
                item = janus_config_get_item_drilldown(config, "general", "ssl_cacert");
243
                if(item && item->value)
244
                        ssl_cacert_file = g_strdup(item->value);
245
                item = janus_config_get_item_drilldown(config, "general", "ssl_cert");
246
                if(item && item->value)
247
                        ssl_cert_file = g_strdup(item->value);
248
                item = janus_config_get_item_drilldown(config, "general", "ssl_key");
249
                if(item && item->value)
250
                        ssl_key_file = g_strdup(item->value);
251
                item = janus_config_get_item_drilldown(config, "general", "ssl_verify_peer");
252
                if(item && item->value && janus_is_true(item->value))
253
                        ssl_verify_peer = TRUE;
254
                item = janus_config_get_item_drilldown(config, "general", "ssl_verify_hostname");
255
                if(item && item->value && janus_is_true(item->value))
256
                        ssl_verify_hostname = TRUE;
257
        }
258

    
259
        /* Now check if the Janus API must be supported */
260
        const char *to_janus = NULL, *from_janus = NULL;
261
        const char *to_janus_admin = NULL, *from_janus_admin = NULL;
262
        const char *janus_exchange = NULL;
263
        item = janus_config_get_item_drilldown(config, "general", "enable");
264
        if(!item || !item->value || !janus_is_true(item->value)) {
265
                JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Janus API)\n");
266
        } else {
267
                /* Parse configuration */
268
                item = janus_config_get_item_drilldown(config, "general", "to_janus");
269
                if(!item || !item->value) {
270
                        JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n");
271
                        goto error;
272
                }
273
                to_janus = g_strdup(item->value);
274
                item = janus_config_get_item_drilldown(config, "general", "from_janus");
275
                if(!item || !item->value) {
276
                        JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n");
277
                        goto error;
278
                }
279
                from_janus = g_strdup(item->value);
280
                item = janus_config_get_item_drilldown(config, "general", "janus_exchange");
281
                if(!item || !item->value) {
282
                        JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n");
283
                } else {
284
                        janus_exchange = g_strdup(item->value);
285
                }
286
                if (janus_exchange == NULL) {
287
                        JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
288
                } else {
289
                        JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exch: (%s)\n", rmqhost, rmqport, to_janus, from_janus, janus_exchange);
290
                }
291
                rmq_janus_api_enabled = TRUE;
292
        }
293
        /* Do the same for the admin API */
294
        item = janus_config_get_item_drilldown(config, "admin", "admin_enable");
295
        if(!item || !item->value || !janus_is_true(item->value)) {
296
                JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Admin API)\n");
297
        } else {
298
                /* Parse configuration */
299
                item = janus_config_get_item_drilldown(config, "admin", "to_janus_admin");
300
                if(!item || !item->value) {
301
                        JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n");
302
                        goto error;
303
                }
304
                to_janus_admin = g_strdup(item->value);
305
                item = janus_config_get_item_drilldown(config, "admin", "from_janus_admin");
306
                if(!item || !item->value) {
307
                        JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n");
308
                        goto error;
309
                }
310
                from_janus_admin = g_strdup(item->value);
311
                JANUS_LOG(LOG_INFO, "RabbitMQ support for Admin API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus_admin, from_janus_admin);
312
                rmq_admin_api_enabled = TRUE;
313
        }
314
        if(!rmq_janus_api_enabled && !rmq_admin_api_enabled) {
315
                JANUS_LOG(LOG_WARN, "RabbitMQ support disabled for both Janus and Admin API, giving up\n");
316
                goto error;
317
        } else {
318
                /* FIXME We currently support a single application, create a new janus_rabbitmq_client instance */
319
                rmq_client = g_malloc0(sizeof(janus_rabbitmq_client));
320
                if(rmq_client == NULL) {
321
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
322
                        goto error;
323
                }
324
                /* Connect */
325
                rmq_client->rmq_conn = amqp_new_connection();
326
                amqp_socket_t *socket = NULL;
327
                int status;
328
                JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n");
329
                if (ssl_enable) {
330
                        socket = amqp_ssl_socket_new(rmq_client->rmq_conn);
331
                        if(socket == NULL) {
332
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
333
                                goto error;
334
                        }
335
                        if(ssl_verify_peer) {
336
                                amqp_ssl_socket_set_verify_peer(socket, 1);
337
                        } else {
338
                                amqp_ssl_socket_set_verify_peer(socket, 0);
339
                        }
340
                        if(ssl_verify_hostname) {
341
                                amqp_ssl_socket_set_verify_hostname(socket, 1);
342
                        } else {
343
                                amqp_ssl_socket_set_verify_hostname(socket, 0);
344
                        }
345
                        if(ssl_cacert_file) {
346
                                status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file);
347
                                if(status != AMQP_STATUS_OK) {
348
                                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
349
                                        goto error;
350
                                }
351
                        }
352
                        if(ssl_cert_file && ssl_key_file) {
353
                                amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file);
354
                                if(status != AMQP_STATUS_OK) {
355
                                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
356
                                        goto error;
357
                                }
358
                        }
359
                } else {
360
                        socket = amqp_tcp_socket_new(rmq_client->rmq_conn);
361
                        if(socket == NULL) {
362
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
363
                                goto error;
364
                        }
365
                }
366
                JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n");
367
                status = amqp_socket_open(socket, rmqhost, rmqport);
368
                if(status != AMQP_STATUS_OK) {
369
                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
370
                        goto error;
371
                }
372
                JANUS_LOG(LOG_VERB, "Logging in...\n");
373
                amqp_rpc_reply_t result = amqp_login(rmq_client->rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
374
                if(result.reply_type != AMQP_RESPONSE_NORMAL) {
375
                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
376
                        goto error;
377
                }
378
                rmq_client->rmq_channel = 1;
379
                JANUS_LOG(LOG_VERB, "Opening channel...\n");
380
                amqp_channel_open(rmq_client->rmq_conn, rmq_client->rmq_channel);
381
                result = amqp_get_rpc_reply(rmq_client->rmq_conn);
382
                if(result.reply_type != AMQP_RESPONSE_NORMAL) {
383
                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
384
                        goto error;
385
                }
386
                rmq_client->janus_exchange = amqp_empty_bytes;
387
                if(janus_exchange != NULL) {
388
                        JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
389
                        rmq_client->janus_exchange = amqp_cstring_bytes(janus_exchange);
390
                        amqp_exchange_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
391
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
392
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
393
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
394
                                goto error;
395
                        }
396
                }
397
                rmq_client->janus_api_enabled = FALSE;
398
                if(rmq_janus_api_enabled) {
399
                        rmq_client->janus_api_enabled = TRUE;
400
                        JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus);
401
                        rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus);
402
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table);
403
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
404
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
405
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
406
                                goto error;
407
                        }
408
                        JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus);
409
                        rmq_client->from_janus_queue = amqp_cstring_bytes(from_janus);
410
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_queue, 0, 0, 0, 0, amqp_empty_table);
411
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
412
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
413
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
414
                                goto error;
415
                        }
416
                        amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
417
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
418
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
419
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
420
                                goto error;
421
                        }
422
                }
423
                rmq_client->admin_api_enabled = FALSE;
424
                if(rmq_admin_api_enabled) {
425
                        rmq_client->admin_api_enabled = TRUE;
426
                        JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus_admin);
427
                        rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin);
428
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
429
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
430
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
431
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
432
                                goto error;
433
                        }
434
                        JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin);
435
                        rmq_client->from_janus_admin_queue = amqp_cstring_bytes(from_janus_admin);
436
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
437
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
438
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
439
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
440
                                goto error;
441
                        }
442
                        amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
443
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
444
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
445
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
446
                                goto error;
447
                        }
448
                }
449
                rmq_client->messages = g_async_queue_new();
450
                rmq_client->destroy = 0;
451
                GError *error = NULL;
452
                rmq_client->in_thread = g_thread_try_new("rmq_in_thread", &janus_rmq_in_thread, rmq_client, &error);
453
                if(error != NULL) {
454
                        /* Something went wrong... */
455
                        JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQ incoming thread...\n", error->code, error->message ? error->message : "??");
456
                        g_free(rmq_client);
457
                        janus_config_destroy(config);
458
                        return -1;
459
                }
460
                rmq_client->out_thread = g_thread_try_new("rmq_out_thread", &janus_rmq_out_thread, rmq_client, &error);
461
                if(error != NULL) {
462
                        /* Something went wrong... */
463
                        JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQ outgoing thread...\n", error->code, error->message ? error->message : "??");
464
                        g_free(rmq_client);
465
                        janus_config_destroy(config);
466
                        return -1;
467
                }
468
                janus_mutex_init(&rmq_client->mutex);
469
                /* Done */
470
                JANUS_LOG(LOG_INFO, "Setup of RabbitMQ integration completed\n");
471
                /* Notify handlers about this new transport */
472
                if(notify_events && gateway->events_is_enabled()) {
473
                        json_t *info = json_object();
474
                        json_object_set_new(info, "event", json_string("connected"));
475
                        gateway->notify_event(&janus_rabbitmq_transport, rmq_client, info);
476
                }
477
        }
478
        g_free(rmqhost);
479
        janus_config_destroy(config);
480
        config = NULL;
481

    
482
        /* Done */
483
        g_atomic_int_set(&initialized, 1);
484
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_RABBITMQ_NAME);
485
        return 0;
486

    
487
error:
488
        /* If we got here, something went wrong */
489
        if(rmq_client)
490
                g_free(rmq_client);
491
        if(rmqhost)
492
                g_free(rmqhost);
493
        if(vhost)
494
                g_free((char *)vhost);
495
        if(username)
496
                g_free((char *)username);
497
        if(password)
498
                g_free((char *)password);
499
        if(janus_exchange)
500
                g_free((char *)janus_exchange);
501
        if(to_janus)
502
                g_free((char *)to_janus);
503
        if(from_janus)
504
                g_free((char *)from_janus);
505
        if(to_janus_admin)
506
                g_free((char *)to_janus_admin);
507
        if(from_janus_admin)
508
                g_free((char *)from_janus_admin);
509
        if(ssl_cacert_file)
510
                g_free((char *)ssl_cacert_file);
511
        if(ssl_cert_file)
512
                g_free((char *)ssl_cert_file);
513
        if(ssl_key_file)
514
                g_free((char *)ssl_key_file);
515
        if(config)
516
                janus_config_destroy(config);
517
        return -1;
518
}
519

    
520
void janus_rabbitmq_destroy(void) {
521
        if(!g_atomic_int_get(&initialized))
522
                return;
523
        g_atomic_int_set(&stopping, 1);
524

    
525
        if(rmq_client) {
526
                rmq_client->destroy = 1;
527
                g_async_queue_push(rmq_client->messages, &exit_message);
528
                if(rmq_client->in_thread)
529
                        g_thread_join(rmq_client->in_thread);
530
                if(rmq_client->out_thread)
531
                        g_thread_join(rmq_client->out_thread);
532
                if(rmq_client->rmq_conn && rmq_client->rmq_channel) {
533
                        amqp_channel_close(rmq_client->rmq_conn, rmq_client->rmq_channel, AMQP_REPLY_SUCCESS);
534
                        amqp_connection_close(rmq_client->rmq_conn, AMQP_REPLY_SUCCESS);
535
                        amqp_destroy_connection(rmq_client->rmq_conn);
536
                }
537
                if(rmq_client->to_janus_queue.bytes)
538
                        g_free((char *)rmq_client->to_janus_queue.bytes);
539
                if(rmq_client->from_janus_queue.bytes)
540
                        g_free((char *)rmq_client->from_janus_queue.bytes);
541
                if(rmq_client->to_janus_admin_queue.bytes)
542
                        g_free((char *)rmq_client->to_janus_admin_queue.bytes);
543
                if(rmq_client->from_janus_admin_queue.bytes)
544
                        g_free((char *)rmq_client->from_janus_admin_queue.bytes);
545
                if(rmq_client->janus_exchange.bytes)
546
                        g_free((char *)rmq_client->janus_exchange.bytes);
547
        }
548
        g_free(rmq_client);
549

    
550
        g_atomic_int_set(&initialized, 0);
551
        g_atomic_int_set(&stopping, 0);
552
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_RABBITMQ_NAME);
553
}
554

    
555
int janus_rabbitmq_get_api_compatibility(void) {
556
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
557
        return JANUS_TRANSPORT_API_VERSION;
558
}
559

    
560
int janus_rabbitmq_get_version(void) {
561
        return JANUS_RABBITMQ_VERSION;
562
}
563

    
564
const char *janus_rabbitmq_get_version_string(void) {
565
        return JANUS_RABBITMQ_VERSION_STRING;
566
}
567

    
568
const char *janus_rabbitmq_get_description(void) {
569
        return JANUS_RABBITMQ_DESCRIPTION;
570
}
571

    
572
const char *janus_rabbitmq_get_name(void) {
573
        return JANUS_RABBITMQ_NAME;
574
}
575

    
576
const char *janus_rabbitmq_get_author(void) {
577
        return JANUS_RABBITMQ_AUTHOR;
578
}
579

    
580
const char *janus_rabbitmq_get_package(void) {
581
        return JANUS_RABBITMQ_PACKAGE;
582
}
583

    
584
gboolean janus_rabbitmq_is_janus_api_enabled(void) {
585
        return rmq_janus_api_enabled;
586
}
587

    
588
gboolean janus_rabbitmq_is_admin_api_enabled(void) {
589
        return rmq_admin_api_enabled;
590
}
591

    
592
int janus_rabbitmq_send_message(void *transport, void *request_id, gboolean admin, json_t *message) {
593
        if(rmq_client == NULL)
594
                return -1;
595
        if(message == NULL)
596
                return -1;
597
        if(transport == NULL) {
598
                json_decref(message);
599
                return -1;
600
        }
601
        JANUS_LOG(LOG_HUGE, "Sending %s API %s via RabbitMQ\n", admin ? "admin" : "Janus", request_id ? "response" : "event");
602
        /* FIXME Add to the queue of outgoing messages */
603
        janus_rabbitmq_response *response = (janus_rabbitmq_response *)g_malloc0(sizeof(janus_rabbitmq_response));
604
        response->admin = admin;
605
        response->payload = message;
606
        response->correlation_id = (char *)request_id;
607
        g_async_queue_push(rmq_client->messages, response);
608
        return 0;
609
}
610

    
611
void janus_rabbitmq_session_created(void *transport, guint64 session_id) {
612
        /* We don't care */
613
}
614

    
615
void janus_rabbitmq_session_over(void *transport, guint64 session_id, gboolean timeout) {
616
        /* We don't care, not even if it's a timeout (should we?), our client is always up */
617
}
618

    
619

    
620
/* Threads */
621
void *janus_rmq_in_thread(void *data) {
622
        if(rmq_client == NULL) {
623
                JANUS_LOG(LOG_ERR, "No RabbitMQ connection??\n");
624
                return NULL;
625
        }
626
        JANUS_LOG(LOG_VERB, "Joining RabbitMQ in thread\n");
627

    
628
        struct timeval timeout;
629
        timeout.tv_sec = 0;
630
        timeout.tv_usec = 20000;
631
        amqp_frame_t frame;
632
        while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {
633
                amqp_maybe_release_buffers(rmq_client->rmq_conn);
634
                /* Wait for a frame */
635
                int res = amqp_simple_wait_frame_noblock(rmq_client->rmq_conn, &frame, &timeout);
636
                if(res != AMQP_STATUS_OK) {
637
                        /* No data */
638
                        if(res == AMQP_STATUS_TIMEOUT)
639
                                continue;
640
                        JANUS_LOG(LOG_VERB, "Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res));
641
                        break;
642
                }
643
                /* We expect method first */
644
                JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
645
                if(frame.frame_type != AMQP_FRAME_METHOD)
646
                        continue;
647
                JANUS_LOG(LOG_VERB, "Method %s\n", amqp_method_name(frame.payload.method.id));
648
                gboolean admin = FALSE;
649
                if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD) {
650
                        amqp_basic_deliver_t *d = (amqp_basic_deliver_t *)frame.payload.method.decoded;
651
                        JANUS_LOG(LOG_VERB, "Delivery #%u, %.*s\n", (unsigned) d->delivery_tag, (int) d->routing_key.len, (char *) d->routing_key.bytes);
652
                        /* Check if this is a Janus or Admin API request */
653
                        if(rmq_client->admin_api_enabled) {
654
                                if(d->routing_key.len == rmq_client->to_janus_admin_queue.len) {
655
                                        size_t i=0;
656
                                        admin = TRUE;
657
                                        char *inq = (char *)d->routing_key.bytes;
658
                                        char *expq = (char *)rmq_client->to_janus_admin_queue.bytes;
659
                                        for(i=0; i< d->routing_key.len; i++) {
660
                                                if(inq[i] != expq[i]) {
661
                                                        admin = FALSE;
662
                                                        break;
663
                                                }
664
                                        }
665
                                }
666
                        }
667
                        JANUS_LOG(LOG_VERB, "  -- This is %s API request\n", admin ? "an admin" : "a Janus");
668
                }
669
                /* Then the header */
670
                amqp_simple_wait_frame(rmq_client->rmq_conn, &frame);
671
                JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
672
                if(frame.frame_type != AMQP_FRAME_HEADER)
673
                        continue;
674
                amqp_basic_properties_t *p = (amqp_basic_properties_t *)frame.payload.properties.decoded;
675
                if(p->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
676
                        JANUS_LOG(LOG_VERB, "  -- Reply-to: %.*s\n", (int) p->reply_to.len, (char *) p->reply_to.bytes);
677
                }
678
                char *correlation = NULL;
679
                if(p->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
680
                        correlation = (char *)g_malloc0(p->correlation_id.len+1);
681
                        sprintf(correlation, "%.*s", (int) p->correlation_id.len, (char *) p->correlation_id.bytes);
682
                        JANUS_LOG(LOG_VERB, "  -- Correlation-id: %s\n", correlation);
683
                }
684
                if(p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
685
                        JANUS_LOG(LOG_VERB, "  -- Content-type: %.*s\n", (int) p->content_type.len, (char *) p->content_type.bytes);
686
                }
687
                /* And the body */
688
                uint64_t total = frame.payload.properties.body_size, received = 0;
689
                char *payload = (char *)g_malloc0(total+1), *index = payload;
690
                while(received < total) {
691
                        amqp_simple_wait_frame(rmq_client->rmq_conn, &frame);
692
                        JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
693
                        if(frame.frame_type != AMQP_FRAME_BODY)
694
                                break;
695
                        sprintf(index, "%.*s", (int) frame.payload.body_fragment.len, (char *) frame.payload.body_fragment.bytes);
696
                        received += frame.payload.body_fragment.len;
697
                        index = payload+received;
698
                }
699
                JANUS_LOG(LOG_VERB, "Got %"SCNu64"/%"SCNu64" bytes from the %s queue (%"SCNu64")\n",
700
                        received, total, admin ? "admin API" : "Janus API", frame.payload.body_fragment.len);
701
                JANUS_LOG(LOG_VERB, "%s\n", payload);
702
                /* Parse the JSON payload */
703
                json_error_t error;
704
                json_t *root = json_loads(payload, 0, &error);
705
                g_free(payload);
706
                /* Notify the core, passing both the object and, since it may be needed, the error
707
                 * We also specify the correlation ID as an opaque request identifier: we'll need it later */
708
                gateway->incoming_request(&janus_rabbitmq_transport, rmq_client, correlation, admin, root, &error);
709
        }
710
        JANUS_LOG(LOG_INFO, "Leaving RabbitMQ in thread\n");
711
        return NULL;
712
}
713

    
714
void *janus_rmq_out_thread(void *data) {
715
        if(rmq_client == NULL) {
716
                JANUS_LOG(LOG_ERR, "No RabbitMQ connection??\n");
717
                return NULL;
718
        }
719
        JANUS_LOG(LOG_VERB, "Joining RabbitMQ out thread\n");
720
        while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {
721
                /* We send messages from here as well, not only notifications */
722
                janus_rabbitmq_response *response = g_async_queue_pop(rmq_client->messages);
723
                if(response == NULL)
724
                        continue;
725
                if(response == &exit_message)
726
                        break;
727
                if(!rmq_client->destroy && !g_atomic_int_get(&stopping) && response->payload) {
728
                        janus_mutex_lock(&rmq_client->mutex);
729
                        /* Gotcha! Convert json_t to string */
730
                        char *payload_text = json_dumps(response->payload, json_format);
731
                        json_decref(response->payload);
732
                        response->payload = NULL;
733
                        JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes)...\n", response->admin ? "Admin" : "Janus", strlen(payload_text));
734
                        JANUS_LOG(LOG_VERB, "%s\n", payload_text);
735
                        amqp_basic_properties_t props;
736
                        props._flags = 0;
737
                        props._flags |= AMQP_BASIC_REPLY_TO_FLAG;
738
                        props.reply_to = amqp_cstring_bytes("Janus");
739
                        if(response->correlation_id) {
740
                                props._flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
741
                                props.correlation_id = amqp_cstring_bytes(response->correlation_id);
742
                        }
743
                        props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
744
                        props.content_type = amqp_cstring_bytes("application/json");
745
                        amqp_bytes_t message = amqp_cstring_bytes(payload_text);
746
                        int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange,
747
                                response->admin ? rmq_client->from_janus_admin_queue : rmq_client->from_janus_queue,
748
                                0, 0, &props, message);
749
                        if(status != AMQP_STATUS_OK) {
750
                                JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
751
                        }
752
                        g_free(response->correlation_id);
753
                        response->correlation_id = NULL;
754
                        free(payload_text);
755
                        payload_text = NULL;
756
                        g_free(response);
757
                        response = NULL;
758
                        janus_mutex_unlock(&rmq_client->mutex);
759
                }
760
        }
761
        g_async_queue_unref(rmq_client->messages);
762
        JANUS_LOG(LOG_INFO, "Leaving RabbitMQ out thread\n");
763
        return NULL;
764
}