Statistics
| Branch: | Revision:

janus-gateway / transports / janus_pfunix.c @ 88b5da7b

History | View | Annotate | Download (25.3 KB)

1
/*! \file   janus_pfunix.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus Unix Sockets transport plugin
5
 * \details  This is an implementation of a Unix Sockets transport for the
6
 * Janus API. This means that, with the help of this module, local
7
 * applications can use Unix Sockets to make requests to the gateway.
8
 * This plugin can make use of either the \c SOCK_SEQPACKET or the
9
 * \c SOCK_DGRAM socket type according to what you configure, so make
10
 * sure you're using the right one when writing a client application.
11
 * Pretty much as it happens with WebSockets, the same client socket can
12
 * be used for both sending requests and receiving notifications, without
13
 * any need for long polls. At the same time, without the concept of a
14
 * REST path, requests sent through the Unix Sockets interface will need
15
 * to include, when needed, additional pieces of information like
16
 * \c session_id and \c handle_id. That is, where you'd send a Janus
17
 * request related to a specific session to the \c /janus/<session> path,
18
 * with Unix Sockets you'd have to send the same request with an additional
19
 * \c session_id field in the JSON payload. The same applies for the handle.
20
 * \note When you create a session using Unix Sockets, a subscription to
21
 * the events related to it is done automatically, so no need for an
22
 * explicit request as the GET in the plain HTTP API. Closing a client
23
 * Unix Socket will also destroy all the sessions it created.
24
 *
25
 * \ingroup transports
26
 * \ref transports
27
 */
28

    
29
#include "transport.h"
30

    
31
#include <poll.h>
32
#include <sys/un.h>
33

    
34
#include "../debug.h"
35
#include "../apierror.h"
36
#include "../config.h"
37
#include "../mutex.h"
38
#include "../utils.h"
39

    
40

    
41
/* Transport plugin information */
42
#define JANUS_PFUNIX_VERSION                        1
43
#define JANUS_PFUNIX_VERSION_STRING                "0.0.1"
44
#define JANUS_PFUNIX_DESCRIPTION                "This transport plugin adds Unix Sockets support to the Janus API."
45
#define JANUS_PFUNIX_NAME                                "JANUS Unix Sockets transport plugin"
46
#define JANUS_PFUNIX_AUTHOR                                "Meetecho s.r.l."
47
#define JANUS_PFUNIX_PACKAGE                        "janus.transport.pfunix"
48

    
49
/* Transport methods */
50
janus_transport *create(void);
51
int janus_pfunix_init(janus_transport_callbacks *callback, const char *config_path);
52
void janus_pfunix_destroy(void);
53
int janus_pfunix_get_api_compatibility(void);
54
int janus_pfunix_get_version(void);
55
const char *janus_pfunix_get_version_string(void);
56
const char *janus_pfunix_get_description(void);
57
const char *janus_pfunix_get_name(void);
58
const char *janus_pfunix_get_author(void);
59
const char *janus_pfunix_get_package(void);
60
gboolean janus_pfunix_is_janus_api_enabled(void);
61
gboolean janus_pfunix_is_admin_api_enabled(void);
62
int janus_pfunix_send_message(void *transport, void *request_id, gboolean admin, json_t *message);
63
void janus_pfunix_session_created(void *transport, guint64 session_id);
64
void janus_pfunix_session_over(void *transport, guint64 session_id, gboolean timeout);
65

    
66

    
67
/* Transport setup */
68
static janus_transport janus_pfunix_transport =
69
        JANUS_TRANSPORT_INIT (
70
                .init = janus_pfunix_init,
71
                .destroy = janus_pfunix_destroy,
72

    
73
                .get_api_compatibility = janus_pfunix_get_api_compatibility,
74
                .get_version = janus_pfunix_get_version,
75
                .get_version_string = janus_pfunix_get_version_string,
76
                .get_description = janus_pfunix_get_description,
77
                .get_name = janus_pfunix_get_name,
78
                .get_author = janus_pfunix_get_author,
79
                .get_package = janus_pfunix_get_package,
80

    
81
                .is_janus_api_enabled = janus_pfunix_is_janus_api_enabled,
82
                .is_admin_api_enabled = janus_pfunix_is_admin_api_enabled,
83

    
84
                .send_message = janus_pfunix_send_message,
85
                .session_created = janus_pfunix_session_created,
86
                .session_over = janus_pfunix_session_over,
87
        );
88

    
89
/* Transport creator */
90
janus_transport *create(void) {
91
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_PFUNIX_NAME);
92
        return &janus_pfunix_transport;
93
}
94

    
95

    
96
/* Useful stuff */
97
static gint initialized = 0, stopping = 0;
98
static janus_transport_callbacks *gateway = NULL;
99

    
100
/* JSON serialization options */
101
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
102

    
103
#define BUFFER_SIZE                8192
104

    
105
struct sockaddr_un sizecheck;
106
#ifndef UNIX_PATH_MAX
107
#define UNIX_PATH_MAX sizeof(sizecheck.sun_path)
108
#endif
109

    
110
/* Unix Sockets server thread */
111
static GThread *pfunix_thread = NULL;
112
void *janus_pfunix_thread(void *data);
113

    
114
/* Unix Sockets servers (and whether they should be SOCK_SEQPACKET or SOCK_DGRAM) */
115
static int pfd = -1, admin_pfd = -1;
116
static gboolean dgram = FALSE, admin_dgram = FALSE;
117
/* Socket pair to notify about the need for outgoing data */
118
static int write_fd[2];
119

    
120
/* Unix Sockets client session */
121
typedef struct janus_pfunix_client {
122
        int fd;                                                /* Client socket (in case SOCK_SEQPACKET is used) */
123
        struct sockaddr_un addr;        /* Client address (in case SOCK_DGRAM is used) */
124
        gboolean admin;                                /* Whether this client is for the Admin or Janus API */
125
        GAsyncQueue *messages;                /* Queue of outgoing messages to push */
126
        gboolean session_timeout;        /* Whether a Janus session timeout occurred in the core */
127
} janus_pfunix_client;
128
static GHashTable *clients = NULL, *clients_by_fd = NULL, *clients_by_path = NULL;
129
static janus_mutex clients_mutex;
130

    
131

    
132
/* Helper to create a named Unix Socket out of the path to link to */
133
static int janus_pfunix_create_socket(char *pfname, gboolean use_dgram) {
134
        if(pfname == NULL)
135
                return -1;
136
        int fd = -1;
137
        if(strlen(pfname) > UNIX_PATH_MAX) {
138
                JANUS_LOG(LOG_WARN, "The provided path name (%s) is longer than %lu characters, it will be truncated\n", pfname, UNIX_PATH_MAX);
139
                pfname[UNIX_PATH_MAX] = '\0';
140
        }
141
        /* Create socket */
142
        int flags = use_dgram ? SOCK_DGRAM | SOCK_NONBLOCK : SOCK_SEQPACKET | SOCK_NONBLOCK;
143
        fd = socket(use_dgram ? AF_UNIX : PF_UNIX, flags, 0);
144
        if(fd < 0) {
145
                JANUS_LOG(LOG_FATAL, "Unix Sockets %s creation failed: %d, %s\n", pfname, errno, strerror(errno));
146
        } else {
147
                /* Unlink before binding */
148
                unlink(pfname);
149
                /* Let's bind to the provided path now */
150
                struct sockaddr_un address;
151
                memset(&address, 0, sizeof(address));
152
                address.sun_family = AF_UNIX;
153
                g_snprintf(address.sun_path, UNIX_PATH_MAX, "%s", pfname);
154
                JANUS_LOG(LOG_VERB, "Binding Unix Socket %s... (Janus API)\n", pfname);
155
                if(bind(fd, (struct sockaddr *)&address, sizeof(address)) != 0) {
156
                        JANUS_LOG(LOG_FATAL, "Bind for Unix Socket %s failed: %d, %s\n", pfname, errno, strerror(errno));
157
                        close(fd);
158
                        fd = -1;
159
                        return fd;
160
                }
161
                if(!use_dgram) {
162
                        JANUS_LOG(LOG_VERB, "Listening on Unix Socket %s...\n", pfname);
163
                        if(listen(fd, 128) != 0) {
164
                                JANUS_LOG(LOG_FATAL, "Listening on Unix Socket %s failed: %d, %s\n", pfname, errno, strerror(errno));
165
                                close(fd);
166
                                fd = -1;
167
                        }
168
                }
169
        }
170
        return fd;
171
}
172

    
173
/* Transport implementation */
174
int janus_pfunix_init(janus_transport_callbacks *callback, const char *config_path) {
175
        if(g_atomic_int_get(&stopping)) {
176
                /* Still stopping from before */
177
                return -1;
178
        }
179
        if(callback == NULL || config_path == NULL) {
180
                /* Invalid arguments */
181
                return -1;
182
        }
183

    
184
        /* This is the callback we'll need to invoke to contact the gateway */
185
        gateway = callback;
186

    
187
        /* Read configuration */
188
        char filename[255];
189
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_PFUNIX_PACKAGE);
190
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
191
        janus_config *config = janus_config_parse(filename);
192
        if(config != NULL) {
193
                /* Handle configuration */
194
                janus_config_print(config);
195

    
196
                janus_config_item *item = janus_config_get_item_drilldown(config, "general", "json");
197
                if(item && item->value) {
198
                        /* Check how we need to format/serialize the JSON output */
199
                        if(!strcasecmp(item->value, "indented")) {
200
                                /* Default: indented, we use three spaces for that */
201
                                json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
202
                        } else if(!strcasecmp(item->value, "plain")) {
203
                                /* Not indented and no new lines, but still readable */
204
                                json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
205
                        } else if(!strcasecmp(item->value, "compact")) {
206
                                /* Compact, so no spaces between separators */
207
                                json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
208
                        } else {
209
                                JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
210
                                json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
211
                        }
212
                }
213

    
214
                /* First of all, initialize the socketpair for writeable notifications */
215
                if(socketpair(PF_LOCAL, SOCK_STREAM, 0, write_fd) < 0) {
216
                        JANUS_LOG(LOG_FATAL, "Error creating socket pair for writeable events: %d, %s\n", errno, strerror(errno));
217
                        return -1;
218
                }
219

    
220
                /* Setup the Janus API Unix Sockets server(s) */
221
                item = janus_config_get_item_drilldown(config, "general", "enabled");
222
                if(!item || !item->value || !janus_is_true(item->value)) {
223
                        JANUS_LOG(LOG_WARN, "Unix Sockets server disabled (Janus API)\n");
224
                } else {
225
                        item = janus_config_get_item_drilldown(config, "general", "path");
226
                        char *pfname = (char *)(item && item->value ? item->value : "/tmp/ux-janusapi");
227
                        item = janus_config_get_item_drilldown(config, "general", "type");
228
                        const char *type = item && item->value ? item->value : "SOCK_SEQPACKET";
229
                        dgram = FALSE;
230
                        if(!strcasecmp(type, "SOCK_SEQPACKET")) {
231
                                dgram = FALSE;
232
                        } else if(!strcasecmp(type, "SOCK_DGRAM")) {
233
                                dgram = TRUE;
234
                        } else {
235
                                JANUS_LOG(LOG_WARN, "Unknown type %s, assuming SOCK_SEQPACKET\n", type);
236
                                type = "SOCK_SEQPACKET";
237
                        }
238
                        JANUS_LOG(LOG_INFO, "Configuring %s Unix Sockets server (Janus API)\n", type);
239
                        pfd = janus_pfunix_create_socket(pfname, dgram);
240
                }
241
                /* Do the same for the Admin API, if enabled */
242
                item = janus_config_get_item_drilldown(config, "admin", "admin_enabled");
243
                if(!item || !item->value || !janus_is_true(item->value)) {
244
                        JANUS_LOG(LOG_WARN, "Unix Sockets server disabled (Admin API)\n");
245
                } else {
246
                        item = janus_config_get_item_drilldown(config, "admin", "admin_path");
247
                        char *pfname = (char *)(item && item->value ? item->value : "/tmp/ux-janusadmin");
248
                        item = janus_config_get_item_drilldown(config, "admin", "admin_type");
249
                        const char *type = item && item->value ? item->value : "SOCK_SEQPACKET";
250
                        if(!strcasecmp(type, "SOCK_SEQPACKET")) {
251
                                admin_dgram = FALSE;
252
                        } else if(!strcasecmp(type, "SOCK_DGRAM")) {
253
                                admin_dgram = TRUE;
254
                        } else {
255
                                JANUS_LOG(LOG_WARN, "Unknown type %s, assuming SOCK_SEQPACKET\n", type);
256
                                type = "SOCK_SEQPACKET";
257
                        }
258
                        JANUS_LOG(LOG_INFO, "Configuring %s Unix Sockets server (Admin API)\n", type);
259
                        admin_pfd = janus_pfunix_create_socket(pfname, admin_dgram);
260
                }
261
        }
262
        janus_config_destroy(config);
263
        config = NULL;
264
        if(pfd < 0 && admin_pfd < 0) {
265
                JANUS_LOG(LOG_FATAL, "No Unix Sockets server started, giving up...\n");
266
                return -1;        /* No point in keeping the plugin loaded */
267
        }
268

    
269
        /* Create a couple of hashtables for all clients */
270
        clients = g_hash_table_new(NULL, NULL);
271
        clients_by_fd = g_hash_table_new(NULL, NULL);
272
        clients_by_path = g_hash_table_new(g_str_hash, g_str_equal);
273
        janus_mutex_init(&clients_mutex);
274

    
275
        /* Start the Unix Sockets service thread */
276
        GError *error = NULL;
277
        pfunix_thread = g_thread_try_new("pfunix thread", &janus_pfunix_thread, NULL, &error);
278
        if(!pfunix_thread) {
279
                g_atomic_int_set(&initialized, 0);
280
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the Unix Sockets thread...\n", error->code, error->message ? error->message : "??");
281
                return -1;
282
        }
283

    
284
        /* Done */
285
        g_atomic_int_set(&initialized, 1);
286
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_PFUNIX_NAME);
287
        return 0;
288
}
289

    
290
void janus_pfunix_destroy(void) {
291
        if(!g_atomic_int_get(&initialized))
292
                return;
293
        g_atomic_int_set(&stopping, 1);
294

    
295
        /* Stop the service thread */
296
        int res = 0;
297
        do {
298
                res = write(write_fd[1], "x", 1);
299
        } while(res == -1 && errno == EINTR);
300

    
301
        if(pfunix_thread != NULL) {
302
                g_thread_join(pfunix_thread);
303
                pfunix_thread = NULL;
304
        }
305

    
306
        g_atomic_int_set(&initialized, 0);
307
        g_atomic_int_set(&stopping, 0);
308
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_PFUNIX_NAME);
309
}
310

    
311
int janus_pfunix_get_api_compatibility(void) {
312
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
313
        return JANUS_TRANSPORT_API_VERSION;
314
}
315

    
316
int janus_pfunix_get_version(void) {
317
        return JANUS_PFUNIX_VERSION;
318
}
319

    
320
const char *janus_pfunix_get_version_string(void) {
321
        return JANUS_PFUNIX_VERSION_STRING;
322
}
323

    
324
const char *janus_pfunix_get_description(void) {
325
        return JANUS_PFUNIX_DESCRIPTION;
326
}
327

    
328
const char *janus_pfunix_get_name(void) {
329
        return JANUS_PFUNIX_NAME;
330
}
331

    
332
const char *janus_pfunix_get_author(void) {
333
        return JANUS_PFUNIX_AUTHOR;
334
}
335

    
336
const char *janus_pfunix_get_package(void) {
337
        return JANUS_PFUNIX_PACKAGE;
338
}
339

    
340
gboolean janus_pfunix_is_janus_api_enabled(void) {
341
        return pfd > -1;
342
}
343

    
344
gboolean janus_pfunix_is_admin_api_enabled(void) {
345
        return admin_pfd > -1;
346
}
347

    
348
int janus_pfunix_send_message(void *transport, void *request_id, gboolean admin, json_t *message) {
349
        if(message == NULL)
350
                return -1;
351
        if(transport == NULL) {
352
                g_free(message);
353
                return -1;
354
        }
355
        /* Make sure this is related to a still valid Unix Sockets session */
356
        janus_pfunix_client *client = (janus_pfunix_client *)transport;
357
        janus_mutex_lock(&clients_mutex);
358
        if(g_hash_table_lookup(clients, client) == NULL) {
359
                janus_mutex_unlock(&clients_mutex);
360
                JANUS_LOG(LOG_WARN, "Outgoing message for invalid client %p\n", client);
361
                g_free(message);
362
                message = NULL;
363
                return -1;
364
        }
365
        janus_mutex_unlock(&clients_mutex);
366
        /* Convert to string */
367
        char *payload = json_dumps(message, json_format);
368
        json_decref(message);
369
        if(client->fd != -1) {
370
                /* SOCK_SEQPACKET, enqueue the packet and have poll tell us when it's time to send it */
371
                g_async_queue_push(client->messages, payload);
372
                /* Notify the thread there's data to send */
373
                int res = 0;
374
                do {
375
                        res = write(write_fd[1], "x", 1);
376
                } while(res == -1 && errno == EINTR);
377
        } else {
378
                /* SOCK_DGRAM, send it right away */
379
                int res = 0;
380
                do {
381
                        res = sendto(client->admin ? admin_pfd : pfd, payload, strlen(payload), 0, (struct sockaddr *)&client->addr, sizeof(struct sockaddr_un));
382
                } while(res == -1 && errno == EINTR);
383
                g_free(payload);
384
        }
385
        return 0;
386
}
387

    
388
void janus_pfunix_session_created(void *transport, guint64 session_id) {
389
        /* We don't care */
390
}
391

    
392
void janus_pfunix_session_over(void *transport, guint64 session_id, gboolean timeout) {
393
        /* We only care if it's a timeout: if so, close the connection */
394
        if(transport == NULL || !timeout)
395
                return;
396
        /* FIXME Should we really close the connection in case of a timeout? */
397
        janus_pfunix_client *client = (janus_pfunix_client *)transport;
398
        janus_mutex_lock(&clients_mutex);
399
        if(g_hash_table_lookup(clients, client) != NULL) {
400
                client->session_timeout = TRUE;
401
                if(client->fd != -1) {
402
                        /* Shutdown the client socket */
403
                        shutdown(client->fd, SHUT_WR);
404
                } else {
405
                        /* Destroy the client */
406
                        g_hash_table_remove(clients_by_path, client->addr.sun_path);
407
                        g_hash_table_remove(clients, client);
408
                        if(client->messages != NULL) {
409
                                char *response = NULL;
410
                                while((response = g_async_queue_try_pop(client->messages)) != NULL) {
411
                                        g_free(response);
412
                                }
413
                                g_async_queue_unref(client->messages);
414
                        }
415
                        g_free(client);
416
                }
417
        }
418
        janus_mutex_unlock(&clients_mutex);
419
}
420

    
421

    
422
/* Thread */
423
void *janus_pfunix_thread(void *data) {
424
        JANUS_LOG(LOG_INFO, "Unix Sockets thread started\n");
425

    
426
        int fds = 0;
427
        struct pollfd poll_fds[1024];        /* FIXME Should we allow for more clients? */
428
        char buffer[BUFFER_SIZE];
429
        struct iovec iov[1];
430
        struct msghdr msg;
431
        memset(&msg, 0, sizeof(msg));
432
        memset(iov, 0, sizeof(iov));
433
        iov[0].iov_base = buffer;
434
        iov[0].iov_len = sizeof(buffer);
435
        msg.msg_iov = iov;
436
        msg.msg_iovlen = 1;
437

    
438
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
439
                /* Prepare poll list of file descriptors */
440
                fds = 0;
441
                /* Writeable monitor */
442
                poll_fds[fds].fd = write_fd[0];
443
                poll_fds[fds].events = POLLIN;
444
                fds++;
445
                if(pfd > -1) {
446
                        /* Janus API */
447
                        poll_fds[fds].fd = pfd;
448
                        poll_fds[fds].events = POLLIN;
449
                        fds++;
450
                }
451
                if(admin_pfd > -1) {
452
                        /* Admin API */
453
                        poll_fds[fds].fd = admin_pfd;
454
                        poll_fds[fds].events = POLLIN;
455
                        fds++;
456
                }
457
                /* Iterate on available clients, to see if we need to POLLIN or POLLOUT too */
458
                janus_mutex_lock(&clients_mutex);
459
                GHashTableIter iter;
460
                gpointer value;
461
                g_hash_table_iter_init(&iter, clients_by_fd);
462
                while(g_hash_table_iter_next(&iter, NULL, &value)) {
463
                        janus_pfunix_client *client = value;
464
                        if(client->fd > -1) {
465
                                poll_fds[fds].fd = client->fd;
466
                                poll_fds[fds].events = g_async_queue_length(client->messages) > 0 ? POLLIN | POLLOUT : POLLIN;
467
                                fds++;
468
                        }
469
                }
470
                janus_mutex_unlock(&clients_mutex);
471

    
472
                /* Start polling */
473
                int res = poll(poll_fds, fds, -1);
474
                if(res == 0)
475
                        continue;
476
                if(res < 0) {
477
                        JANUS_LOG(LOG_ERR, "poll() failed\n");
478
                        break;
479
                }
480
                int i = 0;
481
                for(i=0; i<fds; i++) {
482
                        if(poll_fds[i].revents & (POLLERR | POLLHUP)) {
483
                                /* Socket error? Shall we do something? */
484
                                if(poll_fds[i].fd == write_fd[0]) {
485
                                        /* Error in the wake-up socketpair, that sucks: try recreating it */
486
                                        JANUS_LOG(LOG_WARN, "Error polling wake-up socketpair: %s...\n",
487
                                                poll_fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP");
488
                                        close(write_fd[0]);
489
                                        write_fd[0] = -1;
490
                                        close(write_fd[1]);
491
                                        write_fd[1] = -1;
492
                                        if(socketpair(PF_LOCAL, SOCK_STREAM, 0, write_fd) < 0) {
493
                                                JANUS_LOG(LOG_FATAL, "Error creating socket pair for writeable events: %d, %s\n", errno, strerror(errno));
494
                                                continue;
495
                                        }
496
                                } else if(poll_fds[i].fd == pfd) {
497
                                        /* Error in the Janus API socket */
498
                                        JANUS_LOG(LOG_WARN, "Error polling Unix Sockets Janus API interface (%s), disabling it\n",
499
                                                poll_fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP");
500
                                        close(pfd);
501
                                        pfd = -1;
502
                                        continue;
503
                                } else if(poll_fds[i].fd == admin_pfd) {
504
                                        /* Error in the Admin API socket */
505
                                        JANUS_LOG(LOG_WARN, "Error polling Unix Sockets Admin API interface (%s), disabling it\n",
506
                                                poll_fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP");
507
                                        close(admin_pfd);
508
                                        admin_pfd = -1;
509
                                        continue;
510
                                } else {
511
                                        /* Error in a client socket, find and remove it */
512
                                        janus_mutex_lock(&clients_mutex);
513
                                        janus_pfunix_client *client = g_hash_table_lookup(clients_by_fd, GINT_TO_POINTER(poll_fds[i].fd));
514
                                        if(client == NULL) {
515
                                                /* We're not handling this, ignore */
516
                                                continue;
517
                                        }
518
                                        JANUS_LOG(LOG_INFO, "Unix Sockets client disconnected (%d)\n", poll_fds[i].fd);
519
                                        /* Notify core */
520
                                        gateway->transport_gone(&janus_pfunix_transport, client);
521
                                        /* Close socket */
522
                                        shutdown(SHUT_RDWR, poll_fds[i].fd);
523
                                        close(poll_fds[i].fd);
524
                                        client->fd = -1;
525
                                        /* Destroy the client */
526
                                        g_hash_table_remove(clients_by_fd, GINT_TO_POINTER(poll_fds[i].fd));
527
                                        g_hash_table_remove(clients, client);
528
                                        if(client->messages != NULL) {
529
                                                char *response = NULL;
530
                                                while((response = g_async_queue_try_pop(client->messages)) != NULL) {
531
                                                        g_free(response);
532
                                                }
533
                                                g_async_queue_unref(client->messages);
534
                                        }
535
                                        g_free(client);
536
                                        janus_mutex_unlock(&clients_mutex);
537
                                        continue;
538
                                }
539
                                continue;
540
                        }
541
                        if(poll_fds[i].revents & POLLOUT) {
542
                                /* Find the client from its file descriptor */
543
                                janus_mutex_lock(&clients_mutex);
544
                                janus_pfunix_client *client = g_hash_table_lookup(clients_by_fd, GINT_TO_POINTER(poll_fds[i].fd));
545
                                if(client != NULL) {
546
                                        char *payload = NULL;
547
                                        while((payload = g_async_queue_try_pop(client->messages)) != NULL) {
548
                                                int res = 0;
549
                                                do {
550
                                                        res = write(client->fd, payload, strlen(payload));
551
                                                } while(res == -1 && errno == EINTR);
552
                                                /* FIXME Should we check if sent everything? */
553
                                                JANUS_LOG(LOG_HUGE, "Written %d/%zu bytes on %d\n", res, strlen(payload), client->fd);
554
                                                g_free(payload);
555
                                        }
556
                                }
557
                                janus_mutex_unlock(&clients_mutex);
558
                        }
559
                        if(poll_fds[i].revents & POLLIN) {
560
                                if(poll_fds[i].fd == write_fd[0]) {
561
                                        /* Read and ignore: we use this to unlock the poll if there's data to write */
562
                                        res = read(poll_fds[i].fd, buffer, BUFFER_SIZE);
563
                                } else if(poll_fds[i].fd == pfd || poll_fds[i].fd == admin_pfd) {
564
                                        /* Janus/Admin API: accept the new client (SOCK_SEQPACKET) or receive data (SOCK_DGRAM) */
565
                                        struct sockaddr_un address;
566
                                        socklen_t addrlen = sizeof(address);
567
                                        if((poll_fds[i].fd == pfd && !dgram) || (poll_fds[i].fd == admin_pfd && !admin_dgram)) {
568
                                                /* SOCK_SEQPACKET */
569
                                                int cfd = accept(poll_fds[i].fd, (struct sockaddr *) &address, &addrlen);
570
                                                if(cfd > -1) {
571
                                                        JANUS_LOG(LOG_INFO, "Got new Unix Sockets %s API client: %d\n",
572
                                                                poll_fds[i].fd == pfd ? "Janus" : "Admin", cfd);
573
                                                        /* Allocate new client */
574
                                                        janus_pfunix_client *client = g_malloc0(sizeof(janus_pfunix_client));
575
                                                        client->fd = cfd;
576
                                                        client->admin = (poll_fds[i].fd == admin_pfd);        /* API client type */
577
                                                        client->messages = g_async_queue_new();
578
                                                        client->session_timeout = FALSE;
579
                                                        /* Take note of this new client */
580
                                                        janus_mutex_lock(&clients_mutex);
581
                                                        g_hash_table_insert(clients_by_fd, GINT_TO_POINTER(cfd), client);
582
                                                        g_hash_table_insert(clients, client, client);
583
                                                        janus_mutex_unlock(&clients_mutex);
584
                                                }
585
                                        } else {
586
                                                /* SOCK_DGRAM */
587
                                                struct sockaddr_storage address;
588
                                                res = recvfrom(poll_fds[i].fd, buffer, sizeof(buffer), 0, (struct sockaddr *)&address, &addrlen);
589
                                                if(res < 0) {
590
                                                        if(errno != EAGAIN && errno != EWOULDBLOCK) {
591
                                                                JANUS_LOG(LOG_ERR, "Error reading from client (%s API)...\n",
592
                                                                        poll_fds[i].fd == pfd ? "Janus" : "Admin");
593
                                                        }
594
                                                        continue;
595
                                                }
596
                                                buffer[res] = '\0';
597
                                                /* Is this a new client, or one we knew about already? */
598
                                                struct sockaddr_un *uaddr = (struct sockaddr_un *)&address;
599
                                                if(strlen(uaddr->sun_path) == 0) {
600
                                                        /* No path provided, drop the packet */
601
                                                        JANUS_LOG(LOG_WARN, "Dropping packet from unknown source (no path provided)\n");
602
                                                        continue;
603
                                                }
604
                                                janus_mutex_lock(&clients_mutex);
605
                                                janus_pfunix_client *client = g_hash_table_lookup(clients_by_path, uaddr->sun_path);
606
                                                if(client == NULL) {
607
                                                        JANUS_LOG(LOG_INFO, "Got new Unix Sockets %s API client: %s\n",
608
                                                                poll_fds[i].fd == pfd ? "Janus" : "Admin", uaddr->sun_path);
609
                                                        /* Allocate new client */
610
                                                        client = g_malloc0(sizeof(janus_pfunix_client));
611
                                                        client->fd = -1;
612
                                                        memcpy(&client->addr, uaddr, sizeof(struct sockaddr_un));
613
                                                        client->admin = (poll_fds[i].fd == admin_pfd);        /* API client type */
614
                                                        client->messages = g_async_queue_new();
615
                                                        client->session_timeout = FALSE;
616
                                                        /* Take note of this new client */
617
                                                        g_hash_table_insert(clients_by_path, uaddr->sun_path, client);
618
                                                        g_hash_table_insert(clients, client, client);
619
                                                }
620
                                                janus_mutex_unlock(&clients_mutex);
621
                                                JANUS_LOG(LOG_VERB, "Message from client %s (%d bytes)\n", uaddr->sun_path, res);
622
                                                JANUS_LOG(LOG_HUGE, "%s\n", buffer);
623
                                                /* Parse the JSON payload */
624
                                                json_error_t error;
625
                                                json_t *root = json_loads(buffer, 0, &error);
626
                                                /* Notify the core, passing both the object and, since it may be needed, the error */
627
                                                gateway->incoming_request(&janus_pfunix_transport, client, NULL, client->admin, root, &error);
628
                                        }
629
                                } else {
630
                                        /* Client data: receive message */
631
                                        iov[0].iov_len = sizeof(buffer);
632
                                        res = recvmsg(poll_fds[i].fd, &msg, MSG_WAITALL);
633
                                        if(res < 0) {
634
                                                if(errno != EAGAIN && errno != EWOULDBLOCK) {
635
                                                        JANUS_LOG(LOG_ERR, "Error reading from client %d...\n", poll_fds[i].fd);
636
                                                }
637
                                                continue;
638
                                        }
639
                                        if(msg.msg_flags & MSG_TRUNC) {
640
                                                /* Apparently our buffer is not large enough? */
641
                                                JANUS_LOG(LOG_WARN, "Incoming message from client %d truncated (%d bytes), dropping it...\n", poll_fds[i].fd, res);
642
                                                continue;
643
                                        }
644
                                        /* Find the client from its file descriptor */
645
                                        janus_mutex_lock(&clients_mutex);
646
                                        janus_pfunix_client *client = g_hash_table_lookup(clients_by_fd, GINT_TO_POINTER(poll_fds[i].fd));
647
                                        if(client == NULL) {
648
                                                janus_mutex_unlock(&clients_mutex);
649
                                                JANUS_LOG(LOG_WARN, "Got data from unknown Unix Sockets client %d, closing connection...\n", poll_fds[i].fd);
650
                                                /* Close socket */
651
                                                shutdown(SHUT_RDWR, poll_fds[i].fd);
652
                                                close(poll_fds[i].fd);
653
                                                continue;
654
                                        }
655
                                        if(res == 0) {
656
                                                JANUS_LOG(LOG_INFO, "Unix Sockets client disconnected (%d)\n", poll_fds[i].fd);
657
                                                /* Notify core */
658
                                                gateway->transport_gone(&janus_pfunix_transport, client);
659
                                                /* Close socket */
660
                                                shutdown(SHUT_RDWR, poll_fds[i].fd);
661
                                                close(poll_fds[i].fd);
662
                                                client->fd = -1;
663
                                                /* Destroy the client */
664
                                                g_hash_table_remove(clients_by_fd, GINT_TO_POINTER(poll_fds[i].fd));
665
                                                g_hash_table_remove(clients, client);
666
                                                if(client->messages != NULL) {
667
                                                        char *response = NULL;
668
                                                        while((response = g_async_queue_try_pop(client->messages)) != NULL) {
669
                                                                g_free(response);
670
                                                        }
671
                                                        g_async_queue_unref(client->messages);
672
                                                }
673
                                                g_free(client);
674
                                                janus_mutex_unlock(&clients_mutex);
675
                                                continue;
676
                                        }
677
                                        janus_mutex_unlock(&clients_mutex);
678
                                        /* If we got here, there's data to handle */
679
                                        buffer[res] = '\0';
680
                                        JANUS_LOG(LOG_VERB, "Message from client %d (%d bytes)\n", poll_fds[i].fd, res);
681
                                        JANUS_LOG(LOG_HUGE, "%s\n", buffer);
682
                                        /* Parse the JSON payload */
683
                                        json_error_t error;
684
                                        json_t *root = json_loads(buffer, 0, &error);
685
                                        /* Notify the core, passing both the object and, since it may be needed, the error */
686
                                        gateway->incoming_request(&janus_pfunix_transport, client, NULL, client->admin, root, &error);
687
                                }
688
                        }
689
                }
690
        }
691

    
692
        if(pfd > -1)
693
                close(pfd);
694
        pfd = -1;
695
        if(admin_pfd > -1)
696
                close(admin_pfd);
697
        admin_pfd = -1;
698

    
699
        g_hash_table_destroy(clients_by_path);
700
        g_hash_table_destroy(clients_by_fd);
701
        g_hash_table_destroy(clients);
702

    
703
        /* Done */
704
        JANUS_LOG(LOG_INFO, "Unix Sockets thread ended\n");
705
        return NULL;
706
}