Statistics
| Branch: | Revision:

pstreamer / src / psinstance.c @ 5820d286

History | View | Annotate | Download (8.6 KB)

1
/*
2
 * Copyright (c) 2017 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20

    
21
#include<psinstance.h>
22
#include<psinstance_internal.h>
23
#include<grapes_config.h>
24
#include<malloc.h>
25
#include<string.h>
26
#include<net_helpers.h>
27
#include<net_helper.h>
28
#include<output.h>
29
#include<input.h>
30
#include<streaming.h>
31
#include<measures.h>
32
#include<topology.h>
33
#include<grapes_msg_types.h>
34
#include<dbg.h>
35
#include<chunk_signaling.h>
36
#include<streaming_timers.h>
37
#include<pstreamer_event.h>
38

    
39
struct psinstance {
40
        struct nodeID * my_sock;
41
        struct chunk_output * chunk_out;
42
        struct measures * measure;
43
        struct topology * topology;
44
        struct streaming_context * streaming;
45
        struct input_context inc;
46
        struct streaming_timers timers;
47
        char * iface;
48
        int port;
49
        int outbuff_size;
50
        int chunkbuffer_size;
51
        uint8_t num_offers;
52
        uint8_t chunks_per_offer;
53
        suseconds_t chunk_time_interval; // microseconds
54
        suseconds_t chunk_offer_interval; // microseconds
55
        int source_multiplicity;
56
};
57

    
58
int config_parse(struct psinstance * ps,const char * config)
59
{
60
        struct tag * tags;
61
        const char *tmp_str;
62

    
63
        tags = grapes_config_parse(config);
64

    
65
        tmp_str = grapes_config_value_str_default(tags, "iface", NULL);
66
        ps->iface = tmp_str ? strdup(tmp_str) : NULL;
67
        grapes_config_value_int_default(tags, "port", &(ps->port), 6000);
68
        grapes_config_value_int_default(tags, "outbuff_size", &(ps->outbuff_size), 75);
69
        grapes_config_value_int_default(tags, "chunkbuffer_size", &(ps->chunkbuffer_size), 50);
70
        grapes_config_value_int_default(tags, "source_multiplicity", &(ps->source_multiplicity), 3);
71
        tmp_str = grapes_config_value_str_default(tags, "filename", NULL);
72
        strcpy((ps->inc).filename, tmp_str ? tmp_str : "");
73

    
74
        free(tags);
75
        return 0;
76
}
77

    
78
int node_init(struct psinstance * ps)
79
{
80
        char * my_addr;
81

    
82
        if (ps->iface)
83
                my_addr = iface_addr(ps->iface);
84
        else
85
                my_addr = default_ip_addr();
86
        if (my_addr == NULL)
87
        {
88
                fprintf(stderr, "[ERROR] cannot get a valid ip address\n");
89
                return -1;
90
        }
91
        ps->my_sock = net_helper_init(my_addr, ps->port, "");
92
        free(my_addr);
93

    
94
        if (ps->my_sock)
95
                return 0;
96
        else
97
                return -2;
98
}
99

    
100
struct psinstance * psinstance_create(const char * srv_ip, const int srv_port, const char * config)
101
{
102
        struct psinstance * ps = NULL;
103
        struct nodeID * srv;
104
        int res;
105

    
106
        if (srv_ip && srv_port >= 0 && srv_port < 65536)
107
        {
108
                ps = malloc(sizeof(struct psinstance));
109
                memset(ps, 0, sizeof(struct psinstance));
110

    
111
                ps->num_offers = 1;
112
                ps->chunks_per_offer = 1;
113
                ps->chunk_time_interval = 0;
114
                ps->chunk_offer_interval = 1000000/25;  // microseconds divided by frame (chunks) per second
115
                config_parse(ps, config);
116
                res = node_init(ps);
117
                if (res == 0)
118
                {
119
                        ps->measure = measures_create(nodeid_static_str(ps->my_sock));
120
                        ps->topology = topology_create(ps, config);
121
                        streaming_timers_init(&(ps->timers), ps->chunk_offer_interval);
122
                        ps->chunk_out = NULL;  // To be used as a flag if current role is source or peer role
123
                        if (srv_port)
124
                        {  // creating a normal peer
125
                                srv = create_node(srv_ip, srv_port);
126
                                if (srv)
127
                                {
128
                                        ps->inc.fds[0] = -1;
129
                                        ps->streaming = streaming_create(ps, NULL, config);
130
                                        topology_node_insert(ps->topology, srv);
131
                                        ps->chunk_out = output_create(ps->outbuff_size, config, ps);
132
                                        nodeid_free(srv);
133
                                } else
134
                                        psinstance_destroy(&ps);
135
                        } else  // creating a source peer
136
                                ps->streaming = streaming_create(ps, &(ps->inc), config);
137
                }
138
                else
139
                        psinstance_destroy(&ps);
140
        }
141

    
142
        return ps;
143
}
144

    
145
void psinstance_destroy(struct psinstance ** ps)
146
{
147
        if (ps && *ps)
148
        {
149
                if ((*ps)->measure)
150
                        measures_destroy(&(*ps)->measure);
151
                if ((*ps)->topology)
152
                        topology_destroy(&(*ps)->topology);
153
                if ((*ps)->chunk_out)
154
                        output_destroy(&(*ps)->chunk_out);
155
                if ((*ps)->streaming)
156
                        streaming_destroy(&(*ps)->streaming);
157
                if ((*ps)->iface)
158
                        free((*ps)->iface);
159
                if ((*ps)->my_sock)
160
                        nodeid_free((*ps)->my_sock);
161
                free(*ps);
162
                *ps = NULL;
163
        }
164
}
165

    
166
struct nodeID * psinstance_nodeid(const struct psinstance * ps)
167
{
168
        return ps->my_sock;
169
}
170

    
171
int8_t psinstance_is_source(const struct psinstance * ps)
172
{
173
        return ps->chunk_out ? 0 : 1;
174
}
175

    
176
int  psinstance_chunkbuffer_size(const struct psinstance * ps)
177
{
178
        return ps->chunkbuffer_size;
179
}
180

    
181
struct topology * psinstance_topology(const struct psinstance * ps)
182
{
183
        return ps->topology;
184
}
185

    
186
struct measures * psinstance_measures(const struct psinstance * ps)
187
{
188
        return ps->measure;
189
}
190

    
191
struct chunk_output * psinstance_output(const struct psinstance * ps)
192
{
193
        return ps->chunk_out;
194
}
195

    
196
uint8_t psinstance_num_offers(const struct psinstance * ps)
197
{
198
        return ps->num_offers;
199
}
200

    
201
uint8_t psinstance_chunks_per_offer(const struct psinstance * ps)
202
{
203
        return ps->chunks_per_offer;
204
}
205

    
206
const struct streaming_context * psinstance_streaming(const struct psinstance * ps)
207
{
208
        return ps->streaming;
209
}
210

    
211
int8_t psinstance_send_offer(struct psinstance * ps)
212
{
213
        send_offer(ps->streaming);
214
        return 0;
215
}
216

    
217
int8_t psinstance_inject_chunk(struct psinstance * ps)
218
{
219
        struct chunk * new_chunk;
220
        int8_t res = 0;
221

    
222
        if (ps && psinstance_is_source(ps))
223
        {
224
                new_chunk = generated_chunk(ps->streaming, &(ps->chunk_time_interval));
225
                if(new_chunk && add_chunk(ps->streaming, new_chunk))
226
                        inject_chunk(ps->streaming, new_chunk, ps->source_multiplicity);
227
                else
228
                        res = -1;
229
                if(new_chunk)
230
                        free(new_chunk);
231
        } else
232
                res = 1;
233
        return res;
234
}
235

    
236
int8_t psinstance_handle_msg(const struct psinstance * ps)
237
        /* WARNING: this is a blocking function on the network socket */
238
{
239
        uint8_t buff[MSG_BUFFSIZE];
240
        struct nodeID *remote = NULL;
241
        int len;
242
        int8_t res = 0;
243

    
244
        len = recv_from_peer(ps->my_sock, &remote, buff, MSG_BUFFSIZE);
245
        if (len < 0) {
246
                fprintf(stderr,"[ERROR] Error receiving message. Maybe larger than %d bytes\n", MSG_BUFFSIZE);
247
                res = -1;
248
        }else
249
                switch (buff[0] /* Message Type */) {
250
                        case MSG_TYPE_TMAN:
251
                        case MSG_TYPE_NEIGHBOURHOOD:
252
                        case MSG_TYPE_TOPOLOGY:
253
                                dtprintf("Topo message received:\n");
254
                                topology_message_parse(ps->topology, remote, buff, len);
255
                                res = 1;
256
                                break;
257
                        case MSG_TYPE_CHUNK:
258
                                dtprintf("Chunk message received:\n");
259
                                if(psinstance_is_source(ps))
260
                                        dtprintf("\tDiscarded as playing source role\n");
261
                                else
262
                                        received_chunk(ps->streaming, remote, buff, len);
263
                                res = 2;
264
                                break;
265
                        case MSG_TYPE_SIGNALLING:
266
                                dtprintf("Sign message received:\n");
267
                                sigParseData(ps, remote, buff, len);
268
                                res = 3;
269
                                break;
270
                        default:
271
                                fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
272
                                res = -2;
273
                }
274

    
275
        if (remote)
276
                nodeid_free(remote);
277
        return res;
278
}
279

    
280
int psinstance_poll(struct psinstance *ps, suseconds_t delta)
281
{
282
        enum streaming_action required_action;
283
        int data_state;
284

    
285
        if (ps)
286
        {
287
                streaming_timers_set_timeout(&ps->timers, delta, psinstance_is_source(ps) && ps->inc.fds[0] == -1);
288
                dtprintf("[DEBUG] timer: %lu %lu\n", ps->timers.sleep_timer.tv_sec, ps->timers.sleep_timer.tv_usec); 
289
                data_state = wait4data(ps->my_sock, &(ps->timers.sleep_timer), ps->inc.fds);
290

    
291
                required_action = streaming_timers_state_handler(&ps->timers, data_state, psinstance_is_source(ps));
292
                switch (required_action) {
293
                        case OFFER_ACTION:
294
                                dtprintf("Offer time!\n");
295
                                psinstance_send_offer(ps);
296
                                dtprintf("interval: %lu\n", ps->chunk_offer_interval);
297
                                streaming_timers_update_offer_time(&ps->timers, ps->chunk_offer_interval);
298
                                break;
299
                        case INJECT_ACTION:
300
                                dtprintf("Chunk seeding time!\n");
301
                                psinstance_inject_chunk(ps);
302
                                streaming_timers_update_chunk_time(&ps->timers, ps->chunk_time_interval);
303
                                break;
304
                        case PARSE_MSG_ACTION:
305
                                dtprintf("Got a message from the world!!\n");
306
                                psinstance_handle_msg(ps);
307
                                break;
308
                        case NO_ACTION:
309
                                dtprintf("Nothing happens...\n");
310
                        default:
311
                                break;
312
                }
313
                if (streaming_timers_update_flag(&ps->timers))
314
                                topology_update(ps->topology);
315
        }
316
        return data_state;
317
}
318

    
319
int8_t psinstance_topology_update(const struct psinstance * ps)
320
{
321
        if (ps && ps->topology)
322
                topology_update(ps->topology);
323
        return 0;
324
}
325

    
326
suseconds_t psinstance_offer_interval(const struct psinstance * ps)
327
{
328
        return ps->chunk_offer_interval;
329
}
330

    
331
int pstreamer_register_fds(const struct psinstance * ps, fd_register_f func, void *handler)
332
{
333
        register_network_fds(ps->my_sock, func, handler);
334
        return 0;
335
}