Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_echotest.c @ 71a04f89

History | View | Annotate | Download (37.7 KB)

1
/*! \file   janus_echotest.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus EchoTest plugin
5
 * \details  This is a trivial EchoTest plugin for Janus, just used to
6
 * showcase the plugin interface. A peer attaching to this plugin will
7
 * receive back the same RTP packets and RTCP messages he sends: the
8
 * RTCP messages, of course, would be modified on the way by the gateway
9
 * to make sure they are coherent with the involved SSRCs. In order to
10
 * demonstrate how peer-provided messages can change the behaviour of a
11
 * plugin, this plugin implements a simple API based on three messages:
12
 * 
13
 * 1. a message to enable/disable audio (that is, to tell the plugin
14
 * whether incoming audio RTP packets need to be sent back or discarded);
15
 * 2. a message to enable/disable video (that is, to tell the plugin
16
 * whether incoming video RTP packets need to be sent back or discarded);
17
 * 3. a message to cap the bitrate (which would modify incoming RTCP
18
 * REMB messages before sending them back, in order to trick the peer into
19
 * thinking the available bandwidth is different).
20
 * 
21
 * \section echoapi Echo Test API
22
 * 
23
 * There's a single unnamed request you can send and it's asynchronous,
24
 * which means all responses (successes and errors) will be delivered
25
 * as events with the same transaction. 
26
 * 
27
 * The request has to be formatted as follows. All the attributes are
28
 * optional, so any request can contain a subset of them:
29
 *
30
\verbatim
31
{
32
        "audio" : true|false,
33
        "video" : true|false,
34
        "bitrate" : <numeric bitrate value>,
35
        "record" : true|false,
36
        "filename" : <base path/filename to use for the recording>
37
}
38
\endverbatim
39
 *
40
 * \c audio instructs the plugin to do or do not bounce back audio
41
 * frames; \c video does the same for video; \c bitrate caps the
42
 * bandwidth to force on the browser encoding side (e.g., 128000 for
43
 * 128kbps).
44
 * 
45
 * The first request must be sent together with a JSEP offer to
46
 * negotiate a PeerConnection: a JSEP answer will be provided with
47
 * the asynchronous response notification. Subsequent requests (e.g., to
48
 * dynamically manipulate the bitrate while testing) have to be sent
49
 * without any JSEP payload attached.
50
 * 
51
 * A successful request will result in an \c ok event:
52
 * 
53
\verbatim
54
{
55
        "echotest" : "event",
56
        "result": "ok"
57
}
58
\endverbatim
59
 * 
60
 * An error instead will provide both an error code and a more verbose
61
 * description of the cause of the issue:
62
 * 
63
\verbatim
64
{
65
        "echotest" : "event",
66
        "error_code" : <numeric ID, check Macros below>,
67
        "error" : "<error description as a string>"
68
}
69
\endverbatim
70
 *
71
 * If the plugin detects a loss of the associated PeerConnection, a
72
 * "done" notification is triggered to inform the application the Echo
73
 * Test session is over:
74
 * 
75
\verbatim
76
{
77
        "echotest" : "event",
78
        "result": "done"
79
}
80
\endverbatim
81
 *
82
 * \ingroup plugins
83
 * \ref plugins
84
 */
85

    
86
#include "plugin.h"
87

    
88
#include <jansson.h>
89

    
90
#include "../debug.h"
91
#include "../apierror.h"
92
#include "../config.h"
93
#include "../mutex.h"
94
#include "../record.h"
95
#include "../rtcp.h"
96
#include "../utils.h"
97

    
98

    
99
/* Plugin information */
100
#define JANUS_ECHOTEST_VERSION                        6
101
#define JANUS_ECHOTEST_VERSION_STRING        "0.0.6"
102
#define JANUS_ECHOTEST_DESCRIPTION                "This is a trivial EchoTest plugin for Janus, just used to showcase the plugin interface."
103
#define JANUS_ECHOTEST_NAME                                "JANUS EchoTest plugin"
104
#define JANUS_ECHOTEST_AUTHOR                        "Meetecho s.r.l."
105
#define JANUS_ECHOTEST_PACKAGE                        "janus.plugin.echotest"
106

    
107
/* Plugin methods */
108
janus_plugin *create(void);
109
int janus_echotest_init(janus_callbacks *callback, const char *config_path);
110
void janus_echotest_destroy(void);
111
int janus_echotest_get_api_compatibility(void);
112
int janus_echotest_get_version(void);
113
const char *janus_echotest_get_version_string(void);
114
const char *janus_echotest_get_description(void);
115
const char *janus_echotest_get_name(void);
116
const char *janus_echotest_get_author(void);
117
const char *janus_echotest_get_package(void);
118
void janus_echotest_create_session(janus_plugin_session *handle, int *error);
119
struct janus_plugin_result *janus_echotest_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
120
void janus_echotest_setup_media(janus_plugin_session *handle);
121
void janus_echotest_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
122
void janus_echotest_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
123
void janus_echotest_incoming_data(janus_plugin_session *handle, char *buf, int len);
124
void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video);
125
void janus_echotest_hangup_media(janus_plugin_session *handle);
126
void janus_echotest_destroy_session(janus_plugin_session *handle, int *error);
127
char *janus_echotest_query_session(janus_plugin_session *handle);
128

    
129
/* Plugin setup */
130
static janus_plugin janus_echotest_plugin =
131
        JANUS_PLUGIN_INIT (
132
                .init = janus_echotest_init,
133
                .destroy = janus_echotest_destroy,
134

    
135
                .get_api_compatibility = janus_echotest_get_api_compatibility,
136
                .get_version = janus_echotest_get_version,
137
                .get_version_string = janus_echotest_get_version_string,
138
                .get_description = janus_echotest_get_description,
139
                .get_name = janus_echotest_get_name,
140
                .get_author = janus_echotest_get_author,
141
                .get_package = janus_echotest_get_package,
142
                
143
                .create_session = janus_echotest_create_session,
144
                .handle_message = janus_echotest_handle_message,
145
                .setup_media = janus_echotest_setup_media,
146
                .incoming_rtp = janus_echotest_incoming_rtp,
147
                .incoming_rtcp = janus_echotest_incoming_rtcp,
148
                .incoming_data = janus_echotest_incoming_data,
149
                .slow_link = janus_echotest_slow_link,
150
                .hangup_media = janus_echotest_hangup_media,
151
                .destroy_session = janus_echotest_destroy_session,
152
                .query_session = janus_echotest_query_session,
153
        );
154

    
155
/* Plugin creator */
156
janus_plugin *create(void) {
157
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_ECHOTEST_NAME);
158
        return &janus_echotest_plugin;
159
}
160

    
161

    
162
/* Useful stuff */
163
static volatile gint initialized = 0, stopping = 0;
164
static gboolean notify_events = TRUE;
165
static janus_callbacks *gateway = NULL;
166
static GThread *handler_thread;
167
static GThread *watchdog;
168
static void *janus_echotest_handler(void *data);
169

    
170
typedef struct janus_echotest_message {
171
        janus_plugin_session *handle;
172
        char *transaction;
173
        char *message;
174
        char *sdp_type;
175
        char *sdp;
176
} janus_echotest_message;
177
static GAsyncQueue *messages = NULL;
178
static janus_echotest_message exit_message;
179

    
180
typedef struct janus_echotest_session {
181
        janus_plugin_session *handle;
182
        gboolean has_audio;
183
        gboolean has_video;
184
        gboolean has_data;
185
        gboolean audio_active;
186
        gboolean video_active;
187
        uint64_t bitrate;
188
        janus_recorder *arc;        /* The Janus recorder instance for this user's audio, if enabled */
189
        janus_recorder *vrc;        /* The Janus recorder instance for this user's video, if enabled */
190
        janus_recorder *drc;        /* The Janus recorder instance for this user's data, if enabled */
191
        janus_mutex rec_mutex;        /* Mutex to protect the recorders from race conditions */
192
        guint16 slowlink_count;
193
        volatile gint hangingup;
194
        gint64 destroyed;        /* Time at which this session was marked as destroyed */
195
} janus_echotest_session;
196
static GHashTable *sessions;
197
static GList *old_sessions;
198
static janus_mutex sessions_mutex;
199

    
200
static void janus_echotest_message_free(janus_echotest_message *msg) {
201
        if(!msg || msg == &exit_message)
202
                return;
203

    
204
        msg->handle = NULL;
205

    
206
        g_free(msg->transaction);
207
        msg->transaction = NULL;
208
        g_free(msg->message);
209
        msg->message = NULL;
210
        g_free(msg->sdp_type);
211
        msg->sdp_type = NULL;
212
        g_free(msg->sdp);
213
        msg->sdp = NULL;
214

    
215
        g_free(msg);
216
}
217

    
218

    
219
/* Error codes */
220
#define JANUS_ECHOTEST_ERROR_NO_MESSAGE                        411
221
#define JANUS_ECHOTEST_ERROR_INVALID_JSON                412
222
#define JANUS_ECHOTEST_ERROR_INVALID_ELEMENT        413
223

    
224

    
225
/* EchoTest watchdog/garbage collector (sort of) */
226
void *janus_echotest_watchdog(void *data);
227
void *janus_echotest_watchdog(void *data) {
228
        JANUS_LOG(LOG_INFO, "EchoTest watchdog started\n");
229
        gint64 now = 0;
230
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
231
                janus_mutex_lock(&sessions_mutex);
232
                /* Iterate on all the sessions */
233
                now = janus_get_monotonic_time();
234
                if(old_sessions != NULL) {
235
                        GList *sl = old_sessions;
236
                        JANUS_LOG(LOG_HUGE, "Checking %d old EchoTest sessions...\n", g_list_length(old_sessions));
237
                        while(sl) {
238
                                janus_echotest_session *session = (janus_echotest_session *)sl->data;
239
                                if(!session) {
240
                                        sl = sl->next;
241
                                        continue;
242
                                }
243
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
244
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
245
                                        JANUS_LOG(LOG_VERB, "Freeing old EchoTest session\n");
246
                                        GList *rm = sl->next;
247
                                        old_sessions = g_list_delete_link(old_sessions, sl);
248
                                        sl = rm;
249
                                        session->handle = NULL;
250
                                        g_free(session);
251
                                        session = NULL;
252
                                        continue;
253
                                }
254
                                sl = sl->next;
255
                        }
256
                }
257
                janus_mutex_unlock(&sessions_mutex);
258
                g_usleep(500000);
259
        }
260
        JANUS_LOG(LOG_INFO, "EchoTest watchdog stopped\n");
261
        return NULL;
262
}
263

    
264

    
265
/* Plugin implementation */
266
int janus_echotest_init(janus_callbacks *callback, const char *config_path) {
267
        if(g_atomic_int_get(&stopping)) {
268
                /* Still stopping from before */
269
                return -1;
270
        }
271
        if(callback == NULL || config_path == NULL) {
272
                /* Invalid arguments */
273
                return -1;
274
        }
275

    
276
        /* Read configuration */
277
        char filename[255];
278
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_ECHOTEST_PACKAGE);
279
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
280
        janus_config *config = janus_config_parse(filename);
281
        if(config != NULL) {
282
                janus_config_print(config);
283
                janus_config_item *events = janus_config_get_item_drilldown(config, "general", "events");
284
                if(events != NULL && events->value != NULL)
285
                        notify_events = janus_is_true(events->value);
286
                if(!notify_events && callback->events_is_enabled()) {
287
                        JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_ECHOTEST_NAME);
288
                }
289
        }
290
        janus_config_destroy(config);
291
        config = NULL;
292
        
293
        sessions = g_hash_table_new(NULL, NULL);
294
        janus_mutex_init(&sessions_mutex);
295
        messages = g_async_queue_new_full((GDestroyNotify) janus_echotest_message_free);
296
        /* This is the callback we'll need to invoke to contact the gateway */
297
        gateway = callback;
298
        g_atomic_int_set(&initialized, 1);
299

    
300
        GError *error = NULL;
301
        /* Start the sessions watchdog */
302
        watchdog = g_thread_try_new("etest watchdog", &janus_echotest_watchdog, NULL, &error);
303
        if(error != NULL) {
304
                g_atomic_int_set(&initialized, 0);
305
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the EchoTest watchdog thread...\n", error->code, error->message ? error->message : "??");
306
                return -1;
307
        }
308
        /* Launch the thread that will handle incoming messages */
309
        handler_thread = g_thread_try_new("janus echotest handler", janus_echotest_handler, NULL, &error);
310
        if(error != NULL) {
311
                g_atomic_int_set(&initialized, 0);
312
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the EchoTest handler thread...\n", error->code, error->message ? error->message : "??");
313
                return -1;
314
        }
315
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_ECHOTEST_NAME);
316
        return 0;
317
}
318

    
319
void janus_echotest_destroy(void) {
320
        if(!g_atomic_int_get(&initialized))
321
                return;
322
        g_atomic_int_set(&stopping, 1);
323

    
324
        g_async_queue_push(messages, &exit_message);
325
        if(handler_thread != NULL) {
326
                g_thread_join(handler_thread);
327
                handler_thread = NULL;
328
        }
329
        if(watchdog != NULL) {
330
                g_thread_join(watchdog);
331
                watchdog = NULL;
332
        }
333

    
334
        /* FIXME We should destroy the sessions cleanly */
335
        janus_mutex_lock(&sessions_mutex);
336
        g_hash_table_destroy(sessions);
337
        janus_mutex_unlock(&sessions_mutex);
338
        g_async_queue_unref(messages);
339
        messages = NULL;
340
        sessions = NULL;
341

    
342
        g_atomic_int_set(&initialized, 0);
343
        g_atomic_int_set(&stopping, 0);
344
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_ECHOTEST_NAME);
345
}
346

    
347
int janus_echotest_get_api_compatibility(void) {
348
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
349
        return JANUS_PLUGIN_API_VERSION;
350
}
351

    
352
int janus_echotest_get_version(void) {
353
        return JANUS_ECHOTEST_VERSION;
354
}
355

    
356
const char *janus_echotest_get_version_string(void) {
357
        return JANUS_ECHOTEST_VERSION_STRING;
358
}
359

    
360
const char *janus_echotest_get_description(void) {
361
        return JANUS_ECHOTEST_DESCRIPTION;
362
}
363

    
364
const char *janus_echotest_get_name(void) {
365
        return JANUS_ECHOTEST_NAME;
366
}
367

    
368
const char *janus_echotest_get_author(void) {
369
        return JANUS_ECHOTEST_AUTHOR;
370
}
371

    
372
const char *janus_echotest_get_package(void) {
373
        return JANUS_ECHOTEST_PACKAGE;
374
}
375

    
376
void janus_echotest_create_session(janus_plugin_session *handle, int *error) {
377
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
378
                *error = -1;
379
                return;
380
        }        
381
        janus_echotest_session *session = (janus_echotest_session *)g_malloc0(sizeof(janus_echotest_session));
382
        if(session == NULL) {
383
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
384
                *error = -2;
385
                return;
386
        }
387
        session->handle = handle;
388
        session->has_audio = FALSE;
389
        session->has_video = FALSE;
390
        session->has_data = FALSE;
391
        session->audio_active = TRUE;
392
        session->video_active = TRUE;
393
        janus_mutex_init(&session->rec_mutex);
394
        session->bitrate = 0;        /* No limit */
395
        session->destroyed = 0;
396
        g_atomic_int_set(&session->hangingup, 0);
397
        handle->plugin_handle = session;
398
        janus_mutex_lock(&sessions_mutex);
399
        g_hash_table_insert(sessions, handle, session);
400
        janus_mutex_unlock(&sessions_mutex);
401

    
402
        return;
403
}
404

    
405
void janus_echotest_destroy_session(janus_plugin_session *handle, int *error) {
406
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
407
                *error = -1;
408
                return;
409
        }        
410
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;
411
        if(!session) {
412
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
413
                *error = -2;
414
                return;
415
        }
416
        JANUS_LOG(LOG_VERB, "Removing Echo Test session...\n");
417
        janus_mutex_lock(&sessions_mutex);
418
        if(!session->destroyed) {
419
                session->destroyed = janus_get_monotonic_time();
420
                g_hash_table_remove(sessions, handle);
421
                /* Cleaning up and removing the session is done in a lazy way */
422
                old_sessions = g_list_append(old_sessions, session);
423
        }
424
        janus_mutex_unlock(&sessions_mutex);
425
        return;
426
}
427

    
428
char *janus_echotest_query_session(janus_plugin_session *handle) {
429
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
430
                return NULL;
431
        }        
432
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;
433
        if(!session) {
434
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
435
                return NULL;
436
        }
437
        /* In the echo test, every session is the same: we just provide some configure info */
438
        json_t *info = json_object();
439
        json_object_set_new(info, "audio_active", json_string(session->audio_active ? "true" : "false"));
440
        json_object_set_new(info, "video_active", json_string(session->video_active ? "true" : "false"));
441
        json_object_set_new(info, "bitrate", json_integer(session->bitrate));
442
        if(session->arc || session->vrc || session->drc) {
443
                json_t *recording = json_object();
444
                if(session->arc && session->arc->filename)
445
                        json_object_set_new(recording, "audio", json_string(session->arc->filename));
446
                if(session->vrc && session->vrc->filename)
447
                        json_object_set_new(recording, "video", json_string(session->vrc->filename));
448
                if(session->drc && session->drc->filename)
449
                        json_object_set_new(recording, "data", json_string(session->drc->filename));
450
                json_object_set_new(info, "recording", recording);
451
        }
452
        json_object_set_new(info, "slowlink_count", json_integer(session->slowlink_count));
453
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
454
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
455
        json_decref(info);
456
        return info_text;
457
}
458

    
459
struct janus_plugin_result *janus_echotest_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
460
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
461
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
462
        janus_echotest_message *msg = g_malloc0(sizeof(janus_echotest_message));
463
        if(msg == NULL) {
464
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
465
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
466
        }
467
        msg->handle = handle;
468
        msg->transaction = transaction;
469
        msg->message = message;
470
        msg->sdp_type = sdp_type;
471
        msg->sdp = sdp;
472
        g_async_queue_push(messages, msg);
473

    
474
        /* All the requests to this plugin are handled asynchronously */
475
        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, "I'm taking my time!");
476
}
477

    
478
void janus_echotest_setup_media(janus_plugin_session *handle) {
479
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
480
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
481
                return;
482
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
483
        if(!session) {
484
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
485
                return;
486
        }
487
        if(session->destroyed)
488
                return;
489
        g_atomic_int_set(&session->hangingup, 0);
490
        /* We really don't care, as we only send RTP/RTCP we get in the first place back anyway */
491
}
492

    
493
void janus_echotest_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
494
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
495
                return;
496
        /* Simple echo test */
497
        if(gateway) {
498
                /* Honour the audio/video active flags */
499
                janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
500
                if(!session) {
501
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
502
                        return;
503
                }
504
                if(session->destroyed)
505
                        return;
506
                if((!video && session->audio_active) || (video && session->video_active)) {
507
                        /* Save the frame if we're recording */
508
                        janus_recorder_save_frame(video ? session->vrc : session->arc, buf, len);
509
                        /* Send the frame back */
510
                        gateway->relay_rtp(handle, video, buf, len);
511
                }
512
        }
513
}
514

    
515
void janus_echotest_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
516
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
517
                return;
518
        /* Simple echo test */
519
        if(gateway) {
520
                janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
521
                if(!session) {
522
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
523
                        return;
524
                }
525
                if(session->destroyed)
526
                        return;
527
                if(session->bitrate > 0)
528
                        janus_rtcp_cap_remb(buf, len, session->bitrate);
529
                gateway->relay_rtcp(handle, video, buf, len);
530
        }
531
}
532

    
533
void janus_echotest_incoming_data(janus_plugin_session *handle, char *buf, int len) {
534
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
535
                return;
536
        /* Simple echo test */
537
        if(gateway) {
538
                janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
539
                if(!session) {
540
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
541
                        return;
542
                }
543
                if(session->destroyed)
544
                        return;
545
                if(buf == NULL || len <= 0)
546
                        return;
547
                char *text = g_malloc0(len+1);
548
                memcpy(text, buf, len);
549
                *(text+len) = '\0';
550
                JANUS_LOG(LOG_VERB, "Got a DataChannel message (%zu bytes) to bounce back: %s\n", strlen(text), text);
551
                /* Save the frame if we're recording */
552
                janus_recorder_save_frame(session->drc, text, strlen(text));
553
                /* We send back the same text with a custom prefix */
554
                const char *prefix = "Janus EchoTest here! You wrote: ";
555
                char *reply = g_malloc0(strlen(prefix)+len+1);
556
                g_snprintf(reply, strlen(prefix)+len+1, "%s%s", prefix, text);
557
                g_free(text);
558
                gateway->relay_data(handle, reply, strlen(reply));
559
                g_free(reply);
560
        }
561
}
562

    
563
void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video) {
564
        /* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */
565
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
566
                return;
567
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
568
        if(!session) {
569
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
570
                return;
571
        }
572
        if(session->destroyed)
573
                return;
574
        session->slowlink_count++;
575
        if(uplink && !video && !session->audio_active) {
576
                /* We're not relaying audio and the peer is expecting it, so NACKs are normal */
577
                JANUS_LOG(LOG_VERB, "Getting a lot of NACKs (slow uplink) for audio, but that's expected, a configure disabled the audio forwarding\n");
578
        } else if(uplink && video && !session->video_active) {
579
                /* We're not relaying video and the peer is expecting it, so NACKs are normal */
580
                JANUS_LOG(LOG_VERB, "Getting a lot of NACKs (slow uplink) for video, but that's expected, a configure disabled the video forwarding\n");
581
        } else {
582
                /* Slow uplink or downlink, maybe we set the bitrate cap too high? */
583
                if(video) {
584
                        /* Halve the bitrate, but don't go too low... */
585
                        session->bitrate = session->bitrate > 0 ? session->bitrate : 512*1024;
586
                        session->bitrate = session->bitrate/2;
587
                        if(session->bitrate < 64*1024)
588
                                session->bitrate = 64*1024;
589
                        JANUS_LOG(LOG_WARN, "Getting a lot of NACKs (slow %s) for %s, forcing a lower REMB: %"SCNu64"\n",
590
                                uplink ? "uplink" : "downlink", video ? "video" : "audio", session->bitrate);
591
                        /* ... and send a new REMB back */
592
                        char rtcpbuf[24];
593
                        janus_rtcp_remb((char *)(&rtcpbuf), 24, session->bitrate);
594
                        gateway->relay_rtcp(handle, 1, rtcpbuf, 24);
595
                        /* As a last thing, notify the user about this */
596
                        json_t *event = json_object();
597
                        json_object_set_new(event, "echotest", json_string("event"));
598
                        json_t *result = json_object();
599
                        json_object_set_new(result, "status", json_string("slow_link"));
600
                        json_object_set_new(result, "bitrate", json_integer(session->bitrate));
601
                        json_object_set_new(event, "result", result);
602
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
603
                        json_decref(event);
604
                        event = NULL;
605
                        gateway->push_event(session->handle, &janus_echotest_plugin, NULL, event_text, NULL, NULL);
606
                        g_free(event_text);
607
                }
608
        }
609
}
610

    
611
void janus_echotest_hangup_media(janus_plugin_session *handle) {
612
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
613
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
614
                return;
615
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;
616
        if(!session) {
617
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
618
                return;
619
        }
620
        if(session->destroyed)
621
                return;
622
        if(g_atomic_int_add(&session->hangingup, 1))
623
                return;
624
        /* Send an event to the browser and tell it's over */
625
        json_t *event = json_object();
626
        json_object_set_new(event, "echotest", json_string("event"));
627
        json_object_set_new(event, "result", json_string("done"));
628
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
629
        json_decref(event);
630
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
631
        int ret = gateway->push_event(handle, &janus_echotest_plugin, NULL, event_text, NULL, NULL);
632
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
633
        g_free(event_text);
634
        /* Get rid of the recorders, if available */
635
        janus_mutex_lock(&session->rec_mutex);
636
        if(session->arc) {
637
                janus_recorder_close(session->arc);
638
                JANUS_LOG(LOG_INFO, "Closed audio recording %s\n", session->arc->filename ? session->arc->filename : "??");
639
                janus_recorder_free(session->arc);
640
        }
641
        session->arc = NULL;
642
        if(session->vrc) {
643
                janus_recorder_close(session->vrc);
644
                JANUS_LOG(LOG_INFO, "Closed video recording %s\n", session->vrc->filename ? session->vrc->filename : "??");
645
                janus_recorder_free(session->vrc);
646
        }
647
        session->vrc = NULL;
648
        if(session->drc) {
649
                janus_recorder_close(session->drc);
650
                JANUS_LOG(LOG_INFO, "Closed data recording %s\n", session->drc->filename ? session->drc->filename : "??");
651
                janus_recorder_free(session->drc);
652
        }
653
        session->drc = NULL;
654
        janus_mutex_unlock(&session->rec_mutex);
655
        /* Reset controls */
656
        session->has_audio = FALSE;
657
        session->has_video = FALSE;
658
        session->has_data = FALSE;
659
        session->audio_active = TRUE;
660
        session->video_active = TRUE;
661
        session->bitrate = 0;
662
}
663

    
664
/* Thread to handle incoming messages */
665
static void *janus_echotest_handler(void *data) {
666
        JANUS_LOG(LOG_VERB, "Joining EchoTest handler thread\n");
667
        janus_echotest_message *msg = NULL;
668
        int error_code = 0;
669
        char *error_cause = g_malloc0(512);
670
        if(error_cause == NULL) {
671
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
672
                return NULL;
673
        }
674
        json_t *root = NULL;
675
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
676
                msg = g_async_queue_pop(messages);
677
                if(msg == NULL)
678
                        continue;
679
                if(msg == &exit_message)
680
                        break;
681
                if(msg->handle == NULL) {
682
                        janus_echotest_message_free(msg);
683
                        continue;
684
                }
685
                janus_echotest_session *session = NULL;
686
                janus_mutex_lock(&sessions_mutex);
687
                if(g_hash_table_lookup(sessions, msg->handle) != NULL ) {
688
                        session = (janus_echotest_session *)msg->handle->plugin_handle;
689
                }
690
                janus_mutex_unlock(&sessions_mutex);
691
                if(!session) {
692
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
693
                        janus_echotest_message_free(msg);
694
                        continue;
695
                }
696
                if(session->destroyed) {
697
                        janus_echotest_message_free(msg);
698
                        continue;
699
                }
700
                /* Handle request */
701
                error_code = 0;
702
                root = NULL;
703
                JANUS_LOG(LOG_VERB, "Handling message: %s\n", msg->message);
704
                if(msg->message == NULL) {
705
                        JANUS_LOG(LOG_ERR, "No message??\n");
706
                        error_code = JANUS_ECHOTEST_ERROR_NO_MESSAGE;
707
                        g_snprintf(error_cause, 512, "%s", "No message??");
708
                        goto error;
709
                }
710
                json_error_t error;
711
                root = json_loads(msg->message, 0, &error);
712
                if(!root) {
713
                        JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
714
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_JSON;
715
                        g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
716
                        goto error;
717
                }
718
                if(!json_is_object(root)) {
719
                        JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
720
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_JSON;
721
                        g_snprintf(error_cause, 512, "JSON error: not an object");
722
                        goto error;
723
                }
724
                /* Parse request */
725
                json_t *audio = json_object_get(root, "audio");
726
                if(audio && !json_is_boolean(audio)) {
727
                        JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
728
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
729
                        g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
730
                        goto error;
731
                }
732
                json_t *video = json_object_get(root, "video");
733
                if(video && !json_is_boolean(video)) {
734
                        JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
735
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
736
                        g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
737
                        goto error;
738
                }
739
                json_t *bitrate = json_object_get(root, "bitrate");
740
                if(bitrate && (!json_is_integer(bitrate) || json_integer_value(bitrate) < 0)) {
741
                        JANUS_LOG(LOG_ERR, "Invalid element (bitrate should be a positive integer)\n");
742
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
743
                        g_snprintf(error_cause, 512, "Invalid value (bitrate should be a positive integer)");
744
                        goto error;
745
                }
746
                json_t *record = json_object_get(root, "record");
747
                if(record && !json_is_boolean(record)) {
748
                        JANUS_LOG(LOG_ERR, "Invalid element (record should be a boolean)\n");
749
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
750
                        g_snprintf(error_cause, 512, "Invalid value (record should be a boolean)");
751
                        goto error;
752
                }
753
                json_t *recfile = json_object_get(root, "filename");
754
                if(recfile && !json_is_string(recfile)) {
755
                        JANUS_LOG(LOG_ERR, "Invalid element (filename should be a string)\n");
756
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
757
                        g_snprintf(error_cause, 512, "Invalid value (filename should be a string)");
758
                        goto error;
759
                }
760
                /* Enforce request */
761
                if(audio) {
762
                        session->audio_active = json_is_true(audio);
763
                        JANUS_LOG(LOG_VERB, "Setting audio property: %s\n", session->audio_active ? "true" : "false");
764
                }
765
                if(video) {
766
                        if(!session->video_active && json_is_true(video)) {
767
                                /* Send a PLI */
768
                                JANUS_LOG(LOG_VERB, "Just (re-)enabled video, sending a PLI to recover it\n");
769
                                char buf[12];
770
                                memset(buf, 0, 12);
771
                                janus_rtcp_pli((char *)&buf, 12);
772
                                gateway->relay_rtcp(session->handle, 1, buf, 12);
773
                        }
774
                        session->video_active = json_is_true(video);
775
                        JANUS_LOG(LOG_VERB, "Setting video property: %s\n", session->video_active ? "true" : "false");
776
                }
777
                if(bitrate) {
778
                        session->bitrate = json_integer_value(bitrate);
779
                        JANUS_LOG(LOG_VERB, "Setting video bitrate: %"SCNu64"\n", session->bitrate);
780
                        if(session->bitrate > 0) {
781
                                /* FIXME Generate a new REMB (especially useful for Firefox, which doesn't send any we can cap later) */
782
                                char buf[24];
783
                                memset(buf, 0, 24);
784
                                janus_rtcp_remb((char *)&buf, 24, session->bitrate);
785
                                JANUS_LOG(LOG_VERB, "Sending REMB\n");
786
                                gateway->relay_rtcp(session->handle, 1, buf, 24);
787
                                /* FIXME How should we handle a subsequent "no limit" bitrate? */
788
                        }
789
                }
790
                if(record) {
791
                        if(msg->sdp) {
792
                                session->has_audio = (strstr(msg->sdp, "m=audio") != NULL);
793
                                session->has_video = (strstr(msg->sdp, "m=video") != NULL);
794
                                session->has_data = (strstr(msg->sdp, "DTLS/SCTP") != NULL);
795
                        }
796
                        gboolean recording = json_is_true(record);
797
                        const char *recording_base = json_string_value(recfile);
798
                        JANUS_LOG(LOG_VERB, "Recording %s (base filename: %s)\n", recording ? "enabled" : "disabled", recording_base ? recording_base : "not provided");
799
                        janus_mutex_lock(&session->rec_mutex);
800
                        if(!recording) {
801
                                /* Not recording (anymore?) */
802
                                if(session->arc) {
803
                                        janus_recorder_close(session->arc);
804
                                        JANUS_LOG(LOG_INFO, "Closed audio recording %s\n", session->arc->filename ? session->arc->filename : "??");
805
                                        janus_recorder_free(session->arc);
806
                                }
807
                                session->arc = NULL;
808
                                if(session->vrc) {
809
                                        janus_recorder_close(session->vrc);
810
                                        JANUS_LOG(LOG_INFO, "Closed video recording %s\n", session->vrc->filename ? session->vrc->filename : "??");
811
                                        janus_recorder_free(session->vrc);
812
                                }
813
                                session->vrc = NULL;
814
                                if(session->drc) {
815
                                        janus_recorder_close(session->drc);
816
                                        JANUS_LOG(LOG_INFO, "Closed data recording %s\n", session->drc->filename ? session->drc->filename : "??");
817
                                        janus_recorder_free(session->drc);
818
                                }
819
                                session->drc = NULL;
820
                        } else {
821
                                /* We've started recording, send a PLI and go on */
822
                                char filename[255];
823
                                gint64 now = janus_get_real_time();
824
                                if(session->has_audio) {
825
                                        /* FIXME We assume we're recording Opus, here */
826
                                        memset(filename, 0, 255);
827
                                        if(recording_base) {
828
                                                /* Use the filename and path we have been provided */
829
                                                g_snprintf(filename, 255, "%s-audio", recording_base);
830
                                                session->arc = janus_recorder_create(NULL, "opus", filename);
831
                                                if(session->arc == NULL) {
832
                                                        /* FIXME We should notify the fact the recorder could not be created */
833
                                                        JANUS_LOG(LOG_ERR, "Couldn't open an audio recording file for this EchoTest user!\n");
834
                                                }
835
                                        } else {
836
                                                /* Build a filename */
837
                                                g_snprintf(filename, 255, "echotest-%p-%"SCNi64"-audio", session, now);
838
                                                session->arc = janus_recorder_create(NULL, "opus", filename);
839
                                                if(session->arc == NULL) {
840
                                                        /* FIXME We should notify the fact the recorder could not be created */
841
                                                        JANUS_LOG(LOG_ERR, "Couldn't open an audio recording file for this EchoTest user!\n");
842
                                                }
843
                                        }
844
                                }
845
                                if(session->has_video) {
846
                                        /* FIXME We assume we're recording VP8, here */
847
                                        memset(filename, 0, 255);
848
                                        if(recording_base) {
849
                                                /* Use the filename and path we have been provided */
850
                                                g_snprintf(filename, 255, "%s-video", recording_base);
851
                                                session->vrc = janus_recorder_create(NULL, "vp8", filename);
852
                                                if(session->vrc == NULL) {
853
                                                        /* FIXME We should notify the fact the recorder could not be created */
854
                                                        JANUS_LOG(LOG_ERR, "Couldn't open an video recording file for this EchoTest user!\n");
855
                                                }
856
                                        } else {
857
                                                /* Build a filename */
858
                                                g_snprintf(filename, 255, "echotest-%p-%"SCNi64"-video", session, now);
859
                                                session->vrc = janus_recorder_create(NULL, "vp8", filename);
860
                                                if(session->vrc == NULL) {
861
                                                        /* FIXME We should notify the fact the recorder could not be created */
862
                                                        JANUS_LOG(LOG_ERR, "Couldn't open an video recording file for this EchoTest user!\n");
863
                                                }
864
                                        }
865
                                        /* Send a PLI */
866
                                        JANUS_LOG(LOG_VERB, "Recording video, sending a PLI to kickstart it\n");
867
                                        char buf[12];
868
                                        memset(buf, 0, 12);
869
                                        janus_rtcp_pli((char *)&buf, 12);
870
                                        gateway->relay_rtcp(session->handle, 1, buf, 12);
871
                                }
872
                                if(session->has_data) {
873
                                        memset(filename, 0, 255);
874
                                        if(recording_base) {
875
                                                /* Use the filename and path we have been provided */
876
                                                g_snprintf(filename, 255, "%s-data", recording_base);
877
                                                session->drc = janus_recorder_create(NULL, "text", filename);
878
                                                if(session->drc == NULL) {
879
                                                        /* FIXME We should notify the fact the recorder could not be created */
880
                                                        JANUS_LOG(LOG_ERR, "Couldn't open a text data recording file for this EchoTest user!\n");
881
                                                }
882
                                        } else {
883
                                                /* Build a filename */
884
                                                g_snprintf(filename, 255, "echotest-%p-%"SCNi64"-data", session, now);
885
                                                session->drc = janus_recorder_create(NULL, "text", filename);
886
                                                if(session->drc == NULL) {
887
                                                        /* FIXME We should notify the fact the recorder could not be created */
888
                                                        JANUS_LOG(LOG_ERR, "Couldn't open a text data recording file for this EchoTest user!\n");
889
                                                }
890
                                        }
891
                                }
892
                        }
893
                        janus_mutex_unlock(&session->rec_mutex);
894
                }
895
                /* Any SDP to handle? */
896
                if(msg->sdp) {
897
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg->sdp_type, msg->sdp);
898
                        session->has_audio = (strstr(msg->sdp, "m=audio") != NULL);
899
                        session->has_video = (strstr(msg->sdp, "m=video") != NULL);
900
                        session->has_data = (strstr(msg->sdp, "DTLS/SCTP") != NULL);
901
                }
902

    
903
                if(!audio && !video && !bitrate && !record && !msg->sdp) {
904
                        JANUS_LOG(LOG_ERR, "No supported attributes (audio, video, bitrate, record, jsep) found\n");
905
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
906
                        g_snprintf(error_cause, 512, "Message error: no supported attributes (audio, video, bitrate, record, jsep) found");
907
                        goto error;
908
                }
909

    
910
                json_decref(root);
911
                /* Prepare JSON event */
912
                json_t *event = json_object();
913
                json_object_set_new(event, "echotest", json_string("event"));
914
                json_object_set_new(event, "result", json_string("ok"));
915
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
916
                json_decref(event);
917
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
918
                if(!msg->sdp) {
919
                        int ret = gateway->push_event(msg->handle, &janus_echotest_plugin, msg->transaction, event_text, NULL, NULL);
920
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
921
                } else {
922
                        /* Forward the same offer to the gateway, to start the echo test */
923
                        const char *type = NULL;
924
                        if(!strcasecmp(msg->sdp_type, "offer"))
925
                                type = "answer";
926
                        if(!strcasecmp(msg->sdp_type, "answer"))
927
                                type = "offer";
928
                        /* Any media direction that needs to be fixed? */
929
                        char *sdp = g_strdup(msg->sdp);
930
                        if(strstr(sdp, "a=recvonly")) {
931
                                /* Turn recvonly to inactive, as we simply bounce media back */
932
                                sdp = janus_string_replace(sdp, "a=recvonly", "a=inactive");
933
                        } else if(strstr(sdp, "a=sendonly")) {
934
                                /* Turn sendonly to recvonly */
935
                                sdp = janus_string_replace(sdp, "a=sendonly", "a=recvonly");
936
                                /* FIXME We should also actually not echo this media back, though... */
937
                        }
938
                        /* Make also sure we get rid of ULPfec, red, etc. */
939
                        if(strstr(sdp, "ulpfec")) {
940
                                /* FIXME This really needs some better code */
941
                                sdp = janus_string_replace(sdp, "a=rtpmap:116 red/90000\r\n", "");
942
                                sdp = janus_string_replace(sdp, "a=rtpmap:117 ulpfec/90000\r\n", "");
943
                                sdp = janus_string_replace(sdp, "a=rtpmap:96 rtx/90000\r\n", "");
944
                                sdp = janus_string_replace(sdp, "a=fmtp:96 apt=100\r\n", "");
945
                                sdp = janus_string_replace(sdp, "a=rtpmap:97 rtx/90000\r\n", "");
946
                                sdp = janus_string_replace(sdp, "a=fmtp:97 apt=101\r\n", "");
947
                                sdp = janus_string_replace(sdp, "a=rtpmap:98 rtx/90000\r\n", "");
948
                                sdp = janus_string_replace(sdp, "a=fmtp:98 apt=116\r\n", "");
949
                                sdp = janus_string_replace(sdp, " 116", "");
950
                                sdp = janus_string_replace(sdp, " 117", "");
951
                                sdp = janus_string_replace(sdp, " 96", "");
952
                                sdp = janus_string_replace(sdp, " 97", "");
953
                                sdp = janus_string_replace(sdp, " 98", "");
954
                        }
955
                        /* How long will the gateway take to push the event? */
956
                        g_atomic_int_set(&session->hangingup, 0);
957
                        gint64 start = janus_get_monotonic_time();
958
                        int res = gateway->push_event(msg->handle, &janus_echotest_plugin, msg->transaction, event_text, type, sdp);
959
                        JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n",
960
                                res, janus_get_monotonic_time()-start);
961
                        g_free(sdp);
962
                }
963
                g_free(event_text);
964
                janus_echotest_message_free(msg);
965

    
966
                if(notify_events && gateway->events_is_enabled()) {
967
                        /* Just to showcase how you can notify handlers, let's update them on our configuration */
968
                        json_t *info = json_object();
969
                        json_object_set_new(info, "audio_active", session->audio_active ? json_true() : json_false());
970
                        json_object_set_new(info, "video_active", session->video_active ? json_true() : json_false());
971
                        json_object_set_new(info, "bitrate", json_integer(session->bitrate));
972
                        if(session->arc || session->vrc) {
973
                                json_t *recording = json_object();
974
                                if(session->arc && session->arc->filename)
975
                                        json_object_set_new(recording, "audio", json_string(session->arc->filename));
976
                                if(session->vrc && session->vrc->filename)
977
                                        json_object_set_new(recording, "video", json_string(session->vrc->filename));
978
                                json_object_set_new(info, "recording", recording);
979
                        }
980
                        gateway->notify_event(session->handle, info);
981
                }
982

    
983
                /* Done, on to the next request */
984
                continue;
985
                
986
error:
987
                {
988
                        if(root != NULL)
989
                                json_decref(root);
990
                        /* Prepare JSON error event */
991
                        json_t *event = json_object();
992
                        json_object_set_new(event, "echotest", json_string("event"));
993
                        json_object_set_new(event, "error_code", json_integer(error_code));
994
                        json_object_set_new(event, "error", json_string(error_cause));
995
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
996
                        json_decref(event);
997
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
998
                        int ret = gateway->push_event(msg->handle, &janus_echotest_plugin, msg->transaction, event_text, NULL, NULL);
999
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
1000
                        g_free(event_text);
1001
                        janus_echotest_message_free(msg);
1002
                }
1003
        }
1004
        g_free(error_cause);
1005
        JANUS_LOG(LOG_VERB, "Leaving EchoTest handler thread\n");
1006
        return NULL;
1007
}