/*
 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
24 #include "protocolServer.h"
27 #define DKEV_DISPOSE_IMMEDIATE_DELETE 0x1
28 #define DKEV_UNREGISTER_DISCONNECTED 0x2
29 #define DKEV_UNREGISTER_REPLY_REMOVE 0x4
30 #define DKEV_UNREGISTER_WAKEUP 0x8
32 static pthread_priority_t
33 _dispatch_source_compute_kevent_priority(dispatch_source_t ds
);
34 static void _dispatch_source_handler_free(dispatch_source_t ds
, long kind
);
35 static void _dispatch_source_merge_kevent(dispatch_source_t ds
,
36 const _dispatch_kevent_qos_s
*ke
);
37 static bool _dispatch_kevent_register(dispatch_kevent_t
*dkp
,
38 pthread_priority_t pp
, uint32_t *flgp
);
39 static long _dispatch_kevent_unregister(dispatch_kevent_t dk
, uint32_t flg
,
40 unsigned int options
);
41 static long _dispatch_kevent_resume(dispatch_kevent_t dk
, uint32_t new_flags
,
43 static void _dispatch_kevent_drain(_dispatch_kevent_qos_s
*ke
);
44 static void _dispatch_kevent_merge(_dispatch_kevent_qos_s
*ke
);
45 static void _dispatch_timers_kevent(_dispatch_kevent_qos_s
*ke
);
46 static void _dispatch_timers_unregister(dispatch_source_t ds
,
47 dispatch_kevent_t dk
);
48 static void _dispatch_timers_update(dispatch_source_t ds
);
49 static void _dispatch_timer_aggregates_check(void);
50 static void _dispatch_timer_aggregates_register(dispatch_source_t ds
);
51 static void _dispatch_timer_aggregates_update(dispatch_source_t ds
,
53 static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds
,
55 static inline unsigned long _dispatch_source_timer_data(
56 dispatch_source_refs_t dr
, unsigned long prev
);
57 static void _dispatch_kq_deferred_update(const _dispatch_kevent_qos_s
*ke
);
58 static long _dispatch_kq_immediate_update(_dispatch_kevent_qos_s
*ke
);
59 static void _dispatch_memorypressure_init(void);
61 static void _dispatch_mach_host_calendar_change_register(void);
62 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
63 static void _dispatch_mach_recv_msg_buf_init(void);
64 static kern_return_t
_dispatch_kevent_machport_resume(dispatch_kevent_t dk
,
65 uint32_t new_flags
, uint32_t del_flags
);
67 static kern_return_t
_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk
,
68 uint32_t new_flags
, uint32_t del_flags
);
69 static void _dispatch_mach_kevent_merge(_dispatch_kevent_qos_s
*ke
);
70 static mach_msg_size_t
_dispatch_kevent_mach_msg_size(
71 _dispatch_kevent_qos_s
*ke
);
73 static inline void _dispatch_mach_host_calendar_change_register(void) {}
74 static inline void _dispatch_mach_recv_msg_buf_init(void) {}
76 static const char * _evfiltstr(short filt
);
78 static void dispatch_kevent_debug(const char *verb
,
79 const _dispatch_kevent_qos_s
*kev
, int i
, int n
,
80 const char *function
, unsigned int line
);
81 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
82 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
85 dispatch_kevent_debug(const char *verb
, const _dispatch_kevent_qos_s
*kev
,
86 int i
, int n
, const char *function
, unsigned int line
)
88 (void)verb
; (void)kev
; (void)i
; (void)n
; (void)function
; (void)line
;
90 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
92 #define _dispatch_kevent_debug(verb, _kev) \
93 dispatch_kevent_debug(verb, _kev, 0, 1, __FUNCTION__, __LINE__)
94 #define _dispatch_kevent_debug_n(verb, _kev, i, n) \
95 dispatch_kevent_debug(verb, _kev, i, n, __FUNCTION__, __LINE__)
96 #ifndef DISPATCH_MGR_QUEUE_DEBUG
97 #define DISPATCH_MGR_QUEUE_DEBUG 0
99 #if DISPATCH_MGR_QUEUE_DEBUG
100 #define _dispatch_kevent_mgr_debug _dispatch_kevent_debug
103 _dispatch_kevent_mgr_debug(_dispatch_kevent_qos_s
* kev DISPATCH_UNUSED
) {}
107 #pragma mark dispatch_source_t
110 dispatch_source_create(dispatch_source_type_t type
, uintptr_t handle
,
111 unsigned long mask
, dispatch_queue_t dq
)
113 // ensure _dispatch_evfilt_machport_direct_enabled is initialized
114 _dispatch_root_queues_init();
115 const _dispatch_kevent_qos_s
*proto_kev
= &type
->ke
;
116 dispatch_source_t ds
;
117 dispatch_kevent_t dk
;
120 if (type
== NULL
|| (mask
& ~type
->mask
)) {
121 return DISPATCH_BAD_INPUT
;
123 if (type
->mask
&& !mask
) {
124 // expect a non-zero mask when the type declares one ... except
125 switch (type
->ke
.filter
) {
126 case DISPATCH_EVFILT_TIMER
:
127 break; // timers don't need masks
128 case DISPATCH_EVFILT_MACH_NOTIFICATION
:
129 break; // type->init handles zero mask as a legacy case
131 // otherwise reject as invalid input
132 return DISPATCH_BAD_INPUT
;
136 switch (type
->ke
.filter
) {
138 if (handle
>= NSIG
) {
139 return DISPATCH_BAD_INPUT
;
143 #if DISPATCH_USE_MEMORYSTATUS
144 case EVFILT_MEMORYSTATUS
:
146 case DISPATCH_EVFILT_CUSTOM_ADD
:
147 case DISPATCH_EVFILT_CUSTOM_OR
:
149 return DISPATCH_BAD_INPUT
;
152 case DISPATCH_EVFILT_TIMER
:
153 if ((handle
== 0) != (type
->ke
.ident
== 0)) {
154 return DISPATCH_BAD_INPUT
;
161 ds
= _dispatch_alloc(DISPATCH_VTABLE(source
),
162 sizeof(struct dispatch_source_s
));
163 // Initialize as a queue first, then override some settings below.
164 _dispatch_queue_init(ds
->_as_dq
, DQF_NONE
, 1, true);
165 ds
->dq_label
= "source";
166 ds
->do_ref_cnt
++; // the reference the manager queue holds
168 switch (type
->ke
.filter
) {
169 case DISPATCH_EVFILT_CUSTOM_OR
:
170 dk
= DISPATCH_KEV_CUSTOM_OR
;
172 case DISPATCH_EVFILT_CUSTOM_ADD
:
173 dk
= DISPATCH_KEV_CUSTOM_ADD
;
176 dk
= _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s
));
177 dk
->dk_kevent
= *proto_kev
;
178 dk
->dk_kevent
.ident
= handle
;
179 dk
->dk_kevent
.flags
|= EV_ADD
|EV_ENABLE
;
180 dk
->dk_kevent
.fflags
|= (uint32_t)mask
;
181 dk
->dk_kevent
.udata
= (_dispatch_kevent_qos_udata_t
)dk
;
182 TAILQ_INIT(&dk
->dk_sources
);
184 ds
->ds_pending_data_mask
= dk
->dk_kevent
.fflags
;
185 ds
->ds_ident_hack
= (uintptr_t)dk
->dk_kevent
.ident
;
186 if (EV_UDATA_SPECIFIC
& proto_kev
->flags
) {
187 dk
->dk_kevent
.flags
|= EV_DISPATCH
;
188 ds
->ds_is_direct_kevent
= true;
189 ds
->ds_needs_rearm
= true;
195 if ((EV_DISPATCH
|EV_ONESHOT
) & proto_kev
->flags
) {
196 ds
->ds_needs_rearm
= true;
197 } else if (!(EV_CLEAR
& proto_kev
->flags
)) {
198 // we cheat and use EV_CLEAR to mean a "flag thingy"
199 ds
->ds_is_adder
= true;
201 // Some sources require special processing
202 if (type
->init
!= NULL
) {
203 type
->init(ds
, type
, handle
, mask
);
205 dispatch_assert(!(ds
->ds_is_level
&& ds
->ds_is_adder
));
206 if (!ds
->ds_is_custom_source
&& (dk
->dk_kevent
.flags
& EV_VANISHED
)) {
207 // see _dispatch_source_merge_kevent
208 dispatch_assert(!(dk
->dk_kevent
.flags
& EV_ONESHOT
));
209 dispatch_assert(dk
->dk_kevent
.flags
& EV_DISPATCH
);
210 dispatch_assert(dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
);
213 if (fastpath(!ds
->ds_refs
)) {
214 ds
->ds_refs
= _dispatch_calloc(1ul,
215 sizeof(struct dispatch_source_refs_s
));
217 ds
->ds_refs
->dr_source_wref
= _dispatch_ptr2wref(ds
);
220 dq
= _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
, true);
222 _dispatch_retain(dq
);
225 _dispatch_object_debug(ds
, "%s", __func__
);
230 _dispatch_source_dispose(dispatch_source_t ds
)
232 _dispatch_object_debug(ds
, "%s", __func__
);
233 _dispatch_source_handler_free(ds
, DS_REGISTN_HANDLER
);
234 _dispatch_source_handler_free(ds
, DS_EVENT_HANDLER
);
235 _dispatch_source_handler_free(ds
, DS_CANCEL_HANDLER
);
237 _dispatch_queue_destroy(ds
->_as_dq
);
241 _dispatch_source_xref_dispose(dispatch_source_t ds
)
243 dx_wakeup(ds
, 0, DISPATCH_WAKEUP_FLUSH
);
247 dispatch_source_testcancel(dispatch_source_t ds
)
249 return (bool)(ds
->dq_atomic_flags
& DSF_CANCELED
);
253 dispatch_source_get_mask(dispatch_source_t ds
)
255 unsigned long mask
= ds
->ds_pending_data_mask
;
256 if (ds
->ds_vmpressure_override
) {
257 mask
= NOTE_VM_PRESSURE
;
259 #if TARGET_IPHONE_SIMULATOR
260 else if (ds
->ds_memorypressure_override
) {
261 mask
= NOTE_MEMORYSTATUS_PRESSURE_WARN
;
268 dispatch_source_get_handle(dispatch_source_t ds
)
270 unsigned int handle
= (unsigned int)ds
->ds_ident_hack
;
271 #if TARGET_IPHONE_SIMULATOR
272 if (ds
->ds_memorypressure_override
) {
280 dispatch_source_get_data(dispatch_source_t ds
)
282 unsigned long data
= ds
->ds_data
;
283 if (ds
->ds_vmpressure_override
) {
284 data
= NOTE_VM_PRESSURE
;
286 #if TARGET_IPHONE_SIMULATOR
287 else if (ds
->ds_memorypressure_override
) {
288 data
= NOTE_MEMORYSTATUS_PRESSURE_WARN
;
294 DISPATCH_ALWAYS_INLINE
296 _dispatch_source_merge_data2(dispatch_source_t ds
,
297 pthread_priority_t pp
, unsigned long val
)
299 _dispatch_kevent_qos_s kev
= {
300 .fflags
= (typeof(kev
.fflags
))val
,
301 .data
= (typeof(kev
.data
))val
,
302 #if DISPATCH_USE_KEVENT_QOS
303 .qos
= (_dispatch_kevent_priority_t
)pp
,
306 #if !DISPATCH_USE_KEVENT_QOS
310 dispatch_assert(ds
->ds_dkev
== DISPATCH_KEV_CUSTOM_OR
||
311 ds
->ds_dkev
== DISPATCH_KEV_CUSTOM_ADD
);
312 _dispatch_kevent_debug("synthetic data", &kev
);
313 _dispatch_source_merge_kevent(ds
, &kev
);
317 dispatch_source_merge_data(dispatch_source_t ds
, unsigned long val
)
319 _dispatch_source_merge_data2(ds
, 0, val
);
323 _dispatch_source_merge_data(dispatch_source_t ds
, pthread_priority_t pp
,
326 _dispatch_source_merge_data2(ds
, pp
, val
);
330 #pragma mark dispatch_source_handler
332 DISPATCH_ALWAYS_INLINE
333 static inline dispatch_continuation_t
334 _dispatch_source_get_handler(dispatch_source_refs_t dr
, long kind
)
336 return os_atomic_load(&dr
->ds_handler
[kind
], relaxed
);
338 #define _dispatch_source_get_event_handler(dr) \
339 _dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
340 #define _dispatch_source_get_cancel_handler(dr) \
341 _dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
342 #define _dispatch_source_get_registration_handler(dr) \
343 _dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)
345 DISPATCH_ALWAYS_INLINE
346 static inline dispatch_continuation_t
347 _dispatch_source_handler_alloc(dispatch_source_t ds
, void *func
, long kind
,
350 // sources don't propagate priority by default
351 const dispatch_block_flags_t flags
=
352 DISPATCH_BLOCK_HAS_PRIORITY
| DISPATCH_BLOCK_NO_VOUCHER
;
353 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
355 uintptr_t dc_flags
= 0;
357 if (kind
!= DS_EVENT_HANDLER
) {
358 dc_flags
|= DISPATCH_OBJ_CONSUME_BIT
;
362 _dispatch_continuation_init(dc
, ds
, func
, 0, flags
, dc_flags
);
363 #endif /* __BLOCKS__ */
365 dc_flags
|= DISPATCH_OBJ_CTXT_FETCH_BIT
;
366 _dispatch_continuation_init_f(dc
, ds
, ds
->do_ctxt
, func
,
369 _dispatch_trace_continuation_push(ds
->_as_dq
, dc
);
379 _dispatch_source_handler_dispose(dispatch_continuation_t dc
)
382 if (dc
->dc_flags
& DISPATCH_OBJ_BLOCK_BIT
) {
383 Block_release(dc
->dc_ctxt
);
385 #endif /* __BLOCKS__ */
386 if (dc
->dc_voucher
) {
387 _voucher_release(dc
->dc_voucher
);
388 dc
->dc_voucher
= VOUCHER_INVALID
;
390 _dispatch_continuation_free(dc
);
393 DISPATCH_ALWAYS_INLINE
394 static inline dispatch_continuation_t
395 _dispatch_source_handler_take(dispatch_source_t ds
, long kind
)
397 return os_atomic_xchg(&ds
->ds_refs
->ds_handler
[kind
], NULL
, relaxed
);
400 DISPATCH_ALWAYS_INLINE
402 _dispatch_source_handler_free(dispatch_source_t ds
, long kind
)
404 dispatch_continuation_t dc
= _dispatch_source_handler_take(ds
, kind
);
405 if (dc
) _dispatch_source_handler_dispose(dc
);
408 DISPATCH_ALWAYS_INLINE
410 _dispatch_source_handler_replace(dispatch_source_t ds
, long kind
,
411 dispatch_continuation_t dc
)
414 _dispatch_continuation_free(dc
);
416 } else if (dc
->dc_flags
& DISPATCH_OBJ_CTXT_FETCH_BIT
) {
417 dc
->dc_ctxt
= ds
->do_ctxt
;
419 dc
= os_atomic_xchg(&ds
->ds_refs
->ds_handler
[kind
], dc
, release
);
420 if (dc
) _dispatch_source_handler_dispose(dc
);
425 _dispatch_source_set_handler_slow(void *context
)
427 dispatch_source_t ds
= (dispatch_source_t
)_dispatch_queue_get_current();
428 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
430 dispatch_continuation_t dc
= context
;
431 long kind
= (long)dc
->dc_data
;
433 _dispatch_source_handler_replace(ds
, kind
, dc
);
438 _dispatch_source_set_handler(dispatch_source_t ds
, long kind
,
439 dispatch_continuation_t dc
)
441 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
442 if (_dispatch_queue_try_inactive_suspend(ds
->_as_dq
)) {
443 _dispatch_source_handler_replace(ds
, kind
, dc
);
444 return dx_vtable(ds
)->do_resume(ds
, false);
446 _dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation
, ds
);
447 if (kind
== DS_REGISTN_HANDLER
) {
448 _dispatch_bug_deprecated("Setting registration handler after "
449 "the source has been activated");
451 dc
->dc_data
= (void *)kind
;
452 _dispatch_barrier_trysync_or_async_f(ds
->_as_dq
, dc
,
453 _dispatch_source_set_handler_slow
);
458 dispatch_source_set_event_handler(dispatch_source_t ds
,
459 dispatch_block_t handler
)
461 dispatch_continuation_t dc
;
462 dc
= _dispatch_source_handler_alloc(ds
, handler
, DS_EVENT_HANDLER
, true);
463 _dispatch_source_set_handler(ds
, DS_EVENT_HANDLER
, dc
);
465 #endif /* __BLOCKS__ */
468 dispatch_source_set_event_handler_f(dispatch_source_t ds
,
469 dispatch_function_t handler
)
471 dispatch_continuation_t dc
;
472 dc
= _dispatch_source_handler_alloc(ds
, handler
, DS_EVENT_HANDLER
, false);
473 _dispatch_source_set_handler(ds
, DS_EVENT_HANDLER
, dc
);
477 _dispatch_source_set_event_handler_continuation(dispatch_source_t ds
,
478 dispatch_continuation_t dc
)
480 _dispatch_trace_continuation_push(ds
->_as_dq
, dc
);
481 _dispatch_source_set_handler(ds
, DS_EVENT_HANDLER
, dc
);
486 dispatch_source_set_cancel_handler(dispatch_source_t ds
,
487 dispatch_block_t handler
)
489 dispatch_continuation_t dc
;
490 dc
= _dispatch_source_handler_alloc(ds
, handler
, DS_CANCEL_HANDLER
, true);
491 _dispatch_source_set_handler(ds
, DS_CANCEL_HANDLER
, dc
);
493 #endif /* __BLOCKS__ */
496 dispatch_source_set_cancel_handler_f(dispatch_source_t ds
,
497 dispatch_function_t handler
)
499 dispatch_continuation_t dc
;
500 dc
= _dispatch_source_handler_alloc(ds
, handler
, DS_CANCEL_HANDLER
, false);
501 _dispatch_source_set_handler(ds
, DS_CANCEL_HANDLER
, dc
);
506 dispatch_source_set_registration_handler(dispatch_source_t ds
,
507 dispatch_block_t handler
)
509 dispatch_continuation_t dc
;
510 dc
= _dispatch_source_handler_alloc(ds
, handler
, DS_REGISTN_HANDLER
, true);
511 _dispatch_source_set_handler(ds
, DS_REGISTN_HANDLER
, dc
);
513 #endif /* __BLOCKS__ */
516 dispatch_source_set_registration_handler_f(dispatch_source_t ds
,
517 dispatch_function_t handler
)
519 dispatch_continuation_t dc
;
520 dc
= _dispatch_source_handler_alloc(ds
, handler
, DS_REGISTN_HANDLER
, false);
521 _dispatch_source_set_handler(ds
, DS_REGISTN_HANDLER
, dc
);
525 #pragma mark dispatch_source_invoke
528 _dispatch_source_registration_callout(dispatch_source_t ds
, dispatch_queue_t cq
,
529 dispatch_invoke_flags_t flags
)
531 dispatch_continuation_t dc
;
533 dc
= _dispatch_source_handler_take(ds
, DS_REGISTN_HANDLER
);
534 if (ds
->dq_atomic_flags
& (DSF_CANCELED
| DQF_RELEASED
)) {
535 // no registration callout if source is canceled rdar://problem/8955246
536 return _dispatch_source_handler_dispose(dc
);
538 if (dc
->dc_flags
& DISPATCH_OBJ_CTXT_FETCH_BIT
) {
539 dc
->dc_ctxt
= ds
->do_ctxt
;
541 _dispatch_continuation_pop(dc
, cq
, flags
);
545 _dispatch_source_cancel_callout(dispatch_source_t ds
, dispatch_queue_t cq
,
546 dispatch_invoke_flags_t flags
)
548 dispatch_continuation_t dc
;
550 dc
= _dispatch_source_handler_take(ds
, DS_CANCEL_HANDLER
);
551 ds
->ds_pending_data_mask
= 0;
552 ds
->ds_pending_data
= 0;
554 _dispatch_source_handler_free(ds
, DS_EVENT_HANDLER
);
555 _dispatch_source_handler_free(ds
, DS_REGISTN_HANDLER
);
559 if (!(ds
->dq_atomic_flags
& DSF_CANCELED
)) {
560 return _dispatch_source_handler_dispose(dc
);
562 if (dc
->dc_flags
& DISPATCH_OBJ_CTXT_FETCH_BIT
) {
563 dc
->dc_ctxt
= ds
->do_ctxt
;
565 _dispatch_continuation_pop(dc
, cq
, flags
);
569 _dispatch_source_latch_and_call(dispatch_source_t ds
, dispatch_queue_t cq
,
570 dispatch_invoke_flags_t flags
)
574 dispatch_source_refs_t dr
= ds
->ds_refs
;
575 dispatch_continuation_t dc
= _dispatch_source_get_handler(dr
, DS_EVENT_HANDLER
);
576 prev
= os_atomic_xchg2o(ds
, ds_pending_data
, 0, relaxed
);
577 if (ds
->ds_is_level
) {
579 } else if (ds
->ds_is_timer
&& ds_timer(dr
).target
&& prev
) {
580 ds
->ds_data
= _dispatch_source_timer_data(dr
, prev
);
584 if (!dispatch_assume(prev
) || !dc
) {
587 _dispatch_continuation_pop(dc
, cq
, flags
);
588 if (ds
->ds_is_timer
&& (ds_timer(dr
).flags
& DISPATCH_TIMER_AFTER
)) {
589 _dispatch_source_handler_free(ds
, DS_EVENT_HANDLER
);
590 dispatch_release(ds
); // dispatch_after sources are one-shot
595 _dispatch_source_kevent_unregister(dispatch_source_t ds
)
597 _dispatch_object_debug(ds
, "%s", __func__
);
598 uint32_t flags
= (uint32_t)ds
->ds_pending_data_mask
;
599 dispatch_kevent_t dk
= ds
->ds_dkev
;
600 dispatch_queue_flags_t dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
601 if (ds
->ds_is_custom_source
) {
606 if (ds
->ds_is_direct_kevent
&&
607 ((dqf
& DSF_DELETED
) || !(ds
->ds_is_installed
))) {
608 dk
->dk_kevent
.flags
|= EV_DELETE
; // already deleted
609 dk
->dk_kevent
.flags
&= ~(EV_ADD
|EV_ENABLE
|EV_VANISHED
);
611 if (dk
->dk_kevent
.filter
== DISPATCH_EVFILT_TIMER
) {
613 if (ds
->ds_is_installed
) {
614 _dispatch_timers_unregister(ds
, dk
);
616 } else if (!ds
->ds_is_direct_kevent
) {
618 dispatch_assert((bool)ds
->ds_is_installed
);
619 TAILQ_REMOVE(&dk
->dk_sources
, ds
->ds_refs
, dr_list
);
620 _dispatch_kevent_unregister(dk
, flags
, 0);
622 unsigned int dkev_dispose_options
= 0;
623 if (ds
->ds_needs_rearm
&& !(dqf
& DSF_ARMED
)) {
624 dkev_dispose_options
|= DKEV_DISPOSE_IMMEDIATE_DELETE
;
625 } else if (dx_type(ds
) == DISPATCH_MACH_CHANNEL_TYPE
) {
626 if (!ds
->ds_is_direct_kevent
) {
627 dkev_dispose_options
|= DKEV_DISPOSE_IMMEDIATE_DELETE
;
630 long r
= _dispatch_kevent_unregister(dk
, flags
, dkev_dispose_options
);
631 if (r
== EINPROGRESS
) {
632 _dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
634 _dispatch_queue_atomic_flags_set(ds
->_as_dq
, DSF_DEFERRED_DELETE
);
635 return; // deferred unregistration
636 #if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
637 } else if (r
== ENOENT
) {
638 _dispatch_debug("kevent-source[%p]: ENOENT delete kevent[%p]",
640 _dispatch_queue_atomic_flags_set(ds
->_as_dq
, DSF_DEFERRED_DELETE
);
641 return; // potential concurrent EV_DELETE delivery rdar://22047283
644 dispatch_assume_zero(r
);
647 _TAILQ_TRASH_ENTRY(ds
->ds_refs
, dr_list
);
650 dqf
= _dispatch_queue_atomic_flags_set_and_clear_orig(ds
->_as_dq
,
651 DSF_DELETED
, DSF_ARMED
| DSF_DEFERRED_DELETE
| DSF_CANCEL_WAITER
);
652 if (dqf
& DSF_CANCEL_WAITER
) {
653 _dispatch_wake_by_address(&ds
->dq_atomic_flags
);
655 ds
->ds_is_installed
= true;
656 ds
->ds_needs_rearm
= false; // re-arm is pointless and bad now
657 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds
, dk
);
658 _dispatch_release(ds
); // the retain is done at creation time
661 DISPATCH_ALWAYS_INLINE
663 _dispatch_source_tryarm(dispatch_source_t ds
)
665 dispatch_queue_flags_t oqf
, nqf
;
666 return os_atomic_rmw_loop2o(ds
, dq_atomic_flags
, oqf
, nqf
, relaxed
, {
667 if (oqf
& (DSF_DEFERRED_DELETE
| DSF_DELETED
)) {
668 // the test is inside the loop because it's convenient but the
669 // result should not change for the duration of the rmw_loop
670 os_atomic_rmw_loop_give_up(break);
672 nqf
= oqf
| DSF_ARMED
;
677 _dispatch_source_kevent_resume(dispatch_source_t ds
, uint32_t new_flags
)
679 switch (ds
->ds_dkev
->dk_kevent
.filter
) {
680 case DISPATCH_EVFILT_TIMER
:
681 _dispatch_timers_update(ds
);
682 _dispatch_queue_atomic_flags_set(ds
->_as_dq
, DSF_ARMED
);
683 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds
,
687 case EVFILT_MACHPORT
:
688 if ((ds
->ds_pending_data_mask
& DISPATCH_MACH_RECV_MESSAGE
) &&
689 !ds
->ds_is_direct_kevent
) {
690 new_flags
|= DISPATCH_MACH_RECV_MESSAGE
; // emulate EV_DISPATCH
695 if (unlikely(!_dispatch_source_tryarm(ds
))) {
698 if (unlikely(_dispatch_kevent_resume(ds
->ds_dkev
, new_flags
, 0))) {
699 _dispatch_queue_atomic_flags_set_and_clear(ds
->_as_dq
, DSF_DELETED
,
703 _dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds
, ds
->ds_dkev
);
708 _dispatch_source_kevent_register(dispatch_source_t ds
, pthread_priority_t pp
)
710 dispatch_assert_zero((bool)ds
->ds_is_installed
);
711 switch (ds
->ds_dkev
->dk_kevent
.filter
) {
712 case DISPATCH_EVFILT_TIMER
:
713 // aggressively coalesce background/maintenance QoS timers
714 // <rdar://problem/12200216&27342536>
715 pp
= _dispatch_source_compute_kevent_priority(ds
);
716 if (_dispatch_is_background_priority(pp
)) {
717 ds_timer(ds
->ds_refs
).flags
|= DISPATCH_TIMER_BACKGROUND
;
719 _dispatch_timers_update(ds
);
720 _dispatch_queue_atomic_flags_set(ds
->_as_dq
, DSF_ARMED
);
721 _dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds
, ds
->ds_dkev
);
725 bool do_resume
= _dispatch_kevent_register(&ds
->ds_dkev
, pp
, &flags
);
726 TAILQ_INSERT_TAIL(&ds
->ds_dkev
->dk_sources
, ds
->ds_refs
, dr_list
);
727 ds
->ds_is_installed
= true;
728 if (do_resume
|| ds
->ds_needs_rearm
) {
729 if (unlikely(!_dispatch_source_kevent_resume(ds
, flags
))) {
730 _dispatch_source_kevent_unregister(ds
);
733 _dispatch_queue_atomic_flags_set(ds
->_as_dq
, DSF_ARMED
);
735 _dispatch_object_debug(ds
, "%s", __func__
);
739 _dispatch_source_set_event_handler_context(void *ctxt
)
741 dispatch_source_t ds
= ctxt
;
742 dispatch_continuation_t dc
= _dispatch_source_get_event_handler(ds
->ds_refs
);
744 if (dc
&& (dc
->dc_flags
& DISPATCH_OBJ_CTXT_FETCH_BIT
)) {
745 dc
->dc_ctxt
= ds
->do_ctxt
;
749 static pthread_priority_t
750 _dispatch_source_compute_kevent_priority(dispatch_source_t ds
)
752 pthread_priority_t p
= ds
->dq_priority
& ~_PTHREAD_PRIORITY_FLAGS_MASK
;
753 dispatch_queue_t tq
= ds
->do_targetq
;
754 pthread_priority_t tqp
= tq
->dq_priority
& ~_PTHREAD_PRIORITY_FLAGS_MASK
;
756 while (unlikely(tq
->do_targetq
)) {
757 if (unlikely(tq
== &_dispatch_mgr_q
)) {
758 return _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
760 if (unlikely(_dispatch_queue_is_thread_bound(tq
))) {
761 // thread bound hierarchies are weird, we need to install
762 // from the context of the thread this hierarchy is bound to
765 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(tq
))) {
766 // this queue may not be activated yet, so the queue graph may not
767 // have stabilized yet
768 _dispatch_ktrace1(DISPATCH_PERF_delayed_registration
, ds
);
771 if (unlikely(!_dispatch_queue_has_immutable_target(tq
))) {
772 if (!_dispatch_is_in_root_queues_array(tq
->do_targetq
)) {
773 // we're not allowed to dereference tq->do_targetq
774 _dispatch_ktrace1(DISPATCH_PERF_delayed_registration
, ds
);
778 if (!(tq
->dq_priority
& _PTHREAD_PRIORITY_INHERIT_FLAG
)) {
779 if (p
< tqp
) p
= tqp
;
782 tqp
= tq
->dq_priority
& ~_PTHREAD_PRIORITY_FLAGS_MASK
;
785 if (unlikely(!tqp
)) {
786 // pthread root queues opt out of QoS
789 return _dispatch_priority_inherit_from_root_queue(p
, tq
);
793 _dispatch_source_finalize_activation(dispatch_source_t ds
)
795 dispatch_continuation_t dc
;
797 if (unlikely(ds
->ds_is_direct_kevent
&&
798 (_dispatch_queue_atomic_flags(ds
->_as_dq
) & DSF_CANCELED
))) {
799 return _dispatch_source_kevent_unregister(ds
);
802 dc
= _dispatch_source_get_event_handler(ds
->ds_refs
);
804 if (_dispatch_object_is_barrier(dc
)) {
805 _dispatch_queue_atomic_flags_set(ds
->_as_dq
, DQF_BARRIER_BIT
);
807 ds
->dq_priority
= dc
->dc_priority
& ~_PTHREAD_PRIORITY_FLAGS_MASK
;
808 if (dc
->dc_flags
& DISPATCH_OBJ_CTXT_FETCH_BIT
) {
809 _dispatch_barrier_async_detached_f(ds
->_as_dq
, ds
,
810 _dispatch_source_set_event_handler_context
);
815 _dispatch_queue_finalize_activation(ds
->_as_dq
);
817 if (ds
->ds_is_direct_kevent
&& !ds
->ds_is_installed
) {
818 pthread_priority_t pp
= _dispatch_source_compute_kevent_priority(ds
);
819 if (pp
) _dispatch_source_kevent_register(ds
, pp
);
823 DISPATCH_ALWAYS_INLINE
824 static inline dispatch_queue_t
825 _dispatch_source_invoke2(dispatch_object_t dou
, dispatch_invoke_flags_t flags
,
826 uint64_t *owned
, struct dispatch_object_s
**dc_ptr DISPATCH_UNUSED
)
828 dispatch_source_t ds
= dou
._ds
;
829 dispatch_queue_t retq
= NULL
;
830 dispatch_queue_t dq
= _dispatch_queue_get_current();
832 if (_dispatch_queue_class_probe(ds
)) {
833 // Intentionally always drain even when on the manager queue
834 // and not the source's regular target queue: we need to be able
835 // to drain timer setting and the like there.
836 retq
= _dispatch_queue_serial_drain(ds
->_as_dq
, flags
, owned
, NULL
);
839 // This function performs all source actions. Each action is responsible
840 // for verifying that it takes place on the appropriate queue. If the
841 // current queue is not the correct queue for this action, the correct queue
842 // will be returned and the invoke will be re-driven on that queue.
844 // The order of tests here in invoke and in wakeup should be consistent.
846 dispatch_source_refs_t dr
= ds
->ds_refs
;
847 dispatch_queue_t dkq
= &_dispatch_mgr_q
;
849 if (ds
->ds_is_direct_kevent
) {
850 dkq
= ds
->do_targetq
;
853 if (!ds
->ds_is_installed
) {
854 // The source needs to be installed on the kevent queue.
858 _dispatch_source_kevent_register(ds
, _dispatch_get_defaultpriority());
861 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds
))) {
862 // Source suspended by an item drained from the source queue.
863 return ds
->do_targetq
;
866 if (_dispatch_source_get_registration_handler(dr
)) {
867 // The source has been registered and the registration handler needs
868 // to be delivered on the target queue.
869 if (dq
!= ds
->do_targetq
) {
870 return ds
->do_targetq
;
872 // clears ds_registration_handler
873 _dispatch_source_registration_callout(ds
, dq
, flags
);
876 dispatch_queue_flags_t dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
877 bool prevent_starvation
= false;
879 if ((dqf
& DSF_DEFERRED_DELETE
) &&
880 ((dqf
& DSF_DELETED
) || !(dqf
& DSF_ARMED
))) {
882 // DSF_DELETE: Pending source kevent unregistration has been completed
883 // !DSF_ARMED: event was delivered and can safely be unregistered
887 _dispatch_source_kevent_unregister(ds
);
888 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
891 if (!(dqf
& (DSF_CANCELED
| DQF_RELEASED
)) && ds
->ds_pending_data
) {
892 // The source has pending data to deliver via the event handler callback
893 // on the target queue. Some sources need to be rearmed on the kevent
894 // queue after event delivery.
895 if (dq
== ds
->do_targetq
) {
896 _dispatch_source_latch_and_call(ds
, dq
, flags
);
897 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
899 // starvation avoidance: if the source triggers itself then force a
900 // re-queue to give other things already queued on the target queue
903 // however, if the source is directly targetting an overcommit root
904 // queue, this would requeue the source and ask for a new overcommit
905 // thread right away.
906 prevent_starvation
= dq
->do_targetq
||
907 !(dq
->dq_priority
& _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
);
908 if (prevent_starvation
&& ds
->ds_pending_data
) {
909 retq
= ds
->do_targetq
;
912 // there is no point trying to be eager, the next thing to do is
913 // to deliver the event
914 return ds
->do_targetq
;
918 if ((dqf
& (DSF_CANCELED
| DQF_RELEASED
)) && !(dqf
& DSF_DEFERRED_DELETE
)) {
919 // The source has been cancelled and needs to be uninstalled from the
920 // kevent queue. After uninstallation, the cancellation handler needs
921 // to be delivered to the target queue.
922 if (!(dqf
& DSF_DELETED
)) {
926 _dispatch_source_kevent_unregister(ds
);
927 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
928 if (unlikely(dqf
& DSF_DEFERRED_DELETE
)) {
929 if (!(dqf
& DSF_ARMED
)) {
930 goto unregister_event
;
932 // we need to wait for the EV_DELETE
936 if (dq
!= ds
->do_targetq
&& (_dispatch_source_get_event_handler(dr
) ||
937 _dispatch_source_get_cancel_handler(dr
) ||
938 _dispatch_source_get_registration_handler(dr
))) {
939 retq
= ds
->do_targetq
;
941 _dispatch_source_cancel_callout(ds
, dq
, flags
);
942 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
944 prevent_starvation
= false;
947 if (ds
->ds_needs_rearm
&& !(dqf
& DSF_ARMED
)) {
948 // The source needs to be rearmed on the kevent queue.
952 if (unlikely(dqf
& DSF_DEFERRED_DELETE
)) {
953 // no need for resume when we can directly unregister the kevent
954 goto unregister_event
;
956 if (prevent_starvation
) {
957 // keep the old behavior to force re-enqueue to our target queue
958 // for the rearm. It is inefficient though and we should
959 // improve this <rdar://problem/24635615>.
961 // if the handler didn't run, or this is a pending delete
962 // or our target queue is a global queue, then starvation is
963 // not a concern and we can rearm right away.
964 return ds
->do_targetq
;
966 if (unlikely(!_dispatch_source_kevent_resume(ds
, 0))) {
967 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
968 goto unregister_event
;
977 _dispatch_source_invoke(dispatch_source_t ds
, dispatch_invoke_flags_t flags
)
979 _dispatch_queue_class_invoke(ds
->_as_dq
, flags
, _dispatch_source_invoke2
);
983 _dispatch_source_wakeup(dispatch_source_t ds
, pthread_priority_t pp
,
984 dispatch_wakeup_flags_t flags
)
986 // This function determines whether the source needs to be invoked.
987 // The order of tests here in wakeup and in invoke should be consistent.
989 dispatch_source_refs_t dr
= ds
->ds_refs
;
990 dispatch_queue_wakeup_target_t dkq
= DISPATCH_QUEUE_WAKEUP_MGR
;
991 dispatch_queue_wakeup_target_t tq
= DISPATCH_QUEUE_WAKEUP_NONE
;
992 dispatch_queue_flags_t dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
993 bool deferred_delete
= (dqf
& DSF_DEFERRED_DELETE
);
995 if (ds
->ds_is_direct_kevent
) {
996 dkq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
999 if (!ds
->ds_is_installed
) {
1000 // The source needs to be installed on the kevent queue.
1002 } else if (_dispatch_source_get_registration_handler(dr
)) {
1003 // The registration handler needs to be delivered to the target queue.
1004 tq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
1005 } else if (deferred_delete
&& ((dqf
& DSF_DELETED
) || !(dqf
& DSF_ARMED
))) {
1006 // Pending source kevent unregistration has been completed
1007 // or EV_ONESHOT event can be acknowledged
1009 } else if (!(dqf
& (DSF_CANCELED
| DQF_RELEASED
)) && ds
->ds_pending_data
) {
1010 // The source has pending data to deliver to the target queue.
1011 tq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
1012 } else if ((dqf
& (DSF_CANCELED
| DQF_RELEASED
)) && !deferred_delete
) {
1013 // The source needs to be uninstalled from the kevent queue, or the
1014 // cancellation handler needs to be delivered to the target queue.
1015 // Note: cancellation assumes installation.
1016 if (!(dqf
& DSF_DELETED
)) {
1018 } else if (_dispatch_source_get_event_handler(dr
) ||
1019 _dispatch_source_get_cancel_handler(dr
) ||
1020 _dispatch_source_get_registration_handler(dr
)) {
1021 tq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
1023 } else if (ds
->ds_needs_rearm
&& !(dqf
& DSF_ARMED
)) {
1024 // The source needs to be rearmed on the kevent queue.
1027 if (!tq
&& _dispatch_queue_class_probe(ds
)) {
1028 tq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
1032 return _dispatch_queue_class_wakeup(ds
->_as_dq
, pp
, flags
, tq
);
1034 return _dispatch_queue_class_override_drainer(ds
->_as_dq
, pp
, flags
);
1035 } else if (flags
& DISPATCH_WAKEUP_CONSUME
) {
1036 return _dispatch_release_tailcall(ds
);
1041 dispatch_source_cancel(dispatch_source_t ds
)
1043 _dispatch_object_debug(ds
, "%s", __func__
);
1044 // Right after we set the cancel flag, someone else
1045 // could potentially invoke the source, do the cancellation,
1046 // unregister the source, and deallocate it. We would
1047 // need to therefore retain/release before setting the bit
1048 _dispatch_retain(ds
);
1050 dispatch_queue_t q
= ds
->_as_dq
;
1051 if (_dispatch_queue_atomic_flags_set_orig(q
, DSF_CANCELED
) & DSF_CANCELED
) {
1052 _dispatch_release_tailcall(ds
);
1054 dx_wakeup(ds
, 0, DISPATCH_WAKEUP_FLUSH
| DISPATCH_WAKEUP_CONSUME
);
1059 dispatch_source_cancel_and_wait(dispatch_source_t ds
)
1061 dispatch_queue_flags_t old_dqf
, dqf
, new_dqf
;
1062 pthread_priority_t pp
;
1064 if (unlikely(_dispatch_source_get_cancel_handler(ds
->ds_refs
))) {
1065 DISPATCH_CLIENT_CRASH(ds
, "Source has a cancel handler");
1068 _dispatch_object_debug(ds
, "%s", __func__
);
1069 os_atomic_rmw_loop2o(ds
, dq_atomic_flags
, old_dqf
, new_dqf
, relaxed
, {
1070 new_dqf
= old_dqf
| DSF_CANCELED
;
1071 if (old_dqf
& DSF_CANCEL_WAITER
) {
1072 os_atomic_rmw_loop_give_up(break);
1074 if ((old_dqf
& DSF_STATE_MASK
) == DSF_DELETED
) {
1075 // just add DSF_CANCELED
1076 } else if ((old_dqf
& DSF_DEFERRED_DELETE
) || !ds
->ds_is_direct_kevent
){
1077 new_dqf
|= DSF_CANCEL_WAITER
;
1082 if (old_dqf
& DQF_RELEASED
) {
1083 DISPATCH_CLIENT_CRASH(ds
, "Dispatch source used after last release");
1085 if ((old_dqf
& DSF_STATE_MASK
) == DSF_DELETED
) {
1088 if (dqf
& DSF_CANCEL_WAITER
) {
1092 // simplified version of _dispatch_queue_drain_try_lock
1093 // that also sets the DIRTY bit on failure to lock
1094 dispatch_lock_owner tid_self
= _dispatch_tid_self();
1095 uint64_t xor_owner_and_set_full_width
= tid_self
|
1096 DISPATCH_QUEUE_WIDTH_FULL_BIT
| DISPATCH_QUEUE_IN_BARRIER
;
1097 uint64_t old_state
, new_state
;
1099 os_atomic_rmw_loop2o(ds
, dq_state
, old_state
, new_state
, seq_cst
, {
1100 new_state
= old_state
;
1101 if (likely(_dq_state_is_runnable(old_state
) &&
1102 !_dq_state_drain_locked(old_state
))) {
1103 new_state
&= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK
;
1104 new_state
^= xor_owner_and_set_full_width
;
1105 } else if (old_dqf
& DSF_CANCELED
) {
1106 os_atomic_rmw_loop_give_up(break);
1108 // this case needs a release barrier, hence the seq_cst above
1109 new_state
|= DISPATCH_QUEUE_DIRTY
;
1113 if (unlikely(_dq_state_is_suspended(old_state
))) {
1114 if (unlikely(_dq_state_suspend_cnt(old_state
))) {
1115 DISPATCH_CLIENT_CRASH(ds
, "Source is suspended");
1117 // inactive sources have never been registered and there is no need
1118 // to wait here because activation will notice and mark the source
1119 // as deleted without ever trying to use the fd or mach port.
1120 return dispatch_activate(ds
);
1123 if (likely(_dq_state_is_runnable(old_state
) &&
1124 !_dq_state_drain_locked(old_state
))) {
1125 // same thing _dispatch_source_invoke2() does when handling cancellation
1126 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
1127 if (!(dqf
& (DSF_DEFERRED_DELETE
| DSF_DELETED
))) {
1128 _dispatch_source_kevent_unregister(ds
);
1129 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
1130 if (likely((dqf
& DSF_STATE_MASK
) == DSF_DELETED
)) {
1131 _dispatch_source_cancel_callout(ds
, NULL
, DISPATCH_INVOKE_NONE
);
1134 _dispatch_try_lock_transfer_or_wakeup(ds
->_as_dq
);
1135 } else if (unlikely(_dq_state_drain_locked_by(old_state
, tid_self
))) {
1136 DISPATCH_CLIENT_CRASH(ds
, "dispatch_source_cancel_and_wait "
1137 "called from a source handler");
1140 pp
= _dispatch_get_priority() & _PTHREAD_PRIORITY_QOS_CLASS_MASK
;
1141 if (pp
) dx_wakeup(ds
, pp
, DISPATCH_WAKEUP_OVERRIDING
);
1142 dispatch_activate(ds
);
1145 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
1146 while (unlikely((dqf
& DSF_STATE_MASK
) != DSF_DELETED
)) {
1147 if (unlikely(!(dqf
& DSF_CANCEL_WAITER
))) {
1148 if (!os_atomic_cmpxchgvw2o(ds
, dq_atomic_flags
,
1149 dqf
, dqf
| DSF_CANCEL_WAITER
, &dqf
, relaxed
)) {
1152 dqf
|= DSF_CANCEL_WAITER
;
1154 _dispatch_wait_on_address(&ds
->dq_atomic_flags
, dqf
, DLOCK_LOCK_NONE
);
1155 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
1160 _dispatch_source_merge_kevent(dispatch_source_t ds
,
1161 const _dispatch_kevent_qos_s
*ke
)
1163 _dispatch_object_debug(ds
, "%s", __func__
);
1164 dispatch_wakeup_flags_t flags
= 0;
1165 dispatch_queue_flags_t dqf
;
1166 pthread_priority_t pp
= 0;
1168 if (ds
->ds_needs_rearm
|| (ke
->flags
& (EV_DELETE
| EV_ONESHOT
))) {
1169 // once we modify the queue atomic flags below, it will allow concurrent
1170 // threads running _dispatch_source_invoke2 to dispose of the source,
1171 // so we can't safely borrow the reference we get from the knote udata
1172 // anymore, and need our own
1173 flags
= DISPATCH_WAKEUP_CONSUME
;
1174 _dispatch_retain(ds
); // rdar://20382435
1177 if ((ke
->flags
& EV_UDATA_SPECIFIC
) && (ke
->flags
& EV_ONESHOT
) &&
1178 !(ke
->flags
& EV_DELETE
)) {
1179 dqf
= _dispatch_queue_atomic_flags_set_and_clear(ds
->_as_dq
,
1180 DSF_DEFERRED_DELETE
, DSF_ARMED
);
1181 if (ke
->flags
& EV_VANISHED
) {
1182 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke
->filter
),
1183 "monitored resource vanished before the source "
1184 "cancel handler was invoked", 0);
1186 _dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds
,
1187 (ke
->flags
& EV_VANISHED
) ? "vanished" :
1188 "deferred delete oneshot", (void*)ke
->udata
);
1189 } else if ((ke
->flags
& EV_DELETE
) || (ke
->flags
& EV_ONESHOT
)) {
1190 dqf
= _dispatch_queue_atomic_flags_set_and_clear(ds
->_as_dq
,
1191 DSF_DELETED
, DSF_ARMED
);
1192 _dispatch_debug("kevent-source[%p]: delete kevent[%p]",
1193 ds
, (void*)ke
->udata
);
1194 if (ke
->flags
& EV_DELETE
) goto done
;
1195 } else if (ds
->ds_needs_rearm
) {
1196 dqf
= _dispatch_queue_atomic_flags_clear(ds
->_as_dq
, DSF_ARMED
);
1197 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p] ",
1198 ds
, (void*)ke
->udata
);
1200 dqf
= _dispatch_queue_atomic_flags(ds
->_as_dq
);
1203 if (dqf
& (DSF_CANCELED
| DQF_RELEASED
)) {
1204 goto done
; // rdar://20204025
1207 if (ke
->filter
== EVFILT_MACHPORT
&&
1208 dx_type(ds
) == DISPATCH_MACH_CHANNEL_TYPE
) {
1209 DISPATCH_INTERNAL_CRASH(ke
->flags
,"Unexpected kevent for mach channel");
1214 if ((ke
->flags
& EV_UDATA_SPECIFIC
) && (ke
->flags
& EV_ONESHOT
) &&
1215 (ke
->flags
& EV_VANISHED
)) {
1216 // if the resource behind the ident vanished, the event handler can't
1217 // do anything useful anymore, so do not try to call it at all
1219 // Note: if the kernel doesn't support EV_VANISHED we always get it
1220 // back unchanged from the flags passed at EV_ADD (registration) time
1221 // Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
1222 // if we get both bits it was a real EV_VANISHED delivery
1223 os_atomic_store2o(ds
, ds_pending_data
, 0, relaxed
);
1225 } else if (ke
->filter
== EVFILT_MACHPORT
) {
1226 data
= DISPATCH_MACH_RECV_MESSAGE
;
1227 os_atomic_store2o(ds
, ds_pending_data
, data
, relaxed
);
1229 } else if (ds
->ds_is_level
) {
1230 // ke->data is signed and "negative available data" makes no sense
1231 // zero bytes happens when EV_EOF is set
1232 dispatch_assert(ke
->data
>= 0l);
1233 data
= ~(unsigned long)ke
->data
;
1234 os_atomic_store2o(ds
, ds_pending_data
, data
, relaxed
);
1235 } else if (ds
->ds_is_adder
) {
1236 data
= (unsigned long)ke
->data
;
1237 os_atomic_add2o(ds
, ds_pending_data
, data
, relaxed
);
1238 } else if (ke
->fflags
& ds
->ds_pending_data_mask
) {
1239 data
= ke
->fflags
& ds
->ds_pending_data_mask
;
1240 os_atomic_or2o(ds
, ds_pending_data
, data
, relaxed
);
1244 #if DISPATCH_USE_KEVENT_QOS
1245 pp
= ((pthread_priority_t
)ke
->qos
) & ~_PTHREAD_PRIORITY_FLAGS_MASK
;
1247 dx_wakeup(ds
, pp
, flags
| DISPATCH_WAKEUP_FLUSH
);
1251 #pragma mark dispatch_kevent_t
1253 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
1255 DISPATCH_CACHELINE_ALIGN
1256 static TAILQ_HEAD(, dispatch_kevent_s
) _dispatch_sources
[DSL_HASH_SIZE
];
1259 _dispatch_kevent_init()
1262 for (i
= 0; i
< DSL_HASH_SIZE
; i
++) {
1263 TAILQ_INIT(&_dispatch_sources
[i
]);
1267 static inline uintptr_t
1268 _dispatch_kevent_hash(uint64_t ident
, short filter
)
1272 value
= (filter
== EVFILT_MACHPORT
||
1273 filter
== DISPATCH_EVFILT_MACH_NOTIFICATION
?
1274 MACH_PORT_INDEX(ident
) : ident
);
1279 return DSL_HASH((uintptr_t)value
);
1282 static dispatch_kevent_t
1283 _dispatch_kevent_find(uint64_t ident
, short filter
)
1285 uintptr_t hash
= _dispatch_kevent_hash(ident
, filter
);
1286 dispatch_kevent_t dki
;
1288 TAILQ_FOREACH(dki
, &_dispatch_sources
[hash
], dk_list
) {
1289 if (dki
->dk_kevent
.ident
== ident
&& dki
->dk_kevent
.filter
== filter
) {
1297 _dispatch_kevent_insert(dispatch_kevent_t dk
)
1299 if (dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
) return;
1300 uintptr_t hash
= _dispatch_kevent_hash(dk
->dk_kevent
.ident
,
1301 dk
->dk_kevent
.filter
);
1302 TAILQ_INSERT_TAIL(&_dispatch_sources
[hash
], dk
, dk_list
);
1305 // Find existing kevents, and merge any new flags if necessary
1307 _dispatch_kevent_register(dispatch_kevent_t
*dkp
, pthread_priority_t pp
,
1310 dispatch_kevent_t dk
= NULL
, ds_dkev
= *dkp
;
1312 bool do_resume
= false;
1314 if (!(ds_dkev
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
)) {
1315 dk
= _dispatch_kevent_find(ds_dkev
->dk_kevent
.ident
,
1316 ds_dkev
->dk_kevent
.filter
);
1319 // If an existing dispatch kevent is found, check to see if new flags
1320 // need to be added to the existing kevent
1321 new_flags
= ~dk
->dk_kevent
.fflags
& ds_dkev
->dk_kevent
.fflags
;
1322 dk
->dk_kevent
.fflags
|= ds_dkev
->dk_kevent
.fflags
;
1325 do_resume
= new_flags
;
1328 #if DISPATCH_USE_KEVENT_WORKQUEUE
1329 if (!_dispatch_kevent_workqueue_enabled
) {
1331 } else if (!(dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
)) {
1332 dk
->dk_kevent
.qos
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
1334 pp
&= (~_PTHREAD_PRIORITY_FLAGS_MASK
|
1335 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
);
1336 if (!pp
) pp
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
1337 _dispatch_assert_is_valid_qos_class(pp
);
1338 dk
->dk_kevent
.qos
= (_dispatch_kevent_priority_t
)pp
;
1343 _dispatch_kevent_insert(dk
);
1344 new_flags
= dk
->dk_kevent
.fflags
;
1347 // Re-register the kevent with the kernel if new flags were added
1348 // by the dispatch kevent
1350 dk
->dk_kevent
.flags
|= EV_ADD
;
1357 _dispatch_kevent_resume(dispatch_kevent_t dk
, uint32_t new_flags
,
1362 if (dk
->dk_kevent
.flags
& EV_DELETE
) {
1365 switch (dk
->dk_kevent
.filter
) {
1366 case DISPATCH_EVFILT_TIMER
:
1367 case DISPATCH_EVFILT_CUSTOM_ADD
:
1368 case DISPATCH_EVFILT_CUSTOM_OR
:
1369 // these types not registered with kevent
1372 case DISPATCH_EVFILT_MACH_NOTIFICATION
:
1373 return _dispatch_kevent_mach_notify_resume(dk
, new_flags
, del_flags
);
1374 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
1375 case EVFILT_MACHPORT
:
1376 if (!(dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
)) {
1377 return _dispatch_kevent_machport_resume(dk
, new_flags
, del_flags
);
1383 // oneshot dk may be freed by the time we return from
1384 // _dispatch_kq_immediate_update if the event was delivered (and then
1385 // unregistered) concurrently.
1386 oneshot
= (dk
->dk_kevent
.flags
& EV_ONESHOT
);
1387 r
= _dispatch_kq_immediate_update(&dk
->dk_kevent
);
1388 if (r
&& (dk
->dk_kevent
.flags
& EV_ADD
) &&
1389 (dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
)) {
1390 dk
->dk_kevent
.flags
|= EV_DELETE
;
1391 dk
->dk_kevent
.flags
&= ~(EV_ADD
|EV_ENABLE
|EV_VANISHED
);
1392 } else if (!oneshot
&& (dk
->dk_kevent
.flags
& EV_DISPATCH
)) {
1393 // we can safely skip doing this for ONESHOT events because
1394 // the next kq update we will do is _dispatch_kevent_dispose()
1395 // which also clears EV_ADD.
1396 dk
->dk_kevent
.flags
&= ~(EV_ADD
|EV_VANISHED
);
1400 (void)new_flags
; (void)del_flags
;
1404 _dispatch_kevent_dispose(dispatch_kevent_t dk
, unsigned int options
)
1407 switch (dk
->dk_kevent
.filter
) {
1408 case DISPATCH_EVFILT_TIMER
:
1409 case DISPATCH_EVFILT_CUSTOM_ADD
:
1410 case DISPATCH_EVFILT_CUSTOM_OR
:
1411 if (dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
) {
1414 // these sources live on statically allocated lists
1418 if (!(dk
->dk_kevent
.flags
& EV_DELETE
)) {
1419 dk
->dk_kevent
.flags
|= EV_DELETE
;
1420 dk
->dk_kevent
.flags
&= ~(EV_ADD
|EV_ENABLE
|EV_VANISHED
);
1421 if (options
& DKEV_DISPOSE_IMMEDIATE_DELETE
) {
1422 dk
->dk_kevent
.flags
|= EV_ENABLE
;
1424 switch (dk
->dk_kevent
.filter
) {
1426 case DISPATCH_EVFILT_MACH_NOTIFICATION
:
1427 r
= _dispatch_kevent_mach_notify_resume(dk
, 0,dk
->dk_kevent
.fflags
);
1429 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
1430 case EVFILT_MACHPORT
:
1431 if (!(dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
)) {
1432 r
= _dispatch_kevent_machport_resume(dk
,0,dk
->dk_kevent
.fflags
);
1439 if (options
& DKEV_DISPOSE_IMMEDIATE_DELETE
) {
1440 _dispatch_kq_deferred_update(&dk
->dk_kevent
);
1442 r
= _dispatch_kq_immediate_update(&dk
->dk_kevent
);
1446 if (options
& DKEV_DISPOSE_IMMEDIATE_DELETE
) {
1447 dk
->dk_kevent
.flags
&= ~EV_ENABLE
;
1450 if (dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
) {
1451 bool deferred_delete
= (r
== EINPROGRESS
);
1452 #if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
1453 if (r
== ENOENT
) deferred_delete
= true;
1455 if (deferred_delete
) {
1456 // deferred EV_DELETE or concurrent concurrent EV_DELETE delivery
1457 dk
->dk_kevent
.flags
&= ~EV_DELETE
;
1458 dk
->dk_kevent
.flags
|= EV_ENABLE
;
1462 uintptr_t hash
= _dispatch_kevent_hash(dk
->dk_kevent
.ident
,
1463 dk
->dk_kevent
.filter
);
1464 TAILQ_REMOVE(&_dispatch_sources
[hash
], dk
, dk_list
);
1471 _dispatch_kevent_unregister(dispatch_kevent_t dk
, uint32_t flg
,
1472 unsigned int options
)
1474 dispatch_source_refs_t dri
;
1475 uint32_t del_flags
, fflags
= 0;
1478 if (TAILQ_EMPTY(&dk
->dk_sources
) ||
1479 (dk
->dk_kevent
.flags
& EV_UDATA_SPECIFIC
)) {
1480 r
= _dispatch_kevent_dispose(dk
, options
);
1482 TAILQ_FOREACH(dri
, &dk
->dk_sources
, dr_list
) {
1483 dispatch_source_t dsi
= _dispatch_source_from_refs(dri
);
1484 uint32_t mask
= (uint32_t)dsi
->ds_pending_data_mask
;
1487 del_flags
= flg
& ~fflags
;
1489 dk
->dk_kevent
.flags
|= EV_ADD
;
1490 dk
->dk_kevent
.fflags
&= ~del_flags
;
1491 r
= _dispatch_kevent_resume(dk
, 0, del_flags
);
1499 _dispatch_kevent_proc_exit(_dispatch_kevent_qos_s
*ke
)
1501 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
1502 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
1503 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
1504 _dispatch_kevent_qos_s fake
;
1506 fake
.flags
&= ~EV_ERROR
;
1507 fake
.flags
|= EV_ONESHOT
;
1508 fake
.fflags
= NOTE_EXIT
;
1510 _dispatch_kevent_debug("synthetic NOTE_EXIT", ke
);
1511 _dispatch_kevent_merge(&fake
);
1516 _dispatch_kevent_error(_dispatch_kevent_qos_s
*ke
)
1518 _dispatch_kevent_qos_s
*kev
= NULL
;
1520 if (ke
->flags
& EV_DELETE
) {
1521 if (ke
->flags
& EV_UDATA_SPECIFIC
) {
1522 if (ke
->data
== EINPROGRESS
) {
1523 // deferred EV_DELETE
1526 #if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
1527 if (ke
->data
== ENOENT
) {
1528 // deferred EV_DELETE
1533 // for EV_DELETE if the update was deferred we may have reclaimed
1534 // our dispatch_kevent_t, and it is unsafe to dereference it now.
1535 } else if (ke
->udata
) {
1536 kev
= &((dispatch_kevent_t
)ke
->udata
)->dk_kevent
;
1537 ke
->flags
|= kev
->flags
;
1541 if (ke
->filter
== EVFILT_MACHPORT
&& ke
->data
== ENOTSUP
&&
1542 (ke
->flags
& EV_ADD
) && _dispatch_evfilt_machport_direct_enabled
&&
1543 kev
&& (kev
->fflags
& MACH_RCV_MSG
)) {
1544 DISPATCH_INTERNAL_CRASH(ke
->ident
,
1545 "Missing EVFILT_MACHPORT support for ports");
1550 // log the unexpected error
1551 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke
->filter
),
1553 ke
->flags
& EV_DELETE
? "delete" :
1554 ke
->flags
& EV_ADD
? "add" :
1555 ke
->flags
& EV_ENABLE
? "enable" : "monitor",
1561 _dispatch_kevent_drain(_dispatch_kevent_qos_s
*ke
)
1563 if (ke
->filter
== EVFILT_USER
) {
1564 _dispatch_kevent_mgr_debug(ke
);
1567 if (slowpath(ke
->flags
& EV_ERROR
)) {
1568 if (ke
->filter
== EVFILT_PROC
&& ke
->data
== ESRCH
) {
1569 _dispatch_debug("kevent[0x%llx]: ESRCH from EVFILT_PROC: "
1570 "generating fake NOTE_EXIT", (unsigned long long)ke
->udata
);
1571 return _dispatch_kevent_proc_exit(ke
);
1573 _dispatch_debug("kevent[0x%llx]: handling error",
1574 (unsigned long long)ke
->udata
);
1575 return _dispatch_kevent_error(ke
);
1577 if (ke
->filter
== EVFILT_TIMER
) {
1578 _dispatch_debug("kevent[0x%llx]: handling timer",
1579 (unsigned long long)ke
->udata
);
1580 return _dispatch_timers_kevent(ke
);
1583 if (ke
->filter
== EVFILT_MACHPORT
) {
1584 _dispatch_debug("kevent[0x%llx]: handling mach port",
1585 (unsigned long long)ke
->udata
);
1586 return _dispatch_mach_kevent_merge(ke
);
1589 return _dispatch_kevent_merge(ke
);
1594 _dispatch_kevent_merge(_dispatch_kevent_qos_s
*ke
)
1596 dispatch_kevent_t dk
= (void*)ke
->udata
;
1597 dispatch_source_refs_t dri
, dr_next
;
1599 TAILQ_FOREACH_SAFE(dri
, &dk
->dk_sources
, dr_list
, dr_next
) {
1600 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri
), ke
);
#pragma mark -
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
// Last timer source programmed per QoS bucket, so the dtrace probes can
// report which timer a wakeup corresponds to.
static dispatch_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
#define _dispatch_trace_next_timer_wake(q) \
		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_trace_next_timer_wake(q)
#endif

#define _dispatch_source_timer_telemetry_enabled() false
1626 _dispatch_source_timer_telemetry_slow(dispatch_source_t ds
,
1627 dispatch_clock_t clock
, struct dispatch_timer_source_s
*values
)
1629 if (_dispatch_trace_timer_configure_enabled()) {
1630 _dispatch_trace_timer_configure(ds
, clock
, values
);
1634 DISPATCH_ALWAYS_INLINE
1636 _dispatch_source_timer_telemetry(dispatch_source_t ds
, dispatch_clock_t clock
,
1637 struct dispatch_timer_source_s
*values
)
1639 if (_dispatch_trace_timer_configure_enabled() ||
1640 _dispatch_source_timer_telemetry_enabled()) {
1641 _dispatch_source_timer_telemetry_slow(ds
, clock
, values
);
1642 asm(""); // prevent tailcall
1646 static inline unsigned long
1647 _dispatch_source_timer_data(dispatch_source_refs_t dr
, unsigned long prev
)
1649 // calculate the number of intervals since last fire
1650 unsigned long data
, missed
;
1652 now
= _dispatch_time_now(DISPATCH_TIMER_CLOCK(_dispatch_source_timer_idx(dr
)));
1653 missed
= (unsigned long)((now
- ds_timer(dr
).last_fire
) /
1654 ds_timer(dr
).interval
);
1655 // correct for missed intervals already delivered last time
1656 data
= prev
- ds_timer(dr
).missed
+ missed
;
1657 ds_timer(dr
).missed
= missed
;
1661 struct dispatch_set_timer_params
{
1662 dispatch_source_t ds
;
1663 struct dispatch_timer_source_s values
;
1664 dispatch_clock_t clock
;
1668 _dispatch_source_set_timer3(void *context
)
1670 // Called on the _dispatch_mgr_q
1671 struct dispatch_set_timer_params
*params
= context
;
1672 dispatch_source_t ds
= params
->ds
;
1673 dispatch_timer_source_refs_t dt
= (dispatch_timer_source_refs_t
)ds
->ds_refs
;
1675 params
->values
.flags
= ds_timer(dt
).flags
;
1676 if (params
->clock
== DISPATCH_CLOCK_WALL
) {
1677 params
->values
.flags
|= DISPATCH_TIMER_WALL_CLOCK
;
1679 _dispatch_mach_host_calendar_change_register();
1682 params
->values
.flags
&= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK
;
1684 ds_timer(dt
) = params
->values
;
1685 ds
->ds_ident_hack
= _dispatch_source_timer_idx(ds
->ds_refs
);
1686 // Clear any pending data that might have accumulated on
1687 // older timer params <rdar://problem/8574886>
1688 ds
->ds_pending_data
= 0;
1690 dispatch_resume(ds
);
1691 if (_dispatch_source_tryarm(ds
)) {
1692 // Re-arm in case we got disarmed because of pending set_timer suspension
1693 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds
, dt
);
1694 // Must happen after resume to avoid getting disarmed due to suspension
1695 _dispatch_timers_update(ds
);
1697 dispatch_release(ds
);
1702 _dispatch_source_set_timer2(void *context
)
1704 // Called on the source queue
1705 struct dispatch_set_timer_params
*params
= context
;
1706 dispatch_suspend(params
->ds
);
1707 _dispatch_barrier_async_detached_f(&_dispatch_mgr_q
, params
,
1708 _dispatch_source_set_timer3
);
1712 static struct dispatch_set_timer_params
*
1713 _dispatch_source_timer_params(dispatch_source_t ds
, dispatch_time_t start
,
1714 uint64_t interval
, uint64_t leeway
)
1716 struct dispatch_set_timer_params
*params
;
1717 params
= _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params
));
1720 if (interval
== 0) {
1721 // we use zero internally to mean disabled
1723 } else if ((int64_t)interval
< 0) {
1724 // 6866347 - make sure nanoseconds won't overflow
1725 interval
= INT64_MAX
;
1727 if ((int64_t)leeway
< 0) {
1730 if (start
== DISPATCH_TIME_NOW
) {
1731 start
= _dispatch_absolute_time();
1732 } else if (start
== DISPATCH_TIME_FOREVER
) {
1736 if ((int64_t)start
< 0) {
1738 start
= (dispatch_time_t
)-((int64_t)start
);
1739 params
->clock
= DISPATCH_CLOCK_WALL
;
1742 interval
= _dispatch_time_nano2mach(interval
);
1744 // rdar://problem/7287561 interval must be at least one in
1745 // in order to avoid later division by zero when calculating
1746 // the missed interval count. (NOTE: the wall clock's
1747 // interval is already "fixed" to be 1 or more)
1750 leeway
= _dispatch_time_nano2mach(leeway
);
1751 params
->clock
= DISPATCH_CLOCK_MACH
;
1753 params
->values
.target
= start
;
1754 params
->values
.deadline
= (start
< UINT64_MAX
- leeway
) ?
1755 start
+ leeway
: UINT64_MAX
;
1756 params
->values
.interval
= interval
;
1757 params
->values
.leeway
= (interval
== INT64_MAX
|| leeway
< interval
/ 2) ?
1758 leeway
: interval
/ 2;
1762 DISPATCH_ALWAYS_INLINE
1764 _dispatch_source_set_timer(dispatch_source_t ds
, dispatch_time_t start
,
1765 uint64_t interval
, uint64_t leeway
, bool source_sync
)
1767 if (slowpath(!ds
->ds_is_timer
) ||
1768 slowpath(ds_timer(ds
->ds_refs
).flags
& DISPATCH_TIMER_INTERVAL
)) {
1769 DISPATCH_CLIENT_CRASH(ds
, "Attempt to set timer on a non-timer source");
1772 struct dispatch_set_timer_params
*params
;
1773 params
= _dispatch_source_timer_params(ds
, start
, interval
, leeway
);
1775 _dispatch_source_timer_telemetry(ds
, params
->clock
, ¶ms
->values
);
1776 // Suspend the source so that it doesn't fire with pending changes
1777 // The use of suspend/resume requires the external retain/release
1778 dispatch_retain(ds
);
1780 return _dispatch_barrier_trysync_or_async_f(ds
->_as_dq
, params
,
1781 _dispatch_source_set_timer2
);
1783 return _dispatch_source_set_timer2(params
);
1788 dispatch_source_set_timer(dispatch_source_t ds
, dispatch_time_t start
,
1789 uint64_t interval
, uint64_t leeway
)
1791 _dispatch_source_set_timer(ds
, start
, interval
, leeway
, true);
1795 _dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds
,
1796 dispatch_time_t start
, uint64_t interval
, uint64_t leeway
)
1798 // Don't serialize through the source queue for CF timers <rdar://13833190>
1799 _dispatch_source_set_timer(ds
, start
, interval
, leeway
, false);
1803 _dispatch_source_set_interval(dispatch_source_t ds
, uint64_t interval
)
1805 #define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1806 // approx 1 year (60s * 60m * 24h * 365d)
1807 #define FOREVER_NSEC 31536000000000000ull
1809 dispatch_source_refs_t dr
= ds
->ds_refs
;
1810 const bool animation
= ds_timer(dr
).flags
& DISPATCH_INTERVAL_UI_ANIMATION
;
1811 if (fastpath(interval
<= (animation
? FOREVER_NSEC
/NSEC_PER_FRAME
:
1812 FOREVER_NSEC
/NSEC_PER_MSEC
))) {
1813 interval
*= animation
? NSEC_PER_FRAME
: NSEC_PER_MSEC
;
1815 interval
= FOREVER_NSEC
;
1817 interval
= _dispatch_time_nano2mach(interval
);
1818 uint64_t target
= _dispatch_absolute_time() + interval
;
1819 target
= (target
/ interval
) * interval
;
1820 const uint64_t leeway
= animation
?
1821 _dispatch_time_nano2mach(NSEC_PER_FRAME
) : interval
/ 2;
1822 ds_timer(dr
).target
= target
;
1823 ds_timer(dr
).deadline
= target
+ leeway
;
1824 ds_timer(dr
).interval
= interval
;
1825 ds_timer(dr
).leeway
= leeway
;
1826 dispatch_clock_t clock
= DISPATCH_TIMER_CLOCK(ds
->ds_ident_hack
);
1827 _dispatch_source_timer_telemetry(ds
, clock
, &ds_timer(dr
));
1831 #pragma mark dispatch_timers
1833 #define DISPATCH_TIMER_STRUCT(refs) \
1834 uint64_t target, deadline; \
1835 TAILQ_HEAD(, refs) dt_sources
1837 typedef struct dispatch_timer_s
{
1838 DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s
);
1839 } *dispatch_timer_t
;
1841 #define DISPATCH_TIMER_INITIALIZER(tidx) \
1843 .target = UINT64_MAX, \
1844 .deadline = UINT64_MAX, \
1845 .dt_sources = TAILQ_HEAD_INITIALIZER( \
1846 _dispatch_timer[tidx].dt_sources), \
1848 #define DISPATCH_TIMER_INIT(kind, qos) \
1849 DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1850 DISPATCH_CLOCK_##kind, DISPATCH_TIMER_QOS_##qos))
1852 struct dispatch_timer_s _dispatch_timer
[] = {
1853 DISPATCH_TIMER_INIT(WALL
, NORMAL
),
1854 DISPATCH_TIMER_INIT(WALL
, CRITICAL
),
1855 DISPATCH_TIMER_INIT(WALL
, BACKGROUND
),
1856 DISPATCH_TIMER_INIT(MACH
, NORMAL
),
1857 DISPATCH_TIMER_INIT(MACH
, CRITICAL
),
1858 DISPATCH_TIMER_INIT(MACH
, BACKGROUND
),
1860 #define DISPATCH_TIMER_COUNT \
1861 ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1864 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1865 (void*)&_dispatch_kevent_timer[tidx]
1867 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1868 (uintptr_t)&_dispatch_kevent_timer[tidx]
1871 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1872 .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1874 // dynamic initialization in _dispatch_timers_init()
1875 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1878 #define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1882 .filter = DISPATCH_EVFILT_TIMER, \
1883 DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1885 .dk_sources = TAILQ_HEAD_INITIALIZER( \
1886 _dispatch_kevent_timer[tidx].dk_sources), \
1888 #define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1889 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1890 DISPATCH_CLOCK_##kind, DISPATCH_TIMER_QOS_##qos))
1892 struct dispatch_kevent_s _dispatch_kevent_timer
[] = {
1893 DISPATCH_KEVENT_TIMER_INIT(WALL
, NORMAL
),
1894 DISPATCH_KEVENT_TIMER_INIT(WALL
, CRITICAL
),
1895 DISPATCH_KEVENT_TIMER_INIT(WALL
, BACKGROUND
),
1896 DISPATCH_KEVENT_TIMER_INIT(MACH
, NORMAL
),
1897 DISPATCH_KEVENT_TIMER_INIT(MACH
, CRITICAL
),
1898 DISPATCH_KEVENT_TIMER_INIT(MACH
, BACKGROUND
),
1899 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM
),
1901 #define DISPATCH_KEVENT_TIMER_COUNT \
1902 ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1904 #define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1905 #define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(tidx, note) \
1907 .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(tidx), \
1908 .filter = EVFILT_TIMER, \
1909 .flags = EV_ONESHOT, \
1910 .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1912 #define DISPATCH_KEVENT_TIMEOUT_INIT(kind, qos, note) \
1913 DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_INDEX( \
1914 DISPATCH_CLOCK_##kind, DISPATCH_TIMER_QOS_##qos), note)
1916 _dispatch_kevent_qos_s _dispatch_kevent_timeout
[] = {
1917 DISPATCH_KEVENT_TIMEOUT_INIT(WALL
, NORMAL
, NOTE_MACH_CONTINUOUS_TIME
),
1918 DISPATCH_KEVENT_TIMEOUT_INIT(WALL
, CRITICAL
, NOTE_MACH_CONTINUOUS_TIME
| NOTE_CRITICAL
),
1919 DISPATCH_KEVENT_TIMEOUT_INIT(WALL
, BACKGROUND
, NOTE_MACH_CONTINUOUS_TIME
| NOTE_BACKGROUND
),
1920 DISPATCH_KEVENT_TIMEOUT_INIT(MACH
, NORMAL
, 0),
1921 DISPATCH_KEVENT_TIMEOUT_INIT(MACH
, CRITICAL
, NOTE_CRITICAL
),
1922 DISPATCH_KEVENT_TIMEOUT_INIT(MACH
, BACKGROUND
, NOTE_BACKGROUND
),
1924 #define DISPATCH_KEVENT_TIMEOUT_COUNT \
1925 ((sizeof(_dispatch_kevent_timeout) / sizeof(_dispatch_kevent_timeout[0])))
1926 static_assert(DISPATCH_KEVENT_TIMEOUT_COUNT
== DISPATCH_TIMER_INDEX_COUNT
- 1,
1927 "should have a kevent for everything but disarm (ddt assumes this)");
1929 #define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1930 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1932 static const uint64_t _dispatch_kevent_coalescing_window
[] = {
1933 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL
, 75),
1934 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL
, 1),
1935 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND
, 100),
1938 #define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
1939 typeof(dr) dri = NULL; typeof(dt) dti; \
1940 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1941 TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
1942 if (ds_timer(dr).target < ds_timer(dri).target) { \
1946 TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
1947 if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
1952 TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
1954 TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
1958 TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
1960 TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
1964 #define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
1966 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1967 TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
1969 TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
1972 #define _dispatch_timers_check(dra, dta) ({ \
1973 unsigned int timerm = _dispatch_timers_mask; \
1974 bool update = false; \
1975 unsigned int tidx; \
1976 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
1977 if (!(timerm & (1 << tidx))){ \
1980 dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
1981 TAILQ_FIRST(&dra[tidx].dk_sources); \
1982 dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
1983 TAILQ_FIRST(&dta[tidx].dt_sources); \
1984 uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
1985 uint64_t deadline = dr ? ds_timer(dt).deadline : UINT64_MAX; \
1986 if (target != dta[tidx].target) { \
1987 dta[tidx].target = target; \
1990 if (deadline != dta[tidx].deadline) { \
1991 dta[tidx].deadline = deadline; \
1997 static bool _dispatch_timers_reconfigure
, _dispatch_timer_expired
;
1998 static unsigned int _dispatch_timers_mask
;
1999 static bool _dispatch_timers_force_max_leeway
;
2002 _dispatch_timers_init(void)
2006 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
2007 _dispatch_kevent_timer
[tidx
].dk_kevent
.udata
=
2008 DISPATCH_KEVENT_TIMER_UDATA(tidx
);
2011 if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
2012 _dispatch_timers_force_max_leeway
= true;
2017 _dispatch_timers_unregister(dispatch_source_t ds
, dispatch_kevent_t dk
)
2019 dispatch_source_refs_t dr
= ds
->ds_refs
;
2020 unsigned int tidx
= (unsigned int)dk
->dk_kevent
.ident
;
2022 if (slowpath(ds_timer_aggregate(ds
))) {
2023 _dispatch_timer_aggregates_unregister(ds
, tidx
);
2025 _dispatch_timers_remove(tidx
, dk
, _dispatch_kevent_timer
, dr
, dr_list
,
2026 _dispatch_timer
, (dispatch_timer_source_refs_t
)dr
, dt_list
);
2027 if (tidx
!= DISPATCH_TIMER_INDEX_DISARM
) {
2028 _dispatch_timers_reconfigure
= true;
2029 _dispatch_timers_mask
|= 1 << tidx
;
2033 // Updates the ordered list of timers based on next fire date for changes to ds.
2034 // Should only be called from the context of _dispatch_mgr_q.
2036 _dispatch_timers_update(dispatch_source_t ds
)
2038 dispatch_kevent_t dk
= ds
->ds_dkev
;
2039 dispatch_source_refs_t dr
= ds
->ds_refs
;
2042 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
2044 // Do not reschedule timers unregistered with _dispatch_kevent_unregister()
2045 if (slowpath(!dk
)) {
2048 // Move timers that are disabled, suspended or have missed intervals to the
2049 // disarmed list, rearm after resume resp. source invoke will reenable them
2050 if (!ds_timer(dr
).target
|| DISPATCH_QUEUE_IS_SUSPENDED(ds
) ||
2051 ds
->ds_pending_data
) {
2052 tidx
= DISPATCH_TIMER_INDEX_DISARM
;
2053 _dispatch_queue_atomic_flags_clear(ds
->_as_dq
, DSF_ARMED
);
2054 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds
,
2057 tidx
= _dispatch_source_timer_idx(dr
);
2059 if (slowpath(ds_timer_aggregate(ds
))) {
2060 _dispatch_timer_aggregates_register(ds
);
2062 if (slowpath(!ds
->ds_is_installed
)) {
2063 ds
->ds_is_installed
= true;
2064 if (tidx
!= DISPATCH_TIMER_INDEX_DISARM
) {
2065 _dispatch_queue_atomic_flags_set(ds
->_as_dq
, DSF_ARMED
);
2066 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds
,
2069 _dispatch_object_debug(ds
, "%s", __func__
);
2073 _dispatch_timers_unregister(ds
, dk
);
2075 if (tidx
!= DISPATCH_TIMER_INDEX_DISARM
) {
2076 _dispatch_timers_reconfigure
= true;
2077 _dispatch_timers_mask
|= 1 << tidx
;
2079 if (dk
!= &_dispatch_kevent_timer
[tidx
]){
2080 ds
->ds_dkev
= &_dispatch_kevent_timer
[tidx
];
2082 _dispatch_timers_insert(tidx
, _dispatch_kevent_timer
, dr
, dr_list
,
2083 _dispatch_timer
, (dispatch_timer_source_refs_t
)dr
, dt_list
);
2084 if (slowpath(ds_timer_aggregate(ds
))) {
2085 _dispatch_timer_aggregates_update(ds
, tidx
);
2090 _dispatch_timers_run2(dispatch_clock_now_cache_t nows
, unsigned int tidx
)
2092 dispatch_source_refs_t dr
;
2093 dispatch_source_t ds
;
2094 uint64_t now
, missed
;
2096 now
= _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx
), nows
);
2097 while ((dr
= TAILQ_FIRST(&_dispatch_kevent_timer
[tidx
].dk_sources
))) {
2098 ds
= _dispatch_source_from_refs(dr
);
2099 // We may find timers on the wrong list due to a pending update from
2100 // dispatch_source_set_timer. Force an update of the list in that case.
2101 if (tidx
!= ds
->ds_ident_hack
) {
2102 _dispatch_timers_update(ds
);
2105 if (!ds_timer(dr
).target
) {
2106 // No configured timers on the list
2109 if (ds_timer(dr
).target
> now
) {
2110 // Done running timers for now.
2113 // Remove timers that are suspended or have missed intervals from the
2114 // list, rearm after resume resp. source invoke will reenable them
2115 if (DISPATCH_QUEUE_IS_SUSPENDED(ds
) || ds
->ds_pending_data
) {
2116 _dispatch_timers_update(ds
);
2119 // Calculate number of missed intervals.
2120 missed
= (now
- ds_timer(dr
).target
) / ds_timer(dr
).interval
;
2121 if (++missed
> INT_MAX
) {
2124 if (ds_timer(dr
).interval
< INT64_MAX
) {
2125 ds_timer(dr
).target
+= missed
* ds_timer(dr
).interval
;
2126 ds_timer(dr
).deadline
= ds_timer(dr
).target
+ ds_timer(dr
).leeway
;
2128 ds_timer(dr
).target
= UINT64_MAX
;
2129 ds_timer(dr
).deadline
= UINT64_MAX
;
2131 _dispatch_timers_update(ds
);
2132 ds_timer(dr
).last_fire
= now
;
2135 data
= os_atomic_add2o(ds
, ds_pending_data
,
2136 (unsigned long)missed
, relaxed
);
2137 _dispatch_trace_timer_fire(dr
, data
, (unsigned long)missed
);
2138 dx_wakeup(ds
, 0, DISPATCH_WAKEUP_FLUSH
);
2139 if (ds_timer(dr
).flags
& DISPATCH_TIMER_AFTER
) {
2140 _dispatch_source_kevent_unregister(ds
);
2147 _dispatch_timers_run(dispatch_clock_now_cache_t nows
)
2150 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
2151 if (!TAILQ_EMPTY(&_dispatch_kevent_timer
[tidx
].dk_sources
)) {
2152 _dispatch_timers_run2(nows
, tidx
);
2157 #define DISPATCH_TIMERS_GET_DELAY_ALL (~0u)
2159 static inline unsigned int
2160 _dispatch_timers_get_delay(dispatch_clock_now_cache_t nows
,
2161 struct dispatch_timer_s timer
[],
2162 uint64_t *delay
, uint64_t *leeway
, unsigned int query
)
2164 unsigned int tidx
, ridx
= DISPATCH_TIMER_COUNT
, minidx
, maxidx
;
2165 uint64_t tmp
, delta
= INT64_MAX
, dldelta
= INT64_MAX
;
2167 if (query
== DISPATCH_TIMERS_GET_DELAY_ALL
) {
2169 maxidx
= DISPATCH_TIMER_COUNT
- 1;
2171 minidx
= maxidx
= query
;
2174 for (tidx
= minidx
; tidx
<= maxidx
; tidx
++) {
2175 dispatch_clock_t clock
= DISPATCH_TIMER_CLOCK(tidx
);
2176 uint64_t target
= timer
[tidx
].target
;
2177 if (target
>= INT64_MAX
) {
2180 uint64_t deadline
= timer
[tidx
].deadline
;
2181 if (query
!= DISPATCH_TIMERS_GET_DELAY_ALL
) {
2182 // Timer pre-coalescing <rdar://problem/13222034>
2183 unsigned int qos
= DISPATCH_TIMER_QOS(tidx
);
2184 uint64_t window
= _dispatch_kevent_coalescing_window
[qos
];
2185 uint64_t latest
= deadline
> window
? deadline
- window
: 0;
2186 dispatch_source_refs_t dri
;
2187 TAILQ_FOREACH(dri
, &_dispatch_kevent_timer
[tidx
].dk_sources
,
2189 tmp
= ds_timer(dri
).target
;
2190 if (tmp
> latest
) break;
2194 uint64_t now
= _dispatch_time_now_cached(clock
, nows
);
2195 if (target
<= now
) {
2200 if (clock
!= DISPATCH_CLOCK_WALL
) {
2201 tmp
= _dispatch_time_mach2nano(tmp
);
2203 if (tmp
< INT64_MAX
&& tmp
< delta
) {
2207 dispatch_assert(target
<= deadline
);
2208 tmp
= deadline
- now
;
2209 if (clock
!= DISPATCH_CLOCK_WALL
) {
2210 tmp
= _dispatch_time_mach2nano(tmp
);
2212 if (tmp
< INT64_MAX
&& tmp
< dldelta
) {
2217 *leeway
= delta
&& delta
< INT64_MAX
? dldelta
- delta
: INT64_MAX
;
2223 // in linux we map the _dispatch_kevent_qos_s to struct kevent instead
2224 // of struct kevent64. We loose the kevent.ext[] members and the time
2225 // out is based on relavite msec based time vs. absolute nsec based time.
2226 // For now we make the adjustments right here until the solution
2227 // to either extend libkqueue with a proper kevent64 API or removing kevent
2228 // all together and move to a lower API (e.g. epoll or kernel_module.
2229 // Also leeway is ignored.
2232 _dispatch_kevent_timer_set_delay(_dispatch_kevent_qos_s
*ke
, uint64_t delay
,
2233 uint64_t leeway
, dispatch_clock_now_cache_t nows
)
2235 // call to update nows[]
2236 _dispatch_time_now_cached(DISPATCH_CLOCK_WALL
, nows
);
2237 #ifdef KEVENT_NSEC_NOT_SUPPORTED
2238 // adjust nsec based delay to msec based and ignore leeway
2240 if ((int64_t)(delay
) <= 0) {
2241 delay
= 1; // if value <= 0 the dispatch will stop
2244 ke
->fflags
|= NOTE_NSECONDS
;
2246 ke
->data
= (int64_t)delay
;
2251 _dispatch_kevent_timer_set_delay(_dispatch_kevent_qos_s
*ke
, uint64_t delay
,
2252 uint64_t leeway
, dispatch_clock_now_cache_t nows
)
2254 delay
+= _dispatch_time_now_cached(DISPATCH_CLOCK_WALL
, nows
);
2255 if (slowpath(_dispatch_timers_force_max_leeway
)) {
2256 ke
->data
= (int64_t)(delay
+ leeway
);
2259 ke
->data
= (int64_t)delay
;
2260 ke
->ext
[1] = leeway
;
2266 _dispatch_timers_program2(dispatch_clock_now_cache_t nows
,
2267 _dispatch_kevent_qos_s
*ke
, unsigned int tidx
)
2270 uint64_t delay
, leeway
;
2272 _dispatch_timers_get_delay(nows
, _dispatch_timer
, &delay
, &leeway
, tidx
);
2273 poll
= (delay
== 0);
2274 if (poll
|| delay
== UINT64_MAX
) {
2275 _dispatch_trace_next_timer_set(NULL
, DISPATCH_TIMER_QOS(tidx
));
2280 ke
->flags
|= EV_DELETE
;
2281 ke
->flags
&= ~(EV_ADD
|EV_ENABLE
);
2283 _dispatch_trace_next_timer_set(
2284 TAILQ_FIRST(&_dispatch_kevent_timer
[tidx
].dk_sources
), DISPATCH_TIMER_QOS(tidx
));
2285 _dispatch_trace_next_timer_program(delay
, DISPATCH_TIMER_QOS(tidx
));
2286 _dispatch_kevent_timer_set_delay(ke
, delay
, leeway
, nows
);
2287 ke
->flags
|= EV_ADD
|EV_ENABLE
;
2288 ke
->flags
&= ~EV_DELETE
;
2289 #if DISPATCH_USE_KEVENT_WORKQUEUE
2290 if (_dispatch_kevent_workqueue_enabled
) {
2291 ke
->qos
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
2295 _dispatch_kq_deferred_update(ke
);
// Reprograms the kevent timeout for every timer-list index flagged in
// _dispatch_timers_mask, accumulating whether any list wants immediate
// polling (per _dispatch_timers_program2's result).
// NOTE(review): garbled extraction — braces/return lines missing.
2301 _dispatch_timers_program(dispatch_clock_now_cache_t nows
)
2304 unsigned int tidx
, timerm
= _dispatch_timers_mask
;
2305 for (tidx
= 0; tidx
< DISPATCH_KEVENT_TIMEOUT_COUNT
; tidx
++) {
// Skip lists that were not flagged for reprogramming.
2306 if (!(timerm
& 1 << tidx
)){
2309 poll
|= _dispatch_timers_program2(nows
, &_dispatch_kevent_timeout
[tidx
],
// Refreshes aggregate bookkeeping, then reports whether any timer list has a
// new earliest target/deadline (i.e. whether kevent timeouts need updating).
// NOTE(review): garbled extraction — return-type/brace lines missing.
2317 _dispatch_timers_configure(void)
2319 _dispatch_timer_aggregates_check();
2320 // Find out if there is a new target/deadline on the timer lists
2321 return _dispatch_timers_check(_dispatch_kevent_timer
, _dispatch_timer
);
// Wall-clock (calendar) change notification: conservatively treat all
// wall-clock timer lists (one per QoS level) as possibly expired and flag
// them for reprogramming on the next manager pass.
// NOTE(review): garbled extraction — brace lines missing.
2326 _dispatch_timers_calendar_change(void)
2330 // calendar change may have gone past the wallclock deadline
2331 _dispatch_timer_expired
= true;
2332 for (qos
= 0; qos
< DISPATCH_TIMER_QOS_COUNT
; qos
++) {
2333 _dispatch_timers_mask
|=
2334 1 << DISPATCH_TIMER_INDEX(DISPATCH_CLOCK_WALL
, qos
);
// Handles a fired timeout kevent: validates it, maps the ident back to a
// timer-list index, clears the one-shot kevent's bookkeeping, and flags the
// list so the next manager pass runs and reprograms its timers.
// NOTE(review): garbled extraction — brace lines missing.
2340 _dispatch_timers_kevent(_dispatch_kevent_qos_s
*ke
)
2342 dispatch_assert(ke
->data
> 0);
// Timeout kevent idents carry the TIMEOUT ident mask plus the list index.
2343 dispatch_assert((ke
->ident
& DISPATCH_KEVENT_TIMEOUT_IDENT_MASK
) ==
2344 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK
);
2345 unsigned int tidx
= ke
->ident
& ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK
;
2346 dispatch_assert(tidx
< DISPATCH_KEVENT_TIMEOUT_COUNT
);
2347 dispatch_assert(_dispatch_kevent_timeout
[tidx
].data
!= 0);
2348 _dispatch_kevent_timeout
[tidx
].data
= 0; // kevent deleted via EV_ONESHOT
2349 _dispatch_timer_expired
= true;
2350 _dispatch_timers_mask
|= 1 << tidx
;
2351 _dispatch_trace_next_timer_wake(DISPATCH_TIMER_QOS(tidx
));
// Manager-queue timer pass: fires due timers, reconfigures lists if needed,
// reprograms kevent timeouts, and clears the pending reprogram mask.
// Returns (via 'expired') whether the caller should poll immediately —
// either timers still expired or the manager queue has queued work.
// NOTE(review): garbled extraction — brace/return lines missing.
2355 _dispatch_mgr_timers(void)
2357 dispatch_clock_now_cache_s nows
= { };
2358 bool expired
= slowpath(_dispatch_timer_expired
);
// Fire everything that is due, using one consistent "now" cache.
2360 _dispatch_timers_run(&nows
);
2362 bool reconfigure
= slowpath(_dispatch_timers_reconfigure
);
2363 if (reconfigure
|| expired
) {
// Re-derive whether targets/deadlines actually changed, then reset flag.
2365 reconfigure
= _dispatch_timers_configure();
2366 _dispatch_timers_reconfigure
= false;
2368 if (reconfigure
|| expired
) {
2369 expired
= _dispatch_timer_expired
= _dispatch_timers_program(&nows
);
2370 expired
= expired
|| _dispatch_mgr_q
.dq_items_tail
;
2372 _dispatch_timers_mask
= 0;
2378 #pragma mark dispatch_timer_aggregate
2381 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s
) dk_sources
;
2382 } dispatch_timer_aggregate_refs_s
;
2384 typedef struct dispatch_timer_aggregate_s
{
2385 DISPATCH_QUEUE_HEADER(queue
);
2386 TAILQ_ENTRY(dispatch_timer_aggregate_s
) dta_list
;
2387 dispatch_timer_aggregate_refs_s
2388 dta_kevent_timer
[DISPATCH_KEVENT_TIMER_COUNT
];
2390 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s
);
2391 } dta_timer
[DISPATCH_TIMER_COUNT
];
2392 struct dispatch_timer_s dta_timer_data
[DISPATCH_TIMER_COUNT
];
2393 unsigned int dta_refcount
;
2394 } DISPATCH_QUEUE_ALIGN dispatch_timer_aggregate_s
;
2396 typedef TAILQ_HEAD(, dispatch_timer_aggregate_s
) dispatch_timer_aggregates_s
;
2397 static dispatch_timer_aggregates_s _dispatch_timer_aggregates
=
2398 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates
);
2400 dispatch_timer_aggregate_t
2401 dispatch_timer_aggregate_create(void)
2404 dispatch_timer_aggregate_t dta
= _dispatch_alloc(DISPATCH_VTABLE(queue
),
2405 sizeof(struct dispatch_timer_aggregate_s
));
2406 _dispatch_queue_init(dta
->_as_dq
, DQF_NONE
,
2407 DISPATCH_QUEUE_WIDTH_MAX
, false);
2408 dta
->do_targetq
= _dispatch_get_root_queue(
2409 _DISPATCH_QOS_CLASS_USER_INITIATED
, true);
2410 //FIXME: aggregates need custom vtable
2411 //dta->dq_label = "timer-aggregate";
2412 for (tidx
= 0; tidx
< DISPATCH_KEVENT_TIMER_COUNT
; tidx
++) {
2413 TAILQ_INIT(&dta
->dta_kevent_timer
[tidx
].dk_sources
);
2415 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
2416 TAILQ_INIT(&dta
->dta_timer
[tidx
].dt_sources
);
2417 dta
->dta_timer
[tidx
].target
= UINT64_MAX
;
2418 dta
->dta_timer
[tidx
].deadline
= UINT64_MAX
;
2419 dta
->dta_timer_data
[tidx
].target
= UINT64_MAX
;
2420 dta
->dta_timer_data
[tidx
].deadline
= UINT64_MAX
;
2422 return (dispatch_timer_aggregate_t
)_dispatch_introspection_queue_create(
2426 typedef struct dispatch_timer_delay_s
{
2427 dispatch_timer_t timer
;
2428 uint64_t delay
, leeway
;
2429 } *dispatch_timer_delay_t
;
2432 _dispatch_timer_aggregate_get_delay(void *ctxt
)
2434 dispatch_timer_delay_t dtd
= ctxt
;
2435 dispatch_clock_now_cache_s nows
= { };
2436 _dispatch_timers_get_delay(&nows
, dtd
->timer
, &dtd
->delay
, &dtd
->leeway
,
2437 DISPATCH_TIMERS_GET_DELAY_ALL
);
2441 dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta
,
2442 uint64_t *leeway_ptr
)
2444 struct dispatch_timer_delay_s dtd
= {
2445 .timer
= dta
->dta_timer_data
,
2447 dispatch_sync_f(dta
->_as_dq
, &dtd
, _dispatch_timer_aggregate_get_delay
);
2449 *leeway_ptr
= dtd
.leeway
;
2455 _dispatch_timer_aggregate_update(void *ctxt
)
2457 dispatch_timer_aggregate_t dta
= (void*)_dispatch_queue_get_current();
2458 dispatch_timer_t dtau
= ctxt
;
2460 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
2461 dta
->dta_timer_data
[tidx
].target
= dtau
[tidx
].target
;
2462 dta
->dta_timer_data
[tidx
].deadline
= dtau
[tidx
].deadline
;
2469 _dispatch_timer_aggregates_configure(void)
2471 dispatch_timer_aggregate_t dta
;
2472 dispatch_timer_t dtau
;
2473 TAILQ_FOREACH(dta
, &_dispatch_timer_aggregates
, dta_list
) {
2474 if (!_dispatch_timers_check(dta
->dta_kevent_timer
, dta
->dta_timer
)) {
2477 dtau
= _dispatch_calloc(DISPATCH_TIMER_COUNT
, sizeof(*dtau
));
2478 memcpy(dtau
, dta
->dta_timer
, sizeof(dta
->dta_timer
));
2479 _dispatch_barrier_async_detached_f(dta
->_as_dq
, dtau
,
2480 _dispatch_timer_aggregate_update
);
// Fast-path no-op when no aggregates exist; otherwise refreshes every
// registered aggregate's cached timer data.
// NOTE(review): garbled extraction — brace/return lines missing.
2485 _dispatch_timer_aggregates_check(void)
2487 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates
))) {
2490 _dispatch_timer_aggregates_configure();
// Adds the source's aggregate to the global aggregates list on its first
// member registration (refcount 0 -> 1).
// NOTE(review): garbled extraction — brace lines missing.
2494 _dispatch_timer_aggregates_register(dispatch_source_t ds
)
2496 dispatch_timer_aggregate_t dta
= ds_timer_aggregate(ds
);
// Post-increment: only the transition from zero links the aggregate in.
2497 if (!dta
->dta_refcount
++) {
2498 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates
, dta
, dta_list
);
2504 _dispatch_timer_aggregates_update(dispatch_source_t ds
, unsigned int tidx
)
2506 dispatch_timer_aggregate_t dta
= ds_timer_aggregate(ds
);
2507 dispatch_timer_source_aggregate_refs_t dr
;
2508 dr
= (dispatch_timer_source_aggregate_refs_t
)ds
->ds_refs
;
2509 _dispatch_timers_insert(tidx
, dta
->dta_kevent_timer
, dr
, dra_list
,
2510 dta
->dta_timer
, dr
, dta_list
);
2515 _dispatch_timer_aggregates_unregister(dispatch_source_t ds
, unsigned int tidx
)
2517 dispatch_timer_aggregate_t dta
= ds_timer_aggregate(ds
);
2518 dispatch_timer_source_aggregate_refs_t dr
;
2519 dr
= (dispatch_timer_source_aggregate_refs_t
)ds
->ds_refs
;
2520 _dispatch_timers_remove(tidx
, (dispatch_timer_aggregate_refs_s
*)NULL
,
2521 dta
->dta_kevent_timer
, dr
, dra_list
, dta
->dta_timer
, dr
, dta_list
);
2522 if (!--dta
->dta_refcount
) {
2523 TAILQ_REMOVE(&_dispatch_timer_aggregates
, dta
, dta_list
);
2528 #pragma mark dispatch_kqueue
2530 static int _dispatch_kq
;
2532 #if DISPATCH_DEBUG_QOS && DISPATCH_USE_KEVENT_WORKQUEUE
2533 #define _dispatch_kevent_assert_valid_qos(ke) ({ \
2534 if (_dispatch_kevent_workqueue_enabled) { \
2535 const _dispatch_kevent_qos_s *_ke = (ke); \
2536 if (_ke->flags & (EV_ADD|EV_ENABLE)) { \
2537 _dispatch_assert_is_valid_qos_class(\
2538 (pthread_priority_t)_ke->qos); \
2539 dispatch_assert(_ke->qos); \
2544 #define _dispatch_kevent_assert_valid_qos(ke) ((void)ke)
2549 _dispatch_kq_init(void *context DISPATCH_UNUSED
)
2551 _dispatch_fork_becomes_unsafe();
2552 #if DISPATCH_USE_KEVENT_WORKQUEUE
2553 _dispatch_kevent_workqueue_init();
2554 if (_dispatch_kevent_workqueue_enabled
) {
2556 const _dispatch_kevent_qos_s kev
[] = {
2559 .filter
= EVFILT_USER
,
2560 .flags
= EV_ADD
|EV_CLEAR
,
2561 .qos
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
,
2565 .filter
= EVFILT_USER
,
2566 .fflags
= NOTE_TRIGGER
,
2571 r
= kevent_qos(-1, kev
, 2, NULL
, 0, NULL
, NULL
,
2572 KEVENT_FLAG_WORKQ
|KEVENT_FLAG_IMMEDIATE
);
2573 if (slowpath(r
== -1)) {
2579 DISPATCH_CLIENT_CRASH(err
,
2580 "Failed to initalize workqueue kevent");
2586 #endif // DISPATCH_USE_KEVENT_WORKQUEUE
2587 #if DISPATCH_USE_MGR_THREAD
2588 static const _dispatch_kevent_qos_s kev
= {
2590 .filter
= EVFILT_USER
,
2591 .flags
= EV_ADD
|EV_CLEAR
,
2594 _dispatch_fork_becomes_unsafe();
2595 #if DISPATCH_USE_GUARDED_FD
2596 guardid_t guard
= (uintptr_t)&kev
;
2597 _dispatch_kq
= guarded_kqueue_np(&guard
, GUARD_CLOSE
| GUARD_DUP
);
2599 _dispatch_kq
= kqueue();
2601 if (_dispatch_kq
== -1) {
2605 DISPATCH_CLIENT_CRASH(err
, "kqueue() failure: "
2606 "process is out of file descriptors");
2609 DISPATCH_CLIENT_CRASH(err
, "kqueue() failure: "
2610 "system is out of file descriptors");
2613 DISPATCH_CLIENT_CRASH(err
, "kqueue() failure: "
2614 "kernel is out of memory");
2617 DISPATCH_INTERNAL_CRASH(err
, "kqueue() failure");
2621 (void)dispatch_assume_zero(kevent_qos(_dispatch_kq
, &kev
, 1, NULL
, 0, NULL
,
2623 _dispatch_queue_push(_dispatch_mgr_q
.do_targetq
, &_dispatch_mgr_q
, 0);
2624 #endif // DISPATCH_USE_MGR_THREAD
2629 _dispatch_kq_update(const _dispatch_kevent_qos_s
*ke
, int n
)
2632 _dispatch_kevent_qos_s kev_error
[n
];
2633 static dispatch_once_t pred
;
2634 dispatch_once_f(&pred
, NULL
, _dispatch_kq_init
);
2636 for (i
= 0; i
< n
; i
++) {
2637 if (ke
[i
].filter
!= EVFILT_USER
|| DISPATCH_MGR_QUEUE_DEBUG
) {
2638 _dispatch_kevent_debug_n("updating", ke
+ i
, i
, n
);
2642 unsigned int flags
= KEVENT_FLAG_ERROR_EVENTS
;
2643 #if DISPATCH_USE_KEVENT_WORKQUEUE
2644 if (_dispatch_kevent_workqueue_enabled
) {
2645 flags
|= KEVENT_FLAG_WORKQ
;
2650 r
= kevent_qos(_dispatch_kq
, ke
, n
, kev_error
, n
, NULL
, NULL
, flags
);
2651 if (slowpath(r
== -1)) {
2657 DISPATCH_CLIENT_CRASH(err
, "Do not close random Unix descriptors");
2660 (void)dispatch_assume_zero(err
);
2665 for (i
= 0, n
= r
; i
< n
; i
++) {
2666 if (kev_error
[i
].flags
& EV_ERROR
) {
2667 _dispatch_kevent_debug("returned error", &kev_error
[i
]);
2668 _dispatch_kevent_drain(&kev_error
[i
]);
2669 r
= (int)kev_error
[i
].data
;
2671 _dispatch_kevent_mgr_debug(&kev_error
[i
]);
// Batch kqueue update: submits n changes via _dispatch_kq_update,
// deliberately discarding its result.
// NOTE(review): garbled extraction — return-type/brace lines missing.
2678 DISPATCH_ALWAYS_INLINE
2680 _dispatch_kq_update_all(const _dispatch_kevent_qos_s
*kev
, int n
)
2682 (void)_dispatch_kq_update(kev
, n
);
// Single-change kqueue update: forwards one kevent change to
// _dispatch_kq_update and propagates its result to the caller.
// NOTE(review): garbled extraction — return-type/brace lines missing.
2685 DISPATCH_ALWAYS_INLINE
2687 _dispatch_kq_update_one(const _dispatch_kevent_qos_s
*kev
)
2689 return _dispatch_kq_update(kev
, 1);
// Two kevents target the same kernel knote iff filter, ident and udata all
// match (the triple used to key knotes in this file).
// NOTE(review): garbled extraction — return-type/brace lines missing.
2693 _dispatch_kevent_maps_to_same_knote(const _dispatch_kevent_qos_s
*e1
,
2694 const _dispatch_kevent_qos_s
*e2
)
2696 return e1
->filter
== e2
->filter
&&
2697 e1
->ident
== e2
->ident
&&
2698 e1
->udata
== e2
->udata
;
// Linear scan of the deferred event list for an entry addressing the same
// knote as ke (used to coalesce deferred kqueue changes).
// NOTE(review): garbled extraction — return/brace lines missing; per the
// callers, presumably returns the matching slot or ddi_nevents if absent.
2702 _dispatch_deferred_event_find_slot(dispatch_deferred_items_t ddi
,
2703 const _dispatch_kevent_qos_s
*ke
)
2705 _dispatch_kevent_qos_s
*events
= ddi
->ddi_eventlist
;
2708 for (i
= 0; i
< ddi
->ddi_nevents
; i
++) {
2709 if (_dispatch_kevent_maps_to_same_knote(&events
[i
], ke
)) {
2717 _dispatch_kq_deferred_update(const _dispatch_kevent_qos_s
*ke
)
2719 dispatch_deferred_items_t ddi
= _dispatch_deferred_items_get();
2722 _dispatch_kevent_assert_valid_qos(ke
);
2724 if (unlikely(ddi
->ddi_nevents
== ddi
->ddi_maxevents
)) {
2725 _dispatch_deferred_items_set(NULL
);
2726 _dispatch_kq_update_all(ddi
->ddi_eventlist
, ddi
->ddi_nevents
);
2727 ddi
->ddi_nevents
= 0;
2728 _dispatch_deferred_items_set(ddi
);
2730 if (ke
->filter
!= EVFILT_USER
|| DISPATCH_MGR_QUEUE_DEBUG
) {
2731 _dispatch_kevent_debug("deferred", ke
);
2733 bool needs_enable
= false;
2734 slot
= _dispatch_deferred_event_find_slot(ddi
, ke
);
2735 if (slot
== ddi
->ddi_nevents
) {
2737 } else if (ke
->flags
& EV_DELETE
) {
2738 // <rdar://problem/26202376> when deleting and an enable is pending,
2739 // we must merge EV_ENABLE to do an immediate deletion
2740 needs_enable
= (ddi
->ddi_eventlist
[slot
].flags
& EV_ENABLE
);
2742 ddi
->ddi_eventlist
[slot
] = *ke
;
2744 ddi
->ddi_eventlist
[slot
].flags
|= EV_ENABLE
;
2747 _dispatch_kq_update_one(ke
);
2752 _dispatch_kq_immediate_update(_dispatch_kevent_qos_s
*ke
)
2754 dispatch_deferred_items_t ddi
= _dispatch_deferred_items_get();
2757 _dispatch_kevent_assert_valid_qos(ke
);
2759 _dispatch_kevent_qos_s
*events
= ddi
->ddi_eventlist
;
2760 slot
= _dispatch_deferred_event_find_slot(ddi
, ke
);
2761 if (slot
< ddi
->ddi_nevents
) {
2762 // <rdar://problem/26202376> when deleting and an enable is pending,
2763 // we must merge EV_ENABLE to do an immediate deletion
2764 if ((ke
->flags
& EV_DELETE
) && (events
[slot
].flags
& EV_ENABLE
)) {
2765 ke
->flags
|= EV_ENABLE
;
2767 last
= --ddi
->ddi_nevents
;
2769 events
[slot
] = events
[last
];
2773 return _dispatch_kq_update_one(ke
);
2777 #pragma mark dispatch_mgr
// Wakes the manager queue by triggering its EVFILT_USER kevent
// (NOTE_TRIGGER) through the deferred kqueue-update path.
// Both parameters are unused; the signature matches the wakeup protocol.
// NOTE(review): garbled extraction — brace lines missing.
2781 _dispatch_mgr_queue_poke(dispatch_queue_t dq DISPATCH_UNUSED
,
2782 pthread_priority_t pp DISPATCH_UNUSED
)
2784 static const _dispatch_kevent_qos_s kev
= {
2786 .filter
= EVFILT_USER
,
2787 .fflags
= NOTE_TRIGGER
,
2790 #if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2791 _dispatch_debug("waking up the dispatch manager queue: %p", dq
);
2793 _dispatch_kq_deferred_update(&kev
);
// Wakeup entry point for the manager queue: marks the queue dirty on FLUSH,
// and pokes the manager kevent only when not already running on the manager
// queue and the queue actually has work to probe.
// NOTE(review): garbled extraction — brace/return lines missing.
2797 _dispatch_mgr_queue_wakeup(dispatch_queue_t dq
, pthread_priority_t pp
,
2798 dispatch_wakeup_flags_t flags
)
2800 if (flags
& DISPATCH_WAKEUP_FLUSH
) {
// Publish the dirty bit with release ordering before any poke.
2801 os_atomic_or2o(dq
, dq_state
, DISPATCH_QUEUE_DIRTY
, release
);
// Already on the manager queue: it will notice the work itself.
2804 if (_dispatch_queue_get_current() == &_dispatch_mgr_q
) {
2808 if (!_dispatch_queue_class_probe(&_dispatch_mgr_q
)) {
2812 _dispatch_mgr_queue_poke(dq
, pp
);
// One-time event subsystem bring-up: kevent tables, timers, optional mach
// receive buffer, memory-pressure source, and voucher debug channel.
// NOTE(review): garbled extraction — brace lines missing.
2817 _dispatch_event_init(void)
2819 _dispatch_kevent_init();
2820 _dispatch_timers_init();
2821 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
2822 _dispatch_mach_recv_msg_buf_init();
2824 _dispatch_memorypressure_init();
2825 _voucher_activity_debug_channel_init();
2828 #if DISPATCH_USE_MGR_THREAD
// Manager-thread setup: binds the current thread to the manager queue,
// acquires its serial drain lock (must succeed — crash otherwise), then
// initializes manager priority and the event subsystem.
// NOTE(review): garbled extraction — brace lines missing.
2831 _dispatch_mgr_init(void)
2833 uint64_t owned
= DISPATCH_QUEUE_SERIAL_DRAIN_OWNED
;
2834 _dispatch_queue_set_current(&_dispatch_mgr_q
);
2835 if (_dispatch_queue_drain_try_lock(&_dispatch_mgr_q
,
2836 DISPATCH_INVOKE_STEALING
, NULL
) != owned
) {
2837 DISPATCH_INTERNAL_CRASH(0, "Locking the manager should not fail");
2839 _dispatch_mgr_priority_init();
2840 _dispatch_event_init();
2845 _dispatch_mgr_wait_for_event(dispatch_deferred_items_t ddi
, bool poll
)
2848 dispatch_assert((size_t)ddi
->ddi_maxevents
< countof(ddi
->ddi_eventlist
));
2851 r
= kevent_qos(_dispatch_kq
, ddi
->ddi_eventlist
, ddi
->ddi_nevents
,
2852 ddi
->ddi_eventlist
+ ddi
->ddi_maxevents
, 1, NULL
, NULL
,
2853 poll
? KEVENT_FLAG_IMMEDIATE
: KEVENT_FLAG_NONE
);
2854 if (slowpath(r
== -1)) {
2860 DISPATCH_CLIENT_CRASH(err
, "Do not close random Unix descriptors");
2863 (void)dispatch_assume_zero(err
);
2867 ddi
->ddi_nevents
= 0;
2871 DISPATCH_NOINLINE DISPATCH_NORETURN
2873 _dispatch_mgr_invoke(void)
2875 dispatch_deferred_items_s ddi
;
2878 ddi
.ddi_magic
= DISPATCH_DEFERRED_ITEMS_MAGIC
;
2879 ddi
.ddi_stashed_pp
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
2880 ddi
.ddi_nevents
= 0;
2881 ddi
.ddi_maxevents
= 1;
2883 _dispatch_deferred_items_set(&ddi
);
2886 _dispatch_mgr_queue_drain();
2887 poll
= _dispatch_mgr_timers();
2888 poll
= poll
|| _dispatch_queue_class_probe(&_dispatch_mgr_q
);
2889 if (_dispatch_mgr_wait_for_event(&ddi
, poll
)) {
2890 _dispatch_kevent_qos_s
*ke
= ddi
.ddi_eventlist
+ ddi
.ddi_maxevents
;
2891 _dispatch_kevent_debug("received", ke
);
2892 _dispatch_kevent_drain(ke
);
2896 #endif // DISPATCH_USE_MGR_THREAD
// Invoke entry for the manager queue's dedicated thread. Crashes if kevent
// workqueues are enabled (the two modes are mutually exclusive); otherwise
// initializes the manager and enters the non-returning event loop.
// NOTE(review): garbled extraction — brace lines missing.
2900 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED
,
2901 dispatch_invoke_flags_t flags DISPATCH_UNUSED
)
2903 #if DISPATCH_USE_KEVENT_WORKQUEUE
2904 if (_dispatch_kevent_workqueue_enabled
) {
2905 DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
2906 "kevent workqueue enabled");
2909 #if DISPATCH_USE_MGR_THREAD
2910 _dispatch_mgr_init();
2911 // never returns, so burn bridges behind us & clear stack 2k ahead
2912 _dispatch_clear_stack(2048);
2913 _dispatch_mgr_invoke();
2917 #if DISPATCH_USE_KEVENT_WORKQUEUE
2919 #define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((pthread_priority_t)(~0ul))
2921 DISPATCH_ALWAYS_INLINE
2922 static inline pthread_priority_t
2923 _dispatch_kevent_worker_thread_init(dispatch_deferred_items_t ddi
)
2925 uint64_t owned
= DISPATCH_QUEUE_SERIAL_DRAIN_OWNED
;
2927 ddi
->ddi_magic
= DISPATCH_DEFERRED_ITEMS_MAGIC
;
2928 ddi
->ddi_nevents
= 0;
2929 ddi
->ddi_maxevents
= countof(ddi
->ddi_eventlist
);
2930 ddi
->ddi_stashed_pp
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
2932 pthread_priority_t pp
= _dispatch_get_priority();
2933 if (!(pp
& _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
)) {
2934 // If this thread does not have the event manager flag set, don't setup
2935 // as the dispatch manager and let the caller know to only process
2936 // the delivered events.
2938 // Also add the NEEDS_UNBIND flag so that
2939 // _dispatch_priority_compute_update knows it has to unbind
2940 pp
&= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
| ~_PTHREAD_PRIORITY_FLAGS_MASK
;
2941 pp
|= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG
;
2942 _dispatch_thread_setspecific(dispatch_priority_key
,
2943 (void *)(uintptr_t)pp
);
2944 ddi
->ddi_stashed_pp
= 0;
2945 return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER
;
2948 if ((pp
& _PTHREAD_PRIORITY_SCHED_PRI_FLAG
) ||
2949 !(pp
& ~_PTHREAD_PRIORITY_FLAGS_MASK
)) {
2950 // When the phtread kext is delivering kevents to us, and pthread
2951 // root queues are in use, then the pthread priority TSD is set
2952 // to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
2954 // Given that this isn't a valid QoS we need to fixup the TSD,
2955 // and the best option is to clear the qos/priority bits which tells
2956 // us to not do any QoS related calls on this thread.
2958 // However, in that case the manager thread is opted out of QoS,
2959 // as far as pthread is concerned, and can't be turned into
2960 // something else, so we can't stash.
2961 pp
&= (pthread_priority_t
)_PTHREAD_PRIORITY_FLAGS_MASK
;
2963 // Managers always park without mutating to a regular worker thread, and
2964 // hence never need to unbind from userland, and when draining a manager,
2965 // the NEEDS_UNBIND flag would cause the mutation to happen.
2966 // So we need to strip this flag
2967 pp
&= ~(pthread_priority_t
)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG
;
2968 _dispatch_thread_setspecific(dispatch_priority_key
, (void *)(uintptr_t)pp
);
2970 // ensure kevents registered from this thread are registered at manager QoS
2971 pthread_priority_t old_dp
= _dispatch_set_defaultpriority(
2972 (pthread_priority_t
)_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
, NULL
);
2973 _dispatch_queue_set_current(&_dispatch_mgr_q
);
2974 if (_dispatch_queue_drain_try_lock(&_dispatch_mgr_q
,
2975 DISPATCH_INVOKE_STEALING
, NULL
) != owned
) {
2976 DISPATCH_INTERNAL_CRASH(0, "Locking the manager should not fail");
2978 static int event_thread_init
;
2979 if (!event_thread_init
) {
2980 event_thread_init
= 1;
2981 _dispatch_event_init();
2986 DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
2988 _dispatch_kevent_worker_thread_reset(pthread_priority_t old_dp
)
2990 dispatch_queue_t dq
= &_dispatch_mgr_q
;
2991 uint64_t orig_dq_state
;
2993 _dispatch_queue_drain_unlock(dq
, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED
,
2995 _dispatch_reset_defaultpriority(old_dp
);
2996 _dispatch_queue_set_current(NULL
);
2997 return _dq_state_is_dirty(orig_dq_state
);
3002 _dispatch_kevent_worker_thread(_dispatch_kevent_qos_s
**events
, int *nevents
)
3004 _dispatch_introspection_thread_add();
3006 if (!events
&& !nevents
) {
3007 // events for worker thread request have already been delivered earlier
3011 _dispatch_kevent_qos_s
*ke
= *events
;
3013 if (!dispatch_assume(n
) || !dispatch_assume(*events
)) return;
3015 dispatch_deferred_items_s ddi
;
3016 pthread_priority_t old_dp
= _dispatch_kevent_worker_thread_init(&ddi
);
3018 _dispatch_deferred_items_set(&ddi
);
3019 for (int i
= 0; i
< n
; i
++) {
3020 _dispatch_kevent_debug("received", ke
);
3021 _dispatch_kevent_drain(ke
++);
3024 if (old_dp
!= DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER
) {
3025 _dispatch_mgr_queue_drain();
3026 bool poll
= _dispatch_mgr_timers();
3027 if (_dispatch_kevent_worker_thread_reset(old_dp
)) {
3030 if (poll
) _dispatch_mgr_queue_poke(&_dispatch_mgr_q
, 0);
3032 _dispatch_deferred_items_set(NULL
);
3034 if (ddi
.ddi_stashed_pp
& _PTHREAD_PRIORITY_PRIORITY_MASK
) {
3036 if (ddi
.ddi_nevents
) {
3037 _dispatch_kq_update_all(ddi
.ddi_eventlist
, ddi
.ddi_nevents
);
3039 ddi
.ddi_stashed_pp
&= _PTHREAD_PRIORITY_QOS_CLASS_MASK
;
3040 return _dispatch_root_queue_drain_deferred_item(ddi
.ddi_stashed_dq
,
3041 ddi
.ddi_stashed_dou
, ddi
.ddi_stashed_pp
);
3042 #ifndef WORKQ_KEVENT_EVENT_BUFFER_LEN
3043 } else if (ddi
.ddi_nevents
> *nevents
) {
3045 _dispatch_kq_update_all(ddi
.ddi_eventlist
, ddi
.ddi_nevents
);
3048 *nevents
= ddi
.ddi_nevents
;
3049 dispatch_static_assert(__builtin_types_compatible_p(typeof(**events
),
3050 typeof(*ddi
.ddi_eventlist
)));
3051 memcpy(*events
, ddi
.ddi_eventlist
,
3052 (size_t)ddi
.ddi_nevents
* sizeof(*ddi
.ddi_eventlist
));
3055 #endif // DISPATCH_USE_KEVENT_WORKQUEUE
3058 #pragma mark dispatch_memorypressure
3060 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
3061 #define DISPATCH_MEMORYPRESSURE_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYPRESSURE
3062 #define DISPATCH_MEMORYPRESSURE_SOURCE_MASK ( \
3063 DISPATCH_MEMORYPRESSURE_NORMAL | \
3064 DISPATCH_MEMORYPRESSURE_WARN | \
3065 DISPATCH_MEMORYPRESSURE_CRITICAL | \
3066 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_WARN | \
3067 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_CRITICAL)
3068 #define DISPATCH_MEMORYPRESSURE_MALLOC_MASK ( \
3069 DISPATCH_MEMORYPRESSURE_WARN | \
3070 DISPATCH_MEMORYPRESSURE_CRITICAL | \
3071 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_WARN | \
3072 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_CRITICAL)
3075 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
3076 static dispatch_source_t _dispatch_memorypressure_source
;
3079 _dispatch_memorypressure_handler(void *context DISPATCH_UNUSED
)
3081 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
3082 unsigned long memorypressure
;
3083 memorypressure
= dispatch_source_get_data(_dispatch_memorypressure_source
);
3085 if (memorypressure
& DISPATCH_MEMORYPRESSURE_NORMAL
) {
3086 _dispatch_memory_warn
= false;
3087 _dispatch_continuation_cache_limit
= DISPATCH_CONTINUATION_CACHE_LIMIT
;
3088 #if VOUCHER_USE_MACH_VOUCHER
3089 if (_firehose_task_buffer
) {
3090 firehose_buffer_clear_bank_flags(_firehose_task_buffer
,
3091 FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY
);
3095 if (memorypressure
& DISPATCH_MEMORYPRESSURE_WARN
) {
3096 _dispatch_memory_warn
= true;
3097 _dispatch_continuation_cache_limit
=
3098 DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYPRESSURE_PRESSURE_WARN
;
3099 #if VOUCHER_USE_MACH_VOUCHER
3100 if (_firehose_task_buffer
) {
3101 firehose_buffer_set_bank_flags(_firehose_task_buffer
,
3102 FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY
);
3106 if (memorypressure
& DISPATCH_MEMORYPRESSURE_MALLOC_MASK
) {
3107 malloc_memory_event_handler(memorypressure
& DISPATCH_MEMORYPRESSURE_MALLOC_MASK
);
// Creates and activates the process-wide memory-pressure source on the
// default-QoS root queue, delivering events to
// _dispatch_memorypressure_handler. The #else stub below compiles the
// feature away when DISPATCH_USE_MEMORYPRESSURE_SOURCE is disabled.
// NOTE(review): garbled extraction — brace/#else lines missing.
3113 _dispatch_memorypressure_init(void)
3115 _dispatch_memorypressure_source
= dispatch_source_create(
3116 DISPATCH_MEMORYPRESSURE_SOURCE_TYPE
, 0,
3117 DISPATCH_MEMORYPRESSURE_SOURCE_MASK
,
3118 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
, true));
3119 dispatch_source_set_event_handler_f(_dispatch_memorypressure_source
,
3120 _dispatch_memorypressure_handler
);
3121 dispatch_activate(_dispatch_memorypressure_source
);
3124 static inline void _dispatch_memorypressure_init(void) {}
3125 #endif // DISPATCH_USE_MEMORYPRESSURE_SOURCE
3128 #pragma mark dispatch_mach
3132 #if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
3133 #define _dispatch_debug_machport(name) \
3134 dispatch_debug_machport((name), __func__)
3136 #define _dispatch_debug_machport(name) ((void)(name))
3139 // Flags for all notifications that are registered/unregistered when a
3140 // send-possible notification is requested/delivered
3141 #define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
3142 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
3143 #define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
3144 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
3145 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
3146 #define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
3147 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
3148 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
3150 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
3151 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
3152 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
3154 #define _DISPATCH_MACHPORT_HASH_SIZE 32
3155 #define _DISPATCH_MACHPORT_HASH(x) \
3156 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
3158 #ifndef MACH_RCV_VOUCHER
3159 #define MACH_RCV_VOUCHER 0x00000800
3161 #define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
3162 #define DISPATCH_MACH_RCV_OPTIONS ( \
3163 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
3164 MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
3165 MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \
3168 #define DISPATCH_MACH_NOTIFICATION_ARMED(dk) ((dk)->dk_kevent.ext[0])
3170 static void _dispatch_kevent_mach_msg_recv(_dispatch_kevent_qos_s
*ke
,
3171 mach_msg_header_t
*hdr
);
3172 static void _dispatch_kevent_mach_msg_destroy(_dispatch_kevent_qos_s
*ke
,
3173 mach_msg_header_t
*hdr
);
3174 static void _dispatch_source_merge_mach_msg(dispatch_source_t ds
,
3175 dispatch_source_refs_t dr
, dispatch_kevent_t dk
,
3176 _dispatch_kevent_qos_s
*ke
, mach_msg_header_t
*hdr
,
3177 mach_msg_size_t siz
);
3178 static kern_return_t
_dispatch_mach_notify_update(dispatch_kevent_t dk
,
3179 uint32_t new_flags
, uint32_t del_flags
, uint32_t mask
,
3180 mach_msg_id_t notify_msgid
, mach_port_mscount_t notify_sync
);
3181 static void _dispatch_mach_notify_source_invoke(mach_msg_header_t
*hdr
);
3182 static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm
,
3183 dispatch_mach_reply_refs_t dmr
, unsigned int options
);
3184 static void _dispatch_mach_notification_kevent_unregister(dispatch_mach_t dm
);
3185 static void _dispatch_mach_msg_recv(dispatch_mach_t dm
,
3186 dispatch_mach_reply_refs_t dmr
, _dispatch_kevent_qos_s
*ke
,
3187 mach_msg_header_t
*hdr
, mach_msg_size_t siz
);
3188 static void _dispatch_mach_merge_notification_kevent(dispatch_mach_t dm
,
3189 const _dispatch_kevent_qos_s
*ke
);
3190 static inline mach_msg_option_t
_dispatch_mach_checkin_options(void);
3192 static const size_t _dispatch_mach_recv_msg_size
=
3193 DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE
;
3194 static const size_t dispatch_mach_trailer_size
=
3195 sizeof(dispatch_mach_trailer_t
);
3196 static mach_port_t _dispatch_mach_notify_port
;
3197 static dispatch_source_t _dispatch_mach_notify_source
;
3200 _dispatch_kevent_mach_msg_buf(_dispatch_kevent_qos_s
*ke
)
3202 return (void*)ke
->ext
[0];
3205 static inline mach_msg_size_t
3206 _dispatch_kevent_mach_msg_size(_dispatch_kevent_qos_s
*ke
)
3208 // buffer size in the successful receive case, but message size (like
3209 // msgh_size) in the MACH_RCV_TOO_LARGE case, i.e. add trailer size.
3210 return (mach_msg_size_t
)ke
->ext
[1];
3214 _dispatch_source_type_mach_recv_direct_init(dispatch_source_t ds
,
3215 dispatch_source_type_t type DISPATCH_UNUSED
,
3216 uintptr_t handle DISPATCH_UNUSED
,
3217 unsigned long mask DISPATCH_UNUSED
)
3219 ds
->ds_pending_data_mask
= DISPATCH_MACH_RECV_MESSAGE_DIRECT
;
3220 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3221 if (_dispatch_evfilt_machport_direct_enabled
) return;
3222 ds
->ds_dkev
->dk_kevent
.fflags
= DISPATCH_MACH_RECV_MESSAGE_DIRECT
;
3223 ds
->ds_dkev
->dk_kevent
.flags
&= ~(EV_UDATA_SPECIFIC
|EV_VANISHED
);
3224 ds
->ds_is_direct_kevent
= false;
3229 struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct
= {
3231 .filter
= EVFILT_MACHPORT
,
3232 .flags
= EV_VANISHED
|EV_DISPATCH
|EV_UDATA_SPECIFIC
,
3233 .fflags
= DISPATCH_MACH_RCV_OPTIONS
,
3235 .init
= _dispatch_source_type_mach_recv_direct_init
,
3238 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3239 static mach_port_t _dispatch_mach_portset
, _dispatch_mach_recv_portset
;
3240 static _dispatch_kevent_qos_s _dispatch_mach_recv_kevent
= {
3241 .filter
= EVFILT_MACHPORT
,
3242 .flags
= EV_ADD
|EV_ENABLE
|EV_DISPATCH
,
3243 .fflags
= DISPATCH_MACH_RCV_OPTIONS
,
3247 _dispatch_mach_recv_msg_buf_init(void)
3249 if (_dispatch_evfilt_machport_direct_enabled
) return;
3250 mach_vm_size_t vm_size
= mach_vm_round_page(
3251 _dispatch_mach_recv_msg_size
+ dispatch_mach_trailer_size
);
3252 mach_vm_address_t vm_addr
= vm_page_size
;
3255 while (slowpath(kr
= mach_vm_allocate(mach_task_self(), &vm_addr
, vm_size
,
3256 VM_FLAGS_ANYWHERE
))) {
3257 if (kr
!= KERN_NO_SPACE
) {
3258 DISPATCH_CLIENT_CRASH(kr
,
3259 "Could not allocate mach msg receive buffer");
3261 _dispatch_temporary_resource_shortage();
3262 vm_addr
= vm_page_size
;
3264 _dispatch_mach_recv_kevent
.ext
[0] = (uintptr_t)vm_addr
;
3265 _dispatch_mach_recv_kevent
.ext
[1] = vm_size
;
3271 _dispatch_source_merge_mach_msg_direct(dispatch_source_t ds
,
3272 _dispatch_kevent_qos_s
*ke
, mach_msg_header_t
*hdr
)
3274 dispatch_continuation_t dc
= _dispatch_source_get_event_handler(ds
->ds_refs
);
3275 dispatch_queue_t cq
= _dispatch_queue_get_current();
3277 // see firehose_client_push_notify_async
3278 _dispatch_queue_set_current(ds
->_as_dq
);
3280 _dispatch_queue_set_current(cq
);
3281 if (hdr
!= _dispatch_kevent_mach_msg_buf(ke
)) {
3287 _dispatch_source_create_mach_msg_direct_recv(mach_port_t recvp
,
3288 const struct dispatch_continuation_s
*dc
)
3290 dispatch_source_t ds
;
3291 ds
= dispatch_source_create(&_dispatch_source_type_mach_recv_direct
,
3292 recvp
, 0, &_dispatch_mgr_q
);
3293 os_atomic_store(&ds
->ds_refs
->ds_handler
[DS_EVENT_HANDLER
],
3294 (dispatch_continuation_t
)dc
, relaxed
);
3299 _dispatch_mach_notify_port_init(void *context DISPATCH_UNUSED
)
3302 #if HAVE_MACH_PORT_CONSTRUCT
3303 mach_port_options_t opts
= { .flags
= MPO_CONTEXT_AS_GUARD
| MPO_STRICT
};
3305 const mach_port_context_t guard
= 0xfeed09071f1ca7edull
;
3307 const mach_port_context_t guard
= 0xff1ca7edull
;
3309 kr
= mach_port_construct(mach_task_self(), &opts
, guard
,
3310 &_dispatch_mach_notify_port
);
3312 kr
= mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE
,
3313 &_dispatch_mach_notify_port
);
3315 DISPATCH_VERIFY_MIG(kr
);
3317 DISPATCH_CLIENT_CRASH(kr
,
3318 "mach_port_construct() failed: cannot create receive right");
3321 static const struct dispatch_continuation_s dc
= {
3322 .dc_func
= (void*)_dispatch_mach_notify_source_invoke
,
3324 _dispatch_mach_notify_source
= _dispatch_source_create_mach_msg_direct_recv(
3325 _dispatch_mach_notify_port
, &dc
);
3326 dispatch_assert(_dispatch_mach_notify_source
);
3327 dispatch_activate(_dispatch_mach_notify_source
);
3331 _dispatch_get_mach_notify_port(void)
3333 static dispatch_once_t pred
;
3334 dispatch_once_f(&pred
, NULL
, _dispatch_mach_notify_port_init
);
3335 return _dispatch_mach_notify_port
;
3338 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3340 _dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED
)
3344 kr
= mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET
,
3345 &_dispatch_mach_recv_portset
);
3346 DISPATCH_VERIFY_MIG(kr
);
3348 DISPATCH_CLIENT_CRASH(kr
,
3349 "mach_port_allocate() failed: cannot create port set");
3351 _dispatch_kevent_qos_s
*ke
= &_dispatch_mach_recv_kevent
;
3352 dispatch_assert(_dispatch_kevent_mach_msg_buf(ke
));
3353 dispatch_assert(dispatch_mach_trailer_size
==
3354 REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
3355 DISPATCH_MACH_RCV_TRAILER
)));
3356 ke
->ident
= _dispatch_mach_recv_portset
;
3357 #if DISPATCH_USE_KEVENT_WORKQUEUE
3358 if (_dispatch_kevent_workqueue_enabled
) {
3359 ke
->qos
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
3362 _dispatch_kq_immediate_update(&_dispatch_mach_recv_kevent
);
3366 _dispatch_get_mach_recv_portset(void)
3368 static dispatch_once_t pred
;
3369 dispatch_once_f(&pred
, NULL
, _dispatch_mach_recv_portset_init
);
3370 return _dispatch_mach_recv_portset
;
3374 _dispatch_mach_portset_init(void *context DISPATCH_UNUSED
)
3376 _dispatch_kevent_qos_s kev
= {
3377 .filter
= EVFILT_MACHPORT
,
3380 #if DISPATCH_USE_KEVENT_WORKQUEUE
3381 if (_dispatch_kevent_workqueue_enabled
) {
3382 kev
.qos
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
3388 kr
= mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET
,
3389 &_dispatch_mach_portset
);
3390 DISPATCH_VERIFY_MIG(kr
);
3392 DISPATCH_CLIENT_CRASH(kr
,
3393 "mach_port_allocate() failed: cannot create port set");
3395 kev
.ident
= _dispatch_mach_portset
;
3396 _dispatch_kq_immediate_update(&kev
);
3400 _dispatch_get_mach_portset(void)
3402 static dispatch_once_t pred
;
3403 dispatch_once_f(&pred
, NULL
, _dispatch_mach_portset_init
);
3404 return _dispatch_mach_portset
;
3407 static kern_return_t
3408 _dispatch_mach_portset_update(dispatch_kevent_t dk
, mach_port_t mps
)
3410 mach_port_t mp
= (mach_port_t
)dk
->dk_kevent
.ident
;
3413 _dispatch_debug_machport(mp
);
3414 kr
= mach_port_move_member(mach_task_self(), mp
, mps
);
3416 DISPATCH_VERIFY_MIG(kr
);
3418 case KERN_INVALID_RIGHT
:
3420 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
3421 "mach_port_move_member() failed ", kr
);
3425 case KERN_INVALID_NAME
:
3427 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
3432 (void)dispatch_assume_zero(kr
);
3436 return mps
? kr
: 0;
3439 static kern_return_t
3440 _dispatch_kevent_machport_resume(dispatch_kevent_t dk
, uint32_t new_flags
,
3443 kern_return_t kr
= 0;
3444 dispatch_assert_zero(new_flags
& del_flags
);
3445 if ((new_flags
& _DISPATCH_MACH_RECV_FLAGS
) ||
3446 (del_flags
& _DISPATCH_MACH_RECV_FLAGS
)) {
3448 if (new_flags
& _DISPATCH_MACH_RECV_DIRECT_FLAGS
) {
3449 mps
= _dispatch_get_mach_recv_portset();
3450 } else if ((new_flags
& DISPATCH_MACH_RECV_MESSAGE
) ||
3451 ((del_flags
& _DISPATCH_MACH_RECV_DIRECT_FLAGS
) &&
3452 (dk
->dk_kevent
.fflags
& DISPATCH_MACH_RECV_MESSAGE
))) {
3453 mps
= _dispatch_get_mach_portset();
3455 mps
= MACH_PORT_NULL
;
3457 kr
= _dispatch_mach_portset_update(dk
, mps
);
3461 #endif // DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3463 static kern_return_t
3464 _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk
, uint32_t new_flags
,
3467 kern_return_t kr
= 0;
3468 dispatch_assert_zero(new_flags
& del_flags
);
3469 if ((new_flags
& _DISPATCH_MACH_SP_FLAGS
) ||
3470 (del_flags
& _DISPATCH_MACH_SP_FLAGS
)) {
3471 // Requesting a (delayed) non-sync send-possible notification
3472 // registers for both immediate dead-name notification and delayed-arm
3473 // send-possible notification for the port.
3474 // The send-possible notification is armed when a mach_msg() with the
3475 // the MACH_SEND_NOTIFY to the port times out.
3476 // If send-possible is unavailable, fall back to immediate dead-name
3477 // registration rdar://problem/2527840&9008724
3478 kr
= _dispatch_mach_notify_update(dk
, new_flags
, del_flags
,
3479 _DISPATCH_MACH_SP_FLAGS
, MACH_NOTIFY_SEND_POSSIBLE
,
3480 MACH_NOTIFY_SEND_POSSIBLE
== MACH_NOTIFY_DEAD_NAME
? 1 : 0);
3485 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3488 _dispatch_kevent_machport_drain(_dispatch_kevent_qos_s
*ke
)
3490 mach_port_t name
= (mach_port_name_t
)ke
->data
;
3491 dispatch_kevent_t dk
;
3493 _dispatch_debug_machport(name
);
3494 dk
= _dispatch_kevent_find(name
, EVFILT_MACHPORT
);
3495 if (!dispatch_assume(dk
)) {
3498 _dispatch_mach_portset_update(dk
, MACH_PORT_NULL
); // emulate EV_DISPATCH
3500 _dispatch_kevent_qos_s kev
= {
3502 .filter
= EVFILT_MACHPORT
,
3503 .flags
= EV_ADD
|EV_ENABLE
|EV_DISPATCH
,
3504 .fflags
= DISPATCH_MACH_RECV_MESSAGE
,
3505 .udata
= (uintptr_t)dk
,
3507 _dispatch_kevent_debug("synthetic", &kev
);
3508 _dispatch_kevent_merge(&kev
);
3514 _dispatch_kevent_mach_msg_drain(_dispatch_kevent_qos_s
*ke
)
3516 mach_msg_header_t
*hdr
= _dispatch_kevent_mach_msg_buf(ke
);
3517 mach_msg_size_t siz
;
3518 mach_msg_return_t kr
= (mach_msg_return_t
)ke
->fflags
;
3520 if (!fastpath(hdr
)) {
3521 DISPATCH_INTERNAL_CRASH(kr
, "EVFILT_MACHPORT with no message");
3523 if (fastpath(!kr
)) {
3524 _dispatch_kevent_mach_msg_recv(ke
, hdr
);
3526 } else if (kr
!= MACH_RCV_TOO_LARGE
) {
3528 } else if (!ke
->data
) {
3529 DISPATCH_INTERNAL_CRASH(0, "MACH_RCV_LARGE_IDENTITY with no identity");
3531 if (slowpath(ke
->ext
[1] > (UINT_MAX
- dispatch_mach_trailer_size
))) {
3532 DISPATCH_INTERNAL_CRASH(ke
->ext
[1],
3533 "EVFILT_MACHPORT with overlarge message");
3535 siz
= _dispatch_kevent_mach_msg_size(ke
) + dispatch_mach_trailer_size
;
3537 if (!dispatch_assume(hdr
)) {
3538 // Kernel will discard message too large to fit
3542 mach_port_t name
= (mach_port_name_t
)ke
->data
;
3543 const mach_msg_option_t options
= ((DISPATCH_MACH_RCV_OPTIONS
|
3544 MACH_RCV_TIMEOUT
) & ~MACH_RCV_LARGE
);
3545 kr
= mach_msg(hdr
, options
, 0, siz
, name
, MACH_MSG_TIMEOUT_NONE
,
3547 if (fastpath(!kr
)) {
3548 _dispatch_kevent_mach_msg_recv(ke
, hdr
);
3550 } else if (kr
== MACH_RCV_TOO_LARGE
) {
3551 _dispatch_log("BUG in libdispatch client: "
3552 "_dispatch_kevent_mach_msg_drain: dropped message too "
3553 "large to fit in memory: id = 0x%x, size = %u",
3554 hdr
->msgh_id
, _dispatch_kevent_mach_msg_size(ke
));
3555 kr
= MACH_MSG_SUCCESS
;
3557 if (hdr
!= _dispatch_kevent_mach_msg_buf(ke
)) {
3562 _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
3563 "message reception failed", kr
);
3569 _dispatch_mach_kevent_merge(_dispatch_kevent_qos_s
*ke
)
3571 if (unlikely(!(ke
->flags
& EV_UDATA_SPECIFIC
))) {
3572 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3573 if (ke
->ident
== _dispatch_mach_recv_portset
) {
3574 _dispatch_kevent_mach_msg_drain(ke
);
3575 return _dispatch_kq_deferred_update(&_dispatch_mach_recv_kevent
);
3576 } else if (ke
->ident
== _dispatch_mach_portset
) {
3577 return _dispatch_kevent_machport_drain(ke
);
3580 return _dispatch_kevent_error(ke
);
3583 dispatch_kevent_t dk
= (dispatch_kevent_t
)ke
->udata
;
3584 dispatch_source_refs_t dr
= TAILQ_FIRST(&dk
->dk_sources
);
3585 bool is_reply
= (dk
->dk_kevent
.flags
& EV_ONESHOT
);
3586 dispatch_source_t ds
= _dispatch_source_from_refs(dr
);
3588 if (_dispatch_kevent_mach_msg_size(ke
)) {
3589 _dispatch_kevent_mach_msg_drain(ke
);
3591 // _dispatch_kevent_mach_msg_drain() should have deleted this event
3592 dispatch_assert(ke
->flags
& EV_DELETE
);
3596 if (!(ds
->dq_atomic_flags
& DSF_CANCELED
)) {
3597 // re-arm the mach channel
3598 ke
->fflags
= DISPATCH_MACH_RCV_OPTIONS
;
3602 return _dispatch_kq_deferred_update(ke
);
3604 } else if (is_reply
) {
3605 DISPATCH_INTERNAL_CRASH(ke
->flags
, "Unexpected EVFILT_MACHPORT event");
3607 if (unlikely((ke
->flags
& EV_VANISHED
) &&
3608 (dx_type(ds
) == DISPATCH_MACH_CHANNEL_TYPE
))) {
3609 DISPATCH_CLIENT_CRASH(ke
->flags
,
3610 "Unexpected EV_VANISHED (do not destroy random mach ports)");
3612 return _dispatch_kevent_merge(ke
);
3616 _dispatch_kevent_mach_msg_recv(_dispatch_kevent_qos_s
*ke
,
3617 mach_msg_header_t
*hdr
)
3619 dispatch_source_refs_t dri
;
3620 dispatch_kevent_t dk
;
3621 mach_port_t name
= hdr
->msgh_local_port
;
3622 mach_msg_size_t siz
= hdr
->msgh_size
+ dispatch_mach_trailer_size
;
3624 if (!dispatch_assume(hdr
->msgh_size
<= UINT_MAX
-
3625 dispatch_mach_trailer_size
)) {
3626 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3627 "received overlarge message");
3628 return _dispatch_kevent_mach_msg_destroy(ke
, hdr
);
3630 if (!dispatch_assume(name
)) {
3631 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3632 "received message with MACH_PORT_NULL port");
3633 return _dispatch_kevent_mach_msg_destroy(ke
, hdr
);
3635 _dispatch_debug_machport(name
);
3636 if (ke
->flags
& EV_UDATA_SPECIFIC
) {
3637 dk
= (void*)ke
->udata
;
3639 dk
= _dispatch_kevent_find(name
, EVFILT_MACHPORT
);
3641 if (!dispatch_assume(dk
)) {
3642 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3643 "received message with unknown kevent");
3644 return _dispatch_kevent_mach_msg_destroy(ke
, hdr
);
3646 TAILQ_FOREACH(dri
, &dk
->dk_sources
, dr_list
) {
3647 dispatch_source_t dsi
= _dispatch_source_from_refs(dri
);
3648 if (dsi
->ds_pending_data_mask
& _DISPATCH_MACH_RECV_DIRECT_FLAGS
) {
3649 return _dispatch_source_merge_mach_msg(dsi
, dri
, dk
, ke
, hdr
, siz
);
3652 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3653 "received message with no listeners");
3654 return _dispatch_kevent_mach_msg_destroy(ke
, hdr
);
3658 _dispatch_kevent_mach_msg_destroy(_dispatch_kevent_qos_s
*ke
,
3659 mach_msg_header_t
*hdr
)
3662 mach_msg_destroy(hdr
);
3663 if (hdr
!= _dispatch_kevent_mach_msg_buf(ke
)) {
3670 _dispatch_source_merge_mach_msg(dispatch_source_t ds
, dispatch_source_refs_t dr
,
3671 dispatch_kevent_t dk
, _dispatch_kevent_qos_s
*ke
,
3672 mach_msg_header_t
*hdr
, mach_msg_size_t siz
)
3674 if (dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
) {
3675 return _dispatch_source_merge_mach_msg_direct(ds
, ke
, hdr
);
3677 dispatch_mach_reply_refs_t dmr
= NULL
;
3678 if (dk
->dk_kevent
.flags
& EV_ONESHOT
) {
3679 dmr
= (dispatch_mach_reply_refs_t
)dr
;
3681 return _dispatch_mach_msg_recv((dispatch_mach_t
)ds
, dmr
, ke
, hdr
, siz
);
3686 _dispatch_mach_notify_merge(mach_port_t name
, uint32_t flag
, bool final
)
3688 dispatch_source_refs_t dri
, dr_next
;
3689 dispatch_kevent_t dk
;
3692 dk
= _dispatch_kevent_find(name
, DISPATCH_EVFILT_MACH_NOTIFICATION
);
3697 // Update notification registration state.
3698 dk
->dk_kevent
.data
&= ~_DISPATCH_MACH_SP_FLAGS
;
3699 _dispatch_kevent_qos_s kev
= {
3701 .filter
= DISPATCH_EVFILT_MACH_NOTIFICATION
,
3702 .flags
= EV_ADD
|EV_ENABLE
,
3704 .udata
= (uintptr_t)dk
,
3707 // This can never happen again
3710 // Re-register for notification before delivery
3711 unreg
= _dispatch_kevent_resume(dk
, flag
, 0);
3713 DISPATCH_MACH_NOTIFICATION_ARMED(dk
) = 0;
3714 TAILQ_FOREACH_SAFE(dri
, &dk
->dk_sources
, dr_list
, dr_next
) {
3715 dispatch_source_t dsi
= _dispatch_source_from_refs(dri
);
3716 if (dx_type(dsi
) == DISPATCH_MACH_CHANNEL_TYPE
) {
3717 dispatch_mach_t dm
= (dispatch_mach_t
)dsi
;
3718 _dispatch_mach_merge_notification_kevent(dm
, &kev
);
3719 if (unreg
&& dm
->dm_dkev
) {
3720 _dispatch_mach_notification_kevent_unregister(dm
);
3723 _dispatch_source_merge_kevent(dsi
, &kev
);
3725 _dispatch_source_kevent_unregister(dsi
);
3728 if (!dr_next
|| DISPATCH_MACH_NOTIFICATION_ARMED(dk
)) {
3729 // current merge is last in list (dk might have been freed)
3730 // or it re-armed the notification
3736 static kern_return_t
3737 _dispatch_mach_notify_update(dispatch_kevent_t dk
, uint32_t new_flags
,
3738 uint32_t del_flags
, uint32_t mask
, mach_msg_id_t notify_msgid
,
3739 mach_port_mscount_t notify_sync
)
3741 mach_port_t previous
, port
= (mach_port_t
)dk
->dk_kevent
.ident
;
3742 typeof(dk
->dk_kevent
.data
) prev
= dk
->dk_kevent
.data
;
3743 kern_return_t kr
, krr
= 0;
3745 // Update notification registration state.
3746 dk
->dk_kevent
.data
|= (new_flags
| dk
->dk_kevent
.fflags
) & mask
;
3747 dk
->dk_kevent
.data
&= ~(del_flags
& mask
);
3749 _dispatch_debug_machport(port
);
3750 if ((dk
->dk_kevent
.data
& mask
) && !(prev
& mask
)) {
3751 _dispatch_debug("machport[0x%08x]: registering for send-possible "
3752 "notification", port
);
3753 previous
= MACH_PORT_NULL
;
3754 krr
= mach_port_request_notification(mach_task_self(), port
,
3755 notify_msgid
, notify_sync
, _dispatch_get_mach_notify_port(),
3756 MACH_MSG_TYPE_MAKE_SEND_ONCE
, &previous
);
3757 DISPATCH_VERIFY_MIG(krr
);
3760 case KERN_INVALID_NAME
:
3761 case KERN_INVALID_RIGHT
:
3762 // Suppress errors & clear registration state
3763 dk
->dk_kevent
.data
&= ~mask
;
3766 // Else, we don't expect any errors from mach. Log any errors
3767 if (dispatch_assume_zero(krr
)) {
3768 // log the error & clear registration state
3769 dk
->dk_kevent
.data
&= ~mask
;
3770 } else if (dispatch_assume_zero(previous
)) {
3771 // Another subsystem has beat libdispatch to requesting the
3772 // specified Mach notification on this port. We should
3773 // technically cache the previous port and message it when the
3774 // kernel messages our port. Or we can just say screw those
3775 // subsystems and deallocate the previous port.
3776 // They should adopt libdispatch :-P
3777 kr
= mach_port_deallocate(mach_task_self(), previous
);
3778 DISPATCH_VERIFY_MIG(kr
);
3779 (void)dispatch_assume_zero(kr
);
3780 previous
= MACH_PORT_NULL
;
3783 } else if (!(dk
->dk_kevent
.data
& mask
) && (prev
& mask
)) {
3784 _dispatch_debug("machport[0x%08x]: unregistering for send-possible "
3785 "notification", port
);
3786 previous
= MACH_PORT_NULL
;
3787 kr
= mach_port_request_notification(mach_task_self(), port
,
3788 notify_msgid
, notify_sync
, MACH_PORT_NULL
,
3789 MACH_MSG_TYPE_MOVE_SEND_ONCE
, &previous
);
3790 DISPATCH_VERIFY_MIG(kr
);
3793 case KERN_INVALID_NAME
:
3794 case KERN_INVALID_RIGHT
:
3795 case KERN_INVALID_ARGUMENT
:
3798 if (dispatch_assume_zero(kr
)) {
3805 if (slowpath(previous
)) {
3806 // the kernel has not consumed the send-once right yet
3807 (void)dispatch_assume_zero(
3808 _dispatch_send_consume_send_once_right(previous
));
3814 _dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED
)
3816 static int notify_type
= HOST_NOTIFY_CALENDAR_SET
;
3818 _dispatch_debug("registering for calendar-change notification");
3820 kr
= host_request_notification(_dispatch_get_mach_host_port(),
3821 notify_type
, _dispatch_get_mach_notify_port());
3822 // Fallback when missing support for newer _SET variant, fires strictly more.
3823 if (kr
== KERN_INVALID_ARGUMENT
&&
3824 notify_type
!= HOST_NOTIFY_CALENDAR_CHANGE
){
3825 notify_type
= HOST_NOTIFY_CALENDAR_CHANGE
;
3828 DISPATCH_VERIFY_MIG(kr
);
3829 (void)dispatch_assume_zero(kr
);
3833 _dispatch_mach_host_calendar_change_register(void)
3835 static dispatch_once_t pred
;
3836 dispatch_once_f(&pred
, NULL
, _dispatch_mach_host_notify_update
);
3840 _dispatch_mach_notify_source_invoke(mach_msg_header_t
*hdr
)
3842 mig_reply_error_t reply
;
3843 dispatch_assert(sizeof(mig_reply_error_t
) == sizeof(union
3844 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem
));
3845 dispatch_assert(sizeof(mig_reply_error_t
) < _dispatch_mach_recv_msg_size
);
3846 boolean_t success
= libdispatch_internal_protocol_server(hdr
, &reply
.Head
);
3847 if (!success
&& reply
.RetCode
== MIG_BAD_ID
&&
3848 (hdr
->msgh_id
== HOST_CALENDAR_SET_REPLYID
||
3849 hdr
->msgh_id
== HOST_CALENDAR_CHANGED_REPLYID
)) {
3850 _dispatch_debug("calendar-change notification");
3851 _dispatch_timers_calendar_change();
3852 _dispatch_mach_host_notify_update(NULL
);
3854 reply
.RetCode
= KERN_SUCCESS
;
3856 if (dispatch_assume(success
) && reply
.RetCode
!= MIG_NO_REPLY
) {
3857 (void)dispatch_assume_zero(reply
.RetCode
);
3859 if (!success
|| (reply
.RetCode
&& reply
.RetCode
!= MIG_NO_REPLY
)) {
3860 mach_msg_destroy(hdr
);
3865 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED
,
3866 mach_port_name_t name
)
3869 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
3870 "deleted prematurely", name
);
3873 _dispatch_debug_machport(name
);
3874 _dispatch_mach_notify_merge(name
, DISPATCH_MACH_SEND_DELETED
, true);
3876 return KERN_SUCCESS
;
3880 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED
,
3881 mach_port_name_t name
)
3885 _dispatch_debug("machport[0x%08x]: dead-name notification", name
);
3886 _dispatch_debug_machport(name
);
3887 _dispatch_mach_notify_merge(name
, DISPATCH_MACH_SEND_DEAD
, true);
3889 // the act of receiving a dead name notification allocates a dead-name
3890 // right that must be deallocated
3891 kr
= mach_port_deallocate(mach_task_self(), name
);
3892 DISPATCH_VERIFY_MIG(kr
);
3893 //(void)dispatch_assume_zero(kr);
3895 return KERN_SUCCESS
;
3899 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED
,
3900 mach_port_name_t name
)
3902 _dispatch_debug("machport[0x%08x]: send-possible notification", name
);
3903 _dispatch_debug_machport(name
);
3904 _dispatch_mach_notify_merge(name
, DISPATCH_MACH_SEND_POSSIBLE
, false);
3906 return KERN_SUCCESS
;
3910 #pragma mark dispatch_mach_t
3912 #define DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT 0x1
3913 #define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
3914 #define DISPATCH_MACH_WAIT_FOR_REPLY 0x4
3915 #define DISPATCH_MACH_OWNED_REPLY_PORT 0x8
3916 #define DISPATCH_MACH_OPTIONS_MASK 0xffff
3918 #define DM_SEND_STATUS_SUCCESS 0x1
3919 #define DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT 0x2
3921 DISPATCH_ENUM(dispatch_mach_send_invoke_flags
, uint32_t,
3922 DM_SEND_INVOKE_NONE
= 0x0,
3923 DM_SEND_INVOKE_FLUSH
= 0x1,
3924 DM_SEND_INVOKE_NEEDS_BARRIER
= 0x2,
3925 DM_SEND_INVOKE_CANCEL
= 0x4,
3926 DM_SEND_INVOKE_CAN_RUN_BARRIER
= 0x8,
3927 DM_SEND_INVOKE_IMMEDIATE_SEND
= 0x10,
3929 #define DM_SEND_INVOKE_IMMEDIATE_SEND_MASK \
3930 ((dispatch_mach_send_invoke_flags_t)DM_SEND_INVOKE_IMMEDIATE_SEND)
3932 static inline pthread_priority_t
_dispatch_mach_priority_propagate(
3933 mach_msg_option_t options
);
3934 static mach_port_t
_dispatch_mach_msg_get_remote_port(dispatch_object_t dou
);
3935 static mach_port_t
_dispatch_mach_msg_get_reply_port(dispatch_object_t dou
);
3936 static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm
,
3937 mach_port_t local_port
, mach_port_t remote_port
);
3938 static inline void _dispatch_mach_msg_reply_received(dispatch_mach_t dm
,
3939 dispatch_mach_reply_refs_t dmr
, mach_port_t local_port
);
3940 static dispatch_mach_msg_t
_dispatch_mach_msg_create_reply_disconnected(
3941 dispatch_object_t dou
, dispatch_mach_reply_refs_t dmr
);
3942 static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm
,
3943 dispatch_object_t dou
);
3944 static inline mach_msg_header_t
* _dispatch_mach_msg_get_msg(
3945 dispatch_mach_msg_t dmsg
);
3946 static void _dispatch_mach_send_push(dispatch_mach_t dm
, dispatch_object_t dou
,
3947 pthread_priority_t pp
);
3949 static dispatch_mach_t
3950 _dispatch_mach_create(const char *label
, dispatch_queue_t q
, void *context
,
3951 dispatch_mach_handler_function_t handler
, bool handler_is_block
)
3954 dispatch_mach_refs_t dr
;
3956 dm
= _dispatch_alloc(DISPATCH_VTABLE(mach
),
3957 sizeof(struct dispatch_mach_s
));
3958 _dispatch_queue_init(dm
->_as_dq
, DQF_NONE
, 1, true);
3960 dm
->dq_label
= label
;
3961 dm
->do_ref_cnt
++; // the reference _dispatch_mach_cancel_invoke holds
3963 dr
= _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s
));
3964 dr
->dr_source_wref
= _dispatch_ptr2wref(dm
);
3965 dr
->dm_handler_func
= handler
;
3966 dr
->dm_handler_ctxt
= context
;
3968 dm
->dm_handler_is_block
= handler_is_block
;
3970 dm
->dm_refs
= _dispatch_calloc(1ul,
3971 sizeof(struct dispatch_mach_send_refs_s
));
3972 dm
->dm_refs
->dr_source_wref
= _dispatch_ptr2wref(dm
);
3973 dm
->dm_refs
->dm_disconnect_cnt
= DISPATCH_MACH_NEVER_CONNECTED
;
3974 TAILQ_INIT(&dm
->dm_refs
->dm_replies
);
3977 q
= _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
, true);
3979 _dispatch_retain(q
);
3982 _dispatch_object_debug(dm
, "%s", __func__
);
3987 dispatch_mach_create(const char *label
, dispatch_queue_t q
,
3988 dispatch_mach_handler_t handler
)
3990 dispatch_block_t bb
= _dispatch_Block_copy((void*)handler
);
3991 return _dispatch_mach_create(label
, q
, bb
,
3992 (dispatch_mach_handler_function_t
)_dispatch_Block_invoke(bb
), true);
3996 dispatch_mach_create_f(const char *label
, dispatch_queue_t q
, void *context
,
3997 dispatch_mach_handler_function_t handler
)
3999 return _dispatch_mach_create(label
, q
, context
, handler
, false);
4003 _dispatch_mach_dispose(dispatch_mach_t dm
)
4005 _dispatch_object_debug(dm
, "%s", __func__
);
4006 dispatch_mach_refs_t dr
= dm
->ds_refs
;
4007 if (dm
->dm_handler_is_block
&& dr
->dm_handler_ctxt
) {
4008 Block_release(dr
->dm_handler_ctxt
);
4012 _dispatch_queue_destroy(dm
->_as_dq
);
4016 dispatch_mach_connect(dispatch_mach_t dm
, mach_port_t receive
,
4017 mach_port_t send
, dispatch_mach_msg_t checkin
)
4019 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
4020 dispatch_kevent_t dk
;
4021 uint32_t disconnect_cnt
;
4022 dispatch_source_type_t type
= &_dispatch_source_type_mach_recv_direct
;
4024 dm
->ds_is_direct_kevent
= (bool)_dispatch_evfilt_machport_direct_enabled
;
4025 if (MACH_PORT_VALID(receive
)) {
4026 dk
= _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s
));
4027 dk
->dk_kevent
= type
->ke
;
4028 dk
->dk_kevent
.ident
= receive
;
4029 dk
->dk_kevent
.flags
|= EV_ADD
|EV_ENABLE
|EV_VANISHED
;
4030 dk
->dk_kevent
.udata
= (uintptr_t)dk
;
4031 TAILQ_INIT(&dk
->dk_sources
);
4033 dm
->ds_pending_data_mask
= DISPATCH_MACH_RECV_MESSAGE_DIRECT
;
4034 dm
->ds_needs_rearm
= dm
->ds_is_direct_kevent
;
4035 if (!dm
->ds_is_direct_kevent
) {
4036 dk
->dk_kevent
.fflags
= DISPATCH_MACH_RECV_MESSAGE_DIRECT
;
4037 dk
->dk_kevent
.flags
&= ~(EV_UDATA_SPECIFIC
|EV_VANISHED
);
4039 _dispatch_retain(dm
); // the reference the manager queue holds
4042 if (MACH_PORT_VALID(send
)) {
4044 dispatch_retain(checkin
);
4045 checkin
->dmsg_options
= _dispatch_mach_checkin_options();
4046 dr
->dm_checkin_port
= _dispatch_mach_msg_get_remote_port(checkin
);
4048 dr
->dm_checkin
= checkin
;
4050 // monitor message reply ports
4051 dm
->ds_pending_data_mask
|= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE
;
4052 dispatch_assert(DISPATCH_MACH_NEVER_CONNECTED
- 1 ==
4053 DISPATCH_MACH_NEVER_INSTALLED
);
4054 disconnect_cnt
= os_atomic_dec2o(dr
, dm_disconnect_cnt
, release
);
4055 if (unlikely(disconnect_cnt
!= DISPATCH_MACH_NEVER_INSTALLED
)) {
4056 DISPATCH_CLIENT_CRASH(disconnect_cnt
, "Channel already connected");
4058 _dispatch_object_debug(dm
, "%s", __func__
);
4059 return dispatch_activate(dm
);
4062 // assumes low bit of mach port names is always set
4063 #define DISPATCH_MACH_REPLY_PORT_UNOWNED 0x1u
4066 _dispatch_mach_reply_mark_reply_port_owned(dispatch_mach_reply_refs_t dmr
)
4068 dmr
->dmr_reply
&= ~DISPATCH_MACH_REPLY_PORT_UNOWNED
;
4072 _dispatch_mach_reply_is_reply_port_owned(dispatch_mach_reply_refs_t dmr
)
4074 mach_port_t reply_port
= dmr
->dmr_reply
;
4075 return reply_port
? !(reply_port
& DISPATCH_MACH_REPLY_PORT_UNOWNED
) :false;
4078 static inline mach_port_t
4079 _dispatch_mach_reply_get_reply_port(dispatch_mach_reply_refs_t dmr
)
4081 mach_port_t reply_port
= dmr
->dmr_reply
;
4082 return reply_port
? (reply_port
| DISPATCH_MACH_REPLY_PORT_UNOWNED
) : 0;
4086 _dispatch_mach_reply_tryremove(dispatch_mach_t dm
,
4087 dispatch_mach_reply_refs_t dmr
)
4090 _dispatch_unfair_lock_lock(&dm
->dm_refs
->dm_replies_lock
);
4091 if ((removed
= _TAILQ_IS_ENQUEUED(dmr
, dmr_list
))) {
4092 TAILQ_REMOVE(&dm
->dm_refs
->dm_replies
, dmr
, dmr_list
);
4093 _TAILQ_MARK_NOT_ENQUEUED(dmr
, dmr_list
);
4095 _dispatch_unfair_lock_unlock(&dm
->dm_refs
->dm_replies_lock
);
4101 _dispatch_mach_reply_waiter_unregister(dispatch_mach_t dm
,
4102 dispatch_mach_reply_refs_t dmr
, unsigned int options
)
4104 dispatch_mach_msg_t dmsgr
= NULL
;
4105 bool disconnected
= (options
& DKEV_UNREGISTER_DISCONNECTED
);
4106 if (options
& DKEV_UNREGISTER_REPLY_REMOVE
) {
4107 _dispatch_unfair_lock_lock(&dm
->dm_refs
->dm_replies_lock
);
4108 if (unlikely(!_TAILQ_IS_ENQUEUED(dmr
, dmr_list
))) {
4109 DISPATCH_INTERNAL_CRASH(0, "Could not find reply registration");
4111 TAILQ_REMOVE(&dm
->dm_refs
->dm_replies
, dmr
, dmr_list
);
4112 _TAILQ_MARK_NOT_ENQUEUED(dmr
, dmr_list
);
4113 _dispatch_unfair_lock_unlock(&dm
->dm_refs
->dm_replies_lock
);
4116 dmsgr
= _dispatch_mach_msg_create_reply_disconnected(NULL
, dmr
);
4117 } else if (dmr
->dmr_voucher
) {
4118 _voucher_release(dmr
->dmr_voucher
);
4119 dmr
->dmr_voucher
= NULL
;
4121 _dispatch_debug("machport[0x%08x]: unregistering for sync reply%s, ctxt %p",
4122 _dispatch_mach_reply_get_reply_port(dmr
),
4123 disconnected
? " (disconnected)" : "", dmr
->dmr_ctxt
);
4125 return _dispatch_queue_push(dm
->_as_dq
, dmsgr
, dmsgr
->dmsg_priority
);
4127 dispatch_assert(!(options
& DKEV_UNREGISTER_WAKEUP
));
4132 _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm
,
4133 dispatch_mach_reply_refs_t dmr
, unsigned int options
)
4135 dispatch_mach_msg_t dmsgr
= NULL
;
4136 bool replies_empty
= false;
4137 bool disconnected
= (options
& DKEV_UNREGISTER_DISCONNECTED
);
4138 if (options
& DKEV_UNREGISTER_REPLY_REMOVE
) {
4139 _dispatch_unfair_lock_lock(&dm
->dm_refs
->dm_replies_lock
);
4140 if (unlikely(!_TAILQ_IS_ENQUEUED(dmr
, dmr_list
))) {
4141 DISPATCH_INTERNAL_CRASH(0, "Could not find reply registration");
4143 TAILQ_REMOVE(&dm
->dm_refs
->dm_replies
, dmr
, dmr_list
);
4144 _TAILQ_MARK_NOT_ENQUEUED(dmr
, dmr_list
);
4145 replies_empty
= TAILQ_EMPTY(&dm
->dm_refs
->dm_replies
);
4146 _dispatch_unfair_lock_unlock(&dm
->dm_refs
->dm_replies_lock
);
4149 dmsgr
= _dispatch_mach_msg_create_reply_disconnected(NULL
, dmr
);
4150 } else if (dmr
->dmr_voucher
) {
4151 _voucher_release(dmr
->dmr_voucher
);
4152 dmr
->dmr_voucher
= NULL
;
4154 uint32_t flags
= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE
;
4155 dispatch_kevent_t dk
= dmr
->dmr_dkev
;
4156 _dispatch_debug("machport[0x%08x]: unregistering for reply%s, ctxt %p",
4157 (mach_port_t
)dk
->dk_kevent
.ident
,
4158 disconnected
? " (disconnected)" : "", dmr
->dmr_ctxt
);
4159 if (!dm
->ds_is_direct_kevent
) {
4160 dmr
->dmr_dkev
= NULL
;
4161 TAILQ_REMOVE(&dk
->dk_sources
, (dispatch_source_refs_t
)dmr
, dr_list
);
4162 _dispatch_kevent_unregister(dk
, flags
, 0);
4164 long r
= _dispatch_kevent_unregister(dk
, flags
, options
);
4165 if (r
== EINPROGRESS
) {
4166 _dispatch_debug("machport[0x%08x]: deferred delete kevent[%p]",
4167 (mach_port_t
)dk
->dk_kevent
.ident
, dk
);
4168 dispatch_assert(options
== DKEV_UNREGISTER_DISCONNECTED
);
4169 // dmr must be put back so that the event delivery finds it, the
4170 // replies lock is held by the caller.
4171 TAILQ_INSERT_HEAD(&dm
->dm_refs
->dm_replies
, dmr
, dmr_list
);
4173 dmr
->dmr_voucher
= dmsgr
->dmsg_voucher
;
4174 dmsgr
->dmsg_voucher
= NULL
;
4175 dispatch_release(dmsgr
);
4177 return; // deferred unregistration
4179 dispatch_assume_zero(r
);
4180 dmr
->dmr_dkev
= NULL
;
4181 _TAILQ_TRASH_ENTRY(dmr
, dr_list
);
4185 return _dispatch_queue_push(dm
->_as_dq
, dmsgr
, dmsgr
->dmsg_priority
);
4187 if ((options
& DKEV_UNREGISTER_WAKEUP
) && replies_empty
&&
4188 (dm
->dm_refs
->dm_disconnect_cnt
||
4189 (dm
->dq_atomic_flags
& DSF_CANCELED
))) {
4190 dx_wakeup(dm
, 0, DISPATCH_WAKEUP_FLUSH
);
4196 _dispatch_mach_reply_waiter_register(dispatch_mach_t dm
,
4197 dispatch_mach_reply_refs_t dmr
, mach_port_t reply_port
,
4198 dispatch_mach_msg_t dmsg
, mach_msg_option_t msg_opts
)
4200 dmr
->dr_source_wref
= _dispatch_ptr2wref(dm
);
4201 dmr
->dmr_dkev
= NULL
;
4202 dmr
->dmr_reply
= reply_port
;
4203 if (msg_opts
& DISPATCH_MACH_OWNED_REPLY_PORT
) {
4204 _dispatch_mach_reply_mark_reply_port_owned(dmr
);
4206 if (dmsg
->dmsg_voucher
) {
4207 dmr
->dmr_voucher
= _voucher_retain(dmsg
->dmsg_voucher
);
4209 dmr
->dmr_priority
= (dispatch_priority_t
)dmsg
->dmsg_priority
;
4210 // make reply context visible to leaks rdar://11777199
4211 dmr
->dmr_ctxt
= dmsg
->do_ctxt
;
4214 _dispatch_debug("machport[0x%08x]: registering for sync reply, ctxt %p",
4215 reply_port
, dmsg
->do_ctxt
);
4216 _dispatch_unfair_lock_lock(&dm
->dm_refs
->dm_replies_lock
);
4217 if (unlikely(_TAILQ_IS_ENQUEUED(dmr
, dmr_list
))) {
4218 DISPATCH_INTERNAL_CRASH(dmr
->dmr_list
.tqe_prev
, "Reply already registered");
4220 TAILQ_INSERT_TAIL(&dm
->dm_refs
->dm_replies
, dmr
, dmr_list
);
4221 _dispatch_unfair_lock_unlock(&dm
->dm_refs
->dm_replies_lock
);
4226 _dispatch_mach_reply_kevent_register(dispatch_mach_t dm
, mach_port_t reply_port
,
4227 dispatch_mach_msg_t dmsg
)
4229 dispatch_kevent_t dk
;
4230 dispatch_mach_reply_refs_t dmr
;
4231 dispatch_source_type_t type
= &_dispatch_source_type_mach_recv_direct
;
4232 pthread_priority_t mp
, pp
;
4234 dk
= _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s
));
4235 dk
->dk_kevent
= type
->ke
;
4236 dk
->dk_kevent
.ident
= reply_port
;
4237 dk
->dk_kevent
.flags
|= EV_ADD
|EV_ENABLE
|EV_ONESHOT
;
4238 dk
->dk_kevent
.udata
= (uintptr_t)dk
;
4239 TAILQ_INIT(&dk
->dk_sources
);
4240 if (!dm
->ds_is_direct_kevent
) {
4241 dk
->dk_kevent
.fflags
= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE
;
4242 dk
->dk_kevent
.flags
&= ~(EV_UDATA_SPECIFIC
|EV_VANISHED
);
4245 dmr
= _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s
));
4246 dmr
->dr_source_wref
= _dispatch_ptr2wref(dm
);
4248 dmr
->dmr_reply
= reply_port
;
4249 if (dmsg
->dmsg_voucher
) {
4250 dmr
->dmr_voucher
= _voucher_retain(dmsg
->dmsg_voucher
);
4252 dmr
->dmr_priority
= (dispatch_priority_t
)dmsg
->dmsg_priority
;
4253 // make reply context visible to leaks rdar://11777199
4254 dmr
->dmr_ctxt
= dmsg
->do_ctxt
;
4256 pp
= dm
->dq_priority
& ~_PTHREAD_PRIORITY_FLAGS_MASK
;
4257 if (pp
&& dm
->ds_is_direct_kevent
) {
4258 mp
= dmsg
->dmsg_priority
& ~_PTHREAD_PRIORITY_FLAGS_MASK
;
4259 if (pp
< mp
) pp
= mp
;
4260 pp
|= dm
->dq_priority
& _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
;
4262 pp
= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
;
4265 _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p",
4266 reply_port
, dmsg
->do_ctxt
);
4268 bool do_resume
= _dispatch_kevent_register(&dmr
->dmr_dkev
, pp
, &flags
);
4269 TAILQ_INSERT_TAIL(&dmr
->dmr_dkev
->dk_sources
, (dispatch_source_refs_t
)dmr
,
4271 _dispatch_unfair_lock_lock(&dm
->dm_refs
->dm_replies_lock
);
4272 if (unlikely(_TAILQ_IS_ENQUEUED(dmr
, dmr_list
))) {
4273 DISPATCH_INTERNAL_CRASH(dmr
->dmr_list
.tqe_prev
, "Reply already registered");
4275 TAILQ_INSERT_TAIL(&dm
->dm_refs
->dm_replies
, dmr
, dmr_list
);
4276 _dispatch_unfair_lock_unlock(&dm
->dm_refs
->dm_replies_lock
);
4277 if (do_resume
&& _dispatch_kevent_resume(dmr
->dmr_dkev
, flags
, 0)) {
4278 return _dispatch_mach_reply_kevent_unregister(dm
, dmr
,
4279 DKEV_UNREGISTER_DISCONNECTED
|DKEV_UNREGISTER_REPLY_REMOVE
);
4285 _dispatch_mach_notification_kevent_unregister(dispatch_mach_t dm
)
4287 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
4288 dispatch_kevent_t dk
= dm
->dm_dkev
;
4290 TAILQ_REMOVE(&dk
->dk_sources
, (dispatch_source_refs_t
)dm
->dm_refs
,
4292 dm
->ds_pending_data_mask
&= ~(unsigned long)
4293 (DISPATCH_MACH_SEND_POSSIBLE
|DISPATCH_MACH_SEND_DEAD
);
4294 _dispatch_kevent_unregister(dk
,
4295 DISPATCH_MACH_SEND_POSSIBLE
|DISPATCH_MACH_SEND_DEAD
, 0);
4300 _dispatch_mach_notification_kevent_register(dispatch_mach_t dm
,mach_port_t send
)
4302 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
4303 dispatch_kevent_t dk
;
4305 dk
= _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s
));
4306 dk
->dk_kevent
= _dispatch_source_type_mach_send
.ke
;
4307 dk
->dk_kevent
.ident
= send
;
4308 dk
->dk_kevent
.flags
|= EV_ADD
|EV_ENABLE
;
4309 dk
->dk_kevent
.fflags
= DISPATCH_MACH_SEND_POSSIBLE
|DISPATCH_MACH_SEND_DEAD
;
4310 dk
->dk_kevent
.udata
= (uintptr_t)dk
;
4311 TAILQ_INIT(&dk
->dk_sources
);
4313 dm
->ds_pending_data_mask
|= dk
->dk_kevent
.fflags
;
4316 bool do_resume
= _dispatch_kevent_register(&dk
,
4317 _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
, &flags
);
4318 TAILQ_INSERT_TAIL(&dk
->dk_sources
,
4319 (dispatch_source_refs_t
)dm
->dm_refs
, dr_list
);
4321 if (do_resume
&& _dispatch_kevent_resume(dm
->dm_dkev
, flags
, 0)) {
4322 _dispatch_mach_notification_kevent_unregister(dm
);
4327 _dispatch_get_thread_reply_port(void)
4329 mach_port_t reply_port
, mrp
= _dispatch_get_thread_mig_reply_port();
4332 _dispatch_debug("machport[0x%08x]: borrowed thread sync reply port",
4335 reply_port
= mach_reply_port();
4336 _dispatch_set_thread_mig_reply_port(reply_port
);
4337 _dispatch_debug("machport[0x%08x]: allocated thread sync reply port",
4340 _dispatch_debug_machport(reply_port
);
4345 _dispatch_clear_thread_reply_port(mach_port_t reply_port
)
4347 mach_port_t mrp
= _dispatch_get_thread_mig_reply_port();
4348 if (reply_port
!= mrp
) {
4350 _dispatch_debug("machport[0x%08x]: did not clear thread sync reply "
4351 "port (found 0x%08x)", reply_port
, mrp
);
4355 _dispatch_set_thread_mig_reply_port(MACH_PORT_NULL
);
4356 _dispatch_debug_machport(reply_port
);
4357 _dispatch_debug("machport[0x%08x]: cleared thread sync reply port",
4362 _dispatch_set_thread_reply_port(mach_port_t reply_port
)
4364 _dispatch_debug_machport(reply_port
);
4365 mach_port_t mrp
= _dispatch_get_thread_mig_reply_port();
4367 kern_return_t kr
= mach_port_mod_refs(mach_task_self(), reply_port
,
4368 MACH_PORT_RIGHT_RECEIVE
, -1);
4369 DISPATCH_VERIFY_MIG(kr
);
4370 dispatch_assume_zero(kr
);
4371 _dispatch_debug("machport[0x%08x]: deallocated sync reply port "
4372 "(found 0x%08x)", reply_port
, mrp
);
4374 _dispatch_set_thread_mig_reply_port(reply_port
);
4375 _dispatch_debug("machport[0x%08x]: restored thread sync reply port",
4380 static inline mach_port_t
4381 _dispatch_mach_msg_get_remote_port(dispatch_object_t dou
)
4383 mach_msg_header_t
*hdr
= _dispatch_mach_msg_get_msg(dou
._dmsg
);
4384 mach_port_t remote
= hdr
->msgh_remote_port
;
4388 static inline mach_port_t
4389 _dispatch_mach_msg_get_reply_port(dispatch_object_t dou
)
4391 mach_msg_header_t
*hdr
= _dispatch_mach_msg_get_msg(dou
._dmsg
);
4392 mach_port_t local
= hdr
->msgh_local_port
;
4393 if (!MACH_PORT_VALID(local
) || MACH_MSGH_BITS_LOCAL(hdr
->msgh_bits
) !=
4394 MACH_MSG_TYPE_MAKE_SEND_ONCE
) return MACH_PORT_NULL
;
4399 _dispatch_mach_msg_set_reason(dispatch_mach_msg_t dmsg
, mach_error_t err
,
4400 unsigned long reason
)
4402 dispatch_assert_zero(reason
& ~(unsigned long)code_emask
);
4403 dmsg
->dmsg_error
= ((err
|| !reason
) ? err
:
4404 err_local
|err_sub(0x3e0)|(mach_error_t
)reason
);
4407 static inline unsigned long
4408 _dispatch_mach_msg_get_reason(dispatch_mach_msg_t dmsg
, mach_error_t
*err_ptr
)
4410 mach_error_t err
= dmsg
->dmsg_error
;
4412 dmsg
->dmsg_error
= 0;
4413 if ((err
& system_emask
) == err_local
&& err_get_sub(err
) == 0x3e0) {
4415 return err_get_code(err
);
4418 return err
? DISPATCH_MACH_MESSAGE_SEND_FAILED
: DISPATCH_MACH_MESSAGE_SENT
;
4422 _dispatch_mach_msg_recv(dispatch_mach_t dm
, dispatch_mach_reply_refs_t dmr
,
4423 _dispatch_kevent_qos_s
*ke
, mach_msg_header_t
*hdr
, mach_msg_size_t siz
)
4425 _dispatch_debug_machport(hdr
->msgh_remote_port
);
4426 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
4427 hdr
->msgh_local_port
, hdr
->msgh_id
, hdr
->msgh_remote_port
);
4428 bool canceled
= (dm
->dq_atomic_flags
& DSF_CANCELED
);
4429 if (!dmr
&& canceled
) {
4430 // message received after cancellation, _dispatch_mach_kevent_merge is
4431 // responsible for mach channel source state (e.g. deferred deletion)
4432 return _dispatch_kevent_mach_msg_destroy(ke
, hdr
);
4434 dispatch_mach_msg_t dmsg
;
4436 pthread_priority_t priority
;
4439 _voucher_mach_msg_clear(hdr
, false); // deallocate reply message voucher
4440 voucher
= dmr
->dmr_voucher
;
4441 dmr
->dmr_voucher
= NULL
; // transfer reference
4442 priority
= dmr
->dmr_priority
;
4443 ctxt
= dmr
->dmr_ctxt
;
4444 unsigned int options
= DKEV_DISPOSE_IMMEDIATE_DELETE
;
4445 options
|= DKEV_UNREGISTER_REPLY_REMOVE
;
4446 options
|= DKEV_UNREGISTER_WAKEUP
;
4447 if (canceled
) options
|= DKEV_UNREGISTER_DISCONNECTED
;
4448 _dispatch_mach_reply_kevent_unregister(dm
, dmr
, options
);
4449 ke
->flags
|= EV_DELETE
; // remember that unregister deleted the event
4450 if (canceled
) return;
4452 voucher
= voucher_create_with_mach_msg(hdr
);
4453 priority
= _voucher_get_priority(voucher
);
4455 dispatch_mach_msg_destructor_t destructor
;
4456 destructor
= (hdr
== _dispatch_kevent_mach_msg_buf(ke
)) ?
4457 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT
:
4458 DISPATCH_MACH_MSG_DESTRUCTOR_FREE
;
4459 dmsg
= dispatch_mach_msg_create(hdr
, siz
, destructor
, NULL
);
4460 if (hdr
== _dispatch_kevent_mach_msg_buf(ke
)) {
4461 _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move
, (uint64_t)hdr
, (uint64_t)dmsg
->dmsg_buf
);
4463 dmsg
->dmsg_voucher
= voucher
;
4464 dmsg
->dmsg_priority
= priority
;
4465 dmsg
->do_ctxt
= ctxt
;
4466 _dispatch_mach_msg_set_reason(dmsg
, 0, DISPATCH_MACH_MESSAGE_RECEIVED
);
4467 _dispatch_voucher_debug("mach-msg[%p] create", voucher
, dmsg
);
4468 _dispatch_voucher_ktrace_dmsg_push(dmsg
);
4469 return _dispatch_queue_push(dm
->_as_dq
, dmsg
, dmsg
->dmsg_priority
);
4472 DISPATCH_ALWAYS_INLINE
4473 static inline dispatch_mach_msg_t
4474 _dispatch_mach_msg_reply_recv(dispatch_mach_t dm
,
4475 dispatch_mach_reply_refs_t dmr
, mach_port_t reply_port
)
4477 if (slowpath(!MACH_PORT_VALID(reply_port
))) {
4478 DISPATCH_CLIENT_CRASH(reply_port
, "Invalid reply port");
4480 void *ctxt
= dmr
->dmr_ctxt
;
4481 mach_msg_header_t
*hdr
, *hdr2
= NULL
;
4482 void *hdr_copyout_addr
;
4483 mach_msg_size_t siz
, msgsiz
= 0;
4484 mach_msg_return_t kr
;
4485 mach_msg_option_t options
;
4486 siz
= mach_vm_round_page(_dispatch_mach_recv_msg_size
+
4487 dispatch_mach_trailer_size
);
4489 for (mach_vm_address_t p
= mach_vm_trunc_page(hdr
+ vm_page_size
);
4490 p
< (mach_vm_address_t
)hdr
+ siz
; p
+= vm_page_size
) {
4491 *(char*)p
= 0; // ensure alloca buffer doesn't overlap with stack guard
4493 options
= DISPATCH_MACH_RCV_OPTIONS
& (~MACH_RCV_VOUCHER
);
4495 _dispatch_debug_machport(reply_port
);
4496 _dispatch_debug("machport[0x%08x]: MACH_RCV_MSG %s", reply_port
,
4497 (options
& MACH_RCV_TIMEOUT
) ? "poll" : "wait");
4498 kr
= mach_msg(hdr
, options
, 0, siz
, reply_port
, MACH_MSG_TIMEOUT_NONE
,
4500 hdr_copyout_addr
= hdr
;
4501 _dispatch_debug_machport(reply_port
);
4502 _dispatch_debug("machport[0x%08x]: MACH_RCV_MSG (size %u, opts 0x%x) "
4503 "returned: %s - 0x%x", reply_port
, siz
, options
,
4504 mach_error_string(kr
), kr
);
4506 case MACH_RCV_TOO_LARGE
:
4507 if (!fastpath(hdr
->msgh_size
<= UINT_MAX
-
4508 dispatch_mach_trailer_size
)) {
4509 DISPATCH_CLIENT_CRASH(hdr
->msgh_size
, "Overlarge message");
4511 if (options
& MACH_RCV_LARGE
) {
4512 msgsiz
= hdr
->msgh_size
+ dispatch_mach_trailer_size
;
4513 hdr2
= malloc(msgsiz
);
4514 if (dispatch_assume(hdr2
)) {
4518 options
|= MACH_RCV_TIMEOUT
;
4519 options
&= ~MACH_RCV_LARGE
;
4522 _dispatch_log("BUG in libdispatch client: "
4523 "dispatch_mach_send_and_wait_for_reply: dropped message too "
4524 "large to fit in memory: id = 0x%x, size = %u", hdr
->msgh_id
,
4527 case MACH_RCV_INVALID_NAME
: // rdar://problem/21963848
4528 case MACH_RCV_PORT_CHANGED
: // rdar://problem/21885327
4529 case MACH_RCV_PORT_DIED
:
4530 // channel was disconnected/canceled and reply port destroyed
4531 _dispatch_debug("machport[0x%08x]: sync reply port destroyed, ctxt %p: "
4532 "%s - 0x%x", reply_port
, ctxt
, mach_error_string(kr
), kr
);
4534 case MACH_MSG_SUCCESS
:
4535 if (hdr
->msgh_remote_port
) {
4536 _dispatch_debug_machport(hdr
->msgh_remote_port
);
4538 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, size = %u, "
4539 "reply on 0x%08x", hdr
->msgh_local_port
, hdr
->msgh_id
,
4540 hdr
->msgh_size
, hdr
->msgh_remote_port
);
4541 siz
= hdr
->msgh_size
+ dispatch_mach_trailer_size
;
4542 if (hdr2
&& siz
< msgsiz
) {
4543 void *shrink
= realloc(hdr2
, msgsiz
);
4544 if (shrink
) hdr
= hdr2
= shrink
;
4548 dispatch_assume_zero(kr
);
4551 _dispatch_mach_msg_reply_received(dm
, dmr
, hdr
->msgh_local_port
);
4552 hdr
->msgh_local_port
= MACH_PORT_NULL
;
4553 if (slowpath((dm
->dq_atomic_flags
& DSF_CANCELED
) || kr
)) {
4554 if (!kr
) mach_msg_destroy(hdr
);
4557 dispatch_mach_msg_t dmsg
;
4558 dispatch_mach_msg_destructor_t destructor
= (!hdr2
) ?
4559 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT
:
4560 DISPATCH_MACH_MSG_DESTRUCTOR_FREE
;
4561 dmsg
= dispatch_mach_msg_create(hdr
, siz
, destructor
, NULL
);
4562 if (!hdr2
|| hdr
!= hdr_copyout_addr
) {
4563 _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move
, (uint64_t)hdr_copyout_addr
, (uint64_t)_dispatch_mach_msg_get_msg(dmsg
));
4565 dmsg
->do_ctxt
= ctxt
;
4573 _dispatch_mach_msg_reply_received(dispatch_mach_t dm
,
4574 dispatch_mach_reply_refs_t dmr
, mach_port_t local_port
)
4576 bool removed
= _dispatch_mach_reply_tryremove(dm
, dmr
);
4577 if (!MACH_PORT_VALID(local_port
) || !removed
) {
4578 // port moved/destroyed during receive, or reply waiter was never
4579 // registered or already removed (disconnected)
4582 mach_port_t reply_port
= _dispatch_mach_reply_get_reply_port(dmr
);
4583 _dispatch_debug("machport[0x%08x]: unregistered for sync reply, ctxt %p",
4584 reply_port
, dmr
->dmr_ctxt
);
4585 if (_dispatch_mach_reply_is_reply_port_owned(dmr
)) {
4586 _dispatch_set_thread_reply_port(reply_port
);
4587 if (local_port
!= reply_port
) {
4588 DISPATCH_CLIENT_CRASH(local_port
,
4589 "Reply received on unexpected port");
4593 mach_msg_header_t
*hdr
;
4594 dispatch_mach_msg_t dmsg
;
4595 dmsg
= dispatch_mach_msg_create(NULL
, sizeof(mach_msg_header_t
),
4596 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT
, &hdr
);
4597 hdr
->msgh_local_port
= local_port
;
4598 dmsg
->dmsg_voucher
= dmr
->dmr_voucher
;
4599 dmr
->dmr_voucher
= NULL
; // transfer reference
4600 dmsg
->dmsg_priority
= dmr
->dmr_priority
;
4601 dmsg
->do_ctxt
= dmr
->dmr_ctxt
;
4602 _dispatch_mach_msg_set_reason(dmsg
, 0, DISPATCH_MACH_REPLY_RECEIVED
);
4603 return _dispatch_queue_push(dm
->_as_dq
, dmsg
, dmsg
->dmsg_priority
);
4607 _dispatch_mach_msg_disconnected(dispatch_mach_t dm
, mach_port_t local_port
,
4608 mach_port_t remote_port
)
4610 mach_msg_header_t
*hdr
;
4611 dispatch_mach_msg_t dmsg
;
4612 dmsg
= dispatch_mach_msg_create(NULL
, sizeof(mach_msg_header_t
),
4613 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT
, &hdr
);
4614 if (local_port
) hdr
->msgh_local_port
= local_port
;
4615 if (remote_port
) hdr
->msgh_remote_port
= remote_port
;
4616 _dispatch_mach_msg_set_reason(dmsg
, 0, DISPATCH_MACH_DISCONNECTED
);
4617 _dispatch_debug("machport[0x%08x]: %s right disconnected", local_port
?
4618 local_port
: remote_port
, local_port
? "receive" : "send");
4619 return _dispatch_queue_push(dm
->_as_dq
, dmsg
, dmsg
->dmsg_priority
);
4622 static inline dispatch_mach_msg_t
4623 _dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou
,
4624 dispatch_mach_reply_refs_t dmr
)
4626 dispatch_mach_msg_t dmsg
= dou
._dmsg
, dmsgr
;
4627 mach_port_t reply_port
= dmsg
? dmsg
->dmsg_reply
:
4628 _dispatch_mach_reply_get_reply_port(dmr
);
4633 v
= dmr
->dmr_voucher
;
4634 dmr
->dmr_voucher
= NULL
; // transfer reference
4635 if (v
) _voucher_release(v
);
4641 v
= dmsg
->dmsg_voucher
;
4642 if (v
) _voucher_retain(v
);
4644 v
= dmr
->dmr_voucher
;
4645 dmr
->dmr_voucher
= NULL
; // transfer reference
4648 if ((dmsg
&& (dmsg
->dmsg_options
& DISPATCH_MACH_WAIT_FOR_REPLY
) &&
4649 (dmsg
->dmsg_options
& DISPATCH_MACH_OWNED_REPLY_PORT
)) ||
4650 (dmr
&& !dmr
->dmr_dkev
&&
4651 _dispatch_mach_reply_is_reply_port_owned(dmr
))) {
4652 if (v
) _voucher_release(v
);
4653 // deallocate owned reply port to break _dispatch_mach_msg_reply_recv
4654 // out of waiting in mach_msg(MACH_RCV_MSG)
4655 kern_return_t kr
= mach_port_mod_refs(mach_task_self(), reply_port
,
4656 MACH_PORT_RIGHT_RECEIVE
, -1);
4657 DISPATCH_VERIFY_MIG(kr
);
4658 dispatch_assume_zero(kr
);
4662 mach_msg_header_t
*hdr
;
4663 dmsgr
= dispatch_mach_msg_create(NULL
, sizeof(mach_msg_header_t
),
4664 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT
, &hdr
);
4665 dmsgr
->dmsg_voucher
= v
;
4666 hdr
->msgh_local_port
= reply_port
;
4668 dmsgr
->dmsg_priority
= dmsg
->dmsg_priority
;
4669 dmsgr
->do_ctxt
= dmsg
->do_ctxt
;
4671 dmsgr
->dmsg_priority
= dmr
->dmr_priority
;
4672 dmsgr
->do_ctxt
= dmr
->dmr_ctxt
;
4674 _dispatch_mach_msg_set_reason(dmsgr
, 0, DISPATCH_MACH_DISCONNECTED
);
4675 _dispatch_debug("machport[0x%08x]: reply disconnected, ctxt %p",
4676 hdr
->msgh_local_port
, dmsgr
->do_ctxt
);
4682 _dispatch_mach_msg_not_sent(dispatch_mach_t dm
, dispatch_object_t dou
)
4684 dispatch_mach_msg_t dmsg
= dou
._dmsg
, dmsgr
;
4685 mach_msg_header_t
*msg
= _dispatch_mach_msg_get_msg(dmsg
);
4686 mach_msg_option_t msg_opts
= dmsg
->dmsg_options
;
4687 _dispatch_debug("machport[0x%08x]: not sent msg id 0x%x, ctxt %p, "
4688 "msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x",
4689 msg
->msgh_remote_port
, msg
->msgh_id
, dmsg
->do_ctxt
,
4690 msg_opts
, msg
->msgh_voucher_port
, dmsg
->dmsg_reply
);
4691 unsigned long reason
= (msg_opts
& DISPATCH_MACH_REGISTER_FOR_REPLY
) ?
4692 0 : DISPATCH_MACH_MESSAGE_NOT_SENT
;
4693 dmsgr
= _dispatch_mach_msg_create_reply_disconnected(dmsg
, NULL
);
4694 _dispatch_mach_msg_set_reason(dmsg
, 0, reason
);
4695 _dispatch_queue_push(dm
->_as_dq
, dmsg
, dmsg
->dmsg_priority
);
4696 if (dmsgr
) _dispatch_queue_push(dm
->_as_dq
, dmsgr
, dmsgr
->dmsg_priority
);
4701 _dispatch_mach_msg_send(dispatch_mach_t dm
, dispatch_object_t dou
,
4702 dispatch_mach_reply_refs_t dmr
, pthread_priority_t pp
,
4703 dispatch_mach_send_invoke_flags_t send_flags
)
4705 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
4706 dispatch_mach_msg_t dmsg
= dou
._dmsg
, dmsgr
= NULL
;
4707 voucher_t voucher
= dmsg
->dmsg_voucher
;
4708 mach_voucher_t ipc_kvoucher
= MACH_VOUCHER_NULL
;
4709 uint32_t send_status
= 0;
4710 bool clear_voucher
= false, kvoucher_move_send
= false;
4711 mach_msg_header_t
*msg
= _dispatch_mach_msg_get_msg(dmsg
);
4712 bool is_reply
= (MACH_MSGH_BITS_REMOTE(msg
->msgh_bits
) ==
4713 MACH_MSG_TYPE_MOVE_SEND_ONCE
);
4714 mach_port_t reply_port
= dmsg
->dmsg_reply
;
4716 dr
->dm_needs_mgr
= 0;
4717 if (unlikely(dr
->dm_checkin
&& dmsg
!= dr
->dm_checkin
)) {
4718 // send initial checkin message
4719 if (dm
->dm_dkev
&& slowpath(_dispatch_queue_get_current() !=
4720 &_dispatch_mgr_q
)) {
4721 // send kevent must be uninstalled on the manager queue
4722 dr
->dm_needs_mgr
= 1;
4725 if (unlikely(!_dispatch_mach_msg_send(dm
,
4726 dr
->dm_checkin
, NULL
, pp
, DM_SEND_INVOKE_NONE
))) {
4729 dr
->dm_checkin
= NULL
;
4732 mach_msg_return_t kr
= 0;
4733 mach_msg_option_t opts
= 0, msg_opts
= dmsg
->dmsg_options
;
4734 if (!(msg_opts
& DISPATCH_MACH_REGISTER_FOR_REPLY
)) {
4735 mach_msg_priority_t msg_priority
= MACH_MSG_PRIORITY_UNSPECIFIED
;
4736 opts
= MACH_SEND_MSG
| (msg_opts
& ~DISPATCH_MACH_OPTIONS_MASK
);
4738 if (dmsg
!= dr
->dm_checkin
) {
4739 msg
->msgh_remote_port
= dr
->dm_send
;
4741 if (_dispatch_queue_get_current() == &_dispatch_mgr_q
) {
4742 if (slowpath(!dm
->dm_dkev
)) {
4743 _dispatch_mach_notification_kevent_register(dm
,
4744 msg
->msgh_remote_port
);
4746 if (fastpath(dm
->dm_dkev
)) {
4747 if (DISPATCH_MACH_NOTIFICATION_ARMED(dm
->dm_dkev
)) {
4750 opts
|= MACH_SEND_NOTIFY
;
4753 opts
|= MACH_SEND_TIMEOUT
;
4754 if (dmsg
->dmsg_priority
!= _voucher_get_priority(voucher
)) {
4755 ipc_kvoucher
= _voucher_create_mach_voucher_with_priority(
4756 voucher
, dmsg
->dmsg_priority
);
4758 _dispatch_voucher_debug("mach-msg[%p] msg_set", voucher
, dmsg
);
4760 kvoucher_move_send
= true;
4761 clear_voucher
= _voucher_mach_msg_set_mach_voucher(msg
,
4762 ipc_kvoucher
, kvoucher_move_send
);
4764 clear_voucher
= _voucher_mach_msg_set(msg
, voucher
);
4766 if (pp
&& _dispatch_evfilt_machport_direct_enabled
) {
4767 opts
|= MACH_SEND_OVERRIDE
;
4768 msg_priority
= (mach_msg_priority_t
)pp
;
4771 _dispatch_debug_machport(msg
->msgh_remote_port
);
4772 if (reply_port
) _dispatch_debug_machport(reply_port
);
4773 if (msg_opts
& DISPATCH_MACH_WAIT_FOR_REPLY
) {
4774 if (msg_opts
& DISPATCH_MACH_OWNED_REPLY_PORT
) {
4775 _dispatch_clear_thread_reply_port(reply_port
);
4777 _dispatch_mach_reply_waiter_register(dm
, dmr
, reply_port
, dmsg
,
4780 kr
= mach_msg(msg
, opts
, msg
->msgh_size
, 0, MACH_PORT_NULL
, 0,
4782 _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
4783 "opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
4784 "%s - 0x%x", msg
->msgh_remote_port
, msg
->msgh_id
, dmsg
->do_ctxt
,
4785 opts
, msg_opts
, msg
->msgh_voucher_port
, reply_port
,
4786 mach_error_string(kr
), kr
);
4787 if (unlikely(kr
&& (msg_opts
& DISPATCH_MACH_WAIT_FOR_REPLY
))) {
4788 _dispatch_mach_reply_waiter_unregister(dm
, dmr
,
4789 DKEV_UNREGISTER_REPLY_REMOVE
);
4791 if (clear_voucher
) {
4792 if (kr
== MACH_SEND_INVALID_VOUCHER
&& msg
->msgh_voucher_port
) {
4793 DISPATCH_CLIENT_CRASH(kr
, "Voucher port corruption");
4796 kv
= _voucher_mach_msg_clear(msg
, kvoucher_move_send
);
4797 if (kvoucher_move_send
) ipc_kvoucher
= kv
;
4800 if (kr
== MACH_SEND_TIMED_OUT
&& (opts
& MACH_SEND_TIMEOUT
)) {
4801 if (opts
& MACH_SEND_NOTIFY
) {
4802 _dispatch_debug("machport[0x%08x]: send-possible notification "
4803 "armed", (mach_port_t
)dm
->dm_dkev
->dk_kevent
.ident
);
4804 DISPATCH_MACH_NOTIFICATION_ARMED(dm
->dm_dkev
) = 1;
4806 // send kevent must be installed on the manager queue
4807 dr
->dm_needs_mgr
= 1;
4810 _dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher
);
4811 voucher_t ipc_voucher
;
4812 ipc_voucher
= _voucher_create_with_priority_and_mach_voucher(
4813 voucher
, dmsg
->dmsg_priority
, ipc_kvoucher
);
4814 _dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
4815 ipc_voucher
, dmsg
, voucher
);
4816 if (dmsg
->dmsg_voucher
) _voucher_release(dmsg
->dmsg_voucher
);
4817 dmsg
->dmsg_voucher
= ipc_voucher
;
4820 } else if (ipc_kvoucher
&& (kr
|| !kvoucher_move_send
)) {
4821 _voucher_dealloc_mach_voucher(ipc_kvoucher
);
4823 if (!(msg_opts
& DISPATCH_MACH_WAIT_FOR_REPLY
) && !kr
&& reply_port
&&
4824 !(dm
->ds_dkev
&& dm
->ds_dkev
->dk_kevent
.ident
== reply_port
)) {
4825 if (!dm
->ds_is_direct_kevent
&&
4826 _dispatch_queue_get_current() != &_dispatch_mgr_q
) {
4827 // reply receive kevent must be installed on the manager queue
4828 dr
->dm_needs_mgr
= 1;
4829 dmsg
->dmsg_options
= msg_opts
| DISPATCH_MACH_REGISTER_FOR_REPLY
;
4832 _dispatch_mach_reply_kevent_register(dm
, reply_port
, dmsg
);
4834 if (unlikely(!is_reply
&& dmsg
== dr
->dm_checkin
&& dm
->dm_dkev
)) {
4835 _dispatch_mach_notification_kevent_unregister(dm
);
4838 // Send failed, so reply was never registered <rdar://problem/14309159>
4839 dmsgr
= _dispatch_mach_msg_create_reply_disconnected(dmsg
, NULL
);
4841 _dispatch_mach_msg_set_reason(dmsg
, kr
, 0);
4842 if ((send_flags
& DM_SEND_INVOKE_IMMEDIATE_SEND
) &&
4843 (msg_opts
& DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT
)) {
4844 // Return sent message synchronously <rdar://problem/25947334>
4845 send_status
|= DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT
;
4847 _dispatch_queue_push(dm
->_as_dq
, dmsg
, dmsg
->dmsg_priority
);
4849 if (dmsgr
) _dispatch_queue_push(dm
->_as_dq
, dmsgr
, dmsgr
->dmsg_priority
);
4850 send_status
|= DM_SEND_STATUS_SUCCESS
;
4856 #pragma mark dispatch_mach_send_refs_t
4858 static void _dispatch_mach_cancel(dispatch_mach_t dm
);
4859 static void _dispatch_mach_send_barrier_drain_push(dispatch_mach_t dm
,
4860 pthread_priority_t pp
);
4862 DISPATCH_ALWAYS_INLINE
4863 static inline pthread_priority_t
4864 _dm_state_get_override(uint64_t dm_state
)
4866 dm_state
&= DISPATCH_MACH_STATE_OVERRIDE_MASK
;
4867 return (pthread_priority_t
)(dm_state
>> 32);
4870 DISPATCH_ALWAYS_INLINE
4871 static inline uint64_t
4872 _dm_state_override_from_priority(pthread_priority_t pp
)
4874 uint64_t pp_state
= pp
& _PTHREAD_PRIORITY_QOS_CLASS_MASK
;
4875 return pp_state
<< 32;
4878 DISPATCH_ALWAYS_INLINE
4880 _dm_state_needs_override(uint64_t dm_state
, uint64_t pp_state
)
4882 return (pp_state
> (dm_state
& DISPATCH_MACH_STATE_OVERRIDE_MASK
));
4885 DISPATCH_ALWAYS_INLINE
4886 static inline uint64_t
4887 _dm_state_merge_override(uint64_t dm_state
, uint64_t pp_state
)
4889 if (_dm_state_needs_override(dm_state
, pp_state
)) {
4890 dm_state
&= ~DISPATCH_MACH_STATE_OVERRIDE_MASK
;
4891 dm_state
|= pp_state
;
4892 dm_state
|= DISPATCH_MACH_STATE_DIRTY
;
4893 dm_state
|= DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
;
4898 #define _dispatch_mach_send_push_update_tail(dr, tail) \
4899 os_mpsc_push_update_tail(dr, dm, tail, do_next)
4900 #define _dispatch_mach_send_push_update_head(dr, head) \
4901 os_mpsc_push_update_head(dr, dm, head)
4902 #define _dispatch_mach_send_get_head(dr) \
4903 os_mpsc_get_head(dr, dm)
4904 #define _dispatch_mach_send_unpop_head(dr, dc, dc_next) \
4905 os_mpsc_undo_pop_head(dr, dm, dc, dc_next, do_next)
4906 #define _dispatch_mach_send_pop_head(dr, head) \
4907 os_mpsc_pop_head(dr, dm, head, do_next)
4909 DISPATCH_ALWAYS_INLINE
4911 _dispatch_mach_send_push_inline(dispatch_mach_send_refs_t dr
,
4912 dispatch_object_t dou
)
4914 if (_dispatch_mach_send_push_update_tail(dr
, dou
._do
)) {
4915 _dispatch_mach_send_push_update_head(dr
, dou
._do
);
4923 _dispatch_mach_send_drain(dispatch_mach_t dm
, dispatch_invoke_flags_t flags
,
4924 dispatch_mach_send_invoke_flags_t send_flags
)
4926 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
4927 dispatch_mach_reply_refs_t dmr
;
4928 dispatch_mach_msg_t dmsg
;
4929 struct dispatch_object_s
*dc
= NULL
, *next_dc
= NULL
;
4930 pthread_priority_t pp
= _dm_state_get_override(dr
->dm_state
);
4931 uint64_t old_state
, new_state
;
4932 uint32_t send_status
;
4933 bool needs_mgr
, disconnecting
, returning_send_result
= false;
4936 needs_mgr
= false; disconnecting
= false;
4937 while (dr
->dm_tail
) {
4938 dc
= _dispatch_mach_send_get_head(dr
);
4940 dispatch_mach_send_invoke_flags_t sf
= send_flags
;
4941 // Only request immediate send result for the first message
4942 send_flags
&= ~DM_SEND_INVOKE_IMMEDIATE_SEND_MASK
;
4943 next_dc
= _dispatch_mach_send_pop_head(dr
, dc
);
4944 if (_dispatch_object_has_type(dc
,
4945 DISPATCH_CONTINUATION_TYPE(MACH_SEND_BARRIER
))) {
4946 if (!(send_flags
& DM_SEND_INVOKE_CAN_RUN_BARRIER
)) {
4949 _dispatch_continuation_pop(dc
, dm
->_as_dq
, flags
);
4952 if (_dispatch_object_is_slow_item(dc
)) {
4953 dmsg
= ((dispatch_continuation_t
)dc
)->dc_data
;
4954 dmr
= ((dispatch_continuation_t
)dc
)->dc_other
;
4955 } else if (_dispatch_object_has_vtable(dc
)) {
4956 dmsg
= (dispatch_mach_msg_t
)dc
;
4959 if ((dm
->dm_dkev
|| !dm
->ds_is_direct_kevent
) &&
4960 (_dispatch_queue_get_current() != &_dispatch_mgr_q
)) {
4961 // send kevent must be uninstalled on the manager queue
4965 if (unlikely(!_dispatch_mach_reconnect_invoke(dm
, dc
))) {
4966 disconnecting
= true;
4971 _dispatch_voucher_ktrace_dmsg_pop(dmsg
);
4972 if (unlikely(dr
->dm_disconnect_cnt
||
4973 (dm
->dq_atomic_flags
& DSF_CANCELED
))) {
4974 _dispatch_mach_msg_not_sent(dm
, dmsg
);
4977 send_status
= _dispatch_mach_msg_send(dm
, dmsg
, dmr
, pp
, sf
);
4978 if (unlikely(!send_status
)) {
4981 if (send_status
& DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT
) {
4982 returning_send_result
= true;
4984 } while ((dc
= next_dc
));
4987 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, release
, {
4988 if (old_state
& DISPATCH_MACH_STATE_DIRTY
) {
4989 new_state
= old_state
;
4990 new_state
&= ~DISPATCH_MACH_STATE_DIRTY
;
4991 new_state
&= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
;
4992 new_state
&= ~DISPATCH_MACH_STATE_PENDING_BARRIER
;
5001 // if this is not a complete drain, we must undo some things
5002 _dispatch_mach_send_unpop_head(dr
, dc
, next_dc
);
5004 if (_dispatch_object_has_type(dc
,
5005 DISPATCH_CONTINUATION_TYPE(MACH_SEND_BARRIER
))) {
5006 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, release
, {
5007 new_state
= old_state
;
5008 new_state
|= DISPATCH_MACH_STATE_DIRTY
;
5009 new_state
|= DISPATCH_MACH_STATE_PENDING_BARRIER
;
5010 new_state
&= ~DISPATCH_MACH_STATE_UNLOCK_MASK
;
5013 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, release
, {
5014 new_state
= old_state
;
5015 if (old_state
& (DISPATCH_MACH_STATE_DIRTY
|
5016 DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
)) {
5017 new_state
&= ~DISPATCH_MACH_STATE_DIRTY
;
5018 new_state
&= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
;
5019 new_state
&= ~DISPATCH_MACH_STATE_PENDING_BARRIER
;
5021 new_state
|= DISPATCH_MACH_STATE_DIRTY
;
5022 new_state
&= ~DISPATCH_MACH_STATE_UNLOCK_MASK
;
5028 if (old_state
& DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
) {
5029 // Ensure that the root queue sees that this thread was overridden.
5030 _dispatch_set_defaultpriority_override();
5033 if (unlikely(new_state
& DISPATCH_MACH_STATE_UNLOCK_MASK
)) {
5034 os_atomic_thread_fence(acquire
);
5035 pp
= _dm_state_get_override(new_state
);
5039 if (new_state
& DISPATCH_MACH_STATE_PENDING_BARRIER
) {
5040 pp
= _dm_state_get_override(new_state
);
5041 _dispatch_mach_send_barrier_drain_push(dm
, pp
);
5043 if (needs_mgr
|| dr
->dm_needs_mgr
) {
5044 pp
= _dm_state_get_override(new_state
);
5048 if (!disconnecting
) dx_wakeup(dm
, pp
, DISPATCH_WAKEUP_FLUSH
);
5050 return returning_send_result
;
5055 _dispatch_mach_send_invoke(dispatch_mach_t dm
,
5056 dispatch_invoke_flags_t flags
,
5057 dispatch_mach_send_invoke_flags_t send_flags
)
5059 dispatch_lock_owner tid_self
= _dispatch_tid_self();
5060 uint64_t old_state
, new_state
;
5061 pthread_priority_t pp_floor
;
5063 uint64_t canlock_mask
= DISPATCH_MACH_STATE_UNLOCK_MASK
;
5064 uint64_t canlock_state
= 0;
5066 if (send_flags
& DM_SEND_INVOKE_NEEDS_BARRIER
) {
5067 canlock_mask
|= DISPATCH_MACH_STATE_PENDING_BARRIER
;
5068 canlock_state
= DISPATCH_MACH_STATE_PENDING_BARRIER
;
5069 } else if (!(send_flags
& DM_SEND_INVOKE_CAN_RUN_BARRIER
)) {
5070 canlock_mask
|= DISPATCH_MACH_STATE_PENDING_BARRIER
;
5073 if (flags
& DISPATCH_INVOKE_MANAGER_DRAIN
) {
5076 // _dispatch_queue_class_invoke will have applied the queue override
5077 // (if any) before we get here. Else use the default base priority
5078 // as an estimation of the priority we already asked for.
5079 pp_floor
= dm
->_as_dq
->dq_override
;
5081 pp_floor
= _dispatch_get_defaultpriority();
5082 pp_floor
&= _PTHREAD_PRIORITY_QOS_CLASS_MASK
;
5087 os_atomic_rmw_loop2o(dm
->dm_refs
, dm_state
, old_state
, new_state
, acquire
, {
5088 new_state
= old_state
;
5089 if (unlikely((old_state
& canlock_mask
) != canlock_state
)) {
5090 if (!(send_flags
& DM_SEND_INVOKE_FLUSH
)) {
5091 os_atomic_rmw_loop_give_up(break);
5093 new_state
|= DISPATCH_MACH_STATE_DIRTY
;
5095 if (likely(pp_floor
)) {
5096 pthread_priority_t pp
= _dm_state_get_override(old_state
);
5097 if (unlikely(pp
> pp_floor
)) {
5098 os_atomic_rmw_loop_give_up({
5099 _dispatch_wqthread_override_start(tid_self
, pp
);
5100 // Ensure that the root queue sees
5101 // that this thread was overridden.
5102 _dispatch_set_defaultpriority_override();
5108 new_state
|= tid_self
;
5109 new_state
&= ~DISPATCH_MACH_STATE_DIRTY
;
5110 new_state
&= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
;
5111 new_state
&= ~DISPATCH_MACH_STATE_PENDING_BARRIER
;
5115 if (unlikely((old_state
& canlock_mask
) != canlock_state
)) {
5118 if (send_flags
& DM_SEND_INVOKE_CANCEL
) {
5119 _dispatch_mach_cancel(dm
);
5121 _dispatch_mach_send_drain(dm
, flags
, send_flags
);
5126 _dispatch_mach_send_barrier_drain_invoke(dispatch_continuation_t dc
,
5127 dispatch_invoke_flags_t flags
)
5129 dispatch_mach_t dm
= (dispatch_mach_t
)_dispatch_queue_get_current();
5130 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
5131 dispatch_thread_frame_s dtf
;
5133 DISPATCH_COMPILER_CAN_ASSUME(dc
->dc_priority
== DISPATCH_NO_PRIORITY
);
5134 DISPATCH_COMPILER_CAN_ASSUME(dc
->dc_voucher
== DISPATCH_NO_VOUCHER
);
5135 // hide the mach channel (see _dispatch_mach_barrier_invoke comment)
5136 _dispatch_thread_frame_stash(&dtf
);
5137 _dispatch_continuation_pop_forwarded(dc
, DISPATCH_NO_VOUCHER
, dc_flags
,{
5138 _dispatch_mach_send_invoke(dm
, flags
,
5139 DM_SEND_INVOKE_NEEDS_BARRIER
| DM_SEND_INVOKE_CAN_RUN_BARRIER
);
5141 _dispatch_thread_frame_unstash(&dtf
);
5146 _dispatch_mach_send_barrier_drain_push(dispatch_mach_t dm
,
5147 pthread_priority_t pp
)
5149 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
5151 dc
->do_vtable
= DC_VTABLE(MACH_SEND_BARRRIER_DRAIN
);
5154 dc
->dc_voucher
= DISPATCH_NO_VOUCHER
;
5155 dc
->dc_priority
= DISPATCH_NO_PRIORITY
;
5156 return _dispatch_queue_push(dm
->_as_dq
, dc
, pp
);
5161 _dispatch_mach_send_push(dispatch_mach_t dm
, dispatch_continuation_t dc
,
5162 pthread_priority_t pp
)
5164 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5165 uint64_t pp_state
, old_state
, new_state
, state_flags
= 0;
5166 dispatch_lock_owner owner
;
5169 // <rdar://problem/25896179> when pushing a send barrier that destroys
5170 // the last reference to this channel, and the send queue is already
5171 // draining on another thread, the send barrier may run as soon as
5172 // _dispatch_mach_send_push_inline() returns.
5173 _dispatch_retain(dm
);
5174 pp_state
= _dm_state_override_from_priority(pp
);
5176 wakeup
= _dispatch_mach_send_push_inline(dr
, dc
);
5178 state_flags
= DISPATCH_MACH_STATE_DIRTY
;
5179 if (dc
->do_vtable
== DC_VTABLE(MACH_SEND_BARRIER
)) {
5180 state_flags
|= DISPATCH_MACH_STATE_PENDING_BARRIER
;
5185 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, release
, {
5186 new_state
= _dm_state_merge_override(old_state
, pp_state
);
5187 new_state
|= state_flags
;
5190 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, relaxed
, {
5191 new_state
= _dm_state_merge_override(old_state
, pp_state
);
5192 if (old_state
== new_state
) {
5193 os_atomic_rmw_loop_give_up(break);
5198 pp
= _dm_state_get_override(new_state
);
5199 owner
= _dispatch_lock_owner((dispatch_lock
)old_state
);
5201 if (_dm_state_needs_override(old_state
, pp_state
)) {
5202 _dispatch_wqthread_override_start_check_owner(owner
, pp
,
5203 &dr
->dm_state_lock
.dul_lock
);
5205 return _dispatch_release_tailcall(dm
);
5208 dispatch_wakeup_flags_t wflags
= 0;
5209 if (state_flags
& DISPATCH_MACH_STATE_PENDING_BARRIER
) {
5210 _dispatch_mach_send_barrier_drain_push(dm
, pp
);
5211 } else if (wakeup
|| dr
->dm_disconnect_cnt
||
5212 (dm
->dq_atomic_flags
& DSF_CANCELED
)) {
5213 wflags
= DISPATCH_WAKEUP_FLUSH
| DISPATCH_WAKEUP_CONSUME
;
5214 } else if (old_state
& DISPATCH_MACH_STATE_PENDING_BARRIER
) {
5215 wflags
= DISPATCH_WAKEUP_OVERRIDING
| DISPATCH_WAKEUP_CONSUME
;
5218 return dx_wakeup(dm
, pp
, wflags
);
5220 return _dispatch_release_tailcall(dm
);
5225 _dispatch_mach_send_push_and_trydrain(dispatch_mach_t dm
,
5226 dispatch_object_t dou
, pthread_priority_t pp
,
5227 dispatch_mach_send_invoke_flags_t send_flags
)
5229 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5230 dispatch_lock_owner tid_self
= _dispatch_tid_self();
5231 uint64_t pp_state
, old_state
, new_state
, canlock_mask
, state_flags
= 0;
5232 dispatch_lock_owner owner
;
5234 pp_state
= _dm_state_override_from_priority(pp
);
5235 bool wakeup
= _dispatch_mach_send_push_inline(dr
, dou
);
5237 state_flags
= DISPATCH_MACH_STATE_DIRTY
;
5240 if (unlikely(dr
->dm_disconnect_cnt
||
5241 (dm
->dq_atomic_flags
& DSF_CANCELED
))) {
5242 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, release
, {
5243 new_state
= _dm_state_merge_override(old_state
, pp_state
);
5244 new_state
|= state_flags
;
5246 dx_wakeup(dm
, pp
, DISPATCH_WAKEUP_FLUSH
);
5250 canlock_mask
= DISPATCH_MACH_STATE_UNLOCK_MASK
|
5251 DISPATCH_MACH_STATE_PENDING_BARRIER
;
5253 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, seq_cst
, {
5254 new_state
= _dm_state_merge_override(old_state
, pp_state
);
5255 new_state
|= state_flags
;
5256 if (likely((old_state
& canlock_mask
) == 0)) {
5257 new_state
|= tid_self
;
5258 new_state
&= ~DISPATCH_MACH_STATE_DIRTY
;
5259 new_state
&= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
;
5260 new_state
&= ~DISPATCH_MACH_STATE_PENDING_BARRIER
;
5264 os_atomic_rmw_loop2o(dr
, dm_state
, old_state
, new_state
, acquire
, {
5265 new_state
= _dm_state_merge_override(old_state
, pp_state
);
5266 if (new_state
== old_state
) {
5267 os_atomic_rmw_loop_give_up(return false);
5269 if (likely((old_state
& canlock_mask
) == 0)) {
5270 new_state
|= tid_self
;
5271 new_state
&= ~DISPATCH_MACH_STATE_DIRTY
;
5272 new_state
&= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE
;
5273 new_state
&= ~DISPATCH_MACH_STATE_PENDING_BARRIER
;
5278 owner
= _dispatch_lock_owner((dispatch_lock
)old_state
);
5280 if (_dm_state_needs_override(old_state
, pp_state
)) {
5281 _dispatch_wqthread_override_start_check_owner(owner
, pp
,
5282 &dr
->dm_state_lock
.dul_lock
);
5287 if (old_state
& DISPATCH_MACH_STATE_PENDING_BARRIER
) {
5288 dx_wakeup(dm
, pp
, DISPATCH_WAKEUP_OVERRIDING
);
5292 // Ensure our message is still at the head of the queue and has not already
5293 // been dequeued by another thread that raced us to the send queue lock.
5294 // A plain load of the head and comparison against our object pointer is
5296 if (unlikely(!(wakeup
&& dou
._do
== dr
->dm_head
))) {
5297 // Don't request immediate send result for messages we don't own
5298 send_flags
&= ~DM_SEND_INVOKE_IMMEDIATE_SEND_MASK
;
5300 return _dispatch_mach_send_drain(dm
, DISPATCH_INVOKE_NONE
, send_flags
);
// Merge a send-possible/dead-name notification kevent into the channel.
// If the kevent carries no fflags bits the channel is watching
// (ds_pending_data_mask), it is ignored; otherwise the send engine is
// flushed from the manager-drain context.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5304 _dispatch_mach_merge_notification_kevent(dispatch_mach_t dm
,
5305 const _dispatch_kevent_qos_s
*ke
)
5307 if (!(ke
->fflags
& dm
->ds_pending_data_mask
)) {
5310 _dispatch_mach_send_invoke(dm
, DISPATCH_INVOKE_MANAGER_DRAIN
,
5311 DM_SEND_INVOKE_FLUSH
);
5315 #pragma mark dispatch_mach_t
5317 static inline mach_msg_option_t
5318 _dispatch_mach_checkin_options(void)
5320 mach_msg_option_t options
= 0;
5321 #if DISPATCH_USE_CHECKIN_NOIMPORTANCE
5322 options
= MACH_SEND_NOIMPORTANCE
; // <rdar://problem/16996737>
5328 static inline mach_msg_option_t
5329 _dispatch_mach_send_options(void)
5331 mach_msg_option_t options
= 0;
// Returns the pthread priority to propagate with an outgoing message.
// Messages sent with MACH_SEND_NOIMPORTANCE propagate no priority (0)
// when DISPATCH_USE_NOIMPORTANCE_QOS is configured; otherwise the
// caller's current priority is propagated.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5335 DISPATCH_ALWAYS_INLINE
5336 static inline pthread_priority_t
5337 _dispatch_mach_priority_propagate(mach_msg_option_t options
)
5339 #if DISPATCH_USE_NOIMPORTANCE_QOS
5340 if (options
& MACH_SEND_NOIMPORTANCE
) return 0;
5344 return _dispatch_priority_propagate();
5349 _dispatch_mach_send_msg(dispatch_mach_t dm
, dispatch_mach_msg_t dmsg
,
5350 dispatch_continuation_t dc_wait
, mach_msg_option_t options
)
5352 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5353 if (slowpath(dmsg
->do_next
!= DISPATCH_OBJECT_LISTLESS
)) {
5354 DISPATCH_CLIENT_CRASH(dmsg
->do_next
, "Message already enqueued");
5356 dispatch_retain(dmsg
);
5357 pthread_priority_t priority
= _dispatch_mach_priority_propagate(options
);
5358 options
|= _dispatch_mach_send_options();
5359 dmsg
->dmsg_options
= options
;
5360 mach_msg_header_t
*msg
= _dispatch_mach_msg_get_msg(dmsg
);
5361 dmsg
->dmsg_reply
= _dispatch_mach_msg_get_reply_port(dmsg
);
5362 bool is_reply
= (MACH_MSGH_BITS_REMOTE(msg
->msgh_bits
) ==
5363 MACH_MSG_TYPE_MOVE_SEND_ONCE
);
5364 dmsg
->dmsg_priority
= priority
;
5365 dmsg
->dmsg_voucher
= _voucher_copy();
5366 _dispatch_voucher_debug("mach-msg[%p] set", dmsg
->dmsg_voucher
, dmsg
);
5368 uint32_t send_status
;
5369 bool returning_send_result
= false;
5370 dispatch_mach_send_invoke_flags_t send_flags
= DM_SEND_INVOKE_NONE
;
5371 if (options
& DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT
) {
5372 send_flags
= DM_SEND_INVOKE_IMMEDIATE_SEND
;
5374 if (is_reply
&& !dmsg
->dmsg_reply
&& !dr
->dm_disconnect_cnt
&&
5375 !(dm
->dq_atomic_flags
& DSF_CANCELED
)) {
5376 // replies are sent to a send-once right and don't need the send queue
5377 dispatch_assert(!dc_wait
);
5378 send_status
= _dispatch_mach_msg_send(dm
, dmsg
, NULL
, 0, send_flags
);
5379 dispatch_assert(send_status
);
5380 returning_send_result
= !!(send_status
&
5381 DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT
);
5383 _dispatch_voucher_ktrace_dmsg_push(dmsg
);
5384 priority
&= _PTHREAD_PRIORITY_QOS_CLASS_MASK
;
5385 dispatch_object_t dou
= { ._dmsg
= dmsg
};
5386 if (dc_wait
) dou
._dc
= dc_wait
;
5387 returning_send_result
= _dispatch_mach_send_push_and_trydrain(dm
, dou
,
5388 priority
, send_flags
);
5390 if (returning_send_result
) {
5391 _dispatch_voucher_debug("mach-msg[%p] clear", dmsg
->dmsg_voucher
, dmsg
);
5392 if (dmsg
->dmsg_voucher
) _voucher_release(dmsg
->dmsg_voucher
);
5393 dmsg
->dmsg_voucher
= NULL
;
5394 dmsg
->do_next
= DISPATCH_OBJECT_LISTLESS
;
5395 dispatch_release(dmsg
);
5397 return returning_send_result
;
// Public API: asynchronously send dmsg on the channel.
// Rejects caller-supplied bits in the private DISPATCH_MACH_OPTIONS_MASK
// range, then forwards to _dispatch_mach_send_msg with no wait
// continuation; an immediate send result is never expected here.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5402 dispatch_mach_send(dispatch_mach_t dm
, dispatch_mach_msg_t dmsg
,
5403 mach_msg_option_t options
)
5405 dispatch_assert_zero(options
& DISPATCH_MACH_OPTIONS_MASK
);
5406 options
&= ~DISPATCH_MACH_OPTIONS_MASK
;
5407 bool returned_send_result
= _dispatch_mach_send_msg(dm
, dmsg
, NULL
,options
);
5408 dispatch_assert(!returned_send_result
);
5413 dispatch_mach_send_with_result(dispatch_mach_t dm
, dispatch_mach_msg_t dmsg
,
5414 mach_msg_option_t options
, dispatch_mach_send_flags_t send_flags
,
5415 dispatch_mach_reason_t
*send_result
, mach_error_t
*send_error
)
5417 if (unlikely(send_flags
!= DISPATCH_MACH_SEND_DEFAULT
)) {
5418 DISPATCH_CLIENT_CRASH(send_flags
, "Invalid send flags");
5420 dispatch_assert_zero(options
& DISPATCH_MACH_OPTIONS_MASK
);
5421 options
&= ~DISPATCH_MACH_OPTIONS_MASK
;
5422 options
|= DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT
;
5423 bool returned_send_result
= _dispatch_mach_send_msg(dm
, dmsg
, NULL
,options
);
5424 unsigned long reason
= DISPATCH_MACH_NEEDS_DEFERRED_SEND
;
5425 mach_error_t err
= 0;
5426 if (returned_send_result
) {
5427 reason
= _dispatch_mach_msg_get_reason(dmsg
, &err
);
5429 *send_result
= reason
;
5435 _dispatch_mach_send_and_wait_for_reply(dispatch_mach_t dm
,
5436 dispatch_mach_msg_t dmsg
, mach_msg_option_t options
,
5437 bool *returned_send_result
)
5439 mach_port_t reply_port
= _dispatch_mach_msg_get_reply_port(dmsg
);
5441 // use per-thread mach reply port <rdar://24597802>
5442 reply_port
= _dispatch_get_thread_reply_port();
5443 mach_msg_header_t
*hdr
= _dispatch_mach_msg_get_msg(dmsg
);
5444 dispatch_assert(MACH_MSGH_BITS_LOCAL(hdr
->msgh_bits
) ==
5445 MACH_MSG_TYPE_MAKE_SEND_ONCE
);
5446 hdr
->msgh_local_port
= reply_port
;
5447 options
|= DISPATCH_MACH_OWNED_REPLY_PORT
;
5450 dispatch_mach_reply_refs_t dmr
;
5452 dmr
= _dispatch_calloc(1, sizeof(*dmr
));
5454 struct dispatch_mach_reply_refs_s dmr_buf
= { };
5457 struct dispatch_continuation_s dc_wait
= {
5458 .dc_flags
= DISPATCH_OBJ_SYNC_SLOW_BIT
,
5461 .dc_priority
= DISPATCH_NO_PRIORITY
,
5462 .dc_voucher
= DISPATCH_NO_VOUCHER
,
5464 dmr
->dmr_ctxt
= dmsg
->do_ctxt
;
5465 *returned_send_result
= _dispatch_mach_send_msg(dm
, dmsg
, &dc_wait
,options
);
5466 if (options
& DISPATCH_MACH_OWNED_REPLY_PORT
) {
5467 _dispatch_clear_thread_reply_port(reply_port
);
5469 dmsg
= _dispatch_mach_msg_reply_recv(dm
, dmr
, reply_port
);
5478 dispatch_mach_send_and_wait_for_reply(dispatch_mach_t dm
,
5479 dispatch_mach_msg_t dmsg
, mach_msg_option_t options
)
5481 bool returned_send_result
;
5482 dispatch_mach_msg_t reply
;
5483 dispatch_assert_zero(options
& DISPATCH_MACH_OPTIONS_MASK
);
5484 options
&= ~DISPATCH_MACH_OPTIONS_MASK
;
5485 options
|= DISPATCH_MACH_WAIT_FOR_REPLY
;
5486 reply
= _dispatch_mach_send_and_wait_for_reply(dm
, dmsg
, options
,
5487 &returned_send_result
);
5488 dispatch_assert(!returned_send_result
);
5494 dispatch_mach_send_with_result_and_wait_for_reply(dispatch_mach_t dm
,
5495 dispatch_mach_msg_t dmsg
, mach_msg_option_t options
,
5496 dispatch_mach_send_flags_t send_flags
,
5497 dispatch_mach_reason_t
*send_result
, mach_error_t
*send_error
)
5499 if (unlikely(send_flags
!= DISPATCH_MACH_SEND_DEFAULT
)) {
5500 DISPATCH_CLIENT_CRASH(send_flags
, "Invalid send flags");
5502 bool returned_send_result
;
5503 dispatch_mach_msg_t reply
;
5504 dispatch_assert_zero(options
& DISPATCH_MACH_OPTIONS_MASK
);
5505 options
&= ~DISPATCH_MACH_OPTIONS_MASK
;
5506 options
|= DISPATCH_MACH_WAIT_FOR_REPLY
;
5507 options
|= DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT
;
5508 reply
= _dispatch_mach_send_and_wait_for_reply(dm
, dmsg
, options
,
5509 &returned_send_result
);
5510 unsigned long reason
= DISPATCH_MACH_NEEDS_DEFERRED_SEND
;
5511 mach_error_t err
= 0;
5512 if (returned_send_result
) {
5513 reason
= _dispatch_mach_msg_get_reason(dmsg
, &err
);
5515 *send_result
= reason
;
5522 _dispatch_mach_disconnect(dispatch_mach_t dm
)
5524 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5527 _dispatch_mach_notification_kevent_unregister(dm
);
5529 if (MACH_PORT_VALID(dr
->dm_send
)) {
5530 _dispatch_mach_msg_disconnected(dm
, MACH_PORT_NULL
, dr
->dm_send
);
5532 dr
->dm_send
= MACH_PORT_NULL
;
5533 if (dr
->dm_checkin
) {
5534 _dispatch_mach_msg_not_sent(dm
, dr
->dm_checkin
);
5535 dr
->dm_checkin
= NULL
;
5537 _dispatch_unfair_lock_lock(&dm
->dm_refs
->dm_replies_lock
);
5538 dispatch_mach_reply_refs_t dmr
, tmp
;
5539 TAILQ_FOREACH_SAFE(dmr
, &dm
->dm_refs
->dm_replies
, dmr_list
, tmp
) {
5540 TAILQ_REMOVE(&dm
->dm_refs
->dm_replies
, dmr
, dmr_list
);
5541 _TAILQ_MARK_NOT_ENQUEUED(dmr
, dmr_list
);
5542 if (dmr
->dmr_dkev
) {
5543 _dispatch_mach_reply_kevent_unregister(dm
, dmr
,
5544 DKEV_UNREGISTER_DISCONNECTED
);
5546 _dispatch_mach_reply_waiter_unregister(dm
, dmr
,
5547 DKEV_UNREGISTER_DISCONNECTED
);
5550 disconnected
= TAILQ_EMPTY(&dm
->dm_refs
->dm_replies
);
5551 _dispatch_unfair_lock_unlock(&dm
->dm_refs
->dm_replies_lock
);
5552 return disconnected
;
5556 _dispatch_mach_cancel(dispatch_mach_t dm
)
5558 _dispatch_object_debug(dm
, "%s", __func__
);
5559 if (!_dispatch_mach_disconnect(dm
)) return;
5561 mach_port_t local_port
= (mach_port_t
)dm
->ds_dkev
->dk_kevent
.ident
;
5562 _dispatch_source_kevent_unregister(dm
->_as_ds
);
5563 if ((dm
->dq_atomic_flags
& DSF_STATE_MASK
) == DSF_DELETED
) {
5564 _dispatch_mach_msg_disconnected(dm
, local_port
, MACH_PORT_NULL
);
5567 _dispatch_queue_atomic_flags_set_and_clear(dm
->_as_dq
, DSF_DELETED
,
5568 DSF_ARMED
| DSF_DEFERRED_DELETE
);
5574 _dispatch_mach_reconnect_invoke(dispatch_mach_t dm
, dispatch_object_t dou
)
5576 if (!_dispatch_mach_disconnect(dm
)) return false;
5577 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5578 dr
->dm_checkin
= dou
._dc
->dc_data
;
5579 dr
->dm_send
= (mach_port_t
)dou
._dc
->dc_other
;
5580 _dispatch_continuation_free(dou
._dc
);
5581 (void)os_atomic_dec2o(dr
, dm_disconnect_cnt
, relaxed
);
5582 _dispatch_object_debug(dm
, "%s", __func__
);
5583 _dispatch_release(dm
); // <rdar://problem/26266265>
5589 dispatch_mach_reconnect(dispatch_mach_t dm
, mach_port_t send
,
5590 dispatch_mach_msg_t checkin
)
5592 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5593 (void)os_atomic_inc2o(dr
, dm_disconnect_cnt
, relaxed
);
5594 if (MACH_PORT_VALID(send
) && checkin
) {
5595 dispatch_retain(checkin
);
5596 checkin
->dmsg_options
= _dispatch_mach_checkin_options();
5597 dr
->dm_checkin_port
= _dispatch_mach_msg_get_remote_port(checkin
);
5600 dr
->dm_checkin_port
= MACH_PORT_NULL
;
5602 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
5603 dc
->dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
5604 // actually called manually in _dispatch_mach_send_drain
5605 dc
->dc_func
= (void*)_dispatch_mach_reconnect_invoke
;
5607 dc
->dc_data
= checkin
;
5608 dc
->dc_other
= (void*)(uintptr_t)send
;
5609 dc
->dc_voucher
= DISPATCH_NO_VOUCHER
;
5610 dc
->dc_priority
= DISPATCH_NO_PRIORITY
;
5611 _dispatch_retain(dm
); // <rdar://problem/26266265>
5612 return _dispatch_mach_send_push(dm
, dc
, 0);
// Public API: return the remote port of the pending check-in message,
// or MACH_PORT_DEAD once the channel has been cancelled (DSF_CANCELED).
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5617 dispatch_mach_get_checkin_port(dispatch_mach_t dm
)
5619 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5620 if (slowpath(dm
->dq_atomic_flags
& DSF_CANCELED
)) {
5621 return MACH_PORT_DEAD
;
5623 return dr
->dm_checkin_port
;
// Deliver the one-time DISPATCH_MACH_CONNECTED event to the client
// handler, then record that the connect handler has run so later
// invokes (msg/barrier/cancel paths check dm_connect_handler_called)
// skip it.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5628 _dispatch_mach_connect_invoke(dispatch_mach_t dm
)
5630 dispatch_mach_refs_t dr
= dm
->ds_refs
;
5631 _dispatch_client_callout4(dr
->dm_handler_ctxt
,
5632 DISPATCH_MACH_CONNECTED
, NULL
, 0, dr
->dm_handler_func
);
5633 dm
->dm_connect_handler_called
= 1;
5638 _dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg
,
5639 dispatch_invoke_flags_t flags
)
5641 dispatch_thread_frame_s dtf
;
5642 dispatch_mach_refs_t dr
;
5645 unsigned long reason
= _dispatch_mach_msg_get_reason(dmsg
, &err
);
5646 _dispatch_thread_set_self_t adopt_flags
= DISPATCH_PRIORITY_ENFORCE
|
5647 DISPATCH_VOUCHER_CONSUME
|DISPATCH_VOUCHER_REPLACE
;
5649 // hide mach channel
5650 dm
= (dispatch_mach_t
)_dispatch_thread_frame_stash(&dtf
);
5652 dmsg
->do_next
= DISPATCH_OBJECT_LISTLESS
;
5653 _dispatch_voucher_ktrace_dmsg_pop(dmsg
);
5654 _dispatch_voucher_debug("mach-msg[%p] adopt", dmsg
->dmsg_voucher
, dmsg
);
5655 (void)_dispatch_adopt_priority_and_set_voucher(dmsg
->dmsg_priority
,
5656 dmsg
->dmsg_voucher
, adopt_flags
);
5657 dmsg
->dmsg_voucher
= NULL
;
5658 dispatch_invoke_with_autoreleasepool(flags
, {
5659 if (slowpath(!dm
->dm_connect_handler_called
)) {
5660 _dispatch_mach_connect_invoke(dm
);
5662 _dispatch_client_callout4(dr
->dm_handler_ctxt
, reason
, dmsg
, err
,
5663 dr
->dm_handler_func
);
5665 _dispatch_thread_frame_unstash(&dtf
);
5666 _dispatch_introspection_queue_item_complete(dmsg
);
5667 dispatch_release(dmsg
);
5672 _dispatch_mach_barrier_invoke(dispatch_continuation_t dc
,
5673 dispatch_invoke_flags_t flags
)
5675 dispatch_thread_frame_s dtf
;
5676 dispatch_mach_t dm
= dc
->dc_other
;
5677 dispatch_mach_refs_t dr
;
5678 uintptr_t dc_flags
= (uintptr_t)dc
->dc_data
;
5679 unsigned long type
= dc_type(dc
);
5681 // hide mach channel from clients
5682 if (type
== DISPATCH_CONTINUATION_TYPE(MACH_RECV_BARRIER
)) {
5683 // on the send queue, the mach channel isn't the current queue
5684 // its target queue is the current one already
5685 _dispatch_thread_frame_stash(&dtf
);
5688 DISPATCH_COMPILER_CAN_ASSUME(dc_flags
& DISPATCH_OBJ_CONSUME_BIT
);
5689 _dispatch_continuation_pop_forwarded(dc
, dm
->dq_override_voucher
, dc_flags
,{
5690 dispatch_invoke_with_autoreleasepool(flags
, {
5691 if (slowpath(!dm
->dm_connect_handler_called
)) {
5692 _dispatch_mach_connect_invoke(dm
);
5694 _dispatch_client_callout(dc
->dc_ctxt
, dc
->dc_func
);
5695 _dispatch_client_callout4(dr
->dm_handler_ctxt
,
5696 DISPATCH_MACH_BARRIER_COMPLETED
, NULL
, 0,
5697 dr
->dm_handler_func
);
5700 if (type
== DISPATCH_CONTINUATION_TYPE(MACH_RECV_BARRIER
)) {
5701 _dispatch_thread_frame_unstash(&dtf
);
5707 dispatch_mach_send_barrier_f(dispatch_mach_t dm
, void *context
,
5708 dispatch_function_t func
)
5710 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
5711 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
5712 pthread_priority_t pp
;
5714 _dispatch_continuation_init_f(dc
, dm
, context
, func
, 0, 0, dc_flags
);
5715 dc
->dc_data
= (void *)dc
->dc_flags
;
5717 dc
->do_vtable
= DC_VTABLE(MACH_SEND_BARRIER
);
5718 _dispatch_trace_continuation_push(dm
->_as_dq
, dc
);
5719 pp
= _dispatch_continuation_get_override_priority(dm
->_as_dq
, dc
);
5720 return _dispatch_mach_send_push(dm
, dc
, pp
);
5725 dispatch_mach_send_barrier(dispatch_mach_t dm
, dispatch_block_t barrier
)
5727 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
5728 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
5729 pthread_priority_t pp
;
5731 _dispatch_continuation_init(dc
, dm
, barrier
, 0, 0, dc_flags
);
5732 dc
->dc_data
= (void *)dc
->dc_flags
;
5734 dc
->do_vtable
= DC_VTABLE(MACH_SEND_BARRIER
);
5735 _dispatch_trace_continuation_push(dm
->_as_dq
, dc
);
5736 pp
= _dispatch_continuation_get_override_priority(dm
->_as_dq
, dc
);
5737 return _dispatch_mach_send_push(dm
, dc
, pp
);
5742 dispatch_mach_receive_barrier_f(dispatch_mach_t dm
, void *context
,
5743 dispatch_function_t func
)
5745 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
5746 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
5748 _dispatch_continuation_init_f(dc
, dm
, context
, func
, 0, 0, dc_flags
);
5749 dc
->dc_data
= (void *)dc
->dc_flags
;
5751 dc
->do_vtable
= DC_VTABLE(MACH_RECV_BARRIER
);
5752 return _dispatch_continuation_async(dm
->_as_dq
, dc
);
// Public API (block variant): enqueue a receive-side barrier on the
// channel. Wraps the block in a consumed continuation, stashes the
// original dc_flags in dc_data (read back by
// _dispatch_mach_barrier_invoke), tags it with the MACH_RECV_BARRIER
// vtable, and pushes it through the normal async path on the channel
// queue.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5757 dispatch_mach_receive_barrier(dispatch_mach_t dm
, dispatch_block_t barrier
)
5759 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
5760 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
5762 _dispatch_continuation_init(dc
, dm
, barrier
, 0, 0, dc_flags
);
5763 dc
->dc_data
= (void *)dc
->dc_flags
;
5765 dc
->do_vtable
= DC_VTABLE(MACH_RECV_BARRIER
);
5766 return _dispatch_continuation_async(dm
->_as_dq
, dc
);
5771 _dispatch_mach_cancel_invoke(dispatch_mach_t dm
, dispatch_invoke_flags_t flags
)
5773 dispatch_mach_refs_t dr
= dm
->ds_refs
;
5775 dispatch_invoke_with_autoreleasepool(flags
, {
5776 if (slowpath(!dm
->dm_connect_handler_called
)) {
5777 _dispatch_mach_connect_invoke(dm
);
5779 _dispatch_client_callout4(dr
->dm_handler_ctxt
,
5780 DISPATCH_MACH_CANCELED
, NULL
, 0, dr
->dm_handler_func
);
5782 dm
->dm_cancel_handler_called
= 1;
5783 _dispatch_release(dm
); // the retain is done at creation time
// Public API: cancel the channel by cancelling it as a dispatch source
// (channels share the source machinery via the _as_ds upcast).
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5788 dispatch_mach_cancel(dispatch_mach_t dm
)
5790 dispatch_source_cancel(dm
->_as_ds
);
5794 _dispatch_mach_install(dispatch_mach_t dm
, pthread_priority_t pp
)
5796 uint32_t disconnect_cnt
;
5799 _dispatch_source_kevent_register(dm
->_as_ds
, pp
);
5801 if (dm
->ds_is_direct_kevent
) {
5802 pp
&= (~_PTHREAD_PRIORITY_FLAGS_MASK
|
5803 _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG
|
5804 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
);
5805 // _dispatch_mach_reply_kevent_register assumes this has been done
5806 // which is unlike regular sources or queues, the DEFAULTQUEUE flag
5807 // is used so that the priority of that channel doesn't act as a floor
5808 // QoS for incoming messages (26761457)
5809 dm
->dq_priority
= (dispatch_priority_t
)pp
;
5811 dm
->ds_is_installed
= true;
5812 if (unlikely(!os_atomic_cmpxchgv2o(dm
->dm_refs
, dm_disconnect_cnt
,
5813 DISPATCH_MACH_NEVER_INSTALLED
, 0, &disconnect_cnt
, release
))) {
5814 DISPATCH_INTERNAL_CRASH(disconnect_cnt
, "Channel already installed");
5819 _dispatch_mach_finalize_activation(dispatch_mach_t dm
)
5821 if (dm
->ds_is_direct_kevent
&& !dm
->ds_is_installed
) {
5822 dispatch_source_t ds
= dm
->_as_ds
;
5823 pthread_priority_t pp
= _dispatch_source_compute_kevent_priority(ds
);
5824 if (pp
) _dispatch_mach_install(dm
, pp
);
5828 _dispatch_queue_finalize_activation(dm
->_as_dq
);
5831 DISPATCH_ALWAYS_INLINE
5832 static inline dispatch_queue_t
5833 _dispatch_mach_invoke2(dispatch_object_t dou
, dispatch_invoke_flags_t flags
,
5834 uint64_t *owned
, struct dispatch_object_s
**dc_ptr DISPATCH_UNUSED
)
5836 dispatch_mach_t dm
= dou
._dm
;
5837 dispatch_queue_t retq
= NULL
;
5838 dispatch_queue_t dq
= _dispatch_queue_get_current();
5840 // This function performs all mach channel actions. Each action is
5841 // responsible for verifying that it takes place on the appropriate queue.
5842 // If the current queue is not the correct queue for this action, the
5843 // correct queue will be returned and the invoke will be re-driven on that
5846 // The order of tests here in invoke and in wakeup should be consistent.
5848 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5849 dispatch_queue_t dkq
= &_dispatch_mgr_q
;
5851 if (dm
->ds_is_direct_kevent
) {
5852 dkq
= dm
->do_targetq
;
5855 if (slowpath(!dm
->ds_is_installed
)) {
5856 // The channel needs to be installed on the kevent queue.
5860 _dispatch_mach_install(dm
, _dispatch_get_defaultpriority());
5863 if (_dispatch_queue_class_probe(dm
)) {
5864 if (dq
== dm
->do_targetq
) {
5865 retq
= _dispatch_queue_serial_drain(dm
->_as_dq
, flags
, owned
, NULL
);
5867 retq
= dm
->do_targetq
;
5871 dispatch_queue_flags_t dqf
= _dispatch_queue_atomic_flags(dm
->_as_dq
);
5874 bool requires_mgr
= dr
->dm_needs_mgr
|| (dr
->dm_disconnect_cnt
&&
5875 (dm
->dm_dkev
|| !dm
->ds_is_direct_kevent
));
5876 if (!(dm
->dm_dkev
&& DISPATCH_MACH_NOTIFICATION_ARMED(dm
->dm_dkev
)) ||
5877 (dqf
& DSF_CANCELED
) || dr
->dm_disconnect_cnt
) {
5878 // The channel has pending messages to send.
5879 if (unlikely(requires_mgr
&& dq
!= &_dispatch_mgr_q
)) {
5880 return retq
? retq
: &_dispatch_mgr_q
;
5882 dispatch_mach_send_invoke_flags_t send_flags
= DM_SEND_INVOKE_NONE
;
5883 if (dq
!= &_dispatch_mgr_q
) {
5884 send_flags
|= DM_SEND_INVOKE_CAN_RUN_BARRIER
;
5886 _dispatch_mach_send_invoke(dm
, flags
, send_flags
);
5888 } else if (dqf
& DSF_CANCELED
) {
5889 // The channel has been cancelled and needs to be uninstalled from the
5890 // manager queue. After uninstallation, the cancellation handler needs
5891 // to be delivered to the target queue.
5892 if ((dqf
& DSF_STATE_MASK
) == (DSF_ARMED
| DSF_DEFERRED_DELETE
)) {
5893 // waiting for the delivery of a deferred delete event
5896 if ((dqf
& DSF_STATE_MASK
) != DSF_DELETED
) {
5897 if (dq
!= &_dispatch_mgr_q
) {
5898 return retq
? retq
: &_dispatch_mgr_q
;
5900 _dispatch_mach_send_invoke(dm
, flags
, DM_SEND_INVOKE_CANCEL
);
5901 dqf
= _dispatch_queue_atomic_flags(dm
->_as_dq
);
5902 if (unlikely((dqf
& DSF_STATE_MASK
) != DSF_DELETED
)) {
5903 // waiting for the delivery of a deferred delete event
5904 // or deletion didn't happen because send_invoke couldn't
5905 // acquire the send lock
5909 if (!dm
->dm_cancel_handler_called
) {
5910 if (dq
!= dm
->do_targetq
) {
5911 return retq
? retq
: dm
->do_targetq
;
5913 _dispatch_mach_cancel_invoke(dm
, flags
);
// vtable invoke entry point for mach channels: defers to the generic
// queue-class invoke driver with _dispatch_mach_invoke2 as the
// channel-specific action callback.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
5922 _dispatch_mach_invoke(dispatch_mach_t dm
, dispatch_invoke_flags_t flags
)
5924 _dispatch_queue_class_invoke(dm
, flags
, _dispatch_mach_invoke2
);
5928 _dispatch_mach_wakeup(dispatch_mach_t dm
, pthread_priority_t pp
,
5929 dispatch_wakeup_flags_t flags
)
5931 // This function determines whether the mach channel needs to be invoked.
5932 // The order of tests here in probe and in invoke should be consistent.
5934 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
5935 dispatch_queue_wakeup_target_t dkq
= DISPATCH_QUEUE_WAKEUP_MGR
;
5936 dispatch_queue_wakeup_target_t tq
= DISPATCH_QUEUE_WAKEUP_NONE
;
5937 dispatch_queue_flags_t dqf
= _dispatch_queue_atomic_flags(dm
->_as_dq
);
5939 if (dm
->ds_is_direct_kevent
) {
5940 dkq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
5943 if (!dm
->ds_is_installed
) {
5944 // The channel needs to be installed on the kevent queue.
5949 if (_dispatch_queue_class_probe(dm
)) {
5950 tq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
5954 if (_dispatch_lock_is_locked(dr
->dm_state_lock
.dul_lock
)) {
5955 // Sending and uninstallation below require the send lock, the channel
5956 // will be woken up when the lock is dropped <rdar://15132939&15203957>
5957 _dispatch_queue_reinstate_override_priority(dm
, (dispatch_priority_t
)pp
);
5962 bool requires_mgr
= dr
->dm_needs_mgr
|| (dr
->dm_disconnect_cnt
&&
5963 (dm
->dm_dkev
|| !dm
->ds_is_direct_kevent
));
5964 if (!(dm
->dm_dkev
&& DISPATCH_MACH_NOTIFICATION_ARMED(dm
->dm_dkev
)) ||
5965 (dqf
& DSF_CANCELED
) || dr
->dm_disconnect_cnt
) {
5966 if (unlikely(requires_mgr
)) {
5967 tq
= DISPATCH_QUEUE_WAKEUP_MGR
;
5969 tq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
5972 // can happen when we can't send because the port is full
5973 // but we should not lose the override
5974 _dispatch_queue_reinstate_override_priority(dm
,
5975 (dispatch_priority_t
)pp
);
5977 } else if (dqf
& DSF_CANCELED
) {
5978 if ((dqf
& DSF_STATE_MASK
) == (DSF_ARMED
| DSF_DEFERRED_DELETE
)) {
5979 // waiting for the delivery of a deferred delete event
5980 } else if ((dqf
& DSF_STATE_MASK
) != DSF_DELETED
) {
5981 // The channel needs to be uninstalled from the manager queue
5982 tq
= DISPATCH_QUEUE_WAKEUP_MGR
;
5983 } else if (!dm
->dm_cancel_handler_called
) {
5984 // the cancellation handler needs to be delivered to the target
5986 tq
= DISPATCH_QUEUE_WAKEUP_TARGET
;
5992 return _dispatch_queue_class_wakeup(dm
->_as_dq
, pp
, flags
, tq
);
5994 return _dispatch_queue_class_override_drainer(dm
->_as_dq
, pp
, flags
);
5995 } else if (flags
& DISPATCH_WAKEUP_CONSUME
) {
5996 return _dispatch_release_tailcall(dm
);
6001 #pragma mark dispatch_mach_msg_t
6004 dispatch_mach_msg_create(mach_msg_header_t
*msg
, size_t size
,
6005 dispatch_mach_msg_destructor_t destructor
, mach_msg_header_t
**msg_ptr
)
6007 if (slowpath(size
< sizeof(mach_msg_header_t
)) ||
6008 slowpath(destructor
&& !msg
)) {
6009 DISPATCH_CLIENT_CRASH(size
, "Empty message");
6011 dispatch_mach_msg_t dmsg
= _dispatch_alloc(DISPATCH_VTABLE(mach_msg
),
6012 sizeof(struct dispatch_mach_msg_s
) +
6013 (destructor
? 0 : size
- sizeof(dmsg
->dmsg_msg
)));
6015 dmsg
->dmsg_msg
= msg
;
6017 memcpy(dmsg
->dmsg_buf
, msg
, size
);
6019 dmsg
->do_next
= DISPATCH_OBJECT_LISTLESS
;
6020 dmsg
->do_targetq
= _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
,
6022 dmsg
->dmsg_destructor
= destructor
;
6023 dmsg
->dmsg_size
= size
;
6025 *msg_ptr
= _dispatch_mach_msg_get_msg(dmsg
);
6031 _dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg
)
6033 if (dmsg
->dmsg_voucher
) {
6034 _voucher_release(dmsg
->dmsg_voucher
);
6035 dmsg
->dmsg_voucher
= NULL
;
6037 switch (dmsg
->dmsg_destructor
) {
6038 case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT
:
6040 case DISPATCH_MACH_MSG_DESTRUCTOR_FREE
:
6041 free(dmsg
->dmsg_msg
);
6043 case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE
: {
6044 mach_vm_size_t vm_size
= dmsg
->dmsg_size
;
6045 mach_vm_address_t vm_addr
= (uintptr_t)dmsg
->dmsg_msg
;
6046 (void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
// Return a pointer to the message header: messages created with a
// destructor reference external storage (dmsg_msg); destructor-less
// messages were copied inline into the trailing dmsg_buf.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
6052 static inline mach_msg_header_t
*
6053 _dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg
)
6055 return dmsg
->dmsg_destructor
? dmsg
->dmsg_msg
:
6056 (mach_msg_header_t
*)dmsg
->dmsg_buf
;
// Public API: return the message header and (optionally, when size_ptr
// is non-NULL per the visible guard elided by extraction) report the
// stored message size.
// NOTE(review): extraction-garbled text below; tokens kept byte-identical.
6060 dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg
, size_t *size_ptr
)
6063 *size_ptr
= dmsg
->dmsg_size
;
6065 return _dispatch_mach_msg_get_msg(dmsg
);
/*
 * Format a human-readable description of a dispatch_mach_msg_t into buf,
 * returning the number of characters appended (vtable debug hook).
 * Includes refcounts, options, and the interesting fields of the wrapped
 * Mach message header (id, size, bits, local/remote ports) — each only
 * when non-zero.
 *
 * NOTE(review): return type, "offset" declaration, the msgh_id guard and
 * the final return were dropped by the extract; restored — confirm
 * against upstream.
 */
size_t
_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dmsg), dmsg);
	// external refcounts are stored biased by -1; +1 shows the user view
	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
			"msgh[%p] = { ", dmsg->dmsg_options, dmsg->dmsg_buf);
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
	if (hdr->msgh_id) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
				hdr->msgh_id);
	}
	if (hdr->msgh_size) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
				hdr->msgh_size);
	}
	if (hdr->msgh_bits) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
		}
		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
	}
	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
	} else if (hdr->msgh_local_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
				hdr->msgh_local_port);
	} else if (hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
				hdr->msgh_remote_port);
	} else {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
	return offset;
}
6114 #pragma mark dispatch_mig_server
/*
 * MIG demux loop intended to be called from a dispatch source handler:
 * receive Mach messages on the source's port, hand each to the MIG
 * `callback` demuxer, and send any reply it produces, until the source is
 * suspended, the receive times out, or ~1000 iterations elapse (so a
 * serial target queue is not starved).
 *
 * Returns the last mach_msg() result (MACH_MSG_SUCCESS if a message was
 * ever received; see the MACH_RCV_TIMED_OUT handling below).
 *
 * NOTE(review): the extract dropped the enclosing for(;;) loop, the `out:`
 * label, several break/goto statements and the #if DISPATCH_DEBUG /
 * #if DISPATCH_USE_IMPORTANCE_ASSERTION guards; restored from the visible
 * control flow — confirm against upstream before relying on exact paths.
 */
mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
		dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
	mach_msg_options_t tmp_options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	uint64_t assertion_token = 0;
	unsigned int cnt = 1000; // do not stall out serial queues
	boolean_t demux_success;
	bool received = false;
	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;

	bufRequest = alloca(rcv_size);
	bufRequest->RetCode = 0;
	for (mach_vm_address_t p = mach_vm_trunc_page(bufRequest + vm_page_size);
			p < (mach_vm_address_t)bufRequest + rcv_size; p += vm_page_size) {
		*(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
	}

	bufReply = alloca(rcv_size);
	bufReply->Head.msgh_size = 0;
	for (mach_vm_address_t p = mach_vm_trunc_page(bufReply + vm_page_size);
			p < (mach_vm_address_t)bufReply + rcv_size; p += vm_page_size) {
		*(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
	}

#if DISPATCH_DEBUG
	options |= MACH_RCV_LARGE; // rdar://problem/8422992
#endif
	tmp_options = options;
	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_QUEUE_IS_SUSPENDED(ds) || (--cnt == 0)) {
			// stop receiving; but if a reply is still pending, send it first
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				goto out;
			}
		}
		// combined send(previous reply)/receive(next request)
		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				// reply could not be delivered: destroy its rights/OOL data
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
				// Don't return an error if a message was sent this time or
				// a message was successfully received previously
				// rdar://problems/7363620&7791738
				if(bufReply->Head.msgh_remote_port || received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			case MACH_RCV_INVALID_NAME:
				break;
#if DISPATCH_DEBUG
			case MACH_RCV_TOO_LARGE:
				// receive messages that are too large and log their id and size
				// rdar://problem/8422992
				tmp_options &= ~MACH_RCV_LARGE;
				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
				void *large_buf = malloc(large_size);
				if (large_buf) {
					rcv_size = large_size;
					bufReply = large_buf;
				}
				if (!mach_msg(&bufReply->Head, tmp_options, 0,
						(mach_msg_size_t)rcv_size,
						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
					_dispatch_log("BUG in libdispatch client: "
							"dispatch_mig_server received message larger than "
							"requested size %zd: id = 0x%x, size = %d",
							maxmsgsz, bufReply->Head.msgh_id,
							bufReply->Head.msgh_size);
				}
				if (large_buf) {
					free(large_buf);
				}
				// fall through
#endif
			default:
				_dispatch_bug_mach_client(
						"dispatch_mig_server: mach_msg() failed", kr);
				break;
			}
			goto out;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			// send-only iteration (source suspended / budget exhausted): done
			goto out;
		}
		received = true;

		// previous request fully processed: drop its importance assertion
		if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
			int r = proc_importance_assertion_complete(assertion_token);
			(void)dispatch_assume_zero(r);
#endif
			assertion_token = 0;
		}

		// swap buffers: the just-received message becomes the request,
		// the old request buffer is reused for the next reply
		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

#if DISPATCH_USE_IMPORTANCE_ASSERTION
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
				NULL, &assertion_token);
		if (r && slowpath(r != EIO)) {
			(void)dispatch_assume_zero(r);
		}
#pragma clang diagnostic pop
#endif
		// adopt the voucher carried by the request for the handler's duration
		_voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
			// is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					// handler will reply later; nothing to send now
					continue;
				}
				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			// only send-once rights are guaranteed deliverable without
			// blocking; anything else gets a (zero) send timeout
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

out:
	if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
		int r = proc_importance_assertion_complete(assertion_token);
		(void)dispatch_assume_zero(r);
#endif
	}

	return kr;
}
6284 #endif /* HAVE_MACH */
6287 #pragma mark dispatch_source_debug
/*
 * Map a kevent filter number (including libdispatch's private
 * DISPATCH_EVFILT_* values) to its symbolic name, for debug output.
 *
 * NOTE(review): the extract dropped the switch header, several #if/#ifdef
 * guards and the closing braces; restored — the exact set of guarded
 * entries should be confirmed against upstream.
 */
static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
#if HAVE_MACH
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
#endif
	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);
#ifdef EVFILT_SOCK
	_evfilt2(EVFILT_SOCK);
#endif
#ifdef EVFILT_MEMORYSTATUS
	_evfilt2(EVFILT_MEMORYSTATUS);
#endif
	// libdispatch-internal pseudo-filters
	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}
/*
 * Consume and name one kevent flag bit-group from *flagsp per call:
 * returns "FLAG|" for the first recognized flag set in *flagsp and clears
 * those bits, so the caller can loop until *flagsp is empty. Unrecognized
 * leftover bits clear *flagsp and yield "EV_UNKNOWN ".
 *
 * NOTE(review): the macro body's clear/return lines, some _evflag2 entries
 * (EV_ADD, EV_CLEAR, EV_ERROR, EV_EOF) and their #ifdef guards were
 * dropped by the extract; restored — confirm against upstream.
 */
static const char *
_evflagstr2(uint16_t *flagsp)
{
#define _evflag2(f) \
	if ((*flagsp & (f)) == (f) && (f)) { \
		*flagsp &= ~(f); \
		return #f "|"; \
	}
	_evflag2(EV_ADD);
	_evflag2(EV_DELETE);
	_evflag2(EV_ENABLE);
	_evflag2(EV_DISABLE);
	_evflag2(EV_ONESHOT);
	_evflag2(EV_CLEAR);
	_evflag2(EV_RECEIPT);
	_evflag2(EV_DISPATCH);
	_evflag2(EV_UDATA_SPECIFIC);
#ifdef EV_POLL
	_evflag2(EV_POLL);
#endif
#ifdef EV_OOBAND
	_evflag2(EV_OOBAND);
#endif
	_evflag2(EV_ERROR);
	_evflag2(EV_EOF);
#ifdef EV_VANISHED
	_evflag2(EV_VANISHED);
#endif
	*flagsp = 0; // nothing matched: stop the caller's loop
	return "EV_UNKNOWN ";
}
/*
 * Render a kevent flags word as a "A|B|C" string into str (bounded by
 * strsize) by repeatedly consuming flags via _evflagstr2(), then trimming
 * the trailing '|'. Returns str.
 *
 * NOTE(review): the str[0]=0 init, while-loop header, return and braces
 * were dropped by the extract; restored — confirm against upstream.
 */
static char *
_evflagstr(uint16_t flags, char *str, size_t strsize)
{
	str[0] = 0;
	while (flags) {
		// each call appends one "FLAG|" token and clears those bits
		strlcat(str, _evflagstr2(&flags), strsize);
	}
	size_t sz = strlen(str);
	if (sz) str[sz-1] = 0; // drop the trailing '|'
	return str;
}
/*
 * Append the dispatch-source-specific attributes (target queue, ident,
 * pending data/mask, registration and lifecycle flag bits) to buf for
 * debug output; returns the number of characters written.
 */
static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
			"mask = 0x%lx, pending_data = 0x%lx, registered = %d, "
			"armed = %d, deleted = %d%s, canceled = %d, ",
			target && target->dq_label ? target->dq_label : "", target,
			ds->ds_ident_hack, ds->ds_pending_data_mask, ds->ds_pending_data,
			ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
			(bool)(ds->dq_atomic_flags & DSF_DELETED),
			(ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
			(bool)(ds->dq_atomic_flags & DSF_CANCELED));
}
/*
 * Append the timer parameters (target/deadline/last-fire times, interval,
 * flags) of a timer source to buf for debug output; returns the number of
 * characters written. Only meaningful when ds->ds_is_timer is set.
 */
static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
			", last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
			(unsigned long long)ds_timer(dr).target,
			(unsigned long long)ds_timer(dr).deadline,
			(unsigned long long)ds_timer(dr).last_fire,
			(unsigned long long)ds_timer(dr).interval, ds_timer(dr).flags);
}
/*
 * Top-level vtable debug hook for dispatch sources: compose the object,
 * source and (for timers) timer attribute strings, then the kevent pointer
 * and filter name. Returns the number of characters written to buf.
 *
 * NOTE(review): the "offset" declaration, the dx_kind argument line, the
 * ds_dkev NULL branch and the final return were dropped by the extract;
 * restored — confirm against upstream.
 */
size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (ds->ds_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	const char *filter;
	if (!ds->ds_dkev) {
		filter = "????";
	} else if (ds->ds_is_custom_source) {
		// custom sources smuggle the filter value in the dkev pointer
		filter = _evfiltstr((int16_t)(uintptr_t)ds->ds_dkev);
	} else {
		filter = _evfiltstr(ds->ds_dkev->dk_kevent.filter);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
			"filter = %s }", ds->ds_dkev, ds->ds_is_direct_kevent ? " (direct)"
			: "", filter);
	return offset;
}
/*
 * Append the mach-channel-specific attributes (target queue, receive/send
 * ports, send-possible arming, checkin port/state, send state machine,
 * disconnect count, cancellation) to buf for debug output; returns the
 * number of characters written.
 */
static size_t
_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = dm->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
			"send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
			"send state = %016llx, disconnected = %d, canceled = %d ",
			target && target->dq_label ? target->dq_label : "", target,
			dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
			dm->dm_refs->dm_send,
			dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
			dm->dm_dkev && DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev) ?
			" (armed)" : "", dm->dm_refs->dm_checkin_port,
			dm->dm_refs->dm_checkin ? " (pending)" : "",
			dm->dm_refs->dm_state, dm->dm_refs->dm_disconnect_cnt,
			(bool)(dm->dq_atomic_flags & DSF_CANCELED));
}
/*
 * Top-level vtable debug hook for mach channels: label (unless the cancel
 * handler already ran, after which dq_label may be dangling), object and
 * channel attributes. Returns the number of characters written to buf.
 *
 * NOTE(review): the "offset" declaration, the dx_kind fallback line and
 * the final return were dropped by the extract; restored — confirm
 * against upstream.
 */
size_t
_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
			dx_kind(dm), dm);
	offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
/*
 * Log one kevent structure (with "i/n" position when part of a batch of
 * n > 1) including symbolic filter and flag names, plus the call site
 * (function/line) that requested the log. The format differs depending on
 * whether the QoS-extended kevent struct is in use.
 *
 * NOTE(review): the local buffers, the n > 1 guard, the #else branches and
 * the trailing function/line arguments were dropped by the extract;
 * restored — confirm against upstream.
 */
static void
dispatch_kevent_debug(const char *verb, const _dispatch_kevent_qos_s *kev,
		int i, int n, const char *function, unsigned int line)
{
	char flagstr[256];
	char i_n[31];

	if (n > 1) {
		snprintf(i_n, sizeof(i_n), "%d/%d ", i + 1, n);
	} else {
		i_n[0] = '\0';
	}
#if DISPATCH_USE_KEVENT_QOS
	_dispatch_debug("%s kevent[%p] %s= { ident = 0x%llx, filter = %s, "
			"flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
			"qos = 0x%x, ext[0] = 0x%llx, ext[1] = 0x%llx, ext[2] = 0x%llx, "
			"ext[3] = 0x%llx }: %s #%u", verb, kev, i_n, kev->ident,
			_evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
			sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
			kev->qos, kev->ext[0], kev->ext[1], kev->ext[2], kev->ext[3],
			function, line);
#else
	_dispatch_debug("%s kevent[%p] %s= { ident = 0x%llx, filter = %s, "
			"flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
			"ext[0] = 0x%llx, ext[1] = 0x%llx }: %s #%u", verb, kev, i_n,
			kev->ident, _evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
			sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
#ifndef IGNORE_KEVENT64_EXT
			kev->ext[0], kev->ext[1],
#else
			0ull, 0ull,
#endif
			function, line);
#endif
}
6493 #ifndef MACH_PORT_TYPE_SPREQUEST
6494 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
/*
 * Log a detailed description of a Mach port name in the current task:
 * right kinds held (receive/send/send-once/dead-name reference counts),
 * dead-name request count, and — for receive rights — the port's full
 * mach_port_status_t (requests armed, queue limits, message counts,
 * sequence number). `str` is an arbitrary caller tag appended to each
 * log line. Errors from mach_port_type() are logged and abort the dump.
 *
 * NOTE(review): the kr error-return branch, the else branch for the final
 * log and the closing brace were dropped by the extract; restored —
 * confirm against upstream.
 */
void
dispatch_debug_machport(mach_port_t name, const char* str)
{
	mach_port_type_t type;
	mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
	unsigned int dnreqs = 0, dnrsiz;
	kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
	if (kr) {
		_dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
				kr, mach_error_string(kr), str);
		return;
	}
	if (type & MACH_PORT_TYPE_SEND) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_SEND, &ns));
	}
	if (type & MACH_PORT_TYPE_SEND_ONCE) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_SEND_ONCE, &nso));
	}
	if (type & MACH_PORT_TYPE_DEAD_NAME) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_DEAD_NAME, &nd));
	}
	if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
		kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs);
		// KERN_INVALID_RIGHT can race with the right being deallocated
		if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr);
	}
	if (type & MACH_PORT_TYPE_RECEIVE) {
		mach_port_status_t status = { .mps_pset = 0, };
		mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_RECEIVE, &nr));
		(void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
				name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
				"dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
				"sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
				"seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
				status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
				status.mps_srights ? "Y":"N", status.mps_sorights,
				status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
				status.mps_seqno, str);
	} else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
			MACH_PORT_TYPE_DEAD_NAME)) {
		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
				"dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
	} else {
		_dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
				str);
	}
}
6555 #endif // DISPATCH_DEBUG