Statistics
| Branch: | Revision:

janus-gateway / transports / janus_rabbitmq.c @ 88b5da7b

History | View | Annotate | Download (26.4 KB)

1
/*! \file   janus_rabbitmq.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus RabbitMQ transport plugin
5
 * \details  This is an implementation of a RabbitMQ transport for the
6
 * Janus API, using the rabbitmq-c library (https://github.com/alanxz/rabbitmq-c).
7
 * This means that this module adds support for RabbitMQ based messaging as
8
 * an alternative "transport" for API requests, responses and notifications.
9
 * This is only useful when you're wrapping Janus requests in your server
10
 * application, and handling the communication with clients your own way.
11
 * At the moment, only a single "application" can be handled at the same
12
 * time, meaning that Janus won't implement multiple queues to handle
13
 * multiple concurrent "application servers" taking advantage of its
14
 * features. Support for this is planned, though (e.g., through some kind
15
 * of negotiation to create queues on the fly). Right now, you can only
16
 * configure the address of the RabbitMQ server to use, and the queues to
17
 * make use of to receive (to-janus) and send (from-janus) messages
18
 * from/to an external application. As with WebSockets, considering that
19
 * requests wouldn't include a path to address some mandatory information,
20
 * these requests addressed to Janus should include as part of their payload,
21
 * when needed, additional pieces of information like \c session_id and
22
 * \c handle_id. That is, where you'd send a Janus request related to a
23
 * specific session to the \c /janus/<session> path, with RabbitMQ
24
 * you'd have to send the same request with an additional \c session_id
25
 * field in the JSON payload.
26
 * \note When you create a session using RabbitMQ, a subscription to the
27
 * events related to it is done automatically through the outgoing queue,
28
 * so no need for an explicit request as the GET in the plain HTTP API.
29
 *
30
 * \ingroup transports
31
 * \ref transports
32
 */
33

    
34
#include "transport.h"
35

    
36
#include <amqp.h>
37
#include <amqp_framing.h>
38
#include <amqp_tcp_socket.h>
39

    
40
#include "../debug.h"
41
#include "../apierror.h"
42
#include "../config.h"
43
#include "../mutex.h"
44
#include "../utils.h"
45

    
46

    
47
/* Transport plugin information */
48
#define JANUS_RABBITMQ_VERSION                        1
49
#define JANUS_RABBITMQ_VERSION_STRING        "0.0.1"
50
#define JANUS_RABBITMQ_DESCRIPTION                "This transport plugin adds RabbitMQ support to the Janus API via rabbitmq-c."
51
#define JANUS_RABBITMQ_NAME                                "JANUS RabbitMQ transport plugin"
52
#define JANUS_RABBITMQ_AUTHOR                        "Meetecho s.r.l."
53
#define JANUS_RABBITMQ_PACKAGE                        "janus.transport.rabbitmq"
54

    
55
/* Transport methods */
56
janus_transport *create(void);
57
int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path);
58
void janus_rabbitmq_destroy(void);
59
int janus_rabbitmq_get_api_compatibility(void);
60
int janus_rabbitmq_get_version(void);
61
const char *janus_rabbitmq_get_version_string(void);
62
const char *janus_rabbitmq_get_description(void);
63
const char *janus_rabbitmq_get_name(void);
64
const char *janus_rabbitmq_get_author(void);
65
const char *janus_rabbitmq_get_package(void);
66
gboolean janus_rabbitmq_is_janus_api_enabled(void);
67
gboolean janus_rabbitmq_is_admin_api_enabled(void);
68
int janus_rabbitmq_send_message(void *transport, void *request_id, gboolean admin, json_t *message);
69
void janus_rabbitmq_session_created(void *transport, guint64 session_id);
70
void janus_rabbitmq_session_over(void *transport, guint64 session_id, gboolean timeout);
71

    
72

    
73
/* Transport setup */
74
static janus_transport janus_rabbitmq_transport =
75
        JANUS_TRANSPORT_INIT (
76
                .init = janus_rabbitmq_init,
77
                .destroy = janus_rabbitmq_destroy,
78

    
79
                .get_api_compatibility = janus_rabbitmq_get_api_compatibility,
80
                .get_version = janus_rabbitmq_get_version,
81
                .get_version_string = janus_rabbitmq_get_version_string,
82
                .get_description = janus_rabbitmq_get_description,
83
                .get_name = janus_rabbitmq_get_name,
84
                .get_author = janus_rabbitmq_get_author,
85
                .get_package = janus_rabbitmq_get_package,
86

    
87
                .is_janus_api_enabled = janus_rabbitmq_is_janus_api_enabled,
88
                .is_admin_api_enabled = janus_rabbitmq_is_admin_api_enabled,
89

    
90
                .send_message = janus_rabbitmq_send_message,
91
                .session_created = janus_rabbitmq_session_created,
92
                .session_over = janus_rabbitmq_session_over,
93
        );
94

    
95
/* Transport creator */
96
janus_transport *create(void) {
97
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_RABBITMQ_NAME);
98
        return &janus_rabbitmq_transport;
99
}
100

    
101

    
102
/* Useful stuff */
103
static gint initialized = 0, stopping = 0;
104
static janus_transport_callbacks *gateway = NULL;
105
static gboolean rmq_janus_api_enabled = FALSE;
106
static gboolean rmq_admin_api_enabled = FALSE;
107

    
108
/* JSON serialization options */
109
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
110

    
111

    
112
/* RabbitMQ client session: we only create a single one as of now */
113
typedef struct janus_rabbitmq_client {
114
        amqp_connection_state_t rmq_conn;                /* AMQP connection state */
115
        amqp_channel_t rmq_channel;                                /* AMQP channel */
116
        gboolean janus_api_enabled;                                /* Whether the Janus API via RabbitMQ is enabled */
117
        amqp_bytes_t to_janus_queue;                        /* AMQP outgoing messages queue (Janus API) */
118
        amqp_bytes_t from_janus_queue;                        /* AMQP incoming messages queue (Janus API) */
119
        gboolean admin_api_enabled;                                /* Whether the Janus API via RabbitMQ is enabled */
120
        amqp_bytes_t to_janus_admin_queue;                /* AMQP outgoing messages queue (Admin API) */
121
        amqp_bytes_t from_janus_admin_queue;        /* AMQP incoming messages queue (Admin API) */
122
        GThread *in_thread, *out_thread;                /* Threads to handle incoming and outgoing queues */
123
        GAsyncQueue *messages;                                        /* Queue of outgoing messages to push */
124
        janus_mutex mutex;                                                /* Mutex to lock/unlock this session */
125
        gint session_timeout:1;                                        /* Whether a Janus session timeout occurred in the core */
126
        gint destroy:1;                                                        /* Flag to trigger a lazy session destruction */
127
} janus_rabbitmq_client;
128

    
129
/* RabbitMQ response */
130
typedef struct janus_rabbitmq_response {
131
        gboolean admin;                        /* Whether this is a Janus or Admin API response */
132
        gchar *correlation_id;        /* Correlation ID, if any */
133
        json_t *payload;                /* Payload to send to the client */
134
} janus_rabbitmq_response;
135
static janus_rabbitmq_response exit_message;
136

    
137
/* Threads */
138
void *janus_rmq_in_thread(void *data);
139
void *janus_rmq_out_thread(void *data);
140

    
141

    
142
/* We only handle a single client per time, as the queues are fixed */
143
static janus_rabbitmq_client *rmq_client = NULL;
144

    
145

    
146
/* Transport implementation */
147
int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) {
148
        if(g_atomic_int_get(&stopping)) {
149
                /* Still stopping from before */
150
                return -1;
151
        }
152
        if(callback == NULL || config_path == NULL) {
153
                /* Invalid arguments */
154
                return -1;
155
        }
156

    
157
        /* This is the callback we'll need to invoke to contact the gateway */
158
        gateway = callback;
159

    
160
        /* Read configuration */
161
        char filename[255];
162
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_RABBITMQ_PACKAGE);
163
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
164
        janus_config *config = janus_config_parse(filename);
165
        if(config != NULL)
166
                janus_config_print(config);
167

    
168
        janus_config_item *item = janus_config_get_item_drilldown(config, "general", "json");
169
        if(item && item->value) {
170
                /* Check how we need to format/serialize the JSON output */
171
                if(!strcasecmp(item->value, "indented")) {
172
                        /* Default: indented, we use three spaces for that */
173
                        json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
174
                } else if(!strcasecmp(item->value, "plain")) {
175
                        /* Not indented and no new lines, but still readable */
176
                        json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
177
                } else if(!strcasecmp(item->value, "compact")) {
178
                        /* Compact, so no spaces between separators */
179
                        json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
180
                } else {
181
                        JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
182
                        json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
183
                }
184
        }
185

    
186
        /* Handle configuration, starting from the server details */
187
        char *rmqhost = NULL;
188
        item = janus_config_get_item_drilldown(config, "general", "host");
189
        if(item && item->value)
190
                rmqhost = g_strdup(item->value);
191
        else
192
                rmqhost = g_strdup("localhost");
193
        int rmqport = AMQP_PROTOCOL_PORT;
194
        item = janus_config_get_item_drilldown(config, "general", "port");
195
        if(item && item->value)
196
                rmqport = atoi(item->value);
197

    
198
        /* Credentials and Virtual Host */
199
        const char *vhost = NULL, *username = NULL, *password = NULL;
200
        item = janus_config_get_item_drilldown(config, "general", "vhost");
201
        if(item && item->value)
202
                vhost = g_strdup(item->value);
203
        else
204
        vhost = g_strdup("/");
205
        item = janus_config_get_item_drilldown(config, "general", "username");
206
        if(item && item->value)
207
                username = g_strdup(item->value);
208
        else
209
                username = g_strdup("guest");
210
        item = janus_config_get_item_drilldown(config, "general", "password");
211
        if(item && item->value)
212
                password = g_strdup(item->value);
213
        else
214
                password = g_strdup("guest");
215

    
216
        /* Now check if the Janus API must be supported */
217
        const char *to_janus = NULL, *from_janus = NULL;
218
        const char *to_janus_admin = NULL, *from_janus_admin = NULL;
219
        item = janus_config_get_item_drilldown(config, "general", "enable");
220
        if(!item || !item->value || !janus_is_true(item->value)) {
221
                JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Janus API)\n");
222
        } else {
223
                /* Parse configuration */
224
                item = janus_config_get_item_drilldown(config, "general", "to_janus");
225
                if(!item || !item->value) {
226
                        JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n");
227
                        goto error;
228
                }
229
                to_janus = g_strdup(item->value);
230
                item = janus_config_get_item_drilldown(config, "general", "from_janus");
231
                if(!item || !item->value) {
232
                        JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n");
233
                        goto error;
234
                }
235
                from_janus = g_strdup(item->value);
236
                JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
237
                rmq_janus_api_enabled = TRUE;
238
        }
239
        /* Do the same for the admin API */
240
        item = janus_config_get_item_drilldown(config, "admin", "admin_enable");
241
        if(!item || !item->value || !janus_is_true(item->value)) {
242
                JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Admin API)\n");
243
        } else {
244
                /* Parse configuration */
245
                item = janus_config_get_item_drilldown(config, "admin", "to_janus_admin");
246
                if(!item || !item->value) {
247
                        JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n");
248
                        goto error;
249
                }
250
                to_janus_admin = g_strdup(item->value);
251
                item = janus_config_get_item_drilldown(config, "admin", "from_janus_admin");
252
                if(!item || !item->value) {
253
                        JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n");
254
                        goto error;
255
                }
256
                from_janus_admin = g_strdup(item->value);
257
                JANUS_LOG(LOG_INFO, "RabbitMQ support for Admin API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus_admin, from_janus_admin);
258
                rmq_admin_api_enabled = TRUE;
259
        }
260
        if(!rmq_janus_api_enabled && !rmq_admin_api_enabled) {
261
                JANUS_LOG(LOG_WARN, "RabbitMQ support disabled for both Janus and Admin API, giving up\n");
262
                goto error;
263
        } else {
264
                /* FIXME We currently support a single application, create a new janus_rabbitmq_client instance */
265
                rmq_client = g_malloc0(sizeof(janus_rabbitmq_client));
266
                if(rmq_client == NULL) {
267
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
268
                        goto error;
269
                }
270
                /* Connect */
271
                rmq_client->rmq_conn = amqp_new_connection();
272
                JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n");
273
                amqp_socket_t *socket = amqp_tcp_socket_new(rmq_client->rmq_conn);
274
                if(socket == NULL) {
275
                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
276
                        goto error;
277
                }
278
                JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n");
279
                int status = amqp_socket_open(socket, rmqhost, rmqport);
280
                if(status != AMQP_STATUS_OK) {
281
                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
282
                        goto error;
283
                }
284
                JANUS_LOG(LOG_VERB, "Logging in...\n");
285
                amqp_rpc_reply_t result = amqp_login(rmq_client->rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
286
                if(result.reply_type != AMQP_RESPONSE_NORMAL) {
287
                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
288
                        goto error;
289
                }
290
                rmq_client->rmq_channel = 1;
291
                JANUS_LOG(LOG_VERB, "Opening channel...\n");
292
                amqp_channel_open(rmq_client->rmq_conn, rmq_client->rmq_channel);
293
                result = amqp_get_rpc_reply(rmq_client->rmq_conn);
294
                if(result.reply_type != AMQP_RESPONSE_NORMAL) {
295
                        JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
296
                        goto error;
297
                }
298
                rmq_client->janus_api_enabled = FALSE;
299
                if(rmq_janus_api_enabled) {
300
                        rmq_client->janus_api_enabled = TRUE;
301
                        JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus);
302
                        rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus);
303
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table);
304
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
305
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
306
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
307
                                goto error;
308
                        }
309
                        JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus);
310
                        rmq_client->from_janus_queue = amqp_cstring_bytes(from_janus);
311
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_queue, 0, 0, 0, 0, amqp_empty_table);
312
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
313
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
314
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
315
                                goto error;
316
                        }
317
                        amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
318
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
319
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
320
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
321
                                goto error;
322
                        }
323
                }
324
                rmq_client->admin_api_enabled = FALSE;
325
                if(rmq_admin_api_enabled) {
326
                        rmq_client->admin_api_enabled = TRUE;
327
                        JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus_admin);
328
                        rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin);
329
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
330
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
331
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
332
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
333
                                goto error;
334
                        }
335
                        JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin);
336
                        rmq_client->from_janus_admin_queue = amqp_cstring_bytes(from_janus_admin);
337
                        amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
338
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
339
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
340
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
341
                                goto error;
342
                        }
343
                        amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
344
                        result = amqp_get_rpc_reply(rmq_client->rmq_conn);
345
                        if(result.reply_type != AMQP_RESPONSE_NORMAL) {
346
                                JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
347
                                goto error;
348
                        }
349
                }
350
                rmq_client->messages = g_async_queue_new();
351
                rmq_client->destroy = 0;
352
                GError *error = NULL;
353
                rmq_client->in_thread = g_thread_try_new("rmq_in_thread", &janus_rmq_in_thread, rmq_client, &error);
354
                if(error != NULL) {
355
                        /* Something went wrong... */
356
                        JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQ incoming thread...\n", error->code, error->message ? error->message : "??");
357
                        g_free(rmq_client);
358
                        janus_config_destroy(config);
359
                        return -1;
360
                }
361
                rmq_client->out_thread = g_thread_try_new("rmq_out_thread", &janus_rmq_out_thread, rmq_client, &error);
362
                if(error != NULL) {
363
                        /* Something went wrong... */
364
                        JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQ outgoing thread...\n", error->code, error->message ? error->message : "??");
365
                        g_free(rmq_client);
366
                        janus_config_destroy(config);
367
                        return -1;
368
                }
369
                janus_mutex_init(&rmq_client->mutex);
370
                /* Done */
371
                JANUS_LOG(LOG_INFO, "Setup of RabbitMQ integration completed\n");
372
        }
373
        g_free(rmqhost);
374
        janus_config_destroy(config);
375
        config = NULL;
376

    
377
        /* Done */
378
        g_atomic_int_set(&initialized, 1);
379
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_RABBITMQ_NAME);
380
        return 0;
381

    
382
error:
383
        /* If we got here, something went wrong */
384
        if(rmq_client)
385
                g_free(rmq_client);
386
        if(rmqhost)
387
                g_free(rmqhost);
388
        if(vhost)
389
                g_free((char *)vhost);
390
        if(username)
391
                g_free((char *)username);
392
        if(password)
393
                g_free((char *)password);
394
        if(to_janus)
395
                g_free((char *)to_janus);
396
        if(from_janus)
397
                g_free((char *)from_janus);
398
        if(to_janus_admin)
399
                g_free((char *)to_janus_admin);
400
        if(from_janus_admin)
401
                g_free((char *)from_janus_admin);
402
        if(config)
403
                janus_config_destroy(config);
404
        return -1;
405
}
406

    
407
void janus_rabbitmq_destroy(void) {
408
        if(!g_atomic_int_get(&initialized))
409
                return;
410
        g_atomic_int_set(&stopping, 1);
411

    
412
        if(rmq_client) {
413
                rmq_client->destroy = 1;
414
                g_async_queue_push(rmq_client->messages, &exit_message);
415
                if(rmq_client->in_thread)
416
                        g_thread_join(rmq_client->in_thread);
417
                if(rmq_client->in_thread)
418
                        g_thread_join(rmq_client->out_thread);
419
                if(rmq_client->rmq_conn && rmq_client->rmq_channel) {
420
                        amqp_channel_close(rmq_client->rmq_conn, rmq_client->rmq_channel, AMQP_REPLY_SUCCESS);
421
                        amqp_connection_close(rmq_client->rmq_conn, AMQP_REPLY_SUCCESS);
422
                        amqp_destroy_connection(rmq_client->rmq_conn);
423
                }
424
                if(rmq_client->to_janus_queue.bytes)
425
                        g_free((char *)rmq_client->to_janus_queue.bytes);
426
                if(rmq_client->from_janus_queue.bytes)
427
                        g_free((char *)rmq_client->from_janus_queue.bytes);
428
                if(rmq_client->to_janus_admin_queue.bytes)
429
                        g_free((char *)rmq_client->to_janus_admin_queue.bytes);
430
                if(rmq_client->from_janus_admin_queue.bytes)
431
                        g_free((char *)rmq_client->from_janus_admin_queue.bytes);
432
        }
433
        g_free(rmq_client);
434

    
435
        g_atomic_int_set(&initialized, 0);
436
        g_atomic_int_set(&stopping, 0);
437
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_RABBITMQ_NAME);
438
}
439

    
440
int janus_rabbitmq_get_api_compatibility(void) {
441
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
442
        return JANUS_TRANSPORT_API_VERSION;
443
}
444

    
445
int janus_rabbitmq_get_version(void) {
446
        return JANUS_RABBITMQ_VERSION;
447
}
448

    
449
const char *janus_rabbitmq_get_version_string(void) {
450
        return JANUS_RABBITMQ_VERSION_STRING;
451
}
452

    
453
const char *janus_rabbitmq_get_description(void) {
454
        return JANUS_RABBITMQ_DESCRIPTION;
455
}
456

    
457
const char *janus_rabbitmq_get_name(void) {
458
        return JANUS_RABBITMQ_NAME;
459
}
460

    
461
const char *janus_rabbitmq_get_author(void) {
462
        return JANUS_RABBITMQ_AUTHOR;
463
}
464

    
465
const char *janus_rabbitmq_get_package(void) {
466
        return JANUS_RABBITMQ_PACKAGE;
467
}
468

    
469
gboolean janus_rabbitmq_is_janus_api_enabled(void) {
470
        return rmq_janus_api_enabled;
471
}
472

    
473
gboolean janus_rabbitmq_is_admin_api_enabled(void) {
474
        return rmq_admin_api_enabled;
475
}
476

    
477
int janus_rabbitmq_send_message(void *transport, void *request_id, gboolean admin, json_t *message) {
478
        if(rmq_client == NULL)
479
                return -1;
480
        if(message == NULL)
481
                return -1;
482
        if(transport == NULL) {
483
                json_decref(message);
484
                return -1;
485
        }
486
        JANUS_LOG(LOG_HUGE, "Sending %s API %s via RabbitMQ\n", admin ? "admin" : "Janus", request_id ? "response" : "event");
487
        /* FIXME Add to the queue of outgoing messages */
488
        janus_rabbitmq_response *response = (janus_rabbitmq_response *)g_malloc0(sizeof(janus_rabbitmq_response));
489
        response->admin = admin;
490
        response->payload = message;
491
        response->correlation_id = (char *)request_id;
492
        g_async_queue_push(rmq_client->messages, response);
493
        return 0;
494
}
495

    
496
void janus_rabbitmq_session_created(void *transport, guint64 session_id) {
497
        /* We don't care */
498
}
499

    
500
void janus_rabbitmq_session_over(void *transport, guint64 session_id, gboolean timeout) {
501
        /* We don't care, not even if it's a timeout (should we?), our client is always up */
502
}
503

    
504

    
505
/* Threads */
506
void *janus_rmq_in_thread(void *data) {
507
        if(rmq_client == NULL) {
508
                JANUS_LOG(LOG_ERR, "No RabbitMQ connection??\n");
509
                return NULL;
510
        }
511
        JANUS_LOG(LOG_VERB, "Joining RabbitMQ in thread\n");
512

    
513
        struct timeval timeout;
514
        timeout.tv_sec = 0;
515
        timeout.tv_usec = 20000;
516
        amqp_frame_t frame;
517
        while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {
518
                amqp_maybe_release_buffers(rmq_client->rmq_conn);
519
                /* Wait for a frame */
520
                int res = amqp_simple_wait_frame_noblock(rmq_client->rmq_conn, &frame, &timeout);
521
                if(res != AMQP_STATUS_OK) {
522
                        /* No data */
523
                        if(res == AMQP_STATUS_TIMEOUT)
524
                                continue;
525
                        JANUS_LOG(LOG_VERB, "Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res));
526
                        break;
527
                }
528
                /* We expect method first */
529
                JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
530
                if(frame.frame_type != AMQP_FRAME_METHOD)
531
                        continue;
532
                JANUS_LOG(LOG_VERB, "Method %s\n", amqp_method_name(frame.payload.method.id));
533
                gboolean admin = FALSE;
534
                if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD) {
535
                        amqp_basic_deliver_t *d = (amqp_basic_deliver_t *)frame.payload.method.decoded;
536
                        JANUS_LOG(LOG_VERB, "Delivery #%u, %.*s\n", (unsigned) d->delivery_tag, (int) d->routing_key.len, (char *) d->routing_key.bytes);
537
                        /* Check if this is a Janus or Admin API request */
538
                        if(rmq_client->admin_api_enabled) {
539
                                if(d->routing_key.len == rmq_client->to_janus_admin_queue.len) {
540
                                        size_t i=0;
541
                                        admin = TRUE;
542
                                        char *inq = (char *)d->routing_key.bytes;
543
                                        char *expq = (char *)rmq_client->to_janus_admin_queue.bytes;
544
                                        for(i=0; i< d->routing_key.len; i++) {
545
                                                if(inq[i] != expq[i]) {
546
                                                        admin = FALSE;
547
                                                        break;
548
                                                }
549
                                        }
550
                                }
551
                        }
552
                        JANUS_LOG(LOG_VERB, "  -- This is %s API request\n", admin ? "an admin" : "a Janus");
553
                }
554
                /* Then the header */
555
                amqp_simple_wait_frame(rmq_client->rmq_conn, &frame);
556
                JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
557
                if(frame.frame_type != AMQP_FRAME_HEADER)
558
                        continue;
559
                amqp_basic_properties_t *p = (amqp_basic_properties_t *)frame.payload.properties.decoded;
560
                if(p->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
561
                        JANUS_LOG(LOG_VERB, "  -- Reply-to: %.*s\n", (int) p->reply_to.len, (char *) p->reply_to.bytes);
562
                }
563
                char *correlation = NULL;
564
                if(p->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
565
                        correlation = (char *)g_malloc0(p->correlation_id.len+1);
566
                        sprintf(correlation, "%.*s", (int) p->correlation_id.len, (char *) p->correlation_id.bytes);
567
                        JANUS_LOG(LOG_VERB, "  -- Correlation-id: %s\n", correlation);
568
                }
569
                if(p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
570
                        JANUS_LOG(LOG_VERB, "  -- Content-type: %.*s\n", (int) p->content_type.len, (char *) p->content_type.bytes);
571
                }
572
                /* And the body */
573
                uint64_t total = frame.payload.properties.body_size, received = 0;
574
                char *payload = (char *)g_malloc0(total+1), *index = payload;
575
                while(received < total) {
576
                        amqp_simple_wait_frame(rmq_client->rmq_conn, &frame);
577
                        JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
578
                        if(frame.frame_type != AMQP_FRAME_BODY)
579
                                break;
580
                        sprintf(index, "%.*s", (int) frame.payload.body_fragment.len, (char *) frame.payload.body_fragment.bytes);
581
                        received += frame.payload.body_fragment.len;
582
                        index = payload+received;
583
                }
584
                JANUS_LOG(LOG_VERB, "Got %"SCNu64"/%"SCNu64" bytes from the %s queue (%"SCNu64")\n",
585
                        received, total, admin ? "admin API" : "Janus API", frame.payload.body_fragment.len);
586
                JANUS_LOG(LOG_VERB, "%s\n", payload);
587
                /* Parse the JSON payload */
588
                json_error_t error;
589
                json_t *root = json_loads(payload, 0, &error);
590
                g_free(payload);
591
                /* Notify the core, passing both the object and, since it may be needed, the error
592
                 * We also specify the correlation ID as an opaque request identifier: we'll need it later */
593
                gateway->incoming_request(&janus_rabbitmq_transport, rmq_client, correlation, admin, root, &error);
594
        }
595
        JANUS_LOG(LOG_INFO, "Leaving RabbitMQ in thread\n");
596
        return NULL;
597
}
598

    
599
void *janus_rmq_out_thread(void *data) {
600
        if(rmq_client == NULL) {
601
                JANUS_LOG(LOG_ERR, "No RabbitMQ connection??\n");
602
                return NULL;
603
        }
604
        JANUS_LOG(LOG_VERB, "Joining RabbitMQ out thread\n");
605
        while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {
606
                /* We send messages from here as well, not only notifications */
607
                janus_rabbitmq_response *response = g_async_queue_pop(rmq_client->messages);
608
                if(response == NULL)
609
                        continue;
610
                if(response == &exit_message)
611
                        break;
612
                if(!rmq_client->destroy && !g_atomic_int_get(&stopping) && response->payload) {
613
                        janus_mutex_lock(&rmq_client->mutex);
614
                        /* Gotcha! Convert json_t to string */
615
                        char *payload_text = json_dumps(response->payload, json_format);
616
                        json_decref(response->payload);
617
                        response->payload = NULL;
618
                        JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes)...\n", response->admin ? "Admin" : "Janus", strlen(payload_text));
619
                        JANUS_LOG(LOG_VERB, "%s\n", payload_text);
620
                        amqp_basic_properties_t props;
621
                        props._flags = 0;
622
                        props._flags |= AMQP_BASIC_REPLY_TO_FLAG;
623
                        props.reply_to = amqp_cstring_bytes("Janus");
624
                        if(response->correlation_id) {
625
                                props._flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
626
                                props.correlation_id = amqp_cstring_bytes(response->correlation_id);
627
                        }
628
                        props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
629
                        props.content_type = amqp_cstring_bytes("application/json");
630
                        amqp_bytes_t message = amqp_cstring_bytes(payload_text);
631
                        int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_empty_bytes,
632
                                response->admin ? rmq_client->from_janus_admin_queue : rmq_client->from_janus_queue,
633
                                0, 0, &props, message);
634
                        if(status != AMQP_STATUS_OK) {
635
                                JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
636
                        }
637
                        g_free(response->correlation_id);
638
                        response->correlation_id = NULL;
639
                        g_free(payload_text);
640
                        payload_text = NULL;
641
                        g_free(response);
642
                        response = NULL;
643
                        janus_mutex_unlock(&rmq_client->mutex);
644
                }
645
        }
646
        g_async_queue_unref(rmq_client->messages);
647
        JANUS_LOG(LOG_INFO, "Leaving RabbitMQ out thread\n");
648
        return NULL;
649
}