Statistics
| Branch: | Revision:

peerstreamer-src / src / pstreamer.c @ 58fb2cdc

History | View | Annotate | Download (11.9 KB)

1 8f5b2a1b Luca Baldesi
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19
20
#include<pstreamer.h>
21
#include<time.h>
22
#include<stdlib.h>
23
#include<string.h>
24
#include<stdio.h>
25
#include<ord_set.h>
26 b87dc7a2 Luca Baldesi
#include<psinstance.h>
27
#include<task_manager.h>
28
#include<periodic_task_intfs.h>
29 4d6f8fd5 Luca Baldesi
#include<debug.h>
30 8f5b2a1b Luca Baldesi
31 1ae420f6 Luca Baldesi
#define MAX_PSINSTANCE_CONFIG_LENGTH 255
32 3af4c8d7 Luca Baldesi
#define STREAMER_PEER_CONF "port=%d,dechunkiser=rtp,base=%d,addr=%s"
33
#define STREAMER_SOURCE_CONF "port=%d,chunkiser=rtp,base=%d,addr=%s,max_delay_ms=50"
34 1ae420f6 Luca Baldesi
35 8f5b2a1b Luca Baldesi
struct pstreamer {
36
        char source_ip[MAX_IPADDR_LENGTH];
37 b87dc7a2 Luca Baldesi
        uint16_t source_port;
38 8f5b2a1b Luca Baldesi
        char id[PSID_LENGTH];  // identifier for the streamer instance
39 b87dc7a2 Luca Baldesi
        struct psinstance * psc;
40 8f5b2a1b Luca Baldesi
        time_t last_beat;
41 c039490c Luca Baldesi
        uint16_t base_port;
42 b87dc7a2 Luca Baldesi
        struct periodic_task * topology_task;
43
        struct periodic_task * offer_task;
44 3af4c8d7 Luca Baldesi
        struct periodic_task * inject_task;
45 b87dc7a2 Luca Baldesi
        struct periodic_task * msg_task;
46
        struct task_manager * tm;
47
        timeout topology_interval;
48 c81c126e Luca Baldesi
        uint64_t janus_streaming_id;
49 8f5b2a1b Luca Baldesi
};
50
51
struct pstreamer_manager {
52
        struct ord_set * streamers;
53 b87dc7a2 Luca Baldesi
        uint16_t initial_streaming_port;
54 1ae420f6 Luca Baldesi
        char * streamer_opts;
55 c81c126e Luca Baldesi
        struct janus_instance const * janus;
56 8f5b2a1b Luca Baldesi
};
57
58 3af4c8d7 Luca Baldesi
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_ip, const char * fmt, const char * opts)
59 b87dc7a2 Luca Baldesi
/* we assume source_ip and source_port are valid strings */
60
{
61 1ae420f6 Luca Baldesi
        char config[MAX_PSINSTANCE_CONFIG_LENGTH];
62
        int count;
63 b87dc7a2 Luca Baldesi
64 3af4c8d7 Luca Baldesi
        count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, rtp_ip);
65 1ae420f6 Luca Baldesi
        if (opts && (size_t)(MAX_PSINSTANCE_CONFIG_LENGTH - count) > strlen(opts))
66
                snprintf(config + count, MAX_PSINSTANCE_CONFIG_LENGTH - count, ",%s", opts);
67 b87dc7a2 Luca Baldesi
        ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
68
69 7bbe95e4 Luca Baldesi
        ps->topology_task = NULL;
70
        ps->offer_task = NULL;
71 3af4c8d7 Luca Baldesi
        ps->inject_task = NULL;
72 7bbe95e4 Luca Baldesi
        ps->msg_task = NULL;
73
        ps->tm = NULL;
74
        ps->topology_interval = 400;
75 c81c126e Luca Baldesi
        ps->janus_streaming_id = 0;
76 7bbe95e4 Luca Baldesi
77 b87dc7a2 Luca Baldesi
        if (ps->psc)
78 7bbe95e4 Luca Baldesi
                return 0;
79
        else 
80
                return -1;
81 b87dc7a2 Luca Baldesi
82
        return 0;
83
}
84
85
int8_t pstreamer_deinit(struct pstreamer * ps)
86
{
87 4d6f8fd5 Luca Baldesi
        if (ps->topology_task)
88
                task_manager_destroy_task(ps->tm, &(ps->topology_task));
89
        if (ps->offer_task)
90
                task_manager_destroy_task(ps->tm, &(ps->offer_task));
91 3af4c8d7 Luca Baldesi
        if (ps->inject_task)
92
                task_manager_destroy_task(ps->tm, &(ps->inject_task));
93 4d6f8fd5 Luca Baldesi
        if (ps->msg_task)
94
                task_manager_destroy_task(ps->tm, &(ps->msg_task));
95 b87dc7a2 Luca Baldesi
        psinstance_destroy(&(ps->psc));
96
        return 0;
97
}
98
99 8f5b2a1b Luca Baldesi
int8_t pstreamer_cmp(const void * v1, const void * v2)
100
{
101
        const struct pstreamer *ps1, *ps2;
102
103
        ps1 = (struct pstreamer *) v1;
104
        ps2 = (struct pstreamer *) v2;
105
106
        if (ps1 && ps2)
107
        {
108
                return strcmp(ps1->id, ps2->id);
109
        }
110
        return 0;
111
}
112
113
char * pstreamer_to_json(const struct pstreamer * ps)
114
{
115 c81c126e Luca Baldesi
        char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%d\",\"janus_streaming_id\":\"%"PRId64"\"}";
116 8f5b2a1b Luca Baldesi
        size_t reslength;
117
        char * res = NULL;
118
119
        if (ps)
120
        {
121
                reslength = strlen(fmt) - 2*3 + PSID_LENGTH + MAX_IPADDR_LENGTH + MAX_PORT_LENGTH + 1;
122
                res = malloc(reslength * sizeof(char));
123 c81c126e Luca Baldesi
                sprintf(res, fmt, ps->id, ps->source_ip, ps->source_port, ps->janus_streaming_id);
124 8f5b2a1b Luca Baldesi
        }
125
        return res;
126
}
127
128 c81c126e Luca Baldesi
struct pstreamer_manager * pstreamer_manager_new(uint16_t starting_port, const struct janus_instance *janus)
129 8f5b2a1b Luca Baldesi
{
130
        struct pstreamer_manager * psm = NULL;
131
132
        psm = malloc(sizeof(struct pstreamer_manager));
133
        psm->streamers = ord_set_new(1, pstreamer_cmp);
134 23a5e2b2 Luca Baldesi
        /* VLC players assume RTP port numbers are even numbers, we allocate RTP ports starting from starting_port + 1
135
         * (as the first one is the pstreamer port). So starting_port must be an odd number */
136
        psm->initial_streaming_port = (starting_port % 2 == 1) ? starting_port : starting_port + 1;
137 1ae420f6 Luca Baldesi
        psm->streamer_opts = NULL;
138 c81c126e Luca Baldesi
        psm->janus = janus;
139 8f5b2a1b Luca Baldesi
140
        return psm;
141
}
142
143 4d6f8fd5 Luca Baldesi
void pstreamer_manager_remove_orphans(struct pstreamer_manager * psm, time_t interval)
144
{
145
        struct ord_set * orphans;
146
        struct pstreamer * ps;
147
        const void * iter;
148
        time_t now;
149
150
        now = time(NULL);
151
        orphans = ord_set_new(10, ord_set_dummy_cmp);
152
        ord_set_for_each(iter, psm->streamers)
153
        {
154
                ps = (struct pstreamer *) iter;
155
                if (now - ps->last_beat > interval)
156
                        ord_set_insert(orphans, (void *) iter, 0);
157
        }
158
        ord_set_for_each(iter, orphans)
159
        {
160
                ps = (struct pstreamer *) iter;
161
                debug("Destroying inactive pstreamer instance %s\n", ps->id);
162
                pstreamer_manager_destroy_streamer(psm, ps);
163
        }
164
        ord_set_destroy(&orphans, 0);
165
}
166
167 8f5b2a1b Luca Baldesi
void pstreamer_manager_destroy(struct pstreamer_manager ** psm)
168
{
169 b87dc7a2 Luca Baldesi
        const void * ps = NULL;
170
        
171 8f5b2a1b Luca Baldesi
        if (psm && *psm)
172
        {
173 b87dc7a2 Luca Baldesi
                ord_set_for_each(ps, (*psm)->streamers)
174 c81c126e Luca Baldesi
                {
175
                        if ((*psm)->janus)
176 3af4c8d7 Luca Baldesi
                                if (pstreamer_is_source(ps))
177
                                        janus_instance_destroy_videoroom((*psm)->janus, ((struct pstreamer*)ps)->id);
178
                                else
179
                                        janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
180 b87dc7a2 Luca Baldesi
                        pstreamer_deinit((struct pstreamer *)ps);
181 c81c126e Luca Baldesi
                }
182 b87dc7a2 Luca Baldesi
183 8f5b2a1b Luca Baldesi
                ord_set_destroy(&((*psm)->streamers), 1);
184 1ae420f6 Luca Baldesi
                if((*psm)->streamer_opts)
185
                {
186
                        free((*psm)->streamer_opts);
187
                        (*psm)->streamer_opts = NULL;
188
                }
189 8f5b2a1b Luca Baldesi
                free(*psm);
190
                *psm = NULL;
191
        }
192
}
193
194
void pstreamer_touch(struct pstreamer *ps)
195
{
196
        ps->last_beat = time(NULL);
197
}
198
199 b87dc7a2 Luca Baldesi
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm)
200
{
201
        if (ps && tm && ps->psc)
202
        {
203
                ps->tm = tm;
204 c2bc5145 Luca Baldesi
                ps->topology_task = task_manager_new_task(tm, NULL, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
205 b87dc7a2 Luca Baldesi
                ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
206
                ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
207 3af4c8d7 Luca Baldesi
                if (pstreamer_is_source(ps))
208
                        ps->inject_task = task_manager_new_task(tm, NULL, pstreamer_inject_task_callback, 25, ps->psc); 
209 b87dc7a2 Luca Baldesi
        }
210
        return 0;
211
}
212
213
uint16_t assign_streaming_ports(struct pstreamer_manager *psm)
214
{
215
        const struct pstreamer * ps = NULL;
216
        const void * iter = NULL;
217
        uint16_t base_port;
218
        uint8_t found = 1;
219
220
        base_port = psm->initial_streaming_port;
221
        while (found==1)
222
        {
223
                found = 0;
224
                ord_set_for_each(iter, psm->streamers)
225
                {
226
                        ps = (const struct pstreamer *) iter;
227
                        if (ps->base_port == base_port)
228
                                found = 1;
229
                }
230
                if (found)
231 23a5e2b2 Luca Baldesi
                        base_port += 6;  /* we allocate 5 port numbers per each streamer; the first is the pstreamer port, the following 4 are the RTP ports (two
232
                                            for audio and two for video). We add the sixth one for compatibility with the VLC players which expect the RTP base
233
                                            port numbers to be even (base_port is kept odd) */
234 b87dc7a2 Luca Baldesi
        }
235
        return base_port;
236
}
237
238 4d6f8fd5 Luca Baldesi
const struct pstreamer * pstreamer_manager_get_streamer(const struct pstreamer_manager *psm, const char * id)
239
{
240
        struct pstreamer * ps = NULL;
241
        const void * ptr = NULL;
242
243
        ps = malloc(sizeof(struct pstreamer));
244
        strncpy(ps->id, id, PSID_LENGTH);
245
        ptr = ord_set_find(psm->streamers, (const void *) ps);
246
247
        free(ps);
248
        return (const struct pstreamer*) ptr;
249
}
250
251 c81c126e Luca Baldesi
const struct pstreamer * pstreamer_manager_create_streamer(struct pstreamer_manager * psm, const char * source_ip, const char * source_port, const char * id, const char * rtp_dst_ip, struct streamer_creation_callback * scc)
252 8f5b2a1b Luca Baldesi
{
253
        struct pstreamer * ps = NULL;
254
        const void * ptr = NULL;
255
256 91e9f1a7 Luca Baldesi
        if (psm && source_ip && source_port && id && rtp_dst_ip)
257 8f5b2a1b Luca Baldesi
        {
258
                ps = malloc(sizeof(struct pstreamer));
259
                strncpy(ps->source_ip, source_ip, MAX_IPADDR_LENGTH);
260 b87dc7a2 Luca Baldesi
                ps->source_port = atoi(source_port);
261 8f5b2a1b Luca Baldesi
                strncpy(ps->id, id, PSID_LENGTH);
262 b87dc7a2 Luca Baldesi
                ps->base_port = assign_streaming_ports(psm); 
263 8f5b2a1b Luca Baldesi
                ptr = ord_set_find(psm->streamers, (const void *) ps);
264
                if (ptr == NULL)
265
                {
266
                        pstreamer_touch(ps);
267 c81c126e Luca Baldesi
                        streamer_creation_set_pstreamer_ref(scc, ps);
268
                        if (psm->janus)
269
                        {
270
                                debug("calling janus upon notification of perstreamer creation\n");
271
                                janus_instance_create_streaming_point(psm->janus, &(ps->janus_streaming_id), ps->base_port+1, ps->base_port+3, scc);
272
                        }
273
                        else
274
                                if (scc)
275
                                        streamer_creation_callback_trigger(scc, 0);
276 3af4c8d7 Luca Baldesi
                        pstreamer_init(ps, rtp_dst_ip, STREAMER_PEER_CONF, psm->streamer_opts);
277 8f5b2a1b Luca Baldesi
                        ord_set_insert(psm->streamers, ps, 0);
278
                } else
279
                {
280
                        free(ps);
281
                        ps = NULL;
282
                }
283
        }
284
285
        return ps;
286
}
287
288
uint8_t pstreamer_manager_destroy_streamer(struct pstreamer_manager *psm, const struct pstreamer *ps)
289
{
290
        uint8_t res = 1;
291
292
        if (psm && ps)
293 b87dc7a2 Luca Baldesi
        {
294 c81c126e Luca Baldesi
                if(psm->janus)
295
                        janus_instance_destroy_streaming_point(psm->janus, ps->janus_streaming_id);
296 b87dc7a2 Luca Baldesi
                pstreamer_deinit((struct pstreamer *)ps);
297 8f5b2a1b Luca Baldesi
                res = ord_set_remove(psm->streamers, ps, 1);
298 b87dc7a2 Luca Baldesi
        }
299 8f5b2a1b Luca Baldesi
300
        return res;
301
}
302 c039490c Luca Baldesi
303
const char * pstreamer_id(const struct pstreamer * ps)
304
{
305
        if (ps)
306
                return ps->id;
307
        return NULL;
308
}
309
310
uint16_t pstreamer_base_port(const struct pstreamer * ps)
311
{
312
        if (ps)
313
                return ps->base_port;
314
        return 0;
315
}
316 1ae420f6 Luca Baldesi
317
int8_t pstreamer_manager_set_streamer_options(struct pstreamer_manager *psm, const char * opts)
318
{
319
        int8_t res = -1;
320
        
321
        if (psm && opts)
322
        {
323
                if(psm->streamer_opts)
324
                {
325
                        free(psm->streamer_opts);
326
                        psm->streamer_opts = NULL;
327
                }
328
                psm->streamer_opts = strdup(opts);
329
                res = 0;
330
        }
331
332
        return res;
333
}
334 92a361ca Luca Baldesi
335
const char * pstreamer_source_ipaddr(const struct pstreamer *ps)
336
{
337
        if (ps)
338
                return ps->source_ip;
339
        return NULL;
340
}
341
342
const char * pstreamer_source_port(const struct pstreamer *ps)
343
{
344
        static char buff[6];
345
346
        if (ps)
347
        {
348
                sprintf(buff, "%"PRId16"", ps->source_port);
349
                return buff;
350
        }
351
        return NULL;
352
}
353 3af4c8d7 Luca Baldesi
354
uint8_t pstreamer_is_source(const struct pstreamer * ps)
355
{
356
        return ps && (ps->source_port == 0) ? 1 : 0;
357
}
358
359
char * pstreamer_manager_sources_to_json(const struct pstreamer_manager *psm)
360
{
361
        char * res = NULL, * ps_json;
362
        uint32_t pos;
363
        const void * iter;
364
        const struct pstreamer * ps;
365
366
        if (psm)
367
        {
368
                res = malloc(sizeof(char)*3);
369
                res[0] = '[';
370
                pos = 1;
371
372
                ord_set_for_each(iter, psm->streamers)
373
                {
374
                        ps = (const struct pstreamer *) iter;
375
                        if (pstreamer_is_source(ps))
376
                        {
377
                                ps_json = pstreamer_to_json(ps);
378
                                if (ps_json)
379
                                {
380
                                        res = realloc(res, sizeof(char)*(pos+strlen(ps_json) + (pos == 1? 2 : 3)));
381
                                        if (pos > 1)
382
                                                res[pos++] = ',';
383
                                        strcpy(res+pos, ps_json);
384
                                        pos += strlen(ps_json);
385
                                        free(ps_json);
386
                                }
387
                        }
388
                }
389
                res[pos++] = ']';
390
                res[pos] = '\0';
391
        }
392
        return res;
393
}
394
395
const struct pstreamer * pstreamer_manager_create_source_streamer(struct pstreamer_manager * psm, const char * id, const char * rtp_src_ip, struct streamer_creation_callback * scc)
396
{
397
        struct pstreamer * ps = NULL;
398
        const void * ptr = NULL;
399
400
        if (psm && id && rtp_src_ip)
401
        {
402
                ps = malloc(sizeof(struct pstreamer));
403
                strncpy(ps->source_ip, rtp_src_ip, MAX_IPADDR_LENGTH);
404
                ps->source_port = 0;
405
                strncpy(ps->id, id, PSID_LENGTH);
406
                ps->base_port = assign_streaming_ports(psm); 
407
                ptr = ord_set_find(psm->streamers, (const void *) ps);
408
                if (ptr == NULL)
409
                {
410
                        pstreamer_touch(ps);
411
                        streamer_creation_set_pstreamer_ref(scc, ps);
412
                        if (psm->janus)
413
                        {
414
                                debug("calling janus upon notification of perstreamer source creation\n");
415
                                janus_instance_create_videoroom(psm->janus, ps->id, scc);
416
                        }
417
                        else
418
                                if (scc)
419
                                        streamer_creation_callback_trigger(scc, 0);
420
                        pstreamer_init(ps, ps->source_ip, STREAMER_SOURCE_CONF, psm->streamer_opts);
421
                        ord_set_insert(psm->streamers, ps, 0);
422
                } else
423
                {
424
                        free(ps);
425
                        ps = NULL;
426
                }
427
        }
428
429
        return ps;
430
}
431
432
void pstreamer_source_touch(const struct pstreamer_manager *psm, struct pstreamer *ps, uint64_t janus_id)
433
{
434
        if (psm && ps)
435
        {
436
                pstreamer_touch(ps);
437
                if (psm->janus && pstreamer_is_source(ps))
438
                {
439
                        janus_instance_forward_rtp(psm->janus, ps->id, janus_id, ps->source_ip, ps->base_port+1, ps->base_port+3);
440
                        ps->janus_streaming_id = janus_id;
441
                }
442
        }
443
}