Statistics
| Branch: | Revision:

peerstreamer-src / src / task_manager.c @ 58fb2cdc

History | View | Annotate | Download (7.66 KB)

1 9eb656e7 Luca Baldesi
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19
20
#include<task_manager.h>
21
#include<malloc.h>
22
#include<int_bucket.h>
23
#include<list.h>
24
#include<sys/time.h>
25
26
typedef int sock_t;
27
28
#define TASK_NUM_INC 10
29 c03fc03d Luca Baldesi
#ifndef MIN
30 9eb656e7 Luca Baldesi
#define MIN(a, b) ((a) < (b) ? (a) : (b))
31 c03fc03d Luca Baldesi
#endif
32
#ifndef MAX
33 9eb656e7 Luca Baldesi
#define MAX(a, b) ((a) > (b) ? (a) : (b))
34 c03fc03d Luca Baldesi
#endif
35 9eb656e7 Luca Baldesi
36
struct periodic_task {
37
        periodic_task_callback callback;
38
        periodic_task_reinit reinit;
39
        timeout to;
40
        timeout time_to_expire;
41
        struct int_bucket * writefds;
42
        struct int_bucket * readfds;
43
        struct int_bucket * errfds;
44
        struct list_head list_el;
45 c03fc03d Luca Baldesi
        void * data;
46 9eb656e7 Luca Baldesi
};
47
48
struct task_manager {
49
        struct list_head list;
50
        struct list_head * tasks;
51
};
52
53
struct task_manager * task_manager_new()
54
{
55
        struct task_manager * tm;
56
57
        tm = (struct task_manager *) malloc(sizeof(struct task_manager));
58
        tm->tasks = &(tm->list);
59
        INIT_LIST_HEAD(tm->tasks);
60
61
        return tm;
62
}
63
64 c03fc03d Luca Baldesi
struct periodic_task * periodic_task_new(periodic_task_reinit reinit, periodic_task_callback callback, timeout to, void * data)
65 9eb656e7 Luca Baldesi
{
66
        struct periodic_task * pt;
67
68
        pt = (struct periodic_task *) malloc(sizeof(struct periodic_task));
69
        pt->reinit = reinit;
70
        pt->callback = callback;
71
        pt->to = to;
72
        pt->time_to_expire = to;
73
        pt->writefds = int_bucket_new(10);
74
        pt->readfds = int_bucket_new(10);
75
        pt->errfds = int_bucket_new(10);
76 c03fc03d Luca Baldesi
        pt->data = data;
77 9eb656e7 Luca Baldesi
78
        if (pt->reinit)
79
                pt->reinit(pt);
80
        return pt;
81
}
82
83 c03fc03d Luca Baldesi
int periodic_task_set_remaining_time(struct periodic_task * pt, timeout to)
84 9eb656e7 Luca Baldesi
{
85
        if(pt)
86
        {
87 c03fc03d Luca Baldesi
                pt->time_to_expire = to;
88 9eb656e7 Luca Baldesi
                return 0;
89
        }
90
        else
91
                return -1;
92
}
93
94 c03fc03d Luca Baldesi
timeout periodic_task_reset_timeout(struct periodic_task * pt)
95
{
96
        if(pt)
97
        {
98
                pt->time_to_expire = pt->to;
99
                return pt->to;
100
        }
101
        else
102
                return 0;
103
}
104
105 9eb656e7 Luca Baldesi
int periodic_task_writefd_add(struct periodic_task * pt, int fd)
106
{
107
        if(pt && fd > 0)
108
                return int_bucket_insert(pt->writefds, fd, 1);
109
        else
110
                return -1;
111
}
112
113
int periodic_task_readfd_add(struct periodic_task * pt, int fd)
114
{
115
        if(pt && fd > 0)
116 fc82e6c5 Luca Baldesi
                return int_bucket_insert(pt->readfds, (uint32_t) fd, 1);
117 9eb656e7 Luca Baldesi
        else
118
                return -1;
119
}
120
121
int periodic_task_errfd_add(struct periodic_task * pt, int fd)
122
{
123
        if(pt && fd > 0)
124
                return int_bucket_insert(pt->errfds, fd, 1);
125
        else
126
                return -1;
127
}
128
129
uint8_t task_manager_add(struct task_manager *tm, struct periodic_task * pt)
130
{
131
        if (tm && pt)
132
        {
133
                list_add(&(pt->list_el), tm->tasks);
134
                return 0;
135
        }
136
        else
137
        {
138
                return 1;
139
        }
140
}
141
142
void periodic_task_destroy(struct periodic_task ** pt)
143
{
144
        if(pt && *pt)
145
        {
146
                int_bucket_destroy(&((*pt)->writefds));
147
                int_bucket_destroy(&((*pt)->readfds));
148
                int_bucket_destroy(&((*pt)->errfds));
149
                free(*pt);
150
                *pt = NULL;
151
        }
152
}
153
154
void task_manager_destroy(struct task_manager ** tm)
155
{
156
        struct periodic_task *pos, *n;
157
158
        if (tm && *tm)
159
        {
160
                list_for_each_entry_safe(pos, n, (*tm)->tasks, list_el)
161
                        periodic_task_destroy(&(pos));
162
                free(*tm);
163
                *tm = NULL;
164
        }
165
}
166
167 c03fc03d Luca Baldesi
struct periodic_task * task_manager_new_task(struct task_manager *tm, periodic_task_reinit reinit, periodic_task_callback callback, timeout to, void * data)
168 9eb656e7 Luca Baldesi
{
169
        struct periodic_task * pt;
170
        uint8_t res;
171
172 c03fc03d Luca Baldesi
        pt = periodic_task_new(reinit, callback, to, data);
173 9eb656e7 Luca Baldesi
        res = task_manager_add(tm, pt);
174
        if (res)
175
        {
176
                periodic_task_destroy(&pt);
177
                pt = NULL;
178
        }
179
        return pt;
180
}
181
182
void task_manager_int_bucket2fd_set(const struct int_bucket *fd_int, fd_set * fds, sock_t * max_fd)
183
{
184
        const uint32_t * fd;
185
        fd = NULL;
186
        while((fd = int_bucket_iter(fd_int, fd)))
187
                {
188 fc82e6c5 Luca Baldesi
                        FD_SET((int)*fd, fds);
189 b7ea7ae7 Luca Baldesi
                        if((*max_fd) == INVALID_SOCKET || ((int)*fd) - ((int)(*max_fd)) > 0)
190 9eb656e7 Luca Baldesi
                                *max_fd = *fd;
191
                }
192
}
193
194
void periodic_task_trigger(struct periodic_task * pt, int event, fd_set * read_set, fd_set *write_set, fd_set *err_set)
195
{
196
        if (pt)
197
        {
198
                if(pt->callback)
199 c03fc03d Luca Baldesi
                        pt->callback(pt, event, read_set, write_set, err_set);
200 9eb656e7 Luca Baldesi
                if(pt->reinit)
201
                        pt->reinit(pt);
202
                pt->time_to_expire = pt->to;
203
        }
204
}
205
206
uint8_t periodic_task_triggerable(const struct periodic_task *pt, fd_set * read_set, fd_set *write_set, fd_set *err_set)
207
{
208
        const uint32_t * iter = NULL;
209
        uint8_t triggerable = 0;
210
211
        while (!triggerable && (iter = int_bucket_iter(pt->readfds, iter)))
212 fc82e6c5 Luca Baldesi
        {
213
                if (FD_ISSET((int) *iter, read_set))
214 9eb656e7 Luca Baldesi
                        triggerable = 1;
215 fc82e6c5 Luca Baldesi
        }
216 9eb656e7 Luca Baldesi
        while (!triggerable && (iter = int_bucket_iter(pt->writefds, iter)))
217
                if (FD_ISSET(*iter, write_set))
218
                        triggerable = 1;
219
        while (!triggerable && (iter = int_bucket_iter(pt->errfds, iter)))
220
                if (FD_ISSET(*iter, err_set))
221
                        triggerable = 1;
222
223
        return triggerable;
224
}
225
226
int stopwatch_select(int maxfd, fd_set * rset, fd_set * wset, fd_set * eset, struct timeval * tv, timeout * elapsed)
227
{
228
        int res;
229
        struct timeval t1, t2, diff;
230
231
        gettimeofday(&t1, NULL);
232
        res = select(maxfd, rset, wset, eset, tv);
233
        gettimeofday(&t2, NULL);
234
        timersub(&t2, &t1, &diff);
235
        *elapsed = ((diff.tv_sec) * 1000 + diff.tv_usec/1000.0) + 0.5;
236
        return res;
237
}
238
239
int task_manager_poll(struct task_manager * tm, timeout to)
240
{
241
        timeout sleep_time, min_to;  // milliseconds, of course
242
        fd_set read_set, write_set, err_set;
243
        sock_t max_fd;
244
        struct timeval tv;
245 c03fc03d Luca Baldesi
        struct periodic_task * pos;
246 9eb656e7 Luca Baldesi
        int event;
247
248
        if (tm)
249
        {
250
                max_fd = INVALID_SOCKET;
251
                FD_ZERO(&read_set);
252
                FD_ZERO(&write_set);
253
                FD_ZERO(&err_set);
254
                min_to = to;
255
256
                list_for_each_entry(pos, tm->tasks, list_el)
257
                {
258
                        task_manager_int_bucket2fd_set(pos->readfds, &read_set, &max_fd);
259
                        task_manager_int_bucket2fd_set(pos->writefds, &write_set, &max_fd);
260
                        task_manager_int_bucket2fd_set(pos->errfds, &err_set, &max_fd);
261
                        if (pos->time_to_expire < min_to)
262
                                min_to = pos->time_to_expire;
263
                }
264
265
                tv.tv_sec = min_to / 1000;
266
                tv.tv_usec = (min_to % 1000) * 1000;
267
268
                event = stopwatch_select((int) max_fd + 1, &read_set, &write_set, &err_set, &tv, &sleep_time);
269
270
                if(event < 0)
271
                        perror("Error on select()");
272
273
                if(event == 0)  // timeout
274
                {
275
                        list_for_each_entry(pos, tm->tasks, list_el)
276 c03fc03d Luca Baldesi
                        {
277 9eb656e7 Luca Baldesi
                                pos->time_to_expire = MAX(0, pos->time_to_expire - sleep_time);
278 c03fc03d Luca Baldesi
                                if (pos->time_to_expire == 0)  // not idle loop, an actual task has to perform a periodic update
279
                                        periodic_task_trigger(pos, event, &read_set, &write_set, &err_set);
280
                        }
281 9eb656e7 Luca Baldesi
                }
282
283
                if (event > 0)
284
                        list_for_each_entry(pos, tm->tasks, list_el)
285
                        {
286
                                pos->time_to_expire = MAX(0, pos->time_to_expire - sleep_time);
287
                                if(periodic_task_triggerable(pos, &read_set, &write_set, &err_set))
288
                                        periodic_task_trigger(pos, event, &read_set, &write_set, &err_set);
289
                        }
290
291
                return event;
292
        }
293
        else
294
                return -1;
295
}
296
297
void task_manager_destroy_task(struct task_manager * tm, struct periodic_task ** pt)
298
{
299
        if (tm && pt && *pt)
300
        {
301
                list_del(&((*pt)->list_el));
302
                periodic_task_destroy(pt);
303
        }
304
305
}
306 c03fc03d Luca Baldesi
307
int periodic_task_set_data(struct periodic_task * pt, void * data)
308
{
309
        if (pt)
310
        {
311
                pt->data = data;
312
                return 0;
313
        }
314
        return 1;
315
}
316
317
void * periodic_task_get_data(const struct periodic_task * pt)
318
{
319
        if (pt)
320
                return pt->data;
321
        return NULL;
322
}
323
324
timeout periodic_task_get_remaining_time(const struct periodic_task * pt)
325
{
326
        if (pt)
327
                return pt->time_to_expire;
328
        return 0;
329
}
330
331
void periodic_task_flush_fdsets(struct periodic_task *pt)
332
{
333
        int_bucket_flush(pt->writefds);
334
        int_bucket_flush(pt->readfds);
335
        int_bucket_flush(pt->errfds);
336
}