Revision b87dc7a2 src/pstreamer.c

View differences:

src/pstreamer.c
23 23
#include<string.h>
24 24
#include<stdio.h>
25 25
#include<ord_set.h>
26
#include<psinstance.h>
27
#include<task_manager.h>
28
#include<periodic_task_intfs.h>
26 29

  
27 30
struct pstreamer {
28 31
	char source_ip[MAX_IPADDR_LENGTH];
29
	char source_port[MAX_PORT_LENGTH];
32
	uint16_t source_port;
30 33
	char id[PSID_LENGTH];  // identifier for the streamer instance
31
	struct pscontext * psc;
34
	struct psinstance * psc;
32 35
	time_t last_beat;
33 36
	uint16_t base_port;
37
	struct periodic_task * topology_task;
38
	struct periodic_task * offer_task;
39
	struct periodic_task * msg_task;
40
	struct task_manager * tm;
41
	timeout topology_interval;
34 42
};
35 43

  
36 44
struct pstreamer_manager {
37 45
	struct ord_set * streamers;
38
	uint16_t next_streaming_port;
46
	uint16_t initial_streaming_port;
39 47
};
40 48

  
49
int8_t pstreamer_init(struct pstreamer * ps)
50
/* we assume source_ip and source_port are valid strings */
51
{
52
	char config[255];
53
	char * fmt = "port=%d,dechunkiser=rtp,base=%d";
54

  
55
	sprintf(config, fmt, ps->base_port, ps->base_port+1);
56
	ps->psc = psinstance_create(ps->source_ip, ps->source_port, config);
57

  
58
	if (ps->psc)
59
	{
60
		ps->topology_task = NULL;
61
		ps->offer_task = NULL;
62
		ps->msg_task = NULL;
63
		ps->tm = NULL;
64
		ps->topology_interval = 400;
65
	}
66

  
67
	return 0;
68
}
69

  
70
int8_t pstreamer_deinit(struct pstreamer * ps)
71
{
72
	task_manager_destroy_task(ps->tm, &(ps->topology_task));
73
	task_manager_destroy_task(ps->tm, &(ps->offer_task));
74
	task_manager_destroy_task(ps->tm, &(ps->msg_task));
75
	psinstance_destroy(&(ps->psc));
76
	return 0;
77
}
78

  
41 79
int8_t pstreamer_cmp(const void * v1, const void * v2)
42 80
{
43 81
	const struct pstreamer *ps1, *ps2;
......
54 92

  
55 93
char * pstreamer_to_json(const struct pstreamer * ps)
56 94
{
57
	char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%s\"}";
95
	char fmt[] = "{\"id\":\"%s\",\"source_ip\":\"%s\",\"source_port\":\"%d\"}";
58 96
	size_t reslength;
59 97
	char * res = NULL;
60 98

  
......
73 111

  
74 112
	psm = malloc(sizeof(struct pstreamer_manager));
75 113
	psm->streamers = ord_set_new(1, pstreamer_cmp);
76
	psm->next_streaming_port = starting_port;
114
	psm->initial_streaming_port = starting_port;
77 115

  
78 116
	return psm;
79 117
}
80 118

  
81 119
void pstreamer_manager_destroy(struct pstreamer_manager ** psm)
82 120
{
121
	const void * ps = NULL;
122
	
83 123
	if (psm && *psm)
84 124
	{
125
		ord_set_for_each(ps, (*psm)->streamers)
126
			pstreamer_deinit((struct pstreamer *)ps);
127

  
85 128
		ord_set_destroy(&((*psm)->streamers), 1);
86 129
		free(*psm);
87 130
		*psm = NULL;
......
93 136
	ps->last_beat = time(NULL);
94 137
}
95 138

  
139
int8_t pstreamer_schedule_tasks(struct pstreamer *ps, struct task_manager * tm)
140
{
141
	if (ps && tm && ps->psc)
142
	{
143
		ps->tm = tm;
144
		ps->topology_task = task_manager_new_task(tm, pstreamer_topology_task_reinit, pstreamer_topology_task_callback, ps->topology_interval, ps->psc);
145
		ps->offer_task = task_manager_new_task(tm, pstreamer_offer_task_reinit, pstreamer_offer_task_callback, psinstance_offer_interval(ps->psc), ps->psc); 
146
		ps->msg_task = task_manager_new_task(tm, pstreamer_msg_handling_task_reinit, pstreamer_msg_handling_task_callback, 1000, ps->psc);
147
	}
148
	return 0;
149
}
150

  
151
uint16_t assign_streaming_ports(struct pstreamer_manager *psm)
152
{
153
	const struct pstreamer * ps = NULL;
154
	const void * iter = NULL;
155
	uint16_t base_port;
156
	uint8_t found = 1;
157

  
158
	base_port = psm->initial_streaming_port;
159
	while (found==1)
160
	{
161
		found = 0;
162
		ord_set_for_each(iter, psm->streamers)
163
		{
164
			ps = (const struct pstreamer *) iter;
165
			if (ps->base_port == base_port)
166
				found = 1;
167
		}
168
		if (found)
169
			base_port += 5;  // we consider RTP streamers uses 4 ports
170
	}
171
	return base_port;
172
}
173

  
96 174
const struct pstreamer * pstreamer_manager_create_streamer(struct pstreamer_manager * psm, const char * source_ip, const char * source_port, const char * id)
97 175
{
98 176
	struct pstreamer * ps = NULL;
......
102 180
	{
103 181
		ps = malloc(sizeof(struct pstreamer));
104 182
		strncpy(ps->source_ip, source_ip, MAX_IPADDR_LENGTH);
105
		strncpy(ps->source_port, source_port, MAX_PORT_LENGTH);
183
		ps->source_port = atoi(source_port);
106 184
		strncpy(ps->id, id, PSID_LENGTH);
107
		ps->base_port = psm->next_streaming_port;
108
		psm->next_streaming_port += 4;  // we consider RTP streamers uses 4 ports
185
		ps->base_port = assign_streaming_ports(psm); 
109 186
		ptr = ord_set_find(psm->streamers, (const void *) ps);
110 187
		if (ptr == NULL)
111 188
		{
112 189
			pstreamer_touch(ps);
113
			// initialize context
190
			pstreamer_init(ps);
114 191
			ord_set_insert(psm->streamers, ps, 0);
115 192
		} else
116 193
		{
......
127 204
	uint8_t res = 1;
128 205

  
129 206
	if (psm && ps)
207
	{
208
		pstreamer_deinit((struct pstreamer *)ps);
130 209
		res = ord_set_remove(psm->streamers, ps, 1);
210
	}
131 211

  
132 212
	return res;
133 213
}

Also available in: Unified diff