Statistics
| Branch: | Revision:

pstreamer / src / psinstance.c @ 662a3ab9

History | View | Annotate | Download (8.76 KB)

1
/*
2
 * Copyright (c) 2017 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20

    
21
#include<psinstance.h>
22
#include<psinstance_internal.h>
23
#include<grapes_config.h>
24
#include<malloc.h>
25
#include<string.h>
26
#include<net_helpers.h>
27
#include<net_helper.h>
28
#include<output.h>
29
#include<input.h>
30
#include<streaming.h>
31
#include<measures.h>
32
#include<topology.h>
33
#include<grapes_msg_types.h>
34
#include<dbg.h>
35
#include<chunk_signaling.h>
36
#include<streaming_timers.h>
37
#include<pstreamer_event.h>
38

    
39
struct psinstance {
40
        struct nodeID * my_sock;
41
        struct chunk_output * chunk_out;
42
        struct measures * measure;
43
        struct topology * topology;
44
        struct streaming_context * streaming;
45
        struct input_context inc;
46
        struct streaming_timers timers;
47
        char * iface;
48
        int port;
49
        int outbuff_size;
50
        int chunkbuffer_size;
51
        uint8_t num_offers;
52
        uint8_t chunks_per_offer;
53
        suseconds_t chunk_time_interval; // microseconds
54
        suseconds_t chunk_offer_interval; // microseconds
55
        int source_multiplicity;
56
        enum L3PROTOCOL l3;
57
};
58

    
59
int config_parse(struct psinstance * ps,const char * config)
60
{
61
        struct tag * tags;
62
        const char *tmp_str;
63

    
64
        tags = grapes_config_parse(config);
65

    
66
        tmp_str = grapes_config_value_str_default(tags, "iface", NULL);
67
        ps->iface = tmp_str ? strdup(tmp_str) : NULL;
68
        grapes_config_value_int_default(tags, "port", &(ps->port), 6000);
69
        grapes_config_value_int_default(tags, "outbuff_size", &(ps->outbuff_size), 75);
70
        grapes_config_value_int_default(tags, "chunkbuffer_size", &(ps->chunkbuffer_size), 50);
71
        grapes_config_value_int_default(tags, "source_multiplicity", &(ps->source_multiplicity), 3);
72

    
73
        tmp_str = grapes_config_value_str_default(tags, "filename", NULL);
74
        strcpy((ps->inc).filename, tmp_str ? tmp_str : "");
75
        tmp_str = grapes_config_value_str_default(tags, "AF", NULL);
76
        ps->l3 = tmp_str && (strcmp(tmp_str, "INET6") == 0) ? IP6 : IP4;
77

    
78
        free(tags);
79
        return 0;
80
}
81

    
82
int node_init(struct psinstance * ps)
83
{
84
        char * my_addr;
85

    
86
        if (ps->iface)
87
                my_addr = iface_addr(ps->iface, ps->l3);
88
        else
89
                my_addr = default_ip_addr(ps->l3);
90
        if (my_addr == NULL)
91
        {
92
                fprintf(stderr, "[ERROR] cannot get a valid ip address\n");
93
                return -1;
94
        }
95
        ps->my_sock = net_helper_init(my_addr, ps->port, "");
96
        free(my_addr);
97

    
98
        if (ps->my_sock)
99
                return 0;
100
        else
101
                return -2;
102
}
103

    
104
struct psinstance * psinstance_create(const char * srv_ip, const int srv_port, const char * config)
105
{
106
        struct psinstance * ps = NULL;
107
        struct nodeID * srv;
108
        int res;
109

    
110
        if (srv_ip && srv_port >= 0 && srv_port < 65536)
111
        {
112
                ps = malloc(sizeof(struct psinstance));
113
                memset(ps, 0, sizeof(struct psinstance));
114

    
115
                ps->num_offers = 1;
116
                ps->chunks_per_offer = 1;
117
                ps->chunk_time_interval = 0;
118
                ps->chunk_offer_interval = 1000000/25;  // microseconds divided by frame (chunks) per second
119
                config_parse(ps, config);
120
                res = node_init(ps);
121
                if (res == 0)
122
                {
123
                        ps->measure = measures_create(nodeid_static_str(ps->my_sock));
124
                        ps->topology = topology_create(ps, "");
125
                        streaming_timers_init(&(ps->timers), ps->chunk_offer_interval);
126
                        ps->chunk_out = NULL;  // To be used as a flag if current role is source or peer role
127
                        if (srv_port)
128
                        {  // creating a normal peer
129
                                srv = create_node(srv_ip, srv_port);
130
                                if (srv)
131
                                {
132
                                        ps->inc.fds[0] = -1;
133
                                        ps->streaming = streaming_create(ps, NULL, config);
134
                                        topology_node_insert(ps->topology, srv);
135
                                        ps->chunk_out = output_create(ps->outbuff_size, config, ps);
136
                                        nodeid_free(srv);
137
                                } else
138
                                        psinstance_destroy(&ps);
139
                        } else  // creating a source peer
140
                                ps->streaming = streaming_create(ps, &(ps->inc), config);
141
                }
142
                else
143
                        psinstance_destroy(&ps);
144
        }
145

    
146
        return ps;
147
}
148

    
149
void psinstance_destroy(struct psinstance ** ps)
150
{
151
        if (ps && *ps)
152
        {
153
                if ((*ps)->measure)
154
                        measures_destroy(&(*ps)->measure);
155
                if ((*ps)->topology)
156
                        topology_destroy(&(*ps)->topology);
157
                if ((*ps)->chunk_out)
158
                        output_destroy(&(*ps)->chunk_out);
159
                if ((*ps)->streaming)
160
                        streaming_destroy(&(*ps)->streaming);
161
                if ((*ps)->iface)
162
                        free((*ps)->iface);
163
                if ((*ps)->my_sock)
164
                        nodeid_free((*ps)->my_sock);
165
                free(*ps);
166
                *ps = NULL;
167
        }
168
}
169

    
170
struct nodeID * psinstance_nodeid(const struct psinstance * ps)
171
{
172
        return ps->my_sock;
173
}
174

    
175
int8_t psinstance_is_source(const struct psinstance * ps)
176
{
177
        return ps->chunk_out ? 0 : 1;
178
}
179

    
180
int  psinstance_chunkbuffer_size(const struct psinstance * ps)
181
{
182
        return ps->chunkbuffer_size;
183
}
184

    
185
struct topology * psinstance_topology(const struct psinstance * ps)
186
{
187
        return ps->topology;
188
}
189

    
190
struct measures * psinstance_measures(const struct psinstance * ps)
191
{
192
        return ps->measure;
193
}
194

    
195
struct chunk_output * psinstance_output(const struct psinstance * ps)
196
{
197
        return ps->chunk_out;
198
}
199

    
200
uint8_t psinstance_num_offers(const struct psinstance * ps)
201
{
202
        return ps->num_offers;
203
}
204

    
205
uint8_t psinstance_chunks_per_offer(const struct psinstance * ps)
206
{
207
        return ps->chunks_per_offer;
208
}
209

    
210
const struct streaming_context * psinstance_streaming(const struct psinstance * ps)
211
{
212
        return ps->streaming;
213
}
214

    
215
int8_t psinstance_send_offer(struct psinstance * ps)
216
{
217
        send_offer(ps->streaming);
218
        return 0;
219
}
220

    
221
int8_t psinstance_inject_chunk(struct psinstance * ps)
222
{
223
        struct chunk * new_chunk;
224
        int8_t res = 0;
225

    
226
        if (ps && psinstance_is_source(ps))
227
        {
228
                new_chunk = generated_chunk(ps->streaming, &(ps->chunk_time_interval));
229
                if(new_chunk && add_chunk(ps->streaming, new_chunk))
230
                        inject_chunk(ps->streaming, new_chunk, ps->source_multiplicity);
231
                else
232
                        res = -1;
233
                if(new_chunk)
234
                        free(new_chunk);
235
        } else
236
                res = 1;
237
        return res;
238
}
239

    
240
int8_t psinstance_handle_msg(const struct psinstance * ps)
241
        /* WARNING: this is a blocking function on the network socket */
242
{
243
        uint8_t buff[MSG_BUFFSIZE];
244
        struct nodeID *remote = NULL;
245
        int len;
246
        int8_t res = 0;
247

    
248
        len = recv_from_peer(ps->my_sock, &remote, buff, MSG_BUFFSIZE);
249
        if (len < 0) {
250
                fprintf(stderr,"[ERROR] Error receiving message. Maybe larger than %d bytes\n", MSG_BUFFSIZE);
251
                res = -1;
252
        }else
253
                switch (buff[0] /* Message Type */) {
254
                        case MSG_TYPE_TMAN:
255
                        case MSG_TYPE_NEIGHBOURHOOD:
256
                        case MSG_TYPE_TOPOLOGY:
257
                                dtprintf("Topo message received:\n");
258
                                topology_message_parse(ps->topology, remote, buff, len);
259
                                res = 1;
260
                                break;
261
                        case MSG_TYPE_CHUNK:
262
                                dtprintf("Chunk message received:\n");
263
                                if(psinstance_is_source(ps))
264
                                        dtprintf("\tDiscarded as playing source role\n");
265
                                else
266
                                        received_chunk(ps->streaming, remote, buff, len);
267
                                res = 2;
268
                                break;
269
                        case MSG_TYPE_SIGNALLING:
270
                                dtprintf("Sign message received:\n");
271
                                sigParseData(ps, remote, buff, len);
272
                                res = 3;
273
                                break;
274
                        default:
275
                                fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
276
                                res = -2;
277
                }
278

    
279
        if (remote)
280
                nodeid_free(remote);
281
        return res;
282
}
283

    
284
int psinstance_poll(struct psinstance *ps, suseconds_t delta)
285
{
286
        enum streaming_action required_action;
287
        int data_state;
288

    
289
        if (ps)
290
        {
291
                streaming_timers_set_timeout(&ps->timers, delta, psinstance_is_source(ps) && ps->inc.fds[0] == -1);
292
                dtprintf("[DEBUG] timer: %lu %lu\n", ps->timers.sleep_timer.tv_sec, ps->timers.sleep_timer.tv_usec); 
293
                data_state = wait4data(ps->my_sock, &(ps->timers.sleep_timer), ps->inc.fds);
294

    
295
                required_action = streaming_timers_state_handler(&ps->timers, data_state, psinstance_is_source(ps));
296
                switch (required_action) {
297
                        case OFFER_ACTION:
298
                                dtprintf("Offer time!\n");
299
                                psinstance_send_offer(ps);
300
                                dtprintf("interval: %lu\n", ps->chunk_offer_interval);
301
                                streaming_timers_update_offer_time(&ps->timers, ps->chunk_offer_interval);
302
                                break;
303
                        case INJECT_ACTION:
304
                                dtprintf("Chunk seeding time!\n");
305
                                psinstance_inject_chunk(ps);
306
                                streaming_timers_update_chunk_time(&ps->timers, ps->chunk_time_interval);
307
                                break;
308
                        case PARSE_MSG_ACTION:
309
                                dtprintf("Got a message from the world!!\n");
310
                                psinstance_handle_msg(ps);
311
                                break;
312
                        case NO_ACTION:
313
                                dtprintf("Nothing happens...\n");
314
                        default:
315
                                break;
316
                }
317
                if (streaming_timers_update_flag(&ps->timers))
318
                                topology_update(ps->topology);
319
        }
320
        return data_state;
321
}
322

    
323
int8_t psinstance_topology_update(const struct psinstance * ps)
324
{
325
        if (ps && ps->topology)
326
                topology_update(ps->topology);
327
        return 0;
328
}
329

    
330
suseconds_t psinstance_offer_interval(const struct psinstance * ps)
331
{
332
        return ps->chunk_offer_interval;
333
}
334

    
335
int pstreamer_register_fds(const struct psinstance * ps, fd_register_f func, void *handler)
336
{
337
        register_network_fds(ps->my_sock, func, handler);
338
        return 0;
339
}