Statistics
| Branch: | Revision:

peerstreamer-src / src / pstreamer.c @ ea2bce5b

History | View | Annotate | Download (12 KB)

1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

    
20
#include<pstreamer.h>
21
#include<time.h>
22
#include<stdlib.h>
23
#include<string.h>
24
#include<stdio.h>
25
#include<ord_set.h>
26
#include<psinstance.h>
27
#include<task_manager.h>
28
#include<periodic_task_intfs.h>
29
#include<debug.h>
30

    
31
#define MAX_PSINSTANCE_CONFIG_LENGTH 255
32
#define STREAMER_PEER_CONF "port=%d,dechunkiser=rtp,audio=%d,video=%d,addr=%s"
33
#define STREAMER_SOURCE_CONF "port=%d,chunkiser=rtp,audio=%d,video=%d,addr=%s,max_delay_ms=5,chunk_size=20"
34
//#define STREAMER_SOURCE_CONF "port=%d,chunkiser=rtp,base=%d,addr=%s,max_delay_ms=5,chunk_size=200,rfc3551=1"
35

    
36
struct pstreamer {
37
        char source_ip[MAX_IPADDR_LENGTH];
38
        uint16_t source_port;
39
        char id[PSID_LENGTH];  // identifier for the streamer instance
40
        struct psinstance * psc;
41
        time_t last_beat;
42
        uint16_t base_port;
43
        struct periodic_task * topology_task;
44
        struct periodic_task * offer_task;
45
        struct periodic_task * inject_task;
46
        struct periodic_task * msg_task;
47
        struct task_manager * tm;
48
        timeout topology_interval;
49
        uint64_t janus_streaming_id;
50
};
51

    
52
struct pstreamer_manager {
53
        struct ord_set * streamers;
54
        uint16_t initial_streaming_port;
55
        char * streamer_opts;
56
        struct janus_instance const * janus;
57
};
58

    
59
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_ip, const char * fmt, const char * opts)
60
/* we assume source_ip and source_port are valid strings */
61
{
62
        char config[MAX_PSINSTANCE_CONFIG_LENGTH];
63
        int count;
64

    
65
        count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, ps->base_port+3, rtp_ip);
66
        if (opts && (size_t)(MAX_PSINSTANCE_CONFIG_LENGTH - count) > strlen(opts))
67
                snprintf(config + count, MAX_PSINSTANCE_CONFIG_LENGTH - count, ",%s", opts);
68
        ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
69

    
70
        ps->topology_task = NULL;
71
        ps->offer_task = NULL;
72
        ps->inject_task = NULL;
73
        ps->msg_task = NULL;
74
        ps->tm = NULL;
75
        ps->topology_interval = 400;
76
        ps->janus_streaming_id = 0;
77

    
78
        if (ps->psc)
79
                return 0;
80
        else 
81
                return -1;
82

    
83
        return 0;
84
}
85

    
86
int8_t pstreamer_deinit(struct pstreamer * ps)
87
{
88
        if (ps->topology_task)
89
                task_manager_destroy_task(ps->tm, &(ps->topology_task));
90
        if (ps->offer_task)
91
                task_manager_destroy_task(ps->tm, &(ps->offer_task));
92
        if (ps->inject_task)
93
                task_manager_destroy_task(ps->tm, &(ps->inject_task));
94
        if (ps->msg_task)
95
                task_manager_destroy_task(ps->tm, &(ps->msg_task));
96
        psinstance_destroy(&(ps->psc));
97
        return 0;
98
}
99

    
100
int8_t pstreamer_cmp(const void * v1, const void * v2)
101
{
102
        const struct pstreamer *ps1, *ps2;
103

    
104
        ps1 = (struct pstreamer *) v1;
105
        ps2 = (struct pstreamer *) v2;
106

    
107
        if (ps1 && ps2)
108
        {
109
                return strcmp(ps1->id, ps2->id);
110
        }
111
        return 0;
112
}
113

    
114
char * pstreamer_to_json(const struct pstreamer * ps)
115
{
116
        char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%d\",\"janus_streaming_id\":\"%"PRId64"\"}";
117
        size_t reslength;
118
        char * res = NULL;
119

    
120
        if (ps)
121
        {
122
                reslength = strlen(fmt) - 2*3 + PSID_LENGTH + MAX_IPADDR_LENGTH + MAX_PORT_LENGTH + 1;
123
                res = malloc(reslength * sizeof(char));
124
                sprintf(res, fmt, ps->id, ps->source_ip, ps->source_port, ps->janus_streaming_id);
125
        }
126
        return res;
127
}
128

    
129
struct pstreamer_manager * pstreamer_manager_new(uint16_t starting_port, const struct janus_instance *janus)
130
{
131
        struct pstreamer_manager * psm = NULL;
132

    
133
        psm = malloc(sizeof(struct pstreamer_manager));
134
        psm->streamers = ord_set_new(1, pstreamer_cmp);
135
        /* VLC players assume RTP port numbers are even numbers, we allocate RTP ports starting from starting_port + 1
136
         * (as the first one is the pstreamer port). So starting_port must be an odd number */
137
        psm->initial_streaming_port = (starting_port % 2 == 1) ? starting_port : starting_port + 1;
138
        psm->streamer_opts = NULL;
139
        psm->janus = janus;
140

    
141
        return psm;
142
}
143

    
144
void pstreamer_manager_remove_orphans(struct pstreamer_manager * psm, time_t interval)
145
{
146
        struct ord_set * orphans;
147
        struct pstreamer * ps;
148
        const void * iter;
149
        time_t now;
150

    
151
        now = time(NULL);
152
        orphans = ord_set_new(10, ord_set_dummy_cmp);
153
        ord_set_for_each(iter, psm->streamers)
154
        {
155
                ps = (struct pstreamer *) iter;
156
                if (now - ps->last_beat > interval)
157
                        ord_set_insert(orphans, (void *) iter, 0);
158
        }
159
        ord_set_for_each(iter, orphans)
160
        {
161
                ps = (struct pstreamer *) iter;
162
                debug("Destroying inactive pstreamer instance %s\n", ps->id);
163
                pstreamer_manager_destroy_streamer(psm, ps);
164
        }
165
        ord_set_destroy(&orphans, 0);
166
}
167

    
168
void pstreamer_manager_destroy(struct pstreamer_manager ** psm)
169
{
170
        const void * ps = NULL;
171
        
172
        if (psm && *psm)
173
        {
174
                ord_set_for_each(ps, (*psm)->streamers)
175
                {
176
                        if ((*psm)->janus)
177
                                if (pstreamer_is_source(ps))
178
                                        janus_instance_destroy_videoroom((*psm)->janus, ((struct pstreamer*)ps)->id);
179
                                else
180
                                        janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
181
                        pstreamer_deinit((struct pstreamer *)ps);
182
                }
183

    
184
                ord_set_destroy(&((*psm)->streamers), 1);
185
                if((*psm)->streamer_opts)
186
                {
187
                        free((*psm)->streamer_opts);
188
                        (*psm)->streamer_opts = NULL;
189
                }
190
                free(*psm);
191
                *psm = NULL;
192
        }
193
}
194

    
195
void pstreamer_touch(struct pstreamer *ps)
196
{
197
        ps->last_beat = time(NULL);
198
}
199

    
200
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm)
201
{
202
        if (ps && tm && ps->psc)
203
        {
204
                ps->tm = tm;
205
                ps->topology_task = task_manager_new_task(tm, NULL, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
206
                ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
207
                ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
208
                if (pstreamer_is_source(ps))
209
                        ps->inject_task = task_manager_new_task(tm, NULL, pstreamer_inject_task_callback, 25, ps->psc); 
210
        }
211
        return 0;
212
}
213

    
214
uint16_t assign_streaming_ports(struct pstreamer_manager *psm)
215
{
216
        const struct pstreamer * ps = NULL;
217
        const void * iter = NULL;
218
        uint16_t base_port;
219
        uint8_t found = 1;
220

    
221
        base_port = psm->initial_streaming_port;
222
        while (found==1)
223
        {
224
                found = 0;
225
                ord_set_for_each(iter, psm->streamers)
226
                {
227
                        ps = (const struct pstreamer *) iter;
228
                        if (ps->base_port == base_port)
229
                                found = 1;
230
                }
231
                if (found)
232
                        base_port += 6;  /* we allocate 5 port numbers per each streamer; the first is the pstreamer port, the following 4 are the RTP ports (two
233
                                            for audio and two for video). We add the sixth one for compatibility with the VLC players which expect the RTP base
234
                                            port numbers to be even (base_port is kept odd) */
235
        }
236
        return base_port;
237
}
238

    
239
const struct pstreamer * pstreamer_manager_get_streamer(const struct pstreamer_manager *psm, const char * id)
240
{
241
        struct pstreamer * ps = NULL;
242
        const void * ptr = NULL;
243

    
244
        ps = malloc(sizeof(struct pstreamer));
245
        strncpy(ps->id, id, PSID_LENGTH);
246
        ptr = ord_set_find(psm->streamers, (const void *) ps);
247

    
248
        free(ps);
249
        return (const struct pstreamer*) ptr;
250
}
251

    
252
const struct pstreamer * pstreamer_manager_create_streamer(struct pstreamer_manager * psm, const char * source_ip, const char * source_port, const char * id, const char * rtp_dst_ip, struct streamer_creation_callback * scc)
253
{
254
        struct pstreamer * ps = NULL;
255
        const void * ptr = NULL;
256

    
257
        if (psm && source_ip && source_port && id && rtp_dst_ip)
258
        {
259
                ps = malloc(sizeof(struct pstreamer));
260
                strncpy(ps->source_ip, source_ip, MAX_IPADDR_LENGTH);
261
                ps->source_port = atoi(source_port);
262
                strncpy(ps->id, id, PSID_LENGTH);
263
                ps->base_port = assign_streaming_ports(psm); 
264
                ptr = ord_set_find(psm->streamers, (const void *) ps);
265
                if (ptr == NULL)
266
                {
267
                        pstreamer_touch(ps);
268
                        streamer_creation_set_pstreamer_ref(scc, ps);
269
                        if (psm->janus)
270
                        {
271
                                debug("calling janus upon notification of perstreamer creation\n");
272
                                janus_instance_create_streaming_point(psm->janus, &(ps->janus_streaming_id), ps->base_port+1, ps->base_port+3, scc);
273
                        }
274
                        else
275
                                if (scc)
276
                                        streamer_creation_callback_trigger(scc, 0);
277
                        pstreamer_init(ps, rtp_dst_ip, STREAMER_PEER_CONF, psm->streamer_opts);
278
                        ord_set_insert(psm->streamers, ps, 0);
279
                } else
280
                {
281
                        free(ps);
282
                        ps = NULL;
283
                }
284
        }
285

    
286
        return ps;
287
}
288

    
289
uint8_t pstreamer_manager_destroy_streamer(struct pstreamer_manager *psm, const struct pstreamer *ps)
290
{
291
        uint8_t res = 1;
292

    
293
        if (psm && ps)
294
        {
295
                if(psm->janus)
296
                        janus_instance_destroy_streaming_point(psm->janus, ps->janus_streaming_id);
297
                pstreamer_deinit((struct pstreamer *)ps);
298
                res = ord_set_remove(psm->streamers, ps, 1);
299
        }
300

    
301
        return res;
302
}
303

    
304
const char * pstreamer_id(const struct pstreamer * ps)
305
{
306
        if (ps)
307
                return ps->id;
308
        return NULL;
309
}
310

    
311
uint16_t pstreamer_base_port(const struct pstreamer * ps)
312
{
313
        if (ps)
314
                return ps->base_port;
315
        return 0;
316
}
317

    
318
int8_t pstreamer_manager_set_streamer_options(struct pstreamer_manager *psm, const char * opts)
319
{
320
        int8_t res = -1;
321
        
322
        if (psm && opts)
323
        {
324
                if(psm->streamer_opts)
325
                {
326
                        free(psm->streamer_opts);
327
                        psm->streamer_opts = NULL;
328
                }
329
                psm->streamer_opts = strdup(opts);
330
                res = 0;
331
        }
332

    
333
        return res;
334
}
335

    
336
const char * pstreamer_source_ipaddr(const struct pstreamer *ps)
337
{
338
        if (ps)
339
                return ps->source_ip;
340
        return NULL;
341
}
342

    
343
const char * pstreamer_source_port(const struct pstreamer *ps)
344
{
345
        static char buff[6];
346

    
347
        if (ps)
348
        {
349
                sprintf(buff, "%"PRId16"", ps->source_port);
350
                return buff;
351
        }
352
        return NULL;
353
}
354

    
355
uint8_t pstreamer_is_source(const struct pstreamer * ps)
356
{
357
        return ps && (ps->source_port == 0) ? 1 : 0;
358
}
359

    
360
char * pstreamer_manager_sources_to_json(const struct pstreamer_manager *psm)
361
{
362
        char * res = NULL, * ps_json;
363
        uint32_t pos;
364
        const void * iter;
365
        const struct pstreamer * ps;
366

    
367
        if (psm)
368
        {
369
                res = malloc(sizeof(char)*3);
370
                res[0] = '[';
371
                pos = 1;
372

    
373
                ord_set_for_each(iter, psm->streamers)
374
                {
375
                        ps = (const struct pstreamer *) iter;
376
                        if (pstreamer_is_source(ps))
377
                        {
378
                                ps_json = pstreamer_to_json(ps);
379
                                if (ps_json)
380
                                {
381
                                        res = realloc(res, sizeof(char)*(pos+strlen(ps_json) + (pos == 1? 2 : 3)));
382
                                        if (pos > 1)
383
                                                res[pos++] = ',';
384
                                        strcpy(res+pos, ps_json);
385
                                        pos += strlen(ps_json);
386
                                        free(ps_json);
387
                                }
388
                        }
389
                }
390
                res[pos++] = ']';
391
                res[pos] = '\0';
392
        }
393
        return res;
394
}
395

    
396
const struct pstreamer * pstreamer_manager_create_source_streamer(struct pstreamer_manager * psm, const char * id, const char * rtp_src_ip, struct streamer_creation_callback * scc)
397
{
398
        struct pstreamer * ps = NULL;
399
        const void * ptr = NULL;
400

    
401
        if (psm && id && rtp_src_ip)
402
        {
403
                ps = malloc(sizeof(struct pstreamer));
404
                strncpy(ps->source_ip, rtp_src_ip, MAX_IPADDR_LENGTH);
405
                ps->source_port = 0;
406
                strncpy(ps->id, id, PSID_LENGTH);
407
                ps->base_port = assign_streaming_ports(psm); 
408
                ptr = ord_set_find(psm->streamers, (const void *) ps);
409
                if (ptr == NULL)
410
                {
411
                        pstreamer_touch(ps);
412
                        streamer_creation_set_pstreamer_ref(scc, ps);
413
                        if (psm->janus)
414
                        {
415
                                debug("calling janus upon notification of perstreamer source creation\n");
416
                                janus_instance_create_videoroom(psm->janus, ps->id, scc);
417
                        }
418
                        else
419
                                if (scc)
420
                                        streamer_creation_callback_trigger(scc, 0);
421
                        pstreamer_init(ps, ps->source_ip, STREAMER_SOURCE_CONF, psm->streamer_opts);
422
                        ord_set_insert(psm->streamers, ps, 0);
423
                } else
424
                {
425
                        free(ps);
426
                        ps = NULL;
427
                }
428
        }
429

    
430
        return ps;
431
}
432

    
433
void pstreamer_source_touch(const struct pstreamer_manager *psm, struct pstreamer *ps, uint64_t janus_id)
434
{
435
        if (psm && ps)
436
        {
437
                pstreamer_touch(ps);
438
                if (psm->janus && pstreamer_is_source(ps))
439
                {
440
                        janus_instance_forward_rtp(psm->janus, ps->id, janus_id, ps->source_ip, ps->base_port+1, ps->base_port+3);
441
                        ps->janus_streaming_id = janus_id;
442
                }
443
        }
444
}