Statistics
| Branch: | Revision:

janus-gateway / events / janus_sampleevh.c @ 84c1291c

History | View | Annotate | Download (18.4 KB)

1
/*! \file   janus_sampleevh.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus SampleEventHandler plugin
5
 * \details  This is a trivial event handler plugin for Janus, which is only
6
 * there to showcase how you can handle an event coming from the Janus core
7
 * or one of the plugins. This specific plugin forwards every event it receives
8
 * to a web server via an HTTP POST request, using libcurl.
9
 * 
10
 * \ingroup eventhandlers
11
 * \ref eventhandlers
12
 */
13

    
14
#include "eventhandler.h"
15

    
16
#include <curl/curl.h>
17

    
18
#include "../debug.h"
19
#include "../config.h"
20
#include "../mutex.h"
21
#include "../utils.h"
22

    
23

    
24
/* Plugin information */
25
#define JANUS_SAMPLEEVH_VERSION                        1
26
#define JANUS_SAMPLEEVH_VERSION_STRING        "0.0.1"
27
#define JANUS_SAMPLEEVH_DESCRIPTION                "This is a trivial sample event handler plugin for Janus, which forwards events via HTTP POST."
28
#define JANUS_SAMPLEEVH_NAME                        "JANUS SampleEventHandler plugin"
29
#define JANUS_SAMPLEEVH_AUTHOR                        "Meetecho s.r.l."
30
#define JANUS_SAMPLEEVH_PACKAGE                        "janus.eventhandler.sampleevh"
31

    
32
/* Plugin methods */
33
janus_eventhandler *create(void);
34
int janus_sampleevh_init(const char *config_path);
35
void janus_sampleevh_destroy(void);
36
int janus_sampleevh_get_api_compatibility(void);
37
int janus_sampleevh_get_version(void);
38
const char *janus_sampleevh_get_version_string(void);
39
const char *janus_sampleevh_get_description(void);
40
const char *janus_sampleevh_get_name(void);
41
const char *janus_sampleevh_get_author(void);
42
const char *janus_sampleevh_get_package(void);
43
void janus_sampleevh_incoming_event(json_t *event);
44

    
45
/* Event handler setup */
46
static janus_eventhandler janus_sampleevh =
47
        JANUS_EVENTHANDLER_INIT (
48
                .init = janus_sampleevh_init,
49
                .destroy = janus_sampleevh_destroy,
50

    
51
                .get_api_compatibility = janus_sampleevh_get_api_compatibility,
52
                .get_version = janus_sampleevh_get_version,
53
                .get_version_string = janus_sampleevh_get_version_string,
54
                .get_description = janus_sampleevh_get_description,
55
                .get_name = janus_sampleevh_get_name,
56
                .get_author = janus_sampleevh_get_author,
57
                .get_package = janus_sampleevh_get_package,
58
                
59
                .incoming_event = janus_sampleevh_incoming_event,
60

    
61
                .events_mask = JANUS_EVENT_TYPE_NONE
62
        );
63

    
64
/* Plugin creator */
65
janus_eventhandler *create(void) {
66
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_SAMPLEEVH_NAME);
67
        return &janus_sampleevh;
68
}
69

    
70

    
71
/* Useful stuff */
72
static volatile gint initialized = 0, stopping = 0;
73
static GThread *handler_thread;
74
static void *janus_sampleevh_handler(void *data);
75

    
76
/* Queue of events to handle */
77
static GAsyncQueue *events = NULL;
78
static gboolean group_events = FALSE;
79
static json_t exit_event;
80
static void janus_sampleevh_event_free(json_t *event) {
81
        if(!event || event == &exit_event)
82
                return;
83
        json_decref(event);
84
}
85

    
86
/* Web backend to send the events to */
87
static char *backend = NULL;
88
static char *backend_user = NULL, *backend_pwd = NULL;
89
static size_t janus_sampleehv_write_data(void *buffer, size_t size, size_t nmemb, void *userp) {
90
        return size*nmemb;
91
}
92

    
93
/* Plugin implementation */
94
int janus_sampleevh_init(const char *config_path) {
95
        if(g_atomic_int_get(&stopping)) {
96
                /* Still stopping from before */
97
                return -1;
98
        }
99
        if(config_path == NULL) {
100
                /* Invalid arguments */
101
                return -1;
102
        }
103

    
104
        /* Read configuration */
105
        gboolean enabled = FALSE;
106
        char filename[255];
107
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_SAMPLEEVH_PACKAGE);
108
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
109
        janus_config *config = janus_config_parse(filename);
110
        if(config != NULL) {
111
                /* Handle configuration */
112
                janus_config_print(config);
113

    
114
                /* Setup the sample event handler, if required */
115
                janus_config_item *item = janus_config_get_item_drilldown(config, "general", "enabled");
116
                if(!item || !item->value || !janus_is_true(item->value)) {
117
                        JANUS_LOG(LOG_WARN, "Sample event handler disabled (Janus API)\n");
118
                } else {
119
                        /* Backend to send events to */
120
                        item = janus_config_get_item_drilldown(config, "general", "backend");
121
                        if(!item || !item->value || strstr(item->value, "http") != item->value) {
122
                                JANUS_LOG(LOG_WARN, "Missing or invalid backend\n");
123
                        } else {
124
                                backend = g_strdup(item->value);
125
                                /* Any credentials needed? */
126
                                item = janus_config_get_item_drilldown(config, "general", "backend_user");
127
                                backend_user = (item && item->value) ? g_strdup(item->value) : NULL;
128
                                item = janus_config_get_item_drilldown(config, "general", "backend_pwd");
129
                                backend_pwd = (item && item->value) ? g_strdup(item->value) : NULL;
130
                                /* Which events should we subscribe to? */
131
                                item = janus_config_get_item_drilldown(config, "general", "events");
132
                                if(item && item->value) {
133
                                        if(!strcasecmp(item->value, "none")) {
134
                                                /* Don't subscribe to anything at all */
135
                                                janus_flags_reset(&janus_sampleevh.events_mask);
136
                                        } else if(!strcasecmp(item->value, "all")) {
137
                                                /* Subscribe to everything */
138
                                                janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_ALL);
139
                                        } else {
140
                                                /* Check what we need to subscribe to */
141
                                                gchar **subscribe = g_strsplit(item->value, ",", -1);
142
                                                if(subscribe != NULL) {
143
                                                        gchar *index = subscribe[0];
144
                                                        if(index != NULL) {
145
                                                                int i=0;
146
                                                                while(index != NULL) {
147
                                                                        while(isspace(*index))
148
                                                                                index++;
149
                                                                        if(strlen(index)) {
150
                                                                                if(!strcasecmp(index, "sessions")) {
151
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_SESSION);
152
                                                                                } else if(!strcasecmp(index, "handles")) {
153
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_HANDLE);
154
                                                                                } else if(!strcasecmp(index, "jsep")) {
155
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_JSEP);
156
                                                                                } else if(!strcasecmp(index, "webrtc")) {
157
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_WEBRTC);
158
                                                                                } else if(!strcasecmp(index, "media")) {
159
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_MEDIA);
160
                                                                                } else if(!strcasecmp(index, "plugins")) {
161
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_PLUGIN);
162
                                                                                } else if(!strcasecmp(index, "transports")) {
163
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_TRANSPORT);
164
                                                                                } else if(!strcasecmp(index, "core")) {
165
                                                                                        janus_flags_set(&janus_sampleevh.events_mask, JANUS_EVENT_TYPE_CORE);
166
                                                                                } else {
167
                                                                                        JANUS_LOG(LOG_WARN, "Unknown event type '%s'\n", index);
168
                                                                                }
169
                                                                        }
170
                                                                        i++;
171
                                                                        index = subscribe[i];
172
                                                                }
173
                                                        }
174
                                                        g_strfreev(subscribe);
175
                                                }
176
                                        }
177
                                }
178
                                /* Is grouping of events ok? */
179
                                item = janus_config_get_item_drilldown(config, "general", "grouping");
180
                                group_events = item && item->value && janus_is_true(item->value);
181
                                /* Done */
182
                                enabled = TRUE;
183
                        }
184
                }
185
        }
186

    
187
        janus_config_destroy(config);
188
        config = NULL;
189
        if(!enabled) {
190
                JANUS_LOG(LOG_FATAL, "Sample event handler not enabled/needed, giving up...\n");
191
                return -1;        /* No point in keeping the plugin loaded */
192
        }
193
        JANUS_LOG(LOG_VERB, "Sample event handler configured: %s\n", backend);
194

    
195
        /* Initialize libcurl, needed for forwarding events via HTTP POST */
196
        curl_global_init(CURL_GLOBAL_ALL);
197

    
198
        /* Initialize the events queue */
199
        events = g_async_queue_new_full((GDestroyNotify) janus_sampleevh_event_free);
200
        
201
        g_atomic_int_set(&initialized, 1);
202

    
203
        /* Launch the thread that will handle incoming events */
204
        GError *error = NULL;
205
        handler_thread = g_thread_try_new("janus sampleevh handler", janus_sampleevh_handler, NULL, &error);
206
        if(error != NULL) {
207
                g_atomic_int_set(&initialized, 0);
208
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the SampleEventHandler handler thread...\n", error->code, error->message ? error->message : "??");
209
                return -1;
210
        }
211
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_SAMPLEEVH_NAME);
212
        return 0;
213
}
214

    
215
void janus_sampleevh_destroy(void) {
216
        if(!g_atomic_int_get(&initialized))
217
                return;
218
        g_atomic_int_set(&stopping, 1);
219

    
220
        g_async_queue_push(events, &exit_event);
221
        if(handler_thread != NULL) {
222
                g_thread_join(handler_thread);
223
                handler_thread = NULL;
224
        }
225

    
226
        g_async_queue_unref(events);
227
        events = NULL;
228

    
229
        g_free(backend);
230

    
231
        g_atomic_int_set(&initialized, 0);
232
        g_atomic_int_set(&stopping, 0);
233
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_SAMPLEEVH_NAME);
234
}
235

    
236
int janus_sampleevh_get_api_compatibility(void) {
237
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
238
        return JANUS_EVENTHANDLER_API_VERSION;
239
}
240

    
241
int janus_sampleevh_get_version(void) {
242
        return JANUS_SAMPLEEVH_VERSION;
243
}
244

    
245
const char *janus_sampleevh_get_version_string(void) {
246
        return JANUS_SAMPLEEVH_VERSION_STRING;
247
}
248

    
249
const char *janus_sampleevh_get_description(void) {
250
        return JANUS_SAMPLEEVH_DESCRIPTION;
251
}
252

    
253
const char *janus_sampleevh_get_name(void) {
254
        return JANUS_SAMPLEEVH_NAME;
255
}
256

    
257
const char *janus_sampleevh_get_author(void) {
258
        return JANUS_SAMPLEEVH_AUTHOR;
259
}
260

    
261
const char *janus_sampleevh_get_package(void) {
262
        return JANUS_SAMPLEEVH_PACKAGE;
263
}
264

    
265
void janus_sampleevh_incoming_event(json_t *event) {
266
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
267
                /* Janus is closing or the plugin is: unref the event as we won't handle it */
268
                json_decref(event);
269
                return;
270
        }
271

    
272
        /* Do NOT handle the event here in this callback! Since Janus notifies you right
273
         * away when something happens, these events are triggered from working threads and
274
         * not some sort of message bus. As such, performing I/O or network operations in
275
         * here could dangerously slow Janus down. Let's just reference and enqueue the event,
276
         * and handle it in our own thread: the event contains a monotonic time indicator of
277
         * when the event actually happened on this machine, so that, if relevant, we can compute
278
         * any delay in the actual event processing ourselves. */
279
        json_incref(event);
280
        g_async_queue_push(events, event);
281

    
282
}
283

    
284

    
285
/* Thread to handle incoming events */
286
static void *janus_sampleevh_handler(void *data) {
287
        JANUS_LOG(LOG_VERB, "Joining SampleEventHandler handler thread\n");
288
        json_t *event = NULL, *output = NULL;
289
        int count = 0, max = group_events ? 100 : 1;
290
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
291
                event = g_async_queue_pop(events);
292
                if(event == NULL)
293
                        continue;
294
                if(event == &exit_event)
295
                        break;
296
                count = 0;
297
                output = NULL;
298

    
299
                while(TRUE) {
300
                        /* Handle event: just for fun, let's see how long it took for us to take care of this */
301
                        json_t *created = json_object_get(event, "timestamp");
302
                        if(created && json_is_integer(created)) {
303
                                gint64 then = json_integer_value(created);
304
                                gint64 now = janus_get_monotonic_time();
305
                                JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
306
                        }
307

    
308
                        /* Let's check what kind of event this is: we don't really do anything
309
                         * with it in this plugin, it's just to show how you can handle
310
                         * different types of events in an event handler. */
311
                        int type = json_integer_value(json_object_get(event, "type"));
312
                        switch(type) {
313
                                case JANUS_EVENT_TYPE_SESSION:
314
                                        /* This is a session related event. The only info that is
315
                                         * provided is a name for the event itself. Here's an example
316
                                         * of a new session being created:
317
                                                {
318
                                                   "type": 1,
319
                                                   "timestamp": 3583879627,
320
                                                   "session_id": 2004798115,
321
                                                   "event": {
322
                                                          "name": "created"
323
                                                   }
324
                                                }
325
                                        */
326
                                        break;
327
                                case JANUS_EVENT_TYPE_HANDLE:
328
                                        /* This is a handle related event. The only info that is provided
329
                                         * are the name for the event itself and the package name of the
330
                                         * plugin this handle refers to (e.g., "janus.plugin.echotest").
331
                                         * Here's an example of a new handled being attached in a session
332
                                         * to the EchoTest plugin:
333
                                                {
334
                                                   "type": 2,
335
                                                   "timestamp": 3570304977,
336
                                                   "session_id": 2004798115,
337
                                                   "handle_id": 3708519405,
338
                                                   "event": {
339
                                                          "name": "attached",
340
                                                          "plugin: "janus.plugin.echotest"
341
                                                   }
342
                                                }
343
                                        */
344
                                        break;
345
                                case JANUS_EVENT_TYPE_JSEP:
346
                                        /* This is a JSEP/SDP related event. It provides information
347
                                         * about an ongoing WebRTC negotiation, and so tells you
348
                                         * about the SDP being sent/received, and who's sending it
349
                                         * ("local" means Janus, "remote" means the user). Here's an
350
                                         * example, where the user originated an offer towards Janus:
351
                                                {
352
                                                   "type": 8,
353
                                                   "timestamp": 3570400208,
354
                                                   "session_id": 2004798115,
355
                                                   "handle_id": 3708519405,
356
                                                   "event": {
357
                                                          "owner": "remote",
358
                                                          "jsep": {
359
                                                                 "type": "offer",
360
                                                                 "sdp": "v=0[..]\r\n"
361
                                                          }
362
                                                   }
363
                                                }
364
                                        */
365
                                        break;
366
                                case JANUS_EVENT_TYPE_WEBRTC:
367
                                        /* This is a WebRTC related event, and so the content of
368
                                         * the event may vary quite a bit. In fact, you may be notified
369
                                         * about ICE or DTLS states, or when a WebRTC PeerConnection
370
                                         * goes up or down. Here are some examples, in no particular order:
371
                                                {
372
                                                   "type": 16,
373
                                                   "timestamp": 3570416659,
374
                                                   "session_id": 2004798115,
375
                                                   "handle_id": 3708519405,
376
                                                   "event": {
377
                                                          "ice": "connecting",
378
                                                          "stream_id": 1,
379
                                                          "component_id": 1
380
                                                   }
381
                                                }
382
                                         *
383
                                                {
384
                                                   "type": 16,
385
                                                   "timestamp": 3570637554,
386
                                                   "session_id": 2004798115,
387
                                                   "handle_id": 3708519405,
388
                                                   "event": {
389
                                                          "selected-pair": "[..]",
390
                                                          "stream_id": 1,
391
                                                          "component_id": 1
392
                                                   }
393
                                                }
394
                                         *
395
                                                {
396
                                                   "type": 16,
397
                                                   "timestamp": 3570656112,
398
                                                   "session_id": 2004798115,
399
                                                   "handle_id": 3708519405,
400
                                                   "event": {
401
                                                          "dtls": "connected",
402
                                                          "stream_id": 1,
403
                                                          "component_id": 1
404
                                                   }
405
                                                }
406
                                         *
407
                                                {
408
                                                   "type": 16,
409
                                                   "timestamp": 3570657237,
410
                                                   "session_id": 2004798115,
411
                                                   "handle_id": 3708519405,
412
                                                   "event": {
413
                                                          "connection": "webrtcup"
414
                                                   }
415
                                                }
416
                                        */
417
                                        break;
418
                                case JANUS_EVENT_TYPE_MEDIA:
419
                                        /* This is a media related event. This can contain different
420
                                         * information about the health of a media session, or about
421
                                         * what's going on in general (e.g., when Janus started/stopped
422
                                         * receiving media of a certain type, or (TODO) when some media related
423
                                         * statistics are available). Here's an example of Janus getting
424
                                         * video from the peer for the first time, or after a second
425
                                         * of no video at all (which would have triggered a "receiving": false):
426
                                                {
427
                                                   "type": 32,
428
                                                   "timestamp": 3571078797,
429
                                                   "session_id": 2004798115,
430
                                                   "handle_id": 3708519405,
431
                                                   "event": {
432
                                                          "media": "video",
433
                                                          "receiving": "true"
434
                                                   }
435
                                                }
436
                                        */
437
                                        break;
438
                                case JANUS_EVENT_TYPE_PLUGIN:
439
                                        /* This is a plugin related event. Since each plugin may
440
                                         * provide info in a very custom way, the format of this event
441
                                         * is in general very dynamic. You'll always find, though,
442
                                         * an "event" object containing the package name of the
443
                                         * plugin (e.g., "janus.plugin.echotest") and a "data"
444
                                         * object that contains whatever the plugin decided to
445
                                         * notify you about, that will always vary from plugin to
446
                                         * plugin. Here's an example:
447
                                                {
448
                                                   "type": 64,
449
                                                   "timestamp": 3570336031,
450
                                                   "session_id": 2004798115,
451
                                                   "handle_id": 3708519405,
452
                                                   "event": {
453
                                                          "plugin": "janus.plugin.echotest",
454
                                                          "data": {
455
                                                                 "audio_active": "true",
456
                                                                 "video_active": "true",
457
                                                                 "bitrate": 0
458
                                                          }
459
                                                   }
460
                                                }
461
                                        */
462
                                        break;
463
                                case JANUS_EVENT_TYPE_TRANSPORT:
464
                                        /* This is a transport related event (TODO). The syntax of
465
                                         * the common format (transport specific data aside) is
466
                                         * exactly the same as that of the plugin related events
467
                                         * above, with a "transport" property instead of "plugin"
468
                                         * to contain the transport package name. */
469
                                        break;
470
                                case JANUS_EVENT_TYPE_CORE:
471
                                        /* This is a core related event. This can contain different
472
                                         * information about the health of the Janus instance, or
473
                                         * more generically on some events in the Janus life cycle
474
                                         * (e.g., when it's just been started or when a shutdown
475
                                         * has been requested). Considering the heterogeneous nature
476
                                         * of the information being reported, the content is always
477
                                         * a JSON object (event). Core events are the only ones
478
                                         * missing a session_id. Here's an example:
479
                                                {
480
                                                   "type": 256,
481
                                                   "timestamp": 28381185382,
482
                                                   "event": {
483
                                                          "status": "started"
484
                                                   }
485
                                                }
486
                                        */
487
                                        break;
488
                                default:
489
                                        JANUS_LOG(LOG_WARN, "Unknown type of event '%d'\n", type);
490
                                        break;
491
                        }
492
                        if(!group_events) {
493
                                /* We're done here, we just need a single event */
494
                                output = event;
495
                                break;
496
                        }
497
                        /* If we got here, we're grouping */
498
                        if(output == NULL)
499
                                output = json_array();
500
                        json_array_append_new(output, event);
501
                        /* Never group more than a maximum number of events, though, or we might stay here forever */
502
                        count++;
503
                        if(count == max)
504
                                break;
505
                        event = g_async_queue_try_pop(events);
506
                        if(event == NULL || event == &exit_event)
507
                                break;
508
                }
509

    
510
                /* Since this a simple plugin, it does the same for all events: so just convert to string... */
511
                char *event_text = json_dumps(output, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
512
                /* ... and send via HTTP POST */
513
                CURLcode res;
514
                CURL *curl = curl_easy_init();
515
                if(curl == NULL) {
516
                        JANUS_LOG(LOG_ERR, "Error initializing CURL context\n");
517
                        goto done;
518
                }
519
                curl_easy_setopt(curl, CURLOPT_URL, backend);
520
                struct curl_slist *headers = NULL;
521
                headers = curl_slist_append(headers, "Accept: application/json");
522
                headers = curl_slist_append(headers, "Content-Type: application/json");
523
                headers = curl_slist_append(headers, "charsets: utf-8");
524
                curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
525
                curl_easy_setopt(curl, CURLOPT_POSTFIELDS, event_text);
526
                curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, janus_sampleehv_write_data);
527
                /* Any credentials? */
528
                if(backend_user != NULL && backend_pwd != NULL) {
529
                        curl_easy_setopt(curl, CURLOPT_USERNAME, backend_user);
530
                        curl_easy_setopt(curl, CURLOPT_PASSWORD, backend_pwd);
531
                }
532
                /* Don't wait forever (let's say, 10 seconds) */
533
                curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L);
534
                /* Send the request */
535
                res = curl_easy_perform(curl);
536
                if(res != CURLE_OK) {
537
                        JANUS_LOG(LOG_ERR, "Couldn't relay event to the backend: %s\n", curl_easy_strerror(res));
538
                } else {
539
                        JANUS_LOG(LOG_DBG, "Event sent!\n");
540
                }
541
done:
542
                /* Cleanup */
543
                if(curl)
544
                        curl_easy_cleanup(curl);
545
                g_free(event_text);
546

    
547
                /* Done, let's unref the event */
548
                json_decref(output);
549
        }
550
        JANUS_LOG(LOG_VERB, "Leaving SampleEventHandler handler thread\n");
551
        return NULL;
552
}