Statistics
| Branch: | Revision:

peerstreamer-src / src / janus_instance.c @ 58fb2cdc

History | View | Annotate | Download (17.3 KB)

1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

    
20
#include<janus_instance.h>
21
#include<signal.h>
22
#include<string.h>
23
#include<debug.h>
24
#include<unistd.h>
25
#include<tokens.h>
26
#include<grapes_config.h>
27
#include <sys/prctl.h>  // for process sync
28

    
29
#define INVALID_PID -1
30

    
31
#define JANUS_MSG_SESSION_CREATE "{\"transaction\": \"random\", \"janus\": \"create\"}"
32
#define JANUS_MSG_SESSION_KEEPALIVE "{\"transaction\": \"ciao\", \"janus\": \"keepalive\"}"
33
#define JANUS_MSG_STREAMING_PLUGIN_CREATE "{\"transaction\": \"ciao\", \"janus\": \"attach\", \"plugin\":\"janus.plugin.streaming\"}"
34
#define JANUS_MSG_VIDEOROOM_PLUGIN_CREATE "{\"transaction\": \"ciao\", \"janus\": \"attach\", \"plugin\":\"janus.plugin.videoroom\"}"
35

    
36

    
37
struct janus_instance {
38
        pid_t janus_pid;
39
        char* endpoint;
40
        char * executable;
41
        char * conf_param;
42
        char * logfile;
43
        struct mg_mgr *mongoose_srv;
44
        struct periodic_task * heartbeat;
45
        uint64_t management_session;
46
        uint64_t streaming_plugin_handle;
47
        uint64_t videoroom_plugin_handle;
48
        struct task_manager * tm;
49
};
50

    
51
uint64_t janus_instance_msg_get_id(char *msg)
52
{
53
        char ** records;
54
        uint32_t ntoks, i;
55
        uint64_t res = 0;
56
        
57
        records = tokens_create(msg, ' ', &ntoks);
58
        if ((i = tokens_check(records, ntoks, "\"id\":")) > 0)
59
        {
60
                if (records[i+1][strlen(records[i+1])-1] == ',')
61
                        records[i+1][strlen(records[i+1])-1] = '\0';
62
                sscanf(records[i+1], "%"PRId64"", &res);
63
                debug("ID string: %s\t ID integer: %"PRId64"\n", records[i+1], res);
64
        }
65
        tokens_destroy(&records, ntoks);
66
        return res;
67
}
68

    
69
char * janus_instance_streaming_handle_path(const struct janus_instance * janus)
70
{
71
        char * res = NULL;
72
        if (janus && janus->management_session && janus->streaming_plugin_handle && janus->endpoint)
73
        {
74
                res = malloc(sizeof(char) * (strlen(janus->endpoint) + 43));  // each identifier comprises of 20 characters at most
75
                sprintf(res, "%s/%"PRId64"/%"PRId64"", janus->endpoint, janus->management_session, janus->streaming_plugin_handle);
76
        }
77
        return res;
78
}
79

    
80
char * janus_instance_videoroom_handle_path(const struct janus_instance * janus)
81
{
82
        char * res = NULL;
83
        if (janus && janus->management_session && janus->videoroom_plugin_handle && janus->endpoint)
84
        {
85
                res = malloc(sizeof(char) * (strlen(janus->endpoint) + 43));  // each identifier comprises of 20 characters at most
86
                sprintf(res, "%s/%"PRId64"/%"PRId64"", janus->endpoint, janus->management_session, janus->videoroom_plugin_handle);
87
        }
88
        return res;
89
}
90

    
91
char * janus_instance_session_path(const struct janus_instance * janus)
92
{
93
        char * res = NULL;
94
        if (janus && janus->management_session && janus->endpoint)
95
        {
96
                res = malloc(sizeof(char) * (strlen(janus->endpoint) + 22));  // session identifier can be at most 20 characters long
97
                sprintf(res, "%s/%"PRId64"", janus->endpoint, janus->management_session);
98
        }
99
        return res;
100
}
101

    
102
struct janus_instance * janus_instance_create(struct mg_mgr *mongoose_srv, struct task_manager *tm, const char *config)
103
{
104
        struct janus_instance * ji = NULL;
105
        struct tag* tags;
106

    
107
        if (mongoose_srv && tm)
108
        {
109
                tags = grapes_config_parse(config);
110

    
111
                ji = malloc(sizeof(struct janus_instance));
112
                ji->endpoint = strdup(grapes_config_value_str_default(tags, "janus_endpoint", "127.0.0.1:8088/janus"));
113
                ji->executable = strdup(grapes_config_value_str_default(tags, "janus_executable", "Tools/janus/bin/janus"));
114
                ji->conf_param = strdup(grapes_config_value_str_default(tags, "janus_param", "--configs-folder=Tools/janus_conf"));
115
                ji->logfile = strdup(grapes_config_value_str_default(tags, "janus_logfile", "janus.log"));
116
                ji->janus_pid = INVALID_PID;
117
                ji->management_session = 0;
118
                ji->streaming_plugin_handle = 0;
119
                ji->videoroom_plugin_handle = 0;
120
                ji->tm = tm;
121
                ji->heartbeat = NULL;
122
                ji->mongoose_srv = mongoose_srv;
123

    
124
                if(tags)
125
                        free(tags);
126
        }
127
        return ji;
128
}
129

    
130
void janus_instance_destroy(struct janus_instance ** ji)
131
{
132
        if (ji && (*ji))
133
        {
134
                if ((*ji)->janus_pid != INVALID_PID)
135
                        kill((*ji)->janus_pid, SIGHUP);
136
                        
137
                if ((*ji)->heartbeat)
138
                        task_manager_destroy_task((*ji)->tm, &((*ji)->heartbeat));
139
                free((*ji)->endpoint);
140
                free((*ji)->executable);
141
                free((*ji)->conf_param);
142
                free((*ji)->logfile);
143
                free(*ji);
144
                *ji = NULL;
145
        }
146
}
147

    
148
void janus_instance_generic_handler(struct mg_connection *nc, int ev, void *ev_data)
149
{
150
        struct http_message *hm = (struct http_message *) ev_data;
151

    
152
        switch (ev) {
153
                case MG_EV_CONNECT:
154
                        if (*(int *) ev_data != 0)
155
                                debug("Janus communication failure\n");
156
                        break;
157
                case MG_EV_HTTP_REPLY:
158
                        switch (hm->resp_code) {
159
                                case 200:
160
                                default:
161
                                        debug("Janus answers: %d\n", hm->resp_code);
162
                        }
163
                        nc->flags |= MG_F_SEND_AND_CLOSE;
164
                        break;
165
                case MG_EV_CLOSE:
166
                        debug("Janus server closed connection\n");
167
                        break;
168
                default:
169
                        break;
170
        }
171
}
172

    
173
void janus_instance_streaming_plugin_handler(struct mg_connection *nc, int ev, void *ev_data)
174
{
175
        struct janus_instance * janus;
176
        struct http_message *hm = (struct http_message *) ev_data;
177
        char *buff;
178

    
179
        janus = nc->user_data;
180
        switch (ev) {
181
                case MG_EV_CONNECT:
182
                        if (*(int *) ev_data != 0)
183
                                debug("Janus communication failure\n");
184
                        break;
185
                case MG_EV_HTTP_REPLY:
186
                        switch (hm->resp_code) {
187
                                case 200:
188
                                        buff = malloc(sizeof(char) * (hm->body.len + 1));
189
                                        strncpy(buff, hm->body.p, hm->body.len);
190
                                        buff[hm->body.len] = '\0';  // make sure string terminates
191
                                        janus->streaming_plugin_handle = janus_instance_msg_get_id(buff);
192
                                        free(buff);
193
                                        debug("Got plugin streaming_handle!\n");
194
                                default:
195
                                        debug("Janus answers: %d\n", hm->resp_code);
196
                        }
197
                        nc->flags |= MG_F_SEND_AND_CLOSE;
198
                        break;
199
                case MG_EV_CLOSE:
200
                        debug("Janus server closed connection\n");
201
                        break;
202
                default:
203
                        break;
204
        }
205
}
206

    
207
void janus_instance_videoroom_plugin_handler(struct mg_connection *nc, int ev, void *ev_data)
208
{
209
        struct janus_instance * janus;
210
        struct http_message *hm = (struct http_message *) ev_data;
211
        char *buff;
212

    
213
        janus = nc->user_data;
214
        switch (ev) {
215
                case MG_EV_CONNECT:
216
                        if (*(int *) ev_data != 0)
217
                                debug("Janus communication failure\n");
218
                        break;
219
                case MG_EV_HTTP_REPLY:
220
                        switch (hm->resp_code) {
221
                                case 200:
222
                                        buff = malloc(sizeof(char) * (hm->body.len + 1));
223
                                        strncpy(buff, hm->body.p, hm->body.len);
224
                                        buff[hm->body.len] = '\0';  // make sure string terminates
225
                                        janus->videoroom_plugin_handle = janus_instance_msg_get_id(buff);
226
                                        free(buff);
227
                                        debug("Got plugin videoroom_handle!\n");
228
                                default:
229
                                        debug("Janus answers: %d\n", hm->resp_code);
230
                        }
231
                        nc->flags |= MG_F_SEND_AND_CLOSE;
232
                        break;
233
                case MG_EV_CLOSE:
234
                        debug("Janus server closed connection\n");
235
                        break;
236
                default:
237
                        break;
238
        }
239
}
240

    
241
void janus_instance_session_handler(struct mg_connection *nc, int ev, void *ev_data)
242
{
243
        struct janus_instance * janus;
244
        struct mg_connection * conn;
245
        struct http_message *hm = (struct http_message *) ev_data;
246
        char *buff;
247

    
248
        janus = nc->user_data;
249
        switch (ev) {
250
                case MG_EV_CONNECT:
251
                        if (*(int *) ev_data != 0)
252
                                debug("Janus communication failure\n");
253
                        break;
254
                case MG_EV_HTTP_REPLY:
255
                        switch (hm->resp_code) {
256
                                case 200:
257
                                        buff = malloc(sizeof(char) * (hm->body.len + 1));
258
                                        strncpy(buff, hm->body.p, hm->body.len);
259
                                        buff[hm->body.len] = '\0';  // make sure string terminates
260
                                        janus->management_session = janus_instance_msg_get_id(buff);
261
                                        free(buff);
262
                                        
263
                                        // Requesting handle for the streaming plugin
264
                                        buff = malloc(sizeof(char) * (strlen(janus->endpoint) + 22));
265
                                        sprintf(buff, "%s/%"PRId64"", janus->endpoint, janus->management_session);
266
                                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_streaming_plugin_handler, buff, NULL, JANUS_MSG_STREAMING_PLUGIN_CREATE);
267
                                        free(buff);
268
                                        if (conn)
269
                                                conn->user_data = (void *) janus;
270

    
271
                                        // Requesting handle for the videoroom plugin
272
                                        buff = malloc(sizeof(char) * (strlen(janus->endpoint) + 22));
273
                                        sprintf(buff, "%s/%"PRId64"", janus->endpoint, janus->management_session);
274
                                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_videoroom_plugin_handler, buff, NULL, JANUS_MSG_VIDEOROOM_PLUGIN_CREATE);
275
                                        free(buff);
276
                                        if (conn)
277
                                                conn->user_data = (void *) janus;
278
                                default:
279
                                        debug("Janus answers: %d\n", hm->resp_code);
280
                        }
281
                        nc->flags |= MG_F_SEND_AND_CLOSE;
282
                        break;
283
                case MG_EV_CLOSE:
284
                        debug("Janus server closed connection\n");
285
                        break;
286
                default:
287
                        break;
288
        }
289
}
290

    
291
int8_t        janus_instance_create_management_handle(struct janus_instance *janus)
292
{
293
        struct mg_connection * conn;
294
        int8_t res = -1;
295

    
296
        if (janus)
297
        {
298
                conn = mg_connect_http(janus->mongoose_srv, janus_instance_session_handler, janus->endpoint, NULL, JANUS_MSG_SESSION_CREATE);
299
                if (conn)
300
                {
301
                        conn->user_data = (void *) janus;
302
                        res = 0;
303
                } 
304
        }
305
        return res;
306
}
307

    
308
uint8_t janus_instance_heartbeat(struct periodic_task * pt)
309
{
310
        struct janus_instance * janus;
311
        struct mg_connection * conn;
312
        char * uri;
313

    
314
        janus = (struct janus_instance *) periodic_task_get_data(pt);
315
        uri = janus_instance_session_path(janus);
316
        if (uri)
317
        {
318
                conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, JANUS_MSG_SESSION_KEEPALIVE);
319
                if (conn)
320
                        conn->user_data = (void *) janus;
321
                free(uri);
322
        }
323
        return 0;
324
}
325

    
326
int8_t janus_instance_launch(struct janus_instance * ji)
327
{
328
        int8_t res = -1;
329
        struct stat s;
330
        char * argv[4];
331
        int fd;
332

    
333
        if (ji && ji->janus_pid == INVALID_PID)
334
        {
335
                info("%s - %s\n", ji->executable, ji->conf_param);
336
                res = stat(ji->executable, &s);
337
                // check exe existence
338
                if (res == 0 && S_ISREG(s.st_mode))
339
                {
340
                        ji->janus_pid = fork();
341
                        if (ji->janus_pid != INVALID_PID)
342
                        {
343
                                if (ji->janus_pid) // the parent
344
                                {
345
                                        sleep(1); // let janus bootstrap
346
                                        res = janus_instance_create_management_handle(ji);
347
                                        if (res == 0)
348
                                                ji->heartbeat = task_manager_new_task(ji->tm, janus_instance_heartbeat, NULL, 30000, ji);
349
                                }
350
                                else // the child
351
                                {
352
                                        fd = creat(ji->logfile, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
353
                                        dup2(fd, 1);   // make stdout go to file
354
                                        dup2(fd, 2);   // make stderr go to file - you may choose to not do this
355
                                        close(fd);
356

    
357
                                        prctl(PR_SET_PDEATHSIG, SIGHUP); // makes kernel dispatch me a SIGHUP if parent dies
358

    
359
                                        argv[0] = ji->executable;
360
                                        argv[1] = ji->conf_param;
361
                                        argv[2] = NULL;
362
                                        res = execve(ji->executable, argv, NULL);
363
                                        info("Error on launching Janus execution\n");
364
                                }
365
                        } else
366
                        {
367
                                info("Error on forking\n");
368
                                res = -1;
369
                        }
370

    
371
                        
372
                } else
373
                        info("Janus executable not found\n");
374
        }
375
        return res;
376
}
377

    
378
void janus_instance_streaming_point_handler(struct mg_connection *nc, int ev, void *ev_data)
379
{
380
        uint64_t * mp_id;
381
        struct http_message *hm = (struct http_message *) ev_data;
382
        char *buff;
383
        void ** data;
384
        struct streamer_creation_callback * scc;
385

    
386
        data = nc->user_data;
387
        mp_id = data[0];
388
        scc = data[1];
389
        switch (ev) {
390
                case MG_EV_CONNECT:
391
                        if (*(int *) ev_data != 0)
392
                                debug("Janus communication failure\n");
393
                                debug("Ora triggero!\n");
394
                        break;
395
                case MG_EV_HTTP_REPLY:
396
                        switch (hm->resp_code) {
397
                                case 200:
398
                                        buff = malloc(sizeof(char) * (hm->body.len + 1));
399
                                        strncpy(buff, hm->body.p, hm->body.len);
400
                                        buff[hm->body.len] = '\0';  // make sure string terminates
401
                                        debug(buff);
402
                                        *mp_id = janus_instance_msg_get_id(buff);
403
                                        free(buff);
404
                                default:
405
                                        debug("Janus answers: %d\n", hm->resp_code);
406
                        }
407
                        nc->flags |= MG_F_SEND_AND_CLOSE;
408
                        break;
409
                case MG_EV_CLOSE:
410
                        debug("Janus server closed connection\n");
411
                        if (scc)
412
                        {
413
                                debug("Janus instance calls creation trigger\n");
414
                                streamer_creation_callback_trigger(scc, *mp_id ? 0 : 1);
415
                                streamer_creation_callback_destroy(&scc);
416
                                free(data);
417
                        }
418
                        break;
419
                default:
420
                        break;
421
        }
422
}
423

    
424
void janus_instance_videoroom_creation_handler(struct mg_connection *nc, int ev, void *ev_data)
425
{
426
        struct http_message *hm = (struct http_message *) ev_data;
427
        void ** data;
428
        struct streamer_creation_callback * scc;
429

    
430
        data = nc->user_data;
431
        scc = data[0];
432
        switch (ev) {
433
                case MG_EV_CONNECT:
434
                        if (*(int *) ev_data != 0)
435
                                debug("Janus communication failure\n");
436
                        break;
437
                case MG_EV_HTTP_REPLY:
438
                        switch (hm->resp_code) {
439
                                case 200:
440
                                        info("Room created\n");
441
                                default:
442
                                        debug("Janus answers: %d\n", hm->resp_code);
443
                        }
444
                        nc->flags |= MG_F_SEND_AND_CLOSE;
445
                        break;
446
                case MG_EV_CLOSE:
447
                        debug("Janus server closed connection\n");
448
                        if (scc)
449
                        {
450
                                debug("Janus instance calls creation trigger\n");
451
                                streamer_creation_callback_trigger(scc, 0);
452
                                streamer_creation_callback_destroy(&scc);
453
                                free(data);
454
                        }
455
                        break;
456
                default:
457
                        break;
458
        }
459
}
460

    
461
int8_t janus_instance_create_streaming_point(struct janus_instance const * janus, uint64_t *mp_id, uint16_t audio_port, uint16_t video_port, struct streamer_creation_callback *scc)
462
{
463
        struct mg_connection * conn;
464
        int8_t res = -1;
465
        char * uri;
466
        char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"create\",\"type\":\"rtp\",\
467
                                  \"audio\":true,\"audioport\":%"PRId16",\"audiopt\":111,\"audiortpmap\":\"opus/48000/2\",\
468
                                  \"video\":true,\"videoport\":%"PRId16",\"videopt\":100,\"videortpmap\":\"VP8/90000\"}}";
469
        char buff[280];
470
        void ** data;
471

    
472
        if (janus && mp_id && audio_port && video_port)
473
        {
474
                uri = janus_instance_streaming_handle_path(janus);
475
                if (uri)
476
                {
477
                        sprintf(buff, fmt, audio_port, video_port);
478
                   debug("Conctating Janus to create a new mountpoint\n");        
479
                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_streaming_point_handler, uri, NULL, buff);
480
                        if (conn)
481
                        {
482
                                data = malloc(sizeof(void *) * 2);
483
                                data[0] = mp_id;
484
                                data[1] = scc;
485
                                conn->user_data = data;
486
                                res = 0;
487
                        } else
488
                           debug("Aaargh, no connection!\n");        
489
                        free(uri);
490
                }
491
        }
492
        return res;
493
}
494

    
495
int8_t janus_instance_destroy_streaming_point(struct janus_instance const * janus, uint64_t mp_id)
496
{
497
        struct mg_connection * conn;
498
        int8_t res = -1;
499
        char * uri;
500
        char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"destroy\",\"id\": %"PRId64"}}";
501
        char buff[120];
502

    
503
        if (janus && mp_id)
504
        {
505
                uri = janus_instance_streaming_handle_path(janus);
506
                if (uri)
507
                {
508
                        sprintf(buff, fmt, mp_id);
509
                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, buff);
510
                        if (conn)
511
                        {
512
                                conn->user_data = (void *) mp_id;
513
                                res = 0;
514
                        } 
515
                        free(uri);
516
                }
517
        }
518
        return res;
519
}
520

    
521
int8_t janus_instance_create_videoroom(struct janus_instance const * janus, const char * room_id, struct streamer_creation_callback *scc)
522
{
523
        struct mg_connection * conn;
524
        int8_t res = -1;
525
        char * uri;
526
        char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"create\",\"room\":%s,\"publishers\":1,\"bitrate\":128000,\"record\":false,\"description\":\"Room %s\",\"fir_freq\":100}}";
527

    
528
        char buff[280];
529
        void ** data;
530

    
531
        if (janus && room_id)
532
        {
533
                uri = janus_instance_videoroom_handle_path(janus);
534
                if (uri)
535
                {
536
                        sprintf(buff, fmt, room_id, room_id);
537
                   debug("Conctating Janus to create a new video room\n");        
538
                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_videoroom_creation_handler, uri, NULL, buff);
539
                        if (conn)
540
                        {
541
                                data = malloc(sizeof(void *));
542
                                data[0] = scc;
543
                                conn->user_data = data;
544
                                res = 0;
545
                        } else
546
                           debug("Aaargh, no connection!\n");        
547
                        free(uri);
548
                }
549
        }
550
        return res;
551
}
552

    
553
int8_t janus_instance_destroy_videoroom(struct janus_instance const * janus, const char * room_id)
554
{
555
        struct mg_connection * conn;
556
        int8_t res = -1;
557
        char * uri;
558
        char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"destroy\",\"room\": %s}}";
559
        char buff[120];
560

    
561
        if (janus && room_id)
562
        {
563
                uri = janus_instance_videoroom_handle_path(janus);
564
                if (uri)
565
                {
566
                        sprintf(buff, fmt, room_id);
567
                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, buff);
568
                        if (conn)
569
                        {
570
                                conn->user_data = (void *) room_id;
571
                                res = 0;
572
                        } 
573
                        free(uri);
574
                }
575
        }
576
        return res;
577
}
578

    
579
int8_t janus_instance_forward_rtp(struct janus_instance const * janus, const char * room_id, uint64_t participant_id, const char * rtp_dest, uint16_t audio_port, uint16_t video_port)
580
{
581
        struct mg_connection * conn;
582
        int8_t res = -1;
583
        char * uri;
584
        char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"rtp_forward\",\"room\":%s,\"publisher_id\":%"PRId64", \"host\": \"%s\",\"audio_port\":%"PRId16",\"video_port\":%"PRId16",\"audio_pt\":111,\"video_pt\":98}}";
585

    
586
        char buff[280];
587

    
588
        if (janus && room_id && rtp_dest && audio_port > 0 && video_port > 0)
589
        {
590
                uri = janus_instance_videoroom_handle_path(janus);
591
                if (uri)
592
                {
593
                        sprintf(buff, fmt, room_id, participant_id, rtp_dest, audio_port, video_port);
594
                    debug("Conctating Janus to create a new video room\n");        
595
                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, buff);
596
                        if (conn)
597
                                res = 0;
598
                        else
599
                           debug("Aaargh, no connection!\n");        
600
                        free(uri);
601
                }
602
        }
603
        return res;
604
}