Statistics
| Branch: | Revision:

peerstreamer-src / src / pstreamer.c @ 3af4c8d7

History | View | Annotate | Download (11.9 KB)

1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

    
20
#include<pstreamer.h>
21
#include<time.h>
22
#include<stdlib.h>
23
#include<string.h>
24
#include<stdio.h>
25
#include<ord_set.h>
26
#include<psinstance.h>
27
#include<task_manager.h>
28
#include<periodic_task_intfs.h>
29
#include<debug.h>
30

    
31
#define MAX_PSINSTANCE_CONFIG_LENGTH 255
32
#define STREAMER_PEER_CONF "port=%d,dechunkiser=rtp,base=%d,addr=%s"
33
#define STREAMER_SOURCE_CONF "port=%d,chunkiser=rtp,base=%d,addr=%s,max_delay_ms=50"
34

    
35
struct pstreamer {
36
        char source_ip[MAX_IPADDR_LENGTH];
37
        uint16_t source_port;
38
        char id[PSID_LENGTH];  // identifier for the streamer instance
39
        struct psinstance * psc;
40
        time_t last_beat;
41
        uint16_t base_port;
42
        struct periodic_task * topology_task;
43
        struct periodic_task * offer_task;
44
        struct periodic_task * inject_task;
45
        struct periodic_task * msg_task;
46
        struct task_manager * tm;
47
        timeout topology_interval;
48
        uint64_t janus_streaming_id;
49
};
50

    
51
struct pstreamer_manager {
52
        struct ord_set * streamers;
53
        uint16_t initial_streaming_port;
54
        char * streamer_opts;
55
        struct janus_instance const * janus;
56
};
57

    
58
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_ip, const char * fmt, const char * opts)
59
/* we assume source_ip and source_port are valid strings */
60
{
61
        char config[MAX_PSINSTANCE_CONFIG_LENGTH];
62
        int count;
63

    
64
        count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, rtp_ip);
65
        if (opts && (size_t)(MAX_PSINSTANCE_CONFIG_LENGTH - count) > strlen(opts))
66
                snprintf(config + count, MAX_PSINSTANCE_CONFIG_LENGTH - count, ",%s", opts);
67
        ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
68

    
69
        ps->topology_task = NULL;
70
        ps->offer_task = NULL;
71
        ps->inject_task = NULL;
72
        ps->msg_task = NULL;
73
        ps->tm = NULL;
74
        ps->topology_interval = 400;
75
        ps->janus_streaming_id = 0;
76

    
77
        if (ps->psc)
78
                return 0;
79
        else 
80
                return -1;
81

    
82
        return 0;
83
}
84

    
85
int8_t pstreamer_deinit(struct pstreamer * ps)
86
{
87
        if (ps->topology_task)
88
                task_manager_destroy_task(ps->tm, &(ps->topology_task));
89
        if (ps->offer_task)
90
                task_manager_destroy_task(ps->tm, &(ps->offer_task));
91
        if (ps->inject_task)
92
                task_manager_destroy_task(ps->tm, &(ps->inject_task));
93
        if (ps->msg_task)
94
                task_manager_destroy_task(ps->tm, &(ps->msg_task));
95
        psinstance_destroy(&(ps->psc));
96
        return 0;
97
}
98

    
99
int8_t pstreamer_cmp(const void * v1, const void * v2)
100
{
101
        const struct pstreamer *ps1, *ps2;
102

    
103
        ps1 = (struct pstreamer *) v1;
104
        ps2 = (struct pstreamer *) v2;
105

    
106
        if (ps1 && ps2)
107
        {
108
                return strcmp(ps1->id, ps2->id);
109
        }
110
        return 0;
111
}
112

    
113
char * pstreamer_to_json(const struct pstreamer * ps)
114
{
115
        char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%d\",\"janus_streaming_id\":\"%"PRId64"\"}";
116
        size_t reslength;
117
        char * res = NULL;
118

    
119
        if (ps)
120
        {
121
                reslength = strlen(fmt) - 2*3 + PSID_LENGTH + MAX_IPADDR_LENGTH + MAX_PORT_LENGTH + 1;
122
                res = malloc(reslength * sizeof(char));
123
                sprintf(res, fmt, ps->id, ps->source_ip, ps->source_port, ps->janus_streaming_id);
124
        }
125
        return res;
126
}
127

    
128
struct pstreamer_manager * pstreamer_manager_new(uint16_t starting_port, const struct janus_instance *janus)
129
{
130
        struct pstreamer_manager * psm = NULL;
131

    
132
        psm = malloc(sizeof(struct pstreamer_manager));
133
        psm->streamers = ord_set_new(1, pstreamer_cmp);
134
        /* VLC players assume RTP port numbers are even numbers, we allocate RTP ports starting from starting_port + 1
135
         * (as the first one is the pstreamer port). So starting_port must be an odd number */
136
        psm->initial_streaming_port = (starting_port % 2 == 1) ? starting_port : starting_port + 1;
137
        psm->streamer_opts = NULL;
138
        psm->janus = janus;
139

    
140
        return psm;
141
}
142

    
143
void pstreamer_manager_remove_orphans(struct pstreamer_manager * psm, time_t interval)
144
{
145
        struct ord_set * orphans;
146
        struct pstreamer * ps;
147
        const void * iter;
148
        time_t now;
149

    
150
        now = time(NULL);
151
        orphans = ord_set_new(10, ord_set_dummy_cmp);
152
        ord_set_for_each(iter, psm->streamers)
153
        {
154
                ps = (struct pstreamer *) iter;
155
                if (now - ps->last_beat > interval)
156
                        ord_set_insert(orphans, (void *) iter, 0);
157
        }
158
        ord_set_for_each(iter, orphans)
159
        {
160
                ps = (struct pstreamer *) iter;
161
                debug("Destroying inactive pstreamer instance %s\n", ps->id);
162
                pstreamer_manager_destroy_streamer(psm, ps);
163
        }
164
        ord_set_destroy(&orphans, 0);
165
}
166

    
167
void pstreamer_manager_destroy(struct pstreamer_manager ** psm)
168
{
169
        const void * ps = NULL;
170
        
171
        if (psm && *psm)
172
        {
173
                ord_set_for_each(ps, (*psm)->streamers)
174
                {
175
                        if ((*psm)->janus)
176
                                if (pstreamer_is_source(ps))
177
                                        janus_instance_destroy_videoroom((*psm)->janus, ((struct pstreamer*)ps)->id);
178
                                else
179
                                        janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
180
                        pstreamer_deinit((struct pstreamer *)ps);
181
                }
182

    
183
                ord_set_destroy(&((*psm)->streamers), 1);
184
                if((*psm)->streamer_opts)
185
                {
186
                        free((*psm)->streamer_opts);
187
                        (*psm)->streamer_opts = NULL;
188
                }
189
                free(*psm);
190
                *psm = NULL;
191
        }
192
}
193

    
194
void pstreamer_touch(struct pstreamer *ps)
195
{
196
        ps->last_beat = time(NULL);
197
}
198

    
199
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm)
200
{
201
        if (ps && tm && ps->psc)
202
        {
203
                ps->tm = tm;
204
                ps->topology_task = task_manager_new_task(tm, NULL, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
205
                ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
206
                ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
207
                if (pstreamer_is_source(ps))
208
                        ps->inject_task = task_manager_new_task(tm, NULL, pstreamer_inject_task_callback, 25, ps->psc); 
209
        }
210
        return 0;
211
}
212

    
213
uint16_t assign_streaming_ports(struct pstreamer_manager *psm)
214
{
215
        const struct pstreamer * ps = NULL;
216
        const void * iter = NULL;
217
        uint16_t base_port;
218
        uint8_t found = 1;
219

    
220
        base_port = psm->initial_streaming_port;
221
        while (found==1)
222
        {
223
                found = 0;
224
                ord_set_for_each(iter, psm->streamers)
225
                {
226
                        ps = (const struct pstreamer *) iter;
227
                        if (ps->base_port == base_port)
228
                                found = 1;
229
                }
230
                if (found)
231
                        base_port += 6;  /* we allocate 5 port numbers per each streamer; the first is the pstreamer port, the following 4 are the RTP ports (two
232
                                            for audio and two for video). We add the sixth one for compatibility with the VLC players which expect the RTP base
233
                                            port numbers to be even (base_port is kept odd) */
234
        }
235
        return base_port;
236
}
237

    
238
const struct pstreamer * pstreamer_manager_get_streamer(const struct pstreamer_manager *psm, const char * id)
239
{
240
        struct pstreamer * ps = NULL;
241
        const void * ptr = NULL;
242

    
243
        ps = malloc(sizeof(struct pstreamer));
244
        strncpy(ps->id, id, PSID_LENGTH);
245
        ptr = ord_set_find(psm->streamers, (const void *) ps);
246

    
247
        free(ps);
248
        return (const struct pstreamer*) ptr;
249
}
250

    
251
const struct pstreamer * pstreamer_manager_create_streamer(struct pstreamer_manager * psm, const char * source_ip, const char * source_port, const char * id, const char * rtp_dst_ip, struct streamer_creation_callback * scc)
252
{
253
        struct pstreamer * ps = NULL;
254
        const void * ptr = NULL;
255

    
256
        if (psm && source_ip && source_port && id && rtp_dst_ip)
257
        {
258
                ps = malloc(sizeof(struct pstreamer));
259
                strncpy(ps->source_ip, source_ip, MAX_IPADDR_LENGTH);
260
                ps->source_port = atoi(source_port);
261
                strncpy(ps->id, id, PSID_LENGTH);
262
                ps->base_port = assign_streaming_ports(psm); 
263
                ptr = ord_set_find(psm->streamers, (const void *) ps);
264
                if (ptr == NULL)
265
                {
266
                        pstreamer_touch(ps);
267
                        streamer_creation_set_pstreamer_ref(scc, ps);
268
                        if (psm->janus)
269
                        {
270
                                debug("calling janus upon notification of perstreamer creation\n");
271
                                janus_instance_create_streaming_point(psm->janus, &(ps->janus_streaming_id), ps->base_port+1, ps->base_port+3, scc);
272
                        }
273
                        else
274
                                if (scc)
275
                                        streamer_creation_callback_trigger(scc, 0);
276
                        pstreamer_init(ps, rtp_dst_ip, STREAMER_PEER_CONF, psm->streamer_opts);
277
                        ord_set_insert(psm->streamers, ps, 0);
278
                } else
279
                {
280
                        free(ps);
281
                        ps = NULL;
282
                }
283
        }
284

    
285
        return ps;
286
}
287

    
288
uint8_t pstreamer_manager_destroy_streamer(struct pstreamer_manager *psm, const struct pstreamer *ps)
289
{
290
        uint8_t res = 1;
291

    
292
        if (psm && ps)
293
        {
294
                if(psm->janus)
295
                        janus_instance_destroy_streaming_point(psm->janus, ps->janus_streaming_id);
296
                pstreamer_deinit((struct pstreamer *)ps);
297
                res = ord_set_remove(psm->streamers, ps, 1);
298
        }
299

    
300
        return res;
301
}
302

    
303
const char * pstreamer_id(const struct pstreamer * ps)
304
{
305
        if (ps)
306
                return ps->id;
307
        return NULL;
308
}
309

    
310
uint16_t pstreamer_base_port(const struct pstreamer * ps)
311
{
312
        if (ps)
313
                return ps->base_port;
314
        return 0;
315
}
316

    
317
int8_t pstreamer_manager_set_streamer_options(struct pstreamer_manager *psm, const char * opts)
318
{
319
        int8_t res = -1;
320
        
321
        if (psm && opts)
322
        {
323
                if(psm->streamer_opts)
324
                {
325
                        free(psm->streamer_opts);
326
                        psm->streamer_opts = NULL;
327
                }
328
                psm->streamer_opts = strdup(opts);
329
                res = 0;
330
        }
331

    
332
        return res;
333
}
334

    
335
const char * pstreamer_source_ipaddr(const struct pstreamer *ps)
336
{
337
        if (ps)
338
                return ps->source_ip;
339
        return NULL;
340
}
341

    
342
const char * pstreamer_source_port(const struct pstreamer *ps)
343
{
344
        static char buff[6];
345

    
346
        if (ps)
347
        {
348
                sprintf(buff, "%"PRId16"", ps->source_port);
349
                return buff;
350
        }
351
        return NULL;
352
}
353

    
354
uint8_t pstreamer_is_source(const struct pstreamer * ps)
355
{
356
        return ps && (ps->source_port == 0) ? 1 : 0;
357
}
358

    
359
char * pstreamer_manager_sources_to_json(const struct pstreamer_manager *psm)
360
{
361
        char * res = NULL, * ps_json;
362
        uint32_t pos;
363
        const void * iter;
364
        const struct pstreamer * ps;
365

    
366
        if (psm)
367
        {
368
                res = malloc(sizeof(char)*3);
369
                res[0] = '[';
370
                pos = 1;
371

    
372
                ord_set_for_each(iter, psm->streamers)
373
                {
374
                        ps = (const struct pstreamer *) iter;
375
                        if (pstreamer_is_source(ps))
376
                        {
377
                                ps_json = pstreamer_to_json(ps);
378
                                if (ps_json)
379
                                {
380
                                        res = realloc(res, sizeof(char)*(pos+strlen(ps_json) + (pos == 1? 2 : 3)));
381
                                        if (pos > 1)
382
                                                res[pos++] = ',';
383
                                        strcpy(res+pos, ps_json);
384
                                        pos += strlen(ps_json);
385
                                        free(ps_json);
386
                                }
387
                        }
388
                }
389
                res[pos++] = ']';
390
                res[pos] = '\0';
391
        }
392
        return res;
393
}
394

    
395
const struct pstreamer * pstreamer_manager_create_source_streamer(struct pstreamer_manager * psm, const char * id, const char * rtp_src_ip, struct streamer_creation_callback * scc)
396
{
397
        struct pstreamer * ps = NULL;
398
        const void * ptr = NULL;
399

    
400
        if (psm && id && rtp_src_ip)
401
        {
402
                ps = malloc(sizeof(struct pstreamer));
403
                strncpy(ps->source_ip, rtp_src_ip, MAX_IPADDR_LENGTH);
404
                ps->source_port = 0;
405
                strncpy(ps->id, id, PSID_LENGTH);
406
                ps->base_port = assign_streaming_ports(psm); 
407
                ptr = ord_set_find(psm->streamers, (const void *) ps);
408
                if (ptr == NULL)
409
                {
410
                        pstreamer_touch(ps);
411
                        streamer_creation_set_pstreamer_ref(scc, ps);
412
                        if (psm->janus)
413
                        {
414
                                debug("calling janus upon notification of perstreamer source creation\n");
415
                                janus_instance_create_videoroom(psm->janus, ps->id, scc);
416
                        }
417
                        else
418
                                if (scc)
419
                                        streamer_creation_callback_trigger(scc, 0);
420
                        pstreamer_init(ps, ps->source_ip, STREAMER_SOURCE_CONF, psm->streamer_opts);
421
                        ord_set_insert(psm->streamers, ps, 0);
422
                } else
423
                {
424
                        free(ps);
425
                        ps = NULL;
426
                }
427
        }
428

    
429
        return ps;
430
}
431

    
432
void pstreamer_source_touch(const struct pstreamer_manager *psm, struct pstreamer *ps, uint64_t janus_id)
433
{
434
        if (psm && ps)
435
        {
436
                pstreamer_touch(ps);
437
                if (psm->janus && pstreamer_is_source(ps))
438
                {
439
                        janus_instance_forward_rtp(psm->janus, ps->id, janus_id, ps->source_ip, ps->base_port+1, ps->base_port+3);
440
                        ps->janus_streaming_id = janus_id;
441
                }
442
        }
443
}