Statistics
| Branch: | Revision:

peerstreamer-src / src / pstreamer.c @ b87dc7a2

History | View | Annotate | Download (5.69 KB)

1 8f5b2a1b Luca Baldesi
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19
20
#include<pstreamer.h>
21
#include<time.h>
22
#include<stdlib.h>
23
#include<string.h>
24
#include<stdio.h>
25
#include<ord_set.h>
26 b87dc7a2 Luca Baldesi
#include<psinstance.h>
27
#include<task_manager.h>
28
#include<periodic_task_intfs.h>
29 8f5b2a1b Luca Baldesi
30
struct pstreamer {
31
        char source_ip[MAX_IPADDR_LENGTH];
32 b87dc7a2 Luca Baldesi
        uint16_t source_port;
33 8f5b2a1b Luca Baldesi
        char id[PSID_LENGTH];  // identifier for the streamer instance
34 b87dc7a2 Luca Baldesi
        struct psinstance * psc;
35 8f5b2a1b Luca Baldesi
        time_t last_beat;
36 c039490c Luca Baldesi
        uint16_t base_port;
37 b87dc7a2 Luca Baldesi
        struct periodic_task * topology_task;
38
        struct periodic_task * offer_task;
39
        struct periodic_task * msg_task;
40
        struct task_manager * tm;
41
        timeout topology_interval;
42 8f5b2a1b Luca Baldesi
};
43
44
struct pstreamer_manager {
45
        struct ord_set * streamers;
46 b87dc7a2 Luca Baldesi
        uint16_t initial_streaming_port;
47 8f5b2a1b Luca Baldesi
};
48
49 b87dc7a2 Luca Baldesi
int8_t pstreamer_init(struct pstreamer * ps)
50
/* we assume source_ip and source_port are valid strings */
51
{
52
        char config[255];
53
        char * fmt = "port=%d,dechunkiser=rtp,base=%d";
54
55
        sprintf(config, fmt, ps->base_port, ps->base_port+1);
56
        ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
57
58
        if (ps->psc)
59
        {
60
                ps->topology_task = NULL;
61
                ps->offer_task = NULL;
62
                ps->msg_task = NULL;
63
                ps->tm = NULL;
64
                ps->topology_interval = 400;
65
        }
66
67
        return 0;
68
}
69
70
int8_t pstreamer_deinit(struct pstreamer * ps)
71
{
72
        task_manager_destroy_task(ps->tm, &(ps->topology_task));
73
        task_manager_destroy_task(ps->tm, &(ps->offer_task));
74
        task_manager_destroy_task(ps->tm, &(ps->msg_task));
75
        psinstance_destroy(&(ps->psc));
76
        return 0;
77
}
78
79 8f5b2a1b Luca Baldesi
int8_t pstreamer_cmp(const void * v1, const void * v2)
80
{
81
        const struct pstreamer *ps1, *ps2;
82
83
        ps1 = (struct pstreamer *) v1;
84
        ps2 = (struct pstreamer *) v2;
85
86
        if (ps1 && ps2)
87
        {
88
                return strcmp(ps1->id, ps2->id);
89
        }
90
        return 0;
91
}
92
93
char * pstreamer_to_json(const struct pstreamer * ps)
94
{
95 b87dc7a2 Luca Baldesi
        char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%d\"}";
96 8f5b2a1b Luca Baldesi
        size_t reslength;
97
        char * res = NULL;
98
99
        if (ps)
100
        {
101
                reslength = strlen(fmt) - 2*3 + PSID_LENGTH + MAX_IPADDR_LENGTH + MAX_PORT_LENGTH + 1;
102
                res = malloc(reslength * sizeof(char));
103
                sprintf(res, fmt, ps->id, ps->source_ip, ps->source_port);
104
        }
105
        return res;
106
}
107
108 c039490c Luca Baldesi
struct pstreamer_manager * pstreamer_manager_new(uint16_t starting_port)
109 8f5b2a1b Luca Baldesi
{
110
        struct pstreamer_manager * psm = NULL;
111
112
        psm = malloc(sizeof(struct pstreamer_manager));
113
        psm->streamers = ord_set_new(1, pstreamer_cmp);
114 b87dc7a2 Luca Baldesi
        psm->initial_streaming_port = starting_port;
115 8f5b2a1b Luca Baldesi
116
        return psm;
117
}
118
119
void pstreamer_manager_destroy(struct pstreamer_manager ** psm)
120
{
121 b87dc7a2 Luca Baldesi
        const void * ps = NULL;
122
        
123 8f5b2a1b Luca Baldesi
        if (psm && *psm)
124
        {
125 b87dc7a2 Luca Baldesi
                ord_set_for_each(ps, (*psm)->streamers)
126
                        pstreamer_deinit((struct pstreamer *)ps);
127
128 8f5b2a1b Luca Baldesi
                ord_set_destroy(&((*psm)->streamers), 1);
129
                free(*psm);
130
                *psm = NULL;
131
        }
132
}
133
134
void pstreamer_touch(struct pstreamer *ps)
135
{
136
        ps->last_beat = time(NULL);
137
}
138
139 b87dc7a2 Luca Baldesi
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm)
140
{
141
        if (ps && tm && ps->psc)
142
        {
143
                ps->tm = tm;
144
                ps->topology_task = task_manager_new_task(tm, pstreamer_topology_task_reinit, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
145
                ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
146
                ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
147
        }
148
        return 0;
149
}
150
151
uint16_t assign_streaming_ports(struct pstreamer_manager *psm)
152
{
153
        const struct pstreamer * ps = NULL;
154
        const void * iter = NULL;
155
        uint16_t base_port;
156
        uint8_t found = 1;
157
158
        base_port = psm->initial_streaming_port;
159
        while (found==1)
160
        {
161
                found = 0;
162
                ord_set_for_each(iter, psm->streamers)
163
                {
164
                        ps = (const struct pstreamer *) iter;
165
                        if (ps->base_port == base_port)
166
                                found = 1;
167
                }
168
                if (found)
169
                        base_port += 5;  // we consider RTP streamers uses 4 ports
170
        }
171
        return base_port;
172
}
173
174 8f5b2a1b Luca Baldesi
const struct pstreamer * pstreamer_manager_create_streamer(struct pstreamer_manager * psm, const char * source_ip, const char * source_port, const char * id)
175
{
176
        struct pstreamer * ps = NULL;
177
        const void * ptr = NULL;
178
179
        if (psm && source_ip && source_port && id)
180
        {
181
                ps = malloc(sizeof(struct pstreamer));
182
                strncpy(ps->source_ip, source_ip, MAX_IPADDR_LENGTH);
183 b87dc7a2 Luca Baldesi
                ps->source_port = atoi(source_port);
184 8f5b2a1b Luca Baldesi
                strncpy(ps->id, id, PSID_LENGTH);
185 b87dc7a2 Luca Baldesi
                ps->base_port = assign_streaming_ports(psm); 
186 8f5b2a1b Luca Baldesi
                ptr = ord_set_find(psm->streamers, (const void *) ps);
187
                if (ptr == NULL)
188
                {
189
                        pstreamer_touch(ps);
190 b87dc7a2 Luca Baldesi
                        pstreamer_init(ps);
191 8f5b2a1b Luca Baldesi
                        ord_set_insert(psm->streamers, ps, 0);
192
                } else
193
                {
194
                        free(ps);
195
                        ps = NULL;
196
                }
197
        }
198
199
        return ps;
200
}
201
202
uint8_t pstreamer_manager_destroy_streamer(struct pstreamer_manager *psm, const struct pstreamer *ps)
203
{
204
        uint8_t res = 1;
205
206
        if (psm && ps)
207 b87dc7a2 Luca Baldesi
        {
208
                pstreamer_deinit((struct pstreamer *)ps);
209 8f5b2a1b Luca Baldesi
                res = ord_set_remove(psm->streamers, ps, 1);
210 b87dc7a2 Luca Baldesi
        }
211 8f5b2a1b Luca Baldesi
212
        return res;
213
}
214 c039490c Luca Baldesi
215
const char * pstreamer_id(const struct pstreamer * ps)
216
{
217
        if (ps)
218
                return ps->id;
219
        return NULL;
220
}
221
222
uint16_t pstreamer_base_port(const struct pstreamer * ps)
223
{
224
        if (ps)
225
                return ps->base_port;
226
        return 0;
227
}