Statistics
| Branch: | Revision:

peerstreamer-src / src / pstreamer.c @ 92a361ca

History | View | Annotate | Download (9.16 KB)

1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

    
20
#include<pstreamer.h>
21
#include<time.h>
22
#include<stdlib.h>
23
#include<string.h>
24
#include<stdio.h>
25
#include<ord_set.h>
26
#include<psinstance.h>
27
#include<task_manager.h>
28
#include<periodic_task_intfs.h>
29
#include<debug.h>
30

    
31
#define MAX_PSINSTANCE_CONFIG_LENGTH 255
32

    
33
struct pstreamer {
34
        char source_ip[MAX_IPADDR_LENGTH];
35
        uint16_t source_port;
36
        char id[PSID_LENGTH];  // identifier for the streamer instance
37
        struct psinstance * psc;
38
        time_t last_beat;
39
        uint16_t base_port;
40
        struct periodic_task * topology_task;
41
        struct periodic_task * offer_task;
42
        struct periodic_task * msg_task;
43
        struct task_manager * tm;
44
        timeout topology_interval;
45
        uint64_t janus_streaming_id;
46
};
47

    
48
struct pstreamer_manager {
49
        struct ord_set * streamers;
50
        uint16_t initial_streaming_port;
51
        char * streamer_opts;
52
        struct janus_instance const * janus;
53
};
54

    
55
int8_t pstreamer_init(struct pstreamer * ps, const char * rtp_dst_ip, const char * opts)
56
/* we assume source_ip and source_port are valid strings */
57
{
58
        char config[MAX_PSINSTANCE_CONFIG_LENGTH];
59
        char * fmt = "port=%d,dechunkiser=rtp,base=%d,addr=%s";
60
        int count;
61

    
62
        count = snprintf(config, MAX_PSINSTANCE_CONFIG_LENGTH, fmt, ps->base_port, ps->base_port+1, rtp_dst_ip);
63
        if (opts && (size_t)(MAX_PSINSTANCE_CONFIG_LENGTH - count) > strlen(opts))
64
                snprintf(config + count, MAX_PSINSTANCE_CONFIG_LENGTH - count, ",%s", opts);
65
        ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
66

    
67
        ps->topology_task = NULL;
68
        ps->offer_task = NULL;
69
        ps->msg_task = NULL;
70
        ps->tm = NULL;
71
        ps->topology_interval = 400;
72
        ps->janus_streaming_id = 0;
73

    
74
        if (ps->psc)
75
                return 0;
76
        else 
77
                return -1;
78

    
79
        return 0;
80
}
81

    
82
int8_t pstreamer_deinit(struct pstreamer * ps)
83
{
84
        if (ps->topology_task)
85
                task_manager_destroy_task(ps->tm, &(ps->topology_task));
86
        if (ps->offer_task)
87
                task_manager_destroy_task(ps->tm, &(ps->offer_task));
88
        if (ps->msg_task)
89
                task_manager_destroy_task(ps->tm, &(ps->msg_task));
90
        psinstance_destroy(&(ps->psc));
91
        return 0;
92
}
93

    
94
int8_t pstreamer_cmp(const void * v1, const void * v2)
95
{
96
        const struct pstreamer *ps1, *ps2;
97

    
98
        ps1 = (struct pstreamer *) v1;
99
        ps2 = (struct pstreamer *) v2;
100

    
101
        if (ps1 && ps2)
102
        {
103
                return strcmp(ps1->id, ps2->id);
104
        }
105
        return 0;
106
}
107

    
108
char * pstreamer_to_json(const struct pstreamer * ps)
109
{
110
        char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%d\",\"janus_streaming_id\":\"%"PRId64"\"}";
111
        size_t reslength;
112
        char * res = NULL;
113

    
114
        if (ps)
115
        {
116
                reslength = strlen(fmt) - 2*3 + PSID_LENGTH + MAX_IPADDR_LENGTH + MAX_PORT_LENGTH + 1;
117
                res = malloc(reslength * sizeof(char));
118
                sprintf(res, fmt, ps->id, ps->source_ip, ps->source_port, ps->janus_streaming_id);
119
        }
120
        return res;
121
}
122

    
123
struct pstreamer_manager * pstreamer_manager_new(uint16_t starting_port, const struct janus_instance *janus)
124
{
125
        struct pstreamer_manager * psm = NULL;
126

    
127
        psm = malloc(sizeof(struct pstreamer_manager));
128
        psm->streamers = ord_set_new(1, pstreamer_cmp);
129
        /* VLC players assume RTP port numbers are even numbers, we allocate RTP ports starting from starting_port + 1
130
         * (as the first one is the pstreamer port). So starting_port must be an odd number */
131
        psm->initial_streaming_port = (starting_port % 2 == 1) ? starting_port : starting_port + 1;
132
        psm->streamer_opts = NULL;
133
        psm->janus = janus;
134

    
135
        return psm;
136
}
137

    
138
void pstreamer_manager_remove_orphans(struct pstreamer_manager * psm, time_t interval)
139
{
140
        struct ord_set * orphans;
141
        struct pstreamer * ps;
142
        const void * iter;
143
        time_t now;
144

    
145
        now = time(NULL);
146
        orphans = ord_set_new(10, ord_set_dummy_cmp);
147
        ord_set_for_each(iter, psm->streamers)
148
        {
149
                ps = (struct pstreamer *) iter;
150
                if (now - ps->last_beat > interval)
151
                        ord_set_insert(orphans, (void *) iter, 0);
152
        }
153
        ord_set_for_each(iter, orphans)
154
        {
155
                ps = (struct pstreamer *) iter;
156
                debug("Destroying inactive pstreamer instance %s\n", ps->id);
157
                pstreamer_manager_destroy_streamer(psm, ps);
158
        }
159
        ord_set_destroy(&orphans, 0);
160
}
161

    
162
void pstreamer_manager_destroy(struct pstreamer_manager ** psm)
163
{
164
        const void * ps = NULL;
165
        
166
        if (psm && *psm)
167
        {
168
                ord_set_for_each(ps, (*psm)->streamers)
169
                {
170
                        if ((*psm)->janus)
171
                                janus_instance_destroy_streaming_point((*psm)->janus, ((struct pstreamer*)ps)->janus_streaming_id);
172
                        pstreamer_deinit((struct pstreamer *)ps);
173
                }
174

    
175
                ord_set_destroy(&((*psm)->streamers), 1);
176
                if((*psm)->streamer_opts)
177
                {
178
                        free((*psm)->streamer_opts);
179
                        (*psm)->streamer_opts = NULL;
180
                }
181
                free(*psm);
182
                *psm = NULL;
183
        }
184
}
185

    
186
void pstreamer_touch(struct pstreamer *ps)
187
{
188
        ps->last_beat = time(NULL);
189
}
190

    
191
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm)
192
{
193
        if (ps && tm && ps->psc)
194
        {
195
                ps->tm = tm;
196
                ps->topology_task = task_manager_new_task(tm, NULL, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
197
                ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
198
                ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
199
        }
200
        return 0;
201
}
202

    
203
uint16_t assign_streaming_ports(struct pstreamer_manager *psm)
204
{
205
        const struct pstreamer * ps = NULL;
206
        const void * iter = NULL;
207
        uint16_t base_port;
208
        uint8_t found = 1;
209

    
210
        base_port = psm->initial_streaming_port;
211
        while (found==1)
212
        {
213
                found = 0;
214
                ord_set_for_each(iter, psm->streamers)
215
                {
216
                        ps = (const struct pstreamer *) iter;
217
                        if (ps->base_port == base_port)
218
                                found = 1;
219
                }
220
                if (found)
221
                        base_port += 6;  /* we allocate 5 port numbers per each streamer; the first is the pstreamer port, the following 4 are the RTP ports (two
222
                                            for audio and two for video). We add the sixth one for compatibility with the VLC players which expect the RTP base
223
                                            port numbers to be even (base_port is kept odd) */
224
        }
225
        return base_port;
226
}
227

    
228
const struct pstreamer * pstreamer_manager_get_streamer(const struct pstreamer_manager *psm, const char * id)
229
{
230
        struct pstreamer * ps = NULL;
231
        const void * ptr = NULL;
232

    
233
        ps = malloc(sizeof(struct pstreamer));
234
        strncpy(ps->id, id, PSID_LENGTH);
235
        ptr = ord_set_find(psm->streamers, (const void *) ps);
236

    
237
        free(ps);
238
        return (const struct pstreamer*) ptr;
239
}
240

    
241
const struct pstreamer * pstreamer_manager_create_streamer(struct pstreamer_manager * psm, const char * source_ip, const char * source_port, const char * id, const char * rtp_dst_ip, struct streamer_creation_callback * scc)
242
{
243
        struct pstreamer * ps = NULL;
244
        const void * ptr = NULL;
245

    
246
        if (psm && source_ip && source_port && id && rtp_dst_ip)
247
        {
248
                ps = malloc(sizeof(struct pstreamer));
249
                strncpy(ps->source_ip, source_ip, MAX_IPADDR_LENGTH);
250
                ps->source_port = atoi(source_port);
251
                strncpy(ps->id, id, PSID_LENGTH);
252
                ps->base_port = assign_streaming_ports(psm); 
253
                ptr = ord_set_find(psm->streamers, (const void *) ps);
254
                if (ptr == NULL)
255
                {
256
                        pstreamer_touch(ps);
257
                        streamer_creation_set_pstreamer_ref(scc, ps);
258
                        if (psm->janus)
259
                        {
260
                                debug("calling janus upon notification of perstreamer creation\n");
261
                                janus_instance_create_streaming_point(psm->janus, &(ps->janus_streaming_id), ps->base_port+1, ps->base_port+3, scc);
262
                        }
263
                        else
264
                                if (scc)
265
                                        streamer_creation_callback_trigger(scc, 0);
266
                        pstreamer_init(ps, rtp_dst_ip, psm->streamer_opts);
267
                        ord_set_insert(psm->streamers, ps, 0);
268
                } else
269
                {
270
                        free(ps);
271
                        ps = NULL;
272
                }
273
        }
274

    
275
        return ps;
276
}
277

    
278
uint8_t pstreamer_manager_destroy_streamer(struct pstreamer_manager *psm, const struct pstreamer *ps)
279
{
280
        uint8_t res = 1;
281

    
282
        if (psm && ps)
283
        {
284
                if(psm->janus)
285
                        janus_instance_destroy_streaming_point(psm->janus, ps->janus_streaming_id);
286
                pstreamer_deinit((struct pstreamer *)ps);
287
                res = ord_set_remove(psm->streamers, ps, 1);
288
        }
289

    
290
        return res;
291
}
292

    
293
const char * pstreamer_id(const struct pstreamer * ps)
294
{
295
        if (ps)
296
                return ps->id;
297
        return NULL;
298
}
299

    
300
uint16_t pstreamer_base_port(const struct pstreamer * ps)
301
{
302
        if (ps)
303
                return ps->base_port;
304
        return 0;
305
}
306

    
307
int8_t pstreamer_manager_set_streamer_options(struct pstreamer_manager *psm, const char * opts)
308
{
309
        int8_t res = -1;
310
        
311
        if (psm && opts)
312
        {
313
                if(psm->streamer_opts)
314
                {
315
                        free(psm->streamer_opts);
316
                        psm->streamer_opts = NULL;
317
                }
318
                psm->streamer_opts = strdup(opts);
319
                res = 0;
320
        }
321

    
322
        return res;
323
}
324

    
325
const char * pstreamer_source_ipaddr(const struct pstreamer *ps)
326
{
327
        if (ps)
328
                return ps->source_ip;
329
        return NULL;
330
}
331

    
332
const char * pstreamer_source_port(const struct pstreamer *ps)
333
{
334
        static char buff[6];
335

    
336
        if (ps)
337
        {
338
                sprintf(buff, "%"PRId16"", ps->source_port);
339
                return buff;
340
        }
341
        return NULL;
342
}