/*
 * Copyright (c) 2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

#if DISPATCH_EVENT_BACKEND_EPOLL
#include <linux/sockios.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/signalfd.h>
#include <sys/timerfd.h>

#define EPOLLFREE 0x4000

#if !DISPATCH_USE_MGR_THREAD
#error unsupported configuration
#endif

#define DISPATCH_EPOLL_MAX_EVENT_COUNT 16

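// Values stored in epoll_event.data.u32 for the manager's own descriptors:
// the wakeup eventfd and the per-clock timerfds. Muxnotes are identified by a
// pointer stored in epoll_event.data.ptr instead.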
enum {
	DISPATCH_EPOLL_EVENTFD    = 0x0001,
	DISPATCH_EPOLL_CLOCK_WALL = 0x0002,
	DISPATCH_EPOLL_CLOCK_MACH = 0x0003,
};

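// A muxnote multiplexes all unotes registered for a given (ident, filter)
// pair onto a single epoll registration. Readers and writers hang off
// separate lists so EPOLLIN and EPOLLOUT can be dropped independently.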
typedef struct dispatch_muxnote_s {
	TAILQ_ENTRY(dispatch_muxnote_s) dmn_list;
	TAILQ_HEAD(, dispatch_unote_linkage_s) dmn_readers_head;
	TAILQ_HEAD(, dispatch_unote_linkage_s) dmn_writers_head;
	uint32_t dmn_ident;
	uint32_t dmn_events;
	int16_t dmn_filter;
	int dmn_fd;
	bool dmn_skip_outq_ioctl;
	bool dmn_skip_inq_ioctl;
} *dispatch_muxnote_t;

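// One timerfd-backed timeout per dispatch clock (wall, mach/monotonic),
// created lazily and registered with the epoll instance as a one-shot EPOLLIN.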
typedef struct dispatch_epoll_timeout_s {
	int det_fd;
	uint32_t det_ident;
	bool det_registered;
	bool det_armed;
} *dispatch_epoll_timeout_t;

static int _dispatch_epfd, _dispatch_eventfd;

static dispatch_once_t epoll_init_pred;
static void _dispatch_epoll_init(void *);

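// Hash table of muxnotes, bucketed by ident.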
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s)
_dispatch_sources[DSL_HASH_SIZE];

#define DISPATCH_EPOLL_TIMEOUT_INITIALIZER(clock) \
	[DISPATCH_CLOCK_##clock] = { \
		.det_fd = -1, \
		.det_ident = DISPATCH_EPOLL_CLOCK_##clock, \
	}

static struct dispatch_epoll_timeout_s _dispatch_epoll_timeout[] = {
	DISPATCH_EPOLL_TIMEOUT_INITIALIZER(WALL),
	DISPATCH_EPOLL_TIMEOUT_INITIALIZER(MACH),
};

#pragma mark dispatch_muxnote_t

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_muxnote_bucket_s *
_dispatch_muxnote_bucket(uint32_t ident)
{
	return &_dispatch_sources[DSL_HASH(ident)];
}

#define _dispatch_unote_muxnote_bucket(du) \
	_dispatch_muxnote_bucket(du._du->du_ident)

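// Look up the muxnote for (ident, filter). Read and write unotes share a
// single muxnote, so EVFILT_WRITE is canonicalized to EVFILT_READ here.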
DISPATCH_ALWAYS_INLINE
static inline dispatch_muxnote_t
_dispatch_muxnote_find(struct dispatch_muxnote_bucket_s *dmb,
		uint32_t ident, int16_t filter)
{
	dispatch_muxnote_t dmn;
	if (filter == EVFILT_WRITE) filter = EVFILT_READ;
	TAILQ_FOREACH(dmn, dmb, dmn_list) {
		if (dmn->dmn_ident == ident && dmn->dmn_filter == filter) {
			break;
		}
	}
	return dmn;
}

#define _dispatch_unote_muxnote_find(dmb, du) \
	_dispatch_muxnote_find(dmb, du._du->du_ident, du._du->du_filter)

static void
_dispatch_muxnote_dispose(dispatch_muxnote_t dmn)
{
	// only close the fd if it is a surrogate (signalfd or dummy eventfd)
	// created by _dispatch_muxnote_create, not the client's own descriptor
	if (dmn->dmn_filter != EVFILT_READ || (uint32_t)dmn->dmn_fd != dmn->dmn_ident) {
		close(dmn->dmn_fd);
	}
	free(dmn);
}

static pthread_t manager_thread;

static void
_dispatch_muxnote_signal_block_and_raise(int signo)
{
	// On Linux, for signals to be delivered to the signalfd, signals must be
	// blocked, else any thread that hasn't blocked them may receive them.
	// Fix that by lazily noticing, blocking said signal, and raising the
	// signal again when it happens.
	_dispatch_sigmask();
	pthread_kill(manager_thread, signo);
}

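// Build the muxnote for a unote: signals get a dedicated signalfd, regular
// files get a dummy eventfd (epoll cannot poll regular files), and sockets
// are probed so that listening sockets skip the unsupported SIOCINQ ioctl.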
static dispatch_muxnote_t
_dispatch_muxnote_create(dispatch_unote_t du, uint32_t events)
{
	static sigset_t signals_with_unotes;
	static struct sigaction sa = {
		.sa_handler = _dispatch_muxnote_signal_block_and_raise,
		.sa_flags = SA_RESTART,
	};

	dispatch_muxnote_t dmn;
	struct stat sb;
	sigset_t sigmask;
	int fd = (int)du._du->du_ident;
	int16_t filter = du._du->du_filter;
	bool skip_outq_ioctl = false, skip_inq_ioctl = false;

	switch (filter) {
	case EVFILT_SIGNAL: {
		int signo = (int)du._du->du_ident;
		if (!sigismember(&signals_with_unotes, signo)) {
			manager_thread = pthread_self();
			sigaddset(&signals_with_unotes, signo);
			sigaction(signo, &sa, NULL);
		}
		sigemptyset(&sigmask);
		sigaddset(&sigmask, signo);
		fd = signalfd(-1, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
		break;
	}

	case EVFILT_WRITE:
	case EVFILT_READ:
		filter = EVFILT_READ;
		if (fstat(fd, &sb) < 0) {
			return NULL;
		}
		if (S_ISREG(sb.st_mode)) {
			// make a dummy fd that is both readable & writeable
			fd = eventfd(1, EFD_CLOEXEC | EFD_NONBLOCK);
			// Linux doesn't support output queue size ioctls for regular files
			skip_outq_ioctl = true;
		} else if (S_ISSOCK(sb.st_mode)) {
			socklen_t vlen = sizeof(int);
			int v;
			// Linux doesn't support saying how many clients are ready to be
			// accept()ed for sockets
			if (getsockopt(fd, SOL_SOCKET, SO_ACCEPTCONN, &v, &vlen) == 0) {
				skip_inq_ioctl = (bool)v;
			}
		}
		break;

	default:
		DISPATCH_INTERNAL_CRASH(0, "Unexpected filter");
	}

	dmn = _dispatch_calloc(1, sizeof(struct dispatch_muxnote_s));
	TAILQ_INIT(&dmn->dmn_readers_head);
	TAILQ_INIT(&dmn->dmn_writers_head);
	dmn->dmn_fd = fd;
	dmn->dmn_ident = du._du->du_ident;
	dmn->dmn_filter = filter;
	dmn->dmn_events = events;
	dmn->dmn_skip_outq_ioctl = skip_outq_ioctl;
	dmn->dmn_skip_inq_ioctl = skip_inq_ioctl;
	return dmn;
}

#pragma mark dispatch_unote_t

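// Apply the muxnote's current event mask to the epoll instance
// (op is EPOLL_CTL_ADD or EPOLL_CTL_MOD).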
static int
_dispatch_epoll_update(dispatch_muxnote_t dmn, int op)
{
	dispatch_once_f(&epoll_init_pred, NULL, _dispatch_epoll_init);
	struct epoll_event ev = {
		.events = dmn->dmn_events,
		.data = { .ptr = dmn },
	};
	return epoll_ctl(_dispatch_epfd, op, dmn->dmn_fd, &ev);
}

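// Attach a unote to its muxnote, creating and/or re-arming the epoll
// registration as needed. Custom (in-process) filters never reach epoll.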
bool
_dispatch_unote_register(dispatch_unote_t du,
		DISPATCH_UNUSED dispatch_wlh_t wlh, dispatch_priority_t pri)
{
	struct dispatch_muxnote_bucket_s *dmb;
	dispatch_muxnote_t dmn;
	uint32_t events = EPOLLFREE;

	dispatch_assert(!_dispatch_unote_registered(du));
	du._du->du_priority = pri;

	switch (du._du->du_filter) {
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
	case DISPATCH_EVFILT_CUSTOM_REPLACE:
		du._du->du_wlh = DISPATCH_WLH_ANON;
		return true;
	case EVFILT_WRITE:
		events |= EPOLLOUT;
		break;
	default:
		events |= EPOLLIN;
		break;
	}

	if (du._du->du_type->dst_flags & EV_DISPATCH) {
		events |= EPOLLONESHOT;
	}

	dmb = _dispatch_unote_muxnote_bucket(du);
	dmn = _dispatch_unote_muxnote_find(dmb, du);
	if (dmn) {
		events &= ~dmn->dmn_events;
		if (events) {
			dmn->dmn_events |= events;
			if (_dispatch_epoll_update(dmn, EPOLL_CTL_MOD) < 0) {
				dmn->dmn_events &= ~events;
				dmn = NULL;
			}
		}
	} else {
		dmn = _dispatch_muxnote_create(du, events);
		if (_dispatch_epoll_update(dmn, EPOLL_CTL_ADD) < 0) {
			_dispatch_muxnote_dispose(dmn);
			dmn = NULL;
		} else {
			TAILQ_INSERT_TAIL(dmb, dmn, dmn_list);
		}
	}

	if (dmn) {
		dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du);
		if (events & EPOLLOUT) {
			TAILQ_INSERT_TAIL(&dmn->dmn_writers_head, dul, du_link);
		} else {
			TAILQ_INSERT_TAIL(&dmn->dmn_readers_head, dul, du_link);
		}
		dul->du_muxnote = dmn;
		dispatch_assert(du._du->du_wlh == NULL);
		du._du->du_wlh = DISPATCH_WLH_ANON;
	}
	return dmn != NULL;
}

void
_dispatch_unote_resume(dispatch_unote_t du)
{
	dispatch_muxnote_t dmn = _dispatch_unote_get_linkage(du)->du_muxnote;
	dispatch_assert(_dispatch_unote_registered(du));

	_dispatch_epoll_update(dmn, EPOLL_CTL_MOD);
}

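// Detach a unote from its muxnote, then narrow or delete the epoll
// registration once no readers and/or writers remain.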
bool
_dispatch_unote_unregister(dispatch_unote_t du, DISPATCH_UNUSED uint32_t flags)
{
	switch (du._du->du_filter) {
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
	case DISPATCH_EVFILT_CUSTOM_REPLACE:
		du._du->du_wlh = NULL;
		return true;
	}
	if (_dispatch_unote_registered(du)) {
		dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du);
		dispatch_muxnote_t dmn = dul->du_muxnote;
		uint32_t events = dmn->dmn_events;

		if (du._du->du_filter == EVFILT_WRITE) {
			TAILQ_REMOVE(&dmn->dmn_writers_head, dul, du_link);
		} else {
			TAILQ_REMOVE(&dmn->dmn_readers_head, dul, du_link);
		}
		_TAILQ_TRASH_ENTRY(dul, du_link);
		dul->du_muxnote = NULL;

		if (TAILQ_EMPTY(&dmn->dmn_readers_head)) {
			events &= (uint32_t)(~EPOLLIN);
		}
		if (TAILQ_EMPTY(&dmn->dmn_writers_head)) {
			events &= (uint32_t)(~EPOLLOUT);
		}

		if (events == dmn->dmn_events) {
			// nothing to do
		} else if (events & (EPOLLIN | EPOLLOUT)) {
			dmn->dmn_events = events;
			_dispatch_epoll_update(dmn, EPOLL_CTL_MOD);
		} else {
			epoll_ctl(_dispatch_epfd, EPOLL_CTL_DEL, dmn->dmn_fd, NULL);
			TAILQ_REMOVE(_dispatch_unote_muxnote_bucket(du), dmn, dmn_list);
			_dispatch_muxnote_dispose(dmn);
		}
		dispatch_assert(du._du->du_wlh == DISPATCH_WLH_ANON);
		du._du->du_wlh = NULL;
	}
	return true;
}

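// A timerfd fired: mark the corresponding clock's timers as needing to be
// drained and disarm the bookkeeping for that clock.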
static void
_dispatch_event_merge_timer(dispatch_clock_t clock)
{
	_dispatch_timers_expired = true;
	_dispatch_timers_processing_mask |= 1 << DISPATCH_TIMER_INDEX(clock, 0);
#if DISPATCH_USE_DTRACE
	_dispatch_timers_will_wake |= 1 << 0;
#endif
	_dispatch_epoll_timeout[clock].det_armed = false;
	_dispatch_timers_heap[clock].dth_flags &= ~DTH_ARMED;
}

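// Program the timerfd for a timer index: create it lazily, set the absolute
// deadline, and add/modify/delete its epoll registration to match.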
static void
_dispatch_timeout_program(uint32_t tidx, uint64_t target,
		DISPATCH_UNUSED uint64_t leeway)
{
	dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
	dispatch_epoll_timeout_t timer = &_dispatch_epoll_timeout[clock];
	struct epoll_event ev = {
		.events = EPOLLONESHOT | EPOLLIN,
		.data = { .u32 = timer->det_ident },
	};
	int op;

	if (target >= INT64_MAX && !timer->det_registered) {
		return;
	}

	if (unlikely(timer->det_fd < 0)) {
		clockid_t clockid;
		int fd;
		switch (DISPATCH_TIMER_CLOCK(tidx)) {
		case DISPATCH_CLOCK_MACH:
			clockid = CLOCK_MONOTONIC;
			break;
		case DISPATCH_CLOCK_WALL:
			clockid = CLOCK_REALTIME;
			break;
		}
		fd = timerfd_create(clockid, TFD_NONBLOCK | TFD_CLOEXEC);
		if (!dispatch_assume(fd >= 0)) {
			return;
		}
		timer->det_fd = fd;
	}

	if (target < INT64_MAX) {
		struct itimerspec its = { .it_value = {
			.tv_sec = target / NSEC_PER_SEC,
			.tv_nsec = target % NSEC_PER_SEC,
		} };
		dispatch_assume_zero(timerfd_settime(timer->det_fd, TFD_TIMER_ABSTIME,
				&its, NULL));
		if (!timer->det_registered) {
			op = EPOLL_CTL_ADD;
		} else if (!timer->det_armed) {
			op = EPOLL_CTL_MOD;
		} else {
			return;
		}
	} else {
		op = EPOLL_CTL_DEL;
	}
	dispatch_assume_zero(epoll_ctl(_dispatch_epfd, op, timer->det_fd, &ev));
	timer->det_armed = timer->det_registered = (op != EPOLL_CTL_DEL);
}

void
_dispatch_event_loop_timer_arm(uint32_t tidx, dispatch_timer_delay_s range,
		dispatch_clock_now_cache_t nows)
{
	uint64_t target = range.delay;
	target += _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
	_dispatch_timers_heap[tidx].dth_flags |= DTH_ARMED;
	_dispatch_timeout_program(tidx, target, range.leeway);
}

void
_dispatch_event_loop_timer_delete(uint32_t tidx)
{
	_dispatch_timers_heap[tidx].dth_flags &= ~DTH_ARMED;
	_dispatch_timeout_program(tidx, UINT64_MAX, UINT64_MAX);
}

#pragma mark dispatch_loop

void
_dispatch_event_loop_atfork_child(void)
{
}

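// One-time setup: create the epoll instance and the wakeup eventfd, then
// kick the manager queue so the manager thread starts draining events.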
static void
_dispatch_epoll_init(void *context DISPATCH_UNUSED)
{
	_dispatch_fork_becomes_unsafe();

	unsigned int i;
	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

	_dispatch_epfd = epoll_create1(EPOLL_CLOEXEC);
	if (_dispatch_epfd < 0) {
		DISPATCH_INTERNAL_CRASH(errno, "epoll_create1() failed");
	}

	_dispatch_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
	if (_dispatch_eventfd < 0) {
		DISPATCH_INTERNAL_CRASH(errno, "epoll_eventfd() failed");
	}

	struct epoll_event ev = {
		.events = EPOLLIN | EPOLLFREE,
		.data = { .u32 = DISPATCH_EPOLL_EVENTFD, },
	};
	int op = EPOLL_CTL_ADD;
	if (epoll_ctl(_dispatch_epfd, op, _dispatch_eventfd, &ev) < 0) {
		DISPATCH_INTERNAL_CRASH(errno, "epoll_ctl() failed");
	}

#if DISPATCH_USE_MGR_THREAD
	dx_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
#endif
}

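// Wake the manager thread by writing to the wakeup eventfd.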
void
_dispatch_event_loop_poke(dispatch_wlh_t wlh DISPATCH_UNUSED,
		uint64_t dq_state DISPATCH_UNUSED, uint32_t flags DISPATCH_UNUSED)
{
	dispatch_once_f(&epoll_init_pred, NULL, _dispatch_epoll_init);
	dispatch_assume_zero(eventfd_write(_dispatch_eventfd, 1));
}

static void
_dispatch_event_merge_signal(dispatch_muxnote_t dmn)
{
	dispatch_unote_linkage_t dul, dul_next;
	struct signalfd_siginfo si;
	ssize_t rc;

	// Linux has the weirdest semantics around signals: if it finds a thread
	// that has not masked a process-wide signal, it may deliver it to that
	// thread, meaning that the signalfd may have been made readable, but the
	// signal was consumed through the legacy delivery mechanism.
	//
	// Because of this we can get a misfire of the signalfd yielding EAGAIN the
	// first time around. The _dispatch_muxnote_signal_block_and_raise() hack
	// will kick in, the thread with the wrong mask will be fixed up, and the
	// signal delivered to us again properly.
	if ((rc = read(dmn->dmn_fd, &si, sizeof(si))) == sizeof(si)) {
		TAILQ_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
			dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
			dux_merge_evt(du._du, EV_ADD|EV_ENABLE|EV_CLEAR, 1, 0, 0);
		}
	} else {
		dispatch_assume(rc == -1 && errno == EAGAIN);
	}
}

static uintptr_t
_dispatch_get_buffer_size(dispatch_muxnote_t dmn, bool writer)
{
	int n;

	if (writer ? dmn->dmn_skip_outq_ioctl : dmn->dmn_skip_inq_ioctl) {
		return 1;
	}

	if (ioctl((int)dmn->dmn_ident, writer ? SIOCOUTQ : SIOCINQ, &n) != 0) {
		switch (errno) {
		case EINVAL:
		case ENOTTY:
			// this file descriptor actually doesn't support the buffer
			// size ioctl, remember that for next time to avoid the syscall.
			break;
		default:
			dispatch_assume_zero(errno);
			break;
		}
		if (writer) {
			dmn->dmn_skip_outq_ioctl = true;
		} else {
			dmn->dmn_skip_inq_ioctl = true;
		}
		return 1;
	}
	return (uintptr_t)n;
}

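// Deliver an fd event to every reader and/or writer unote on the muxnote,
// passing the queue size reported by _dispatch_get_buffer_size as the data.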
static void
_dispatch_event_merge_fd(dispatch_muxnote_t dmn, uint32_t events)
{
	dispatch_unote_linkage_t dul, dul_next;
	uintptr_t data;

	if (events & EPOLLIN) {
		data = _dispatch_get_buffer_size(dmn, false);
		TAILQ_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
			dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
			dux_merge_evt(du._du, EV_ADD|EV_ENABLE|EV_DISPATCH, ~data, 0, 0);
		}
	}

	if (events & EPOLLOUT) {
		data = _dispatch_get_buffer_size(dmn, true);
		TAILQ_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) {
			dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
			dux_merge_evt(du._du, EV_ADD|EV_ENABLE|EV_DISPATCH, ~data, 0, 0);
		}
	}
}

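// Drain the epoll instance: non-blocking when KEVENT_FLAG_IMMEDIATE is set,
// otherwise wait indefinitely. Dispatches manager wakeups, timer fires, and
// muxnote events to their respective handlers.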
void
_dispatch_event_loop_drain(uint32_t flags)
{
	struct epoll_event ev[DISPATCH_EPOLL_MAX_EVENT_COUNT];
	int i, r;
	int timeout = (flags & KEVENT_FLAG_IMMEDIATE) ? 0 : -1;

retry:
	r = epoll_wait(_dispatch_epfd, ev, countof(ev), timeout);
	if (unlikely(r == -1)) {
		int err = errno;
		switch (err) {
		case EINTR:
			goto retry;
		case EBADF:
			DISPATCH_CLIENT_CRASH(err, "Do not close random Unix descriptors");
			break;
		default:
			(void)dispatch_assume_zero(err);
			break;
		}
		return;
	}

	for (i = 0; i < r; i++) {
		dispatch_muxnote_t dmn;
		eventfd_t value;

		if (ev[i].events & EPOLLFREE) {
			DISPATCH_CLIENT_CRASH(0, "Do not close random Unix descriptors");
		}

		switch (ev[i].data.u32) {
		case DISPATCH_EPOLL_EVENTFD:
			dispatch_assume_zero(eventfd_read(_dispatch_eventfd, &value));
			break;

		case DISPATCH_EPOLL_CLOCK_WALL:
			_dispatch_event_merge_timer(DISPATCH_CLOCK_WALL);
			break;

		case DISPATCH_EPOLL_CLOCK_MACH:
			_dispatch_event_merge_timer(DISPATCH_CLOCK_MACH);
			break;

		default:
			dmn = ev[i].data.ptr;
			switch (dmn->dmn_filter) {
			case EVFILT_SIGNAL:
				_dispatch_event_merge_signal(dmn);
				break;

			case EVFILT_READ:
				_dispatch_event_merge_fd(dmn, ev[i].events);
				break;
			}
			break;
		}
	}
}

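// The epoll backend only uses DISPATCH_WLH_ANON, so workloop ownership
// handoff is a no-op here.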
void
_dispatch_event_loop_wake_owner(dispatch_sync_context_t dsc,
		dispatch_wlh_t wlh, uint64_t old_state, uint64_t new_state)
{
	(void)dsc; (void)wlh; (void)old_state; (void)new_state;
}

void
_dispatch_event_loop_wait_for_ownership(dispatch_sync_context_t dsc)
{
	if (dsc->dsc_release_storage) {
		_dispatch_queue_release_storage(dsc->dc_data);
	}
}

void
_dispatch_event_loop_end_ownership(dispatch_wlh_t wlh, uint64_t old_state,
		uint64_t new_state, uint32_t flags)
{
	(void)wlh; (void)old_state; (void)new_state; (void)flags;
}

#if DISPATCH_WLH_DEBUG
void
_dispatch_event_loop_assert_not_owned(dispatch_wlh_t wlh)
{
	(void)wlh;
}
#endif // DISPATCH_WLH_DEBUG

void
_dispatch_event_loop_leave_immediate(dispatch_wlh_t wlh, uint64_t dq_state)
{
	(void)wlh; (void)dq_state;
}

#endif // DISPATCH_EVENT_BACKEND_EPOLL