Statistics
| Branch: | Revision:

janus-gateway / plugins / janus_echotest.c @ 18943780

History | View | Annotate | Download (28 KB)

1
/*! \file   janus_echotest.c
2
 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3
 * \copyright GNU General Public License v3
4
 * \brief  Janus EchoTest plugin
5
 * \details  This is a trivial EchoTest plugin for Janus, just used to
6
 * showcase the plugin interface. A peer attaching to this plugin will
7
 * receive back the same RTP packets and RTCP messages he sends: the
8
 * RTCP messages, of course, would be modified on the way by the gateway
9
 * to make sure they are coherent with the involved SSRCs. In order to
10
 * demonstrate how peer-provided messages can change the behaviour of a
11
 * plugin, this plugin implements a simple API based on three messages:
12
 * 
13
 * 1. a message to enable/disable audio (that is, to tell the plugin
14
 * whether incoming audio RTP packets need to be sent back or discarded);
15
 * 2. a message to enable/disable video (that is, to tell the plugin
16
 * whether incoming video RTP packets need to be sent back or discarded);
17
 * 3. a message to cap the bitrate (which would modify incoming RTCP
18
 * REMB messages before sending them back, in order to trick the peer into
19
 * thinking the available bandwidth is different).
20
 * 
21
 * \section echoapi Echo Test API
22
 * 
23
 * There's a single unnamed request you can send and it's asynchronous,
24
 * which means all responses (successes and errors) will be delivered
25
 * as events with the same transaction. 
26
 * 
27
 * The request has to be formatted as follows:
28
 *
29
\verbatim
30
{
31
        "audio" : true|false,
32
        "video" : true|false,
33
        "bitrate" : <numeric bitrate value>
34
}
35
\endverbatim
36
 *
37
 * \c audio instructs the plugin to do or do not bounce back audio
38
 * frames; \c video does the same for video; \c bitrate caps the
39
 * bandwidth to force on the browser encoding side (e.g., 128000 for
40
 * 128kbps).
41
 * 
42
 * The first request must be sent together with a JSEP offer to
43
 * negotiate a PeerConnection: a JSEP answer will be provided with
44
 * the asynchronous response notification. Subsequent requests (e.g., to
45
 * dynamically manipulate the bitrate while testing) have to be sent
46
 * without any JSEP payload attached.
47
 * 
48
 * A successful request will result in an \c ok event:
49
 * 
50
\verbatim
51
{
52
        "echotest" : "event",
53
        "result": "ok"
54
}
55
\endverbatim
56
 * 
57
 * An error instead will provide both an error code and a more verbose
58
 * description of the cause of the issue:
59
 * 
60
\verbatim
61
{
62
        "echotest" : "event",
63
        "error_code" : <numeric ID, check Macros below>,
64
        "error" : "<error description as a string>"
65
}
66
\endverbatim
67
 *
68
 * If the plugin detects a loss of the associated PeerConnection, a
69
 * "done" notification is triggered to inform the application the Echo
70
 * Test session is over:
71
 * 
72
\verbatim
73
{
74
        "echotest" : "event",
75
        "result": "done"
76
}
77
\endverbatim
78
 *
79
 * \ingroup plugins
80
 * \ref plugins
81
 */
82

    
83
#include "plugin.h"
84

    
85
#include <jansson.h>
86

    
87
#include "../debug.h"
88
#include "../apierror.h"
89
#include "../config.h"
90
#include "../mutex.h"
91
#include "../rtcp.h"
92
#include "../utils.h"
93

    
94

    
95
/* Plugin information */
96
#define JANUS_ECHOTEST_VERSION                        6
97
#define JANUS_ECHOTEST_VERSION_STRING        "0.0.6"
98
#define JANUS_ECHOTEST_DESCRIPTION                "This is a trivial EchoTest plugin for Janus, just used to showcase the plugin interface."
99
#define JANUS_ECHOTEST_NAME                                "JANUS EchoTest plugin"
100
#define JANUS_ECHOTEST_AUTHOR                        "Meetecho s.r.l."
101
#define JANUS_ECHOTEST_PACKAGE                        "janus.plugin.echotest"
102

    
103
/* Plugin methods */
104
janus_plugin *create(void);
105
int janus_echotest_init(janus_callbacks *callback, const char *config_path);
106
void janus_echotest_destroy(void);
107
int janus_echotest_get_api_compatibility(void);
108
int janus_echotest_get_version(void);
109
const char *janus_echotest_get_version_string(void);
110
const char *janus_echotest_get_description(void);
111
const char *janus_echotest_get_name(void);
112
const char *janus_echotest_get_author(void);
113
const char *janus_echotest_get_package(void);
114
void janus_echotest_create_session(janus_plugin_session *handle, int *error);
115
struct janus_plugin_result *janus_echotest_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp);
116
void janus_echotest_setup_media(janus_plugin_session *handle);
117
void janus_echotest_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len);
118
void janus_echotest_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len);
119
void janus_echotest_incoming_data(janus_plugin_session *handle, char *buf, int len);
120
void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video);
121
void janus_echotest_hangup_media(janus_plugin_session *handle);
122
void janus_echotest_destroy_session(janus_plugin_session *handle, int *error);
123
char *janus_echotest_query_session(janus_plugin_session *handle);
124

    
125
/* Plugin setup */
126
static janus_plugin janus_echotest_plugin =
127
        JANUS_PLUGIN_INIT (
128
                .init = janus_echotest_init,
129
                .destroy = janus_echotest_destroy,
130

    
131
                .get_api_compatibility = janus_echotest_get_api_compatibility,
132
                .get_version = janus_echotest_get_version,
133
                .get_version_string = janus_echotest_get_version_string,
134
                .get_description = janus_echotest_get_description,
135
                .get_name = janus_echotest_get_name,
136
                .get_author = janus_echotest_get_author,
137
                .get_package = janus_echotest_get_package,
138
                
139
                .create_session = janus_echotest_create_session,
140
                .handle_message = janus_echotest_handle_message,
141
                .setup_media = janus_echotest_setup_media,
142
                .incoming_rtp = janus_echotest_incoming_rtp,
143
                .incoming_rtcp = janus_echotest_incoming_rtcp,
144
                .incoming_data = janus_echotest_incoming_data,
145
                .slow_link = janus_echotest_slow_link,
146
                .hangup_media = janus_echotest_hangup_media,
147
                .destroy_session = janus_echotest_destroy_session,
148
                .query_session = janus_echotest_query_session,
149
        );
150

    
151
/* Plugin creator */
152
janus_plugin *create(void) {
153
        JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_ECHOTEST_NAME);
154
        return &janus_echotest_plugin;
155
}
156

    
157

    
158
/* Useful stuff */
159
static volatile gint initialized = 0, stopping = 0;
160
static janus_callbacks *gateway = NULL;
161
static GThread *handler_thread;
162
static GThread *watchdog;
163
static void *janus_echotest_handler(void *data);
164

    
165
typedef struct janus_echotest_message {
166
        janus_plugin_session *handle;
167
        char *transaction;
168
        char *message;
169
        char *sdp_type;
170
        char *sdp;
171
} janus_echotest_message;
172
static GAsyncQueue *messages = NULL;
173

    
174
typedef struct janus_echotest_session {
175
        janus_plugin_session *handle;
176
        gboolean audio_active;
177
        gboolean video_active;
178
        uint64_t bitrate;
179
        guint16 slowlink_count;
180
        volatile gint hangingup;
181
        gint64 destroyed;        /* Time at which this session was marked as destroyed */
182
} janus_echotest_session;
183
static GHashTable *sessions;
184
static GList *old_sessions;
185
static janus_mutex sessions_mutex;
186

    
187
void janus_echotest_message_free(janus_echotest_message *msg);
188
void janus_echotest_message_free(janus_echotest_message *msg) {
189
        if(!msg)
190
                return;
191

    
192
        msg->handle = NULL;
193

    
194
        g_free(msg->transaction);
195
        msg->transaction = NULL;
196
        g_free(msg->message);
197
        msg->message = NULL;
198
        g_free(msg->sdp_type);
199
        msg->sdp_type = NULL;
200
        g_free(msg->sdp);
201
        msg->sdp = NULL;
202

    
203
        g_free(msg);
204
}
205

    
206

    
207
/* Error codes */
208
#define JANUS_ECHOTEST_ERROR_NO_MESSAGE                        411
209
#define JANUS_ECHOTEST_ERROR_INVALID_JSON                412
210
#define JANUS_ECHOTEST_ERROR_INVALID_ELEMENT        413
211

    
212

    
213
/* EchoTest watchdog/garbage collector (sort of) */
214
void *janus_echotest_watchdog(void *data);
215
void *janus_echotest_watchdog(void *data) {
216
        JANUS_LOG(LOG_INFO, "EchoTest watchdog started\n");
217
        gint64 now = 0;
218
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
219
                janus_mutex_lock(&sessions_mutex);
220
                /* Iterate on all the sessions */
221
                now = janus_get_monotonic_time();
222
                if(old_sessions != NULL) {
223
                        GList *sl = old_sessions;
224
                        JANUS_LOG(LOG_HUGE, "Checking %d old EchoTest sessions...\n", g_list_length(old_sessions));
225
                        while(sl) {
226
                                janus_echotest_session *session = (janus_echotest_session *)sl->data;
227
                                if(!session) {
228
                                        sl = sl->next;
229
                                        continue;
230
                                }
231
                                if(now-session->destroyed >= 5*G_USEC_PER_SEC) {
232
                                        /* We're lazy and actually get rid of the stuff only after a few seconds */
233
                                        JANUS_LOG(LOG_VERB, "Freeing old EchoTest session\n");
234
                                        GList *rm = sl->next;
235
                                        old_sessions = g_list_delete_link(old_sessions, sl);
236
                                        sl = rm;
237
                                        session->handle = NULL;
238
                                        g_free(session);
239
                                        session = NULL;
240
                                        continue;
241
                                }
242
                                sl = sl->next;
243
                        }
244
                }
245
                janus_mutex_unlock(&sessions_mutex);
246
                g_usleep(500000);
247
        }
248
        JANUS_LOG(LOG_INFO, "EchoTest watchdog stopped\n");
249
        return NULL;
250
}
251

    
252

    
253
/* Plugin implementation */
254
int janus_echotest_init(janus_callbacks *callback, const char *config_path) {
255
        if(g_atomic_int_get(&stopping)) {
256
                /* Still stopping from before */
257
                return -1;
258
        }
259
        if(callback == NULL || config_path == NULL) {
260
                /* Invalid arguments */
261
                return -1;
262
        }
263

    
264
        /* Read configuration */
265
        char filename[255];
266
        g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_ECHOTEST_PACKAGE);
267
        JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
268
        janus_config *config = janus_config_parse(filename);
269
        if(config != NULL)
270
                janus_config_print(config);
271
        /* This plugin actually has nothing to configure... */
272
        janus_config_destroy(config);
273
        config = NULL;
274
        
275
        sessions = g_hash_table_new(NULL, NULL);
276
        janus_mutex_init(&sessions_mutex);
277
        messages = g_async_queue_new_full((GDestroyNotify) janus_echotest_message_free);
278
        /* This is the callback we'll need to invoke to contact the gateway */
279
        gateway = callback;
280
        g_atomic_int_set(&initialized, 1);
281

    
282
        GError *error = NULL;
283
        /* Start the sessions watchdog */
284
        watchdog = g_thread_try_new("etest watchdog", &janus_echotest_watchdog, NULL, &error);
285
        if(error != NULL) {
286
                g_atomic_int_set(&initialized, 0);
287
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the EchoTest watchdog thread...\n", error->code, error->message ? error->message : "??");
288
                return -1;
289
        }
290
        /* Launch the thread that will handle incoming messages */
291
        handler_thread = g_thread_try_new("janus echotest handler", janus_echotest_handler, NULL, &error);
292
        if(error != NULL) {
293
                g_atomic_int_set(&initialized, 0);
294
                JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the EchoTest handler thread...\n", error->code, error->message ? error->message : "??");
295
                return -1;
296
        }
297
        JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_ECHOTEST_NAME);
298
        return 0;
299
}
300

    
301
void janus_echotest_destroy(void) {
302
        if(!g_atomic_int_get(&initialized))
303
                return;
304
        g_atomic_int_set(&stopping, 1);
305

    
306
        if(handler_thread != NULL) {
307
                g_thread_join(handler_thread);
308
                handler_thread = NULL;
309
        }
310
        if(watchdog != NULL) {
311
                g_thread_join(watchdog);
312
                watchdog = NULL;
313
        }
314

    
315
        /* FIXME We should destroy the sessions cleanly */
316
        janus_mutex_lock(&sessions_mutex);
317
        g_hash_table_destroy(sessions);
318
        janus_mutex_unlock(&sessions_mutex);
319
        g_async_queue_unref(messages);
320
        messages = NULL;
321
        sessions = NULL;
322

    
323
        g_atomic_int_set(&initialized, 0);
324
        g_atomic_int_set(&stopping, 0);
325
        JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_ECHOTEST_NAME);
326
}
327

    
328
int janus_echotest_get_api_compatibility(void) {
329
        /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
330
        return JANUS_PLUGIN_API_VERSION;
331
}
332

    
333
int janus_echotest_get_version(void) {
334
        return JANUS_ECHOTEST_VERSION;
335
}
336

    
337
const char *janus_echotest_get_version_string(void) {
338
        return JANUS_ECHOTEST_VERSION_STRING;
339
}
340

    
341
const char *janus_echotest_get_description(void) {
342
        return JANUS_ECHOTEST_DESCRIPTION;
343
}
344

    
345
const char *janus_echotest_get_name(void) {
346
        return JANUS_ECHOTEST_NAME;
347
}
348

    
349
const char *janus_echotest_get_author(void) {
350
        return JANUS_ECHOTEST_AUTHOR;
351
}
352

    
353
const char *janus_echotest_get_package(void) {
354
        return JANUS_ECHOTEST_PACKAGE;
355
}
356

    
357
void janus_echotest_create_session(janus_plugin_session *handle, int *error) {
358
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
359
                *error = -1;
360
                return;
361
        }        
362
        janus_echotest_session *session = (janus_echotest_session *)calloc(1, sizeof(janus_echotest_session));
363
        if(session == NULL) {
364
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
365
                *error = -2;
366
                return;
367
        }
368
        session->handle = handle;
369
        session->audio_active = TRUE;
370
        session->video_active = TRUE;
371
        session->bitrate = 0;        /* No limit */
372
        session->destroyed = 0;
373
        g_atomic_int_set(&session->hangingup, 0);
374
        handle->plugin_handle = session;
375
        janus_mutex_lock(&sessions_mutex);
376
        g_hash_table_insert(sessions, handle, session);
377
        janus_mutex_unlock(&sessions_mutex);
378

    
379
        return;
380
}
381

    
382
void janus_echotest_destroy_session(janus_plugin_session *handle, int *error) {
383
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
384
                *error = -1;
385
                return;
386
        }        
387
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;
388
        if(!session) {
389
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
390
                *error = -2;
391
                return;
392
        }
393
        JANUS_LOG(LOG_VERB, "Removing Echo Test session...\n");
394
        janus_mutex_lock(&sessions_mutex);
395
        if(!session->destroyed) {
396
                session->destroyed = janus_get_monotonic_time();
397
                g_hash_table_remove(sessions, handle);
398
                /* Cleaning up and removing the session is done in a lazy way */
399
                old_sessions = g_list_append(old_sessions, session);
400
        }
401
        janus_mutex_unlock(&sessions_mutex);
402
        return;
403
}
404

    
405
char *janus_echotest_query_session(janus_plugin_session *handle) {
406
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
407
                return NULL;
408
        }        
409
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;
410
        if(!session) {
411
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
412
                return NULL;
413
        }
414
        /* In the echo test, every session is the same: we just provide some configure info */
415
        json_t *info = json_object();
416
        json_object_set_new(info, "audio_active", json_string(session->audio_active ? "true" : "false"));
417
        json_object_set_new(info, "video_active", json_string(session->video_active ? "true" : "false"));
418
        json_object_set_new(info, "bitrate", json_integer(session->bitrate));
419
        json_object_set_new(info, "slowlink_count", json_integer(session->slowlink_count));
420
        json_object_set_new(info, "destroyed", json_integer(session->destroyed));
421
        char *info_text = json_dumps(info, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
422
        json_decref(info);
423
        return info_text;
424
}
425

    
426
struct janus_plugin_result *janus_echotest_handle_message(janus_plugin_session *handle, char *transaction, char *message, char *sdp_type, char *sdp) {
427
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
428
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized");
429
        janus_echotest_message *msg = calloc(1, sizeof(janus_echotest_message));
430
        if(msg == NULL) {
431
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
432
                return janus_plugin_result_new(JANUS_PLUGIN_ERROR, "Memory error");
433
        }
434
        msg->handle = handle;
435
        msg->transaction = transaction;
436
        msg->message = message;
437
        msg->sdp_type = sdp_type;
438
        msg->sdp = sdp;
439
        g_async_queue_push(messages, msg);
440

    
441
        /* All the requests to this plugin are handled asynchronously */
442
        return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, "I'm taking my time!");
443
}
444

    
445
void janus_echotest_setup_media(janus_plugin_session *handle) {
446
        JANUS_LOG(LOG_INFO, "WebRTC media is now available\n");
447
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
448
                return;
449
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
450
        if(!session) {
451
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
452
                return;
453
        }
454
        if(session->destroyed)
455
                return;
456
        g_atomic_int_set(&session->hangingup, 0);
457
        /* We really don't care, as we only send RTP/RTCP we get in the first place back anyway */
458
}
459

    
460
void janus_echotest_incoming_rtp(janus_plugin_session *handle, int video, char *buf, int len) {
461
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
462
                return;
463
        /* Simple echo test */
464
        if(gateway) {
465
                /* Honour the audio/video active flags */
466
                janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
467
                if(!session) {
468
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
469
                        return;
470
                }
471
                if(session->destroyed)
472
                        return;
473
                if((!video && session->audio_active) || (video && session->video_active)) {
474
                        gateway->relay_rtp(handle, video, buf, len);
475
                }
476
        }
477
}
478

    
479
void janus_echotest_incoming_rtcp(janus_plugin_session *handle, int video, char *buf, int len) {
480
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
481
                return;
482
        /* Simple echo test */
483
        if(gateway) {
484
                janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
485
                if(!session) {
486
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
487
                        return;
488
                }
489
                if(session->destroyed)
490
                        return;
491
                if(session->bitrate > 0)
492
                        janus_rtcp_cap_remb(buf, len, session->bitrate);
493
                gateway->relay_rtcp(handle, video, buf, len);
494
        }
495
}
496

    
497
void janus_echotest_incoming_data(janus_plugin_session *handle, char *buf, int len) {
498
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
499
                return;
500
        /* Simple echo test */
501
        if(gateway) {
502
                janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
503
                if(!session) {
504
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
505
                        return;
506
                }
507
                if(session->destroyed)
508
                        return;
509
                if(buf == NULL || len <= 0)
510
                        return;
511
                char *text = g_malloc0(len+1);
512
                memcpy(text, buf, len);
513
                *(text+len) = '\0';
514
                JANUS_LOG(LOG_VERB, "Got a DataChannel message (%zu bytes) to bounce back: %s\n", strlen(text), text);
515
                /* We send back the same text with a custom prefix */
516
                const char *prefix = "Janus EchoTest here! You wrote: ";
517
                char *reply = g_malloc0(strlen(prefix)+len+1);
518
                g_snprintf(reply, strlen(prefix)+len, "%s%s", prefix, text);
519
                g_free(text);
520
                gateway->relay_data(handle, reply, strlen(reply));
521
                g_free(reply);
522
        }
523
}
524

    
525
void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video) {
526
        /* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */
527
        if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
528
                return;
529
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;        
530
        if(!session) {
531
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
532
                return;
533
        }
534
        if(session->destroyed)
535
                return;
536
        session->slowlink_count++;
537
        if(uplink && !video && !session->audio_active) {
538
                /* We're not relaying audio and the peer is expecting it, so NACKs are normal */
539
                JANUS_LOG(LOG_VERB, "Getting a lot of NACKs (slow uplink) for audio, but that's expected, a configure disabled the audio forwarding\n");
540
        } else if(uplink && video && !session->video_active) {
541
                /* We're not relaying video and the peer is expecting it, so NACKs are normal */
542
                JANUS_LOG(LOG_VERB, "Getting a lot of NACKs (slow uplink) for video, but that's expected, a configure disabled the video forwarding\n");
543
        } else {
544
                /* Slow uplink or downlink, maybe we set the bitrate cap too high? */
545
                if(video) {
546
                        /* Halve the bitrate, but don't go too low... */
547
                        session->bitrate = session->bitrate > 0 ? session->bitrate : 512*1024;
548
                        session->bitrate = session->bitrate/2;
549
                        if(session->bitrate < 64*1024)
550
                                session->bitrate = 64*1024;
551
                        JANUS_LOG(LOG_WARN, "Getting a lot of NACKs (slow %s) for %s, forcing a lower REMB: %"SCNu64"\n",
552
                                uplink ? "uplink" : "downlink", video ? "video" : "audio", session->bitrate);
553
                        /* ... and send a new REMB back */
554
                        char rtcpbuf[200];
555
                        memset(rtcpbuf, 0, 200);
556
                        /* FIXME First put a RR (fake)... */
557
                        int rrlen = 32;
558
                        rtcp_rr *rr = (rtcp_rr *)&rtcpbuf;
559
                        rr->header.version = 2;
560
                        rr->header.type = RTCP_RR;
561
                        rr->header.rc = 1;
562
                        rr->header.length = htons((rrlen/4)-1);
563
                        /* ... then put a SDES... */
564
                        int sdeslen = janus_rtcp_sdes((char *)(&rtcpbuf)+rrlen, 200-rrlen, "janusvideo", 10);
565
                        if(sdeslen > 0) {
566
                                /* ... and then finally a REMB */
567
                                janus_rtcp_remb((char *)(&rtcpbuf)+rrlen+sdeslen, 24, session->bitrate);
568
                                gateway->relay_rtcp(handle, 1, rtcpbuf, rrlen+sdeslen+24);
569
                        }
570
                        /* As a last thing, notify the user about this */
571
                        json_t *event = json_object();
572
                        json_object_set_new(event, "echotest", json_string("event"));
573
                        json_t *result = json_object();
574
                        json_object_set_new(result, "status", json_string("slow_link"));
575
                        json_object_set_new(result, "bitrate", json_integer(session->bitrate));
576
                        json_object_set_new(event, "result", result);
577
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
578
                        json_decref(event);
579
                        json_decref(result);
580
                        event = NULL;
581
                        gateway->push_event(session->handle, &janus_echotest_plugin, NULL, event_text, NULL, NULL);
582
                        g_free(event_text);
583
                }
584
        }
585
}
586

    
587
void janus_echotest_hangup_media(janus_plugin_session *handle) {
588
        JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n");
589
        if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
590
                return;
591
        janus_echotest_session *session = (janus_echotest_session *)handle->plugin_handle;
592
        if(!session) {
593
                JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
594
                return;
595
        }
596
        if(session->destroyed)
597
                return;
598
        if(g_atomic_int_add(&session->hangingup, 1))
599
                return;
600
        /* Send an event to the browser and tell it's over */
601
        json_t *event = json_object();
602
        json_object_set_new(event, "echotest", json_string("event"));
603
        json_object_set_new(event, "result", json_string("done"));
604
        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
605
        json_decref(event);
606
        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
607
        int ret = gateway->push_event(handle, &janus_echotest_plugin, NULL, event_text, NULL, NULL);
608
        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
609
        g_free(event_text);
610
        /* Reset controls */
611
        session->audio_active = TRUE;
612
        session->video_active = TRUE;
613
        session->bitrate = 0;
614
}
615

    
616
/* Thread to handle incoming messages */
617
static void *janus_echotest_handler(void *data) {
618
        JANUS_LOG(LOG_VERB, "Joining EchoTest handler thread\n");
619
        janus_echotest_message *msg = NULL;
620
        int error_code = 0;
621
        char *error_cause = calloc(512, sizeof(char));
622
        if(error_cause == NULL) {
623
                JANUS_LOG(LOG_FATAL, "Memory error!\n");
624
                return NULL;
625
        }
626
        json_t *root = NULL;
627
        while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
628
                if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
629
                        usleep(50000);
630
                        continue;
631
                }
632
                janus_echotest_session *session = (janus_echotest_session *)msg->handle->plugin_handle;
633
                if(!session) {
634
                        JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
635
                        janus_echotest_message_free(msg);
636
                        continue;
637
                }
638
                if(session->destroyed) {
639
                        janus_echotest_message_free(msg);
640
                        continue;
641
                }
642
                /* Handle request */
643
                error_code = 0;
644
                root = NULL;
645
                JANUS_LOG(LOG_VERB, "Handling message: %s\n", msg->message);
646
                if(msg->message == NULL) {
647
                        JANUS_LOG(LOG_ERR, "No message??\n");
648
                        error_code = JANUS_ECHOTEST_ERROR_NO_MESSAGE;
649
                        g_snprintf(error_cause, 512, "%s", "No message??");
650
                        goto error;
651
                }
652
                json_error_t error;
653
                root = json_loads(msg->message, 0, &error);
654
                if(!root) {
655
                        JANUS_LOG(LOG_ERR, "JSON error: on line %d: %s\n", error.line, error.text);
656
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_JSON;
657
                        g_snprintf(error_cause, 512, "JSON error: on line %d: %s", error.line, error.text);
658
                        goto error;
659
                }
660
                if(!json_is_object(root)) {
661
                        JANUS_LOG(LOG_ERR, "JSON error: not an object\n");
662
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_JSON;
663
                        g_snprintf(error_cause, 512, "JSON error: not an object");
664
                        goto error;
665
                }
666
                json_t *audio = json_object_get(root, "audio");
667
                if(audio && !json_is_boolean(audio)) {
668
                        JANUS_LOG(LOG_ERR, "Invalid element (audio should be a boolean)\n");
669
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
670
                        g_snprintf(error_cause, 512, "Invalid value (audio should be a boolean)");
671
                        goto error;
672
                }
673
                json_t *video = json_object_get(root, "video");
674
                if(video && !json_is_boolean(video)) {
675
                        JANUS_LOG(LOG_ERR, "Invalid element (video should be a boolean)\n");
676
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
677
                        g_snprintf(error_cause, 512, "Invalid value (video should be a boolean)");
678
                        goto error;
679
                }
680
                json_t *bitrate = json_object_get(root, "bitrate");
681
                if(bitrate && (!json_is_integer(bitrate) || json_integer_value(bitrate) < 0)) {
682
                        JANUS_LOG(LOG_ERR, "Invalid element (bitrate should be a positive integer)\n");
683
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
684
                        g_snprintf(error_cause, 512, "Invalid value (bitrate should be a positive integer)");
685
                        goto error;
686
                }
687
                if(audio) {
688
                        session->audio_active = json_is_true(audio);
689
                        JANUS_LOG(LOG_VERB, "Setting audio property: %s\n", session->audio_active ? "true" : "false");
690
                }
691
                if(video) {
692
                        if(!session->video_active && json_is_true(video)) {
693
                                /* Send a PLI */
694
                                JANUS_LOG(LOG_VERB, "Just (re-)enabled video, sending a PLI to recover it\n");
695
                                char buf[12];
696
                                memset(buf, 0, 12);
697
                                janus_rtcp_pli((char *)&buf, 12);
698
                                gateway->relay_rtcp(session->handle, 1, buf, 12);
699
                        }
700
                        session->video_active = json_is_true(video);
701
                        JANUS_LOG(LOG_VERB, "Setting video property: %s\n", session->video_active ? "true" : "false");
702
                }
703
                if(bitrate) {
704
                        session->bitrate = json_integer_value(bitrate);
705
                        JANUS_LOG(LOG_VERB, "Setting video bitrate: %"SCNu64"\n", session->bitrate);
706
                        if(session->bitrate > 0) {
707
                                /* FIXME Generate a new REMB (especially useful for Firefox, which doesn't send any we can cap later) */
708
                                char buf[24];
709
                                memset(buf, 0, 24);
710
                                janus_rtcp_remb((char *)&buf, 24, session->bitrate);
711
                                JANUS_LOG(LOG_VERB, "Sending REMB\n");
712
                                gateway->relay_rtcp(session->handle, 1, buf, 24);
713
                                /* FIXME How should we handle a subsequent "no limit" bitrate? */
714
                        }
715
                }
716
                /* Any SDP to handle? */
717
                if(msg->sdp) {
718
                        JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg->sdp_type, msg->sdp);
719
                }
720

    
721
                if(!audio && !video && !bitrate && !msg->sdp) {
722
                        JANUS_LOG(LOG_ERR, "No supported attributes (audio, video, bitrate, jsep) found\n");
723
                        error_code = JANUS_ECHOTEST_ERROR_INVALID_ELEMENT;
724
                        g_snprintf(error_cause, 512, "Message error: no supported attributes (audio, video, bitrate, jsep) found");
725
                        goto error;
726
                }
727

    
728
                json_decref(root);
729
                /* Prepare JSON event */
730
                json_t *event = json_object();
731
                json_object_set_new(event, "echotest", json_string("event"));
732
                json_object_set_new(event, "result", json_string("ok"));
733
                char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
734
                json_decref(event);
735
                JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
736
                if(!msg->sdp) {
737
                        int ret = gateway->push_event(msg->handle, &janus_echotest_plugin, msg->transaction, event_text, NULL, NULL);
738
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
739
                } else {
740
                        /* Forward the same offer to the gateway, to start the echo test */
741
                        const char *type = NULL;
742
                        if(!strcasecmp(msg->sdp_type, "offer"))
743
                                type = "answer";
744
                        if(!strcasecmp(msg->sdp_type, "answer"))
745
                                type = "offer";
746
                        /* Any media direction that needs to be fixed? */
747
                        char *sdp = g_strdup(msg->sdp);
748
                        if(strstr(sdp, "a=recvonly")) {
749
                                /* Turn recvonly to inactive, as we simply bounce media back */
750
                                sdp = janus_string_replace(sdp, "a=recvonly", "a=inactive");
751
                        } else if(strstr(sdp, "a=sendonly")) {
752
                                /* Turn sendonly to recvonly */
753
                                sdp = janus_string_replace(sdp, "a=sendonly", "a=recvonly");
754
                                /* FIXME We should also actually not echo this media back, though... */
755
                        }
756
                        /* How long will the gateway take to push the event? */
757
                        gint64 start = janus_get_monotonic_time();
758
                        int res = gateway->push_event(msg->handle, &janus_echotest_plugin, msg->transaction, event_text, type, sdp);
759
                        JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n",
760
                                res, janus_get_monotonic_time()-start);
761
                        g_free(sdp);
762
                }
763
                g_free(event_text);
764
                janus_echotest_message_free(msg);
765
                continue;
766
                
767
error:
768
                {
769
                        if(root != NULL)
770
                                json_decref(root);
771
                        /* Prepare JSON error event */
772
                        json_t *event = json_object();
773
                        json_object_set_new(event, "echotest", json_string("event"));
774
                        json_object_set_new(event, "error_code", json_integer(error_code));
775
                        json_object_set_new(event, "error", json_string(error_cause));
776
                        char *event_text = json_dumps(event, JSON_INDENT(3) | JSON_PRESERVE_ORDER);
777
                        json_decref(event);
778
                        JANUS_LOG(LOG_VERB, "Pushing event: %s\n", event_text);
779
                        int ret = gateway->push_event(msg->handle, &janus_echotest_plugin, msg->transaction, event_text, NULL, NULL);
780
                        JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));
781
                        g_free(event_text);
782
                        janus_echotest_message_free(msg);
783
                }
784
        }
785
        g_free(error_cause);
786
        JANUS_LOG(LOG_VERB, "Leaving EchoTest handler thread\n");
787
        return NULL;
788
}