Statistics
| Branch: | Revision:

janus-gateway / janus.c @ 57953fa0

History | View | Annotate | Download (193 KB)

1
/*! \file   janus.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus core
5
 * \details Implementation of the gateway core. This code takes care of
6
 * the gateway initialization (command line/configuration) and setup,
7
 * and implements the web server (based on libmicrohttpd) and Janus protocol
8
 * (a JSON protocol implemented with Jansson) to interact with the web
9
 * applications. The core also takes care of bridging peers and plugins
10
 * accordingly. 
11
 * 
12
 * \ingroup core
13
 * \ref core
14
 */
15
 
16
#include <dlfcn.h>
17
#include <dirent.h>
18
#include <ifaddrs.h>
19
#include <net/if.h>
20
#include <signal.h>
21
#include <getopt.h>
22
#include <sys/resource.h>
23

    
24
#include "janus.h"
25
#include "cmdline.h"
26
#include "config.h"
27
#include "apierror.h"
28
#include "debug.h"
29
#include "rtcp.h"
30
#include "sdp.h"
31
#include "utils.h"
32

    
33

    
34
#define JANUS_NAME                                "Janus WebRTC Gateway"
35
#define JANUS_AUTHOR                        "Meetecho s.r.l."
36
#define JANUS_VERSION                        8
37
#define JANUS_VERSION_STRING        "0.0.8"
38

    
39
#ifdef __MACH__
40
#define SHLIB_EXT "0.dylib"
41
#else
42
#define SHLIB_EXT ".so"
43
#endif
44

    
45

    
46
static janus_config *config = NULL;
47
static char *config_file = NULL;
48
static char *configs_folder = NULL;
49

    
50
static GHashTable *plugins = NULL;
51
static GHashTable *plugins_so = NULL;
52

    
53
/* MHD Web Server */
54
static struct MHD_Daemon *ws = NULL, *sws = NULL;
55
static char *ws_path = NULL;
56
static char *ws_api_secret = NULL;
57

    
58
#ifdef HAVE_WEBSOCKETS
59
/* libwebsock WS server */
60
static libwebsock_context *wss = NULL, *swss = NULL;
61
#endif
62

    
63
#ifdef HAVE_RABBITMQ
64
/* RabbitMQ support */
65
amqp_connection_state_t rmq_conn = NULL;
66
amqp_channel_t rmq_channel = 0;
67
amqp_bytes_t to_janus_queue, from_janus_queue;
68
#endif
69

    
70

    
71
/* Admin/Monitor MHD Web Server */
72
static struct MHD_Daemon *admin_ws = NULL, *admin_sws = NULL;
73
static char *admin_ws_path = NULL;
74
static char *admin_ws_api_secret = NULL;
75

    
76
/* Admin/Monitor ACL list */
77
GList *janus_admin_access_list = NULL;
78
janus_mutex access_list_mutex;
79
void janus_admin_allow_address(const char *ip);
80
void janus_admin_allow_address(const char *ip) {
81
        if(ip == NULL)
82
                return;
83
        /* Is this an IP or an interface? */
84
        janus_mutex_lock(&access_list_mutex);
85
        janus_admin_access_list = g_list_append(janus_admin_access_list, (gpointer)ip);
86
        janus_mutex_unlock(&access_list_mutex);
87
}
88
gboolean janus_admin_is_allowed(const char *ip);
89
gboolean janus_admin_is_allowed(const char *ip) {
90
        if(ip == NULL)
91
                return FALSE;
92
        if(janus_admin_access_list == NULL)
93
                return TRUE;
94
        janus_mutex_lock(&access_list_mutex);
95
        GList *temp = janus_admin_access_list;
96
        while(temp) {
97
                const char *allowed = (const char *)temp->data;
98
                if(allowed != NULL && strstr(ip, allowed)) {
99
                        janus_mutex_unlock(&access_list_mutex);
100
                        return TRUE;
101
                }
102
                temp = temp->next;
103
        }
104
        janus_mutex_unlock(&access_list_mutex);
105
        return FALSE;
106
}
107

    
108
/* Admin/Monitor helpers */
109
json_t *janus_admin_stream_summary(janus_ice_stream *stream);
110
json_t *janus_admin_component_summary(janus_ice_component *component);
111

    
112

    
113
/* Certificates */
114
static char *server_pem = NULL;
115
gchar *janus_get_server_pem(void) {
116
        return server_pem;
117
}
118
static char *server_key = NULL;
119
gchar *janus_get_server_key(void) {
120
        return server_key;
121
}
122

    
123

    
124
/* Information */
125
char *janus_info(const char *transaction);
126
char *janus_info(const char *transaction) {
127
        /* Prepare a summary on the gateway */
128
        json_t *info = json_object();
129
        json_object_set_new(info, "janus", json_string("server_info"));
130
        if(transaction != NULL)
131
                json_object_set_new(info, "transaction", json_string(transaction));
132
        json_object_set_new(info, "name", json_string(JANUS_NAME));
133
        json_object_set_new(info, "version", json_integer(JANUS_VERSION));
134
        json_object_set_new(info, "version_string", json_string(JANUS_VERSION_STRING));
135
        json_object_set_new(info, "author", json_string(JANUS_AUTHOR));
136
#ifdef HAVE_SCTP
137
        json_object_set_new(info, "data_channels", json_string("true"));
138
#else
139
        json_object_set_new(info, "data_channels", json_string("false"));
140
#endif
141
#ifdef HAVE_WEBSOCKETS
142
        json_object_set_new(info, "websockets", json_string("true"));
143
#else
144
        json_object_set_new(info, "websockets", json_string("false"));
145
#endif
146
#ifdef HAVE_RABBITMQ
147
        json_object_set_new(info, "rabbitmq", json_string("true"));
148
#else
149
        json_object_set_new(info, "rabbitmq", json_string("false"));
150
#endif
151
        json_object_set_new(info, "ipv6", json_string(janus_ice_is_ipv6_enabled() ? "true" : "false"));
152
        json_object_set_new(info, "ice-tcp", json_string(janus_ice_is_ice_tcp_enabled() ? "true" : "false"));
153
        if(janus_ice_get_stun_server() != NULL) {
154
                char server[255];
155
                g_snprintf(server, 255, "%s:%"SCNu16, janus_ice_get_stun_server(), janus_ice_get_stun_port());
156
                json_object_set_new(info, "stun-server", json_string(server));
157
        }
158
        if(janus_ice_get_turn_server() != NULL) {
159
                char server[255];
160
                g_snprintf(server, 255, "%s:%"SCNu16, janus_ice_get_turn_server(), janus_ice_get_turn_port());
161
                json_object_set_new(info, "turn-server", json_string(server));
162
        }
163
        json_t *data = json_object();
164
        GHashTableIter iter;
165
        gpointer value;
166
        g_hash_table_iter_init(&iter, plugins);
167
        while (g_hash_table_iter_next(&iter, NULL, &value)) {
168
                janus_plugin *p = value;
169
                if(p == NULL) {
170
                        continue;
171
                }
172
                json_t *plugin = json_object();
173
                json_object_set_new(plugin, "name", json_string(p->get_name()));
174
                json_object_set_new(plugin, "author", json_string(p->get_author()));
175
                json_object_set_new(plugin, "description", json_string(p->get_description()));
176
                json_object_set_new(plugin, "version_string", json_string(p->get_version_string()));
177
                json_object_set_new(plugin, "version", json_integer(p->get_version()));
178
                json_object_set_new(data, p->get_package(), plugin);
179
        }
180
        json_object_set_new(info, "plugins", data);
181
        /* Convert to a string */
182
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
183
        json_decref(info);
184
        
185
        return info_text;
186
}
187

    
188
static gchar *local_ip = NULL;
189
gchar *janus_get_local_ip(void) {
190
        return local_ip;
191
}
192
static gchar *public_ip = NULL;
193
gchar *janus_get_public_ip(void) {
194
        /* Fallback to the local IP, if we have no public one */
195
        return public_ip ? public_ip : local_ip;
196
}
197
void janus_set_public_ip(const char *ip) {
198
        if(ip == NULL)
199
                return;
200
        if(public_ip != NULL)
201
                g_free(public_ip);
202
        public_ip = g_strdup(ip);
203
}
204
static volatile gint stop = 0;
205
gint janus_is_stopping(void) {
206
        return g_atomic_int_get(&stop);
207
}
208

    
209

    
210
/* Logging */
211
int log_level = 0;
212
int lock_debug = 0;
213

    
214

    
215
/*! \brief Signal handler (just used to intercept CTRL+C) */
216
void janus_handle_signal(int signum);
217
void janus_handle_signal(int signum)
218
{
219
        switch(g_atomic_int_get(&stop)) {
220
                case 0:
221
                        JANUS_PRINT("Stopping gateway, please wait...\n");
222
                        break;
223
                case 1:
224
                        JANUS_PRINT("In a hurry? I'm trying to free resources cleanly, here!\n");
225
                        break;
226
                default:
227
                        JANUS_PRINT("Ok, leaving immediately...\n");
228
                        break;
229
        }
230
        g_atomic_int_inc(&stop);
231
        if(g_atomic_int_get(&stop) > 2)
232
                exit(1);
233
}
234

    
235

    
236
/** @name Plugin callback interface
237
 * These are the callbacks implemented by the gateway core, as part of
238
 * the janus_callbacks interface. Everything the plugins send the
239
 * gateway is handled here.
240
 */
241
///@{
242
int janus_push_event(janus_plugin_session *plugin_session, janus_plugin *plugin, const char *transaction, const char *message, const char *sdp_type, const char *sdp);
243
json_t *janus_handle_sdp(janus_plugin_session *plugin_session, janus_plugin *plugin, const char *sdp_type, const char *sdp);
244
void janus_relay_rtp(janus_plugin_session *plugin_session, int video, char *buf, int len);
245
void janus_relay_rtcp(janus_plugin_session *plugin_session, int video, char *buf, int len);
246
void janus_relay_data(janus_plugin_session *plugin_session, char *buf, int len);
247
void janus_close_pc(janus_plugin_session *plugin_session);
248
void janus_end_session(janus_plugin_session *plugin_session);
249
static janus_callbacks janus_handler_plugin =
250
        {
251
                .push_event = janus_push_event,
252
                .relay_rtp = janus_relay_rtp,
253
                .relay_rtcp = janus_relay_rtcp,
254
                .relay_data = janus_relay_data,
255
                .close_pc = janus_close_pc,
256
                .end_session = janus_end_session,
257
        }; 
258
///@}
259

    
260

    
261
#ifdef HAVE_WEBSOCKETS
262
/* WebSocket sessions */
263
static janus_mutex wss_mutex;
264
static GHashTable *wss_sessions = NULL;
265
#endif
266

    
267
#ifdef HAVE_RABBITMQ
268
/* FIXME RabbitMQ session (always 1 at the moment) */
269
janus_rabbitmq_client *rmq_client = NULL;
270
#endif
271

    
272
/* Gateway Sessions */
273
static janus_mutex sessions_mutex;
274
static GHashTable *sessions = NULL, *old_sessions = NULL;
275
static GMainContext *sessions_watchdog_context = NULL;
276

    
277

    
278
#define SESSION_TIMEOUT                60                /* FIXME Should this be higher, e.g., 120 seconds? */
279

    
280
static gboolean janus_cleanup_session(gpointer user_data) {
281
        janus_session *session = (janus_session *) user_data;
282

    
283
        JANUS_LOG(LOG_INFO, "Cleaning up session %"SCNu64"...\n", session->session_id);
284
        janus_session_destroy(session->session_id);
285

    
286
        return G_SOURCE_REMOVE;
287
}
288

    
289
static gboolean janus_check_sessions(gpointer user_data) {
290
        GMainContext *watchdog_context = (GMainContext *) user_data;
291
        janus_mutex_lock(&sessions_mutex);
292
        if (sessions) {
293
                GHashTableIter iter;
294
                gpointer value;
295

    
296
                g_hash_table_iter_init(&iter, sessions);
297
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
298
                        janus_session *session = (janus_session *) value;
299

    
300
                        if (!session || session->destroy) {
301
                                continue;
302
                        }
303

    
304
                        gint64 now = janus_get_monotonic_time();
305
                        if (now - session->last_activity >= SESSION_TIMEOUT * G_USEC_PER_SEC && !session->timeout) {
306
                                JANUS_LOG(LOG_INFO, "Timeout expired for session %"SCNu64"...\n", session->session_id);
307

    
308
                                json_t *event = json_object();
309
                                json_object_set_new(event, "janus", json_string("timeout"));
310
                                json_object_set_new(event, "session_id", json_integer(session->session_id));
311
                                gchar *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
312
                                json_decref(event);
313

    
314
                                janus_http_event *notification = calloc(1, sizeof(janus_http_event));
315
                                notification->code = 200;
316
                                notification->payload = event_text;
317
                                notification->allocated = 1;
318
                                g_async_queue_push(session->messages, notification);
319
                                session->timeout = 1;
320

    
321
                                g_hash_table_iter_remove(&iter);
322
                                g_hash_table_insert(old_sessions, GUINT_TO_POINTER(session->session_id), session);
323

    
324
                                /* Schedule the session for deletion */
325
                                GSource *timeout_source = g_timeout_source_new_seconds(3);
326
                                g_source_set_callback(timeout_source, janus_cleanup_session, session, NULL);
327
                                g_source_attach(timeout_source, watchdog_context);
328
                                g_source_unref(timeout_source);
329
                        }
330
                }
331
        }
332
        janus_mutex_unlock(&sessions_mutex);
333

    
334
        return G_SOURCE_CONTINUE;
335
}
336

    
337
static gpointer janus_sessions_watchdog(gpointer user_data) {
338
        GMainLoop *loop = (GMainLoop *) user_data;
339
        GMainContext *watchdog_context = g_main_loop_get_context(loop);
340
        GSource *timeout_source;
341

    
342
        timeout_source = g_timeout_source_new_seconds(2);
343
        g_source_set_callback(timeout_source, janus_check_sessions, watchdog_context, NULL);
344
        g_source_attach(timeout_source, watchdog_context);
345
        g_source_unref(timeout_source);
346

    
347
        JANUS_LOG(LOG_INFO, "Sessions watchdog started\n");
348

    
349
        g_main_loop_run(loop);
350

    
351
        return NULL;
352
}
353

    
354
janus_session *janus_session_create(guint64 session_id) {
355
        if(session_id == 0) {
356
                while(session_id == 0) {
357
                        session_id = g_random_int();
358
                        if(janus_session_find(session_id) != NULL) {
359
                                /* Session ID already taken, try another one */
360
                                session_id = 0;
361
                        }
362
                }
363
        }
364
        JANUS_LOG(LOG_INFO, "Creating new session: %"SCNu64"\n", session_id);
365
        janus_session *session = (janus_session *)calloc(1, sizeof(janus_session));
366
        if(session == NULL) {
367
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
368
                return NULL;
369
        }
370
        session->session_id = session_id;
371
        session->messages = g_async_queue_new_full((GDestroyNotify) janus_http_event_free);
372
        session->destroy = 0;
373
        session->last_activity = janus_get_monotonic_time();
374
        janus_mutex_init(&session->mutex);
375
        janus_mutex_lock(&sessions_mutex);
376
        g_hash_table_insert(sessions, GUINT_TO_POINTER(session_id), session);
377
        janus_mutex_unlock(&sessions_mutex);
378
        return session;
379
}
380

    
381
janus_session *janus_session_find(guint64 session_id) {
382
        janus_mutex_lock(&sessions_mutex);
383
        janus_session *session = g_hash_table_lookup(sessions, GUINT_TO_POINTER(session_id));
384
        janus_mutex_unlock(&sessions_mutex);
385
        return session;
386
}
387

    
388
janus_session *janus_session_find_destroyed(guint64 session_id) {
389
        janus_mutex_lock(&sessions_mutex);
390
        janus_session *session = g_hash_table_lookup(old_sessions, GUINT_TO_POINTER(session_id));
391
        janus_mutex_unlock(&sessions_mutex);
392
        return session;
393
}
394

    
395
/* Destroys a session but does not remove it from the sessions hash table. */
396
gint janus_session_destroy(guint64 session_id) {
397
        janus_session *session = janus_session_find_destroyed(session_id);
398
        if(session == NULL) {
399
                JANUS_LOG(LOG_ERR, "Couldn't find session to destroy: %"SCNu64"\n", session_id);
400
                return -1;
401
        }
402
        JANUS_LOG(LOG_INFO, "Destroying session %"SCNu64"\n", session_id);
403
        session->destroy = 1;
404
        if (session->ice_handles != NULL) {
405
                GHashTableIter iter;
406
                gpointer value;
407

    
408
                /* Remove all handles */
409
                g_hash_table_iter_init(&iter, session->ice_handles);
410
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
411
                        janus_ice_handle *handle = value;
412

    
413
                        if(!handle || g_atomic_int_get(&stop)) {
414
                                continue;
415
                        }
416

    
417
                        janus_ice_handle_destroy(session, handle->handle_id);
418
                        g_hash_table_iter_remove(&iter);
419
                }
420
        }
421

    
422
        /* TODO Actually destroy session */
423
        janus_session_free(session);
424

    
425
        return 0;
426
}
427

    
428
void janus_session_free(janus_session *session) {
429
        if(session == NULL)
430
                return;
431
        janus_mutex_lock(&session->mutex);
432
        if(session->ice_handles != NULL) {
433
                g_hash_table_destroy(session->ice_handles);
434
                session->ice_handles = NULL;
435
        }
436
        if(session->messages != NULL) {
437
                g_async_queue_unref (session->messages);
438
                session->messages = NULL;
439
        }
440
        janus_mutex_unlock(&session->mutex);
441
        session = NULL;
442
}
443

    
444

    
445
/* Connection notifiers */
446
int janus_ws_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen) {
447
        struct sockaddr_in *sin = (struct sockaddr_in *)addr;
448
        char *ip = inet_ntoa(sin->sin_addr);
449
        JANUS_LOG(LOG_VERB, "New connection on REST API: %s\n", ip);
450
        /* TODO Implement access limitation based on IP addresses */
451
        return MHD_YES;
452
}
453

    
454
int janus_admin_ws_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen) {
455
        struct sockaddr_in *sin = (struct sockaddr_in *)addr;
456
        char *ip = inet_ntoa(sin->sin_addr);
457
        JANUS_LOG(LOG_VERB, "New connection on admin/monitor: %s\n", ip);
458
        /* Any access limitation based on this IP address? */
459
        if(!janus_admin_is_allowed(ip)) {
460
                JANUS_LOG(LOG_ERR, "IP %s is unauthorized to connect to the admin/monitor interface\n", ip);
461
                return MHD_NO;
462
        }
463
        return MHD_YES;
464
}
465

    
466

    
467
/* WebServer requests handler */
468
int janus_ws_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr)
469
{
470
        char *payload = NULL;
471
        struct MHD_Response *response = NULL;
472
        int ret = MHD_NO;
473
        gchar *session_path = NULL, *handle_path = NULL;
474
        gchar **basepath = NULL, **path = NULL;
475

    
476
        JANUS_LOG(LOG_VERB, "Got a HTTP %s request on %s...\n", method, url);
477
        /* Is this the first round? */
478
        int firstround = 0;
479
        janus_http_msg *msg = (janus_http_msg *)*ptr;
480
        if (msg == NULL) {
481
                firstround = 1;
482
                JANUS_LOG(LOG_VERB, " ... Just parsing headers for now...\n");
483
                msg = calloc(1, sizeof(janus_http_msg));
484
                if(msg == NULL) {
485
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
486
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
487
                        MHD_destroy_response(response);
488
                        goto done;
489
                }
490
                msg->acrh = NULL;
491
                msg->acrm = NULL;
492
                msg->payload = NULL;
493
                msg->len = 0;
494
                msg->session_id = 0;
495
                *ptr = msg;
496
                MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_ws_headers, msg);
497
                ret = MHD_YES;
498
        }
499
        /* Parse request */
500
        if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {
501
                JANUS_LOG(LOG_ERR, "Unsupported method...\n");
502
                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
503
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
504
                if(msg->acrm)
505
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
506
                if(msg->acrh)
507
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
508
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_IMPLEMENTED, response);
509
                MHD_destroy_response(response);
510
                return ret;
511
        }
512
        if (!strcasecmp(method, "OPTIONS")) {
513
                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO); 
514
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
515
                if(msg->acrm)
516
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
517
                if(msg->acrh)
518
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
519
                ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
520
                MHD_destroy_response(response);
521
        }
522
        /* Get path components */
523
        if(strcasecmp(url, ws_path)) {
524
                if(strlen(ws_path) > 1) {
525
                        basepath = g_strsplit(url, ws_path, -1);
526
                } else {
527
                        /* The base path is the web server too itself, we process the url itself */
528
                        basepath = calloc(3, sizeof(char *));
529
                        basepath[0] = g_strdup("/");
530
                        basepath[1] = g_strdup(url);
531
                }
532
                if(basepath[1] == NULL || basepath[1][0] != '/') {
533
                        JANUS_LOG(LOG_ERR, "Invalid url %s (%s)\n", url, basepath[1]);
534
                        response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
535
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
536
                        if(msg->acrm)
537
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
538
                        if(msg->acrh)
539
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
540
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
541
                        MHD_destroy_response(response);
542
                }
543
                if(firstround) {
544
                        g_strfreev(basepath);
545
                        return ret;
546
                }
547
                path = g_strsplit(basepath[1], "/", -1);
548
                if(path == NULL || path[1] == NULL) {
549
                        JANUS_LOG(LOG_ERR, "Invalid path %s (%s)\n", basepath[1], path[1]);
550
                        response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
551
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
552
                        if(msg->acrm)
553
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
554
                        if(msg->acrh)
555
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
556
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
557
                        MHD_destroy_response(response);
558
                }
559
        }
560
        if(firstround)
561
                return ret;
562
        JANUS_LOG(LOG_VERB, " ... parsing request...\n");
563
        if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {
564
                session_path = g_strdup(path[1]);
565
                if(session_path == NULL) {
566
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
567
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
568
                        MHD_destroy_response(response);
569
                        goto done;
570
                }
571
                JANUS_LOG(LOG_VERB, "Session: %s\n", session_path);
572
        }
573
        if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {
574
                handle_path = g_strdup(path[2]);
575
                if(handle_path == NULL) {
576
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
577
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
578
                        MHD_destroy_response(response);
579
                        goto done;
580
                }
581
                JANUS_LOG(LOG_VERB, "Handle: %s\n", handle_path);
582
        }
583
        if(session_path != NULL && handle_path != NULL && path[3] != NULL && strlen(path[3]) > 0) {
584
                JANUS_LOG(LOG_ERR, "Too many components...\n");
585
                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
586
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
587
                if(msg->acrm)
588
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
589
                if(msg->acrh)
590
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
591
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
592
                MHD_destroy_response(response);
593
                goto done;
594
        }
595
        /* Get payload, if any */
596
        if(!strcasecmp(method, "POST")) {
597
                JANUS_LOG(LOG_VERB, "Processing POST data (%s)...\n", msg->contenttype);
598
                if(*upload_data_size != 0) {
599
                        JANUS_LOG(LOG_VERB, "  -- Uploaded data (%zu bytes)\n", *upload_data_size);
600
                        if(msg->payload == NULL)
601
                                msg->payload = calloc(1, *upload_data_size+1);
602
                        else
603
                                msg->payload = realloc(msg->payload, msg->len+*upload_data_size+1);
604
                        if(msg->payload == NULL) {
605
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
606
                                ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
607
                                MHD_destroy_response(response);
608
                                goto done;
609
                        }
610
                        memcpy(msg->payload+msg->len, upload_data, *upload_data_size);
611
                        memset(msg->payload+msg->len+*upload_data_size, '\0', 1);
612
                        msg->len += *upload_data_size;
613
                        JANUS_LOG(LOG_VERB, "  -- Data we have now (%zu bytes)\n", msg->len);
614
                        *upload_data_size = 0;        /* Go on */
615
                        ret = MHD_YES;
616
                        goto done;
617
                }
618
                JANUS_LOG(LOG_VERB, "Done getting payload, we can answer\n");
619
                if(msg->payload == NULL) {
620
                        JANUS_LOG(LOG_ERR, "No payload :-(\n");
621
                        ret = MHD_NO;
622
                        goto done;
623
                }
624
                payload = msg->payload;
625
                JANUS_LOG(LOG_VERB, "%s\n", payload);
626
        }
627

    
628
        /* Process the request, specifying this HTTP connection is the source */
629
        janus_request_source source = {
630
                .type = JANUS_SOURCE_PLAIN_HTTP,
631
                .source = (void *)connection,
632
                .msg = (void *)msg,
633
        };
634
        
635
        /* Is this a generic request for info? */
636
        if(session_path != NULL && !strcmp(session_path, "info")) {
637
                /* The info REST endpoint, if contacted through a GET, provides information on the gateway */
638
                if(strcasecmp(method, "GET")) {
639
                        ret = janus_process_error(&source, 0, NULL, JANUS_ERROR_USE_GET, "Use GET for the info endpoint");
640
                        goto done;
641
                }
642
                /* Send the success reply */
643
                ret = janus_process_success(&source, "application/json", janus_info(NULL));
644
                goto done;
645
        }
646
        
647
        /* Or maybe a long poll */
648
        if(!strcasecmp(method, "GET") || !payload) {
649
                guint64 session_id = session_path ? g_ascii_strtoll(session_path, NULL, 10) : 0;
650
                if(session_id < 1) {
651
                        JANUS_LOG(LOG_ERR, "Invalid session %s\n", session_path);
652
                        response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
653
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
654
                        if(msg->acrm)
655
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
656
                        if(msg->acrh)
657
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
658
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
659
                        MHD_destroy_response(response);
660
                        goto done;
661
                }
662
                msg->session_id = session_id;
663
                if(handle_path) {
664
                        char *location = (char *)calloc(strlen(ws_path) + strlen(session_path) + 2, sizeof(char));
665
                        g_sprintf(location, "%s/%s", ws_path, session_path);
666
                        JANUS_LOG(LOG_ERR, "Invalid GET to %s, redirecting to %s\n", url, location);
667
                        response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
668
                        MHD_add_response_header(response, "Location", location);
669
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
670
                        if(msg->acrm)
671
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
672
                        if(msg->acrh)
673
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
674
                        ret = MHD_queue_response(connection, 302, response);
675
                        MHD_destroy_response(response);
676
                        g_free(location);
677
                        goto done;
678
                }
679
                janus_session *session = janus_session_find(session_id);
680
                if(!session) {
681
                        JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
682
                        response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
683
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
684
                        if(msg->acrm)
685
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
686
                        if(msg->acrh)
687
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
688
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
689
                        MHD_destroy_response(response);
690
                        goto done;
691
                }
692
                if(ws_api_secret != NULL) {
693
                        /* There's an API secret, check that the client provided it */
694
                        const char *secret = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "apisecret");
695
                        if(!secret || strcmp(secret, ws_api_secret)) {
696
                                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
697
                                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
698
                                if(msg->acrm)
699
                                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
700
                                if(msg->acrh)
701
                                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
702
                                ret = MHD_queue_response(connection, MHD_HTTP_FORBIDDEN, response);
703
                                MHD_destroy_response(response);
704
                                goto done;
705
                        }
706
                }
707
                /* Update the last activity timer */
708
                session->last_activity = janus_get_monotonic_time();
709
                /* How many messages can we send back in a single response? (just one by default) */
710
                int max_events = 1;
711
                const char *maxev = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "maxev");
712
                if(maxev != NULL) {
713
                        max_events = atoi(maxev);
714
                        if(max_events < 1) {
715
                                JANUS_LOG(LOG_WARN, "Invalid maxev parameter passed (%d), defaulting to 1\n", max_events);
716
                                max_events = 1;
717
                        }
718
                }
719
                JANUS_LOG(LOG_VERB, "Session %"SCNu64" found... returning up to %d messages\n", session->session_id, max_events);
720
                /* Handle GET, taking the first message from the list */
721
                janus_http_event *event = g_async_queue_try_pop(session->messages);
722
                if(event != NULL) {
723
                        if(max_events == 1) {
724
                                /* Return just this message and leave */
725
                                ret = janus_process_success(&source, "application/json", event->payload);
726
                        } else {
727
                                /* The application is willing to receive more events at the same time, anything to report? */
728
                                json_t *list = json_array();
729
                                json_error_t error;
730
                                if(event->payload) {
731
                                        json_t *ev = json_loads(event->payload, 0, &error);
732
                                        if(ev && json_is_object(ev))        /* FIXME Should we fail if this is not valid JSON? */
733
                                                json_array_append_new(list, ev);
734
                                        g_free(event->payload);
735
                                        event->payload = NULL;
736
                                }
737
                                g_free(event);
738
                                event = NULL;
739
                                int events = 1;
740
                                while(events < max_events) {
741
                                        event = g_async_queue_try_pop(session->messages);
742
                                        if(event == NULL)
743
                                                break;
744
                                        if(event->payload) {
745
                                                json_t *ev = json_loads(event->payload, 0, &error);
746
                                                if(ev && json_is_object(ev))        /* FIXME Should we fail if this is not valid JSON? */
747
                                                        json_array_append_new(list, ev);
748
                                                g_free(event->payload);
749
                                                event->payload = NULL;
750
                                        }
751
                                        g_free(event);
752
                                        event = NULL;
753
                                        events++;
754
                                }
755
                                /* Return the array of messages and leave */
756
                                char *event_text = json_dumps(list, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
757
                                json_decref(list);
758
                                ret = janus_process_success(&source, "application/json", event_text);
759
                        }
760
                } else {
761
                        /* Still no message, wait */
762
                        ret = janus_ws_notifier(&source, max_events);
763
                }
764
                goto done;
765
        }
766
        
767
        /* Parse the JSON payload */
768
        json_error_t error;
769
        json_t *root = json_loads(payload, 0, &error);
770
        if(!root) {
771
                ret = janus_process_error(&source, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
772
                goto done;
773
        }
774
        if(!json_is_object(root)) {
775
                ret = janus_process_error(&source, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
776
                json_decref(root);
777
                goto done;
778
        }
779
        /* Check if we have session and handle identifiers */
780
        guint64 session_id = session_path ? g_ascii_strtoll(session_path, NULL, 10) : 0;
781
        guint64 handle_id = handle_path ? g_ascii_strtoll(handle_path, NULL, 10) : 0;
782
        if(session_id > 0)
783
                json_object_set_new(root, "session_id", json_integer(session_id));
784
        if(handle_id > 0)
785
                json_object_set_new(root, "handle_id", json_integer(handle_id));
786
        ret = janus_process_incoming_request(&source, root);
787

    
788
done:
789
        g_strfreev(basepath);
790
        g_strfreev(path);
791
        g_free(session_path);
792
        g_free(handle_path);
793
        return ret;
794
}
795

    
796
janus_request_source *janus_request_source_new(int type, void *source, void *msg) {
797
        janus_request_source *req_source = (janus_request_source *)calloc(1, sizeof(janus_request_source));
798
        req_source->type = type;
799
        req_source->source = source;
800
        req_source->msg = msg;
801
        return req_source;
802
}
803

    
804
void janus_request_source_destroy(janus_request_source *req_source) {
805
        if(req_source == NULL)
806
                return;
807
        req_source->source = NULL;
808
        req_source->msg = NULL;
809
        g_free(req_source);
810
}
811

    
812
int janus_process_incoming_request(janus_request_source *source, json_t *root) {
813
        int ret = MHD_NO;
814
        if(source == NULL || root == NULL) {
815
                JANUS_LOG(LOG_ERR, "Missing source or payload to process, giving up...\n");
816
                return ret;
817
        }
818
        /* Ok, let's start with the ids */
819
        guint64 session_id = 0, handle_id = 0;
820
        json_t *s = json_object_get(root, "session_id");
821
        if(s && json_is_integer(s))
822
                session_id = json_integer_value(s);
823
        json_t *h = json_object_get(root, "handle_id");
824
        if(h && json_is_integer(h))
825
                handle_id = json_integer_value(h);
826

    
827
        /* Get transaction and message request */
828
        json_t *transaction = json_object_get(root, "transaction");
829
        if(!transaction) {
830
                ret = janus_process_error(source, session_id, NULL, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (transaction)");
831
                goto jsondone;
832
        }
833
        if(!json_is_string(transaction)) {
834
                ret = janus_process_error(source, session_id, NULL, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (transaction should be a string)");
835
                goto jsondone;
836
        }
837
        const gchar *transaction_text = json_string_value(transaction);
838
        json_t *message = json_object_get(root, "janus");
839
        if(!message) {
840
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (janus)");
841
                goto jsondone;
842
        }
843
        if(!json_is_string(message)) {
844
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (janus should be a string)");
845
                goto jsondone;
846
        }
847
        const gchar *message_text = json_string_value(message);
848
        
849
        if(session_id == 0 && handle_id == 0) {
850
                /* Can only be a 'Create new session', a 'Get info' or a 'Ping/Pong' request */
851
                if(!strcasecmp(message_text, "info")) {
852
                        ret = janus_process_success(source, "application/json", janus_info(transaction_text));
853
                        goto jsondone;
854
                }
855
                if(!strcasecmp(message_text, "ping")) {
856
                        /* Prepare JSON reply */
857
                        json_t *reply = json_object();
858
                        json_object_set_new(reply, "janus", json_string("pong"));
859
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
860
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
861
                        json_decref(reply);
862
                        ret = janus_process_success(source, "application/json", g_strdup(reply_text));
863
                        goto jsondone;
864
                }
865
                if(strcasecmp(message_text, "create")) {
866
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
867
                        goto jsondone;
868
                }
869
                if(ws_api_secret != NULL) {
870
                        /* There's an API secret, check that the client provided it */
871
                        json_t *secret = json_object_get(root, "apisecret");
872
                        if(!secret || !json_is_string(secret) || strcmp(json_string_value(secret), ws_api_secret)) {
873
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNAUTHORIZED, NULL);
874
                                goto jsondone;
875
                        }
876
                }
877
                session_id = 0;
878
                json_t *id = json_object_get(root, "id");
879
                if(id != NULL) {
880
                        /* The application provided the session ID to use */
881
                        if(!json_is_integer(id)) {
882
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (id should be an integer)");
883
                                goto jsondone;
884
                        }
885
                        session_id = json_integer_value(id);
886
                        if(session_id > 0 && janus_session_find(session_id) != NULL) {
887
                                /* Session ID already taken */
888
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_SESSION_CONFLICT, "Session ID already in use");
889
                                goto jsondone;
890
                        }
891
                }
892
                /* Handle it */
893
                janus_session *session = janus_session_create(session_id);
894
                if(session == NULL) {
895
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNKNOWN, "Memory error");
896
                        goto jsondone;
897
                }
898
                session_id = session->session_id;
899
#ifdef HAVE_WEBSOCKETS
900
                if(source->type == JANUS_SOURCE_WEBSOCKETS) {
901
                        /* Add the new session to the list of sessions created by this WS client */
902
                        janus_websocket_client *client = (janus_websocket_client *)source->source;
903
                        if(client) {
904
                                janus_mutex_lock(&client->mutex);
905
                                if(client->sessions == NULL)
906
                                        client->sessions = g_hash_table_new(NULL, NULL);
907
                                g_hash_table_insert(client->sessions, GUINT_TO_POINTER(session_id), session);
908
                                janus_mutex_unlock(&client->mutex);
909
                        }
910
                }
911
#endif
912
#ifdef HAVE_RABBITMQ
913
                if(source->type == JANUS_SOURCE_RABBITMQ) {
914
                        /* Add the new session to the list of sessions created by this WS client */
915
                        janus_rabbitmq_client *client = (janus_rabbitmq_client *)source->source;
916
                        if(client) {
917
                                janus_mutex_lock(&client->mutex);
918
                                if(client->sessions == NULL)
919
                                        client->sessions = g_hash_table_new(NULL, NULL);
920
                                g_hash_table_insert(client->sessions, GUINT_TO_POINTER(session_id), session);
921
                                janus_mutex_unlock(&client->mutex);
922
                        }
923
                }
924
#endif
925
                /* Prepare JSON reply */
926
                json_t *reply = json_object();
927
                json_object_set_new(reply, "janus", json_string("success"));
928
                json_object_set_new(reply, "transaction", json_string(transaction_text));
929
                json_t *data = json_object();
930
                json_object_set_new(data, "id", json_integer(session_id));
931
                json_object_set(reply, "data", data);
932
                /* Convert to a string */
933
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
934
                json_decref(data);
935
                json_decref(reply);
936
                /* Send the success reply */
937
                ret = janus_process_success(source, "application/json", reply_text);
938
                goto jsondone;
939
        }
940
        if(session_id < 1) {
941
                JANUS_LOG(LOG_ERR, "Invalid session\n");
942
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, NULL);
943
                goto jsondone;
944
        }
945
        if(source->type == JANUS_SOURCE_PLAIN_HTTP) {
946
                janus_http_msg *msg = (janus_http_msg *)source->msg;
947
                if(msg != NULL)
948
                        msg->session_id = session_id;
949
        }
950
        if(h && handle_id < 1) {
951
                JANUS_LOG(LOG_ERR, "Invalid handle\n");
952
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, NULL);
953
                goto jsondone;
954
        }
955

    
956
        /* Go on with the processing */
957
        if(ws_api_secret != NULL) {
958
                /* There's an API secret, check that the client provided it */
959
                json_t *secret = json_object_get(root, "apisecret");
960
                if(!secret || !json_is_string(secret) || strcmp(json_string_value(secret), ws_api_secret)) {
961
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNAUTHORIZED, NULL);
962
                        goto jsondone;
963
                }
964
        }
965

    
966
        /* If we got here, make sure we have a session (and/or a handle) */
967
        janus_session *session = janus_session_find(session_id);
968
        if(!session) {
969
                JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
970
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, "No such session %"SCNu64"", session_id);
971
                goto jsondone;
972
        }
973
        janus_ice_handle *handle = NULL;
974
        if(handle_id > 0) {
975
                handle = janus_ice_handle_find(session, handle_id);
976
                if(!handle) {
977
                        JANUS_LOG(LOG_ERR, "Couldn't find any handle %"SCNu64" in session %"SCNu64"...\n", handle_id, session_id);
978
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_HANDLE_NOT_FOUND, "No such handle %"SCNu64" in session %"SCNu64"", handle_id, session_id);
979
                        goto jsondone;
980
                }
981
        }
982
        /* Update the last activity timer */
983
        session->last_activity = janus_get_monotonic_time();
984

    
985
        /* What is this? */
986
        if(!strcasecmp(message_text, "keepalive")) {
987
                /* Just a keep-alive message, reply with an ack */
988
                JANUS_LOG(LOG_VERB, "Got a keep-alive on session %"SCNu64"\n", session_id);
989
                json_t *reply = json_object();
990
                json_object_set_new(reply, "janus", json_string("ack"));
991
                json_object_set_new(reply, "session_id", json_integer(session_id));
992
                json_object_set_new(reply, "transaction", json_string(transaction_text));
993
                /* Convert to a string */
994
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
995
                json_decref(reply);
996
                /* Send the success reply */
997
                ret = janus_process_success(source, "application/json", reply_text);
998
        } else if(!strcasecmp(message_text, "attach")) {
999
                if(handle != NULL) {
1000
                        /* Attach is a session-level command */
1001
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
1002
                        goto jsondone;
1003
                }
1004
                json_t *plugin = json_object_get(root, "plugin");
1005
                if(!plugin) {
1006
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (plugin)");
1007
                        goto jsondone;
1008
                }
1009
                if(!json_is_string(plugin)) {
1010
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (plugin should be a string)");
1011
                        goto jsondone;
1012
                }
1013
                const gchar *plugin_text = json_string_value(plugin);
1014
                janus_plugin *plugin_t = janus_plugin_find(plugin_text);
1015
                if(plugin_t == NULL) {
1016
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_NOT_FOUND, "No such plugin '%s'", plugin_text);
1017
                        goto jsondone;
1018
                }
1019
                /* Create handle */
1020
                handle = janus_ice_handle_create(session);
1021
                if(handle == NULL) {
1022
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNKNOWN, "Memory error");
1023
                        goto jsondone;
1024
                }
1025
                handle_id = handle->handle_id;
1026
                /* Attach to the plugin */
1027
                int error = 0;
1028
                if((error = janus_ice_handle_attach_plugin(session, handle_id, plugin_t)) != 0) {
1029
                        /* TODO Make error struct to pass verbose information */
1030
                        janus_ice_handle_destroy(session, handle_id);
1031
                        janus_mutex_lock(&session->mutex);
1032
                        g_hash_table_remove(session->ice_handles, GUINT_TO_POINTER(handle_id));
1033
                        janus_mutex_unlock(&session->mutex);
1034

    
1035
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_ATTACH, "Couldn't attach to plugin: error '%d'", error);
1036
                        goto jsondone;
1037
                }
1038
                /* Prepare JSON reply */
1039
                json_t *reply = json_object();
1040
                json_object_set_new(reply, "janus", json_string("success"));
1041
                json_object_set_new(reply, "session_id", json_integer(session_id));
1042
                json_object_set_new(reply, "transaction", json_string(transaction_text));
1043
                json_t *data = json_object();
1044
                json_object_set_new(data, "id", json_integer(handle_id));
1045
                json_object_set(reply, "data", data);
1046
                /* Convert to a string */
1047
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1048
                json_decref(data);
1049
                json_decref(reply);
1050
                /* Send the success reply */
1051
                ret = janus_process_success(source, "application/json", reply_text);
1052
        } else if(!strcasecmp(message_text, "destroy")) {
1053
                if(handle != NULL) {
1054
                        /* Query is a session-level command */
1055
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
1056
                        goto jsondone;
1057
                }
1058
#ifdef HAVE_WEBSOCKETS
1059
                if(source->type == JANUS_SOURCE_WEBSOCKETS) {
1060
                        /* Remove the session from the list of sessions created by this WS client */
1061
                        janus_websocket_client *client = (janus_websocket_client *)source->source;
1062
                        if(client) {
1063
                                janus_mutex_lock(&client->mutex);
1064
                                if(client->sessions)
1065
                                        g_hash_table_remove(client->sessions, GUINT_TO_POINTER(session_id));
1066
                                janus_mutex_unlock(&client->mutex);
1067
                        }
1068
                }
1069
#endif
1070
                //~ janus_session_destroy(session_id);        /* FIXME Should we check if this actually succeeded, or can we ignore it? */
1071

    
1072
                /* Schedule the session for deletion */
1073
                session->destroy = 1;
1074
                janus_mutex_lock(&sessions_mutex);
1075
                g_hash_table_remove(sessions, GUINT_TO_POINTER(session->session_id));
1076
                g_hash_table_insert(old_sessions, GUINT_TO_POINTER(session->session_id), session);
1077
                GSource *timeout_source = g_timeout_source_new_seconds(3);
1078
                g_source_set_callback(timeout_source, janus_cleanup_session, session, NULL);
1079
                g_source_attach(timeout_source, sessions_watchdog_context);
1080
                g_source_unref(timeout_source);
1081
                janus_mutex_unlock(&sessions_mutex);
1082

    
1083
                /* Prepare JSON reply */
1084
                json_t *reply = json_object();
1085
                json_object_set_new(reply, "janus", json_string("success"));
1086
                json_object_set_new(reply, "session_id", json_integer(session_id));
1087
                json_object_set_new(reply, "transaction", json_string(transaction_text));
1088
                /* Convert to a string */
1089
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1090
                json_decref(reply);
1091
                /* Send the success reply */
1092
                ret = janus_process_success(source, "application/json", reply_text);
1093
        } else if(!strcasecmp(message_text, "detach")) {
1094
                if(handle == NULL) {
1095
                        /* Query is an handle-level command */
1096
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
1097
                        goto jsondone;
1098
                }
1099
                if(handle->app == NULL || handle->app_handle == NULL) {
1100
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_DETACH, "No plugin to detach from");
1101
                        goto jsondone;
1102
                }
1103
                int error = janus_ice_handle_destroy(session, handle_id);
1104
                janus_mutex_lock(&session->mutex);
1105
                g_hash_table_remove(session->ice_handles, GUINT_TO_POINTER(handle_id));
1106
                janus_mutex_unlock(&session->mutex);
1107

    
1108
                if(error != 0) {
1109
                        /* TODO Make error struct to pass verbose information */
1110
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_DETACH, "Couldn't detach from plugin: error '%d'", error);
1111
                        /* TODO Delete handle instance */
1112
                        goto jsondone;
1113
                }
1114
                /* Prepare JSON reply */
1115
                json_t *reply = json_object();
1116
                json_object_set_new(reply, "janus", json_string("success"));
1117
                json_object_set_new(reply, "session_id", json_integer(session_id));
1118
                json_object_set_new(reply, "transaction", json_string(transaction_text));
1119
                /* Convert to a string */
1120
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1121
                json_decref(reply);
1122
                /* Send the success reply */
1123
                ret = janus_process_success(source, "application/json", reply_text);
1124
        } else if(!strcasecmp(message_text, "message")) {
1125
                if(handle == NULL) {
1126
                        /* Query is an handle-level command */
1127
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
1128
                        goto jsondone;
1129
                }
1130
                if(handle->app == NULL || handle->app_handle == NULL) {
1131
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "No plugin to handle this message");
1132
                        goto jsondone;
1133
                }
1134
                janus_plugin *plugin_t = (janus_plugin *)handle->app;
1135
                JANUS_LOG(LOG_INFO, "[%"SCNu64"] There's a message for %s\n", handle->handle_id, plugin_t->get_name());
1136
                json_t *body = json_object_get(root, "body");
1137
                if(body == NULL) {
1138
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (body)");
1139
                        goto jsondone;
1140
                }
1141
                if(!json_is_object(body)) {
1142
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_JSON_OBJECT, "Invalid body object");
1143
                        goto jsondone;
1144
                }
1145
                /* Is there an SDP attached? */
1146
                json_t *jsep = json_object_get(root, "jsep");
1147
                char *jsep_type = NULL;
1148
                char *jsep_sdp = NULL, *jsep_sdp_stripped = NULL;
1149
                if(jsep != NULL) {
1150
                        if(!json_is_object(jsep)) {
1151
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_JSON_OBJECT, "Invalid jsep object");
1152
                                goto jsondone;
1153
                        }
1154
                        json_t *type = json_object_get(jsep, "type");
1155
                        if(!type) {
1156
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "JSEP error: missing mandatory element (type)");
1157
                                goto jsondone;
1158
                        }
1159
                        if(!json_is_string(type)) {
1160
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "JSEP error: invalid element type (type should be a string)");
1161
                                goto jsondone;
1162
                        }
1163
                        jsep_type = g_strdup(json_string_value(type));
1164
                        if(jsep_type == NULL) {
1165
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1166
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNKNOWN, "Memory error");
1167
                                goto jsondone;
1168
                        }
1169
                        type = NULL;
1170
                        /* Are we still cleaning up from a previous media session? */
1171
                        if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)) {
1172
                                JANUS_LOG(LOG_INFO, "[%"SCNu64"] Still cleaning up from a previous media session, let's wait a bit...\n", handle->handle_id);
1173
                                gint64 waited = 0;
1174
                                while(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)) {
1175
                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Still cleaning up from a previous media session, let's wait a bit...\n", handle->handle_id);
1176
                                        g_usleep(100000);
1177
                                        waited += 100000;
1178
                                        if(waited >= 3*G_USEC_PER_SEC) {
1179
                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited 3 seconds, that's enough!\n", handle->handle_id);
1180
                                                break;
1181
                                        }
1182
                                }
1183
                        }
1184
                        /* Check the JSEP type */
1185
                        int offer = 0;
1186
                        if(!strcasecmp(jsep_type, "offer")) {
1187
                                offer = 1;
1188
                                janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1189
                        } else if(!strcasecmp(jsep_type, "answer")) {
1190
                                offer = 0;
1191
                        } else {
1192
                                /* TODO Handle other message types as well */
1193
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_JSEP_UNKNOWN_TYPE, "JSEP error: unknown message type '%s'", jsep_type);
1194
                                g_free(jsep_type);
1195
                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1196
                                goto jsondone;
1197
                        }
1198
                        json_t *sdp = json_object_get(jsep, "sdp");
1199
                        if(!sdp) {
1200
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "JSEP error: missing mandatory element (sdp)");
1201
                                g_free(jsep_type);
1202
                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1203
                                goto jsondone;
1204
                        }
1205
                        if(!json_is_string(sdp)) {
1206
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "JSEP error: invalid element type (sdp should be a string)");
1207
                                g_free(jsep_type);
1208
                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1209
                                goto jsondone;
1210
                        }
1211
                        jsep_sdp = (char *)json_string_value(sdp);
1212
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Remote SDP:\n%s", handle->handle_id, jsep_sdp);
1213
                        /* Is this valid SDP? */
1214
                        int audio = 0, video = 0, data = 0, bundle = 0, rtcpmux = 0, trickle = 0;
1215
                        janus_sdp *parsed_sdp = janus_sdp_preparse(jsep_sdp, &audio, &video, &data, &bundle, &rtcpmux, &trickle);
1216
                        if(parsed_sdp == NULL) {
1217
                                /* Invalid SDP */
1218
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_JSEP_INVALID_SDP, "JSEP error: invalid SDP");
1219
                                g_free(jsep_type);
1220
                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1221
                                goto jsondone;
1222
                        }
1223
                        /* FIXME We're only handling single audio/video lines for now... */
1224
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Audio %s been negotiated\n", handle->handle_id, audio ? "has" : "has NOT");
1225
                        if(audio > 1) {
1226
                                JANUS_LOG(LOG_WARN, "[%"SCNu64"] More than one audio line? only going to negotiate one...\n", handle->handle_id);
1227
                        }
1228
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Video %s been negotiated\n", handle->handle_id, video ? "has" : "has NOT");
1229
                        if(video > 1) {
1230
                                JANUS_LOG(LOG_WARN, "[%"SCNu64"] More than one video line? only going to negotiate one...\n", handle->handle_id);
1231
                        }
1232
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] SCTP/DataChannels %s been negotiated\n", handle->handle_id, data ? "have" : "have NOT");
1233
                        if(data > 1) {
1234
                                JANUS_LOG(LOG_WARN, "[%"SCNu64"] More than one data line? only going to negotiate one...\n", handle->handle_id);
1235
                        }
1236
#ifndef HAVE_SCTP
1237
                        if(data) {
1238
                                JANUS_LOG(LOG_WARN, "[%"SCNu64"]   -- DataChannels have been negotiated, but support for them has not been compiled...\n", handle->handle_id);
1239
                        }
1240
#endif
1241
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] The browser %s BUNDLE\n", handle->handle_id, bundle ? "supports" : "does NOT support");
1242
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] The browser %s rtcp-mux\n", handle->handle_id, rtcpmux ? "supports" : "does NOT support");
1243
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] The browser %s doing Trickle ICE\n", handle->handle_id, trickle ? "is" : "is NOT");
1244
                        /* Check if it's a new session, or an update... */
1245
                        if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)
1246
                                        || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) {
1247
                                /* New session */
1248
                                if(offer) {
1249
                                        /* Setup ICE locally (we received an offer) */
1250
                                        if(janus_ice_setup_local(handle, offer, audio, video, data, bundle, rtcpmux, trickle) < 0) {
1251
                                                JANUS_LOG(LOG_ERR, "Error setting ICE locally\n");
1252
                                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNKNOWN, "Error setting ICE locally");
1253
                                                goto jsondone;
1254
                                        }
1255
                                } else {
1256
                                        /* Make sure we're waiting for an ANSWER in the first place */
1257
                                        if(!handle->agent) {
1258
                                                JANUS_LOG(LOG_ERR, "Unexpected ANSWER (did we offer?)\n");
1259
                                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNEXPECTED_ANSWER, "Unexpected ANSWER (did we offer?)");
1260
                                                goto jsondone;
1261
                                        }
1262
                                }
1263
                                janus_sdp_parse(handle, parsed_sdp);
1264
                                janus_sdp_free(parsed_sdp);
1265
                                if(!offer) {
1266
                                        /* Set remote candidates now (we received an answer) */
1267
                                        if(bundle) {
1268
                                                janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE);
1269
                                        } else {
1270
                                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE);
1271
                                        }
1272
                                        if(rtcpmux) {
1273
                                                janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX);
1274
                                        } else {
1275
                                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX);
1276
                                        }
1277
                                        if(trickle) {
1278
                                                janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);
1279
                                        } else {
1280
                                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);
1281
                                        }
1282
                                        if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)) {
1283
                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- bundle is supported by the browser, getting rid of one of the RTP/RTCP components, if any...\n", handle->handle_id);
1284
                                                if(audio) {
1285
                                                        /* Get rid of video and data, if present */
1286
                                                        if(handle->streams && handle->video_stream) {
1287
                                                                handle->audio_stream->video_ssrc = handle->video_stream->video_ssrc;
1288
                                                                handle->audio_stream->video_ssrc_peer = handle->video_stream->video_ssrc_peer;
1289
                                                                janus_ice_stream_free(handle->streams, handle->video_stream);
1290
                                                        }
1291
                                                        handle->video_stream = NULL;
1292
                                                        if(handle->video_id > 0) {
1293
                                                                nice_agent_attach_recv (handle->agent, handle->video_id, 1, g_main_loop_get_context (handle->iceloop), NULL, NULL);
1294
                                                                nice_agent_attach_recv (handle->agent, handle->video_id, 2, g_main_loop_get_context (handle->iceloop), NULL, NULL);
1295
                                                        }
1296
                                                        handle->video_id = 0;
1297
                                                        if(handle->streams && handle->data_stream) {
1298
                                                                janus_ice_stream_free(handle->streams, handle->data_stream);
1299
                                                        }
1300
                                                        handle->data_stream = NULL;
1301
                                                        if(handle->data_id > 0) {
1302
                                                                nice_agent_attach_recv (handle->agent, handle->data_id, 1, g_main_loop_get_context (handle->iceloop), NULL, NULL);
1303
                                                        }
1304
                                                        handle->data_id = 0;
1305
                                                } else if(video) {
1306
                                                        /* Get rid of data, if present */
1307
                                                        if(handle->streams && handle->data_stream) {
1308
                                                                janus_ice_stream_free(handle->streams, handle->data_stream);
1309
                                                        }
1310
                                                        handle->data_stream = NULL;
1311
                                                        if(handle->data_id > 0) {
1312
                                                                nice_agent_attach_recv (handle->agent, handle->data_id, 1, g_main_loop_get_context (handle->iceloop), NULL, NULL);
1313
                                                        }
1314
                                                        handle->data_id = 0;
1315
                                                }
1316
                                        }
1317
                                        if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX)) {
1318
                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- rtcp-mux is supported by the browser, getting rid of RTCP components, if any...\n", handle->handle_id);
1319
                                                if(handle->audio_stream && handle->audio_stream->components != NULL) {
1320
                                                        nice_agent_attach_recv (handle->agent, handle->audio_id, 2, g_main_loop_get_context (handle->iceloop), NULL, NULL);
1321
                                                        janus_ice_component_free(handle->audio_stream->components, handle->audio_stream->rtcp_component);
1322
                                                        handle->audio_stream->rtcp_component = NULL;
1323
                                                }
1324
                                                if(handle->video_stream && handle->video_stream->components != NULL) {
1325
                                                        nice_agent_attach_recv (handle->agent, handle->video_id, 2, g_main_loop_get_context (handle->iceloop), NULL, NULL);
1326
                                                        janus_ice_component_free(handle->video_stream->components, handle->video_stream->rtcp_component);
1327
                                                        handle->video_stream->rtcp_component = NULL;
1328
                                                }
1329
                                        }
1330
                                        /* FIXME Any disabled m-line? */
1331
                                        if(strstr(jsep_sdp, "m=audio 0")) {
1332
                                                JANUS_LOG(LOG_VERB, "Audio disabled via SDP\n");
1333
                                                if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
1334
                                                                || (!video && !data)) {
1335
                                                        JANUS_LOG(LOG_VERB, "  -- Marking audio stream as disabled\n");
1336
                                                        janus_ice_stream *stream = g_hash_table_lookup(handle->streams, GUINT_TO_POINTER(handle->audio_id));
1337
                                                        if(stream)
1338
                                                                stream->disabled = TRUE;
1339
                                                }
1340
                                        }
1341
                                        if(strstr(jsep_sdp, "m=video 0")) {
1342
                                                JANUS_LOG(LOG_VERB, "Video disabled via SDP\n");
1343
                                                if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
1344
                                                                || (!audio && !data)) {
1345
                                                        JANUS_LOG(LOG_VERB, "  -- Marking video stream as disabled\n");
1346
                                                        janus_ice_stream *stream = NULL;
1347
                                                        if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)) {
1348
                                                                stream = g_hash_table_lookup(handle->streams, GUINT_TO_POINTER(handle->video_id));
1349
                                                        } else {
1350
                                                                gint id = handle->audio_id > 0 ? handle->audio_id : handle->video_id;
1351
                                                                stream = g_hash_table_lookup(handle->streams, GUINT_TO_POINTER(id));
1352
                                                        }
1353
                                                        if(stream)
1354
                                                                stream->disabled = TRUE;
1355
                                                }
1356
                                        }
1357
                                        if(strstr(jsep_sdp, "m=application 0 DTLS/SCTP")) {
1358
                                                JANUS_LOG(LOG_VERB, "Data Channel disabled via SDP\n");
1359
                                                if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
1360
                                                                || (!audio && !video)) {
1361
                                                        JANUS_LOG(LOG_VERB, "  -- Marking data channel stream as disabled\n");
1362
                                                        janus_ice_stream *stream = NULL;
1363
                                                        if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)) {
1364
                                                                stream = g_hash_table_lookup(handle->streams, GUINT_TO_POINTER(handle->data_id));
1365
                                                        } else {
1366
                                                                gint id = handle->audio_id > 0 ? handle->audio_id : (handle->video_id > 0 ? handle->video_id : handle->data_id);
1367
                                                                stream = g_hash_table_lookup(handle->streams, GUINT_TO_POINTER(id));
1368
                                                        }
1369
                                                        if(stream)
1370
                                                                stream->disabled = TRUE;
1371
                                                }
1372
                                        }
1373
                                        janus_mutex_lock(&handle->mutex);
1374
                                        if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE) &&
1375
                                                        !janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES)) {
1376
                                                JANUS_LOG(LOG_INFO, "[%"SCNu64"]   -- ICE Trickling is supported by the browser, waiting for remote candidates...\n", handle->handle_id);
1377
                                                janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_START);
1378
                                        } else {
1379
                                                JANUS_LOG(LOG_INFO, "[%"SCNu64"] Done! Sending connectivity checks...\n", handle->handle_id);
1380
                                                if(handle->audio_id > 0) {
1381
                                                        janus_ice_setup_remote_candidates(handle, handle->audio_id, 1);
1382
                                                        if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX))        /* http://tools.ietf.org/html/rfc5761#section-5.1.3 */
1383
                                                                janus_ice_setup_remote_candidates(handle, handle->audio_id, 2);
1384
                                                }
1385
                                                if(handle->video_id > 0) {
1386
                                                        janus_ice_setup_remote_candidates(handle, handle->video_id, 1);
1387
                                                        if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX))        /* http://tools.ietf.org/html/rfc5761#section-5.1.3 */
1388
                                                                janus_ice_setup_remote_candidates(handle, handle->video_id, 2);
1389
                                                }
1390
                                                if(handle->data_id > 0) {
1391
                                                        janus_ice_setup_remote_candidates(handle, handle->data_id, 1);
1392
                                                }
1393
                                        }
1394
                                        janus_mutex_unlock(&handle->mutex);
1395
                                        /* We got our answer */
1396
                                        janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1397
                                }
1398
                        } else {
1399
                                /* TODO Actually handle session updates: for now we ignore them, and just relay them to plugins */
1400
                                JANUS_LOG(LOG_WARN, "[%"SCNu64"] Ignoring negotiation update, we don't support them yet...\n", handle->handle_id);
1401
                        }
1402
                        handle->remote_sdp = g_strdup(jsep_sdp);
1403
                        /* Anonymize SDP */
1404
                        jsep_sdp_stripped = janus_sdp_anonymize(jsep_sdp);
1405
                        if(jsep_sdp_stripped == NULL) {
1406
                                /* Invalid SDP */
1407
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_JSEP_INVALID_SDP, "JSEP error: invalid SDP");
1408
                                g_free(jsep_type);
1409
                                janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1410
                                goto jsondone;
1411
                        }
1412
                        sdp = NULL;
1413
                        janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
1414
                }
1415
                char *body_text = json_dumps(body, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1416
                /* Send the message to the plugin */
1417
                janus_plugin_result *result = plugin_t->handle_message(handle->app_handle, g_strdup((char *)transaction_text), body_text, jsep_type, jsep_sdp_stripped);
1418
                if(result == NULL) {
1419
                        /* Something went horribly wrong! */
1420
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "Plugin didn't give a result");
1421
                        goto jsondone;
1422
                }
1423
                if(result->type == JANUS_PLUGIN_OK) {
1424
                        /* The plugin gave a result already (synchronous request/response) */
1425
                        if(result->content == NULL) {
1426
                                /* Missing content... */
1427
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "Plugin didn't provide any content for this synchronous response");
1428
                                janus_plugin_result_destroy(result);
1429
                                goto jsondone;
1430
                        }
1431
                        json_error_t error;
1432
                        json_t *event = json_loads(result->content, 0, &error);
1433
                        if(!event) {
1434
                                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Cannot send response from plugin (JSON error: on line %d: %s)\n", handle->handle_id, error.line, error.text);
1435
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "Plugin returned an invalid JSON response");
1436
                                janus_plugin_result_destroy(result);
1437
                                goto jsondone;
1438
                        }
1439
                        if(!json_is_object(event)) {
1440
                                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Cannot send response from plugin (JSON error: not an object)\n", handle->handle_id);
1441
                                json_decref(event);
1442
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "Plugin returned an invalid JSON response");
1443
                                janus_plugin_result_destroy(result);
1444
                                goto jsondone;
1445
                        }
1446
                        /* Prepare JSON response */
1447
                        json_t *reply = json_object();
1448
                        json_object_set_new(reply, "janus", json_string("success"));
1449
                        json_object_set_new(reply, "session_id", json_integer(session->session_id));
1450
                        json_object_set_new(reply, "sender", json_integer(handle->handle_id));
1451
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
1452
                        json_t *plugin_data = json_object();
1453
                        json_object_set_new(plugin_data, "plugin", json_string(plugin_t->get_package()));
1454
                        json_object_set(plugin_data, "data", event);
1455
                        json_object_set(reply, "plugindata", plugin_data);
1456
                        /* Convert to a string */
1457
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1458
                        json_decref(event);
1459
                        json_decref(plugin_data);
1460
                        if(jsep != NULL)
1461
                                json_decref(jsep);
1462
                        json_decref(reply);
1463
                        /* Send the success reply */
1464
                        ret = janus_process_success(source, "application/json", reply_text);
1465
                } else if(result->type == JANUS_PLUGIN_OK_WAIT) {
1466
                        /* The plugin received the request but didn't process it yet, send an ack (asynchronous notifications may follow) */
1467
                        json_t *reply = json_object();
1468
                        json_object_set_new(reply, "janus", json_string("ack"));
1469
                        json_object_set_new(reply, "session_id", json_integer(session_id));
1470
                        if(result->content)
1471
                                json_object_set_new(reply, "hint", json_string(result->content));
1472
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
1473
                        /* Convert to a string */
1474
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1475
                        json_decref(reply);
1476
                        /* Send the success reply */
1477
                        ret = janus_process_success(source, "application/json", reply_text);
1478
                } else {
1479
                        /* Something went horribly wrong! */
1480
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "%s", result->content ? g_strdup(result->content) : "Plugin returned a severe (unknown) error");
1481
                        janus_plugin_result_destroy(result);
1482
                        goto jsondone;
1483
                }                        
1484
                janus_plugin_result_destroy(result);
1485
        } else if(!strcasecmp(message_text, "trickle")) {
1486
                if(handle == NULL) {
1487
                        /* Trickle is an handle-level command */
1488
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
1489
                        goto jsondone;
1490
                }
1491
                if(handle->app == NULL || handle->app_handle == NULL) {
1492
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "No plugin to handle this trickle candidate");
1493
                        goto jsondone;
1494
                }
1495
                json_t *candidate = json_object_get(root, "candidate");
1496
                json_t *candidates = json_object_get(root, "candidates");
1497
                if(candidate == NULL && candidates == NULL) {
1498
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (candidate|candidates)");
1499
                        goto jsondone;
1500
                }
1501
                if(candidate != NULL && candidates != NULL) {
1502
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_JSON, "Can't have both candidate and candidates");
1503
                        goto jsondone;
1504
                }
1505
                if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE)) {
1506
                        /* It looks like this peer supports Trickle, after all */
1507
                        JANUS_LOG(LOG_VERB, "Handle %"SCNu64" supports trickle even if it didn't negotiate it...\n", handle->handle_id);
1508
                        janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);
1509
                }
1510
                if(candidate != NULL) {
1511
                        /* We got a single candidate */
1512
                        if(!json_is_object(candidate) || json_object_get(candidate, "completed") != NULL) {
1513
                                JANUS_LOG(LOG_INFO, "No more remote candidates for handle %"SCNu64"!\n", handle->handle_id);
1514
                                janus_mutex_lock(&handle->mutex);
1515
                                janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES);
1516
                                janus_mutex_unlock(&handle->mutex);
1517
                        } else {
1518
                                /* Handle remote candidate */
1519
                                json_t *mid = json_object_get(candidate, "sdpMid");
1520
                                if(!mid) {
1521
                                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Trickle error: missing mandatory element (sdpMid)");
1522
                                        goto jsondone;
1523
                                }
1524
                                if(!json_is_string(mid)) {
1525
                                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Trickle error: invalid element type (sdpMid should be a string)");
1526
                                        goto jsondone;
1527
                                }
1528
                                json_t *mline = json_object_get(candidate, "sdpMLineIndex");
1529
                                if(!mline) {
1530
                                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Trickle error: missing mandatory element (sdpMLineIndex)");
1531
                                        goto jsondone;
1532
                                }
1533
                                if(!json_is_integer(mline)) {
1534
                                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Trickle error: invalid element type (sdpMLineIndex should be an integer)");
1535
                                        goto jsondone;
1536
                                }
1537
                                json_t *rc = json_object_get(candidate, "candidate");
1538
                                if(!rc) {
1539
                                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Trickle error: missing mandatory element (candidate)");
1540
                                        goto jsondone;
1541
                                }
1542
                                if(!json_is_string(rc)) {
1543
                                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Trickle error: invalid element type (candidate should be a string)");
1544
                                        goto jsondone;
1545
                                }
1546
                                JANUS_LOG(LOG_VERB, "[%"SCNu64"] Trickle candidate (%s): %s\n", handle->handle_id, json_string_value(mid), json_string_value(rc));
1547
                                /* Is there any stream ready? this trickle may get here before the SDP it relates to */
1548
                                if(handle->audio_stream == NULL && handle->video_stream == NULL && handle->data_stream == NULL) {
1549
                                        /* No stream available, wait a bit */
1550
                                        gint64 waited = 0;
1551
                                        while(handle->audio_stream == NULL && handle->video_stream == NULL && handle->data_stream == NULL) {
1552
                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"] No stream, wait a bit in case this trickle got here before the SDP...\n", handle->handle_id);
1553
                                                g_usleep(100000);
1554
                                                waited += 100000;
1555
                                                if(waited >= 3*G_USEC_PER_SEC) {
1556
                                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited 3 seconds, that's enough!\n", handle->handle_id);
1557
                                                        break;
1558
                                                }
1559
                                        }
1560
                                }
1561
                                /* Is the ICE stack ready already? */
1562
                                if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER)) {
1563
                                        /* Still processing the offer, wait a bit */
1564
                                        gint64 waited = 0;
1565
                                        while(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER)) {
1566
                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"] Still processing the offer, waiting until we're done there...\n", handle->handle_id);
1567
                                                g_usleep(100000);
1568
                                                waited += 100000;
1569
                                                if(waited >= 5*G_USEC_PER_SEC) {
1570
                                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited 5 seconds, that's enough!\n", handle->handle_id);
1571
                                                        break;
1572
                                                }
1573
                                        }
1574
                                }
1575
                                /* Parse it */
1576
                                int sdpMLineIndex = json_integer_value(mline);
1577
                                int video = 0, data = 0;
1578
                                /* FIXME badly, we should have an array of m-lines in the handle object */
1579
                                switch(sdpMLineIndex) {
1580
                                        case 0:
1581
                                                if(handle->audio_stream == NULL) {
1582
                                                        video = handle->video_stream ? 1 : 0;
1583
                                                        data = !video;
1584
                                                }
1585
                                                break;
1586
                                        case 1:
1587
                                                if(handle->audio_stream == NULL) {
1588
                                                        data = 1;
1589
                                                } else {
1590
                                                        video = handle->video_stream ? 1 : 0;
1591
                                                        data = !video;
1592
                                                }
1593
                                                break;
1594
                                        case 2:
1595
                                                data = 1;
1596
                                                break;
1597
                                        default:
1598
                                                /* FIXME We don't support more than 3 m-lines right now */
1599
                                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Trickle error: invalid element type (sdpMLineIndex not [0,2])");
1600
                                                goto jsondone;
1601
                                                break;
1602
                                }
1603
#ifndef HAVE_SCTP
1604
                                data = 0;
1605
#endif
1606
                                if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
1607
                                                && (
1608
                                                        ((video || data) && handle->audio_stream != NULL) || 
1609
                                                                ((data) && handle->video_stream != NULL))
1610
                                                        ) {
1611
                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Got a %s candidate but we're bundling, ignoring...\n", handle->handle_id, json_string_value(mid));
1612
                                } else {
1613
                                        janus_ice_stream *stream = video ? handle->video_stream : (data ? handle->data_stream : handle->audio_stream);
1614
                                        if(stream == NULL) {
1615
                                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_TRICKE_INVALID_STREAM, "Trickle error: no %s stream", json_string_value(mid));
1616
                                                goto jsondone;
1617
                                        }
1618
                                        int res = janus_sdp_parse_candidate(stream, json_string_value(rc), 1);
1619
                                        if(res != 0) {
1620
                                                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Failed to parse candidate... (%d)\n", handle->handle_id, res);
1621
                                        }
1622
                                }
1623
                        }
1624
                } else {
1625
                        /* We got multiple candidates in an array */
1626
                        if(!json_is_array(candidates)) {
1627
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Can't have both candidate and candidates");
1628
                                goto jsondone;
1629
                        }
1630
                        JANUS_LOG(LOG_INFO, "Got multiple candidates (%zu)\n", json_array_size(candidates));
1631
                        gboolean last_candidate = FALSE;
1632
                        if(json_array_size(candidates) > 0) {
1633
                                /* Handle remote candidates */
1634
                                size_t i = 0;
1635
                                for(i=0; i<json_array_size(candidates); i++) {
1636
                                        json_t *candidate = json_array_get(candidates, i);
1637
                                        if(candidate == NULL || !json_is_object(candidate) || json_object_get(candidate, "completed") != NULL) {
1638
                                                /* A 'NULL' candidate is our cue */
1639
                                                last_candidate = TRUE;
1640
                                                continue;
1641
                                        }
1642
                                        json_t *mid = json_object_get(candidate, "sdpMid");
1643
                                        if(!mid) {
1644
                                                /* Invalid candidate but we don't return an error, we just ignore it */
1645
                                                JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, missing mandatory element (sdpMid)", i);
1646
                                                continue;
1647
                                        }
1648
                                        if(!json_is_string(mid)) {
1649
                                                /* Invalid candidate but we don't return an error, we just ignore it */
1650
                                                JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, invalid element type (sdpMid should be a string)", i);
1651
                                                continue;
1652
                                        }
1653
                                        json_t *mline = json_object_get(candidate, "sdpMLineIndex");
1654
                                        if(!mline) {
1655
                                                /* Invalid candidate but we don't return an error, we just ignore it */
1656
                                                JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, missing mandatory element (sdpMLineIndex)", i);
1657
                                                continue;
1658
                                        }
1659
                                        if(!json_is_integer(mline)) {
1660
                                                /* Invalid candidate but we don't return an error, we just ignore it */
1661
                                                JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, invalid element type (sdpMLineIndex should be an integer)", i);
1662
                                                continue;
1663
                                        }
1664
                                        json_t *rc = json_object_get(candidate, "candidate");
1665
                                        if(!rc) {
1666
                                                /* Invalid candidate but we don't return an error, we just ignore it */
1667
                                                JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, missing mandatory element (candidate)", i);
1668
                                                continue;
1669
                                        }
1670
                                        if(!json_is_string(rc)) {
1671
                                                /* Invalid candidate but we don't return an error, we just ignore it */
1672
                                                JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, invalid element type (candidate should be a string)", i);
1673
                                                continue;
1674
                                        }
1675
                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Trickle candidate at index %zu (%s): %s\n", handle->handle_id, i, json_string_value(mid), json_string_value(rc));
1676
                                        /* Parse it */
1677
                                        int sdpMLineIndex = json_integer_value(mline);
1678
                                        if(sdpMLineIndex < 0 || sdpMLineIndex > 2) {
1679
                                                /* FIXME We don't support more than 3 m-lines right now */
1680
                                                JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, invalid element type (sdpMLineIndex not [0,2])", i);
1681
                                                continue;
1682
                                        }
1683
                                        /* Is there any stream ready? this trickle may get here before the SDP it relates to */
1684
                                        if(handle->audio_stream == NULL && handle->video_stream == NULL && handle->data_stream == NULL) {
1685
                                                /* No stream available, wait a bit */
1686
                                                gint64 waited = 0;
1687
                                                while(handle->audio_stream == NULL && handle->video_stream == NULL && handle->data_stream == NULL) {
1688
                                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] No stream, wait a bit in case this trickle got here before the SDP...\n", handle->handle_id);
1689
                                                        g_usleep(100000);
1690
                                                        waited += 100000;
1691
                                                        if(waited >= 3*G_USEC_PER_SEC) {
1692
                                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited 3 seconds, that's enough!\n", handle->handle_id);
1693
                                                                break;
1694
                                                        }
1695
                                                }
1696
                                        }
1697
                                        /* Is the ICE stack ready already? */
1698
                                        if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER)) {
1699
                                                /* Still processing the offer, wait a bit */
1700
                                                gint64 waited = 0;
1701
                                                while(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER)) {
1702
                                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Still processing the offer, waiting until we're done there...\n", handle->handle_id);
1703
                                                        g_usleep(100000);
1704
                                                        waited += 100000;
1705
                                                        if(waited >= 5*G_USEC_PER_SEC) {
1706
                                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited 5 seconds, that's enough!\n", handle->handle_id);
1707
                                                                break;
1708
                                                        }
1709
                                                }
1710
                                        }
1711
                                        int video = 0, data = 0;
1712
                                        /* FIXME badly, we should have an array of m-lines in the handle object */
1713
                                        switch(sdpMLineIndex) {
1714
                                                case 0:
1715
                                                        if(handle->audio_stream == NULL) {
1716
                                                                video = handle->video_stream ? 1 : 0;
1717
                                                                data = !video;
1718
                                                        }
1719
                                                        break;
1720
                                                case 1:
1721
                                                        if(handle->audio_stream == NULL) {
1722
                                                                data = 1;
1723
                                                        } else {
1724
                                                                video = handle->video_stream ? 1 : 0;
1725
                                                                data = !video;
1726
                                                        }
1727
                                                        break;
1728
                                                case 2:
1729
                                                        data = 1;
1730
                                                        break;
1731
                                                default:
1732
                                                        break;
1733
                                        }
1734
#ifndef HAVE_SCTP
1735
                                        data = 0;
1736
#endif
1737
                                        if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
1738
                                                        && (
1739
                                                                ((video || data) && handle->audio_stream != NULL) || 
1740
                                                                        ((data) && handle->video_stream != NULL))
1741
                                                                ) {
1742
                                                JANUS_LOG(LOG_VERB, "[%"SCNu64"] Got a %s candidate but we're bundling, ignoring...\n", handle->handle_id, json_string_value(mid));
1743
                                        } else {
1744
                                                janus_ice_stream *stream = video ? handle->video_stream : (data ? handle->data_stream : handle->audio_stream);
1745
                                                if(stream == NULL) {
1746
                                                        JANUS_LOG(LOG_WARN, "Trickle error: ignoring candidate at index %zu, no %s stream", i, json_string_value(mid));
1747
                                                        continue;
1748
                                                }
1749
                                                int res = janus_sdp_parse_candidate(stream, json_string_value(rc), 1);
1750
                                                if(res != 0) {
1751
                                                        JANUS_LOG(LOG_ERR, "[%"SCNu64"] Failed to parse candidate at index %zu... (%d)\n", handle->handle_id, i, res);
1752
                                                }
1753
                                        }
1754
                                }
1755
                        }
1756
                        if(last_candidate) {
1757
                                JANUS_LOG(LOG_INFO, "No more remote candidates for handle %"SCNu64"!\n", handle->handle_id);
1758
                                janus_mutex_lock(&handle->mutex);
1759
                                janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES);
1760
                                janus_mutex_unlock(&handle->mutex);
1761
                        }
1762
                }
1763
                /* We reply right away, not to block the web server... */
1764
                json_t *reply = json_object();
1765
                json_object_set_new(reply, "janus", json_string("ack"));
1766
                json_object_set_new(reply, "session_id", json_integer(session_id));
1767
                json_object_set_new(reply, "transaction", json_string(transaction_text));
1768
                /* Convert to a string */
1769
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
1770
                json_decref(reply);
1771
                /* Send the success reply */
1772
                ret = janus_process_success(source, "application/json", reply_text);
1773
        } else {
1774
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNKNOWN_REQUEST, "Unknown request '%s'", message_text);
1775
        }
1776
        goto jsondone;
1777

    
1778
jsondone:
1779
        json_decref(root);
1780
        
1781
        return ret;
1782
}
1783

    
1784
/* Admin/monitor WebServer requests handler */
1785
int janus_admin_ws_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr)
1786
{
1787
        char *payload = NULL;
1788
        struct MHD_Response *response = NULL;
1789
        int ret = MHD_NO;
1790
        gchar *session_path = NULL, *handle_path = NULL;
1791
        gchar **basepath = NULL, **path = NULL;
1792

    
1793
        JANUS_LOG(LOG_VERB, "Got an admin/monitor HTTP %s request on %s...\n", method, url);
1794
        /* Is this the first round? */
1795
        int firstround = 0;
1796
        janus_http_msg *msg = (janus_http_msg *)*ptr;
1797
        if (msg == NULL) {
1798
                firstround = 1;
1799
                JANUS_LOG(LOG_VERB, " ... Just parsing headers for now...\n");
1800
                msg = calloc(1, sizeof(janus_http_msg));
1801
                if(msg == NULL) {
1802
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1803
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1804
                        MHD_destroy_response(response);
1805
                        goto done;
1806
                }
1807
                msg->acrh = NULL;
1808
                msg->acrm = NULL;
1809
                msg->payload = NULL;
1810
                msg->len = 0;
1811
                msg->session_id = 0;
1812
                *ptr = msg;
1813
                MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_ws_headers, msg);
1814
                ret = MHD_YES;
1815
        }
1816
        /* Parse request */
1817
        if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {
1818
                JANUS_LOG(LOG_ERR, "Unsupported method...\n");
1819
                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
1820
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1821
                if(msg->acrm)
1822
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1823
                if(msg->acrh)
1824
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1825
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_IMPLEMENTED, response);
1826
                MHD_destroy_response(response);
1827
                return ret;
1828
        }
1829
        if (!strcasecmp(method, "OPTIONS")) {
1830
                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO); 
1831
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1832
                if(msg->acrm)
1833
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1834
                if(msg->acrh)
1835
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1836
                ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
1837
                MHD_destroy_response(response);
1838
        }
1839
        /* Get path components */
1840
        if(strcasecmp(url, admin_ws_path)) {
1841
                if(strlen(admin_ws_path) > 1) {
1842
                        basepath = g_strsplit(url, admin_ws_path, -1);
1843
                } else {
1844
                        /* The base path is the web server too itself, we process the url itself */
1845
                        basepath = calloc(3, sizeof(char *));
1846
                        basepath[0] = g_strdup("/");
1847
                        basepath[1] = g_strdup(url);
1848
                }
1849
                if(basepath[1] == NULL || basepath[1][0] != '/') {
1850
                        JANUS_LOG(LOG_ERR, "Invalid url %s (%s)\n", url, basepath[1]);
1851
                        response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
1852
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1853
                        if(msg->acrm)
1854
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1855
                        if(msg->acrh)
1856
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1857
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1858
                        MHD_destroy_response(response);
1859
                }
1860
                if(firstround) {
1861
                        g_strfreev(basepath);
1862
                        return ret;
1863
                }
1864
                path = g_strsplit(basepath[1], "/", -1);
1865
                if(path == NULL || path[1] == NULL) {
1866
                        JANUS_LOG(LOG_ERR, "Invalid path %s (%s)\n", basepath[1], path[1]);
1867
                        response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
1868
                        MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1869
                        if(msg->acrm)
1870
                                MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1871
                        if(msg->acrh)
1872
                                MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1873
                        ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1874
                        MHD_destroy_response(response);
1875
                }
1876
        }
1877
        if(firstround)
1878
                return ret;
1879
        JANUS_LOG(LOG_VERB, " ... parsing request...\n");
1880
        if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {
1881
                session_path = g_strdup(path[1]);
1882
                if(session_path == NULL) {
1883
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1884
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1885
                        MHD_destroy_response(response);
1886
                        goto done;
1887
                }
1888
                JANUS_LOG(LOG_VERB, "Session: %s\n", session_path);
1889
        }
1890
        if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {
1891
                handle_path = g_strdup(path[2]);
1892
                if(handle_path == NULL) {
1893
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
1894
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1895
                        MHD_destroy_response(response);
1896
                        goto done;
1897
                }
1898
                JANUS_LOG(LOG_VERB, "Handle: %s\n", handle_path);
1899
        }
1900
        if(session_path != NULL && handle_path != NULL && path[3] != NULL && strlen(path[3]) > 0) {
1901
                JANUS_LOG(LOG_ERR, "Too many components...\n");
1902
                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
1903
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
1904
                if(msg->acrm)
1905
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
1906
                if(msg->acrh)
1907
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
1908
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
1909
                MHD_destroy_response(response);
1910
                goto done;
1911
        }
1912
        /* Get payload, if any */
1913
        if(!strcasecmp(method, "POST")) {
1914
                JANUS_LOG(LOG_VERB, "Processing POST data (%s)...\n", msg->contenttype);
1915
                if(*upload_data_size != 0) {
1916
                        JANUS_LOG(LOG_VERB, "  -- Uploaded data (%zu bytes)\n", *upload_data_size);
1917
                        if(msg->payload == NULL)
1918
                                msg->payload = calloc(1, *upload_data_size+1);
1919
                        else
1920
                                msg->payload = realloc(msg->payload, msg->len+*upload_data_size+1);
1921
                        if(msg->payload == NULL) {
1922
                                JANUS_LOG(LOG_FATAL, "Memory error!\n");
1923
                                ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
1924
                                MHD_destroy_response(response);
1925
                                goto done;
1926
                        }
1927
                        memcpy(msg->payload+msg->len, upload_data, *upload_data_size);
1928
                        memset(msg->payload+msg->len+*upload_data_size, '\0', 1);
1929
                        msg->len += *upload_data_size;
1930
                        JANUS_LOG(LOG_VERB, "  -- Data we have now (%zu bytes)\n", msg->len);
1931
                        *upload_data_size = 0;        /* Go on */
1932
                        ret = MHD_YES;
1933
                        goto done;
1934
                }
1935
                JANUS_LOG(LOG_VERB, "Done getting payload, we can answer\n");
1936
                if(msg->payload == NULL) {
1937
                        JANUS_LOG(LOG_ERR, "No payload :-(\n");
1938
                        ret = MHD_NO;
1939
                        goto done;
1940
                }
1941
                payload = msg->payload;
1942
                JANUS_LOG(LOG_VERB, "%s\n", payload);
1943
        }
1944

    
1945
        /* Process the request, specifying this HTTP connection is the source */
1946
        janus_request_source source = {
1947
                .type = JANUS_SOURCE_PLAIN_HTTP,
1948
                .source = (void *)connection,
1949
                .msg = (void *)msg,
1950
        };
1951
        
1952
        /* Is this a generic request for info? */
1953
        if(session_path != NULL && !strcmp(session_path, "info")) {
1954
                /* The info REST endpoint, if contacted through a GET, provides information on the gateway */
1955
                if(strcasecmp(method, "GET")) {
1956
                        ret = janus_process_error(&source, 0, NULL, JANUS_ERROR_USE_GET, "Use GET for the info endpoint");
1957
                        goto done;
1958
                }
1959
                /* Send the success reply */
1960
                ret = janus_process_success(&source, "application/json", janus_info(NULL));
1961
                goto done;
1962
        }
1963
        
1964
        /* Without a payload we don't know what to do */
1965
        if(!payload) {
1966
                ret = janus_process_error(&source, 0, NULL, JANUS_ERROR_INVALID_JSON, "Request payload missing");
1967
                goto done;
1968
        }
1969
        
1970
        /* Parse the JSON payload */
1971
        json_error_t error;
1972
        json_t *root = json_loads(payload, 0, &error);
1973
        if(!root) {
1974
                ret = janus_process_error(&source, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
1975
                goto done;
1976
        }
1977
        if(!json_is_object(root)) {
1978
                ret = janus_process_error(&source, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
1979
                json_decref(root);
1980
                goto done;
1981
        }
1982
        /* Check if we have session and handle identifiers */
1983
        guint64 session_id = session_path ? g_ascii_strtoll(session_path, NULL, 10) : 0;
1984
        guint64 handle_id = handle_path ? g_ascii_strtoll(handle_path, NULL, 10) : 0;
1985
        if(session_id > 0)
1986
                json_object_set_new(root, "session_id", json_integer(session_id));
1987
        if(handle_id > 0)
1988
                json_object_set_new(root, "handle_id", json_integer(handle_id));
1989
        ret = janus_process_incoming_admin_request(&source, root);
1990

    
1991
done:
1992
        g_strfreev(basepath);
1993
        g_strfreev(path);
1994
        g_free(session_path);
1995
        g_free(handle_path);
1996
        return ret;
1997
}
1998

    
1999
int janus_process_incoming_admin_request(janus_request_source *source, json_t *root) {
2000
        int ret = MHD_NO;
2001
        if(source == NULL || root == NULL) {
2002
                JANUS_LOG(LOG_ERR, "Missing source or payload to process, giving up...\n");
2003
                return ret;
2004
        }
2005
        /* Ok, let's start with the ids */
2006
        guint64 session_id = 0, handle_id = 0;
2007
        json_t *s = json_object_get(root, "session_id");
2008
        if(s && json_is_integer(s))
2009
                session_id = json_integer_value(s);
2010
        json_t *h = json_object_get(root, "handle_id");
2011
        if(h && json_is_integer(h))
2012
                handle_id = json_integer_value(h);
2013

    
2014
        /* Get transaction and message request */
2015
        json_t *transaction = json_object_get(root, "transaction");
2016
        if(!transaction) {
2017
                ret = janus_process_error(source, session_id, NULL, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (transaction)");
2018
                goto jsondone;
2019
        }
2020
        if(!json_is_string(transaction)) {
2021
                ret = janus_process_error(source, session_id, NULL, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (transaction should be a string)");
2022
                goto jsondone;
2023
        }
2024
        const gchar *transaction_text = json_string_value(transaction);
2025
        json_t *message = json_object_get(root, "janus");
2026
        if(!message) {
2027
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (janus)");
2028
                goto jsondone;
2029
        }
2030
        if(!json_is_string(message)) {
2031
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (janus should be a string)");
2032
                goto jsondone;
2033
        }
2034
        const gchar *message_text = json_string_value(message);
2035
        
2036
        if(session_id == 0 && handle_id == 0) {
2037
                /* Can only be a 'Get all sessions' or some general setting manipulation request */
2038
                if(!strcasecmp(message_text, "info")) {
2039
                        /* The generic info request */
2040
                        ret = janus_process_success(source, "application/json", janus_info(transaction_text));
2041
                        goto jsondone;
2042
                }
2043
                if(admin_ws_api_secret != NULL) {
2044
                        /* There's an admin/monitor secret, check that the client provided it */
2045
                        json_t *secret = json_object_get(root, "admin_secret");
2046
                        if(!secret || !json_is_string(secret) || strcmp(json_string_value(secret), admin_ws_api_secret)) {
2047
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNAUTHORIZED, NULL);
2048
                                goto jsondone;
2049
                        }
2050
                }
2051
                if(!strcasecmp(message_text, "get_status")) {
2052
                        /* Return some info on the settings (mostly debug-related, at the moment) */
2053
                        json_t *reply = json_object();
2054
                        json_object_set_new(reply, "janus", json_string("success"));
2055
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
2056
                        json_t *status = json_object();
2057
                        json_object_set_new(status, "log_level", json_integer(log_level));
2058
                        json_object_set_new(status, "locking_debug", json_integer(lock_debug));
2059
                        json_object_set_new(status, "libnice_debug", json_integer(janus_ice_is_ice_debugging_enabled()));
2060
                        json_object_set_new(status, "max_nack_queue", json_integer(janus_get_max_nack_queue()));
2061
                        json_object_set_new(reply, "status", status);
2062
                        /* Convert to a string */
2063
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2064
                        json_decref(reply);
2065
                        /* Send the success reply */
2066
                        ret = janus_process_success(source, "application/json", reply_text);
2067
                        goto jsondone;
2068
                } else if(!strcasecmp(message_text, "set_log_level")) {
2069
                        /* Change the debug logging level */
2070
                        json_t *level = json_object_get(root, "level");
2071
                        if(!level) {
2072
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (level)");
2073
                                goto jsondone;
2074
                        }
2075
                        if(!json_is_integer(level)) {
2076
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (level should be an integer)");
2077
                                goto jsondone;
2078
                        }
2079
                        int level_num = json_integer_value(level);
2080
                        if(level_num < LOG_NONE || level_num > LOG_MAX) {
2081
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (level should be between %d and %d)", LOG_NONE, LOG_MAX);
2082
                                goto jsondone;
2083
                        }
2084
                        log_level = level_num;
2085
                        /* Prepare JSON reply */
2086
                        json_t *reply = json_object();
2087
                        json_object_set_new(reply, "janus", json_string("success"));
2088
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
2089
                        json_object_set_new(reply, "level", json_integer(log_level));
2090
                        /* Convert to a string */
2091
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2092
                        json_decref(reply);
2093
                        /* Send the success reply */
2094
                        ret = janus_process_success(source, "application/json", reply_text);
2095
                        goto jsondone;
2096
                } else if(!strcasecmp(message_text, "set_locking_debug")) {
2097
                        /* Enable/disable the locking debug (would show a message on the console for every lock attempt) */
2098
                        json_t *debug = json_object_get(root, "debug");
2099
                        if(!debug) {
2100
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (debug)");
2101
                                goto jsondone;
2102
                        }
2103
                        if(!json_is_integer(debug)) {
2104
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (debug should be an integer)");
2105
                                goto jsondone;
2106
                        }
2107
                        int debug_num = json_integer_value(debug);
2108
                        if(debug_num < 0 || debug_num > 1) {
2109
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (debug should be either 0 or 1)");
2110
                                goto jsondone;
2111
                        }
2112
                        lock_debug = debug_num;
2113
                        /* Prepare JSON reply */
2114
                        json_t *reply = json_object();
2115
                        json_object_set_new(reply, "janus", json_string("success"));
2116
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
2117
                        json_object_set_new(reply, "debug", json_integer(lock_debug));
2118
                        /* Convert to a string */
2119
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2120
                        json_decref(reply);
2121
                        /* Send the success reply */
2122
                        ret = janus_process_success(source, "application/json", reply_text);
2123
                        goto jsondone;
2124
                } else if(!strcasecmp(message_text, "set_libnice_debug")) {
2125
                        /* Enable/disable the libnice debugging (http://nice.freedesktop.org/libnice/libnice-Debug-messages.html) */
2126
                        json_t *debug = json_object_get(root, "debug");
2127
                        if(!debug) {
2128
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (debug)");
2129
                                goto jsondone;
2130
                        }
2131
                        if(!json_is_integer(debug)) {
2132
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (debug should be an integer)");
2133
                                goto jsondone;
2134
                        }
2135
                        int debug_num = json_integer_value(debug);
2136
                        if(debug_num < 0 || debug_num > 1) {
2137
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (debug should be either 0 or 1)");
2138
                                goto jsondone;
2139
                        }
2140
                        if(debug_num) {
2141
                                janus_ice_debugging_enable();
2142
                        } else {
2143
                                janus_ice_debugging_disable();
2144
                        }
2145
                        /* Prepare JSON reply */
2146
                        json_t *reply = json_object();
2147
                        json_object_set_new(reply, "janus", json_string("success"));
2148
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
2149
                        json_object_set_new(reply, "debug", json_integer(janus_ice_is_ice_debugging_enabled()));
2150
                        /* Convert to a string */
2151
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2152
                        json_decref(reply);
2153
                        /* Send the success reply */
2154
                        ret = janus_process_success(source, "application/json", reply_text);
2155
                        goto jsondone;
2156
                } else if(!strcasecmp(message_text, "set_max_nack_queue")) {
2157
                        /* Change the current value for the max NACK queue */
2158
                        json_t *mnq = json_object_get(root, "max_nack_queue");
2159
                        if(!mnq) {
2160
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (max_nack_queue)");
2161
                                goto jsondone;
2162
                        }
2163
                        if(!json_is_integer(mnq)) {
2164
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (max_nack_queue should be an integer)");
2165
                                goto jsondone;
2166
                        }
2167
                        int mnq_num = json_integer_value(mnq);
2168
                        if(mnq_num < 0) {
2169
                                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "Invalid element type (max_nack_queue should be a positive integer)");
2170
                                goto jsondone;
2171
                        }
2172
                        janus_set_max_nack_queue(mnq_num);
2173
                        /* Prepare JSON reply */
2174
                        json_t *reply = json_object();
2175
                        json_object_set_new(reply, "janus", json_string("success"));
2176
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
2177
                        json_object_set_new(reply, "max_nack_queue", json_integer(janus_get_max_nack_queue()));
2178
                        /* Convert to a string */
2179
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2180
                        json_decref(reply);
2181
                        /* Send the success reply */
2182
                        ret = janus_process_success(source, "application/json", reply_text);
2183
                        goto jsondone;
2184
                } else if(!strcasecmp(message_text, "list_sessions")) {
2185
                        /* List sessions */
2186
                        session_id = 0;
2187
                        json_t *list = json_array();
2188
                        janus_mutex_lock(&sessions_mutex);
2189
                        GHashTableIter iter;
2190
                        gpointer value;
2191
                        g_hash_table_iter_init(&iter, sessions);
2192
                        while (g_hash_table_iter_next(&iter, NULL, &value)) {
2193
                                janus_session *session = value;
2194
                                if(session == NULL) {
2195
                                        continue;
2196
                                }
2197
                                json_array_append_new(list, json_integer(session->session_id));
2198
                        }
2199
                        janus_mutex_unlock(&sessions_mutex);
2200
                        /* Prepare JSON reply */
2201
                        json_t *reply = json_object();
2202
                        json_object_set_new(reply, "janus", json_string("success"));
2203
                        json_object_set_new(reply, "transaction", json_string(transaction_text));
2204
                        json_object_set_new(reply, "sessions", list);
2205
                        /* Convert to a string */
2206
                        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2207
                        json_decref(reply);
2208
                        /* Send the success reply */
2209
                        ret = janus_process_success(source, "application/json", reply_text);
2210
                        goto jsondone;
2211
                } else {
2212
                        /* No message we know of */
2213
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
2214
                        goto jsondone;
2215
                }
2216
        }
2217
        if(session_id < 1) {
2218
                JANUS_LOG(LOG_ERR, "Invalid session\n");
2219
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, NULL);
2220
                goto jsondone;
2221
        }
2222
        if(source->type == JANUS_SOURCE_PLAIN_HTTP) {
2223
                janus_http_msg *msg = (janus_http_msg *)source->msg;
2224
                if(msg != NULL)
2225
                        msg->session_id = session_id;
2226
        }
2227
        if(h && handle_id < 1) {
2228
                JANUS_LOG(LOG_ERR, "Invalid handle\n");
2229
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, NULL);
2230
                goto jsondone;
2231
        }
2232

    
2233
        /* Go on with the processing */
2234
        if(admin_ws_api_secret != NULL) {
2235
                /* There's an API secret, check that the client provided it */
2236
                json_t *secret = json_object_get(root, "admin_secret");
2237
                if(!secret || !json_is_string(secret) || strcmp(json_string_value(secret), admin_ws_api_secret)) {
2238
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_UNAUTHORIZED, NULL);
2239
                        goto jsondone;
2240
                }
2241
        }
2242

    
2243
        /* If we got here, make sure we have a session (and/or a handle) */
2244
        janus_session *session = janus_session_find(session_id);
2245
        if(!session) {
2246
                JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
2247
                ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, "No such session %"SCNu64"", session_id);
2248
                goto jsondone;
2249
        }
2250
        janus_ice_handle *handle = NULL;
2251
        if(handle_id > 0) {
2252
                handle = janus_ice_handle_find(session, handle_id);
2253
                if(!handle) {
2254
                        JANUS_LOG(LOG_ERR, "Couldn't find any handle %"SCNu64" in session %"SCNu64"...\n", handle_id, session_id);
2255
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_HANDLE_NOT_FOUND, "No such handle %"SCNu64" in session %"SCNu64"", handle_id, session_id);
2256
                        goto jsondone;
2257
                }
2258
        }
2259

    
2260
        /* What is this? */
2261
        if(handle == NULL) {
2262
                /* Session-related */
2263
                if(strcasecmp(message_text, "list_handles")) {
2264
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
2265
                        goto jsondone;
2266
                }
2267

    
2268
                /* List handles */
2269
                GHashTableIter iter;
2270
                gpointer value;
2271
                json_t *list = json_array();
2272

    
2273
                janus_mutex_lock(&session->mutex);
2274

    
2275
                g_hash_table_iter_init(&iter, session->ice_handles);
2276
                while (g_hash_table_iter_next(&iter, NULL, &value)) {
2277
                        janus_ice_handle *handle = value;
2278

    
2279
                        if(handle == NULL) {
2280
                                continue;
2281
                        }
2282

    
2283
                        json_array_append_new(list, json_integer(handle->handle_id));
2284
                }
2285

    
2286
                janus_mutex_unlock(&session->mutex);
2287

    
2288
                /* Prepare JSON reply */
2289
                json_t *reply = json_object();
2290
                json_object_set_new(reply, "janus", json_string("success"));
2291
                json_object_set_new(reply, "transaction", json_string(transaction_text));
2292
                json_object_set_new(reply, "session_id", json_integer(session_id));
2293
                json_object_set_new(reply, "handles", list);
2294
                /* Convert to a string */
2295
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2296
                json_decref(reply);
2297
                /* Send the success reply */
2298
                ret = janus_process_success(source, "application/json", reply_text);
2299
                goto jsondone;
2300
        } else {
2301
                /* Handle-related */
2302
                if(strcasecmp(message_text, "handle_info")) {
2303
                        ret = janus_process_error(source, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);
2304
                        goto jsondone;
2305
                }
2306
                /* Prepare info */
2307
                janus_mutex_lock(&handle->mutex);
2308
                json_t *info = json_object();
2309
                json_object_set_new(info, "session_id", json_integer(session_id));
2310
                json_object_set_new(info, "handle_id", json_integer(handle_id));
2311
                if(handle->app) {
2312
                        janus_plugin *plugin = (janus_plugin *)handle->app;
2313
                        json_object_set_new(info, "plugin", json_string(plugin->get_package()));
2314
                        if(plugin->query_session) {
2315
                                /* FIXME This check will NOT work with legacy plugins that were compiled BEFORE the method was specified in plugin.h */
2316
                                char *query = plugin->query_session(handle->app_handle);
2317
                                if(query != NULL) {
2318
                                        /* Make sure this is JSON */
2319
                                        json_error_t error;
2320
                                        json_t *query_info = json_loads(query, 0, &error);
2321
                                        if(!query_info || !json_is_object(query_info)) {
2322
                                                JANUS_LOG(LOG_WARN, "Ignoring invalid query response from the plugin\n");
2323
                                        } else {
2324
                                                json_object_set_new(info, "plugin_specific", query_info);
2325
                                        }
2326
                                        g_free(query);
2327
                                        query = NULL;
2328
                                }
2329
                        }
2330
                }
2331
                json_t *flags = json_object();
2332
                json_object_set_new(flags, "processing-offer", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER)));
2333
                json_object_set_new(flags, "starting", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_START)));
2334
                json_object_set_new(flags, "ready", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)));
2335
                json_object_set_new(flags, "stopped", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP)));
2336
                json_object_set_new(flags, "alert", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)));
2337
                json_object_set_new(flags, "bundle", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)));
2338
                json_object_set_new(flags, "rtcp-mux", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX)));
2339
                json_object_set_new(flags, "trickle", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE)));
2340
                json_object_set_new(flags, "all-trickles", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES)));
2341
                json_object_set_new(flags, "trickle-synced", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE_SYNCED)));
2342
                json_object_set_new(flags, "data-channels", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_DATA_CHANNELS)));
2343
                json_object_set_new(flags, "plan-b", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PLAN_B)));
2344
                json_object_set_new(flags, "cleaning", json_integer(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)));
2345
                json_object_set_new(info, "flags", flags);
2346
                json_t *sdps = json_object();
2347
                if(json_string(handle->local_sdp))
2348
                        json_object_set_new(sdps, "local", json_string(handle->local_sdp));
2349
                if(json_string(handle->remote_sdp))
2350
                        json_object_set_new(sdps, "remote", json_string(handle->remote_sdp));
2351
                json_object_set_new(info, "sdps", sdps);
2352
                //~ json_object_set_new(info, "candidates-gathered", json_integer(handle->cdone));
2353
                json_t *streams = json_array();
2354
                if(handle->audio_stream) {
2355
                        json_t *s = janus_admin_stream_summary(handle->audio_stream);
2356
                        if(s)
2357
                                json_array_append_new(streams, s);
2358
                }
2359
                if(handle->video_stream) {
2360
                        json_t *s = janus_admin_stream_summary(handle->video_stream);
2361
                        if(s)
2362
                                json_array_append_new(streams, s);
2363
                }
2364
                if(handle->data_stream) {
2365
                        json_t *s = janus_admin_stream_summary(handle->data_stream);
2366
                        if(s)
2367
                                json_array_append_new(streams, s);
2368
                }
2369
                json_object_set_new(info, "streams", streams);
2370
                janus_mutex_unlock(&handle->mutex);
2371
                /* Prepare JSON reply */
2372
                json_t *reply = json_object();
2373
                json_object_set_new(reply, "janus", json_string("success"));
2374
                json_object_set_new(reply, "transaction", json_string(transaction_text));
2375
                json_object_set_new(reply, "session_id", json_integer(session_id));
2376
                json_object_set_new(reply, "handle_id", json_integer(handle_id));
2377
                json_object_set_new(reply, "info", info);
2378
                /* Convert to a string */
2379
                char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2380
                json_decref(reply);
2381
                /* Send the success reply */
2382
                ret = janus_process_success(source, "application/json", reply_text);
2383
                goto jsondone;
2384
        }
2385

    
2386
jsondone:
2387
        json_decref(root);
2388
        
2389
        return ret;
2390
}
2391

    
2392
int janus_ws_headers(void *cls, enum MHD_ValueKind kind, const char *key, const char *value) {
2393
        janus_http_msg *request = cls;
2394
        JANUS_LOG(LOG_HUGE, "%s: %s\n", key, value);
2395
        if(!strcasecmp(key, MHD_HTTP_HEADER_CONTENT_TYPE)) {
2396
                if(request)
2397
                        request->contenttype = strdup(value);
2398
        } else if(!strcasecmp(key, "Access-Control-Request-Method")) {
2399
                if(request)
2400
                        request->acrm = strdup(value);
2401
        } else if(!strcasecmp(key, "Access-Control-Request-Headers")) {
2402
                if(request)
2403
                        request->acrh = strdup(value);
2404
        }
2405
        return MHD_YES;
2406
}
2407

    
2408
void janus_ws_request_completed(void *cls, struct MHD_Connection *connection, void **con_cls, enum MHD_RequestTerminationCode toe) {
2409
        JANUS_LOG(LOG_VERB, "Request completed, freeing data\n");
2410
        janus_http_msg *request = *con_cls;
2411
        if(!request)
2412
                return;
2413
        if(request->payload != NULL)
2414
                free(request->payload);
2415
        if(request->contenttype != NULL)
2416
                free(request->contenttype);
2417
        if(request->acrh != NULL)
2418
                free(request->acrh);
2419
        if(request->acrm != NULL)
2420
                free(request->acrm);
2421
        free(request);
2422
        *con_cls = NULL;   
2423
}
2424

    
2425
/* Worker to handle notifications */
2426
int janus_ws_notifier(janus_request_source *source, int max_events) {
2427
        if(!source || source->type != JANUS_SOURCE_PLAIN_HTTP)
2428
                return MHD_NO;
2429
        struct MHD_Connection *connection = (struct MHD_Connection *)source->source;
2430
        janus_http_msg *msg = (janus_http_msg *)source->msg;
2431
        if(!connection || !msg)
2432
                return MHD_NO;
2433
        if(max_events < 1)
2434
                max_events = 1;
2435
        JANUS_LOG(LOG_VERB, "... handling long poll...\n");
2436
        janus_http_event *event = NULL;
2437
        struct MHD_Response *response = NULL;
2438
        int ret = MHD_NO;
2439
        guint64 session_id = msg->session_id;
2440
        janus_session *session = janus_session_find(session_id);
2441
        if(!session) {
2442
                JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);
2443
                response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
2444
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
2445
                if(msg->acrm)
2446
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
2447
                if(msg->acrh)
2448
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
2449
                ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
2450
                MHD_destroy_response(response);
2451
                return ret;
2452
        }
2453
        gint64 start = janus_get_monotonic_time();
2454
        gint64 end = 0;
2455
        json_t *list = NULL;
2456
        gboolean found = FALSE;
2457
        /* We have a timeout for the long poll: 30 seconds */
2458
        while(end-start < 30*G_USEC_PER_SEC) {
2459
                event = g_async_queue_try_pop(session->messages);
2460
                if(!session || session->destroy || g_atomic_int_get(&stop) || event != NULL) {
2461
                        if(event == NULL)
2462
                                break;
2463
                        /* Gotcha! */
2464
                        found = TRUE;
2465
                        if(max_events == 1) {
2466
                                break;
2467
                        } else {
2468
                                /* The application is willing to receive more events at the same time, anything to report? */
2469
                                list = json_array();
2470
                                json_error_t error;
2471
                                if(event->payload) {
2472
                                        json_t *ev = json_loads(event->payload, 0, &error);
2473
                                        if(ev && json_is_object(ev))        /* FIXME Should we fail if this is not valid JSON? */
2474
                                                json_array_append_new(list, ev);
2475
                                        g_free(event->payload);
2476
                                        event->payload = NULL;
2477
                                }
2478
                                g_free(event);
2479
                                event = NULL;
2480
                                int events = 1;
2481
                                while(events < max_events) {
2482
                                        event = g_async_queue_try_pop(session->messages);
2483
                                        if(event == NULL)
2484
                                                break;
2485
                                        if(event->payload) {
2486
                                                json_t *ev = json_loads(event->payload, 0, &error);
2487
                                                if(ev && json_is_object(ev))        /* FIXME Should we fail if this is not valid JSON? */
2488
                                                        json_array_append_new(list, ev);
2489
                                                g_free(event->payload);
2490
                                                event->payload = NULL;
2491
                                        }
2492
                                        g_free(event);
2493
                                        event = NULL;
2494
                                        events++;
2495
                                }
2496
                                break;
2497
                        }
2498
                }
2499
                /* Sleep 100ms */
2500
                g_usleep(100000);
2501
                end = janus_get_monotonic_time();
2502
        }
2503
        if(!found) {
2504
                JANUS_LOG(LOG_VERB, "Long poll time out for session %"SCNu64"...\n", session_id);
2505
                event = (janus_http_event *)calloc(1, sizeof(janus_http_event));
2506
                if(event == NULL) {
2507
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
2508
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
2509
                        MHD_destroy_response(response);
2510
                        return ret;
2511
                }
2512
                event->code = 200;
2513
                /*! \todo Improve the Janus protocol keep-alive mechanism in JavaScript */
2514
                event->payload = g_strdup (max_events == 1 ? "{\"janus\" : \"keepalive\"}" : "[{\"janus\" : \"keepalive\"}]");
2515
                event->allocated = 0;
2516
        }
2517
        if(list != NULL) {
2518
                event = (janus_http_event *)calloc(1, sizeof(janus_http_event));
2519
                if(event == NULL) {
2520
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
2521
                        ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
2522
                        MHD_destroy_response(response);
2523
                        return ret;
2524
                }
2525
                event->code = 200;
2526
                char *event_text = json_dumps(list, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2527
                json_decref(list);
2528
                event->payload = event_text;
2529
                event->allocated = 1;
2530
        }
2531
        /* Finish the request by sending the response */
2532
        JANUS_LOG(LOG_VERB, "We have a message to serve...\n\t%s\n", event->payload);
2533
        /* Send event */
2534
        char *payload = g_strdup(event ? (event->payload ? event->payload : "") : "");
2535
        if(payload == NULL) {
2536
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2537
                ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
2538
                MHD_destroy_response(response);
2539
                if(event->payload && event->allocated) {
2540
                        g_free(event->payload);
2541
                        event->payload = NULL;
2542
                }
2543
                g_free(event);
2544
                return ret;
2545
        }
2546
        ret = janus_process_success(source, NULL, payload);
2547
        if(event != NULL) {
2548
                if(event->payload && event->allocated) {
2549
                        g_free(event->payload);
2550
                        event->payload = NULL;
2551
                }
2552
                g_free(event);
2553
        }
2554
        return ret;
2555
}
2556

    
2557
int janus_process_success(janus_request_source *source, const char *transaction, char *payload)
2558
{
2559
        if(!source || !payload)
2560
                return MHD_NO;
2561
        if(source->type == JANUS_SOURCE_PLAIN_HTTP) {
2562
                struct MHD_Connection *connection = (struct MHD_Connection *)source->source;
2563
                janus_http_msg *msg = (janus_http_msg *)source->msg;
2564
                if(!connection || !msg) {
2565
                        g_free(payload);
2566
                        return MHD_NO;
2567
                }
2568
                /* Send the reply */
2569
                struct MHD_Response *response = MHD_create_response_from_data(
2570
                        strlen(payload),
2571
                        (void*) payload,
2572
                        MHD_YES,
2573
                        MHD_NO);
2574
                MHD_add_response_header(response, "Content-Type", "application/json");
2575
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
2576
                if(msg->acrm)
2577
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
2578
                if(msg->acrh)
2579
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
2580
                int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
2581
                MHD_destroy_response(response);
2582
                return ret;
2583
        } else if(source->type == JANUS_SOURCE_WEBSOCKETS) {
2584
#ifdef HAVE_WEBSOCKETS
2585
                janus_websocket_client *client = (janus_websocket_client *)source->source;
2586
                g_async_queue_push(client->responses, payload);
2587
                return MHD_YES;
2588
#else
2589
                JANUS_LOG(LOG_ERR, "WebSockets support not compiled\n");
2590
                g_free(payload);
2591
                return MHD_NO;
2592
#endif
2593
        } else if(source->type == JANUS_SOURCE_RABBITMQ) {
2594
#ifdef HAVE_RABBITMQ
2595
                /* FIXME Add to the queue of outgoing responses */
2596
                janus_rabbitmq_response *response = (janus_rabbitmq_response *)calloc(1, sizeof(janus_rabbitmq_response));
2597
                response->payload = payload;
2598
                response->correlation_id = (char *)source->msg;
2599
                g_async_queue_push(rmq_client->responses, response);
2600
                return MHD_YES;
2601
#else
2602
                JANUS_LOG(LOG_ERR, "RabbitMQ support not compiled\n");
2603
                g_free(payload);
2604
                return MHD_NO;
2605
#endif
2606
        } else {
2607
                /* WTF? */
2608
                g_free(payload);
2609
                return MHD_NO;
2610
        }
2611
}
2612

    
2613
int janus_process_error(janus_request_source *source, uint64_t session_id, const char *transaction, gint error, const char *format, ...)
2614
{
2615
        if(!source)
2616
                return MHD_NO;
2617
        gchar *error_string = NULL;
2618
        if(format == NULL) {
2619
                /* No error string provided, use the default one */
2620
                error_string = (gchar *)janus_get_api_error(error);
2621
        } else {
2622
                /* This callback has variable arguments (error string) */
2623
                va_list ap;
2624
                va_start(ap, format);
2625
                /* FIXME 512 should be enough, but anyway... */
2626
                error_string = calloc(512, sizeof(char));
2627
                if(error_string == NULL) {
2628
                        JANUS_LOG(LOG_FATAL, "Memory error!\n");
2629
                        if(source->type == JANUS_SOURCE_PLAIN_HTTP) {
2630
                                struct MHD_Connection *connection = (struct MHD_Connection *)source->source;
2631
                                janus_http_msg *msg = (janus_http_msg *)source->msg;
2632
                                if(!connection || !msg)
2633
                                        return MHD_NO;
2634
                                struct MHD_Response *response = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
2635
                                int ret = MHD_queue_response(connection, MHD_HTTP_INTERNAL_SERVER_ERROR, response);
2636
                                MHD_destroy_response(response);
2637
                                return ret;
2638
                        } else if(source->type == JANUS_SOURCE_WEBSOCKETS || source->type == JANUS_SOURCE_RABBITMQ) {
2639
                                /* TODO We should send an error to the client... */
2640
                                return MHD_NO;
2641
                        } else {
2642
                                /* WTF? */
2643
                                return MHD_NO;
2644
                        }
2645
                }
2646
                vsprintf(error_string, format, ap);
2647
                va_end(ap);
2648
        }
2649
        /* Done preparing error */
2650
        JANUS_LOG(LOG_VERB, "[%s] Returning error %d (%s)\n", transaction, error, error_string ? error_string : "no text");
2651
        /* Prepare JSON error */
2652
        json_t *reply = json_object();
2653
        json_object_set_new(reply, "janus", json_string("error"));
2654
        if(session_id > 0)
2655
                json_object_set_new(reply, "session_id", json_integer(session_id));
2656
        if(transaction != NULL)
2657
                json_object_set_new(reply, "transaction", json_string(transaction));
2658
        json_t *error_data = json_object();
2659
        json_object_set_new(error_data, "code", json_integer(error));
2660
        json_object_set_new(error_data, "reason", json_string(error_string ? error_string : "no text"));
2661
        json_object_set_new(reply, "error", error_data);
2662
        /* Convert to a string */
2663
        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
2664
        json_decref(reply);
2665
        if(format != NULL && error_string != NULL)
2666
                free(error_string);
2667
        /* Send the error */
2668
        if(source->type == JANUS_SOURCE_PLAIN_HTTP) {
2669
                struct MHD_Connection *connection = (struct MHD_Connection *)source->source;
2670
                janus_http_msg *msg = (janus_http_msg *)source->msg;
2671
                if(!connection || !msg) {
2672
                        g_free(reply_text);
2673
                        return MHD_NO;
2674
                }
2675
                struct MHD_Response *response = MHD_create_response_from_data(
2676
                        strlen(reply_text),
2677
                        (void*)reply_text,
2678
                        MHD_YES,
2679
                        MHD_NO);
2680
                MHD_add_response_header(response, "Content-Type", "application/json");
2681
                MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
2682
                if(msg->acrm)
2683
                        MHD_add_response_header(response, "Access-Control-Allow-Methods", msg->acrm);
2684
                if(msg->acrh)
2685
                        MHD_add_response_header(response, "Access-Control-Allow-Headers", msg->acrh);
2686
                int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
2687
                MHD_destroy_response(response);
2688
                return ret;
2689
        } else if(source->type == JANUS_SOURCE_WEBSOCKETS) {
2690
#ifdef HAVE_WEBSOCKETS
2691
                janus_websocket_client *client = (janus_websocket_client *)source->source;
2692
                g_async_queue_push(client->responses, reply_text);
2693
                return MHD_YES;
2694
#else
2695
                JANUS_LOG(LOG_ERR, "WebSockets support not compiled\n");
2696
                g_free(reply_text);
2697
                return MHD_NO;
2698
#endif
2699
        } else if(source->type == JANUS_SOURCE_RABBITMQ) {
2700
#ifdef HAVE_RABBITMQ
2701
                /* FIXME Add to the queue of outgoing responses */
2702
                janus_rabbitmq_response *response = (janus_rabbitmq_response *)calloc(1, sizeof(janus_rabbitmq_response));
2703
                response->payload = reply_text;
2704
                response->correlation_id = (char *)source->msg;
2705
                g_async_queue_push(rmq_client->responses, response);
2706
                return MHD_YES;
2707
#else
2708
                JANUS_LOG(LOG_ERR, "RabbitMQ support not compiled\n");
2709
                g_free(reply_text);
2710
                return MHD_NO;
2711
#endif
2712
        } else {
2713
                /* WTF? */
2714
                g_free(reply_text);
2715
                return MHD_NO;
2716
        }
2717
}
2718

    
2719

    
2720
#ifdef HAVE_WEBSOCKETS
2721
/* WebSockets */
2722
int janus_wss_onopen(libwebsock_client_state *state) {
2723
        JANUS_LOG(LOG_INFO, "WebSocket onopen: #%d\n", state->sockfd);
2724
        janus_mutex_lock(&wss_mutex);
2725
        if(g_hash_table_lookup(wss_sessions, state) != NULL) {
2726
                JANUS_LOG(LOG_WARN, "  -- Client already handled\n");
2727
                janus_mutex_unlock(&wss_mutex);
2728
                return 0;
2729
        }
2730
        /* Create a new janus_websocket_client instance */
2731
        janus_websocket_client *ws_client = calloc(1, sizeof(janus_websocket_client));
2732
        if(ws_client == NULL) {
2733
                janus_mutex_unlock(&wss_mutex);
2734
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
2735
                libwebsock_close(state);
2736
                return 0;
2737
        }
2738
        /* Create a thread pool to handle incoming requests */
2739
        GError *error = NULL;
2740
        GThreadPool *thread_pool = g_thread_pool_new(janus_wss_task, ws_client, -1, FALSE, &error);
2741
        if(error != NULL) {
2742
                /* Something went wrong... */
2743
                g_free(ws_client);
2744
                janus_mutex_unlock(&wss_mutex);
2745
                JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the pool thread...\n", error->code, error->message ? error->message : "??");
2746
                libwebsock_close(state);
2747
                return 0;
2748
        }
2749
        ws_client->thread_pool = thread_pool;
2750
        ws_client->state = state;
2751
        ws_client->responses = g_async_queue_new();
2752
        ws_client->sessions = NULL;
2753
        /* Create a thread for notifications related to this session as well */
2754
        ws_client->thread = g_thread_try_new("wss_client", &janus_wss_thread, ws_client, &error);
2755
        if(error != NULL) {
2756
                /* Something went wrong... */
2757
                g_free(ws_client);
2758
                janus_mutex_unlock(&wss_mutex);
2759
                JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the notifications thread...\n", error->code, error->message ? error->message : "??");
2760
                libwebsock_close(state);
2761
                return 0;
2762
         }
2763
        ws_client->destroy = 0;
2764
        janus_mutex_init(&ws_client->mutex);
2765
        
2766
        g_hash_table_insert(wss_sessions, state, ws_client);
2767
        janus_mutex_unlock(&wss_mutex);
2768

    
2769
        return 0;
2770
}
2771

    
2772
int janus_wss_onmessage(libwebsock_client_state *state, libwebsock_message *msg) {
2773
        JANUS_LOG(LOG_VERB, "WebSocket onmessage: #%d\n", state->sockfd);
2774
        JANUS_LOG(LOG_HUGE, "  -- Message opcode: %d\n", msg->opcode);
2775
        JANUS_LOG(LOG_HUGE, "  -- Payload Length: %llu\n", msg->payload_len);
2776
        JANUS_LOG(LOG_HUGE, "  -- Payload: %s\n", msg->payload);
2777

    
2778
        janus_mutex_lock(&wss_mutex);
2779
        janus_websocket_client *client = g_hash_table_lookup(wss_sessions, state);
2780
        janus_mutex_unlock(&wss_mutex);
2781
        if(client == NULL) {
2782
                /* Sometimes we get onmessage before onclose */
2783
                JANUS_LOG(LOG_INFO, "Received a WebSocket message, but can't find a client associated to it, creating it now...\n");
2784
                janus_wss_onopen(state);
2785
                /* Let's try again */
2786
                janus_mutex_lock(&wss_mutex);
2787
                client = g_hash_table_lookup(wss_sessions, state);
2788
                janus_mutex_unlock(&wss_mutex);
2789
                if(client == NULL) {
2790
                        JANUS_LOG(LOG_ERR, "Still no client, giving up...\n");
2791
                        libwebsock_close(state);
2792
                        return 0;
2793
                }
2794
        }
2795
        /* Parse it */
2796
        janus_request_source *source = janus_request_source_new(JANUS_SOURCE_WEBSOCKETS, (void *)client, (void *)msg);
2797
        /* Parse the JSON payload */
2798
        json_error_t error;
2799
        json_t *root = json_loads(msg->payload, 0, &error);
2800
        if(!root) {
2801
                janus_process_error(source, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
2802
                janus_request_source_destroy(source);
2803
                return 0;
2804
        }
2805
        if(!json_is_object(root)) {
2806
                janus_process_error(source, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
2807
                janus_request_source_destroy(source);
2808
                json_decref(root);
2809
                return 0;
2810
        }
2811
        /* Parse the request now */
2812
        janus_websocket_request *request = (janus_websocket_request *)calloc(1, sizeof(janus_websocket_request));
2813
        request->source = source;
2814
        request->request = root;
2815
        GError *tperror = NULL;
2816
        g_thread_pool_push(client->thread_pool, request, &tperror);
2817
        if(tperror != NULL) {
2818
                /* Something went wrong... */
2819
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to push task in thread pool...\n", tperror->code, tperror->message ? tperror->message : "??");
2820
                json_t *transaction = json_object_get(root, "transaction");
2821
                const char *transaction_text = json_is_string(transaction) ? json_string_value(transaction) : NULL;
2822
                janus_process_error(source, 0, transaction_text, JANUS_ERROR_UNKNOWN, "Thread pool error");
2823
                janus_request_source_destroy(source);
2824
                json_decref(root);
2825
        }
2826
        return 0;
2827
}
2828

    
2829
int janus_wss_onclose(libwebsock_client_state *state) {
2830
        JANUS_LOG(LOG_INFO, "WebSocket onclose: #%d\n", state->sockfd);
2831

    
2832
        janus_mutex_lock(&wss_mutex);
2833
        janus_websocket_client *client = g_hash_table_lookup(wss_sessions, state);
2834
        g_hash_table_remove(wss_sessions, state);
2835
        if(client != NULL) {
2836
                JANUS_LOG(LOG_INFO, "Destroying WebSocket client #%d\n", state->sockfd);
2837
                client->destroy = 1;
2838
                g_thread_pool_free(client->thread_pool, FALSE, FALSE);
2839
                if(client->thread != NULL) {
2840
                        JANUS_LOG(LOG_INFO, "Joining thread #%d\n", state->sockfd);
2841
                        g_thread_join(client->thread);
2842
                }
2843
                client->thread = NULL;
2844
                client->state = NULL;
2845
                if(client->sessions != NULL) {
2846
                        /* Remove all sessions (and handles) created by this client */
2847
                        janus_mutex_lock(&sessions_mutex);
2848
                        GHashTableIter iter;
2849
                        gpointer value;
2850
                        g_hash_table_iter_init(&iter, client->sessions);
2851
                        while(g_hash_table_iter_next(&iter, NULL, &value)) {
2852
                                janus_session *session = value;
2853
                                if(!session)
2854
                                        continue;
2855
                                session->last_activity = 0;        /* This will trigger a timeout */
2856
                        }
2857
                        janus_mutex_unlock(&sessions_mutex);
2858
                        g_hash_table_destroy(client->sessions);
2859
                }
2860
                /* Remove responses queue too, if needed */
2861
                if(client->responses != NULL) {
2862
                        char *response = NULL;
2863
                        while((response = g_async_queue_try_pop(client->responses)) != NULL) {
2864
                                g_free(response);
2865
                        }
2866
                        g_async_queue_unref(client->responses);
2867
                }
2868
                client->sessions = NULL;
2869
                g_free(client);
2870
                client = NULL;
2871
        }
2872
        janus_mutex_unlock(&wss_mutex);
2873
        
2874
        JANUS_LOG(LOG_INFO, "  -- closed\n");
2875
        return 0;
2876
}
2877

    
2878
void *janus_wss_thread(void *data) {
2879
        janus_websocket_client *client = (janus_websocket_client *)data;
2880
        if(client == NULL) {
2881
                JANUS_LOG(LOG_ERR, "No WebSocket client??\n");
2882
                return NULL;
2883
        }
2884
        int fd = client->state->sockfd;
2885
        JANUS_LOG(LOG_INFO, "Joining WebSocket thread: #%d\n", fd);
2886
        while(!client->destroy && !g_atomic_int_get(&stop)) {
2887
                janus_mutex_lock(&client->mutex);
2888
                /* Responses first */
2889
                char *response = NULL;
2890
                while ((response = g_async_queue_try_pop(client->responses)) != NULL) {
2891
                        if(!client->destroy && !g_atomic_int_get(&stop) && response) {
2892
                                /* Gotcha! */
2893
                                JANUS_LOG(LOG_VERB, "#%d: Sending response (%zu bytes)...\n", fd, strlen(response));
2894
                                int res = libwebsock_send_text(client->state, response);
2895
                                JANUS_LOG(LOG_VERB, "#%d  -- Sent (res=%d)\n", fd, res);
2896
                        }
2897
                        g_free(response);
2898
                }
2899
                /* Now iterate on all the sessions handled by this WebSocket client */
2900
                if(client->sessions != NULL && g_hash_table_size(client->sessions) > 0) {
2901
                        GHashTableIter iter;
2902
                        gpointer value;
2903
                        g_hash_table_iter_init(&iter, client->sessions);
2904
                        while (g_hash_table_iter_next(&iter, NULL, &value)) {
2905
                                janus_session *session = value;
2906
                                if(client->destroy || !session || session->destroy || g_atomic_int_get(&stop)) {
2907
                                        continue;
2908
                                }
2909
                                janus_http_event *event;
2910
                                while ((event = g_async_queue_try_pop(session->messages)) != NULL) {
2911
                                        if(!client->destroy && session && !session->destroy && !g_atomic_int_get(&stop) && event && event->payload) {
2912
                                                /* Gotcha! */
2913
                                                JANUS_LOG(LOG_VERB, "#%d: Sending event (%zu bytes)...\n", fd, strlen(event->payload));
2914
                                                int res = libwebsock_send_text(client->state, event->payload);
2915
                                                JANUS_LOG(LOG_VERB, "#%d  -- Sent (res=%d)\n", fd, res);
2916
                                        }
2917
                                }
2918
                                if(session->timeout) {
2919
                                        /* Close the websocket */
2920
                                        libwebsock_close(client->state);
2921
                                        //~ janus_wss_onclose(client->state);
2922
                                        break;
2923
                                }
2924
                        }
2925
                }
2926
                janus_mutex_unlock(&client->mutex);
2927
                /* Sleep 100ms */
2928
                g_usleep(100000);
2929
        }
2930
        JANUS_LOG(LOG_INFO, "Leaving WebSocket thread: #%d\n", fd);
2931
        return NULL;
2932
}
2933

    
2934
void janus_wss_task(gpointer data, gpointer user_data) {
2935
        JANUS_LOG(LOG_VERB, "Thread pool, serving request\n");
2936
        janus_websocket_request *request = (janus_websocket_request *)data;
2937
        janus_websocket_client *client = (janus_websocket_client *)data;
2938
        if(request == NULL || client == NULL) {
2939
                JANUS_LOG(LOG_ERR, "Missing request or client\n");
2940
                return;
2941
        }
2942
        janus_request_source *source = (janus_request_source *)request->source;
2943
        json_t *root = (json_t *)request->request;
2944
        janus_process_incoming_request(source, root);
2945
        janus_request_source_destroy(source);
2946
        request->source = NULL;
2947
        request->request = NULL;
2948
        g_free(request);
2949
}
2950
#endif
2951

    
2952

    
2953
#ifdef HAVE_RABBITMQ
2954
void *janus_rmq_in_thread(void *data) {
2955
        if(rmq_client == NULL) {
2956
                JANUS_LOG(LOG_ERR, "No RabbitMQ connection??\n");
2957
                return NULL;
2958
        }
2959
        JANUS_LOG(LOG_VERB, "Joining RabbitMQ in thread\n");
2960

    
2961
        struct timeval timeout;
2962
        timeout.tv_sec = 0;
2963
        timeout.tv_usec = 20000;
2964
        amqp_frame_t frame;
2965
        while(!rmq_client->destroy && !g_atomic_int_get(&stop)) {
2966
                amqp_maybe_release_buffers(rmq_conn);
2967
                /* Wait for a frame */
2968
                int res = amqp_simple_wait_frame_noblock(rmq_conn, &frame, &timeout);
2969
                if(res != AMQP_STATUS_OK) {
2970
                        /* No data */
2971
                        if(res == AMQP_STATUS_TIMEOUT)
2972
                                continue;
2973
                        JANUS_LOG(LOG_VERB, "Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res));
2974
                        break;
2975
                }
2976
                /* We expect method first */
2977
                JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
2978
                if(frame.frame_type != AMQP_FRAME_METHOD)
2979
                        continue;
2980
                JANUS_LOG(LOG_VERB, "Method %s\n", amqp_method_name(frame.payload.method.id));
2981
                if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD) {
2982
                        amqp_basic_deliver_t *d = (amqp_basic_deliver_t *)frame.payload.method.decoded;
2983
                        JANUS_LOG(LOG_VERB, "Delivery #%u, %.*s\n", (unsigned) d->delivery_tag, (int) d->routing_key.len, (char *) d->routing_key.bytes);
2984
                }
2985
                /* Then the header */
2986
                amqp_simple_wait_frame(rmq_conn, &frame);
2987
                JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
2988
                if(frame.frame_type != AMQP_FRAME_HEADER)
2989
                        continue;
2990
                amqp_basic_properties_t *p = (amqp_basic_properties_t *)frame.payload.properties.decoded;
2991
                if(p->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
2992
                        JANUS_LOG(LOG_VERB, "  -- Reply-to: %.*s\n", (int) p->reply_to.len, (char *) p->reply_to.bytes);
2993
                }
2994
                char *correlation = NULL;
2995
                if(p->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
2996
                        correlation = (char *)calloc(p->correlation_id.len+1, sizeof(char));
2997
                        sprintf(correlation, "%.*s", (int) p->correlation_id.len, (char *) p->correlation_id.bytes);
2998
                        JANUS_LOG(LOG_VERB, "  -- Correlation-id: %s\n", correlation);
2999
                }
3000
                if(p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
3001
                        JANUS_LOG(LOG_VERB, "  -- Content-type: %.*s\n", (int) p->content_type.len, (char *) p->content_type.bytes);
3002
                }
3003
                /* And the body */
3004
                uint64_t total = frame.payload.properties.body_size, received = 0;
3005
                char *payload = (char *)calloc(total+1, sizeof(char)), *index = payload;
3006
                while(received < total) {
3007
                        amqp_simple_wait_frame(rmq_conn, &frame);
3008
                        JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel);
3009
                        if(frame.frame_type != AMQP_FRAME_BODY)
3010
                                break;
3011
                        sprintf(index, "%.*s", (int) frame.payload.body_fragment.len, (char *) frame.payload.body_fragment.bytes);
3012
                        received += frame.payload.body_fragment.len;
3013
                        index = payload+received;
3014
                }
3015
                JANUS_LOG(LOG_VERB, "Got %"SCNu64"/%"SCNu64" bytes (%"SCNu64")\n", received, total, frame.payload.body_fragment.len);
3016
                JANUS_LOG(LOG_HUGE, "%s\n", payload);
3017
                /* Parse it */
3018
                janus_request_source *source = janus_request_source_new(JANUS_SOURCE_RABBITMQ, (void *)rmq_client, (void *)correlation);
3019
                /* Parse the JSON payload */
3020
                json_error_t error;
3021
                json_t *root = json_loads(payload, 0, &error);
3022
                if(!root) {
3023
                        janus_process_error(source, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);
3024
                        g_free(payload);
3025
                        continue;
3026
                }
3027
                if(!json_is_object(root)) {
3028
                        janus_process_error(source, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");
3029
                        g_free(payload);
3030
                        json_decref(root);
3031
                        continue;
3032
                }
3033
                g_free(payload);
3034
                /* Parse the request now */
3035
                janus_rabbitmq_request *request = (janus_rabbitmq_request *)calloc(1, sizeof(janus_rabbitmq_request));
3036
                request->source = source;
3037
                request->request = root;
3038
                GError *tperror = NULL;
3039
                g_thread_pool_push(rmq_client->thread_pool, request, &tperror);
3040
                if(tperror != NULL) {
3041
                        /* Something went wrong... */
3042
                        JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to push task in thread pool...\n", tperror->code, tperror->message ? tperror->message : "??");
3043
                }
3044
        }
3045
        JANUS_LOG(LOG_INFO, "Leaving RabbitMQ in thread\n");
3046
        return NULL;
3047
}
3048

    
3049
void *janus_rmq_out_thread(void *data) {
3050
        if(rmq_client == NULL) {
3051
                JANUS_LOG(LOG_ERR, "No RabbitMQ connection??\n");
3052
                return NULL;
3053
        }
3054
        JANUS_LOG(LOG_VERB, "Joining RabbitMQ out thread\n");
3055
        while(!rmq_client->destroy && !g_atomic_int_get(&stop)) {
3056
                janus_mutex_lock(&rmq_client->mutex);
3057
                /* We send responses from here as well, not only notifications */
3058
                janus_rabbitmq_response *response = NULL;
3059
                while ((response = g_async_queue_try_pop(rmq_client->responses)) != NULL) {
3060
                        if(!g_atomic_int_get(&stop) && response && response->payload) {
3061
                                /* Gotcha! */
3062
                                JANUS_LOG(LOG_VERB, "Sending response to RabbitMQ (%zu bytes)...\n", strlen(response->payload));
3063
                                JANUS_LOG(LOG_HUGE, "%s\n", response->payload);
3064
                                amqp_basic_properties_t props;
3065
                                props._flags = 0;
3066
                                props._flags |= AMQP_BASIC_REPLY_TO_FLAG;
3067
                                props.reply_to = amqp_cstring_bytes("Janus");
3068
                                if(response->correlation_id) {
3069
                                        props._flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
3070
                                        props.correlation_id = amqp_cstring_bytes(response->correlation_id);
3071
                                }
3072
                                props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
3073
                                props.content_type = amqp_cstring_bytes("application/json");
3074
                                amqp_bytes_t message = amqp_cstring_bytes(response->payload);
3075
                                int status = amqp_basic_publish(rmq_conn, rmq_channel, amqp_empty_bytes, from_janus_queue, 0, 0, &props, message);
3076
                                if(status != AMQP_STATUS_OK) {
3077
                                        JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
3078
                                }
3079
                                g_free(response->correlation_id);
3080
                                response->correlation_id = NULL;
3081
                                g_free(response->payload);
3082
                                response->payload = NULL;
3083
                                g_free(response);
3084
                                response = NULL;
3085
                        }
3086
                }
3087
                if(rmq_client->sessions != NULL && g_hash_table_size(rmq_client->sessions) > 0) {
3088
                        /* Iterate on all the sessions handled by this rmq_client */
3089
                        GHashTableIter iter;
3090
                        gpointer value;
3091
                        g_hash_table_iter_init(&iter, rmq_client->sessions);
3092
                        while (g_hash_table_iter_next(&iter, NULL, &value)) {
3093
                                janus_session *session = value;
3094
                                if(rmq_client->destroy || !session || session->destroy || g_atomic_int_get(&stop)) {
3095
                                        continue;
3096
                                }
3097
                                janus_http_event *event;
3098
                                while ((event = g_async_queue_try_pop(session->messages)) != NULL) {
3099
                                        if(!rmq_client->destroy && session && !session->destroy && !g_atomic_int_get(&stop) && event && event->payload) {
3100
                                                /* Gotcha! */
3101
                                                JANUS_LOG(LOG_VERB, "Sending event to RabbitMQ (%zu bytes)...\n", strlen(event->payload));
3102
                                                JANUS_LOG(LOG_HUGE, "%s\n", event->payload);
3103
                                                amqp_basic_properties_t props;
3104
                                                props._flags = 0;
3105
                                                props._flags |= AMQP_BASIC_REPLY_TO_FLAG;
3106
                                                props.reply_to = amqp_cstring_bytes("Janus");
3107
                                                props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
3108
                                                props.content_type = amqp_cstring_bytes("application/json");
3109
                                                amqp_bytes_t message = amqp_cstring_bytes(event->payload);
3110
                                                int status = amqp_basic_publish(rmq_conn, rmq_channel, amqp_empty_bytes, from_janus_queue, 0, 0, &props, message);
3111
                                                if(status != AMQP_STATUS_OK) {
3112
                                                        JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
3113
                                                }
3114
                                        }
3115
                                }
3116
                                if(session->timeout) {
3117
                                        /* A session timed out, anything we should do? */
3118
                                        continue;
3119
                                }
3120
                        }
3121
                }
3122
                janus_mutex_unlock(&rmq_client->mutex);
3123
                /* Sleep 100ms */
3124
                g_usleep(100000);
3125
        }
3126
        JANUS_LOG(LOG_INFO, "Leaving RabbitMQ out thread\n");
3127
        return NULL;
3128
}
3129

    
3130
void janus_rmq_task(gpointer data, gpointer user_data) {
3131
        JANUS_LOG(LOG_VERB, "Thread pool, serving request\n");
3132
        janus_rabbitmq_request *request = (janus_rabbitmq_request *)data;
3133
        janus_rabbitmq_client *client = (janus_rabbitmq_client *)data;
3134
        if(request == NULL || client == NULL) {
3135
                JANUS_LOG(LOG_ERR, "Missing request or client\n");
3136
                return;
3137
        }
3138
        janus_request_source *source = (janus_request_source *)request->source;
3139
        json_t *root = (json_t *)request->request;
3140
        janus_process_incoming_request(source, root);
3141
        janus_request_source_destroy(source);
3142
        request->source = NULL;
3143
        request->request = NULL;
3144
        g_free(request);
3145
}
3146
#endif
3147

    
3148

    
3149
/* Admin/monitor helpers */
3150
json_t *janus_admin_stream_summary(janus_ice_stream *stream) {
3151
        if(stream == NULL)
3152
                return NULL;
3153
        json_t *s = json_object();
3154
        json_object_set_new(s, "id", json_integer(stream->stream_id));
3155
        json_object_set_new(s, "ready", json_integer(stream->cdone));
3156
        json_object_set_new(s, "disabled", json_string(stream->disabled ? "true" : "false"));
3157
        json_t *ss = json_object();
3158
        if(stream->audio_ssrc)
3159
                json_object_set_new(ss, "audio", json_integer(stream->audio_ssrc));
3160
        if(stream->video_ssrc)
3161
                json_object_set_new(ss, "video", json_integer(stream->video_ssrc));
3162
        if(stream->audio_ssrc_peer)
3163
                json_object_set_new(ss, "audio-peer", json_integer(stream->audio_ssrc_peer));
3164
        if(stream->video_ssrc_peer)
3165
                json_object_set_new(ss, "video-peer", json_integer(stream->video_ssrc_peer));
3166
        json_object_set_new(s, "ssrc", ss);
3167
        json_t *components = json_array();
3168
        if(stream->rtp_component) {
3169
                json_t *c = janus_admin_component_summary(stream->rtp_component);
3170
                if(c)
3171
                        json_array_append_new(components, c);
3172
        }
3173
        if(stream->rtcp_component) {
3174
                json_t *c = janus_admin_component_summary(stream->rtcp_component);
3175
                if(c)
3176
                        json_array_append_new(components, c);
3177
        }
3178
        json_object_set_new(s, "components", components);
3179
        return s;
3180
}
3181

    
3182
json_t *janus_admin_component_summary(janus_ice_component *component) {
3183
        if(component == NULL)
3184
                return NULL;
3185
        json_t *c = json_object();
3186
        json_object_set_new(c, "id", json_integer(component->component_id));
3187
        json_object_set_new(c, "state", json_string(janus_get_ice_state_name(component->state)));
3188
        if(component->local_candidates) {
3189
                json_t *cs = json_array();
3190
                GSList *candidates = component->local_candidates, *i = NULL;
3191
                for (i = candidates; i; i = i->next) {
3192
                        gchar *lc = (gchar *) i->data;
3193
                        if(lc)
3194
                                json_array_append_new(cs, json_string(lc));
3195
                }
3196
                json_object_set_new(c, "local-candidates", cs);
3197
        }
3198
        if(component->remote_candidates) {
3199
                json_t *cs = json_array();
3200
                GSList *candidates = component->remote_candidates, *i = NULL;
3201
                for (i = candidates; i; i = i->next) {
3202
                        gchar *rc = (gchar *) i->data;
3203
                        if(rc)
3204
                                json_array_append_new(cs, json_string(rc));
3205
                }
3206
                json_object_set_new(c, "remote-candidates", cs);
3207
        }
3208
        if(component->selected_pair) {
3209
                json_object_set_new(c, "selected-pair", json_string(component->selected_pair));
3210
        }
3211
        json_t *d = json_object();
3212
        json_t *in_stats = json_object();
3213
        json_t *out_stats = json_object();
3214
        if(component->dtls) {
3215
                janus_dtls_srtp *dtls = component->dtls;
3216
                json_object_set_new(d, "fingerprint", json_string(janus_dtls_get_local_fingerprint()));
3217
                json_object_set_new(d, "remote-fingerprint", json_string(component->stream->handle->remote_fingerprint));
3218
                json_object_set_new(d, "dtls-role", json_string(janus_get_dtls_srtp_role(component->stream->dtls_role)));
3219
                json_object_set_new(d, "dtls-state", json_string(janus_get_dtls_srtp_state(dtls->dtls_state)));
3220
                json_object_set_new(d, "valid", json_integer(dtls->srtp_valid));
3221
                json_object_set_new(d, "ready", json_integer(dtls->ready));
3222
                json_object_set_new(in_stats, "audio_bytes", json_integer(component->in_stats.audio_bytes));
3223
                json_object_set_new(in_stats, "video_bytes", json_integer(component->in_stats.video_bytes));
3224
                json_object_set_new(in_stats, "data_bytes", json_integer(component->in_stats.data_bytes));
3225
                json_object_set_new(in_stats, "audio_nacks", json_integer(component->in_stats.audio_nacks));
3226
                json_object_set_new(in_stats, "video_nacks", json_integer(component->in_stats.video_nacks));
3227
                json_object_set_new(out_stats, "audio_bytes", json_integer(component->out_stats.audio_bytes));
3228
                json_object_set_new(out_stats, "video_bytes", json_integer(component->out_stats.video_bytes));
3229
                json_object_set_new(out_stats, "data_bytes", json_integer(component->out_stats.data_bytes));
3230
                json_object_set_new(out_stats, "audio_nacks", json_integer(component->out_stats.audio_nacks));
3231
                json_object_set_new(out_stats, "video_nacks", json_integer(component->out_stats.video_nacks));
3232
                /* Compute the last second stuff too */
3233
                gint64 now = janus_get_monotonic_time();
3234
                guint64 bytes = 0;
3235
                if(component->in_stats.audio_bytes_lastsec) {
3236
                        GList *lastsec = component->in_stats.audio_bytes_lastsec;
3237
                        while(lastsec) {
3238
                                janus_ice_stats_item *s = (janus_ice_stats_item *)lastsec->data;
3239
                                if(s && now-s->when < G_USEC_PER_SEC)
3240
                                        bytes += s->bytes;
3241
                                lastsec = lastsec->next;
3242
                        }
3243
                }
3244
                json_object_set_new(in_stats, "audio_bytes_lastsec", json_integer(bytes));
3245
                bytes = 0;
3246
                if(component->in_stats.video_bytes_lastsec) {
3247
                        GList *lastsec = component->in_stats.video_bytes_lastsec;
3248
                        while(lastsec) {
3249
                                janus_ice_stats_item *s = (janus_ice_stats_item *)lastsec->data;
3250
                                if(s && now-s->when < G_USEC_PER_SEC)
3251
                                        bytes += s->bytes;
3252
                                lastsec = lastsec->next;
3253
                        }
3254
                }
3255
                json_object_set_new(in_stats, "video_bytes_lastsec", json_integer(bytes));
3256
#ifdef HAVE_SCTP
3257
                if(dtls->sctp)        /* FIXME */
3258
                        json_object_set_new(d, "sctp-association", json_integer(1));
3259
#endif
3260
        }
3261
        json_object_set_new(c, "dtls", d);
3262
        json_object_set_new(c, "in_stats", in_stats);
3263
        json_object_set_new(c, "out_stats", out_stats);
3264
        return c;
3265
}
3266

    
3267
/* Plugins */
3268
void janus_plugin_close(gpointer key, gpointer value, gpointer user_data) {
3269
        janus_plugin *plugin = (janus_plugin *)value;
3270
        if(!plugin)
3271
                return;
3272
        plugin->destroy();
3273
}
3274

    
3275
void janus_pluginso_close(gpointer key, gpointer value, gpointer user_data) {
3276
        void *plugin = (janus_plugin *)value;
3277
        if(!plugin)
3278
                return;
3279
        //~ dlclose(plugin);
3280
}
3281

    
3282
janus_plugin *janus_plugin_find(const gchar *package) {
3283
        if(package != NULL && plugins != NULL)        /* FIXME Do we need to fix the key pointer? */
3284
                return g_hash_table_lookup(plugins, package);
3285
        return NULL;
3286
}
3287

    
3288

    
3289
/* Plugin callback interface */
3290
int janus_push_event(janus_plugin_session *handle, janus_plugin *plugin, const char *transaction, const char *message, const char *sdp_type, const char *sdp) {
3291
        if(!plugin || !message)
3292
                return -1;
3293
        if(!handle || handle->stopped)
3294
                return -2;
3295
        janus_ice_handle *ice_handle = (janus_ice_handle *)handle->gateway_handle;
3296
        if(!ice_handle || janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP))
3297
                return JANUS_ERROR_SESSION_NOT_FOUND;
3298
        janus_session *session = ice_handle->session;
3299
        if(!session || session->destroy)
3300
                return JANUS_ERROR_SESSION_NOT_FOUND;
3301
        /* Make sure this is JSON */
3302
        json_error_t error;
3303
        json_t *event = json_loads(message, 0, &error);
3304
        if(!event) {
3305
                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Cannot push event (JSON error: on line %d: %s)\n", ice_handle->handle_id, error.line, error.text);
3306
                return JANUS_ERROR_INVALID_JSON;
3307
        }
3308
        if(!json_is_object(event)) {
3309
                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Cannot push event (JSON error: not an object)\n", ice_handle->handle_id);
3310
                return JANUS_ERROR_INVALID_JSON_OBJECT;
3311
        }
3312
        /* Attach JSEP if possible? */
3313
        json_t *jsep = NULL;
3314
        if(sdp_type != NULL && sdp != NULL) {
3315
                jsep = janus_handle_sdp(handle, plugin, sdp_type, sdp);
3316
                if(jsep == NULL) {
3317
                        if(ice_handle == NULL || janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP)
3318
                                        || janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) {
3319
                                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Cannot push event (handle not available anymore or negotiation stopped)\n", ice_handle->handle_id);
3320
                                return JANUS_ERROR_HANDLE_NOT_FOUND;
3321
                        } else {
3322
                                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Cannot push event (JSON error: problem with the SDP)\n", ice_handle->handle_id);
3323
                                return JANUS_ERROR_JSEP_INVALID_SDP;
3324
                        }
3325
                }
3326
        }
3327
        /* Prepare JSON event */
3328
        json_t *reply = json_object();
3329
        json_object_set_new(reply, "janus", json_string("event"));
3330
        json_object_set_new(reply, "session_id", json_integer(session->session_id));
3331
        json_object_set_new(reply, "sender", json_integer(ice_handle->handle_id));
3332
        if(transaction != NULL)
3333
                json_object_set_new(reply, "transaction", json_string(transaction));
3334
        json_t *plugin_data = json_object();
3335
        json_object_set_new(plugin_data, "plugin", json_string(plugin->get_package()));
3336
        json_object_set(plugin_data, "data", event);
3337
        json_object_set(reply, "plugindata", plugin_data);
3338
        if(jsep != NULL)
3339
                json_object_set(reply, "jsep", jsep);
3340
        /* Convert to a string */
3341
        char *reply_text = json_dumps(reply, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
3342
        json_decref(event);
3343
        json_decref(plugin_data);
3344
        if(jsep != NULL)
3345
                json_decref(jsep);
3346
        json_decref(reply);
3347
        /* Send the event */
3348
        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Adding event to queue of messages...\n", ice_handle->handle_id);
3349
        janus_http_event *notification = (janus_http_event *)calloc(1, sizeof(janus_http_event));
3350
        if(notification == NULL) {
3351
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
3352
                return JANUS_ERROR_UNKNOWN;        /* FIXME Do we need something like "Internal Server Error"? */
3353
        }
3354
        notification->code = 200;
3355
        notification->payload = reply_text;
3356
        notification->allocated = 1;
3357

    
3358
        g_async_queue_push(session->messages, notification);
3359

    
3360
        return JANUS_OK;
3361
}
3362

    
3363
json_t *janus_handle_sdp(janus_plugin_session *handle, janus_plugin *plugin, const char *sdp_type, const char *sdp) {
3364
        if(handle == NULL || handle->stopped || plugin == NULL || sdp_type == NULL || sdp == NULL) {
3365
                JANUS_LOG(LOG_ERR, "Invalid arguments\n");
3366
                return NULL;
3367
        }
3368
        int offer = 0;
3369
        if(!strcasecmp(sdp_type, "offer")) {
3370
                /* This is an offer from a plugin */
3371
                offer = 1;
3372
        } else if(!strcasecmp(sdp_type, "answer")) {
3373
                /* This is an answer from a plugin */
3374
        } else {
3375
                /* TODO Handle other messages */
3376
                JANUS_LOG(LOG_ERR, "Unknown type '%s'\n", sdp_type);
3377
                return NULL;
3378
        }
3379
        janus_ice_handle *ice_handle = (janus_ice_handle *)handle->gateway_handle;
3380
        //~ if(ice_handle == NULL || janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)) {
3381
        if(ice_handle == NULL) {
3382
                JANUS_LOG(LOG_ERR, "Invalid ICE handle\n");
3383
                return NULL;
3384
        }
3385
        /* Is this valid SDP? */
3386
        int audio = 0, video = 0, data = 0, bundle = 0, rtcpmux = 0, trickle = 0;
3387
        janus_sdp *parsed_sdp = janus_sdp_preparse(sdp, &audio, &video, &data, &bundle, &rtcpmux, &trickle);
3388
        if(parsed_sdp == NULL) {
3389
                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Couldn't parse SDP...\n", ice_handle->handle_id);
3390
                return NULL;
3391
        }
3392
        janus_sdp_free(parsed_sdp);
3393
        gboolean updating = FALSE;
3394
        if(offer) {
3395
                /* We still don't have a local ICE setup */
3396
                JANUS_LOG(LOG_VERB, "[%"SCNu64"] Audio %s been negotiated\n", ice_handle->handle_id, audio ? "has" : "has NOT");
3397
                if(audio > 1) {
3398
                        JANUS_LOG(LOG_ERR, "[%"SCNu64"] More than one audio line? only going to negotiate one...\n", ice_handle->handle_id);
3399
                }
3400
                JANUS_LOG(LOG_VERB, "[%"SCNu64"] Video %s been negotiated\n", ice_handle->handle_id, video ? "has" : "has NOT");
3401
                if(video > 1) {
3402
                        JANUS_LOG(LOG_ERR, "[%"SCNu64"] More than one video line? only going to negotiate one...\n", ice_handle->handle_id);
3403
                }
3404
                JANUS_LOG(LOG_VERB, "[%"SCNu64"] SCTP/DataChannels %s been negotiated\n", ice_handle->handle_id, data ? "have" : "have NOT");
3405
                if(data > 1) {
3406
                        JANUS_LOG(LOG_ERR, "[%"SCNu64"] More than one data line? only going to negotiate one...\n", ice_handle->handle_id);
3407
                }
3408
#ifndef HAVE_SCTP
3409
                if(data) {
3410
                        JANUS_LOG(LOG_WARN, "[%"SCNu64"]   -- DataChannels have been negotiated, but support for them has not been compiled...\n", ice_handle->handle_id);
3411
                }
3412
#endif
3413
                /* Are we still cleaning up from a previous media session? */
3414
                if(janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)) {
3415
                        JANUS_LOG(LOG_INFO, "[%"SCNu64"] Still cleaning up from a previous media session, let's wait a bit...\n", ice_handle->handle_id);
3416
                        gint64 waited = 0;
3417
                        while(janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)) {
3418
                                JANUS_LOG(LOG_VERB, "[%"SCNu64"] Still cleaning up from a previous media session, let's wait a bit...\n", ice_handle->handle_id);
3419
                                g_usleep(100000);
3420
                                waited += 100000;
3421
                                if(waited >= 3*G_USEC_PER_SEC) {
3422
                                        JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited 3 seconds, that's enough!\n", ice_handle->handle_id);
3423
                                        break;
3424
                                }
3425
                        }
3426
                }
3427
                if(ice_handle->agent == NULL) {
3428
                        /* Process SDP in order to setup ICE locally (this is going to result in an answer from the browser) */
3429
                        if(janus_ice_setup_local(ice_handle, 0, audio, video, data, bundle, rtcpmux, trickle) < 0) {
3430
                                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error setting ICE locally\n", ice_handle->handle_id);
3431
                                return NULL;
3432
                        }
3433
                } else {
3434
                        updating = TRUE;
3435
                        JANUS_LOG(LOG_INFO, "[%"SCNu64"] Updating existing session\n", ice_handle->handle_id);
3436
                }
3437
        }
3438
        if(!updating) {
3439
                /* Wait for candidates-done callback */
3440
                while(ice_handle->cdone < ice_handle->streams_num) {
3441
                        if(ice_handle == NULL || janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP)
3442
                                        || janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) {
3443
                                JANUS_LOG(LOG_WARN, "[%"SCNu64"] Handle detached or PC closed, giving up...!\n", ice_handle ? ice_handle->handle_id : 0);
3444
                                return NULL;
3445
                        }
3446
                        JANUS_LOG(LOG_INFO, "[%"SCNu64"] Waiting for candidates-done callback...\n", ice_handle->handle_id);
3447
                        g_usleep(100000);
3448
                        if(ice_handle->cdone < 0) {
3449
                                JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error gathering candidates!\n", ice_handle->handle_id);
3450
                                return NULL;
3451
                        }
3452
                }
3453
        }
3454
        /* Anonymize SDP */
3455
        char *sdp_stripped = janus_sdp_anonymize(sdp);
3456
        if(sdp_stripped == NULL) {
3457
                /* Invalid SDP */
3458
                JANUS_LOG(LOG_ERR, "Invalid SDP\n");
3459
                return NULL;
3460
        }
3461
        /* Add our details */
3462
        char *sdp_merged = janus_sdp_merge(ice_handle, sdp_stripped);
3463
        if(sdp_merged == NULL) {
3464
                /* Couldn't merge SDP */
3465
                JANUS_LOG(LOG_ERR, "Error merging SDP\n");
3466
                g_free(sdp_stripped);
3467
                return NULL;
3468
        }
3469
        /* FIXME Any disabled m-line? */
3470
        if(strstr(sdp_merged, "m=audio 0")) {
3471
                JANUS_LOG(LOG_VERB, "Audio disabled via SDP\n");
3472
                if(!janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
3473
                                || (!video && !data)) {
3474
                        JANUS_LOG(LOG_VERB, "  -- Marking audio stream as disabled\n");
3475
                        janus_ice_stream *stream = g_hash_table_lookup(ice_handle->streams, GUINT_TO_POINTER(ice_handle->audio_id));
3476
                        if(stream)
3477
                                stream->disabled = TRUE;
3478
                }
3479
        }
3480
        if(strstr(sdp_merged, "m=video 0")) {
3481
                JANUS_LOG(LOG_VERB, "Video disabled via SDP\n");
3482
                if(!janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
3483
                                || (!audio && !data)) {
3484
                        JANUS_LOG(LOG_VERB, "  -- Marking video stream as disabled\n");
3485
                        janus_ice_stream *stream = NULL;
3486
                        if(!janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)) {
3487
                                stream = g_hash_table_lookup(ice_handle->streams, GUINT_TO_POINTER(ice_handle->video_id));
3488
                        } else {
3489
                                gint id = ice_handle->audio_id > 0 ? ice_handle->audio_id : ice_handle->video_id;
3490
                                stream = g_hash_table_lookup(ice_handle->streams, GUINT_TO_POINTER(id));
3491
                        }
3492
                        if(stream)
3493
                                stream->disabled = TRUE;
3494
                }
3495
        }
3496
        if(strstr(sdp_merged, "m=application 0 DTLS/SCTP")) {
3497
                JANUS_LOG(LOG_VERB, "Data Channel disabled via SDP\n");
3498
                if(!janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)
3499
                                || (!audio && !video)) {
3500
                        JANUS_LOG(LOG_VERB, "  -- Marking data channel stream as disabled\n");
3501
                        janus_ice_stream *stream = NULL;
3502
                        if(!janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE)) {
3503
                                stream = g_hash_table_lookup(ice_handle->streams, GUINT_TO_POINTER(ice_handle->data_id));
3504
                        } else {
3505
                                gint id = ice_handle->audio_id > 0 ? ice_handle->audio_id : (ice_handle->video_id > 0 ? ice_handle->video_id : ice_handle->data_id);
3506
                                stream = g_hash_table_lookup(ice_handle->streams, GUINT_TO_POINTER(id));
3507
                        }
3508
                        if(stream)
3509
                                stream->disabled = TRUE;
3510
                }
3511
        }
3512

    
3513
        if(!updating) {
3514
                if(offer) {
3515
                        /* We set the flag to wait for an answer before handling trickle candidates */
3516
                        janus_flags_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);
3517
                } else {
3518
                        JANUS_LOG(LOG_INFO, "[%"SCNu64"] Done! Ready to setup remote candidates and send connectivity checks...\n", ice_handle->handle_id);
3519
                        if(janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_BUNDLE) && audio && video) {
3520
                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- bundle is supported by the browser, getting rid of one of the RTP/RTCP components, if any...\n", ice_handle->handle_id);
3521
                                if(audio) {
3522
                                        /* Get rid of video and data, if present */
3523
                                        if(ice_handle->streams && ice_handle->video_stream) {
3524
                                                ice_handle->audio_stream->video_ssrc = ice_handle->video_stream->video_ssrc;
3525
                                                ice_handle->audio_stream->video_ssrc_peer = ice_handle->video_stream->video_ssrc_peer;
3526
                                                janus_ice_stream_free(ice_handle->streams, ice_handle->video_stream);
3527
                                        }
3528
                                        ice_handle->video_stream = NULL;
3529
                                        if(ice_handle->video_id > 0) {
3530
                                                nice_agent_attach_recv (ice_handle->agent, ice_handle->video_id, 1, g_main_loop_get_context (ice_handle->iceloop), NULL, NULL);
3531
                                                nice_agent_attach_recv (ice_handle->agent, ice_handle->video_id, 2, g_main_loop_get_context (ice_handle->iceloop), NULL, NULL);
3532
                                        }
3533
                                        ice_handle->video_id = 0;
3534
                                        if(ice_handle->streams && ice_handle->data_stream) {
3535
                                                janus_ice_stream_free(ice_handle->streams, ice_handle->data_stream);
3536
                                        }
3537
                                        ice_handle->data_stream = NULL;
3538
                                        if(ice_handle->data_id > 0) {
3539
                                                nice_agent_attach_recv (ice_handle->agent, ice_handle->data_id, 1, g_main_loop_get_context (ice_handle->iceloop), NULL, NULL);
3540
                                        }
3541
                                        ice_handle->data_id = 0;
3542
                                } else if(video) {
3543
                                        /* Get rid of data, if present */
3544
                                        if(ice_handle->streams && ice_handle->data_stream) {
3545
                                                janus_ice_stream_free(ice_handle->streams, ice_handle->data_stream);
3546
                                        }
3547
                                        ice_handle->data_stream = NULL;
3548
                                        if(ice_handle->data_id > 0) {
3549
                                                nice_agent_attach_recv (ice_handle->agent, ice_handle->data_id, 1, g_main_loop_get_context (ice_handle->iceloop), NULL, NULL);
3550
                                        }
3551
                                        ice_handle->data_id = 0;
3552
                                }
3553
                        }
3554
                        if(janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX)) {
3555
                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- rtcp-mux is supported by the browser, getting rid of RTCP components, if any...\n", ice_handle->handle_id);
3556
                                if(ice_handle->audio_stream && ice_handle->audio_stream->rtcp_component && ice_handle->audio_stream->components != NULL) {
3557
                                        nice_agent_attach_recv (ice_handle->agent, ice_handle->audio_id, 2, g_main_loop_get_context (ice_handle->iceloop), NULL, NULL);
3558
                                        janus_ice_component_free(ice_handle->audio_stream->components, ice_handle->audio_stream->rtcp_component);
3559
                                        ice_handle->audio_stream->rtcp_component = NULL;
3560
                                }
3561
                                if(ice_handle->video_stream && ice_handle->video_stream->rtcp_component && ice_handle->video_stream->components != NULL) {
3562
                                        nice_agent_attach_recv (ice_handle->agent, ice_handle->video_id, 2, g_main_loop_get_context (ice_handle->iceloop), NULL, NULL);
3563
                                        janus_ice_component_free(ice_handle->video_stream->components, ice_handle->video_stream->rtcp_component);
3564
                                        ice_handle->video_stream->rtcp_component = NULL;
3565
                                }
3566
                        }
3567
                        janus_mutex_lock(&ice_handle->mutex);
3568
                        /* Not trickling (anymore?), set remote candidates now */
3569
                        if(ice_handle->audio_id > 0) {
3570
                                janus_ice_setup_remote_candidates(ice_handle, ice_handle->audio_id, 1);
3571
                                if(!janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX))        /* http://tools.ietf.org/html/rfc5761#section-5.1.3 */
3572
                                        janus_ice_setup_remote_candidates(ice_handle, ice_handle->audio_id, 2);
3573
                        }
3574
                        if(ice_handle->video_id > 0) {
3575
                                janus_ice_setup_remote_candidates(ice_handle, ice_handle->video_id, 1);
3576
                                if(!janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RTCPMUX))        /* http://tools.ietf.org/html/rfc5761#section-5.1.3 */
3577
                                        janus_ice_setup_remote_candidates(ice_handle, ice_handle->video_id, 2);
3578
                        }
3579
                        if(ice_handle->data_id > 0) {
3580
                                janus_ice_setup_remote_candidates(ice_handle, ice_handle->data_id, 1);
3581
                        }
3582
                        if(janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE) &&
3583
                                        !janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES)) {
3584
                                /* Still trickling, but take note of the fact ICE has started now */
3585
                                JANUS_LOG(LOG_VERB, "Still trickling, but we can start send connectivity checks already, now\n");
3586
                                janus_flags_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_START);
3587
                        }
3588
                        janus_mutex_unlock(&ice_handle->mutex);
3589
                }
3590
        }
3591
        
3592
        /* Prepare JSON event */
3593
        json_t *jsep = json_object();
3594
        json_object_set_new(jsep, "type", json_string(sdp_type));
3595
        json_object_set_new(jsep, "sdp", json_string(sdp_merged));
3596
        g_free(sdp_stripped);
3597
        //~ g_free(sdp_merged);
3598
        ice_handle->local_sdp = sdp_merged;
3599
        return jsep;
3600
}
3601

    
3602
void janus_relay_rtp(janus_plugin_session *plugin_session, int video, char *buf, int len) {
3603
        if(!plugin_session || plugin_session->stopped || buf == NULL || len < 1)
3604
                return;
3605
        janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle;
3606
        if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP)
3607
                        || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT))
3608
                return;
3609
        janus_ice_relay_rtp(handle, video, buf, len);
3610
}
3611

    
3612
void janus_relay_rtcp(janus_plugin_session *plugin_session, int video, char *buf, int len) {
3613
        if(!plugin_session || plugin_session->stopped || buf == NULL || len < 1)
3614
                return;
3615
        janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle;
3616
        if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP)
3617
                        || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT))
3618
                return;
3619
        janus_ice_relay_rtcp(handle, video, buf, len);
3620
}
3621

    
3622
void janus_relay_data(janus_plugin_session *plugin_session, char *buf, int len) {
3623
        if(!plugin_session || plugin_session->stopped || buf == NULL || len < 1)
3624
                return;
3625
        janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle;
3626
        if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP)
3627
                        || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT))
3628
                return;
3629
#ifdef HAVE_SCTP
3630
        janus_ice_relay_data(handle, buf, len);
3631
#else
3632
        JANUS_LOG(LOG_WARN, "Asked to relay data, but Data Channels support has not been compiled...\n");
3633
#endif
3634
}
3635

    
3636
void janus_close_pc(janus_plugin_session *plugin_session) {
3637
        /* A plugin asked to get rid of a PeerConnection */
3638
        if(!plugin_session)
3639
                return;
3640
        janus_ice_handle *ice_handle = (janus_ice_handle *)plugin_session->gateway_handle;
3641
        if(!ice_handle)
3642
                return;
3643
        if(janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP)
3644
                        || janus_flags_is_set(&ice_handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT))
3645
                return;
3646
        janus_session *session = (janus_session *)ice_handle->session;
3647
        if(!session)
3648
                return;
3649
                
3650
        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Plugin asked to hangup PeerConnection: sending alert\n", ice_handle->handle_id);
3651
        /* Send an alert on all the DTLS connections */
3652
        janus_ice_webrtc_hangup(ice_handle);
3653
        /* Get rid of the PeerConnection */
3654
        if(ice_handle->iceloop) {
3655
                gint64 waited = 0;
3656
                while(ice_handle->iceloop && !g_main_loop_is_running(ice_handle->iceloop)) {
3657
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] ICE loop exists but is not running, waiting for it to run\n", ice_handle->handle_id);
3658
                        g_usleep (100000);
3659
                        waited += 100000;
3660
                        if(waited >= G_USEC_PER_SEC) {
3661
                                JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited a second, that's enough!\n", ice_handle->handle_id);
3662
                                break;
3663
                        }
3664
                }
3665
                if(ice_handle->iceloop) {
3666
                        JANUS_LOG(LOG_VERB, "[%"SCNu64"] Forcing ICE loop to quit (%s)\n", ice_handle->handle_id, g_main_loop_is_running(ice_handle->iceloop) ? "running" : "NOT running");
3667
                        g_main_loop_quit(ice_handle->iceloop);
3668
                        g_main_context_wakeup(ice_handle->icectx);
3669
                }
3670
        }
3671
}
3672

    
3673
void janus_end_session(janus_plugin_session *plugin_session) {
3674
        /* A plugin asked to get rid of a handle */
3675
        if(!plugin_session)
3676
                return;
3677
        janus_ice_handle *ice_handle = (janus_ice_handle *)plugin_session->gateway_handle;
3678
        if(!ice_handle)
3679
                return;
3680
        /* Destroy the handle */
3681
        janus_ice_handle_destroy(ice_handle->session, ice_handle->handle_id);
3682
}
3683

    
3684

    
3685
/* Main */
3686
gint main(int argc, char *argv[])
3687
{
3688
        /* Core dumps may be disallowed by parent of this process; change that */
3689
        struct rlimit core_limits;
3690
        core_limits.rlim_cur = core_limits.rlim_max = RLIM_INFINITY;
3691
        setrlimit(RLIMIT_CORE, &core_limits);
3692

    
3693
        struct gengetopt_args_info args_info;
3694
        /* Let's call our cmdline parser */
3695
        if(cmdline_parser(argc, argv, &args_info) != 0)
3696
                exit(1);
3697
        
3698
        JANUS_PRINT("---------------------------------------------------\n");
3699
        JANUS_PRINT("  Starting Meetecho Janus (WebRTC Gateway) v%s\n", JANUS_VERSION_STRING);
3700
        JANUS_PRINT("---------------------------------------------------\n\n");
3701
        
3702
        /* Handle SIGINT */
3703
        signal(SIGINT, janus_handle_signal);
3704

    
3705
        /* Setup Glib */
3706
#if !GLIB_CHECK_VERSION(2, 36, 0)
3707
        g_type_init();
3708
#endif
3709
        
3710
        /* Logging level: default is info */
3711
        log_level = LOG_INFO;
3712
        if(args_info.debug_level_given) {
3713
                if(args_info.debug_level_arg < LOG_NONE)
3714
                        args_info.debug_level_arg = 0;
3715
                else if(args_info.debug_level_arg > LOG_MAX)
3716
                        args_info.debug_level_arg = LOG_MAX;
3717
                log_level = args_info.debug_level_arg;
3718
        }
3719

    
3720
        /* Any configuration to open? */
3721
        if(args_info.config_given) {
3722
                config_file = g_strdup(args_info.config_arg);
3723
                if(config_file == NULL) {
3724
                        JANUS_PRINT("Memory error!\n");
3725
                        exit(1);
3726
                }
3727
        }
3728
        if(args_info.configs_folder_given) {
3729
                configs_folder = g_strdup(args_info.configs_folder_arg);
3730
                if(configs_folder == NULL) {
3731
                        JANUS_PRINT("Memory error!\n");
3732
                        exit(1);
3733
                }
3734
        } else {
3735
                configs_folder = g_strdup (CONFDIR);
3736
        }
3737
        if(config_file == NULL) {
3738
                char file[255];
3739
                g_snprintf(file, 255, "%s/janus.cfg", configs_folder);
3740
                config_file = g_strdup(file);
3741
                if(config_file == NULL) {
3742
                        JANUS_PRINT("Memory error!\n");
3743
                        exit(1);
3744
                }
3745
        }
3746
        JANUS_PRINT("Reading configuration from %s\n", config_file);
3747
        if((config = janus_config_parse(config_file)) == NULL) {
3748
                if(args_info.config_given) {
3749
                        /* We only give up if the configuration file was explicitly provided */
3750
                        exit(1);
3751
                }
3752
                JANUS_PRINT("Error reading/parsing the configuration file, going on with the defaults and the command line arguments\n");
3753
                config = janus_config_create("janus.cfg");
3754
                if(config == NULL) {
3755
                        /* If we can't even create an empty configuration, something's definitely wrong */
3756
                        exit(1);
3757
                }
3758
        }
3759
        janus_config_print(config);
3760
        if(args_info.debug_level_given) {
3761
                char debug[5];
3762
                g_snprintf(debug, 5, "%d", args_info.debug_level_arg);
3763
                janus_config_add_item(config, "general", "debug_level", debug);
3764
        } else {
3765
                /* No command line directive on logging, try the configuration file */
3766
                janus_config_item *item = janus_config_get_item_drilldown(config, "general", "debug_level");
3767
                if(item && item->value) {
3768
                        int temp_level = atoi(item->value);
3769
                        if(temp_level == 0 && strcmp(item->value, "0")) {
3770
                                JANUS_PRINT("Invalid debug level %s (configuration), using default (info=4)\n", item->value);
3771
                        } else {
3772
                                log_level = temp_level;
3773
                                if(log_level < LOG_NONE)
3774
                                        log_level = 0;
3775
                                else if(log_level > LOG_MAX)
3776
                                        log_level = LOG_MAX;
3777
                        }
3778
                }
3779
        }
3780
        /* Any command line argument that should overwrite the configuration? */
3781
        JANUS_PRINT("Checking command line arguments...\n");
3782
        if(args_info.interface_given) {
3783
                janus_config_add_item(config, "general", "interface", args_info.interface_arg);
3784
        }
3785
        if(args_info.configs_folder_given) {
3786
                janus_config_add_item(config, "general", "configs_folder", args_info.configs_folder_arg);
3787
        }
3788
        if(args_info.plugins_folder_given) {
3789
                janus_config_add_item(config, "general", "plugins_folder", args_info.plugins_folder_arg);
3790
        }
3791
        if(args_info.apisecret_given) {
3792
                janus_config_add_item(config, "general", "api_secret", args_info.apisecret_arg);
3793
        }
3794
        if(args_info.no_http_given) {
3795
                janus_config_add_item(config, "webserver", "http", "no");
3796
        }
3797
        if(args_info.port_given) {
3798
                char port[20];
3799
                g_snprintf(port, 20, "%d", args_info.port_arg);
3800
                janus_config_add_item(config, "webserver", "port", port);
3801
        }
3802
        if(args_info.secure_port_given) {
3803
                janus_config_add_item(config, "webserver", "https", "yes");
3804
                char port[20];
3805
                g_snprintf(port, 20, "%d", args_info.secure_port_arg);
3806
                janus_config_add_item(config, "webserver", "secure_port", port);
3807
        }
3808
        if(args_info.base_path_given) {
3809
                janus_config_add_item(config, "webserver", "base_path", args_info.base_path_arg);
3810
        }
3811
        if(args_info.no_websockets_given) {
3812
                janus_config_add_item(config, "webserver", "ws", "no");
3813
        }
3814
        if(args_info.ws_port_given) {
3815
                char port[20];
3816
                g_snprintf(port, 20, "%d", args_info.port_arg);
3817
                janus_config_add_item(config, "webserver", "ws_port", port);
3818
        }
3819
        if(args_info.ws_secure_port_given) {
3820
                janus_config_add_item(config, "webserver", "ws_ssl", "yes");
3821
                char port[20];
3822
                g_snprintf(port, 20, "%d", args_info.ws_secure_port_arg);
3823
                janus_config_add_item(config, "webserver", "ws_secure_port", port);
3824
        }
3825
        if(args_info.enable_rabbitmq_given) {
3826
                janus_config_add_item(config, "rabbitmq", "enable", "yes");
3827
        }
3828
#ifdef HAVE_RABBITMQ
3829
        if(args_info.rabbitmq_server_given) {
3830
                /* Split in server and port (if port missing, use AMQP_PROTOCOL_PORT as default) */
3831
                char *rmqport = strrchr(args_info.stun_server_arg, ':');
3832
                if(rmqport != NULL) {
3833
                        *rmqport = '\0';
3834
                        rmqport++;
3835
                        janus_config_add_item(config, "rabbitmq", "host", args_info.rabbitmq_server_arg);
3836
                        janus_config_add_item(config, "rabbitmq", "port", rmqport);
3837
                } else {
3838
                        janus_config_add_item(config, "rabbitmq", "host", args_info.rabbitmq_server_arg);
3839
                        char port[10];
3840
                        g_snprintf(port, 10, "%d", AMQP_PROTOCOL_PORT);
3841
                        janus_config_add_item(config, "rabbitmq", "port", port);
3842
                }
3843
        }
3844
        if(args_info.rabbitmq_in_queue_given) {
3845
                janus_config_add_item(config, "rabbitmq", "to_janus", args_info.rabbitmq_in_queue_arg);
3846
        }
3847
        if(args_info.rabbitmq_out_queue_given) {
3848
                janus_config_add_item(config, "rabbitmq", "from_janus", args_info.rabbitmq_out_queue_arg);
3849
        }
3850
#endif
3851
        if(args_info.admin_secret_given) {
3852
                janus_config_add_item(config, "admin", "admin_secret", args_info.admin_secret_arg);
3853
        }
3854
        if(args_info.no_admin_given) {
3855
                janus_config_add_item(config, "admin", "admin_http", "no");
3856
        }
3857
        if(args_info.admin_port_given) {
3858
                char port[20];
3859
                g_snprintf(port, 20, "%d", args_info.admin_port_arg);
3860
                janus_config_add_item(config, "admin", "admin_port", port);
3861
        }
3862
        if(args_info.admin_secure_port_given) {
3863
                janus_config_add_item(config, "admin", "admin_https", "yes");
3864
                char port[20];
3865
                g_snprintf(port, 20, "%d", args_info.admin_secure_port_arg);
3866
                janus_config_add_item(config, "admin", "admin_secure_port", port);
3867
        }
3868
        if(args_info.admin_base_path_given) {
3869
                janus_config_add_item(config, "admin", "admin_base_path", args_info.admin_base_path_arg);
3870
        }
3871
        if(args_info.admin_acl_given) {
3872
                janus_config_add_item(config, "admin", "admin_acl", args_info.admin_acl_arg);
3873
        }
3874
        if(args_info.cert_pem_given) {
3875
                janus_config_add_item(config, "certificates", "cert_pem", args_info.cert_pem_arg);
3876
        }
3877
        if(args_info.cert_key_given) {
3878
                janus_config_add_item(config, "certificates", "cert_key", args_info.cert_key_arg);
3879
        }
3880
        if(args_info.stun_server_given) {
3881
                /* Split in server and port (if port missing, use 3478 as default) */
3882
                char *stunport = strrchr(args_info.stun_server_arg, ':');
3883
                if(stunport != NULL) {
3884
                        *stunport = '\0';
3885
                        stunport++;
3886
                        janus_config_add_item(config, "nat", "stun_server", args_info.stun_server_arg);
3887
                        janus_config_add_item(config, "nat", "stun_port", stunport);
3888
                } else {
3889
                        janus_config_add_item(config, "nat", "stun_server", args_info.stun_server_arg);
3890
                        janus_config_add_item(config, "nat", "stun_port", "3478");
3891
                }
3892
        }
3893
        if(args_info.public_ip_given) {
3894
                janus_config_add_item(config, "nat", "public_ip", args_info.public_ip_arg);
3895
        }
3896
        if(args_info.ice_ignore_list_given) {
3897
                janus_config_add_item(config, "nat", "ice_ignore_list", args_info.ice_ignore_list_arg);
3898
        }
3899
        if(args_info.libnice_debug_given) {
3900
                janus_config_add_item(config, "nat", "nice_debug", "true");
3901
        }
3902
        if(args_info.ice_lite_given) {
3903
                janus_config_add_item(config, "nat", "ice_lite", "true");
3904
        }
3905
        if(args_info.ice_tcp_given) {
3906
                janus_config_add_item(config, "nat", "ice_tcp", "true");
3907
        }
3908
        if(args_info.ipv6_candidates_given) {
3909
                janus_config_add_item(config, "media", "ipv6", "true");
3910
        }
3911
        if(args_info.max_nack_queue_given) {
3912
                char mnq[20];
3913
                g_snprintf(mnq, 20, "%d", args_info.max_nack_queue_arg);
3914
                janus_config_add_item(config, "media", "max_nack_queue", mnq);
3915
        }
3916
        if(args_info.rtp_port_range_given) {
3917
                janus_config_add_item(config, "media", "rtp_port_range", args_info.rtp_port_range_arg);
3918
        }
3919
        janus_config_print(config);
3920
        
3921
        JANUS_PRINT("Debug/log level is %d\n", log_level);
3922

    
3923
        /* Any IP/interface to ignore? */
3924
        janus_config_item *item = janus_config_get_item_drilldown(config, "nat", "ice_ignore_list");
3925
        if(item && item->value) {
3926
                gchar **list = g_strsplit(item->value, ",", -1);
3927
                gchar *index = list[0];
3928
                if(index != NULL) {
3929
                        int i=0;
3930
                        while(index != NULL) {
3931
                                if(strlen(index) > 0) {
3932
                                        JANUS_LOG(LOG_INFO, "Adding '%s' to the ICE ignore list...\n", index);
3933
                                        janus_ice_ignore_interface(g_strdup(index));
3934
                                }
3935
                                i++;
3936
                                index = list[i];
3937
                        }
3938
                }
3939
                g_strfreev(list);
3940
                list = NULL;
3941
        }
3942
        /* What is the local public IP? */
3943
        JANUS_LOG(LOG_VERB, "Available interfaces:\n");
3944
        item = janus_config_get_item_drilldown(config, "general", "interface");
3945
        if(item && item->value) {
3946
                JANUS_LOG(LOG_VERB, "  -- Will try to use %s\n", item->value);
3947
        }
3948
        struct ifaddrs *myaddrs, *ifa;
3949
        int status = getifaddrs(&myaddrs);
3950
        char *tmp = NULL;
3951
        if (status == 0) {
3952
                for (ifa = myaddrs; ifa != NULL; ifa = ifa->ifa_next) {
3953
                        if(ifa->ifa_addr == NULL) {
3954
                                continue;
3955
                        }
3956
                        if((ifa->ifa_flags & IFF_UP) == 0) {
3957
                                continue;
3958
                        }
3959
                        /* Check the interface name first: we can ignore that as well */
3960
                        if(ifa->ifa_name != NULL && janus_ice_is_ignored(ifa->ifa_name))
3961
                                continue;
3962
                        if(ifa->ifa_addr->sa_family == AF_INET) {
3963
                                struct sockaddr_in *ip = (struct sockaddr_in *)(ifa->ifa_addr);
3964
                                char buf[16];
3965
                                if(inet_ntop(ifa->ifa_addr->sa_family, (void *)&(ip->sin_addr), buf, sizeof(buf)) == NULL) {
3966
                                        JANUS_LOG(LOG_ERR, "\t%s:\tinet_ntop failed!\n", ifa->ifa_name);
3967
                                } else {
3968
                                        JANUS_LOG(LOG_VERB, "\t%s:\t%s\n", ifa->ifa_name, buf);
3969
                                        /* Check if this IP address is in the ignore list, now */
3970