Revision b87dc7a2

View differences:

Libs/pstreamer
1
Subproject commit dafbcca48fc8cb0b88644d3978c9f321c3932eed
1
Subproject commit 7cb6c19edfb2c8bdf17b8718283d05e7d28e3410
Makefile
3 3

  
4 4
EXE=peerstreamer-ng
5 5

  
6
CFLAGS+=-Isrc/ -ILibs/mongoose/
6
CFLAGS+=-Isrc/ -ILibs/mongoose/ -ILibs/pstreamer/include -ILibs/GRAPES/include -LLibs/GRAPES/src  -LLibs/pstreamer/src 
7 7
ifdef DEBUG
8 8
CFLAGS+=-g -W -Wall -Wno-unused-function -Wno-unused-parameter -O0
9 9
else
......
12 12

  
13 13
LIBS+=Libs/mongoose/mongoose.o Libs/GRAPES/src/libgrapes.a Libs/pstreamer/src/libpstreamer.a
14 14
MONGOOSE_OPTS+=-DMG_DISABLE_MQTT -DMG_DISABLE_JSON_RPC -DMG_DISABLE_SOCKETPAIR  -DMG_DISABLE_CGI # -DMG_DISABLE_HTTP_WEBSOCKET
15
LDFLAGS+=-lm
15
LDFLAGS+=-lpstreamer -lgrapes -lm
16 16

  
17 17
all: $(EXE)
18 18

  
19 19
$(EXE): $(LIBS) $(OBJS) peerstreamer-ng.c
20
	$(CC) peerstreamer-ng.c -o peerstreamer-ng $(OBJS) $(CFLAGS) $(LIBS) $(LDFLAGS)
20
	$(CC) -o peerstreamer-ng  peerstreamer-ng.c $(OBJS) Libs/mongoose/mongoose.o $(CFLAGS) $(LDFLAGS)
21 21

  
22 22
%.o: %.c 
23 23
	$(CC) $< -o $@ -c $(CFLAGS) 
Test/Makefile
4 4
TARGET_SRC += $(wildcard ../src/*.c) 
5 5
TARGET_OBJS=$(TARGET_SRC:.c=.o)
6 6

  
7
LIBS=../Libs/mongoose/mongoose.o
8
LDFLAGS +=-lm
7
LIBS+=../Libs/mongoose/mongoose.o ../Libs/GRAPES/src/libgrapes.a ../Libs/pstreamer/src/libpstreamer.a
8
LDFLAGS+=-lpstreamer -lgrapes -lm
9 9
CFLAGS += -g -W -Wall -Wno-unused-function -Wno-unused-parameter -O0 -I../Libs/mongoose/  -I../src
10
CFLAGS+=-L../Libs/GRAPES/src  -L../Libs/pstreamer/src 
10 11

  
11 12
all: $(LIBS) $(TARGET_SRC) $(TARGET_OBJS) $(OBJS)
12 13

  
......
15 16
	git submodule update Libs/mongoose/
16 17
	$(CC) -c -o Libs/mongoose/mongoose.o Libs/mongoose/mongoose.c $(CFLAGS) -DMG_DISABLE_MQTT -DMG_DISABLE_JSON_RPC -DMG_DISABLE_SOCKETPAIR  -DMG_DISABLE_CGI # -DMG_DISABLE_HTTP_WEBSOCKET
17 18

  
19
Libs/GRAPES/src/libgrapes.a:
20
	git submodule init Libs/GRAPES/
21
	git submodule update Libs/GRAPES/
22
	make -C Libs/GRAPES/ 
23

  
24
Libs/pstreamer/src/libpstreamer.a:
25
	git submodule init Libs/pstreamer/
26
	git submodule update Libs/pstreamer/
27
	make -C Libs/pstreamer/ 
28

  
18 29
%.test: %.c $(TARGET_OBJS) 
19 30
	$(CC) -o $@ $< $(CFLAGS) $(TARGET_OBJS) $(LIBS) $(LDFLAGS)
20 31

  
peerstreamer-ng.c
25 25
#include<task_manager.h>
26 26
#include<context.h>
27 27
#include<pschannel.h>
28
#include<mg_periodic_task_intfs.h>
28
#include<periodic_task_intfs.h>
29 29
#include<pstreamer.h>
30 30

  
31 31
#include<mongoose.h>
src/mg_periodic_task_intfs.c
1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

  
20
#include<mg_periodic_task_intfs.h>
21
#include<mongoose.h>
22

  
23
void add_fd_to_fdset(void * handler, int sock, char set)
24
{
25
	struct periodic_task * pt;
26

  
27
	pt = (struct periodic_task *) handler;
28

  
29
	switch (set)
30
	{
31
		case 'r':
32
			periodic_task_readfd_add(pt, sock);
33
			break;
34
		case 'w':
35
			periodic_task_writefd_add(pt, sock);
36
			break;
37
		case 'e':
38
			periodic_task_errfd_add(pt, sock);
39
			break;
40
		default:
41
			fprintf(stderr, "[ERROR] sock insertion\n");
42
	}
43
}
44

  
45

  
46
uint8_t mongoose_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds)
47
{
48
	struct mg_iface *iface;
49

  
50
	iface = (struct mg_iface *) periodic_task_get_data(pt);
51

  
52
	return mongoose_select_action(iface, ret, readfds, writefds, errfds);
53
}
54

  
55

  
56
uint8_t mongoose_task_reinit(struct periodic_task * pt)
57
{
58
	struct mg_iface *iface;
59
	timeout timeout_ms;
60

  
61
	iface = (struct mg_iface *) periodic_task_get_data(pt);
62

  
63
	timeout_ms = periodic_task_get_remaining_time(pt);
64
	if (timeout_ms == 0)
65
		timeout_ms = periodic_task_reset_timeout(pt);
66

  
67
	periodic_task_flush_fdsets(pt);
68

  
69
	timeout_ms = mongoose_select_init(iface, add_fd_to_fdset, (void*) pt, timeout_ms);
70
	periodic_task_set_remaining_time(pt, timeout_ms);
71

  
72
	return 0;
73
}
src/mg_periodic_task_intfs.h
1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

  
20
#ifndef __PERIODIC_TASK_INTFS__
21
#define __PERIODIC_TASK_INTFS__ 1
22

  
23
#include<task_manager.h>
24
#include <sys/select.h>
25

  
26
void add_fd_to_fdset(void * handler, int sock, char set);
27

  
28
uint8_t mongoose_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds);
29

  
30
uint8_t mongoose_task_reinit(struct periodic_task * pt);
31

  
32
#endif
src/ord_set.h
11 11

  
12 12
#include<stdint.h>
13 13

  
14
#define ord_set_for_each(iter, set)\
15
	for((iter) = NULL; ((iter) = ord_set_iter((set), (iter)));)
16

  
14 17
typedef uint32_t ord_set_size;
15 18

  
16 19
typedef int8_t (*cmp_func_t)(const void *, const void *);
src/path_handlers.c
89 89
		ps = pstreamer_manager_create_streamer(c->psm, ipaddr, port, id); 
90 90
		if(ps)
91 91
		{
92
			pstreamer_schedule_tasks((struct pstreamer*)ps, c->tm);
92 93
			debug("Streamer instance created\n");
93 94
			sdpuri = sdpfile_create(c, ch, ps);
94 95
			mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n");
src/periodic_task_intfs.c
1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

  
20
#include<periodic_task_intfs.h>
21
#include<mongoose.h>
22
#include<psinstance.h>
23
#include<pstreamer_event.h>
24

  
25
void add_fd_to_fdset(void * handler, int sock, char set)
26
{
27
	struct periodic_task * pt;
28

  
29
	pt = (struct periodic_task *) handler;
30

  
31
	switch (set)
32
	{
33
		case 'r':
34
			periodic_task_readfd_add(pt, sock);
35
			break;
36
		case 'w':
37
			periodic_task_writefd_add(pt, sock);
38
			break;
39
		case 'e':
40
			periodic_task_errfd_add(pt, sock);
41
			break;
42
		default:
43
			fprintf(stderr, "[ERROR] sock insertion\n");
44
	}
45
}
46

  
47
uint8_t pstreamer_topology_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds)
48
{
49
	struct psinstance * ps;
50
	ps = (struct psinstance *) periodic_task_get_data(pt);
51
	if (ret == 0)
52
		psinstance_topology_update(ps);
53
	return 0;
54
}
55

  
56
uint8_t pstreamer_topology_task_reinit(struct periodic_task * pt)
57
{
58
	return 0;
59
}
60

  
61
uint8_t pstreamer_offer_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds)
62
{
63
	struct psinstance * ps;
64
	ps = (struct psinstance *) periodic_task_get_data(pt);
65
	if (ret == 0)
66
		psinstance_send_offer(ps);
67
	return 0;
68
}
69

  
70
uint8_t pstreamer_offer_task_reinit(struct periodic_task * pt)
71
{
72
	struct psinstance * ps;
73
	ps = (struct psinstance *) periodic_task_get_data(pt);
74
	periodic_task_set_remaining_time(pt, psinstance_offer_interval(ps));
75
	return 0;
76
}
77

  
78
uint8_t pstreamer_msg_handling_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds)
79
{
80
	struct psinstance * ps;
81

  
82
	ps = (struct psinstance *) periodic_task_get_data(pt);
83
	if (ret > 0)  // we do not consider timeouts, we just want to handle data ready
84
		psinstance_handle_msg(ps);
85
	return 0;
86
}
87

  
88
uint8_t pstreamer_msg_handling_task_reinit(struct periodic_task * pt)
89
{
90
	struct psinstance * ps;
91
	ps = (struct psinstance *) periodic_task_get_data(pt);
92

  
93
	periodic_task_flush_fdsets(pt);
94
	pstreamer_register_fds(ps, add_fd_to_fdset, (void*)pt);
95
	return 0;
96
}
97

  
98

  
99
uint8_t mongoose_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds)
100
{
101
	struct mg_iface *iface;
102

  
103
	iface = (struct mg_iface *) periodic_task_get_data(pt);
104

  
105
	return mongoose_select_action(iface, ret, readfds, writefds, errfds);
106
}
107

  
108

  
109
uint8_t mongoose_task_reinit(struct periodic_task * pt)
110
{
111
	struct mg_iface *iface;
112
	timeout timeout_ms;
113

  
114
	iface = (struct mg_iface *) periodic_task_get_data(pt);
115

  
116
	timeout_ms = periodic_task_get_remaining_time(pt);
117
	if (timeout_ms == 0)
118
		timeout_ms = periodic_task_reset_timeout(pt);
119

  
120
	periodic_task_flush_fdsets(pt);
121

  
122
	timeout_ms = mongoose_select_init(iface, add_fd_to_fdset, (void*) pt, timeout_ms);
123
	periodic_task_set_remaining_time(pt, timeout_ms);
124

  
125
	return 0;
126
}
src/periodic_task_intfs.h
1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

  
20
#ifndef __PERIODIC_TASK_INTFS__
21
#define __PERIODIC_TASK_INTFS__ 1
22

  
23
#include<task_manager.h>
24
#include <sys/select.h>
25

  
26
uint8_t mongoose_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds);
27

  
28
uint8_t mongoose_task_reinit(struct periodic_task * pt);
29

  
30
uint8_t pstreamer_topology_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds);
31

  
32
uint8_t pstreamer_topology_task_reinit(struct periodic_task * pt);
33

  
34
uint8_t pstreamer_offer_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds);
35

  
36
uint8_t pstreamer_offer_task_reinit(struct periodic_task * pt);
37

  
38
uint8_t pstreamer_msg_handling_task_callback(struct periodic_task * pt, int ret, fd_set * readfds, fd_set * writefds, fd_set * errfds);
39

  
40
uint8_t pstreamer_msg_handling_task_reinit(struct periodic_task * pt);
41

  
42
#endif
src/pstreamer.c
23 23
#include<string.h>
24 24
#include<stdio.h>
25 25
#include<ord_set.h>
26
#include<psinstance.h>
27
#include<task_manager.h>
28
#include<periodic_task_intfs.h>
26 29

  
27 30
struct pstreamer {
28 31
	char source_ip[MAX_IPADDR_LENGTH];
29
	char source_port[MAX_PORT_LENGTH];
32
	uint16_t source_port;
30 33
	char id[PSID_LENGTH];  // identifier for the streamer instance
31
	struct pscontext * psc;
34
	struct psinstance * psc;
32 35
	time_t last_beat;
33 36
	uint16_t base_port;
37
	struct periodic_task * topology_task;
38
	struct periodic_task * offer_task;
39
	struct periodic_task * msg_task;
40
	struct task_manager * tm;
41
	timeout topology_interval;
34 42
};
35 43

  
36 44
struct pstreamer_manager {
37 45
	struct ord_set * streamers;
38
	uint16_t next_streaming_port;
46
	uint16_t initial_streaming_port;
39 47
};
40 48

  
49
int8_t pstreamer_init(struct pstreamer * ps)
50
/* we assume source_ip and source_port are valid strings */
51
{
52
	char config[255];
53
	char * fmt = "port=%d,dechunkiser=rtp,base=%d";
54

  
55
	sprintf(config, fmt, ps->base_port, ps->base_port+1);
56
	ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
57

  
58
	if (ps->psc)
59
	{
60
		ps->topology_task = NULL;
61
		ps->offer_task = NULL;
62
		ps->msg_task = NULL;
63
		ps->tm = NULL;
64
		ps->topology_interval = 400;
65
	}
66

  
67
	return 0;
68
}
69

  
70
int8_t pstreamer_deinit(struct pstreamer * ps)
71
{
72
	task_manager_destroy_task(ps->tm, &(ps->topology_task));
73
	task_manager_destroy_task(ps->tm, &(ps->offer_task));
74
	task_manager_destroy_task(ps->tm, &(ps->msg_task));
75
	psinstance_destroy(&(ps->psc));
76
	return 0;
77
}
78

  
41 79
int8_t pstreamer_cmp(const void * v1, const void * v2)
42 80
{
43 81
	const struct pstreamer *ps1, *ps2;
......
54 92

  
55 93
char * pstreamer_to_json(const struct pstreamer * ps)
56 94
{
57
	char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%s\"}";
95
	char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%d\"}";
58 96
	size_t reslength;
59 97
	char * res = NULL;
60 98

  
......
73 111

  
74 112
	psm = malloc(sizeof(struct pstreamer_manager));
75 113
	psm->streamers = ord_set_new(1, pstreamer_cmp);
76
	psm->next_streaming_port = starting_port;
114
	psm->initial_streaming_port = starting_port;
77 115

  
78 116
	return psm;
79 117
}
80 118

  
81 119
void pstreamer_manager_destroy(struct pstreamer_manager ** psm)
82 120
{
121
	const void * ps = NULL;
122
	
83 123
	if (psm && *psm)
84 124
	{
125
		ord_set_for_each(ps, (*psm)->streamers)
126
			pstreamer_deinit((struct pstreamer *)ps);
127

  
85 128
		ord_set_destroy(&((*psm)->streamers), 1);
86 129
		free(*psm);
87 130
		*psm = NULL;
......
93 136
	ps->last_beat = time(NULL);
94 137
}
95 138

  
139
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm)
140
{
141
	if (ps && tm && ps->psc)
142
	{
143
		ps->tm = tm;
144
		ps->topology_task = task_manager_new_task(tm, pstreamer_topology_task_reinit, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
145
		ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
146
		ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
147
	}
148
	return 0;
149
}
150

  
151
uint16_t assign_streaming_ports(struct pstreamer_manager *psm)
152
{
153
	const struct pstreamer * ps = NULL;
154
	const void * iter = NULL;
155
	uint16_t base_port;
156
	uint8_t found = 1;
157

  
158
	base_port = psm->initial_streaming_port;
159
	while (found==1)
160
	{
161
		found = 0;
162
		ord_set_for_each(iter, psm->streamers)
163
		{
164
			ps = (const struct pstreamer *) iter;
165
			if (ps->base_port == base_port)
166
				found = 1;
167
		}
168
		if (found)
169
			base_port += 5;  // we consider RTP streamers uses 4 ports
170
	}
171
	return base_port;
172
}
173

  
96 174
const struct pstreamer * pstreamer_manager_create_streamer(struct pstreamer_manager * psm, const char * source_ip, const char * source_port, const char * id)
97 175
{
98 176
	struct pstreamer * ps = NULL;
......
102 180
	{
103 181
		ps = malloc(sizeof(struct pstreamer));
104 182
		strncpy(ps->source_ip, source_ip, MAX_IPADDR_LENGTH);
105
		strncpy(ps->source_port, source_port, MAX_PORT_LENGTH);
183
		ps->source_port = atoi(source_port);
106 184
		strncpy(ps->id, id, PSID_LENGTH);
107
		ps->base_port = psm->next_streaming_port;
108
		psm->next_streaming_port += 4;  // we consider RTP streamers uses 4 ports
185
		ps->base_port = assign_streaming_ports(psm); 
109 186
		ptr = ord_set_find(psm->streamers, (const void *) ps);
110 187
		if (ptr == NULL)
111 188
		{
112 189
			pstreamer_touch(ps);
113
			// initialize context
190
			pstreamer_init(ps);
114 191
			ord_set_insert(psm->streamers, ps, 0);
115 192
		} else
116 193
		{
......
127 204
	uint8_t res = 1;
128 205

  
129 206
	if (psm && ps)
207
	{
208
		pstreamer_deinit((struct pstreamer *)ps);
130 209
		res = ord_set_remove(psm->streamers, ps, 1);
210
	}
131 211

  
132 212
	return res;
133 213
}
src/pstreamer.h
23 23
#include<ord_set.h>
24 24
#include<stdint.h>
25 25
#include<name_lengths.h>
26
#include<task_manager.h>
26 27

  
27 28
struct pstreamer;
28 29
struct pstreamer_manager;
......
41 42

  
42 43
uint16_t pstreamer_base_port(const struct pstreamer * ps);
43 44

  
45
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm);
46

  
44 47
#endif

Also available in: Unified diff