Statistics
| Branch: | Revision:

iof-bird / bird-2.0.1 / proto / bfd / io.c @ 6b3f1a54

History | View | Annotate | Download (8.99 KB)

1
/*
2
 *        BIRD -- I/O and event loop
3
 *
4
 *        Can be freely distributed and used under the terms of the GNU GPL.
5
 */
6

    
7
#include <stdio.h>
8
#include <stdlib.h>
9
#include <unistd.h>
10
#include <errno.h>
11
#include <fcntl.h>
12
#include <poll.h>
13
#include <pthread.h>
14
#include <time.h>
15
#include <sys/time.h>
16

    
17
#include "nest/bird.h"
18
#include "proto/bfd/io.h"
19

    
20
#include "lib/buffer.h"
21
#include "lib/lists.h"
22
#include "lib/resource.h"
23
#include "lib/event.h"
24
#include "lib/timer.h"
25
#include "lib/socket.h"
26

    
27

    
28
struct birdloop
29
{
30
  pool *pool;
31
  pthread_t thread;
32
  pthread_mutex_t mutex;
33

    
34
  u8 stop_called;
35
  u8 poll_active;
36
  u8 wakeup_masked;
37
  int wakeup_fds[2];
38

    
39
  struct timeloop time;
40
  list event_list;
41
  list sock_list;
42
  uint sock_num;
43

    
44
  BUFFER(sock *) poll_sk;
45
  BUFFER(struct pollfd) poll_fd;
46
  u8 poll_changed;
47
  u8 close_scheduled;
48
};
49

    
50

    
51
/*
52
 *        Current thread context
53
 */
54

    
55
static pthread_key_t current_loop_key;
56
extern pthread_key_t current_time_key;
57

    
58
static inline struct birdloop *
59
birdloop_current(void)
60
{
61
  return pthread_getspecific(current_loop_key);
62
}
63

    
64
static inline void
65
birdloop_set_current(struct birdloop *loop)
66
{
67
  pthread_setspecific(current_loop_key, loop);
68
  pthread_setspecific(current_time_key, loop ? &loop->time : &main_timeloop);
69
}
70

    
71
static inline void
72
birdloop_init_current(void)
73
{
74
  pthread_key_create(&current_loop_key, NULL);
75
}
76

    
77

    
78
/*
79
 *        Wakeup code for birdloop
80
 */
81

    
82
static void
83
pipe_new(int *pfds)
84
{
85
  int rv = pipe(pfds);
86
  if (rv < 0)
87
    die("pipe: %m");
88

    
89
  if (fcntl(pfds[0], F_SETFL, O_NONBLOCK) < 0)
90
    die("fcntl(O_NONBLOCK): %m");
91

    
92
  if (fcntl(pfds[1], F_SETFL, O_NONBLOCK) < 0)
93
    die("fcntl(O_NONBLOCK): %m");
94
}
95

    
96
void
97
pipe_drain(int fd)
98
{
99
  char buf[64];
100
  int rv;
101
  
102
 try:
103
  rv = read(fd, buf, 64);
104
  if (rv < 0)
105
  {
106
    if (errno == EINTR)
107
      goto try;
108
    if (errno == EAGAIN)
109
      return;
110
    die("wakeup read: %m");
111
  }
112
  if (rv == 64)
113
    goto try;
114
}
115

    
116
void
117
pipe_kick(int fd)
118
{
119
  u64 v = 1;
120
  int rv;
121

    
122
 try:
123
  rv = write(fd, &v, sizeof(u64));
124
  if (rv < 0)
125
  {
126
    if (errno == EINTR)
127
      goto try;
128
    if (errno == EAGAIN)
129
      return;
130
    die("wakeup write: %m");
131
  }
132
}
133

    
134
static inline void
135
wakeup_init(struct birdloop *loop)
136
{
137
  pipe_new(loop->wakeup_fds);
138
}
139

    
140
static inline void
141
wakeup_drain(struct birdloop *loop)
142
{
143
  pipe_drain(loop->wakeup_fds[0]);
144
}
145

    
146
static inline void
147
wakeup_do_kick(struct birdloop *loop)
148
{
149
  pipe_kick(loop->wakeup_fds[1]);
150
}
151

    
152
static inline void
153
wakeup_kick(struct birdloop *loop)
154
{
155
  if (!loop->wakeup_masked)
156
    wakeup_do_kick(loop);
157
  else
158
    loop->wakeup_masked = 2;
159
}
160

    
161
/* For notifications from outside */
162
void
163
wakeup_kick_current(void)
164
{
165
  struct birdloop *loop = birdloop_current();
166

    
167
  if (loop && loop->poll_active)
168
    wakeup_kick(loop);
169
}
170

    
171

    
172
/*
173
 *        Events
174
 */
175

    
176
static inline uint
177
events_waiting(struct birdloop *loop)
178
{
179
  return !EMPTY_LIST(loop->event_list);
180
}
181

    
182
static inline void
183
events_init(struct birdloop *loop)
184
{
185
  init_list(&loop->event_list);
186
}
187

    
188
static void
189
events_fire(struct birdloop *loop)
190
{
191
  times_update(&loop->time);
192
  ev_run_list(&loop->event_list);
193
}
194

    
195
void
196
ev2_schedule(event *e)
197
{
198
  struct birdloop *loop = birdloop_current();
199

    
200
  if (loop->poll_active && EMPTY_LIST(loop->event_list))
201
    wakeup_kick(loop);
202

    
203
  if (e->n.next)
204
    rem_node(&e->n);
205

    
206
  add_tail(&loop->event_list, &e->n);
207
}
208

    
209

    
210
/*
211
 *        Sockets
212
 */
213

    
214
static void
215
sockets_init(struct birdloop *loop)
216
{
217
  init_list(&loop->sock_list);
218
  loop->sock_num = 0;
219

    
220
  BUFFER_INIT(loop->poll_sk, loop->pool, 4);
221
  BUFFER_INIT(loop->poll_fd, loop->pool, 4);
222
  loop->poll_changed = 1;        /* add wakeup fd */
223
}
224

    
225
static void
226
sockets_add(struct birdloop *loop, sock *s)
227
{
228
  add_tail(&loop->sock_list, &s->n);
229
  loop->sock_num++;
230

    
231
  s->index = -1;
232
  loop->poll_changed = 1;
233

    
234
  if (loop->poll_active)
235
    wakeup_kick(loop);
236
}
237

    
238
void
239
sk_start(sock *s)
240
{
241
  struct birdloop *loop = birdloop_current();
242

    
243
  sockets_add(loop, s);
244
}
245

    
246
static void
247
sockets_remove(struct birdloop *loop, sock *s)
248
{
249
  rem_node(&s->n);
250
  loop->sock_num--;
251

    
252
  if (s->index >= 0)
253
    loop->poll_sk.data[s->index] = NULL;
254

    
255
  s->index = -1;
256
  loop->poll_changed = 1;
257

    
258
  /* Wakeup moved to sk_stop() */
259
}
260

    
261
void
262
sk_stop(sock *s)
263
{
264
  struct birdloop *loop = birdloop_current();
265

    
266
  sockets_remove(loop, s);
267

    
268
  if (loop->poll_active)
269
  {
270
    loop->close_scheduled = 1;
271
    wakeup_kick(loop);
272
  }
273
  else
274
    close(s->fd);
275

    
276
  s->fd = -1;
277
}
278

    
279
static inline uint sk_want_events(sock *s)
280
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
281

    
282
/*
283
FIXME: this should be called from sock code
284

285
static void
286
sockets_update(struct birdloop *loop, sock *s)
287
{
288
  if (s->index >= 0)
289
    loop->poll_fd.data[s->index].events = sk_want_events(s);
290
}
291
*/
292

    
293
static void
294
sockets_prepare(struct birdloop *loop)
295
{
296
  BUFFER_SET(loop->poll_sk, loop->sock_num + 1);
297
  BUFFER_SET(loop->poll_fd, loop->sock_num + 1);
298

    
299
  struct pollfd *pfd = loop->poll_fd.data;
300
  sock **psk = loop->poll_sk.data;
301
  uint i = 0;
302
  node *n;
303

    
304
  WALK_LIST(n, loop->sock_list)
305
  {
306
    sock *s = SKIP_BACK(sock, n, n);
307

    
308
    ASSERT(i < loop->sock_num);
309

    
310
    s->index = i;
311
    *psk = s;
312
    pfd->fd = s->fd;
313
    pfd->events = sk_want_events(s);
314
    pfd->revents = 0;
315

    
316
    pfd++;
317
    psk++;
318
    i++;
319
  }
320

    
321
  ASSERT(i == loop->sock_num);
322

    
323
  /* Add internal wakeup fd */
324
  *psk = NULL;
325
  pfd->fd = loop->wakeup_fds[0];
326
  pfd->events = POLLIN;
327
  pfd->revents = 0;
328

    
329
  loop->poll_changed = 0;
330
}
331

    
332
static void
333
sockets_close_fds(struct birdloop *loop)
334
{
335
  struct pollfd *pfd = loop->poll_fd.data;
336
  sock **psk = loop->poll_sk.data;
337
  int poll_num = loop->poll_fd.used - 1;
338

    
339
  int i;
340
  for (i = 0; i < poll_num; i++)
341
    if (psk[i] == NULL)
342
      close(pfd[i].fd);
343

    
344
  loop->close_scheduled = 0;
345
}
346

    
347
int sk_read(sock *s, int revents);
348
int sk_write(sock *s);
349

    
350
static void
351
sockets_fire(struct birdloop *loop)
352
{
353
  struct pollfd *pfd = loop->poll_fd.data;
354
  sock **psk = loop->poll_sk.data;
355
  int poll_num = loop->poll_fd.used - 1;
356

    
357
  times_update(&loop->time);
358

    
359
  /* Last fd is internal wakeup fd */
360
  if (pfd[poll_num].revents & POLLIN)
361
    wakeup_drain(loop);
362

    
363
  int i;
364
  for (i = 0; i < poll_num; pfd++, psk++, i++)
365
  {
366
    int e = 1;
367

    
368
    if (! pfd->revents)
369
      continue;
370

    
371
    if (pfd->revents & POLLNVAL)
372
      die("poll: invalid fd %d", pfd->fd);
373

    
374
    if (pfd->revents & POLLIN)
375
      while (e && *psk && (*psk)->rx_hook)
376
        e = sk_read(*psk, 0);
377

    
378
    e = 1;
379
    if (pfd->revents & POLLOUT)
380
      while (e && *psk)
381
        e = sk_write(*psk);
382
  }
383
}
384

    
385

    
386
/*
387
 *        Birdloop
388
 */
389

    
390
static void * birdloop_main(void *arg);
391

    
392
struct birdloop *
393
birdloop_new(void)
394
{
395
  /* FIXME: this init should be elsewhere and thread-safe */
396
  static int init = 0;
397
  if (!init)
398
    { birdloop_init_current(); init = 1; }
399

    
400
  pool *p = rp_new(NULL, "Birdloop root");
401
  struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop));
402
  loop->pool = p;
403
  pthread_mutex_init(&loop->mutex, NULL);
404

    
405
  wakeup_init(loop);
406

    
407
  events_init(loop);
408
  timers_init(&loop->time, p);
409
  sockets_init(loop);
410

    
411
  return loop;
412
}
413

    
414
void
415
birdloop_start(struct birdloop *loop)
416
{
417
  int rv = pthread_create(&loop->thread, NULL, birdloop_main, loop);
418
  if (rv)
419
    die("pthread_create(): %M", rv);
420
}
421

    
422
void
423
birdloop_stop(struct birdloop *loop)
424
{
425
  pthread_mutex_lock(&loop->mutex);
426
  loop->stop_called = 1;
427
  wakeup_do_kick(loop);
428
  pthread_mutex_unlock(&loop->mutex);
429

    
430
  int rv = pthread_join(loop->thread, NULL);
431
  if (rv)
432
    die("pthread_join(): %M", rv);
433
}
434

    
435
void
436
birdloop_free(struct birdloop *loop)
437
{
438
  rfree(loop->pool);
439
}
440

    
441

    
442
void
443
birdloop_enter(struct birdloop *loop)
444
{
445
  /* TODO: these functions could save and restore old context */
446
  pthread_mutex_lock(&loop->mutex);
447
  birdloop_set_current(loop);
448
}
449

    
450
void
451
birdloop_leave(struct birdloop *loop)
452
{
453
  /* TODO: these functions could save and restore old context */
454
  birdloop_set_current(NULL);
455
  pthread_mutex_unlock(&loop->mutex);
456
}
457

    
458
void
459
birdloop_mask_wakeups(struct birdloop *loop)
460
{
461
  pthread_mutex_lock(&loop->mutex);
462
  loop->wakeup_masked = 1;
463
  pthread_mutex_unlock(&loop->mutex);
464
}
465

    
466
void
467
birdloop_unmask_wakeups(struct birdloop *loop)
468
{
469
  pthread_mutex_lock(&loop->mutex);
470
  if (loop->wakeup_masked == 2)
471
    wakeup_do_kick(loop);
472
  loop->wakeup_masked = 0;
473
  pthread_mutex_unlock(&loop->mutex);
474
}
475

    
476
static void *
477
birdloop_main(void *arg)
478
{
479
  struct birdloop *loop = arg;
480
  timer *t;
481
  int rv, timeout;
482

    
483
  birdloop_set_current(loop);
484

    
485
  pthread_mutex_lock(&loop->mutex);
486
  while (1)
487
  {
488
    events_fire(loop);
489
    timers_fire(&loop->time);
490

    
491
    times_update(&loop->time);
492
    if (events_waiting(loop))
493
      timeout = 0;
494
    else if (t = timers_first(&loop->time))
495
      timeout = (tm_remains(t) TO_MS) + 1;
496
    else
497
      timeout = -1;
498

    
499
    if (loop->poll_changed)
500
      sockets_prepare(loop);
501

    
502
    loop->poll_active = 1;
503
    pthread_mutex_unlock(&loop->mutex);
504

    
505
  try:
506
    rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout);
507
    if (rv < 0)
508
    {
509
      if (errno == EINTR || errno == EAGAIN)
510
        goto try;
511
      die("poll: %m");
512
    }
513

    
514
    pthread_mutex_lock(&loop->mutex);
515
    loop->poll_active = 0;
516

    
517
    if (loop->close_scheduled)
518
      sockets_close_fds(loop);
519

    
520
    if (loop->stop_called)
521
      break;
522

    
523
    if (rv)
524
      sockets_fire(loop);
525

    
526
    timers_fire(&loop->time);
527
  }
528

    
529
  loop->stop_called = 0;
530
  pthread_mutex_unlock(&loop->mutex);
531

    
532
  return NULL;
533
}
534

    
535