Statistics
| Branch: | Revision:

peerstreamer-src / peerstreamer-ng.c @ c81c126e

History | View | Annotate | Download (5 KB)

1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

    
20
#include<signal.h>
21
#include<string.h>
22
#include<router.h>
23
#include<path_handlers.h>
24
#include<debug.h>
25
#include<task_manager.h>
26
#include<context.h>
27
#include<pschannel.h>
28
#include<periodic_task_intfs.h>
29
#include<pstreamer.h>
30

    
31
#include<mongoose.h>
32

    
33

    
34
static uint8_t running = 1;
35

    
36
void sig_exit(int signo)
37
{
38
        running = 0;
39
}
40

    
41
void parse_args(struct context *c, int argc, char *const* argv)
42
{
43
        int o;
44

    
45
        while ((o = getopt (argc, argv, "vqp:c:s:")) != -1)
46
                switch (o) {
47
                        case 'p':
48
                                strncpy(c->http_port, optarg, 16);
49
                                break;
50
                        case 'c':
51
                                c->csvfile = strdup(optarg);
52
                                break;
53
                        case 's':
54
                                c->streamer_opts = strdup(optarg);
55
                                break;
56
                        case 'q':
57
                                set_debug(0);
58
                                break;
59
                        case 'v':
60
                                set_debug(2);
61
                                break;
62
                }
63
}
64

    
65
void mg_request_decode(char * buff, int buff_len, const struct http_message *hm)
66
{
67
        size_t i = 0;
68

    
69
        if (buff_len - i -2 > hm->method.len)
70
        {
71
                memmove(buff + i, hm->method.p, hm->method.len);
72
                i += hm->method.len;
73
        }
74
        buff[i++] = ' ';
75
        if (buff_len - i -2 > hm->uri.len)
76
        {
77
                memmove(buff + i, hm->uri.p, hm->uri.len);
78
                i += hm->uri.len;
79
        }
80
        buff[i++] = ' ';
81
        if (buff_len - i -1 > hm->query_string.len)
82
        {
83
                memmove(buff + i, hm->query_string.p, hm->query_string.len);
84
                i += hm->query_string.len;
85
        }
86
        buff[i] = '\0';
87
}
88

    
89
void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
90
{
91
        struct context *c = nc->user_data;
92
        struct http_message *hm;
93
        char buff[80];
94

    
95
        switch (ev) {
96
                case MG_EV_HTTP_REQUEST:
97
                        hm  = (struct http_message *) ev_data;
98
                        info("Received a request:\n");
99
                        mg_request_decode(buff, 80, hm);
100
                        info("\t%s\n", buff);
101
                        // Try to call a path handler. If it fails serve
102
                        // public contents
103
                        if(router_handle(c->router, nc, hm))
104
                                mg_serve_http(nc, hm, c->http_opts);
105
                        break;
106
                default:
107
                        break;
108
        }
109
}
110

    
111
void init(struct context *c, int argc, char **argv)
112
{
113
        strncpy(c->http_port, "3000", 16);
114
        signal(SIGINT, sig_exit);
115

    
116
        c->http_opts.enable_directory_listing = "no";
117
        c->http_opts.document_root = "Public/";
118
        c->http_opts.index_files = "index.html,player.html";
119
        c->csvfile = NULL;
120
        c->streamer_opts = NULL;
121
        set_debug(1);
122

    
123
        c->router = router_create(10);
124
        load_path_handlers(c->router);
125
        c->tm = task_manager_new();
126

    
127
        c->mongoose_srv = (struct mg_mgr*) malloc(sizeof(struct mg_mgr));
128
        mg_mgr_init(c->mongoose_srv, c);
129

    
130
        c->janus = janus_instance_create(c->mongoose_srv, c->tm, NULL);
131
        janus_instance_launch(c->janus);
132
        c->psm = pstreamer_manager_new(6001, c->janus);
133

    
134
        parse_args(c, argc, argv);
135
        pstreamer_manager_set_streamer_options(c->psm, c->streamer_opts);
136
        c->pb = pschannel_bucket_new(c->csvfile);
137
        pschannel_bucket_insert(c->pb, "local_channel", "127.0.0.1", "6000", "300kbps", "127.0.0.1:3000/lchannel.sdp");
138
}
139

    
140
struct mg_mgr * launch_http_task(struct context *c)
141
{
142
        struct mg_connection *nc;
143

    
144
        nc = mg_bind(c->mongoose_srv, c->http_port, ev_handler);
145
        nc->user_data = c;
146
        if (nc == NULL) {
147
                fprintf(stderr, "Error starting server on port %s\n", c->http_port);
148
                exit(1);
149
        }
150
        mg_set_protocol_http_websocket(nc);
151
        task_manager_new_task(c->tm, mongoose_task_reinit, mongoose_task_callback, 1000, (void *)c->mongoose_srv->ifaces[MG_MAIN_IFACE]);
152

    
153
        return c->mongoose_srv;
154
}
155

    
156
void context_deinit(struct context *c)
157
{
158
        if (c->csvfile)
159
                free(c->csvfile);
160
        if (c->streamer_opts)
161
                free(c->streamer_opts);
162
        router_destroy(&(c->router));
163
        pstreamer_manager_destroy(&(c->psm));  // this must be destroyed before task managers!
164
        sleep(1); //  let janus get the http notications for interrupting streaming sessions
165
        janus_instance_destroy(&(c->janus));  // this has to be destroyed after pstreamer_manager and before task_manager
166
        task_manager_destroy(&(c->tm));
167
        pschannel_bucket_destroy(&(c->pb));
168
        mg_mgr_free(c->mongoose_srv);
169
        free(c->mongoose_srv);
170
}
171

    
172
int main(int argc, char** argv)
173
{
174
        static struct context c;
175

    
176
        init(&c, argc, argv);
177

    
178

    
179
        info("Starting server on port %s\n", c.http_port);
180
        launch_http_task(&c);
181
        task_manager_new_task(c.tm, NULL, pschannel_csvfile_task_callback, 1000, (void *) c.pb);
182
        task_manager_new_task(c.tm, NULL, pstreamer_purge_task_callback, 5000, (void *) c.psm);
183
        while (running)
184
                task_manager_poll(c.tm, 1000);
185

    
186
        info("\nExiting..\n");
187
        context_deinit(&c);
188
        return 0;
189
}