Statistics
| Branch: | Revision:

janus-gateway / transports / janus_http.c @ 88b5da7b

History | View | Annotate | Download (68.3 KB)

1
/*! \file   janus_http.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus RESTs transport plugin
5
 * \details  This is an implementation of a RESTs transport for the
6
 * Janus API, using the libmicrohttpd library (http://www.gnu.org/software/libmicrohttpd/).
7
 * This module allows browsers to make use of HTTP to talk to the gateway.
8
 * Since the gateway may be deployed on a different domain than the web
9
 * server hosting the web applications using it, the gateway automatically
10
 * handles OPTIONS request to comply with the CORS specification.
11
 * POST requests can be used to ask for the management of a session with
12
 * the gateway, to attach to a plugin, to send messages to the plugin
13
 * itself and so on. GET requests instead are used for getting events
14
 * associated to a gateway session (and as such to all its plugin handles
15
 * and the events plugins push in the session itself), using a long poll
16
 * approach. A JavaScript library (janus.js) implements all of this on
17
 * the client side automatically.
18
 * \note There's a well known bug in libmicrohttpd that may cause it to
19
 * spike to 100% of the CPU when using HTTPS on some distributions. In
20
 * case you're interested in HTTPS support, it's better to just rely on
21
 * HTTP in Janus, and put a frontend like Apache HTTPD or nginx to take
22
 * care of securing the traffic. More details are available in \ref deploy.
23
 * 
24
 * \ingroup transports
25
 * \ref transports
26
 */
27

    
28
#include "transport.h"
29

    
30
#include <arpa/inet.h>
31
#include <ifaddrs.h>
32
#include <net/if.h>
33
#include <sys/socket.h>
34
#include <sys/types.h>
35
#include <netdb.h>
36

    
37
#include <microhttpd.h>
38

    
39
#include "../debug.h"
40
#include "../apierror.h"
41
#include "../config.h"
42
#include "../mutex.h"
43
#include "../utils.h"
44

    
45

    
46
/* Transport plugin information */
47
#define JANUS_REST_VERSION                        2
48
#define JANUS_REST_VERSION_STRING        "0.0.2"
49
#define JANUS_REST_DESCRIPTION                "This transport plugin adds REST (HTTP/HTTPS) support to the Janus API via libmicrohttpd."
50
#define JANUS_REST_NAME                                "JANUS REST (HTTP/HTTPS) transport plugin"
51
#define JANUS_REST_AUTHOR                        "Meetecho s.r.l."
52
#define JANUS_REST_PACKAGE                        "janus.transport.http"
53

    
54
/* Transport methods */
55
janus_transport *create(void);
56
int janus_http_init(janus_transport_callbacks *callback, const char *config_path);
57
void janus_http_destroy(void);
58
int janus_http_get_api_compatibility(void);
59
int janus_http_get_version(void);
60
const char *janus_http_get_version_string(void);
61
const char *janus_http_get_description(void);
62
const char *janus_http_get_name(void);
63
const char *janus_http_get_author(void);
64
const char *janus_http_get_package(void);
65
gboolean janus_http_is_janus_api_enabled(void);
66
gboolean janus_http_is_admin_api_enabled(void);
67
int janus_http_send_message(void *transport, void *request_id, gboolean admin, json_t *message);
68
void janus_http_session_created(void *transport, guint64 session_id);
69
void janus_http_session_over(void *transport, guint64 session_id, gboolean timeout);
70

    
71

    
72
/* Transport setup */
73
static janus_transport janus_http_transport =
74
        JANUS_TRANSPORT_INIT (
75
                .init = janus_http_init,
76
                .destroy = janus_http_destroy,
77

    
78
                .get_api_compatibility = janus_http_get_api_compatibility,
79
                .get_version = janus_http_get_version,
80
                .get_version_string = janus_http_get_version_string,
81
                .get_description = janus_http_get_description,
82
                .get_name = janus_http_get_name,
83
                .get_author = janus_http_get_author,
84
                .get_package = janus_http_get_package,
85

    
86
                .is_janus_api_enabled = janus_http_is_janus_api_enabled,
87
                .is_admin_api_enabled = janus_http_is_admin_api_enabled,
88

    
89
                .send_message = janus_http_send_message,
90
                .session_created = janus_http_session_created,
91
                .session_over = janus_http_session_over,
92
        );
93

    
94
/* Transport creator */
95
janus_transport *create(void) {
96
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_REST_NAME);
97
        return &janus_http_transport;
98
}
99

    
100

    
101
/* Useful stuff */
102
static gint initialized = 0, stopping = 0;
103
static janus_transport_callbacks *gateway = NULL;
104
static gboolean http_janus_api_enabled = FALSE;
105
static gboolean http_admin_api_enabled = FALSE;
106

    
107
/* JSON serialization options */
108
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
109

    
110

    
111
/* Incoming HTTP message */
112
typedef struct janus_http_msg {
113
        struct MHD_Connection *connection;        /* The MHD connection this message came from */
114
        gchar *acrh;                                                /* Value of the Access-Control-Request-Headers HTTP header, if any (needed for CORS) */
115
        gchar *acrm;                                                /* Value of the Access-Control-Request-Method HTTP header, if any (needed for CORS) */
116
        gchar *contenttype;                                        /* Content-Type of the payload */
117
        gchar *payload;                                                /* Payload of the message */
118
        size_t len;                                                        /* Length of the message in octets */
119
        gint64 session_id;                                        /* Gateway-Client session identifier this message belongs to */
120
        janus_mutex wait_mutex;                                /* Mutex to wait on the response condition */
121
        janus_condition wait_cond;                        /* Response condition */
122
        gboolean got_response;                                /* Whether this message got a response from the core */
123
        json_t *response;                                        /* The response from the core */
124
} janus_http_msg;
125
static GHashTable *messages = NULL;
126
static janus_mutex messages_mutex;
127

    
128

    
129
/* Helper for long poll: HTTP events to push per session */
130
typedef struct janus_http_session {
131
        GAsyncQueue *events;        /* Events to notify for this session */
132
        gint64 destroyed;                /* Whether this session has been destroyed */
133
} janus_http_session;
134
/* We keep track of created sessions as we handle long polls */
135
const char *keepalive_id = "keepalive";
136
GHashTable *sessions = NULL;
137
GList *old_sessions = NULL;
138
GThread *sessions_watchdog = NULL;
139
janus_mutex sessions_mutex;
140

    
141

    
142
/* Callback (libmicrohttpd) invoked when a new connection is attempted on the REST API */
143
int janus_http_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen);
144
/* Callback (libmicrohttpd) invoked when a new connection is attempted on the admin/monitor webserver */
145
int janus_http_admin_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen);
146
/* Callback (libmicrohttpd) invoked when an HTTP message (GET, POST, OPTIONS, etc.) is available */
147
int janus_http_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr);
148
/* Callback (libmicrohttpd) invoked when an admin/monitor HTTP message (GET, POST, OPTIONS, etc.) is available */
149
int janus_http_admin_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr);
150
/* Callback (libmicrohttpd) invoked when headers of an incoming HTTP message have been parsed */
151
int janus_http_headers(void *cls, enum MHD_ValueKind kind, const char *key, const char *value);
152
/* Callback (libmicrohttpd) invoked when a request has been processed and can be freed */
153
void janus_http_request_completed (void *cls, struct MHD_Connection *connection, void **con_cls, enum MHD_RequestTerminationCode toe);
154
/* Worker to handle requests that are actually long polls */
155
int janus_http_notifier(janus_http_msg *msg, int max_events);
156
/* Helper to quickly send a success response */
157
int janus_http_return_success(janus_http_msg *msg, char *payload);
158
/* Helper to quickly send an error response */
159
int janus_http_return_error(janus_http_msg *msg, uint64_t session_id, const char *transaction, gint error, const char *format, ...) G_GNUC_PRINTF(5, 6);
160

    
161

    
162
/* MHD Web Server */
163
static struct MHD_Daemon *ws = NULL, *sws = NULL;
164
static char *ws_path = NULL;
165
static char *cert_pem_bytes = NULL, *cert_key_bytes = NULL; 
166

    
167
/* Admin/Monitor MHD Web Server */
168
static struct MHD_Daemon *admin_ws = NULL, *admin_sws = NULL;
169
static char *admin_ws_path = NULL;
170

    
171

    
172
/* REST and Admin/Monitor ACL list */
173
GList *janus_http_access_list = NULL, *janus_http_admin_access_list = NULL;
174
janus_mutex access_list_mutex;
175
static void janus_http_allow_address(const char *ip, gboolean admin) {
176
        if(ip == NULL)
177
                return;
178
        /* Is this an IP or an interface? */
179
        janus_mutex_lock(&access_list_mutex);
180
        if(!admin)
181
                janus_http_access_list = g_list_append(janus_http_access_list, (gpointer)ip);
182
        else
183
                janus_http_admin_access_list = g_list_append(janus_http_admin_access_list, (gpointer)ip);
184
        janus_mutex_unlock(&access_list_mutex);
185
}
186
static gboolean janus_http_is_allowed(const char *ip, gboolean admin) {
187
        if(ip == NULL)
188
                return FALSE;
189
        if(!admin && janus_http_access_list == NULL)
190
                return TRUE;
191
        if(admin && janus_http_admin_access_list == NULL)
192
                return TRUE;
193
        janus_mutex_lock(&access_list_mutex);
194
        GList *temp = admin ? janus_http_admin_access_list : janus_http_access_list;
195
        while(temp) {
196
                const char *allowed = (const char *)temp->data;
197
                if(allowed != NULL && strstr(ip, allowed)) {
198
                        janus_mutex_unlock(&access_list_mutex);
199
                        return TRUE;
200
                }
201
                temp = temp->next;
202
        }
203
        janus_mutex_unlock(&access_list_mutex);
204
        return FALSE;
205
}
206

    
207
/* Random string helper (for transactions) */
208
static char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
209
static void janus_http_random_string(int length, char *buffer) {
210
        if(length > 0 && buffer) {
211
                int l = (int)(sizeof(charset)-1);
212
                int i=0;
213
                for(i=0; i<length; i++) {
214
                        int key = rand() % l;
215
                        buffer[i] = charset[key];
216
                }
217
                buffer[length-1] = '\0';
218
        }
219
}
220

    
221

    
222
/* Helper to create a MHD daemon */
223
static struct MHD_Daemon *janus_http_create_daemon(gboolean admin, char *path,
224
                const char *interface, const char *ip, int port,
225
                gint64 threads, const char *server_pem, const char *server_key) {
226
        struct MHD_Daemon *daemon = NULL;
227
        gboolean secure = server_pem && server_key;
228
        /* Any interface or IP address we need to limit ourselves to?
229
         * NOTE WELL: specifying an interface does NOT bind to all IPs associated
230
         * with that interface, but only to the first one that's detected */
231
        static struct sockaddr_in addr;
232
        struct sockaddr_in6 addr6;
233
        gboolean ipv6 = FALSE;
234
        if(ip && strstr(ip, ":"))
235
                ipv6 = TRUE;
236
        if(ip || interface) {
237
                gboolean found = FALSE;
238
                struct ifaddrs *ifaddr = NULL, *ifa = NULL;
239
                int family = 0, s = 0, n = 0;
240
                char host[NI_MAXHOST];
241
                if(getifaddrs(&ifaddr) == -1) {
242
                        JANUS_LOG(LOG_ERR, "Error getting list of interfaces to bind %s API %s webserver...\n",
243
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
244
                        return NULL;
245
                } else {
246
                        for(ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) {
247
                                family = ifa->ifa_addr->sa_family;
248
                                if(interface && strcasecmp(ifa->ifa_name, interface))
249
                                        continue;
250
                                if(ifa->ifa_addr == NULL)
251
                                        continue;
252
                                /* Skip interfaces which are not up and running */
253
                                if(!((ifa->ifa_flags & IFF_UP) && (ifa->ifa_flags & IFF_RUNNING)))
254
                                        continue;
255
                                /* FIXME When being explicit about the interface only, we only bind IPv4 for now:
256
                                 * specifying or adding a precise IPv6 address gets you an IPv6 binding instead */
257
                                if(!ipv6 && family == AF_INET) {
258
                                        s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
259
                                        if(s != 0) {
260
                                                JANUS_LOG(LOG_ERR, "Error doing a getnameinfo() to bind %s API %s webserver to '%s'...\n",
261
                                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP", ip ? ip : interface);
262
                                                return NULL;
263
                                        }
264
                                        if(ip && strcmp(host, ip))
265
                                                continue;
266
                                        found = TRUE;
267
                                        break;
268
                                } else if(ipv6 && family == AF_INET6) {
269
                                        s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in6), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
270
                                        if(s != 0) {
271
                                                JANUS_LOG(LOG_ERR, "Error doing a getnameinfo() to bind %s API %s webserver to '%s'...\n",
272
                                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP", ip ? ip : interface);
273
                                                return NULL;
274
                                        }
275
                                        if(ip && strcmp(host, ip))
276
                                                continue;
277
                                        found = TRUE;
278
                                        break;
279
                                }
280
                        }
281
                        freeifaddrs(ifaddr);
282
                }
283
                if(!found) {
284
                        JANUS_LOG(LOG_ERR, "Error binding to %s '%s' for %s API %s webserver...\n",
285
                                ip ? "IP" : "interface", ip ? ip : interface,
286
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
287
                        return NULL;
288
                }
289
                JANUS_LOG(LOG_VERB, "Going to bind the %s API %s webserver to %s (asked for %s)\n",
290
                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP", host, ip ? ip : interface);
291
                if(!ipv6) {
292
                        memset(&addr, 0, sizeof (struct sockaddr_in));
293
                        addr.sin_family = AF_INET;
294
                        addr.sin_port = htons(port);
295
                        int res = inet_pton(AF_INET, host, &addr.sin_addr);
296
                        if(res != 1) {
297
                                JANUS_LOG(LOG_ERR, "Failed to convert address '%s' (%d)\n", host, res);
298
                                return NULL;
299
                        }
300
                } else {
301
                        memset(&addr6, 0, sizeof (struct sockaddr_in6));
302
                        addr6.sin6_family = AF_INET6;
303
                        addr6.sin6_port = htons(port);
304
                        int res = inet_pton(AF_INET6, host, &addr6.sin6_addr);
305
                        if(res != 1) {
306
                                JANUS_LOG(LOG_ERR, "Failed to convert address '%s' (%d)\n", host, res);
307
                                return NULL;
308
                        }
309
                }
310
        }
311

    
312
        if(!secure) {
313
                /* HTTP web server */
314
                if(threads == 0) {
315
                        JANUS_LOG(LOG_VERB, "Using a thread per connection for the %s API %s webserver\n",
316
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
317
                        if(!interface && !ip) {
318
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
319
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
320
                                /* Bind to all interfaces */
321
                                daemon = MHD_start_daemon(
322
                                        MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | MHD_USE_DUAL_STACK,
323
                                        port,
324
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
325
                                        NULL,
326
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
327
                                        path,
328
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
329
                                        MHD_OPTION_END);
330
                        } else {
331
                                /* Bind to the interface that was specified */
332
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
333
                                        ip ? "IP" : "interface", ip ? ip : interface,
334
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
335
                                daemon = MHD_start_daemon(
336
                                        MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | (ipv6 ? MHD_USE_IPv6 : 0),
337
                                        port,
338
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
339
                                        NULL,
340
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
341
                                        path,
342
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
343
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
344
                                        MHD_OPTION_END);
345
                        }
346
                } else {
347
                        JANUS_LOG(LOG_VERB, "Using a thread pool of size %"SCNi64" the %s API %s webserver\n", threads,
348
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
349
                        if(!interface && !ip) {
350
                                /* Bind to all interfaces */
351
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
352
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
353
                                daemon = MHD_start_daemon(
354
                                        MHD_USE_SELECT_INTERNALLY | MHD_USE_DUAL_STACK,
355
                                        port,
356
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
357
                                        NULL,
358
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
359
                                        path,
360
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
361
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
362
                                        MHD_OPTION_END);
363
                        } else {
364
                                /* Bind to the interface that was specified */
365
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
366
                                        ip ? "IP" : "interface", ip ? ip : interface,
367
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
368
                                daemon = MHD_start_daemon(
369
                                        MHD_USE_SELECT_INTERNALLY | (ipv6 ? MHD_USE_IPv6 : 0),
370
                                        port,
371
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
372
                                        NULL,
373
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
374
                                        path,
375
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
376
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
377
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
378
                                        MHD_OPTION_END);
379
                        }
380
                }
381
        } else {
382
                /* HTTPS web server, read certificate and key */
383
                FILE *pem = fopen(server_pem, "rb");
384
                if(pem) {
385
                        fseek(pem, 0L, SEEK_END);
386
                        size_t size = ftell(pem);
387
                        fseek(pem, 0L, SEEK_SET);
388
                        cert_pem_bytes = g_malloc0(size);
389
                        char *index = cert_pem_bytes;
390
                        int read = 0, tot = size;
391
                        while((read = fread(index, sizeof(char), tot, pem)) > 0) {
392
                                tot -= read;
393
                                index += read;
394
                        }
395
                        fclose(pem);
396
                }
397
                FILE *key = fopen(server_key, "rb");
398
                if(key) {
399
                        fseek(key, 0L, SEEK_END);
400
                        size_t size = ftell(key);
401
                        fseek(key, 0L, SEEK_SET);
402
                        cert_key_bytes = g_malloc0(size);
403
                        char *index = cert_key_bytes;
404
                        int read = 0, tot = size;
405
                        while((read = fread(index, sizeof(char), tot, key)) > 0) {
406
                                tot -= read;
407
                                index += read;
408
                        }
409
                        fclose(key);
410
                }
411
                /* Start webserver */
412
                if(threads == 0) {
413
                        JANUS_LOG(LOG_VERB, "Using a thread per connection for the %s API %s webserver\n",
414
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
415
                        if(!interface && !ip) {
416
                                /* Bind to all interfaces */
417
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
418
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
419
                                daemon = MHD_start_daemon(
420
                                        MHD_USE_SSL | MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | MHD_USE_DUAL_STACK,
421
                                        port,
422
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
423
                                        NULL,
424
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
425
                                        path,
426
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
427
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
428
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
429
                                        MHD_OPTION_END);
430
                        } else {
431
                                /* Bind to the interface that was specified */
432
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
433
                                        ip ? "IP" : "interface", ip ? ip : interface,
434
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
435
                                daemon = MHD_start_daemon(
436
                                        MHD_USE_SSL | MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL | (ipv6 ? MHD_USE_IPv6 : 0),
437
                                        port,
438
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
439
                                        NULL,
440
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
441
                                        path,
442
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
443
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
444
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
445
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
446
                                        MHD_OPTION_END);
447
                        }
448
                } else {
449
                        JANUS_LOG(LOG_VERB, "Using a thread pool of size %"SCNi64" the %s API %s webserver\n", threads,
450
                                admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
451
                        if(!interface && !ip) {
452
                                /* Bind to all interfaces */
453
                                JANUS_LOG(LOG_VERB, "Binding to all interfaces for the %s API %s webserver\n",
454
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
455
                                daemon = MHD_start_daemon(
456
                                        MHD_USE_SSL | MHD_USE_SELECT_INTERNALLY | MHD_USE_DUAL_STACK,
457
                                        port,
458
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
459
                                        NULL,
460
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
461
                                        path,
462
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
463
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
464
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
465
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
466
                                        MHD_OPTION_END);
467
                        } else {
468
                                /* Bind to the interface that was specified */
469
                                JANUS_LOG(LOG_VERB, "Binding to %s '%s' for the %s API %s webserver\n",
470
                                        ip ? "IP" : "interface", ip ? ip : interface,
471
                                        admin ? "Admin" : "Janus", secure ? "HTTPS" : "HTTP");
472
                                daemon = MHD_start_daemon(
473
                                        MHD_USE_SSL | MHD_USE_SELECT_INTERNALLY | (ipv6 ? MHD_USE_IPv6 : 0),
474
                                        port,
475
                                        admin ? janus_http_admin_client_connect : janus_http_client_connect,
476
                                        NULL,
477
                                        admin ? &janus_http_admin_handler : &janus_http_handler,
478
                                        path,
479
                                        MHD_OPTION_THREAD_POOL_SIZE, threads,
480
                                        MHD_OPTION_NOTIFY_COMPLETED, &janus_http_request_completed, NULL,
481
                                        MHD_OPTION_HTTPS_MEM_CERT, cert_pem_bytes,
482
                                        MHD_OPTION_HTTPS_MEM_KEY, cert_key_bytes,
483
                                        MHD_OPTION_SOCK_ADDR, ipv6 ? (struct sockaddr *)&addr6 : (struct sockaddr *)&addr,
484
                                        MHD_OPTION_END);
485
                        }
486
                }
487
        }
488
        return daemon;
489
}
490

    
491

    
492
/* HTTP/Janus sessions watchdog/garbage collector (sort of) */
493
static void *janus_http_sessions_watchdog(void *data) {
494
        JANUS_LOG(LOG_INFO, "HTTP/Janus sessions watchdog started\n");
495
        gint64 now = 0;
496
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
497
                janus_mutex_lock(&sessions_mutex);
498
                /* Iterate on all the sessions */
499
                now = janus_get_monotonic_time();
500
                if(old_sessions != NULL) {
501
                        GList *sl = old_sessions;
502
                        JANUS_LOG(LOG_HUGE, "Checking %d old HTTP/Janus sessions sessions...\n", g_list_length(old_sessions));
503
                        while(sl) {
504
                                janus_http_session *session = (janus_http_session *)sl->data;
505
                                if(!session) {
506
                                        sl = sl->next;
507
                                        continue;
508
                                }
509
                                if(now-session->destroyed >= G_USEC_PER_SEC) {
510
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
511
                                        JANUS_LOG(LOG_VERB, "Freeing old HTTP/Janus session\n");
512
                                        GList *rm = sl->next;
513
                                        old_sessions = g_list_delete_link(old_sessions, sl);
514
                                        sl = rm;
515
                                        /* Remove all events */
516
                                        json_t *event = NULL;
517
                                        while((event = g_async_queue_try_pop(session->events)) != NULL)
518
                                                json_decref(event);
519
                                        g_async_queue_unref(session->events);
520
                                        g_free(session);
521
                                        continue;
522
                                }
523
                                sl = sl->next;
524
                        }
525
                }
526
                janus_mutex_unlock(&sessions_mutex);
527
                g_usleep(500000);
528
        }
529
        JANUS_LOG(LOG_INFO, "HTTP/Janus sessions watchdog stopped\n");
530
        return NULL;
531
}
532

    
533

    
534
/* Transport implementation */
535
int janus_http_init(janus_transport_callbacks *callback, const char *config_path) {
536
        if(g_atomic_int_get(&stopping)) {
537
                /* Still stopping from before */
538
                return -1;
539
        }
540
        if(callback == NULL || config_path == NULL) {
541
                /* Invalid arguments */
542
                return -1;
543
        }
544

    
545
        /* This is the callback we'll need to invoke to contact the gateway */
546
        gateway = callback;
547

    
548
        /* Read configuration */
549
        char filename[255];
550
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_REST_PACKAGE);
551
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
552
        janus_config *config = janus_config_parse(filename);
553
        if(config != NULL) {
554
                janus_config_print(config);
555

    
556
                /* Handle configuration */
557
                janus_config_item *item = janus_config_get_item_drilldown(config, "general", "json");
558
                if(item && item->value) {
559
                        /* Check how we need to format/serialize the JSON output */
560
                        if(!strcasecmp(item->value, "indented")) {
561
                                /* Default: indented, we use three spaces for that */
562
                                json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
563
                        } else if(!strcasecmp(item->value, "plain")) {
564
                                /* Not indented and no new lines, but still readable */
565
                                json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
566
                        } else if(!strcasecmp(item->value, "compact")) {
567
                                /* Compact, so no spaces between separators */
568
                                json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
569
                        } else {
570
                                JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
571
                                json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
572
                        }
573
                }
574

    
575
                /* Check the base paths */
576
                item = janus_config_get_item_drilldown(config, "general", "base_path");
577
                if(item && item->value) {
578
                        if(item->value[0] != '/') {
579
                                JANUS_LOG(LOG_FATAL, "Invalid base path %s (it should start with a /, e.g., /janus\n", item->value);
580
                                return -1;
581
                        }
582
                        ws_path = g_strdup(item->value);
583
                        if(strlen(ws_path) > 1 && ws_path[strlen(ws_path)-1] == '/') {
584
                                /* Remove the trailing slash, it makes things harder when we parse requests later */
585
                                ws_path[strlen(ws_path)-1] = '\0';
586
                        }
587
                } else {
588
                        ws_path = g_strdup("/janus");
589
                }
590
                /* Do the same for the admin/monitor interface */
591
                item = janus_config_get_item_drilldown(config, "admin", "admin_base_path");
592
                if(item && item->value) {
593
                        if(item->value[0] != '/') {
594
                                JANUS_LOG(LOG_FATAL, "Invalid admin/monitor base path %s (it should start with a /, e.g., /admin\n", item->value);
595
                                return -1;
596
                        }
597
                        admin_ws_path = g_strdup(item->value);
598
                        if(strlen(admin_ws_path) > 1 && ws_path[strlen(admin_ws_path)-1] == '/') {
599
                                /* Remove the trailing slash, it makes things harder when we parse requests later */
600
                                admin_ws_path[strlen(admin_ws_path)-1] = '\0';
601
                        }
602
                } else {
603
                        admin_ws_path = g_strdup("/admin");
604
                }
605

    
606
                /* Any ACL for either the Janus or Admin API? */
607
                item = janus_config_get_item_drilldown(config, "general", "acl");
608
                if(item && item->value) {
609
                        gchar **list = g_strsplit(item->value, ",", -1);
610
                        gchar *index = list[0];
611
                        if(index != NULL) {
612
                                int i=0;
613
                                while(index != NULL) {
614
                                        if(strlen(index) > 0) {
615
                                                JANUS_LOG(LOG_INFO, "Adding '%s' to the Janus API allowed list...\n", index);
616
                                                janus_http_allow_address(g_strdup(index), FALSE);
617
                                        }
618
                                        i++;
619
                                        index = list[i];
620
                                }
621
                        }
622
                        g_strfreev(list);
623
                        list = NULL;
624
                }
625
                item = janus_config_get_item_drilldown(config, "admin", "admin_acl");
626
                if(item && item->value) {
627
                        gchar **list = g_strsplit(item->value, ",", -1);
628
                        gchar *index = list[0];
629
                        if(index != NULL) {
630
                                int i=0;
631
                                while(index != NULL) {
632
                                        if(strlen(index) > 0) {
633
                                                JANUS_LOG(LOG_INFO, "Adding '%s' to the Admin/monitor allowed list...\n", index);
634
                                                janus_http_allow_address(g_strdup(index), TRUE);
635
                                        }
636
                                        i++;
637
                                        index = list[i];
638
                                }
639
                        }
640
                        g_strfreev(list);
641
                        list = NULL;
642
                }
643

    
644
                /* Start with the Janus API web server now */
645
                gint64 threads = 0;
646
                item = janus_config_get_item_drilldown(config, "general", "threads");
647
                if(item && item->value) {
648
                        if(!strcasecmp(item->value, "unlimited")) {
649
                                /* No limit on threads, use a thread per connection */
650
                                threads = 0;
651
                        } else {
652
                                /* Use a thread pool */
653
                                threads = atoll(item->value);
654
                                if(threads == 0) {
655
                                        JANUS_LOG(LOG_WARN, "Chose '0' as size for the thread pool, which is equivalent to 'unlimited'\n");
656
                                } else if(threads < 0) {
657
                                        JANUS_LOG(LOG_WARN, "Invalid value '%"SCNi64"' as size for the thread pool, falling back to to 'unlimited'\n", threads);
658
                                        threads = 0;
659
                                }
660
                        }
661
                }
662
                item = janus_config_get_item_drilldown(config, "general", "http");
663
                if(!item || !item->value || !janus_is_true(item->value)) {
664
                        JANUS_LOG(LOG_WARN, "HTTP webserver disabled\n");
665
                } else {
666
                        int wsport = 8088;
667
                        item = janus_config_get_item_drilldown(config, "general", "port");
668
                        if(item && item->value)
669
                                wsport = atoi(item->value);
670
                        const char *interface = NULL;
671
                        item = janus_config_get_item_drilldown(config, "general", "interface");
672
                        if(item && item->value)
673
                                interface = item->value;
674
                        const char *ip = NULL;
675
                        item = janus_config_get_item_drilldown(config, "general", "ip");
676
                        if(item && item->value)
677
                                ip = item->value;
678
                        ws = janus_http_create_daemon(FALSE, ws_path, interface, ip, wsport, threads, NULL, NULL);
679
                        if(ws == NULL) {
680
                                JANUS_LOG(LOG_FATAL, "Couldn't start webserver on port %d...\n", wsport);
681
                        } else {
682
                                JANUS_LOG(LOG_INFO, "HTTP webserver started (port %d, %s path listener)...\n", wsport, ws_path);
683
                        }
684
                }
685
                /* Do we also have to provide an HTTPS one? */
686
                char *server_pem = NULL;
687
                item = janus_config_get_item_drilldown(config, "certificates", "cert_pem");
688
                if(item && item->value)
689
                        server_pem = (char *)item->value;
690
                char *server_key = NULL;
691
                item = janus_config_get_item_drilldown(config, "certificates", "cert_key");
692
                if(item && item->value)
693
                        server_key = (char *)item->value;
694
                if(server_key)
695
                        JANUS_LOG(LOG_VERB, "Using certificates:\n\t%s\n\t%s\n", server_pem, server_key);
696
                item = janus_config_get_item_drilldown(config, "general", "https");
697
                if(!item || !item->value || !janus_is_true(item->value)) {
698
                        JANUS_LOG(LOG_WARN, "HTTPS webserver disabled\n");
699
                } else {
700
                        if(!server_key || !server_pem) {
701
                                JANUS_LOG(LOG_FATAL, "Missing certificate/key path\n");
702
                        } else {
703
                                int swsport = 8089;
704
                                item = janus_config_get_item_drilldown(config, "general", "secure_port");
705
                                if(item && item->value)
706
                                        swsport = atoi(item->value);
707
                                const char *interface = NULL;
708
                                item = janus_config_get_item_drilldown(config, "general", "secure_interface");
709
                                if(item && item->value)
710
                                        interface = item->value;
711
                                const char *ip = NULL;
712
                                item = janus_config_get_item_drilldown(config, "general", "secure_ip");
713
                                if(item && item->value)
714
                                        ip = item->value;
715
                                sws = janus_http_create_daemon(FALSE, ws_path, interface, ip, swsport, threads, server_pem, server_key);
716
                                if(sws == NULL) {
717
                                        JANUS_LOG(LOG_FATAL, "Couldn't start secure webserver on port %d...\n", swsport);
718
                                } else {
719
                                        JANUS_LOG(LOG_INFO, "HTTPS webserver started (port %d, %s path listener)...\n", swsport, ws_path);
720
                                }
721
                        }
722
                }
723
                /* Admin/monitor time: start web server, if enabled */
724
                threads = 0;
725
                item = janus_config_get_item_drilldown(config, "admin", "admin_threads");
726
                if(item && item->value) {
727
                        if(!strcasecmp(item->value, "unlimited")) {
728
                                /* No limit on threads, use a thread per connection */
729
                                threads = 0;
730
                        } else {
731
                                /* Use a thread pool */
732
                                threads = atoll(item->value);
733
                                if(threads == 0) {
734
                                        JANUS_LOG(LOG_WARN, "Chose '0' as size for the admin/monitor thread pool, which is equivalent to 'unlimited'\n");
735
                                } else if(threads < 0) {
736
                                        JANUS_LOG(LOG_WARN, "Invalid value '%"SCNi64"' as size for the admin/monitor thread pool, falling back to to 'unlimited'\n", threads);
737
                                        threads = 0;
738
                                }
739
                        }
740
                }
741
                item = janus_config_get_item_drilldown(config, "admin", "admin_http");
742
                if(!item || !item->value || !janus_is_true(item->value)) {
743
                        JANUS_LOG(LOG_WARN, "Admin/monitor HTTP webserver disabled\n");
744
                } else {
745
                        int wsport = 7088;
746
                        item = janus_config_get_item_drilldown(config, "admin", "admin_port");
747
                        if(item && item->value)
748
                                wsport = atoi(item->value);
749
                        const char *interface = NULL;
750
                        item = janus_config_get_item_drilldown(config, "admin", "admin_interface");
751
                        if(item && item->value)
752
                                interface = item->value;
753
                        const char *ip = NULL;
754
                        item = janus_config_get_item_drilldown(config, "admin", "admin_ip");
755
                        if(item && item->value)
756
                                ip = item->value;
757
                        admin_ws = janus_http_create_daemon(TRUE, admin_ws_path, interface, ip, wsport, threads, NULL, NULL);
758
                        if(admin_ws == NULL) {
759
                                JANUS_LOG(LOG_FATAL, "Couldn't start admin/monitor webserver on port %d...\n", wsport);
760
                        } else {
761
                                JANUS_LOG(LOG_INFO, "Admin/monitor HTTP webserver started (port %d, %s path listener)...\n", wsport, admin_ws_path);
762
                        }
763
                }
764
                /* Do we also have to provide an HTTPS one? */
765
                item = janus_config_get_item_drilldown(config, "admin", "admin_https");
766
                if(!item || !item->value || !janus_is_true(item->value)) {
767
                        JANUS_LOG(LOG_WARN, "Admin/monitor HTTPS webserver disabled\n");
768
                } else {
769
                        if(!server_key) {
770
                                JANUS_LOG(LOG_FATAL, "Missing certificate/key path\n");
771
                        } else {
772
                                int swsport = 7889;
773
                                item = janus_config_get_item_drilldown(config, "admin", "admin_secure_port");
774
                                if(item && item->value)
775
                                        swsport = atoi(item->value);
776
                                const char *interface = NULL;
777
                                item = janus_config_get_item_drilldown(config, "admin", "admin_secure_interface");
778
                                if(item && item->value)
779
                                        interface = item->value;
780
                                const char *ip = NULL;
781
                                item = janus_config_get_item_drilldown(config, "admin", "admin_secure_ip");
782
                                if(item && item->value)
783
                                        ip = item->value;
784
                                admin_sws = janus_http_create_daemon(TRUE, admin_ws_path, interface, ip, swsport, threads, server_pem, server_key);
785
                                if(admin_sws == NULL) {
786
                                        JANUS_LOG(LOG_FATAL, "Couldn't start secure admin/monitor webserver on port %d...\n", swsport);
787
                                } else {
788
                                        JANUS_LOG(LOG_INFO, "Admin/monitor HTTPS webserver started (port %d, %s path listener)...\n", swsport, admin_ws_path);
789
                                }
790
                        }
791
                }
792
        }
793
        janus_config_destroy(config);
794
        config = NULL;
795
        if(!ws && !sws && !admin_ws && !admin_sws) {
796
                JANUS_LOG(LOG_FATAL, "No HTTP/HTTPS server started, giving up...\n"); 
797
                return -1;        /* No point in keeping the plugin loaded */
798
        }
799
        http_janus_api_enabled = ws || sws;
800
        http_admin_api_enabled = admin_ws || admin_sws;
801

    
802
        messages = g_hash_table_new(NULL, NULL);
803
        janus_mutex_init(&messages_mutex);
804
        sessions = g_hash_table_new_full(g_int64_hash, g_int64_equal, (GDestroyNotify)g_free, NULL);
805
        old_sessions = NULL;
806
        janus_mutex_init(&sessions_mutex);
807
        GError *error = NULL;
808
        /* Start the HTTP/Janus sessions watchdog */
809
        sessions_watchdog = g_thread_try_new("http watchdog", &janus_http_sessions_watchdog, NULL, &error);
810
        if(error != NULL) {
811
                g_atomic_int_set(&initialized, 0);
812
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the HTTP/Janus sessions watchdog thread...\n", error->code, error->message ? error->message : "??");
813
                return -1;
814
        }
815
        
816
        /* Done */
817
        g_atomic_int_set(&initialized, 1);
818
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_REST_NAME);
819
        return 0;
820
}
821

    
822
void janus_http_destroy(void) {
823
        if(!g_atomic_int_get(&initialized))
824
                return;
825
        g_atomic_int_set(&stopping, 1);
826

    
827
        JANUS_LOG(LOG_INFO, "Stopping webserver(s)...\n");
828
        if(ws)
829
                MHD_stop_daemon(ws);
830
        ws = NULL;
831
        if(sws)
832
                MHD_stop_daemon(sws);
833
        sws = NULL;
834
        if(admin_ws)
835
                MHD_stop_daemon(admin_ws);
836
        admin_ws = NULL;
837
        if(admin_sws)
838
                MHD_stop_daemon(admin_sws);
839
        admin_sws = NULL;
840
        if(cert_pem_bytes != NULL)
841
                g_free((gpointer)cert_pem_bytes);
842
        cert_pem_bytes = NULL;
843
        if(cert_key_bytes != NULL)
844
                g_free((gpointer)cert_key_bytes);
845
        cert_key_bytes = NULL;
846

    
847
        g_hash_table_destroy(messages);
848
        if(sessions_watchdog != NULL) {
849
                g_thread_join(sessions_watchdog);
850
                sessions_watchdog = NULL;
851
        }
852

    
853
        g_atomic_int_set(&initialized, 0);
854
        g_atomic_int_set(&stopping, 0);
855
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_REST_NAME);
856
}
857

    
858
int janus_http_get_api_compatibility(void) {
859
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
860
        return JANUS_TRANSPORT_API_VERSION;
861
}
862

    
863
int janus_http_get_version(void) {
864
        return JANUS_REST_VERSION;
865
}
866

    
867
const char *janus_http_get_version_string(void) {
868
        return JANUS_REST_VERSION_STRING;
869
}
870

    
871
const char *janus_http_get_description(void) {
872
        return JANUS_REST_DESCRIPTION;
873
}
874

    
875
const char *janus_http_get_name(void) {
876
        return JANUS_REST_NAME;
877
}
878

    
879
const char *janus_http_get_author(void) {
880
        return JANUS_REST_AUTHOR;
881
}
882

    
883
const char *janus_http_get_package(void) {
884
        return JANUS_REST_PACKAGE;
885
}
886

    
887
gboolean janus_http_is_janus_api_enabled(void) {
888
        return http_janus_api_enabled;
889
}
890

    
891
gboolean janus_http_is_admin_api_enabled(void) {
892
        return http_admin_api_enabled;
893
}
894

    
895
int janus_http_send_message(void *transport, void *request_id, gboolean admin, json_t *message) {
896
        JANUS_LOG(LOG_HUGE, "Got a %s API %s to send (%p)\n", admin ? "admin" : "Janus", request_id ? "response" : "event", transport);
897
        if(message == NULL) {
898
                JANUS_LOG(LOG_ERR, "No message...\n");
899
                return -1;
900
        }
901
        if(request_id == NULL) {
902
                /* This is an event, add to the session queue */
903
                json_t *s = json_object_get(message, "session_id");
904
                if(!s || !json_is_integer(s)) {
905
                        JANUS_LOG(LOG_ERR, "Can't notify event, no session_id...\n");
906
                        json_decref(message);
907
                        return -1;
908
                }
909
                guint64 session_id = json_integer_value(s);
910
                janus_mutex_lock(&sessions_mutex);
911
                janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
912
                if(session == NULL || session->destroyed) {
913
                        JANUS_LOG(LOG_ERR, "Can't notify event, no session object...\n");
914
                        janus_mutex_unlock(&sessions_mutex);
915
                        json_decref(message);
916
                        return -1;
917
                }
918
                g_async_queue_push(session->events, message);
919
                janus_mutex_unlock(&sessions_mutex);
920
        } else {
921
                if(request_id == keepalive_id) {
922
                        /* It's a response from our fake long-poll related keepalive, ignore */
923
                        json_decref(message);
924
                        return 0;
925
                }
926
                /* This is a response, we need a valid transport instance */
927
                if(transport == NULL) {
928
                        JANUS_LOG(LOG_ERR, "Invalid HTTP instance...\n");
929
                        json_decref(message);
930
                        return -1;
931
                }
932
                /* We have a response */
933
                janus_http_msg *msg = (janus_http_msg *)transport;
934
                janus_mutex_lock(&messages_mutex);
935
                if(g_hash_table_lookup(messages, msg) == NULL) {
936
                        janus_mutex_unlock(&messages_mutex);
937
                        JANUS_LOG(LOG_ERR, "Invalid HTTP connection...\n");
938
                        json_decref(message);
939
                        return -1;
940
                }
941
                janus_mutex_unlock(&messages_mutex);
942
                if(!msg->connection) {
943
                        JANUS_LOG(LOG_ERR, "Invalid HTTP connection...\n");
944
                        json_decref(message);
945
                        return -1;
946
                }
947
                janus_mutex_lock(&msg->wait_mutex);
948
                msg->response = message;
949
                msg->got_response = TRUE;
950
                janus_condition_signal(&msg->wait_cond);
951
                janus_mutex_unlock(&msg->wait_mutex);
952
        }
953
        return 0;
954
}
955

    
956
void janus_http_session_created(void *transport, guint64 session_id) {
957
        if(transport == NULL)
958
                return;
959
        JANUS_LOG(LOG_VERB, "Session created (%"SCNu64"), create a queue for the long poll\n", session_id);
960
        /* Create a queue of events for this session */
961
        janus_mutex_lock(&sessions_mutex);
962
        if(g_hash_table_lookup(sessions, &session_id) != NULL) {
963
                JANUS_LOG(LOG_WARN, "Ignoring created session, apparently we're already handling it?\n");
964
                janus_mutex_unlock(&sessions_mutex);
965
                return;
966
        }
967
        janus_http_session *session = g_malloc0(sizeof(janus_http_session));
968
        session->events = g_async_queue_new();
969
        session->destroyed = 0;
970
        g_hash_table_insert(sessions, janus_uint64_dup(session_id), session);
971
        janus_mutex_unlock(&sessions_mutex);
972
}
973

    
974
void janus_http_session_over(void *transport, guint64 session_id, gboolean timeout) {
975
        if(transport == NULL)
976
                return;
977
        JANUS_LOG(LOG_VERB, "Session %s (%"SCNu64"), getting rid of the queue for the long poll\n",
978
                timeout ? "has timed out" : "is over", session_id);
979
        /* Get rid of the session's queue of events */
980
        janus_mutex_lock(&sessions_mutex);
981
        janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
982
        if(session == NULL || session->destroyed) {
983
                /* Nothing to do */
984
                janus_mutex_unlock(&sessions_mutex);
985
                return;
986
        }
987
        g_hash_table_remove(sessions, &session_id);
988
        /* We leave it to the watchdog to remove the session */
989
        session->destroyed = janus_get_monotonic_time();
990
        old_sessions = g_list_append(old_sessions, session);
991
        janus_mutex_unlock(&sessions_mutex);
992
}
993

    
994
/* Connection notifiers */
995
int janus_http_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen) {
996
        char *ip = janus_address_to_ip((struct sockaddr *)addr);
997
        JANUS_LOG(LOG_HUGE, "New connection on REST API: %s\n", ip);
998
        /* Any access limitation based on this IP address? */
999
        if(!janus_http_is_allowed(ip, FALSE)) {
1000
                JANUS_LOG(LOG_ERR, "IP %s is unauthorized to connect to the Janus API interface\n", ip);
1001
                g_free(ip);
1002
                return MHD_NO;
1003
        }
1004
        g_free(ip);
1005
        return MHD_YES;
1006
}
1007

    
1008
int janus_http_admin_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen) {
1009
        char *ip = janus_address_to_ip((struct sockaddr *)addr);
1010
        JANUS_LOG(LOG_HUGE, "New connection on admin/monitor: %s\n", ip);
1011
        /* Any access limitation based on this IP address? */
1012
        if(!janus_http_is_allowed(ip, TRUE)) {
1013
                JANUS_LOG(LOG_ERR, "IP %s is unauthorized to connect to the admin/monitor interface\n", ip);
1014
                g_free(ip);
1015
                return MHD_NO;
1016
        }
1017
        g_free(ip);
1018
        return MHD_YES;
1019
}
1020

    
1021

    
1022
/* WebServer requests handler */
1023
int janus_http_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr)
1024
{
1025
        char *payload = NULL;
1026
        json_t *root = NULL;
1027
        struct MHD_Response *response = NULL;
1028
        int ret = MHD_NO;
1029
        gchar *session_path = NULL, *handle_path = NULL;
1030
        gchar **basepath = NULL, **path = NULL;
1031
        guint64 session_id = 0, handle_id = 0;
1032

    
1033
        /* Is this the first round? */
1034
        int firstround = 0;
1035
        janus_http_msg *msg = (janus_http_msg *)*ptr;
1036
        if (msg == NULL) {
1037
                firstround = 1;
1038
                JANUS_LOG(LOG_DBG, "Got a HTTP %s request on %s...\n", method, url);
1039
                JANUS_LOG(LOG_DBG, " ... Just parsing headers for now...\n");
1040
                msg = g_malloc0(sizeof(janus_http_msg));
1041
                if(msg == NULL) {
1042
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1043
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1044
                        MHD_destroy_response(response);
1045
                        goto done;
1046
                }
1047
                msg->connection = connection;
1048
                msg->acrh = NULL;
1049
                msg->acrm = NULL;
1050
                msg->payload = NULL;
1051
                msg->len = 0;
1052
                msg->session_id = 0;
1053
                msg->got_response = FALSE;
1054
                msg->response = NULL;
1055
                janus_mutex_init(&msg->wait_mutex);
1056
                janus_condition_init(&msg->wait_cond);
1057
                janus_mutex_lock(&messages_mutex);
1058
                g_hash_table_insert(messages, msg, msg);
1059
                janus_mutex_unlock(&messages_mutex);
1060
                *ptr = msg;
1061
                MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_http_headers, msg);
1062
                ret = MHD_YES;
1063
        } else {
1064
                JANUS_LOG(LOG_DBG, "Processing HTTP %s request on %s...\n", method, url);
1065
        }
1066
        /* Parse request */
1067
        if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {
1068
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_TRANSPORT_SPECIFIC, "Unsupported method %s", method);
1069
                goto done;
1070
        }
1071
        if (!strcasecmp(method, "OPTIONS")) {
1072
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1073
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1074
                if(msg->acrm)
1075
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1076
                if(msg->acrh)
1077
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1078
                ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
1079
                MHD_destroy_response(response);
1080
        }
1081
        /* Get path components */
1082
        if(strcasecmp(url, ws_path)) {
1083
                if(strlen(ws_path) > 1) {
1084
                        basepath = g_strsplit(url, ws_path, -1);
1085
                } else {
1086
                        /* The base path is the web server too itself, we process the url itself */
1087
                        basepath = g_malloc0(3);
1088
                        basepath[0] = g_strdup("/");
1089
                        basepath[1] = g_strdup(url);
1090
                }
1091
                if(basepath[0] == NULL || basepath[1] == NULL || basepath[1][0] != '/') {
1092
                        JANUS_LOG(LOG_ERR, "Invalid url %s\n", url);
1093
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1094
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1095
                        if(msg->acrm)
1096
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1097
                        if(msg->acrh)
1098
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1099
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1100
                        MHD_destroy_response(response);
1101
                }
1102
                if(firstround) {
1103
                        g_strfreev(basepath);
1104
                        return ret;
1105
                }
1106
                path = g_strsplit(basepath[1], "/", -1);
1107
                if(path == NULL || path[1] == NULL) {
1108
                        JANUS_LOG(LOG_ERR, "Invalid path %s (%s)\n", basepath[1], path[1]);
1109
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1110
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1111
                        if(msg->acrm)
1112
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1113
                        if(msg->acrh)
1114
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1115
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1116
                        MHD_destroy_response(response);
1117
                }
1118
        }
1119
        if(firstround)
1120
                return ret;
1121
        JANUS_LOG(LOG_DBG, " ... parsing request...\n");
1122
        if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {
1123
                session_path = g_strdup(path[1]);
1124
                if(session_path == NULL) {
1125
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1126
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1127
                        MHD_destroy_response(response);
1128
                        goto done;
1129
                }
1130
                JANUS_LOG(LOG_HUGE, "Session: %s\n", session_path);
1131
        }
1132
        if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {
1133
                handle_path = g_strdup(path[2]);
1134
                if(handle_path == NULL) {
1135
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1136
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1137
                        MHD_destroy_response(response);
1138
                        goto done;
1139
                }
1140
                JANUS_LOG(LOG_HUGE, "Handle: %s\n", handle_path);
1141
        }
1142
        if(session_path != NULL && handle_path != NULL && path[3] != NULL && strlen(path[3]) > 0) {
1143
                JANUS_LOG(LOG_ERR, "Too many components...\n");
1144
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1145
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1146
                if(msg->acrm)
1147
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1148
                if(msg->acrh)
1149
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1150
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1151
                MHD_destroy_response(response);
1152
                goto done;
1153
        }
1154
        /* Get payload, if any */
1155
        if(!strcasecmp(method, "POST")) {
1156
                JANUS_LOG(LOG_HUGE, "Processing POST data (%s) (%zu bytes)...\n", msg->contenttype, *upload_data_size);
1157
                if(*upload_data_size != 0) {
1158
                        if(msg->payload == NULL)
1159
                                msg->payload = g_malloc0(*upload_data_size+1);
1160
                        else
1161
                                msg->payload = g_realloc(msg->payload, msg->len+*upload_data_size+1);
1162
                        if(msg->payload == NULL) {
1163
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1164
                                ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1165
                                MHD_destroy_response(response);
1166
                                goto done;
1167
                        }
1168
                        memcpy(msg->payload+msg->len, upload_data, *upload_data_size);
1169
                        msg->len += *upload_data_size;
1170
                        memset(msg->payload + msg->len, '\0', 1);
1171
                        JANUS_LOG(LOG_DBG, "  -- Data we have now (%zu bytes)\n", msg->len);
1172
                        *upload_data_size = 0;        /* Go on */
1173
                        ret = MHD_YES;
1174
                        goto done;
1175
                }
1176
                JANUS_LOG(LOG_DBG, "Done getting payload, we can answer\n");
1177
                if(msg->payload == NULL) {
1178
                        JANUS_LOG(LOG_ERR, "No payload :-(\n");
1179
                        ret = MHD_NO;
1180
                        goto done;
1181
                }
1182
                payload = msg->payload;
1183
                JANUS_LOG(LOG_HUGE, "%s\n", payload);
1184
        }
1185

    
1186
        /* Is this a generic request for info? */
1187
        if(session_path != NULL && !strcmp(session_path, "info")) {
1188
                /* The info REST endpoint, if contacted through a GET, provides information on the gateway */
1189
                if(strcasecmp(method, "GET")) {
1190
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1191
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1192
                        if(msg->acrm)
1193
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1194
                        if(msg->acrh)
1195
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1196
                        ret = MHD_queue_response(connection, MHD_HTTP_BAD_REQUEST, response);
1197
                        MHD_destroy_response(response);
1198
                        goto done;
1199
                }
1200
                /* Turn this into a fake "info" request */
1201
                method = "POST";
1202
                char tr[12];
1203
                janus_http_random_string(12, (char *)&tr);                
1204
                root = json_object();
1205
                json_object_set_new(root, "janus", json_string("info"));
1206
                json_object_set_new(root, "transaction", json_string(tr));
1207
                goto parsingdone;
1208
        }
1209
        
1210
        /* Or maybe a long poll */
1211
        if(!strcasecmp(method, "GET") || !payload) {
1212
                session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;
1213
                if(session_id < 1) {
1214
                        JANUS_LOG(LOG_ERR, "Invalid session %s\n", session_path);
1215
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1216
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1217
                        if(msg->acrm)
1218
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1219
                        if(msg->acrh)
1220
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1221
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1222
                        MHD_destroy_response(response);
1223
                        goto done;
1224
                }
1225
                msg->session_id = session_id;
1226

    
1227
                /* Since we handle long polls ourselves, the core isn't involved (if not for providing us with events)
1228
                 * A long poll, though, can act as a keepalive, so we pass a fake one to the core to avoid undesirable timeouts */
1229

    
1230
                /* First of all, though, API secret and token based authentication may be enabled in the core, so since
1231
                 * we're bypassing it for notifications we'll have to check those ourselves */
1232
                const char *secret = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "apisecret");
1233
                const char *token = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "token");
1234
                gboolean secret_authorized = FALSE, token_authorized = FALSE;
1235
                if(!gateway->is_api_secret_needed(&janus_http_transport) && !gateway->is_auth_token_needed(&janus_http_transport)) {
1236
                        /* Nothing to check */
1237
                        secret_authorized = TRUE;
1238
                        token_authorized = TRUE;
1239
                } else {
1240
                        if(gateway->is_api_secret_valid(&janus_http_transport, secret)) {
1241
                                /* API secret is valid */
1242
                                secret_authorized = TRUE;
1243
                        }
1244
                        if(gateway->is_auth_token_valid(&janus_http_transport, token)) {
1245
                                /* Token is valid */
1246
                                token_authorized = TRUE;
1247
                        }
1248
                        /* We consider a request authorized if either the proper API secret or a valid token has been provided */
1249
                        if(!secret_authorized && !token_authorized) {
1250
                                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1251
                                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1252
                                if(msg->acrm)
1253
                                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1254
                                if(msg->acrh)
1255
                                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1256
                                ret = MHD_queue_response(connection, MHD_HTTP_FORBIDDEN, response);
1257
                                MHD_destroy_response(response);
1258
                                goto done;
1259
                        }
1260
                }
1261
                /* Ok, go on with the keepalive */
1262
                char tr[12];
1263
                janus_http_random_string(12, (char *)&tr);                
1264
                root = json_object();
1265
                json_object_set_new(root, "janus", json_string("keepalive"));
1266
                json_object_set_new(root, "session_id", json_integer(session_id));
1267
                json_object_set_new(root, "transaction", json_string(tr));
1268
                if(secret)
1269
                        json_object_set_new(root, "apisecret", json_string(secret));
1270
                if(token)
1271
                        json_object_set_new(root, "token", json_string(token));
1272
                gateway->incoming_request(&janus_http_transport, msg, (void *)keepalive_id, FALSE, root, NULL);
1273
                /* Ok, go on */
1274
                if(handle_path) {
1275
                        char *location = (char *)g_malloc0(strlen(ws_path) + strlen(session_path) + 2);
1276
                        g_sprintf(location, "%s/%s", ws_path, session_path);
1277
                        JANUS_LOG(LOG_ERR, "Invalid GET to %s, redirecting to %s\n", url, location);
1278
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1279
                        MHD_add_response_header(response, "Location", location);
1280
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1281
                        if(msg->acrm)
1282
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1283
                        if(msg->acrh)
1284
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1285
                        ret = MHD_queue_response(connection, 302, response);
1286
                        MHD_destroy_response(response);
1287
                        g_free(location);
1288
                        goto done;
1289
                }
1290
                janus_mutex_lock(&sessions_mutex);
1291
                janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
1292
                janus_mutex_unlock(&sessions_mutex);
1293
                if(!session || session->destroyed) {
1294
                        JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
1295
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1296
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1297
                        if(msg->acrm)
1298
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1299
                        if(msg->acrh)
1300
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1301
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1302
                        MHD_destroy_response(response);
1303
                        goto done;
1304
                }
1305
                /* How many messages can we send back in a single response? (just one by default) */
1306
                int max_events = 1;
1307
                const char *maxev = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "maxev");
1308
                if(maxev != NULL) {
1309
                        max_events = atoi(maxev);
1310
                        if(max_events < 1) {
1311
                                JANUS_LOG(LOG_WARN, "Invalid maxev parameter passed (%d), defaulting to 1\n", max_events);
1312
                                max_events = 1;
1313
                        }
1314
                }
1315
                JANUS_LOG(LOG_VERB, "Session %"SCNu64" found... returning up to %d messages\n", session_id, max_events);
1316
                /* Handle GET, taking the first message from the list */
1317
                json_t *event = g_async_queue_try_pop(session->events);
1318
                if(event != NULL) {
1319
                        if(max_events == 1) {
1320
                                /* Return just this message and leave */
1321
                                gchar *event_text = json_dumps(event, json_format);
1322
                                json_decref(event);
1323
                                ret = janus_http_return_success(msg, event_text);
1324
                        } else {
1325
                                /* The application is willing to receive more events at the same time, anything to report? */
1326
                                json_t *list = json_array();
1327
                                json_array_append_new(list, event);
1328
                                int events = 1;
1329
                                while(events < max_events) {
1330
                                        event = g_async_queue_try_pop(session->events);
1331
                                        if(event == NULL)
1332
                                                break;
1333
                                        json_array_append_new(list, event);
1334
                                        events++;
1335
                                }
1336
                                /* Return the array of messages and leave */
1337
                                gchar *list_text = json_dumps(list, json_format);
1338
                                json_decref(list);
1339
                                ret = janus_http_return_success(msg, list_text);
1340
                        }
1341
                } else {
1342
                        /* Still no message, wait */
1343
                        ret = janus_http_notifier(msg, max_events);
1344
                }
1345
                goto done;
1346
        }
1347
        
1348
        json_error_t error;
1349
        /* Parse the JSON payload */
1350
        root = json_loads(payload, 0, &error);
1351
        if(!root) {
1352
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
1353
                goto done;
1354
        }
1355
        if(!json_is_object(root)) {
1356
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
1357
                json_decref(root);
1358
                goto done;
1359
        }
1360

    
1361
parsingdone:
1362
        /* Check if we have session and handle identifiers */
1363
        session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;
1364
        handle_id = handle_path ? g_ascii_strtoull(handle_path, NULL, 10) : 0;
1365
        if(session_id > 0)
1366
                json_object_set_new(root, "session_id", json_integer(session_id));
1367
        if(handle_id > 0)
1368
                json_object_set_new(root, "handle_id", json_integer(handle_id));
1369

    
1370
        /* Suspend the connection and pass the ball to the core */
1371
        JANUS_LOG(LOG_HUGE, "Forwarding request to the core (%p)\n", msg);
1372
        gateway->incoming_request(&janus_http_transport, msg, msg, FALSE, root, &error);
1373
        /* Wait for a response (but not forever) */
1374
        struct timeval now;
1375
        gettimeofday(&now, NULL);
1376
        struct timespec wakeup;
1377
        wakeup.tv_sec = now.tv_sec+10;        /* Wait at max 10 seconds for a response */
1378
        wakeup.tv_nsec = now.tv_usec*1000UL;
1379
        pthread_mutex_lock(&msg->wait_mutex);
1380
        while(!msg->got_response) {
1381
                int res = pthread_cond_timedwait(&msg->wait_cond, &msg->wait_mutex, &wakeup);
1382
                if(msg->got_response || res == ETIMEDOUT)
1383
                        break;
1384
        }
1385
        pthread_mutex_unlock(&msg->wait_mutex);
1386
        if(!msg->response) {
1387
                ret = MHD_NO;
1388
        } else {
1389
                char *response_text = json_dumps(msg->response, json_format);
1390
                json_decref(msg->response);
1391
                msg->response = NULL;
1392
                ret = janus_http_return_success(msg, response_text);
1393
        }
1394

    
1395
done:
1396
        g_strfreev(basepath);
1397
        g_strfreev(path);
1398
        g_free(session_path);
1399
        g_free(handle_path);
1400
        return ret;
1401
}
1402

    
1403
/* Admin/monitor WebServer requests handler */
1404
int janus_http_admin_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr)
1405
{
1406
        char *payload = NULL;
1407
        json_t *root = NULL;
1408
        struct MHD_Response *response = NULL;
1409
        int ret = MHD_NO;
1410
        gchar *session_path = NULL, *handle_path = NULL;
1411
        gchar **basepath = NULL, **path = NULL;
1412
        guint64 session_id = 0, handle_id = 0;
1413

    
1414
        /* Is this the first round? */
1415
        int firstround = 0;
1416
        janus_http_msg *msg = (janus_http_msg *)*ptr;
1417
        if (msg == NULL) {
1418
                firstround = 1;
1419
                JANUS_LOG(LOG_VERB, "Got an admin/monitor HTTP %s request on %s...\n", method, url);
1420
                JANUS_LOG(LOG_DBG, " ... Just parsing headers for now...\n");
1421
                msg = g_malloc0(sizeof(janus_http_msg));
1422
                if(msg == NULL) {
1423
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1424
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1425
                        MHD_destroy_response(response);
1426
                        goto done;
1427
                }
1428
                msg->connection = connection;
1429
                msg->acrh = NULL;
1430
                msg->acrm = NULL;
1431
                msg->payload = NULL;
1432
                msg->len = 0;
1433
                msg->session_id = 0;
1434
                msg->got_response = FALSE;
1435
                msg->response = NULL;
1436
                janus_mutex_init(&msg->wait_mutex);
1437
                janus_condition_init(&msg->wait_cond);
1438
                janus_mutex_lock(&messages_mutex);
1439
                g_hash_table_insert(messages, msg, msg);
1440
                janus_mutex_unlock(&messages_mutex);
1441
                *ptr = msg;
1442
                MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_http_headers, msg);
1443
                ret = MHD_YES;
1444
        }
1445
        /* Parse request */
1446
        if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {
1447
                JANUS_LOG(LOG_ERR, "Unsupported method...\n");
1448
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1449
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1450
                if(msg->acrm)
1451
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1452
                if(msg->acrh)
1453
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1454
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_IMPLEMENTED, response);
1455
                MHD_destroy_response(response);
1456
                return ret;
1457
        }
1458
        if (!strcasecmp(method, "OPTIONS")) {
1459
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1460
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1461
                if(msg->acrm)
1462
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1463
                if(msg->acrh)
1464
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1465
                ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
1466
                MHD_destroy_response(response);
1467
        }
1468
        /* Get path components */
1469
        if(strcasecmp(url, admin_ws_path)) {
1470
                if(strlen(admin_ws_path) > 1) {
1471
                        basepath = g_strsplit(url, admin_ws_path, -1);
1472
                } else {
1473
                        /* The base path is the web server too itself, we process the url itself */
1474
                        basepath = g_malloc0(3);
1475
                        basepath[0] = g_strdup("/");
1476
                        basepath[1] = g_strdup(url);
1477
                }
1478
                if(basepath[0] == NULL || basepath[1] == NULL || basepath[1][0] != '/') {
1479
                        JANUS_LOG(LOG_ERR, "Invalid url %s\n", url);
1480
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1481
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1482
                        if(msg->acrm)
1483
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1484
                        if(msg->acrh)
1485
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1486
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1487
                        MHD_destroy_response(response);
1488
                }
1489
                if(firstround) {
1490
                        g_strfreev(basepath);
1491
                        return ret;
1492
                }
1493
                path = g_strsplit(basepath[1], "/", -1);
1494
                if(path == NULL || path[1] == NULL) {
1495
                        JANUS_LOG(LOG_ERR, "Invalid path %s (%s)\n", basepath[1], path[1]);
1496
                        response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1497
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1498
                        if(msg->acrm)
1499
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1500
                        if(msg->acrh)
1501
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1502
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1503
                        MHD_destroy_response(response);
1504
                }
1505
        }
1506
        if(firstround)
1507
                return ret;
1508
        JANUS_LOG(LOG_DBG, " ... parsing request...\n");
1509
        if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {
1510
                session_path = g_strdup(path[1]);
1511
                if(session_path == NULL) {
1512
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1513
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1514
                        MHD_destroy_response(response);
1515
                        goto done;
1516
                }
1517
                JANUS_LOG(LOG_HUGE, "Session: %s\n", session_path);
1518
        }
1519
        if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {
1520
                handle_path = g_strdup(path[2]);
1521
                if(handle_path == NULL) {
1522
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1523
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1524
                        MHD_destroy_response(response);
1525
                        goto done;
1526
                }
1527
                JANUS_LOG(LOG_HUGE, "Handle: %s\n", handle_path);
1528
        }
1529
        if(session_path != NULL && handle_path != NULL && path[3] != NULL && strlen(path[3]) > 0) {
1530
                JANUS_LOG(LOG_ERR, "Too many components...\n");
1531
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1532
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1533
                if(msg->acrm)
1534
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1535
                if(msg->acrh)
1536
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1537
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1538
                MHD_destroy_response(response);
1539
                goto done;
1540
        }
1541
        /* Get payload, if any */
1542
        if(!strcasecmp(method, "POST")) {
1543
                JANUS_LOG(LOG_HUGE, "Processing POST data (%s) (%zu bytes)...\n", msg->contenttype, *upload_data_size);
1544
                if(*upload_data_size != 0) {
1545
                        if(msg->payload == NULL)
1546
                                msg->payload = g_malloc0(*upload_data_size+1);
1547
                        else
1548
                                msg->payload = g_realloc(msg->payload, msg->len+*upload_data_size+1);
1549
                        if(msg->payload == NULL) {
1550
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1551
                                ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1552
                                MHD_destroy_response(response);
1553
                                goto done;
1554
                        }
1555
                        memcpy(msg->payload+msg->len, upload_data, *upload_data_size);
1556
                        msg->len += *upload_data_size;
1557
                        memset(msg->payload + msg->len, '\0', 1);
1558
                        JANUS_LOG(LOG_DBG, "  -- Data we have now (%zu bytes)\n", msg->len);
1559
                        *upload_data_size = 0;        /* Go on */
1560
                        ret = MHD_YES;
1561
                        goto done;
1562
                }
1563
                JANUS_LOG(LOG_DBG, "Done getting payload, we can answer\n");
1564
                if(msg->payload == NULL) {
1565
                        JANUS_LOG(LOG_ERR, "No payload :-(\n");
1566
                        ret = MHD_NO;
1567
                        goto done;
1568
                }
1569
                payload = msg->payload;
1570
                JANUS_LOG(LOG_HUGE, "%s\n", payload);
1571
        }
1572

    
1573
        /* Is this a generic request for info? */
1574
        if(session_path != NULL && !strcmp(session_path, "info")) {
1575
                /* The info REST endpoint, if contacted through a GET, provides information on the gateway */
1576
                if(strcasecmp(method, "GET")) {
1577
                        ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_TRANSPORT_SPECIFIC, "Use GET for the info endpoint");
1578
                        goto done;
1579
                }
1580
                /* Turn this into a fake "info" request */
1581
                method = "POST";
1582
                char tr[12];
1583
                janus_http_random_string(12, (char *)&tr);                
1584
                root = json_object();
1585
                json_object_set_new(root, "janus", json_string("info"));
1586
                json_object_set_new(root, "transaction", json_string(tr));
1587
                goto parsingdone;
1588
        }
1589
        
1590
        /* Without a payload we don't know what to do */
1591
        if(!payload) {
1592
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON, "Request payload missing");
1593
                goto done;
1594
        }
1595
        json_error_t error;
1596
        /* Parse the JSON payload */
1597
        root = json_loads(payload, 0, &error);
1598
        if(!root) {
1599
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
1600
                goto done;
1601
        }
1602
        if(!json_is_object(root)) {
1603
                ret = janus_http_return_error(msg, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
1604
                json_decref(root);
1605
                goto done;
1606
        }
1607

    
1608
parsingdone:
1609
        /* Check if we have session and handle identifiers */
1610
        session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;
1611
        handle_id = handle_path ? g_ascii_strtoull(handle_path, NULL, 10) : 0;
1612
        if(session_id > 0)
1613
                json_object_set_new(root, "session_id", json_integer(session_id));
1614
        if(handle_id > 0)
1615
                json_object_set_new(root, "handle_id", json_integer(handle_id));
1616

    
1617
        /* Suspend the connection and pass the ball to the core */
1618
        JANUS_LOG(LOG_HUGE, "Forwarding admin request to the core (%p)\n", msg);
1619
        gateway->incoming_request(&janus_http_transport, msg, msg, TRUE, root, &error);
1620
        /* Wait for a response (but not forever) */
1621
        struct timeval now;
1622
        gettimeofday(&now, NULL);
1623
        struct timespec wakeup;
1624
        wakeup.tv_sec = now.tv_sec+10;        /* Wait at max 10 seconds for a response */
1625
        wakeup.tv_nsec = now.tv_usec*1000UL;
1626
        pthread_mutex_lock(&msg->wait_mutex);
1627
        while(!msg->got_response) {
1628
                int res = pthread_cond_timedwait(&msg->wait_cond, &msg->wait_mutex, &wakeup);
1629
                if(msg->got_response || res == ETIMEDOUT)
1630
                        break;
1631
        }
1632
        pthread_mutex_unlock(&msg->wait_mutex);
1633
        if(!msg->response) {
1634
                ret = MHD_NO;
1635
        } else {
1636
                char *response_text = json_dumps(msg->response, json_format);
1637
                json_decref(msg->response);
1638
                msg->response = NULL;
1639
                ret = janus_http_return_success(msg, response_text);
1640
        }
1641

    
1642
done:
1643
        g_strfreev(basepath);
1644
        g_strfreev(path);
1645
        g_free(session_path);
1646
        g_free(handle_path);
1647
        return ret;
1648
}
1649

    
1650
int janus_http_headers(void *cls, enum MHD_ValueKind kind, const char *key, const char *value) {
1651
        janus_http_msg *request = cls;
1652
        JANUS_LOG(LOG_DBG, "%s: %s\n", key, value);
1653
        if(!strcasecmp(key, MHD_HTTP_HEADER_CONTENT_TYPE)) {
1654
                if(request)
1655
                        request->contenttype = strdup(value);
1656
        } else if(!strcasecmp(key, "Access-Control-Request-Method")) {
1657
                if(request)
1658
                        request->acrm = strdup(value);
1659
        } else if(!strcasecmp(key, "Access-Control-Request-Headers")) {
1660
                if(request)
1661
                        request->acrh = strdup(value);
1662
        }
1663
        return MHD_YES;
1664
}
1665

    
1666
void janus_http_request_completed(void *cls, struct MHD_Connection *connection, void **con_cls, enum MHD_RequestTerminationCode toe) {
1667
        JANUS_LOG(LOG_DBG, "Request completed, freeing data\n");
1668
        janus_http_msg *request = *con_cls;
1669
        if(!request)
1670
                return;
1671
        janus_mutex_lock(&messages_mutex);
1672
        g_hash_table_remove(messages, request);
1673
        janus_mutex_unlock(&messages_mutex);
1674
        if(request->payload != NULL)
1675
                g_free(request->payload);
1676
        if(request->contenttype != NULL)
1677
                free(request->contenttype);
1678
        if(request->acrh != NULL)
1679
                g_free(request->acrh);
1680
        if(request->acrm != NULL)
1681
                g_free(request->acrm);
1682
        g_free(request);
1683
        *con_cls = NULL;   
1684
}
1685

    
1686
/* Worker to handle notifications */
1687
int janus_http_notifier(janus_http_msg *msg, int max_events) {
1688
        if(!msg || !msg->connection)
1689
                return MHD_NO;
1690
        struct MHD_Connection *connection = msg->connection;
1691
        if(max_events < 1)
1692
                max_events = 1;
1693
        JANUS_LOG(LOG_DBG, "... handling long poll...\n");
1694
        struct MHD_Response *response = NULL;
1695
        int ret = MHD_NO;
1696
        guint64 session_id = msg->session_id;
1697
        janus_mutex_lock(&sessions_mutex);
1698
        janus_http_session *session = g_hash_table_lookup(sessions, &session_id);
1699
        janus_mutex_unlock(&sessions_mutex);
1700
        if(!session || session->destroyed) {
1701
                JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
1702
                response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
1703
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1704
                if(msg->acrm)
1705
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1706
                if(msg->acrh)
1707
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1708
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1709
                MHD_destroy_response(response);
1710
                return ret;
1711
        }
1712
        gint64 start = janus_get_monotonic_time();
1713
        gint64 end = 0;
1714
        json_t *event = NULL, *list = NULL;
1715
        gboolean found = FALSE;
1716
        /* We have a timeout for the long poll: 30 seconds */
1717
        while(end-start < 30*G_USEC_PER_SEC) {
1718
                if(session->destroyed)
1719
                        break;
1720
                event = g_async_queue_try_pop(session->events);
1721
                if(session->destroyed || g_atomic_int_get(&stopping) || event != NULL) {
1722
                        if(event == NULL)
1723
                                break;
1724
                        /* Gotcha! */
1725
                        found = TRUE;
1726
                        if(max_events == 1) {
1727
                                break;
1728
                        } else {
1729
                                /* The application is willing to receive more events at the same time, anything to report? */
1730
                                list = json_array();
1731
                                json_array_append_new(list, event);
1732
                                int events = 1;
1733
                                while(events < max_events) {
1734
                                        event = g_async_queue_try_pop(session->events);
1735
                                        if(event == NULL)
1736
                                                break;
1737
                                        json_array_append_new(list, event);
1738
                                        events++;
1739
                                }
1740
                                break;
1741
                        }
1742
                }
1743
                /* Sleep 100ms */
1744
                g_usleep(100000);
1745
                end = janus_get_monotonic_time();
1746
        }
1747
        if(!found) {
1748
                JANUS_LOG(LOG_VERB, "Long poll time out for session %"SCNu64"...\n", session_id);
1749
                /* Turn this into a "keepalive" response */
1750
                char tr[12];
1751
                janus_http_random_string(12, (char *)&tr);                
1752
                if(max_events == 1) {
1753
                        event = json_object();
1754
                        json_object_set_new(event, "janus", json_string("keepalive"));
1755
                } else {
1756
                        list = json_array();
1757
                        event = json_object();
1758
                        json_object_set_new(event, "janus", json_string("keepalive"));
1759
                        json_array_append_new(list, event);
1760
                }
1761
                /* FIXME Improve the Janus protocol keep-alive mechanism in JavaScript */
1762
        }
1763
        char *payload_text = json_dumps(list ? list : event, json_format);
1764
        json_decref(list ? list : event);
1765
        /* Finish the request by sending the response */
1766
        JANUS_LOG(LOG_VERB, "We have a message to serve...\n\t%s\n", payload_text);
1767
        /* Send event */
1768
        ret = janus_http_return_success(msg, payload_text);
1769
        return ret;
1770
}
1771

    
1772
/* Helper to quickly send a success response */
1773
int janus_http_return_success(janus_http_msg *msg, char *payload) {
1774
        if(!msg || !msg->connection) {
1775
                g_free(payload);
1776
                return MHD_NO;
1777
        }
1778
        struct MHD_Response *response = MHD_create_response_from_buffer(
1779
                strlen(payload),
1780
                (void*)payload,
1781
                MHD_RESPMEM_MUST_FREE);
1782
        MHD_add_response_header(response, "Content-Type", "application/json");
1783
        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1784
        if(msg->acrm)
1785
                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1786
        if(msg->acrh)
1787
                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1788
        int ret = MHD_queue_response(msg->connection, MHD_HTTP_OK, response);
1789
        MHD_destroy_response(response);
1790
        return ret;
1791
}
1792

    
1793
/* Helper to quickly send an error response */
1794
int janus_http_return_error(janus_http_msg *msg, uint64_t session_id, const char *transaction, gint error, const char *format, ...) {
1795
        gchar *error_string = NULL;
1796
        gchar error_buf[512];
1797
        if(format == NULL) {
1798
                /* No error string provided, use the default one */
1799
                error_string = (gchar *)janus_get_api_error(error);
1800
        } else {
1801
                /* This callback has variable arguments (error string) */
1802
                va_list ap;
1803
                va_start(ap, format);
1804
                g_vsnprintf(error_buf, 512, format, ap);
1805
                va_end(ap);
1806
                error_string = error_buf;
1807
        }
1808
        /* Done preparing error */
1809
        JANUS_LOG(LOG_VERB, "[%s] Returning error %d (%s)\n", transaction, error, error_string ? error_string : "no text");
1810
        /* Prepare JSON error */
1811
        json_t *reply = json_object();
1812
        json_object_set_new(reply, "janus", json_string("error"));
1813
        if(session_id > 0)
1814
                json_object_set_new(reply, "session_id", json_integer(session_id));
1815
        if(transaction != NULL)
1816
                json_object_set_new(reply, "transaction", json_string(transaction));
1817
        json_t *error_data = json_object();
1818
        json_object_set_new(error_data, "code", json_integer(error));
1819
        json_object_set_new(error_data, "reason", json_string(error_string));
1820
        json_object_set_new(reply, "error", error_data);
1821
        gchar *reply_text = json_dumps(reply, json_format);
1822
        json_decref(reply);
1823
        /* Use janus_http_return_error to send the error response */
1824
        return janus_http_return_success(msg, reply_text);
1825
}