Statistics
| Branch: | Revision:

janus-gateway / transports / janus_http.c @ 74a68610

History | View | Annotate | Download (72.7 KB)

1
/*! \file   janus_http.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus RESTs transport plugin
5
 * \details  This is an implementation of a RESTs transport for the
6
 * Janus API, using the libmicrohttpd library (http://www.gnu.org/software/libmicrohttpd/).
7
 * This module allows browsers to make use of HTTP to talk to the gateway.
8
 * Since the gateway may be deployed on a different domain than the web
9
 * server hosting the web applications using it, the gateway automatically
10
 * handles OPTIONS request to comply with the CORS specification.
11
 * POST requests can be used to ask for the management of a session with
12
 * the gateway, to attach to a plugin, to send messages to the plugin
13
 * itself and so on. GET requests instead are used for getting events
14
 * associated to a gateway session (and as such to all its plugin handles
15
 * and the events plugins push in the session itself), using a long poll
16
 * approach. A JavaScript library (janus.js) implements all of this on
17
 * the client side automatically.
18
 * \note There's a well known bug in libmicrohttpd that may cause it to
19
 * spike to 100% of the CPU when using HTTPS on some distributions. In
20
 * case you're interested in HTTPS support, it's better to just rely on
21
 * HTTP in Janus, and put a frontend like Apache HTTPD or nginx to take
22
 * care of securing the traffic. More details are available in \ref deploy.
23
 * 
24
 * \ingroup transports
25
 * \ref transports
26
 */
27

    
28
#include "transport.h"
29

    
30
#include <arpa/inet.h>
31
#include <ifaddrs.h>
32
#include <net/if.h>
33
#include <sys/socket.h>
34
#include <sys/types.h>
35
#include <netdb.h>
36

    
37
#include <microhttpd.h>
38

    
39
#include "../debug.h"
40
#include "../apierror.h"
41
#include "../config.h"
42
#include "../mutex.h"
43
#include "../ip-utils.h"
44
#include "../utils.h"
45

    
46

    
47
/* Transport plugin information */
48
#define JANUS_REST_VERSION                        2
49
#define JANUS_REST_VERSION_STRING        "0.0.2"
50
#define JANUS_REST_DESCRIPTION                "This transport plugin adds REST (HTTP/HTTPS) support to the Janus API via libmicrohttpd."
51
#define JANUS_REST_NAME                                "JANUS REST (HTTP/HTTPS) transport plugin"
52
#define JANUS_REST_AUTHOR                        "Meetecho s.r.l."
53
#define JANUS_REST_PACKAGE                        "janus.transport.http"
54

    
55
/* Transport methods */
56
janus_transport *create(void);
57
int janus_http_init(janus_transport_callbacks *callback, const char *config_path);
58
void janus_http_destroy(void);
59
int janus_http_get_api_compatibility(void);
60
int janus_http_get_version(void);
61
const char *janus_http_get_version_string(void);
62
const char *janus_http_get_description(void);
63
const char *janus_http_get_name(void);
64
const char *janus_http_get_author(void);
65
const char *janus_http_get_package(void);
66
gboolean janus_http_is_janus_api_enabled(void);
67
gboolean janus_http_is_admin_api_enabled(void);
68
int janus_http_send_message(void *transport, void *request_id, gboolean admin, json_t *message);
69
void janus_http_session_created(void *transport, guint64 session_id);
70
void janus_http_session_over(void *transport, guint64 session_id, gboolean timeout);
71

    
72

    
73
/* Transport setup */
74
static janus_transport janus_http_transport =
75
        JANUS_TRANSPORT_INIT (
76
                .init = janus_http_init,
77
                .destroy = janus_http_destroy,
78

    
79
                .get_api_compatibility = janus_http_get_api_compatibility,
80
                .get_version = janus_http_get_version,
81
                .get_version_string = janus_http_get_version_string,
82
                .get_description = janus_http_get_description,
83
                .get_name = janus_http_get_name,
84
                .get_author = janus_http_get_author,
85
                .get_package = janus_http_get_package,
86

    
87
                .is_janus_api_enabled = janus_http_is_janus_api_enabled,
88
                .is_admin_api_enabled = janus_http_is_admin_api_enabled,
89

    
90
                .send_message = janus_http_send_message,
91
                .session_created = janus_http_session_created,
92
                .session_over = janus_http_session_over,
93
        );
94

    
95
/* Transport creator */
96
janus_transport *create(void) {
97
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_REST_NAME);
98
        return &janus_http_transport;
99
}
100

    
101

    
102
/* Useful stuff */
103
static gint initialized = 0, stopping = 0;
104
static janus_transport_callbacks *gateway = NULL;
105
static gboolean http_janus_api_enabled = FALSE;
106
static gboolean http_admin_api_enabled = FALSE;
107
static gboolean notify_events = TRUE;
108

    
109
/* JSON serialization options */
110
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
111

    
112

    
113
/* Incoming HTTP message */
114
typedef struct janus_http_msg {
115
        struct MHD_Connection *connection;        /* The MHD connection this message came from */
116
        gchar *acrh;                                                /* Value of the Access-Control-Request-Headers HTTP header, if any (needed for CORS) */
117
        gchar *acrm;                                                /* Value of the Access-Control-Request-Method HTTP header, if any (needed for CORS) */
118
        gchar *contenttype;                                        /* Content-Type of the payload */
119
        gchar *payload;                                                /* Payload of the message */
120
        size_t len;                                                        /* Length of the message in octets */
121
        gint64 session_id;                                        /* Gateway-Client session identifier this message belongs to */
122
        janus_mutex wait_mutex;                                /* Mutex to wait on the response condition */
123
        janus_condition wait_cond;                        /* Response condition */
124
        gboolean got_response;                                /* Whether this message got a response from the core */
125
        json_t *response;                                        /* The response from the core */
126
} janus_http_msg;
127
static GHashTable *messages = NULL;
128
static janus_mutex messages_mutex;
129

    
130

    
131
/* Helper for long poll: HTTP events to push per session */
132
typedef struct janus_http_session {
133
        GAsyncQueue *events;        /* Events to notify for this session */
134
        gint64 destroyed;                /* Whether this session has been destroyed */
135
} janus_http_session;
136
/* We keep track of created sessions as we handle long polls */
137
const char *keepalive_id = "keepalive";
138
GHashTable *sessions = NULL;
139
GList *old_sessions = NULL;
140
GThread *sessions_watchdog = NULL;
141
janus_mutex sessions_mutex;
142

    
143

    
144
/* Callback (libmicrohttpd) invoked when a new connection is attempted on the REST API */
145
int janus_http_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen);
146
/* Callback (libmicrohttpd) invoked when a new connection is attempted on the admin/monitor webserver */
147
int janus_http_admin_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen);
148
/* Callback (libmicrohttpd) invoked when an HTTP message (GET, POST, OPTIONS, etc.) is available */
149
int janus_http_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr);
150
/* Callback (libmicrohttpd) invoked when an admin/monitor HTTP message (GET, POST, OPTIONS, etc.) is available */
151
int janus_http_admin_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr);
152
/* Callback (libmicrohttpd) invoked when headers of an incoming HTTP message have been parsed */
153
int janus_http_headers(void *cls, enum MHD_ValueKind kind, const char *key, const char *value);
154
/* Callback (libmicrohttpd) invoked when a request has been processed and can be freed */
155
void janus_http_request_completed (void *cls, struct MHD_Connection *connection, void **con_cls, enum MHD_RequestTerminationCode toe);
156
/* Worker to handle requests that are actually long polls */
157
int janus_http_notifier(janus_http_msg *msg, int max_events);
158
/* Helper to quickly send a success response */
159
int janus_http_return_success(janus_http_msg *msg, char *payload);
160
/* Helper to quickly send an error response */
161
int janus_http_return_error(janus_http_msg *msg, uint64_t session_id, const char *transaction, gint error, const char *format, ...) G_GNUC_PRINTF(5, 6);
162

    
163

    
164
/* MHD Web Server */
165
static struct MHD_Daemon *ws = NULL, *sws = NULL;
166
static char *ws_path = NULL;
167
static char *cert_pem_bytes = NULL, *cert_key_bytes = NULL; 
168

    
169
/* Admin/Monitor MHD Web Server */
170
static struct MHD_Daemon *admin_ws = NULL, *admin_sws = NULL;
171
static char *admin_ws_path = NULL;
172

    
173

    
174
/* REST and Admin/Monitor ACL list */
175
GList *janus_http_access_list = NULL, *janus_http_admin_access_list = NULL;
176
janus_mutex access_list_mutex;
177
static void janus_http_allow_address(const char *ip, gboolean admin) {
178
        if(ip == NULL)
179
                return;
180
        /* Is this an IP or an interface? */
181
        janus_mutex_lock(&access_list_mutex);
182
        if(!admin)
183
                janus_http_access_list = g_list_append(janus_http_access_list, (gpointer)ip);
184
        else
185
                janus_http_admin_access_list = g_list_append(janus_http_admin_access_list, (gpointer)ip);
186
        janus_mutex_unlock(&access_list_mutex);
187
}
188
static gboolean janus_http_is_allowed(const char *ip, gboolean admin) {
189
        if(ip == NULL)
190
                return FALSE;
191
        if(!admin && janus_http_access_list == NULL)
192
                return TRUE;
193
        if(admin && janus_http_admin_access_list == NULL)
194
                return TRUE;
195
        janus_mutex_lock(&access_list_mutex);
196
        GList *temp = admin ? janus_http_admin_access_list : janus_http_access_list;
197
        while(temp) {
198
                const char *allowed = (const char *)temp->data;
199
                if(allowed != NULL && strstr(ip, allowed)) {
200
                        janus_mutex_unlock(&access_list_mutex);
201
                        return TRUE;
202
                }
203
                temp = temp->next;
204
        }
205
        janus_mutex_unlock(&access_list_mutex);
206
        return FALSE;
207
}
208

    
209
/* Helper method to get the port from a struct sockaddr */
210
static uint16_t janus_http_sockaddr_to_port(struct sockaddr *address) {
211
        if(address == NULL)
212
                return 0;
213
        struct sockaddr_in *sin = NULL;
214
        struct sockaddr_in6 *sin6 = NULL;
215

    
216
        switch(address->sa_family) {
217
                case AF_INET:
218
                        sin = (struct sockaddr_in *)address;
219
                        return ntohs(sin->sin_port);
220
                case AF_INET6:
221
                        sin6 = (struct sockaddr_in6 *)address;
222
                        return ntohs(sin6->sin6_port);
223
                default:
224
                        /* Unknown family */
225
                        break;
226
        }
227
        return 0;
228
}
229

    
230
/* Random string helper (for transactions) */
231
static char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
232
static void janus_http_random_string(int length, char *buffer) {
233
        if(length > 0 && buffer) {
234
                int l = (int)(sizeof(charset)-1);
235
                int i=0;
236
                for(i=0; i<length; i++) {
237
                        int key = rand() % l;
238
                        buffer[i] = charset[key];
239
                }
240
                buffer[length-1] = '\0';
241
        }
242
}
243

    
244

    
245
/* Helper to create a MHD daemon */
246
static struct MHD_Daemon *janus_http_create_daemon(gboolean admin, char *path,
247
                const char *interface, const char *ip, int port,
248
                gint64 threads, const char *server_pem, const char *server_key) {
249
        struct MHD_Daemon *daemon = NULL;
250
        gboolean secure = server_pem && server_key;
251
        /* Any interface or IP address we need to limit ourselves to?
252
         * NOTE WELL: specifying an interface does NOT bind to all IPs associated
253
         * with that interface, but only to the first one that's detected */
254
        static struct sockaddr_in addr;
255
        struct sockaddr_in6 addr6;
256
        gboolean ipv6 = FALSE;
257
        if(ip && strstr(ip, ":"))
258
                ipv6 = TRUE;
259
        if(ip || interface) {
260
                gboolean found = FALSE;
261
                struct ifaddrs *ifaddr = NULL, *ifa = NULL;
262
                int family = 0, s = 0, n = 0;
263
                char host[NI_MAXHOST];
264
                if(getifaddrs(&ifaddr) == -1) {
265
                        JANUS_LOG(LOG_ERR, "Error getting list of interfaces to bind %s API %s webserver...\n",
266
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
267
                        return NULL;
268
                } else {
269
                        for(ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) {
270
                                family = ifa->ifa_addr->sa_family;
271
                                if(interface && strcasecmp(ifa->ifa_name, interface))
272
                                        continue;
273
                                if(ifa->ifa_addr == NULL)
274
                                        continue;
275
                                /* Skip interfaces which are not up and running */
276
                                if(!((ifa->ifa_flags & IFF_UP) && (ifa->ifa_flags & IFF_RUNNING)))
277
                                        continue;
278
                                /* FIXME When being explicit about the interface only, we only bind IPv4 for now:
279
                                 * specifying or adding a precise IPv6 address gets you an IPv6 binding instead */
280
                                if(!ipv6 && family == AF_INET) {
281
                                        s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
282
                                        if(s != 0) {
283
                                                JANUS_LOG(LOG_ERR, "Error doing a getnameinfo() to bind %s API %s webserver to '%s'...\n",
284
                                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP", ip ? ip : interface);
285
                                                return NULL;
286
                                        }
287
                                        if(ip && strcmp(host, ip))
288
                                                continue;
289
                                        found = TRUE;
290
                                        break;
291
                                } else if(ipv6 && family == AF_INET6) {
292
                                        s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in6), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
293
                                        if(s != 0) {
294
                                                JANUS_LOG(LOG_ERR, "Error doing a getnameinfo() to bind %s API %s webserver to '%s'...\n",
295
                                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP", ip ? ip : interface);
296
                                                return NULL;
297
                                        }
298
                                        if(ip && strcmp(host, ip))
299
                                                continue;
300
                                        found = TRUE;
301
                                        break;
302
                                }
303
                        }
304
                        freeifaddrs(ifaddr);
305
                }
306
                if(!found) {
307
                        JANUS_LOG(LOG_ERR, "Error binding to %s '%s' for %s API %s webserver...\n",
308
                                ip ? "IP" : "interface", ip ? ip : interface,
309
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
310
                        return NULL;
311
                }
312
                JANUS_LOG(LOG_VERB, "Going to bind the %s API %s webserver to %s (asked for %s)\n",
313
                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP", host, ip ? ip : interface);
314
                if(!ipv6) {
315
                        memset(&addr, 0, sizeof (struct sockaddr_in));
316
                        addr.sin_family = AF_INET;
317
                        addr.sin_port = htons(port);
318
                        int res = inet_pton(AF_INET, host, &addr.sin_addr);
319
                        if(res != 1) {
320
                                JANUS_LOG(LOG_ERR, "Failed to convert address '%s' (%d)\n", host, res);
321
                                return NULL;
322
                        }
323
                } else {
324
                        memset(&addr6, 0, sizeof (struct sockaddr_in6));
325
                        addr6.sin6_family = AF_INET6;
326
                        addr6.sin6_port = htons(port);
327
                        int res = inet_pton(AF_INET6, host, &addr6.sin6_addr);
328
                        if(res != 1) {
329
                                JANUS_LOG(LOG_ERR, "Failed to convert address '%s' (%d)\n", host, res);
330
                                return NULL;
331
                        }
332
                }
333
        }
334

    
335
        if(!secure) {
336
                /* HTTP web server */
337
                if(threads == 0) {
338
                        JANUS_LOG(LOG_VERB, "Using a thread per connection for the %s API %s webserver\n",
339
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
340
                        if(!interface && !ip) {
341
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
342
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
343
                                /* Bind to all interfaces */
344
                                daemon = MHD_start_daemon(
345
                                        MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | MHD_USE_DUAL_STACK,
346
                                        port,
347
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
348
                                        NULL,
349
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
350
                                        path,
351
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
352
                                        MHD_OPTION_END);
353
                        } else {
354
                                /* Bind to the interface that was specified */
355
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
356
                                        ip ? "IP" : "interface", ip ? ip : interface,
357
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
358
                                daemon = MHD_start_daemon(
359
                                        MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | (ipv6 ? MHD_USE_IPv6 : 0),
360
                                        port,
361
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
362
                                        NULL,
363
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
364
                                        path,
365
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
366
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
367
                                        MHD_OPTION_END);
368
                        }
369
                } else {
370
                        JANUS_LOG(LOG_VERB, "Using a thread pool of size %"SCNi64" the %s API %s webserver\n", threads,
371
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
372
                        if(!interface && !ip) {
373
                                /* Bind to all interfaces */
374
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
375
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
376
                                daemon = MHD_start_daemon(
377
                                        MHD_USE_SELECT_INTERNALLY | MHD_USE_DUAL_STACK,
378
                                        port,
379
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
380
                                        NULL,
381
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
382
                                        path,
383
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
384
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
385
                                        MHD_OPTION_END);
386
                        } else {
387
                                /* Bind to the interface that was specified */
388
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
389
                                        ip ? "IP" : "interface", ip ? ip : interface,
390
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
391
                                daemon = MHD_start_daemon(
392
                                        MHD_USE_SELECT_INTERNALLY | (ipv6 ? MHD_USE_IPv6 : 0),
393
                                        port,
394
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
395
                                        NULL,
396
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
397
                                        path,
398
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
399
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
400
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
401
                                        MHD_OPTION_END);
402
                        }
403
                }
404
        } else {
405
                /* HTTPS web server, read certificate and key */
406
                g_file_get_contents(server_pem, &cert_pem_bytes, NULL, NULL);
407
                g_file_get_contents(server_key, &cert_key_bytes, NULL, NULL);
408

    
409
                /* Start webserver */
410
                if(threads == 0) {
411
                        JANUS_LOG(LOG_VERB, "Using a thread per connection for the %s API %s webserver\n",
412
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
413
                        if(!interface && !ip) {
414
                                /* Bind to all interfaces */
415
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
416
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
417
                                daemon = MHD_start_daemon(
418
                                        MHD_USE_SSL | MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | MHD_USE_DUAL_STACK,
419
                                        port,
420
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
421
                                        NULL,
422
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
423
                                        path,
424
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
425
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
426
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
427
                                        MHD_OPTION_END);
428
                        } else {
429
                                /* Bind to the interface that was specified */
430
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
431
                                        ip ? "IP" : "interface", ip ? ip : interface,
432
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
433
                                daemon = MHD_start_daemon(
434
                                        MHD_USE_SSL | MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | (ipv6 ? MHD_USE_IPv6 : 0),
435
                                        port,
436
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
437
                                        NULL,
438
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
439
                                        path,
440
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
441
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
442
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
443
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
444
                                        MHD_OPTION_END);
445
                        }
446
                } else {
447
                        JANUS_LOG(LOG_VERB, "Using a thread pool of size %"SCNi64" the %s API %s webserver\n", threads,
448
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
449
                        if(!interface && !ip) {
450
                                /* Bind to all interfaces */
451
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
452
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
453
                                daemon = MHD_start_daemon(
454
                                        MHD_USE_SSL | MHD_USE_SELECT_INTERNALLY | MHD_USE_DUAL_STACK,
455
                                        port,
456
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
457
                                        NULL,
458
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
459
                                        path,
460
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
461
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
462
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
463
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
464
                                        MHD_OPTION_END);
465
                        } else {
466
                                /* Bind to the interface that was specified */
467
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
468
                                        ip ? "IP" : "interface", ip ? ip : interface,
469
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
470
                                daemon = MHD_start_daemon(
471
                                        MHD_USE_SSL | MHD_USE_SELECT_INTERNALLY | (ipv6 ? MHD_USE_IPv6 : 0),
472
                                        port,
473
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
474
                                        NULL,
475
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
476
                                        path,
477
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
478
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
479
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
480
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
481
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
482
                                        MHD_OPTION_END);
483
                        }
484
                }
485
        }
486
        return daemon;
487
}
488

    
489

    
490
/* HTTP/Janus sessions watchdog/garbage collector (sort of) */
491
static void *janus_http_sessions_watchdog(void *data) {
492
        JANUS_LOG(LOG_INFO, "HTTP/Janus sessions watchdog started\n");
493
        gint64 now = 0;
494
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
495
                janus_mutex_lock(&sessions_mutex);
496
                /* Iterate on all the sessions */
497
                now = janus_get_monotonic_time();
498
                if(old_sessions != NULL) {
499
                        GList *sl = old_sessions;
500
                        JANUS_LOG(LOG_HUGE, "Checking %d old HTTP/Janus sessions sessions...\n", g_list_length(old_sessions));
501
                        while(sl) {
502
                                janus_http_session *session = (janus_http_session *)sl->data;
503
                                if(!session) {
504
                                        sl = sl->next;
505
                                        continue;
506
                                }
507
                                if(now-session->destroyed >= G_USEC_PER_SEC) {
508
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
509
                                        JANUS_LOG(LOG_VERB, "Freeing old HTTP/Janus session\n");
510
                                        GList *rm = sl->next;
511
                                        old_sessions = g_list_delete_link(old_sessions, sl);
512
                                        sl = rm;
513
                                        /* Remove all events */
514
                                        json_t *event = NULL;
515
                                        while((event = g_async_queue_try_pop(session->events)) != NULL)
516
                                                json_decref(event);
517
                                        g_async_queue_unref(session->events);
518
                                        g_free(session);
519
                                        continue;
520
                                }
521
                                sl = sl->next;
522
                        }
523
                }
524
                janus_mutex_unlock(&sessions_mutex);
525
                g_usleep(500000);
526
        }
527
        JANUS_LOG(LOG_INFO, "HTTP/Janus sessions watchdog stopped\n");
528
        return NULL;
529
}
530

    
531

    
532
/* Transport implementation */
533
int janus_http_init(janus_transport_callbacks *callback, const char *config_path) {
534
        if(g_atomic_int_get(&stopping)) {
535
                /* Still stopping from before */
536
                return -1;
537
        }
538
        if(callback == NULL || config_path == NULL) {
539
                /* Invalid arguments */
540
                return -1;
541
        }
542

    
543
        /* This is the callback we'll need to invoke to contact the gateway */
544
        gateway = callback;
545

    
546
        /* Read configuration */
547
        char filename[255];
548
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_REST_PACKAGE);
549
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
550
        janus_config *config = janus_config_parse(filename);
551
        if(config != NULL) {
552
                janus_config_print(config);
553

    
554
                /* Handle configuration */
555
                janus_config_item *item = janus_config_get_item_drilldown(config, "general", "json");
556
                if(item && item->value) {
557
                        /* Check how we need to format/serialize the JSON output */
558
                        if(!strcasecmp(item->value, "indented")) {
559
                                /* Default: indented, we use three spaces for that */
560
                                json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
561
                        } else if(!strcasecmp(item->value, "plain")) {
562
                                /* Not indented and no new lines, but still readable */
563
                                json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
564
                        } else if(!strcasecmp(item->value, "compact")) {
565
                                /* Compact, so no spaces between separators */
566
                                json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
567
                        } else {
568
                                JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
569
                                json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
570
                        }
571
                }
572

    
573
                /* Check if we need to send events to handlers */
574
                janus_config_item *events = janus_config_get_item_drilldown(config, "general", "events");
575
                if(events != NULL && events->value != NULL)
576
                        notify_events = janus_is_true(events->value);
577
                if(!notify_events && callback->events_is_enabled()) {
578
                        JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_REST_NAME);
579
                }
580

    
581
                /* Check the base paths */
582
                item = janus_config_get_item_drilldown(config, "general", "base_path");
583
                if(item && item->value) {
584
                        if(item->value[0] != '/') {
585
                                JANUS_LOG(LOG_FATAL, "Invalid base path %s (it should start with a /, e.g., /janus\n", item->value);
586
                                return -1;
587
                        }
588
                        ws_path = g_strdup(item->value);
589
                        if(strlen(ws_path) > 1 && ws_path[strlen(ws_path)-1] == '/') {
590
                                /* Remove the trailing slash, it makes things harder when we parse requests later */
591
                                ws_path[strlen(ws_path)-1] = '\0';
592
                        }
593
                } else {
594
                        ws_path = g_strdup("/janus");
595
                }
596
                /* Do the same for the admin/monitor interface */
597
                item = janus_config_get_item_drilldown(config, "admin", "admin_base_path");
598
                if(item && item->value) {
599
                        if(item->value[0] != '/') {
600
                                JANUS_LOG(LOG_FATAL, "Invalid admin/monitor base path %s (it should start with a /, e.g., /admin\n", item->value);
601
                                return -1;
602
                        }
603
                        admin_ws_path = g_strdup(item->value);
604
                        if(strlen(admin_ws_path) > 1 && ws_path[strlen(admin_ws_path)-1] == '/') {
605
                                /* Remove the trailing slash, it makes things harder when we parse requests later */
606
                                admin_ws_path[strlen(admin_ws_path)-1] = '\0';
607
                        }
608
                } else {
609
                        admin_ws_path = g_strdup("/admin");
610
                }
611

    
612
                /* Any ACL for either the Janus or Admin API? */
613
                item = janus_config_get_item_drilldown(config, "general", "acl");
614
                if(item && item->value) {
615
                        gchar **list = g_strsplit(item->value, ",", -1);
616
                        gchar *index = list[0];
617
                        if(index != NULL) {
618
                                int i=0;
619
                                while(index != NULL) {
620
                                        if(strlen(index) > 0) {
621
                                                JANUS_LOG(LOG_INFO, "Adding '%s' to the Janus API allowed list...\n", index);
622
                                                janus_http_allow_address(g_strdup(index), FALSE);
623
                                        }
624
                                        i++;
625
                                        index = list[i];
626
                                }
627
                        }
628
                        g_strfreev(list);
629
                        list = NULL;
630
                }
631
                item = janus_config_get_item_drilldown(config, "admin", "admin_acl");
632
                if(item && item->value) {
633
                        gchar **list = g_strsplit(item->value, ",", -1);
634
                        gchar *index = list[0];
635
                        if(index != NULL) {
636
                                int i=0;
637
                                while(index != NULL) {
638
                                        if(strlen(index) > 0) {
639
                                                JANUS_LOG(LOG_INFO, "Adding '%s' to the Admin/monitor allowed list...\n", index);
640
                                                janus_http_allow_address(g_strdup(index), TRUE);
641
                                        }
642
                                        i++;
643
                                        index = list[i];
644
                                }
645
                        }
646
                        g_strfreev(list);
647
                        list = NULL;
648
                }
649

    
650
                /* Start with the Janus API web server now */
651
                gint64 threads = 0;
652
                item = janus_config_get_item_drilldown(config, "general", "threads");
653
                if(item && item->value) {
654
                        if(!strcasecmp(item->value, "unlimited")) {
655
                                /* No limit on threads, use a thread per connection */
656
                                threads = 0;
657
                        } else {
658
                                /* Use a thread pool */
659
                                threads = atoll(item->value);
660
                                if(threads == 0) {
661
                                        JANUS_LOG(LOG_WARN, "Chose '0' as size for the thread pool, which is equivalent to 'unlimited'\n");
662
                                } else if(threads < 0) {
663
                                        JANUS_LOG(LOG_WARN, "Invalid value '%"SCNi64"' as size for the thread pool, falling back to to 'unlimited'\n", threads);
664
                                        threads = 0;
665
                                }
666
                        }
667
                }
668
                item = janus_config_get_item_drilldown(config, "general", "http");
669
                if(!item || !item->value || !janus_is_true(item->value)) {
670
                        JANUS_LOG(LOG_WARN, "HTTP webserver disabled\n");
671
                } else {
672
                        int wsport = 8088;
673
                        item = janus_config_get_item_drilldown(config, "general", "port");
674
                        if(item && item->value)
675
                                wsport = atoi(item->value);
676
                        const char *interface = NULL;
677
                        item = janus_config_get_item_drilldown(config, "general", "interface");
678
                        if(item && item->value)
679
                                interface = item->value;
680
                        const char *ip = NULL;
681
                        item = janus_config_get_item_drilldown(config, "general", "ip");
682
                        if(item && item->value)
683
                                ip = item->value;
684
                        ws = janus_http_create_daemon(FALSE, ws_path, interface, ip, wsport, threads, NULL, NULL);
685
                        if(ws == NULL) {
686
                                JANUS_LOG(LOG_FATAL, "Couldn't start webserver on port %d...\n", wsport);
687
                        } else {
688
                                JANUS_LOG(LOG_INFO, "HTTP webserver started (port %d, %s path listener)...\n", wsport, ws_path);
689
                        }
690
                }
691
                /* Do we also have to provide an HTTPS one? */
692
                char *server_pem = NULL;
693
                item = janus_config_get_item_drilldown(config, "certificates", "cert_pem");
694
                if(item && item->value)
695
                        server_pem = (char *)item->value;
696
                char *server_key = NULL;
697
                item = janus_config_get_item_drilldown(config, "certificates", "cert_key");
698
                if(item && item->value)
699
                        server_key = (char *)item->value;
700
                if(server_key)
701
                        JANUS_LOG(LOG_VERB, "Using certificates:\n\t%s\n\t%s\n", server_pem, server_key);
702
                item = janus_config_get_item_drilldown(config, "general", "https");
703
                if(!item || !item->value || !janus_is_true(item->value)) {
704
                        JANUS_LOG(LOG_WARN, "HTTPS webserver disabled\n");
705
                } else {
706
                        if(!server_key || !server_pem) {
707
                                JANUS_LOG(LOG_FATAL, "Missing certificate/key path\n");
708
                        } else {
709
                                int swsport = 8089;
710
                                item = janus_config_get_item_drilldown(config, "general", "secure_port");
711
                                if(item && item->value)
712
                                        swsport = atoi(item->value);
713
                                const char *interface = NULL;
714
                                item = janus_config_get_item_drilldown(config, "general", "secure_interface");
715
                                if(item && item->value)
716
                                        interface = item->value;
717
                                const char *ip = NULL;
718
                                item = janus_config_get_item_drilldown(config, "general", "secure_ip");
719
                                if(item && item->value)
720
                                        ip = item->value;
721
                                sws = janus_http_create_daemon(FALSE, ws_path, interface, ip, swsport, threads, server_pem, server_key);
722
                                if(sws == NULL) {
723
                                        JANUS_LOG(LOG_FATAL, "Couldn't start secure webserver on port %d...\n", swsport);
724
                                } else {
725
                                        JANUS_LOG(LOG_INFO, "HTTPS webserver started (port %d, %s path listener)...\n", swsport, ws_path);
726
                                }
727
                        }
728
                }
729
                /* Admin/monitor time: start web server, if enabled */
730
                threads = 0;
731
                item = janus_config_get_item_drilldown(config, "admin", "admin_threads");
732
                if(item && item->value) {
733
                        if(!strcasecmp(item->value, "unlimited")) {
734
                                /* No limit on threads, use a thread per connection */
735
                                threads = 0;
736
                        } else {
737
                                /* Use a thread pool */
738
                                threads = atoll(item->value);
739
                                if(threads == 0) {
740
                                        JANUS_LOG(LOG_WARN, "Chose '0' as size for the admin/monitor thread pool, which is equivalent to 'unlimited'\n");
741
                                } else if(threads < 0) {
742
                                        JANUS_LOG(LOG_WARN, "Invalid value '%"SCNi64"' as size for the admin/monitor thread pool, falling back to to 'unlimited'\n", threads);
743
                                        threads = 0;
744
                                }
745
                        }
746
                }
747
                item = janus_config_get_item_drilldown(config, "admin", "admin_http");
748
                if(!item || !item->value || !janus_is_true(item->value)) {
749
                        JANUS_LOG(LOG_WARN, "Admin/monitor HTTP webserver disabled\n");
750
                } else {
751
                        int wsport = 7088;
752
                        item = janus_config_get_item_drilldown(config, "admin", "admin_port");
753
                        if(item && item->value)
754
                                wsport = atoi(item->value);
755
                        const char *interface = NULL;
756
                        item = janus_config_get_item_drilldown(config, "admin", "admin_interface");
757
                        if(item && item->value)
758
                                interface = item->value;
759
                        const char *ip = NULL;
760
                        item = janus_config_get_item_drilldown(config, "admin", "admin_ip");
761
                        if(item && item->value)
762
                                ip = item->value;
763
                        admin_ws = janus_http_create_daemon(TRUE, admin_ws_path, interface, ip, wsport, threads, NULL, NULL);
764
                        if(admin_ws == NULL) {
765
                                JANUS_LOG(LOG_FATAL, "Couldn't start admin/monitor webserver on port %d...\n", wsport);
766
                        } else {
767
                                JANUS_LOG(LOG_INFO, "Admin/monitor HTTP webserver started (port %d, %s path listener)...\n", wsport, admin_ws_path);
768
                        }
769
                }
770
                /* Do we also have to provide an HTTPS one? */
771
                item = janus_config_get_item_drilldown(config, "admin", "admin_https");
772
                if(!item || !item->value || !janus_is_true(item->value)) {
773
                        JANUS_LOG(LOG_WARN, "Admin/monitor HTTPS webserver disabled\n");
774
                } else {
775
                        if(!server_key) {
776
                                JANUS_LOG(LOG_FATAL, "Missing certificate/key path\n");
777
                        } else {
778
                                int swsport = 7889;
779
                                item = janus_config_get_item_drilldown(config, "admin", "admin_secure_port");
780
                                if(item && item->value)
781
                                        swsport = atoi(item->value);
782
                                const char *interface = NULL;
783
                                item = janus_config_get_item_drilldown(config, "admin", "admin_secure_interface");
784
                                if(item && item->value)
785
                                        interface = item->value;
786
                                const char *ip = NULL;
787
                                item = janus_config_get_item_drilldown(config, "admin", "admin_secure_ip");
788
                                if(item && item->value)
789
                                        ip = item->value;
790
                                admin_sws = janus_http_create_daemon(TRUE, admin_ws_path, interface, ip, swsport, threads, server_pem, server_key);
791
                                if(admin_sws == NULL) {
792
                                        JANUS_LOG(LOG_FATAL, "Couldn't start secure admin/monitor webserver on port %d...\n", swsport);
793
                                } else {
794
                                        JANUS_LOG(LOG_INFO, "Admin/monitor HTTPS webserver started (port %d, %s path listener)...\n", swsport, admin_ws_path);
795
                                }
796
                        }
797
                }
798
        }
799
        janus_config_destroy(config);
800
        config = NULL;
801
        if(!ws && !sws && !admin_ws && !admin_sws) {
802
                JANUS_LOG(LOG_WARN, "No HTTP/HTTPS server started, giving up...\n");
803
                return -1;        /* No point in keeping the plugin loaded */
804
        }
805
        http_janus_api_enabled = ws || sws;
806
        http_admin_api_enabled = admin_ws || admin_sws;
807

    
808
        messages = g_hash_table_new(NULL, NULL);
809
        janus_mutex_init(&messages_mutex);
810
        sessions = g_hash_table_new_full(g_int64_hash, g_int64_equal, (GDestroyNotify)g_free, NULL);
811
        old_sessions = NULL;
812
        janus_mutex_init(&sessions_mutex);
813
        GError *error = NULL;
814
        /* Start the HTTP/Janus sessions watchdog */
815
        sessions_watchdog = g_thread_try_new("http watchdog", &janus_http_sessions_watchdog, NULL, &error);
816
        if(error != NULL) {
817
                g_atomic_int_set(&initialized, 0);
818
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the HTTP/Janus sessions watchdog thread...\n", error->code, error->message ? error->message : "??");
819
                return -1;
820
        }
821
        
822
        /* Done */
823
        g_atomic_int_set(&initialized, 1);
824
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_REST_NAME);
825
        return 0;
826
}
827

    
828
void janus_http_destroy(void) {
829
        if(!g_atomic_int_get(&initialized))
830
                return;
831
        g_atomic_int_set(&stopping, 1);
832

    
833
        JANUS_LOG(LOG_INFO, "Stopping webserver(s)...\n");
834
        if(ws)
835
                MHD_stop_daemon(ws);
836
        ws = NULL;
837
        if(sws)
838
                MHD_stop_daemon(sws);
839
        sws = NULL;
840
        if(admin_ws)
841
                MHD_stop_daemon(admin_ws);
842
        admin_ws = NULL;
843
        if(admin_sws)
844
                MHD_stop_daemon(admin_sws);
845
        admin_sws = NULL;
846
        if(cert_pem_bytes != NULL)
847
                g_free((gpointer)cert_pem_bytes);
848
        cert_pem_bytes = NULL;
849
        if(cert_key_bytes != NULL)
850
                g_free((gpointer)cert_key_bytes);
851
        cert_key_bytes = NULL;
852

    
853
        g_hash_table_destroy(messages);
854
        if(sessions_watchdog != NULL) {
855
                g_thread_join(sessions_watchdog);
856
                sessions_watchdog = NULL;
857
        }
858

    
859
        g_atomic_int_set(&initialized, 0);
860
        g_atomic_int_set(&stopping, 0);
861
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_REST_NAME);
862
}
863

    
864
int janus_http_get_api_compatibility(void) {
865
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
866
        return JANUS_TRANSPORT_API_VERSION;
867
}
868

    
869
int janus_http_get_version(void) {
870
        return JANUS_REST_VERSION;
871
}
872

    
873
const char *janus_http_get_version_string(void) {
874
        return JANUS_REST_VERSION_STRING;
875
}
876

    
877
const char *janus_http_get_description(void) {
878
        return JANUS_REST_DESCRIPTION;
879
}
880

    
881
const char *janus_http_get_name(void) {
882
        return JANUS_REST_NAME;
883
}
884

    
885
const char *janus_http_get_author(void) {
886
        return JANUS_REST_AUTHOR;
887
}
888

    
889
const char *janus_http_get_package(void) {
890
        return JANUS_REST_PACKAGE;
891
}
892

    
893
gboolean janus_http_is_janus_api_enabled(void) {
894
        return http_janus_api_enabled;
895
}
896

    
897
gboolean janus_http_is_admin_api_enabled(void) {
898
        return http_admin_api_enabled;
899
}
900

    
901
int janus_http_send_message(void *transport, void *request_id, gboolean admin, json_t *message) {
902
        JANUS_LOG(LOG_HUGE, "Got a %s API %s to send (%p)\n", admin ? "admin" : "Janus", request_id ? "response" : "event", transport);
903
        if(message == NULL) {
904
                JANUS_LOG(LOG_ERR, "No message...\n");
905
                return -1;
906
        }
907
        if(request_id == NULL) {
908
                /* This is an event, add to the session queue */
909
                json_t *s = json_object_get(message, "session_id");
910
                if(!s || !json_is_integer(s)) {
911
                        JANUS_LOG(LOG_ERR, "Can't notify event, no session_id...\n");
912
                        json_decref(message);
913
                        return -1;
914
                }
915
                guint64 session_id = json_integer_value(s);
916
                janus_mutex_lock(&sessions_mutex);
917
                janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
918
                if(session == NULL || session->destroyed) {
919
                        JANUS_LOG(LOG_ERR, "Can't notify event, no session object...\n");
920
                        janus_mutex_unlock(&sessions_mutex);
921
                        json_decref(message);
922
                        return -1;
923
                }
924
                g_async_queue_push(session->events, message);
925
                janus_mutex_unlock(&sessions_mutex);
926
        } else {
927
                if(request_id == keepalive_id) {
928
                        /* It's a response from our fake long-poll related keepalive, ignore */
929
                        json_decref(message);
930
                        return 0;
931
                }
932
                /* This is a response, we need a valid transport instance */
933
                if(transport == NULL) {
934
                        JANUS_LOG(LOG_ERR, "Invalid HTTP instance...\n");
935
                        json_decref(message);
936
                        return -1;
937
                }
938
                /* We have a response */
939
                janus_http_msg *msg = (janus_http_msg *)transport;
940
                janus_mutex_lock(&messages_mutex);
941
                if(g_hash_table_lookup(messages, msg) == NULL) {
942
                        janus_mutex_unlock(&messages_mutex);
943
                        JANUS_LOG(LOG_ERR, "Invalid HTTP connection...\n");
944
                        json_decref(message);
945
                        return -1;
946
                }
947
                janus_mutex_unlock(&messages_mutex);
948
                if(!msg->connection) {
949
                        JANUS_LOG(LOG_ERR, "Invalid HTTP connection...\n");
950
                        json_decref(message);
951
                        return -1;
952
                }
953
                janus_mutex_lock(&msg->wait_mutex);
954
                msg->response = message;
955
                msg->got_response = TRUE;
956
                janus_condition_signal(&msg->wait_cond);
957
                janus_mutex_unlock(&msg->wait_mutex);
958
        }
959
        return 0;
960
}
961

    
962
void janus_http_session_created(void *transport, guint64 session_id) {
963
        if(transport == NULL)
964
                return;
965
        JANUS_LOG(LOG_VERB, "Session created (%"SCNu64"), create a queue for the long poll\n", session_id);
966
        /* Create a queue of events for this session */
967
        janus_mutex_lock(&sessions_mutex);
968
        if(g_hash_table_lookup(sessions, &session_id) != NULL) {
969
                JANUS_LOG(LOG_WARN, "Ignoring created session, apparently we're already handling it?\n");
970
                janus_mutex_unlock(&sessions_mutex);
971
                return;
972
        }
973
        janus_http_session *session = g_malloc0(sizeof(janus_http_session));
974
        session->events = g_async_queue_new();
975
        session->destroyed = 0;
976
        g_hash_table_insert(sessions, janus_uint64_dup(session_id), session);
977
        janus_mutex_unlock(&sessions_mutex);
978
}
979

    
980
void janus_http_session_over(void *transport, guint64 session_id, gboolean timeout) {
981
        if(transport == NULL)
982
                return;
983
        JANUS_LOG(LOG_VERB, "Session %s (%"SCNu64"), getting rid of the queue for the long poll\n",
984
                timeout ? "has timed out" : "is over", session_id);
985
        /* Get rid of the session's queue of events */
986
        janus_mutex_lock(&sessions_mutex);
987
        janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
988
        if(session == NULL || session->destroyed) {
989
                /* Nothing to do */
990
                janus_mutex_unlock(&sessions_mutex);
991
                return;
992
        }
993
        g_hash_table_remove(sessions, &session_id);
994
        /* We leave it to the watchdog to remove the session */
995
        session->destroyed = janus_get_monotonic_time();
996
        old_sessions = g_list_append(old_sessions, session);
997
        janus_mutex_unlock(&sessions_mutex);
998
}
999

    
1000
/* Connection notifiers */
1001
int janus_http_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen) {
1002
        janus_network_address naddr;
1003
        janus_network_address_string_buffer naddr_buf;
1004
        if(janus_network_address_from_sockaddr((struct sockaddr *)addr, &naddr) != 0 ||
1005
                        janus_network_address_to_string_buffer(&naddr, &naddr_buf) != 0) {
1006
                JANUS_LOG(LOG_WARN, "Error trying to resolve connection address...\n");
1007
                /* Should this be MHD_NO instead? */
1008
                return MHD_YES;
1009
        }
1010
        const char *ip = janus_network_address_string_from_buffer(&naddr_buf);
1011
        JANUS_LOG(LOG_HUGE, "New connection on REST API: %s\n", ip);
1012
        /* Any access limitation based on this IP address? */
1013
        if(!janus_http_is_allowed(ip, FALSE)) {
1014
                JANUS_LOG(LOG_ERR, "IP %s is unauthorized to connect to the Janus API interface\n", ip);
1015
                return MHD_NO;
1016
        }
1017
        return MHD_YES;
1018
}
1019

    
1020
int janus_http_admin_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen) {
1021
        janus_network_address naddr;
1022
        janus_network_address_string_buffer naddr_buf;
1023
        if(janus_network_address_from_sockaddr((struct sockaddr *)addr, &naddr) != 0 ||
1024
                        janus_network_address_to_string_buffer(&naddr, &naddr_buf) != 0) {
1025
                JANUS_LOG(LOG_WARN, "Error trying to resolve Admin connection address...\n");
1026
                /* Should this be MHD_NO instead? */
1027
                return MHD_YES;
1028
        }
1029
        const char *ip = janus_network_address_string_from_buffer(&naddr_buf);
1030
        JANUS_LOG(LOG_HUGE, "New connection on admin/monitor: %s\n", ip);
1031
        /* Any access limitation based on this IP address? */
1032
        if(!janus_http_is_allowed(ip, TRUE)) {
1033
                JANUS_LOG(LOG_ERR, "IP %s is unauthorized to connect to the admin/monitor interface\n", ip);
1034
                return MHD_NO;
1035
        }
1036
        return MHD_YES;
1037
}
1038

    
1039

    
1040
/* WebServer requests handler */
1041
int janus_http_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr)
1042
{
1043
        char *payload = NULL;
1044
        json_t *root = NULL;
1045
        struct MHD_Response *response = NULL;
1046
        int ret = MHD_NO;
1047
        gchar *session_path = NULL, *handle_path = NULL;
1048
        gchar **basepath = NULL, **path = NULL;
1049
        guint64 session_id = 0, handle_id = 0;
1050

    
1051
        /* Is this the first round? */
1052
        int firstround = 0;
1053
        janus_http_msg *msg = (janus_http_msg *)*ptr;
1054
        if (msg == NULL) {
1055
                firstround = 1;
1056
                JANUS_LOG(LOG_DBG, "Got a HTTP %s request on %s...\n", method, url);
1057
                JANUS_LOG(LOG_DBG, " ... Just parsing headers for now...\n");
1058
                msg = g_malloc0(sizeof(janus_http_msg));
1059
                if(msg == NULL) {
1060
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1061
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1062
                        MHD_destroy_response(response);
1063
                        goto done;
1064
                }
1065
                msg->connection = connection;
1066
                msg->acrh = NULL;
1067
                msg->acrm = NULL;
1068
                msg->payload = NULL;
1069
                msg->len = 0;
1070
                msg->session_id = 0;
1071
                msg->got_response = FALSE;
1072
                msg->response = NULL;
1073
                janus_mutex_init(&msg->wait_mutex);
1074
                janus_condition_init(&msg->wait_cond);
1075
                janus_mutex_lock(&messages_mutex);
1076
                g_hash_table_insert(messages, msg, msg);
1077
                janus_mutex_unlock(&messages_mutex);
1078
                *ptr = msg;
1079
                MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_http_headers, msg);
1080
                ret = MHD_YES;
1081
                /* Notify handlers about this new transport instance */
1082
                if(notify_events && gateway->events_is_enabled()) {
1083
                        json_t *info = json_object();
1084
                        json_object_set_new(info, "event", json_string("request"));
1085
                        json_object_set_new(info, "admin_api", json_false());
1086
                        const union MHD_ConnectionInfo *conninfo = MHD_get_connection_info(connection, MHD_CONNECTION_INFO_CLIENT_ADDRESS);
1087
                        if(conninfo != NULL) {
1088
                                janus_network_address addr;
1089
                                janus_network_address_string_buffer addr_buf;
1090
                                if(janus_network_address_from_sockaddr((struct sockaddr *)conninfo->client_addr, &addr) == 0 &&
1091
                                                janus_network_address_to_string_buffer(&addr, &addr_buf) == 0) {
1092
                                        const char *ip = janus_network_address_string_from_buffer(&addr_buf);
1093
                                        json_object_set_new(info, "ip", json_string(ip));
1094
                                }
1095
                                uint16_t port = janus_http_sockaddr_to_port((struct sockaddr *)conninfo->client_addr);
1096
                                json_object_set_new(info, "port", json_integer(port));
1097
                        }
1098
                        gateway->notify_event(&janus_http_transport, msg, info);
1099
                }
1100
        } else {
1101
                JANUS_LOG(LOG_DBG, "Processing HTTP %s request on %s...\n", method, url);
1102
        }
1103
        /* Parse request */
1104
        if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {
1105
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_TRANSPORT_SPECIFIC, "Unsupported method %s", method);
1106
                goto done;
1107
        }
1108
        if (!strcasecmp(method, "OPTIONS")) {
1109
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1110
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1111
                MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1112
                if(msg->acrm)
1113
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1114
                if(msg->acrh)
1115
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1116
                ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
1117
                MHD_destroy_response(response);
1118
        }
1119
        /* Get path components */
1120
        if(strcasecmp(url, ws_path)) {
1121
                if(strlen(ws_path) > 1) {
1122
                        basepath = g_strsplit(url, ws_path, -1);
1123
                } else {
1124
                        /* The base path is the web server too itself, we process the url itself */
1125
                        basepath = g_malloc0(3);
1126
                        basepath[0] = g_strdup("/");
1127
                        basepath[1] = g_strdup(url);
1128
                }
1129
                if(basepath[0] == NULL || basepath[1] == NULL || basepath[1][0] != '/') {
1130
                        JANUS_LOG(LOG_ERR, "Invalid url %s\n", url);
1131
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1132
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1133
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1134
                        if(msg->acrm)
1135
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1136
                        if(msg->acrh)
1137
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1138
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1139
                        MHD_destroy_response(response);
1140
                }
1141
                if(firstround) {
1142
                        g_strfreev(basepath);
1143
                        return ret;
1144
                }
1145
                path = g_strsplit(basepath[1], "/", -1);
1146
                if(path == NULL || path[1] == NULL) {
1147
                        JANUS_LOG(LOG_ERR, "Invalid path %s (%s)\n", basepath[1], path[1]);
1148
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1149
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1150
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1151
                        if(msg->acrm)
1152
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1153
                        if(msg->acrh)
1154
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1155
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1156
                        MHD_destroy_response(response);
1157
                }
1158
        }
1159
        if(firstround)
1160
                return ret;
1161
        JANUS_LOG(LOG_DBG, " ... parsing request...\n");
1162
        if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {
1163
                session_path = g_strdup(path[1]);
1164
                if(session_path == NULL) {
1165
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1166
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1167
                        MHD_destroy_response(response);
1168
                        goto done;
1169
                }
1170
                JANUS_LOG(LOG_HUGE, "Session: %s\n", session_path);
1171
        }
1172
        if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {
1173
                handle_path = g_strdup(path[2]);
1174
                if(handle_path == NULL) {
1175
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1176
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1177
                        MHD_destroy_response(response);
1178
                        goto done;
1179
                }
1180
                JANUS_LOG(LOG_HUGE, "Handle: %s\n", handle_path);
1181
        }
1182
        if(session_path != NULL && handle_path != NULL && path[3] != NULL && strlen(path[3]) > 0) {
1183
                JANUS_LOG(LOG_ERR, "Too many components...\n");
1184
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1185
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1186
                MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1187
                if(msg->acrm)
1188
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1189
                if(msg->acrh)
1190
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1191
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1192
                MHD_destroy_response(response);
1193
                goto done;
1194
        }
1195
        /* Get payload, if any */
1196
        if(!strcasecmp(method, "POST")) {
1197
                JANUS_LOG(LOG_HUGE, "Processing POST data (%s) (%zu bytes)...\n", msg->contenttype, *upload_data_size);
1198
                if(*upload_data_size != 0) {
1199
                        if(msg->payload == NULL)
1200
                                msg->payload = g_malloc0(*upload_data_size+1);
1201
                        else
1202
                                msg->payload = g_realloc(msg->payload, msg->len+*upload_data_size+1);
1203
                        if(msg->payload == NULL) {
1204
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1205
                                ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1206
                                MHD_destroy_response(response);
1207
                                goto done;
1208
                        }
1209
                        memcpy(msg->payload+msg->len, upload_data, *upload_data_size);
1210
                        msg->len += *upload_data_size;
1211
                        memset(msg->payload + msg->len, '\0', 1);
1212
                        JANUS_LOG(LOG_DBG, "  -- Data we have now (%zu bytes)\n", msg->len);
1213
                        *upload_data_size = 0;        /* Go on */
1214
                        ret = MHD_YES;
1215
                        goto done;
1216
                }
1217
                JANUS_LOG(LOG_DBG, "Done getting payload, we can answer\n");
1218
                if(msg->payload == NULL) {
1219
                        JANUS_LOG(LOG_ERR, "No payload :-(\n");
1220
                        ret = MHD_NO;
1221
                        goto done;
1222
                }
1223
                payload = msg->payload;
1224
                JANUS_LOG(LOG_HUGE, "%s\n", payload);
1225
        }
1226

    
1227
        /* Is this a generic request for info? */
1228
        if(session_path != NULL && !strcmp(session_path, "info")) {
1229
                /* The info REST endpoint, if contacted through a GET, provides information on the gateway */
1230
                if(strcasecmp(method, "GET")) {
1231
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1232
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1233
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1234
                        if(msg->acrm)
1235
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1236
                        if(msg->acrh)
1237
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1238
                        ret = MHD_queue_response(connection, MHD_HTTP_BAD_REQUEST, response);
1239
                        MHD_destroy_response(response);
1240
                        goto done;
1241
                }
1242
                /* Turn this into a fake "info" request */
1243
                method = "POST";
1244
                char tr[12];
1245
                janus_http_random_string(12, (char *)&tr);                
1246
                root = json_object();
1247
                json_object_set_new(root, "janus", json_string("info"));
1248
                json_object_set_new(root, "transaction", json_string(tr));
1249
                goto parsingdone;
1250
        }
1251
        
1252
        /* Or maybe a long poll */
1253
        if(!strcasecmp(method, "GET") || !payload) {
1254
                session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;
1255
                if(session_id < 1) {
1256
                        JANUS_LOG(LOG_ERR, "Invalid session %s\n", session_path);
1257
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1258
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1259
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1260
                        if(msg->acrm)
1261
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1262
                        if(msg->acrh)
1263
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1264
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1265
                        MHD_destroy_response(response);
1266
                        goto done;
1267
                }
1268
                msg->session_id = session_id;
1269

    
1270
                /* Since we handle long polls ourselves, the core isn't involved (if not for providing us with events)
1271
                 * A long poll, though, can act as a keepalive, so we pass a fake one to the core to avoid undesirable timeouts */
1272

    
1273
                /* First of all, though, API secret and token based authentication may be enabled in the core, so since
1274
                 * we're bypassing it for notifications we'll have to check those ourselves */
1275
                const char *secret = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "apisecret");
1276
                const char *token = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "token");
1277
                gboolean secret_authorized = FALSE, token_authorized = FALSE;
1278
                if(!gateway->is_api_secret_needed(&janus_http_transport) && !gateway->is_auth_token_needed(&janus_http_transport)) {
1279
                        /* Nothing to check */
1280
                        secret_authorized = TRUE;
1281
                        token_authorized = TRUE;
1282
                } else {
1283
                        if(gateway->is_api_secret_valid(&janus_http_transport, secret)) {
1284
                                /* API secret is valid */
1285
                                secret_authorized = TRUE;
1286
                        }
1287
                        if(gateway->is_auth_token_valid(&janus_http_transport, token)) {
1288
                                /* Token is valid */
1289
                                token_authorized = TRUE;
1290
                        }
1291
                        /* We consider a request authorized if either the proper API secret or a valid token has been provided */
1292
                        if(!secret_authorized && !token_authorized) {
1293
                                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1294
                                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1295
                                MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1296
                                if(msg->acrm)
1297
                                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1298
                                if(msg->acrh)
1299
                                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1300
                                ret = MHD_queue_response(connection, MHD_HTTP_FORBIDDEN, response);
1301
                                MHD_destroy_response(response);
1302
                                goto done;
1303
                        }
1304
                }
1305
                /* Ok, go on with the keepalive */
1306
                char tr[12];
1307
                janus_http_random_string(12, (char *)&tr);                
1308
                root = json_object();
1309
                json_object_set_new(root, "janus", json_string("keepalive"));
1310
                json_object_set_new(root, "session_id", json_integer(session_id));
1311
                json_object_set_new(root, "transaction", json_string(tr));
1312
                if(secret)
1313
                        json_object_set_new(root, "apisecret", json_string(secret));
1314
                if(token)
1315
                        json_object_set_new(root, "token", json_string(token));
1316
                gateway->incoming_request(&janus_http_transport, msg, (void *)keepalive_id, FALSE, root, NULL);
1317
                /* Ok, go on */
1318
                if(handle_path) {
1319
                        char *location = (char *)g_malloc0(strlen(ws_path) + strlen(session_path) + 2);
1320
                        g_sprintf(location, "%s/%s", ws_path, session_path);
1321
                        JANUS_LOG(LOG_ERR, "Invalid GET to %s, redirecting to %s\n", url, location);
1322
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1323
                        MHD_add_response_header(response, "Location", location);
1324
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1325
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1326
                        if(msg->acrm)
1327
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1328
                        if(msg->acrh)
1329
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1330
                        ret = MHD_queue_response(connection, 302, response);
1331
                        MHD_destroy_response(response);
1332
                        g_free(location);
1333
                        goto done;
1334
                }
1335
                janus_mutex_lock(&sessions_mutex);
1336
                janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
1337
                janus_mutex_unlock(&sessions_mutex);
1338
                if(!session || session->destroyed) {
1339
                        JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
1340
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1341
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1342
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1343
                        if(msg->acrm)
1344
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1345
                        if(msg->acrh)
1346
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1347
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1348
                        MHD_destroy_response(response);
1349
                        goto done;
1350
                }
1351
                /* How many messages can we send back in a single response? (just one by default) */
1352
                int max_events = 1;
1353
                const char *maxev = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "maxev");
1354
                if(maxev != NULL) {
1355
                        max_events = atoi(maxev);
1356
                        if(max_events < 1) {
1357
                                JANUS_LOG(LOG_WARN, "Invalid maxev parameter passed (%d), defaulting to 1\n", max_events);
1358
                                max_events = 1;
1359
                        }
1360
                }
1361
                JANUS_LOG(LOG_VERB, "Session %"SCNu64" found... returning up to %d messages\n", session_id, max_events);
1362
                /* Handle GET, taking the first message from the list */
1363
                json_t *event = g_async_queue_try_pop(session->events);
1364
                if(event != NULL) {
1365
                        if(max_events == 1) {
1366
                                /* Return just this message and leave */
1367
                                gchar *event_text = json_dumps(event, json_format);
1368
                                json_decref(event);
1369
                                ret = janus_http_return_success(msg, event_text);
1370
                        } else {
1371
                                /* The application is willing to receive more events at the same time, anything to report? */
1372
                                json_t *list = json_array();
1373
                                json_array_append_new(list, event);
1374
                                int events = 1;
1375
                                while(events < max_events) {
1376
                                        event = g_async_queue_try_pop(session->events);
1377
                                        if(event == NULL)
1378
                                                break;
1379
                                        json_array_append_new(list, event);
1380
                                        events++;
1381
                                }
1382
                                /* Return the array of messages and leave */
1383
                                gchar *list_text = json_dumps(list, json_format);
1384
                                json_decref(list);
1385
                                ret = janus_http_return_success(msg, list_text);
1386
                        }
1387
                } else {
1388
                        /* Still no message, wait */
1389
                        ret = janus_http_notifier(msg, max_events);
1390
                }
1391
                goto done;
1392
        }
1393
        
1394
        json_error_t error;
1395
        /* Parse the JSON payload */
1396
        root = json_loads(payload, 0, &error);
1397
        if(!root) {
1398
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
1399
                goto done;
1400
        }
1401
        if(!json_is_object(root)) {
1402
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
1403
                json_decref(root);
1404
                goto done;
1405
        }
1406

    
1407
parsingdone:
1408
        /* Check if we have session and handle identifiers */
1409
        session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;
1410
        handle_id = handle_path ? g_ascii_strtoull(handle_path, NULL, 10) : 0;
1411
        if(session_id > 0)
1412
                json_object_set_new(root, "session_id", json_integer(session_id));
1413
        if(handle_id > 0)
1414
                json_object_set_new(root, "handle_id", json_integer(handle_id));
1415

    
1416
        /* Suspend the connection and pass the ball to the core */
1417
        JANUS_LOG(LOG_HUGE, "Forwarding request to the core (%p)\n", msg);
1418
        gateway->incoming_request(&janus_http_transport, msg, msg, FALSE, root, &error);
1419
        /* Wait for a response (but not forever) */
1420
        struct timeval now;
1421
        gettimeofday(&now, NULL);
1422
        struct timespec wakeup;
1423
        wakeup.tv_sec = now.tv_sec+10;        /* Wait at max 10 seconds for a response */
1424
        wakeup.tv_nsec = now.tv_usec*1000UL;
1425
        pthread_mutex_lock(&msg->wait_mutex);
1426
        while(!msg->got_response) {
1427
                int res = pthread_cond_timedwait(&msg->wait_cond, &msg->wait_mutex, &wakeup);
1428
                if(msg->got_response || res == ETIMEDOUT)
1429
                        break;
1430
        }
1431
        pthread_mutex_unlock(&msg->wait_mutex);
1432
        if(!msg->response) {
1433
                ret = MHD_NO;
1434
        } else {
1435
                char *response_text = json_dumps(msg->response, json_format);
1436
                json_decref(msg->response);
1437
                msg->response = NULL;
1438
                ret = janus_http_return_success(msg, response_text);
1439
        }
1440

    
1441
done:
1442
        g_strfreev(basepath);
1443
        g_strfreev(path);
1444
        g_free(session_path);
1445
        g_free(handle_path);
1446
        return ret;
1447
}
1448

    
1449
/* Admin/monitor WebServer requests handler */
1450
int janus_http_admin_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr)
1451
{
1452
        char *payload = NULL;
1453
        json_t *root = NULL;
1454
        struct MHD_Response *response = NULL;
1455
        int ret = MHD_NO;
1456
        gchar *session_path = NULL, *handle_path = NULL;
1457
        gchar **basepath = NULL, **path = NULL;
1458
        guint64 session_id = 0, handle_id = 0;
1459

    
1460
        /* Is this the first round? */
1461
        int firstround = 0;
1462
        janus_http_msg *msg = (janus_http_msg *)*ptr;
1463
        if (msg == NULL) {
1464
                firstround = 1;
1465
                JANUS_LOG(LOG_VERB, "Got an admin/monitor HTTP %s request on %s...\n", method, url);
1466
                JANUS_LOG(LOG_DBG, " ... Just parsing headers for now...\n");
1467
                msg = g_malloc0(sizeof(janus_http_msg));
1468
                if(msg == NULL) {
1469
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1470
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1471
                        MHD_destroy_response(response);
1472
                        goto done;
1473
                }
1474
                msg->connection = connection;
1475
                msg->acrh = NULL;
1476
                msg->acrm = NULL;
1477
                msg->payload = NULL;
1478
                msg->len = 0;
1479
                msg->session_id = 0;
1480
                msg->got_response = FALSE;
1481
                msg->response = NULL;
1482
                janus_mutex_init(&msg->wait_mutex);
1483
                janus_condition_init(&msg->wait_cond);
1484
                janus_mutex_lock(&messages_mutex);
1485
                g_hash_table_insert(messages, msg, msg);
1486
                janus_mutex_unlock(&messages_mutex);
1487
                *ptr = msg;
1488
                MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_http_headers, msg);
1489
                ret = MHD_YES;
1490
                /* Notify handlers about this new transport instance */
1491
                if(notify_events && gateway->events_is_enabled()) {
1492
                        json_t *info = json_object();
1493
                        json_object_set_new(info, "event", json_string("request"));
1494
                        json_object_set_new(info, "admin_api", json_true());
1495
                        const union MHD_ConnectionInfo *conninfo = MHD_get_connection_info(connection, MHD_CONNECTION_INFO_CLIENT_ADDRESS);
1496
                        if(conninfo != NULL) {
1497
                                janus_network_address addr;
1498
                                janus_network_address_string_buffer addr_buf;
1499
                                if(janus_network_address_from_sockaddr((struct sockaddr *)conninfo->client_addr, &addr) == 0 &&
1500
                                                janus_network_address_to_string_buffer(&addr, &addr_buf) == 0) {
1501
                                        const char *ip = janus_network_address_string_from_buffer(&addr_buf);
1502
                                        json_object_set_new(info, "ip", json_string(ip));
1503
                                }
1504
                                uint16_t port = janus_http_sockaddr_to_port((struct sockaddr *)conninfo->client_addr);
1505
                                json_object_set_new(info, "port", json_integer(port));
1506
                        }
1507
                        gateway->notify_event(&janus_http_transport, msg, info);
1508
                }
1509
        }
1510
        /* Parse request */
1511
        if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {
1512
                JANUS_LOG(LOG_ERR, "Unsupported method...\n");
1513
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1514
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1515
                MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1516
                if(msg->acrm)
1517
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1518
                if(msg->acrh)
1519
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1520
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_IMPLEMENTED, response);
1521
                MHD_destroy_response(response);
1522
                return ret;
1523
        }
1524
        if (!strcasecmp(method, "OPTIONS")) {
1525
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1526
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1527
                MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1528
                if(msg->acrm)
1529
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1530
                if(msg->acrh)
1531
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1532
                ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
1533
                MHD_destroy_response(response);
1534
        }
1535
        /* Get path components */
1536
        if(strcasecmp(url, admin_ws_path)) {
1537
                if(strlen(admin_ws_path) > 1) {
1538
                        basepath = g_strsplit(url, admin_ws_path, -1);
1539
                } else {
1540
                        /* The base path is the web server too itself, we process the url itself */
1541
                        basepath = g_malloc0(3);
1542
                        basepath[0] = g_strdup("/");
1543
                        basepath[1] = g_strdup(url);
1544
                }
1545
                if(basepath[0] == NULL || basepath[1] == NULL || basepath[1][0] != '/') {
1546
                        JANUS_LOG(LOG_ERR, "Invalid url %s\n", url);
1547
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1548
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1549
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1550
                        if(msg->acrm)
1551
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1552
                        if(msg->acrh)
1553
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1554
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1555
                        MHD_destroy_response(response);
1556
                }
1557
                if(firstround) {
1558
                        g_strfreev(basepath);
1559
                        return ret;
1560
                }
1561
                path = g_strsplit(basepath[1], "/", -1);
1562
                if(path == NULL || path[1] == NULL) {
1563
                        JANUS_LOG(LOG_ERR, "Invalid path %s (%s)\n", basepath[1], path[1]);
1564
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1565
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1566
                        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1567
                        if(msg->acrm)
1568
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1569
                        if(msg->acrh)
1570
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1571
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1572
                        MHD_destroy_response(response);
1573
                }
1574
        }
1575
        if(firstround)
1576
                return ret;
1577
        JANUS_LOG(LOG_DBG, " ... parsing request...\n");
1578
        if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {
1579
                session_path = g_strdup(path[1]);
1580
                if(session_path == NULL) {
1581
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1582
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1583
                        MHD_destroy_response(response);
1584
                        goto done;
1585
                }
1586
                JANUS_LOG(LOG_HUGE, "Session: %s\n", session_path);
1587
        }
1588
        if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {
1589
                handle_path = g_strdup(path[2]);
1590
                if(handle_path == NULL) {
1591
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1592
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1593
                        MHD_destroy_response(response);
1594
                        goto done;
1595
                }
1596
                JANUS_LOG(LOG_HUGE, "Handle: %s\n", handle_path);
1597
        }
1598
        if(session_path != NULL && handle_path != NULL && path[3] != NULL && strlen(path[3]) > 0) {
1599
                JANUS_LOG(LOG_ERR, "Too many components...\n");
1600
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1601
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1602
                MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1603
                if(msg->acrm)
1604
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1605
                if(msg->acrh)
1606
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1607
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1608
                MHD_destroy_response(response);
1609
                goto done;
1610
        }
1611
        /* Get payload, if any */
1612
        if(!strcasecmp(method, "POST")) {
1613
                JANUS_LOG(LOG_HUGE, "Processing POST data (%s) (%zu bytes)...\n", msg->contenttype, *upload_data_size);
1614
                if(*upload_data_size != 0) {
1615
                        if(msg->payload == NULL)
1616
                                msg->payload = g_malloc0(*upload_data_size+1);
1617
                        else
1618
                                msg->payload = g_realloc(msg->payload, msg->len+*upload_data_size+1);
1619
                        if(msg->payload == NULL) {
1620
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1621
                                ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1622
                                MHD_destroy_response(response);
1623
                                goto done;
1624
                        }
1625
                        memcpy(msg->payload+msg->len, upload_data, *upload_data_size);
1626
                        msg->len += *upload_data_size;
1627
                        memset(msg->payload + msg->len, '\0', 1);
1628
                        JANUS_LOG(LOG_DBG, "  -- Data we have now (%zu bytes)\n", msg->len);
1629
                        *upload_data_size = 0;        /* Go on */
1630
                        ret = MHD_YES;
1631
                        goto done;
1632
                }
1633
                JANUS_LOG(LOG_DBG, "Done getting payload, we can answer\n");
1634
                if(msg->payload == NULL) {
1635
                        JANUS_LOG(LOG_ERR, "No payload :-(\n");
1636
                        ret = MHD_NO;
1637
                        goto done;
1638
                }
1639
                payload = msg->payload;
1640
                JANUS_LOG(LOG_HUGE, "%s\n", payload);
1641
        }
1642

    
1643
        /* Is this a generic request for info? */
1644
        if(session_path != NULL && !strcmp(session_path, "info")) {
1645
                /* The info REST endpoint, if contacted through a GET, provides information on the gateway */
1646
                if(strcasecmp(method, "GET")) {
1647
                        ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_TRANSPORT_SPECIFIC, "Use GET for the info endpoint");
1648
                        goto done;
1649
                }
1650
                /* Turn this into a fake "info" request */
1651
                method = "POST";
1652
                char tr[12];
1653
                janus_http_random_string(12, (char *)&tr);                
1654
                root = json_object();
1655
                json_object_set_new(root, "janus", json_string("info"));
1656
                json_object_set_new(root, "transaction", json_string(tr));
1657
                goto parsingdone;
1658
        }
1659
        
1660
        /* Without a payload we don't know what to do */
1661
        if(!payload) {
1662
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON, "Request payload missing");
1663
                goto done;
1664
        }
1665
        json_error_t error;
1666
        /* Parse the JSON payload */
1667
        root = json_loads(payload, 0, &error);
1668
        if(!root) {
1669
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
1670
                goto done;
1671
        }
1672
        if(!json_is_object(root)) {
1673
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
1674
                json_decref(root);
1675
                goto done;
1676
        }
1677

    
1678
parsingdone:
1679
        /* Check if we have session and handle identifiers */
1680
        session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;
1681
        handle_id = handle_path ? g_ascii_strtoull(handle_path, NULL, 10) : 0;
1682
        if(session_id > 0)
1683
                json_object_set_new(root, "session_id", json_integer(session_id));
1684
        if(handle_id > 0)
1685
                json_object_set_new(root, "handle_id", json_integer(handle_id));
1686

    
1687
        /* Suspend the connection and pass the ball to the core */
1688
        JANUS_LOG(LOG_HUGE, "Forwarding admin request to the core (%p)\n", msg);
1689
        gateway->incoming_request(&janus_http_transport, msg, msg, TRUE, root, &error);
1690
        /* Wait for a response (but not forever) */
1691
        struct timeval now;
1692
        gettimeofday(&now, NULL);
1693
        struct timespec wakeup;
1694
        wakeup.tv_sec = now.tv_sec+10;        /* Wait at max 10 seconds for a response */
1695
        wakeup.tv_nsec = now.tv_usec*1000UL;
1696
        pthread_mutex_lock(&msg->wait_mutex);
1697
        while(!msg->got_response) {
1698
                int res = pthread_cond_timedwait(&msg->wait_cond, &msg->wait_mutex, &wakeup);
1699
                if(msg->got_response || res == ETIMEDOUT)
1700
                        break;
1701
        }
1702
        pthread_mutex_unlock(&msg->wait_mutex);
1703
        if(!msg->response) {
1704
                ret = MHD_NO;
1705
        } else {
1706
                char *response_text = json_dumps(msg->response, json_format);
1707
                json_decref(msg->response);
1708
                msg->response = NULL;
1709
                ret = janus_http_return_success(msg, response_text);
1710
        }
1711

    
1712
done:
1713
        g_strfreev(basepath);
1714
        g_strfreev(path);
1715
        g_free(session_path);
1716
        g_free(handle_path);
1717
        return ret;
1718
}
1719

    
1720
int janus_http_headers(void *cls, enum MHD_ValueKind kind, const char *key, const char *value) {
1721
        janus_http_msg *request = cls;
1722
        JANUS_LOG(LOG_DBG, "%s: %s\n", key, value);
1723
        if(!strcasecmp(key, MHD_HTTP_HEADER_CONTENT_TYPE)) {
1724
                if(request)
1725
                        request->contenttype = strdup(value);
1726
        } else if(!strcasecmp(key, "Access-Control-Request-Method")) {
1727
                if(request)
1728
                        request->acrm = strdup(value);
1729
        } else if(!strcasecmp(key, "Access-Control-Request-Headers")) {
1730
                if(request)
1731
                        request->acrh = strdup(value);
1732
        }
1733
        return MHD_YES;
1734
}
1735

    
1736
void janus_http_request_completed(void *cls, struct MHD_Connection *connection, void **con_cls, enum MHD_RequestTerminationCode toe) {
1737
        JANUS_LOG(LOG_DBG, "Request completed, freeing data\n");
1738
        janus_http_msg *request = *con_cls;
1739
        if(!request)
1740
                return;
1741
        janus_mutex_lock(&messages_mutex);
1742
        g_hash_table_remove(messages, request);
1743
        janus_mutex_unlock(&messages_mutex);
1744
        if(request->payload != NULL)
1745
                g_free(request->payload);
1746
        if(request->contenttype != NULL)
1747
                free(request->contenttype);
1748
        if(request->acrh != NULL)
1749
                g_free(request->acrh);
1750
        if(request->acrm != NULL)
1751
                g_free(request->acrm);
1752
        g_free(request);
1753
        *con_cls = NULL;   
1754
}
1755

    
1756
/* Worker to handle notifications */
1757
int janus_http_notifier(janus_http_msg *msg, int max_events) {
1758
        if(!msg || !msg->connection)
1759
                return MHD_NO;
1760
        struct MHD_Connection *connection = msg->connection;
1761
        if(max_events < 1)
1762
                max_events = 1;
1763
        JANUS_LOG(LOG_DBG, "... handling long poll...\n");
1764
        struct MHD_Response *response = NULL;
1765
        int ret = MHD_NO;
1766
        guint64 session_id = msg->session_id;
1767
        janus_mutex_lock(&sessions_mutex);
1768
        janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
1769
        janus_mutex_unlock(&sessions_mutex);
1770
        if(!session || session->destroyed) {
1771
                JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
1772
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1773
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1774
                MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1775
                if(msg->acrm)
1776
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1777
                if(msg->acrh)
1778
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1779
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1780
                MHD_destroy_response(response);
1781
                return ret;
1782
        }
1783
        gint64 start = janus_get_monotonic_time();
1784
        gint64 end = 0;
1785
        json_t *event = NULL, *list = NULL;
1786
        gboolean found = FALSE;
1787
        /* We have a timeout for the long poll: 30 seconds */
1788
        while(end-start < 30*G_USEC_PER_SEC) {
1789
                if(session->destroyed)
1790
                        break;
1791
                event = g_async_queue_try_pop(session->events);
1792
                if(session->destroyed || g_atomic_int_get(&stopping) || event != NULL) {
1793
                        if(event == NULL)
1794
                                break;
1795
                        /* Gotcha! */
1796
                        found = TRUE;
1797
                        if(max_events == 1) {
1798
                                break;
1799
                        } else {
1800
                                /* The application is willing to receive more events at the same time, anything to report? */
1801
                                list = json_array();
1802
                                json_array_append_new(list, event);
1803
                                int events = 1;
1804
                                while(events < max_events) {
1805
                                        event = g_async_queue_try_pop(session->events);
1806
                                        if(event == NULL)
1807
                                                break;
1808
                                        json_array_append_new(list, event);
1809
                                        events++;
1810
                                }
1811
                                break;
1812
                        }
1813
                }
1814
                /* Sleep 100ms */
1815
                g_usleep(100000);
1816
                end = janus_get_monotonic_time();
1817
        }
1818
        if((max_events == 1 && event == NULL) || (max_events > 1 && list == NULL))
1819
                found = FALSE;
1820
        if(!found) {
1821
                JANUS_LOG(LOG_VERB, "Long poll time out for session %"SCNu64"...\n", session_id);
1822
                /* Turn this into a "keepalive" response */
1823
                char tr[12];
1824
                janus_http_random_string(12, (char *)&tr);
1825
                if(max_events == 1) {
1826
                        event = json_object();
1827
                        json_object_set_new(event, "janus", json_string("keepalive"));
1828
                } else {
1829
                        list = json_array();
1830
                        event = json_object();
1831
                        json_object_set_new(event, "janus", json_string("keepalive"));
1832
                        json_array_append_new(list, event);
1833
                }
1834
                /* FIXME Improve the Janus protocol keep-alive mechanism in JavaScript */
1835
        }
1836
        char *payload_text = json_dumps(max_events == 1 ? event : list, json_format);
1837
        json_decref(max_events == 1 ? event : list);
1838
        /* Finish the request by sending the response */
1839
        JANUS_LOG(LOG_HUGE, "We have a message to serve...\n\t%s\n", payload_text);
1840
        /* Send event */
1841
        ret = janus_http_return_success(msg, payload_text);
1842
        return ret;
1843
}
1844

    
1845
/* Helper to quickly send a success response */
1846
int janus_http_return_success(janus_http_msg *msg, char *payload) {
1847
        if(!msg || !msg->connection) {
1848
                if(payload)
1849
                        free(payload);
1850
                return MHD_NO;
1851
        }
1852
        struct MHD_Response *response = MHD_create_response_from_buffer(
1853
                payload ? strlen(payload) : 0,
1854
                (void*)payload,
1855
                MHD_RESPMEM_MUST_FREE);
1856
        MHD_add_response_header(response, "Content-Type", "application/json");
1857
        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1858
        MHD_add_response_header(response, "Access-Control-Max-Age", "86400");
1859
        if(msg->acrm)
1860
                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1861
        if(msg->acrh)
1862
                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1863
        int ret = MHD_queue_response(msg->connection, MHD_HTTP_OK, response);
1864
        MHD_destroy_response(response);
1865
        return ret;
1866
}
1867

    
1868
/* Helper to quickly send an error response */
1869
int janus_http_return_error(janus_http_msg *msg, uint64_t session_id, const char *transaction, gint error, const char *format, ...) {
1870
        gchar *error_string = NULL;
1871
        gchar error_buf[512];
1872
        if(format == NULL) {
1873
                /* No error string provided, use the default one */
1874
                error_string = (gchar *)janus_get_api_error(error);
1875
        } else {
1876
                /* This callback has variable arguments (error string) */
1877
                va_list ap;
1878
                va_start(ap, format);
1879
                g_vsnprintf(error_buf, 512, format, ap);
1880
                va_end(ap);
1881
                error_string = error_buf;
1882
        }
1883
        /* Done preparing error */
1884
        JANUS_LOG(LOG_VERB, "[%s] Returning error %d (%s)\n", transaction, error, error_string ? error_string : "no text");
1885
        /* Prepare JSON error */
1886
        json_t *reply = json_object();
1887
        json_object_set_new(reply, "janus", json_string("error"));
1888
        if(session_id > 0)
1889
                json_object_set_new(reply, "session_id", json_integer(session_id));
1890
        if(transaction != NULL)
1891
                json_object_set_new(reply, "transaction", json_string(transaction));
1892
        json_t *error_data = json_object();
1893
        json_object_set_new(error_data, "code", json_integer(error));
1894
        json_object_set_new(error_data, "reason", json_string(error_string));
1895
        json_object_set_new(reply, "error", error_data);
1896
        gchar *reply_text = json_dumps(reply, json_format);
1897
        json_decref(reply);
1898
        /* Use janus_http_return_error to send the error response */
1899
        return janus_http_return_success(msg, reply_text);
1900
}