Statistics
| Branch: | Revision:

peerstreamer-src / src / task_manager.c @ 9eb656e7

History | View | Annotate | Download (6.99 KB)

1
/*******************************************************************
2
* PeerStreamer-ng is a P2P video streaming application exposing a ReST
3
* interface.
4
* Copyright (C) 2017 Luca Baldesi <luca.baldesi@unitn.it>
5
*
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
10
*
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU Affero General Public License for more details.
15
*
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
*******************************************************************/
19

    
20
#include<task_manager.h>
21
#include<malloc.h>
22
#include<int_bucket.h>
23
#include<list.h>
24
#include<sys/time.h>
25

    
26
typedef int sock_t;
27

    
28
#define TASK_NUM_INC 10
29
#define MIN(a, b) ((a) < (b) ? (a) : (b))
30
#define MAX(a, b) ((a) > (b) ? (a) : (b))
31
#define INVALID_SOCKET (-1)
32

    
33
struct periodic_task {
34
        periodic_task_callback callback;
35
        periodic_task_reinit reinit;
36
        timeout to;
37
        timeout time_to_expire;
38
        struct int_bucket * writefds;
39
        struct int_bucket * readfds;
40
        struct int_bucket * errfds;
41
        struct list_head list_el;
42
};
43

    
44
struct task_manager {
45
        struct list_head list;
46
        struct list_head * tasks;
47
};
48

    
49
struct task_manager * task_manager_new()
50
{
51
        struct task_manager * tm;
52

    
53
        tm = (struct task_manager *) malloc(sizeof(struct task_manager));
54
        tm->tasks = &(tm->list);
55
        INIT_LIST_HEAD(tm->tasks);
56

    
57
        return tm;
58
}
59

    
60
struct periodic_task * periodic_task_new(periodic_task_reinit reinit, periodic_task_callback callback, timeout to)
61
{
62
        struct periodic_task * pt;
63

    
64
        pt = (struct periodic_task *) malloc(sizeof(struct periodic_task));
65
        pt->reinit = reinit;
66
        pt->callback = callback;
67
        pt->to = to;
68
        pt->time_to_expire = to;
69
        pt->writefds = int_bucket_new(10);
70
        pt->readfds = int_bucket_new(10);
71
        pt->errfds = int_bucket_new(10);
72

    
73
        if (pt->reinit)
74
                pt->reinit(pt);
75
        return pt;
76
}
77

    
78
int periodic_task_reset_timeout(struct periodic_task * pt, timeout to)
79
{
80
        if(pt)
81
        {
82
                pt->to = to;
83
                return 0;
84
        }
85
        else
86
                return -1;
87
}
88

    
89
int periodic_task_writefd_add(struct periodic_task * pt, int fd)
90
{
91
        if(pt && fd > 0)
92
                return int_bucket_insert(pt->writefds, fd, 1);
93
        else
94
                return -1;
95
}
96

    
97
int periodic_task_readfd_add(struct periodic_task * pt, int fd)
98
{
99
        if(pt && fd > 0)
100
                return int_bucket_insert(pt->readfds, fd, 1);
101
        else
102
                return -1;
103
}
104

    
105
int periodic_task_errfd_add(struct periodic_task * pt, int fd)
106
{
107
        if(pt && fd > 0)
108
                return int_bucket_insert(pt->errfds, fd, 1);
109
        else
110
                return -1;
111
}
112

    
113
uint8_t task_manager_add(struct task_manager *tm, struct periodic_task * pt)
114
{
115
        if (tm && pt)
116
        {
117
                list_add(&(pt->list_el), tm->tasks);
118
                return 0;
119
        }
120
        else
121
        {
122
                return 1;
123
        }
124
}
125

    
126
void periodic_task_destroy(struct periodic_task ** pt)
127
{
128
        if(pt && *pt)
129
        {
130
                int_bucket_destroy(&((*pt)->writefds));
131
                int_bucket_destroy(&((*pt)->readfds));
132
                int_bucket_destroy(&((*pt)->errfds));
133
                free(*pt);
134
                *pt = NULL;
135
        }
136
}
137

    
138
void task_manager_destroy(struct task_manager ** tm)
139
{
140
        struct periodic_task *pos, *n;
141

    
142
        if (tm && *tm)
143
        {
144
                list_for_each_entry_safe(pos, n, (*tm)->tasks, list_el)
145
                        periodic_task_destroy(&(pos));
146
                free(*tm);
147
                *tm = NULL;
148
        }
149
}
150

    
151
struct periodic_task * task_manager_new_task(struct task_manager *tm, periodic_task_reinit reinit, periodic_task_callback callback, timeout to)
152
{
153
        struct periodic_task * pt;
154
        uint8_t res;
155

    
156
        pt = periodic_task_new(reinit, callback, to);
157
        res = task_manager_add(tm, pt);
158
        if (res)
159
        {
160
                periodic_task_destroy(&pt);
161
                pt = NULL;
162
        }
163
        return pt;
164
}
165

    
166
void task_manager_int_bucket2fd_set(const struct int_bucket *fd_int, fd_set * fds, sock_t * max_fd)
167
{
168
        const uint32_t * fd;
169
        fd = NULL;
170
        while((fd = int_bucket_iter(fd_int, fd)))
171
                {
172
                        FD_SET(*fd, fds);
173
                        if((*max_fd) == INVALID_SOCKET || *fd - (*max_fd) > 0)
174
                                *max_fd = *fd;
175
                }
176
}
177

    
178
void periodic_task_trigger(struct periodic_task * pt, int event, fd_set * read_set, fd_set *write_set, fd_set *err_set)
179
{
180
        if (pt)
181
        {
182
                if(pt->callback)
183
                        pt->callback(event, read_set, write_set, err_set);
184
                if(pt->reinit)
185
                        pt->reinit(pt);
186
                pt->time_to_expire = pt->to;
187
        }
188
}
189

    
190
uint8_t periodic_task_triggerable(const struct periodic_task *pt, fd_set * read_set, fd_set *write_set, fd_set *err_set)
191
{
192
        const uint32_t * iter = NULL;
193
        uint8_t triggerable = 0;
194

    
195
        while (!triggerable && (iter = int_bucket_iter(pt->readfds, iter)))
196
                if (FD_ISSET(*iter, read_set))
197
                        triggerable = 1;
198
        while (!triggerable && (iter = int_bucket_iter(pt->writefds, iter)))
199
                if (FD_ISSET(*iter, write_set))
200
                        triggerable = 1;
201
        while (!triggerable && (iter = int_bucket_iter(pt->errfds, iter)))
202
                if (FD_ISSET(*iter, err_set))
203
                        triggerable = 1;
204

    
205
        return triggerable;
206
}
207

    
208
int stopwatch_select(int maxfd, fd_set * rset, fd_set * wset, fd_set * eset, struct timeval * tv, timeout * elapsed)
209
{
210
        int res;
211
        struct timeval t1, t2, diff;
212

    
213
        gettimeofday(&t1, NULL);
214
        res = select(maxfd, rset, wset, eset, tv);
215
        gettimeofday(&t2, NULL);
216
        timersub(&t2, &t1, &diff);
217
        *elapsed = ((diff.tv_sec) * 1000 + diff.tv_usec/1000.0) + 0.5;
218
        return res;
219
}
220

    
221
int task_manager_poll(struct task_manager * tm, timeout to)
222
{
223
        timeout sleep_time, min_to;  // milliseconds, of course
224
        fd_set read_set, write_set, err_set;
225
        sock_t max_fd;
226
        struct timeval tv;
227
        struct periodic_task * pos, *triggered_task;
228
        int event;
229

    
230
        if (tm)
231
        {
232
                max_fd = INVALID_SOCKET;
233
                FD_ZERO(&read_set);
234
                FD_ZERO(&write_set);
235
                FD_ZERO(&err_set);
236
                min_to = to;
237
                triggered_task = NULL;  // it means task manager got triggered for a timeout
238

    
239
                list_for_each_entry(pos, tm->tasks, list_el)
240
                {
241
                        task_manager_int_bucket2fd_set(pos->readfds, &read_set, &max_fd);
242
                        task_manager_int_bucket2fd_set(pos->writefds, &write_set, &max_fd);
243
                        task_manager_int_bucket2fd_set(pos->errfds, &err_set, &max_fd);
244
                        if (pos->time_to_expire < min_to)
245
                        {
246
                                min_to = pos->time_to_expire;
247
                                triggered_task = pos;        
248
                        }
249
                }
250

    
251
                tv.tv_sec = min_to / 1000;
252
                tv.tv_usec = (min_to % 1000) * 1000;
253

    
254
                event = stopwatch_select((int) max_fd + 1, &read_set, &write_set, &err_set, &tv, &sleep_time);
255

    
256
                if(event < 0)
257
                        perror("Error on select()");
258

    
259
                if(event == 0)  // timeout
260
                {
261
                        list_for_each_entry(pos, tm->tasks, list_el)
262
                                pos->time_to_expire = MAX(0, pos->time_to_expire - sleep_time);
263
                        if (triggered_task != NULL)  // not idle loop, an actual task as to perform periodic update
264
                                periodic_task_trigger(triggered_task, event, &read_set, &write_set, &err_set);
265
                }
266

    
267
                if (event > 0)
268
                        list_for_each_entry(pos, tm->tasks, list_el)
269
                        {
270
                                pos->time_to_expire = MAX(0, pos->time_to_expire - sleep_time);
271
                                if(periodic_task_triggerable(pos, &read_set, &write_set, &err_set))
272
                                        periodic_task_trigger(pos, event, &read_set, &write_set, &err_set);
273
                        }
274

    
275
                return event;
276
        }
277
        else
278
                return -1;
279
}
280

    
281
void task_manager_destroy_task(struct task_manager * tm, struct periodic_task ** pt)
282
{
283
        if (tm && pt && *pt)
284
        {
285
                list_del(&((*pt)->list_el));
286
                periodic_task_destroy(pt);
287
        }
288

    
289
}