Statistics
| Branch: | Revision:

peerstreamer-src / src / janus_instance.c @ c81c126e

History | View | Annotate | Download (11.6 KB)

1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

    
20
#include<janus_instance.h>
21
#include<signal.h>
22
#include<string.h>
23
#include<debug.h>
24
#include<unistd.h>
25
#include<tokens.h>
26
#include<grapes_config.h>
27

    
28
#define INVALID_PID -1
29

    
30
#define JANUS_MSG_SESSION_CREATE "{\"transaction\": \"random\", \"janus\": \"create\"}"
31
#define JANUS_MSG_SESSION_KEEPALIVE "{\"transaction\": \"ciao\", \"janus\": \"keepalive\"}"
32
#define JANUS_MSG_PLUGIN_CREATE "{\"transaction\": \"ciao\", \"janus\": \"attach\", \"plugin\":\"janus.plugin.streaming\"}"
33

    
34

    
35
struct janus_instance {
36
        pid_t janus_pid;
37
        char* endpoint;
38
        char * executable;
39
        char * conf_param;
40
        char * logfile;
41
        struct mg_mgr *mongoose_srv;
42
        struct periodic_task * heartbeat;
43
        uint64_t management_session;
44
        uint64_t plugin_handle;
45
        struct task_manager * tm;
46
};
47

    
48
uint64_t janus_instance_msg_get_id(char *msg)
49
{
50
        char ** records;
51
        uint32_t ntoks, i;
52
        uint64_t res = 0;
53
        
54
        records = tokens_create(msg, ' ', &ntoks);
55
        if ((i = tokens_check(records, ntoks, "\"id\":")) > 0)
56
        {
57
                if (records[i+1][strlen(records[i+1])-1] == ',')
58
                        records[i+1][strlen(records[i+1])-1] = '\0';
59
                sscanf(records[i+1], "%"PRId64"", &res);
60
                debug("ID string: %s\t ID integer: %"PRId64"\n", records[i+1], res);
61
        }
62
        tokens_destroy(&records, ntoks);
63
        return res;
64
}
65

    
66
char * janus_instance_handle_path(const struct janus_instance * janus)
67
{
68
        char * res = NULL;
69
        if (janus && janus->management_session && janus->plugin_handle && janus->endpoint)
70
        {
71
                res = malloc(sizeof(char) * (strlen(janus->endpoint) + 43));  // each identifier comprises of 20 characters at most
72
                sprintf(res, "%s/%"PRId64"/%"PRId64"", janus->endpoint, janus->management_session, janus->plugin_handle);
73
        }
74
        return res;
75
}
76

    
77
char * janus_instance_session_path(const struct janus_instance * janus)
78
{
79
        char * res = NULL;
80
        if (janus && janus->management_session && janus->endpoint)
81
        {
82
                res = malloc(sizeof(char) * (strlen(janus->endpoint) + 22));  // session identifier can be at most 20 characters long
83
                sprintf(res, "%s/%"PRId64"", janus->endpoint, janus->management_session);
84
        }
85
        return res;
86
}
87

    
88
struct janus_instance * janus_instance_create(struct mg_mgr *mongoose_srv, struct task_manager *tm, const char *config)
89
{
90
        struct janus_instance * ji = NULL;
91
        struct tag* tags;
92

    
93
        if (mongoose_srv && tm)
94
        {
95
                tags = grapes_config_parse(config);
96

    
97
                ji = malloc(sizeof(struct janus_instance));
98
                ji->endpoint = strdup(grapes_config_value_str_default(tags, "janus_endpoint", "127.0.0.1:8088/janus"));
99
                ji->executable = strdup(grapes_config_value_str_default(tags, "janus_executable", "Tools/janus/bin/janus"));
100
                ji->conf_param = strdup(grapes_config_value_str_default(tags, "janus_param", "--configs-folder=Tools/janus_conf"));
101
                ji->logfile = strdup(grapes_config_value_str_default(tags, "janus_logfile", "janus.log"));
102
                ji->janus_pid = INVALID_PID;
103
                ji->management_session = 0;
104
                ji->plugin_handle = 0;
105
                ji->tm = tm;
106
                ji->heartbeat = NULL;
107
                ji->mongoose_srv = mongoose_srv;
108

    
109
                if(tags)
110
                        free(tags);
111
        }
112
        return ji;
113
}
114

    
115
void janus_instance_destroy(struct janus_instance ** ji)
116
{
117
        if (ji && (*ji))
118
        {
119
                if ((*ji)->janus_pid != INVALID_PID)
120
                        kill((*ji)->janus_pid, SIGTERM);
121
                        
122
                if ((*ji)->heartbeat)
123
                        task_manager_destroy_task((*ji)->tm, &((*ji)->heartbeat));
124
                free((*ji)->endpoint);
125
                free((*ji)->executable);
126
                free((*ji)->conf_param);
127
                free((*ji)->logfile);
128
                free(*ji);
129
                *ji = NULL;
130
        }
131
}
132

    
133
void janus_instance_generic_handler(struct mg_connection *nc, int ev, void *ev_data)
134
{
135
        struct http_message *hm = (struct http_message *) ev_data;
136

    
137
        switch (ev) {
138
                case MG_EV_CONNECT:
139
                        if (*(int *) ev_data != 0)
140
                                debug("Janus communication failure\n");
141
                        break;
142
                case MG_EV_HTTP_REPLY:
143
                        switch (hm->resp_code) {
144
                                case 200:
145
                                default:
146
                                        debug("Janus answers: %d\n", hm->resp_code);
147
                        }
148
                        nc->flags |= MG_F_SEND_AND_CLOSE;
149
                        break;
150
                case MG_EV_CLOSE:
151
                        debug("Janus server closed connection\n");
152
                        break;
153
                default:
154
                        break;
155
        }
156
}
157

    
158
void janus_instance_plugin_handler(struct mg_connection *nc, int ev, void *ev_data)
159
{
160
        struct janus_instance * janus;
161
        struct http_message *hm = (struct http_message *) ev_data;
162
        char *buff;
163

    
164
        janus = nc->user_data;
165
        switch (ev) {
166
                case MG_EV_CONNECT:
167
                        if (*(int *) ev_data != 0)
168
                                debug("Janus communication failure\n");
169
                        break;
170
                case MG_EV_HTTP_REPLY:
171
                        switch (hm->resp_code) {
172
                                case 200:
173
                                        buff = malloc(sizeof(char) * (hm->body.len + 1));
174
                                        strncpy(buff, hm->body.p, hm->body.len);
175
                                        buff[hm->body.len] = '\0';  // make sure string terminates
176
                                        janus->plugin_handle = janus_instance_msg_get_id(buff);
177
                                        free(buff);
178
                                        debug("Got plugin handle!\n");
179
                                default:
180
                                        debug("Janus answers: %d\n", hm->resp_code);
181
                        }
182
                        nc->flags |= MG_F_SEND_AND_CLOSE;
183
                        break;
184
                case MG_EV_CLOSE:
185
                        debug("Janus server closed connection\n");
186
                        break;
187
                default:
188
                        break;
189
        }
190
}
191

    
192
void janus_instance_session_handler(struct mg_connection *nc, int ev, void *ev_data)
193
{
194
        struct janus_instance * janus;
195
        struct mg_connection * conn;
196
        struct http_message *hm = (struct http_message *) ev_data;
197
        char *buff;
198

    
199
        janus = nc->user_data;
200
        switch (ev) {
201
                case MG_EV_CONNECT:
202
                        if (*(int *) ev_data != 0)
203
                                debug("Janus communication failure\n");
204
                        break;
205
                case MG_EV_HTTP_REPLY:
206
                        switch (hm->resp_code) {
207
                                case 200:
208
                                        buff = malloc(sizeof(char) * (hm->body.len + 1));
209
                                        strncpy(buff, hm->body.p, hm->body.len);
210
                                        buff[hm->body.len] = '\0';  // make sure string terminates
211
                                        janus->management_session = janus_instance_msg_get_id(buff);
212
                                        free(buff);
213
                                        buff = malloc(sizeof(char) * (strlen(janus->endpoint) + 22));
214
                                        sprintf(buff, "%s/%"PRId64"", janus->endpoint, janus->management_session);
215
                                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_plugin_handler, buff, NULL, JANUS_MSG_PLUGIN_CREATE);
216
                                        free(buff);
217
                                        if (conn)
218
                                                conn->user_data = (void *) janus;
219
                                default:
220
                                        debug("Janus answers: %d\n", hm->resp_code);
221
                        }
222
                        nc->flags |= MG_F_SEND_AND_CLOSE;
223
                        break;
224
                case MG_EV_CLOSE:
225
                        debug("Janus server closed connection\n");
226
                        break;
227
                default:
228
                        break;
229
        }
230
}
231

    
232
int8_t        janus_instance_create_management_handle(struct janus_instance *janus)
233
{
234
        struct mg_connection * conn;
235
        int8_t res = -1;
236

    
237
        if (janus)
238
        {
239
                conn = mg_connect_http(janus->mongoose_srv, janus_instance_session_handler, janus->endpoint, NULL, JANUS_MSG_SESSION_CREATE);
240
                if (conn)
241
                {
242
                        conn->user_data = (void *) janus;
243
                        res = 0;
244
                } 
245
        }
246
        return res;
247
}
248

    
249
uint8_t janus_instance_heartbeat(struct periodic_task * pt)
250
{
251
        struct janus_instance * janus;
252
        struct mg_connection * conn;
253
        char * uri;
254

    
255
        janus = (struct janus_instance *) periodic_task_get_data(pt);
256
        uri = janus_instance_session_path(janus);
257
        if (uri)
258
        {
259
                conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, JANUS_MSG_SESSION_KEEPALIVE);
260
                if (conn)
261
                        conn->user_data = (void *) janus;
262
                free(uri);
263
        }
264
        return 0;
265
}
266

    
267
int8_t janus_instance_launch(struct janus_instance * ji)
268
{
269
        int8_t res = -1;
270
        struct stat s;
271
        char * argv[4];
272
        int fd;
273

    
274
        if (ji && ji->janus_pid == INVALID_PID)
275
        {
276
                res = stat(ji->executable, &s);
277
                // check exe existence
278
                if (res == 0 && S_ISREG(s.st_mode))
279
                {
280
                        ji->janus_pid = fork();
281
                        if (ji->janus_pid != INVALID_PID)
282
                        {
283
                                if (ji->janus_pid) // the parent
284
                                {
285
                                        sleep(1); // let janus bootstrap
286
                                        res = janus_instance_create_management_handle(ji);
287
                                        if (res == 0)
288
                                                ji->heartbeat = task_manager_new_task(ji->tm, janus_instance_heartbeat, NULL, 30000, ji);
289
                                }
290
                                else // the child
291
                                {
292
                                        fd = creat(ji->logfile, 'w');
293
                                        dup2(fd, 1);   // make stdout go to file
294
                                        dup2(fd, 2);   // make stderr go to file - you may choose to not do this
295
                                        close(fd);
296

    
297
                                        argv[0] = ji->executable;
298
                                        argv[1] = ji->conf_param;
299
                                        argv[2] = NULL;
300
                                        res = execve(ji->executable, argv, NULL);
301
                                        info("Error on launching Janus execution\n");
302
                                }
303
                        } else
304
                        {
305
                                info("Error on forking\n");
306
                                res = -1;
307
                        }
308

    
309
                        
310
                } else
311
                        info("Janus executable not found\n");
312
        }
313
        return res;
314
}
315

    
316
void janus_instance_streaming_point_handler(struct mg_connection *nc, int ev, void *ev_data)
317
{
318
        uint64_t * mp_id;
319
        struct http_message *hm = (struct http_message *) ev_data;
320
        char *buff;
321
        void ** data;
322
        struct streamer_creation_callback * scc;
323

    
324
        data = nc->user_data;
325
        mp_id = data[0];
326
        scc = data[1];
327
        switch (ev) {
328
                case MG_EV_CONNECT:
329
                        if (*(int *) ev_data != 0)
330
                                debug("Janus communication failure\n");
331
                                debug("Ora triggero!\n");
332
                        break;
333
                case MG_EV_HTTP_REPLY:
334
                        switch (hm->resp_code) {
335
                                case 200:
336
                                        buff = malloc(sizeof(char) * (hm->body.len + 1));
337
                                        strncpy(buff, hm->body.p, hm->body.len);
338
                                        buff[hm->body.len] = '\0';  // make sure string terminates
339
                                        debug(buff);
340
                                        *mp_id = janus_instance_msg_get_id(buff);
341
                                        free(buff);
342
                                default:
343
                                        debug("Janus answers: %d\n", hm->resp_code);
344
                        }
345
                        nc->flags |= MG_F_SEND_AND_CLOSE;
346
                        break;
347
                case MG_EV_CLOSE:
348
                        debug("Janus server closed connection\n");
349
                        if (scc)
350
                        {
351
                                debug("Janus instance calls creation trigger\n");
352
                                streamer_creation_callback_trigger(scc, *mp_id ? 0 : 1);
353
                                streamer_creation_callback_destroy(&scc);
354
                                free(data);
355
                        }
356
                        break;
357
                default:
358
                        break;
359
        }
360
}
361

    
362
int8_t janus_instance_create_streaming_point(struct janus_instance const * janus, uint64_t *mp_id, uint16_t audio_port, uint16_t video_port, struct streamer_creation_callback *scc)
363
{
364
        struct mg_connection * conn;
365
        int8_t res = -1;
366
        char * uri;
367
        char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"create\",\"type\":\"rtp\",\
368
                                  \"audio\":true,\"audioport\":%"PRId16",\"audiopt\":111,\"audiortpmap\":\"opus/48000/2\",\
369
                                  \"video\":true,\"videoport\":%"PRId16",\"videopt\": 100,\"videortpmap\":\"VP8/90000\"}}";
370
        char buff[280];
371
        void ** data;
372

    
373
        if (janus && mp_id && audio_port && video_port)
374
        {
375
                uri = janus_instance_handle_path(janus);
376
                if (uri)
377
                {
378
                        sprintf(buff, fmt, audio_port, video_port);
379
                   debug("Conctating Janus to create a new mountpoint\n");        
380
                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_streaming_point_handler, uri, NULL, buff);
381
                        if (conn)
382
                        {
383
                                data = malloc(sizeof(void *) * 2);
384
                                data[0] = mp_id;
385
                                data[1] = scc;
386
                                conn->user_data = data;
387
                                res = 0;
388
                        } else
389
                           debug("Aaargh, no connection!\n");        
390
                        free(uri);
391
                }
392
        }
393
        return res;
394
}
395

    
396
int8_t janus_instance_destroy_streaming_point(struct janus_instance const * janus, uint64_t mp_id)
397
{
398
        struct mg_connection * conn;
399
        int8_t res = -1;
400
        char * uri;
401
        char * fmt = "{\"transaction\":\"random_str\",\"janus\":\"message\",\"body\":{\"request\":\"destroy\",\"id\": %"PRId64"}}";
402
        char buff[120];
403

    
404
        if (janus && mp_id)
405
        {
406
                uri = janus_instance_handle_path(janus);
407
                if (uri)
408
                {
409
                        sprintf(buff, fmt, mp_id);
410
                        conn = mg_connect_http(janus->mongoose_srv, janus_instance_generic_handler, uri, NULL, buff);
411
                        if (conn)
412
                        {
413
                                conn->user_data = (void *) mp_id;
414
                                res = 0;
415
                        } 
416
                        free(uri);
417
                }
418
        }
419
        return res;
420
}