/*
 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "protocolServer.h"
#include <sys/mount.h>

#define DKEV_DISPOSE_IMMEDIATE_DELETE 0x1
#define DKEV_UNREGISTER_DISCONNECTED 0x2
#define DKEV_UNREGISTER_REPLY_REMOVE 0x4
#define DKEV_UNREGISTER_WAKEUP 0x8
static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
static void _dispatch_source_merge_kevent(dispatch_source_t ds,
		const _dispatch_kevent_qos_s *ke);
static bool _dispatch_kevent_register(dispatch_kevent_t *dkp,
		pthread_priority_t pp, uint32_t *flgp);
static long _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg,
		unsigned int options);
static long _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags);
static void _dispatch_kevent_drain(_dispatch_kevent_qos_s *ke);
static void _dispatch_kevent_merge(_dispatch_kevent_qos_s *ke);
static void _dispatch_timers_kevent(_dispatch_kevent_qos_s *ke);
static void _dispatch_timers_unregister(dispatch_source_t ds,
		dispatch_kevent_t dk);
static void _dispatch_timers_update(dispatch_source_t ds);
static void _dispatch_timer_aggregates_check(void);
static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
		unsigned int tidx);
static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
		unsigned int tidx);
static inline unsigned long _dispatch_source_timer_data(
		dispatch_source_refs_t dr, unsigned long prev);
static void _dispatch_kq_deferred_update(const _dispatch_kevent_qos_s *ke);
static long _dispatch_kq_immediate_update(_dispatch_kevent_qos_s *ke);
static void _dispatch_memorypressure_init(void);
#if HAVE_MACH
static void _dispatch_mach_host_calendar_change_register(void);
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
static void _dispatch_mach_recv_msg_buf_init(void);
static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags);
#endif
static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags);
static void _dispatch_mach_kevent_merge(_dispatch_kevent_qos_s *ke);
static mach_msg_size_t _dispatch_kevent_mach_msg_size(
		_dispatch_kevent_qos_s *ke);
#else
static inline void _dispatch_mach_host_calendar_change_register(void) {}
static inline void _dispatch_mach_recv_msg_buf_init(void) {}
#endif // HAVE_MACH
static const char * _evfiltstr(short filt);
#if DISPATCH_DEBUG
static void dispatch_kevent_debug(const char *verb,
		const _dispatch_kevent_qos_s *kev, int i, int n,
		const char *function, unsigned int line);
static void _dispatch_kevent_debugger(void *context);
#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
		dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
#else
static inline void
dispatch_kevent_debug(const char *verb, const _dispatch_kevent_qos_s *kev,
		int i, int n, const char *function, unsigned int line)
{
	(void)verb; (void)kev; (void)i; (void)n; (void)function; (void)line;
}
#define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
#endif
#define _dispatch_kevent_debug(verb, _kev) \
		dispatch_kevent_debug(verb, _kev, 0, 1, __FUNCTION__, __LINE__)
#define _dispatch_kevent_debug_n(verb, _kev, i, n) \
		dispatch_kevent_debug(verb, _kev, i, n, __FUNCTION__, __LINE__)
#ifndef DISPATCH_MGR_QUEUE_DEBUG
#define DISPATCH_MGR_QUEUE_DEBUG 0
#endif
#if DISPATCH_MGR_QUEUE_DEBUG
#define _dispatch_kevent_mgr_debug _dispatch_kevent_debug
#else
static inline void
_dispatch_kevent_mgr_debug(_dispatch_kevent_qos_s* kev DISPATCH_UNUSED) {}
#endif
#pragma mark dispatch_source_t

dispatch_source_t
dispatch_source_create(dispatch_source_type_t type, uintptr_t handle,
		unsigned long mask, dispatch_queue_t dq)
{
	// ensure _dispatch_evfilt_machport_direct_enabled is initialized
	_dispatch_root_queues_init();
	const _dispatch_kevent_qos_s *proto_kev = &type->ke;
	dispatch_source_t ds;
	dispatch_kevent_t dk;

	if (type == NULL || (mask & ~type->mask)) {
		return DISPATCH_BAD_INPUT;
	}
	if (type->mask && !mask) {
		// expect a non-zero mask when the type declares one ... except
		switch (type->ke.filter) {
		case DISPATCH_EVFILT_TIMER:
			break; // timers don't need masks
#if DISPATCH_USE_VM_PRESSURE
		case EVFILT_VM:
			break; // type->init forces the only acceptable mask
#endif
		case DISPATCH_EVFILT_MACH_NOTIFICATION:
			break; // type->init handles zero mask as a legacy case
		default:
			// otherwise reject as invalid input
			return DISPATCH_BAD_INPUT;
		}
	}

	switch (type->ke.filter) {
	case EVFILT_SIGNAL:
		if (handle >= NSIG) {
			return DISPATCH_BAD_INPUT;
		}
		break;
#if DISPATCH_USE_VM_PRESSURE
	case EVFILT_VM:
#endif
#if DISPATCH_USE_MEMORYSTATUS
	case EVFILT_MEMORYSTATUS:
#endif
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		if (handle) {
			return DISPATCH_BAD_INPUT;
		}
		break;
	case DISPATCH_EVFILT_TIMER:
		if ((handle == 0) != (type->ke.ident == 0)) {
			return DISPATCH_BAD_INPUT;
		}
		break;
	default:
		break;
	}

	ds = _dispatch_alloc(DISPATCH_VTABLE(source),
			sizeof(struct dispatch_source_s));
	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init(ds->_as_dq, DQF_NONE, 1, true);
	ds->dq_label = "source";
	ds->do_ref_cnt++; // the reference the manager queue holds

	switch (type->ke.filter) {
	case DISPATCH_EVFILT_CUSTOM_OR:
		dk = DISPATCH_KEV_CUSTOM_OR;
		break;
	case DISPATCH_EVFILT_CUSTOM_ADD:
		dk = DISPATCH_KEV_CUSTOM_ADD;
		break;
	default:
		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
		dk->dk_kevent = *proto_kev;
		dk->dk_kevent.ident = handle;
		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
		dk->dk_kevent.fflags |= (uint32_t)mask;
		dk->dk_kevent.udata = (_dispatch_kevent_qos_udata_t)dk;
		TAILQ_INIT(&dk->dk_sources);
		break;
	}
	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
	ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
	if (EV_UDATA_SPECIFIC & proto_kev->flags) {
		dk->dk_kevent.flags |= EV_DISPATCH;
		ds->ds_is_direct_kevent = true;
		ds->ds_needs_rearm = true;
	}
	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
		ds->ds_needs_rearm = true;
	} else if (!(EV_CLEAR & proto_kev->flags)) {
		// we cheat and use EV_CLEAR to mean a "flag thingy"
		ds->ds_is_adder = true;
	}
	// Some sources require special processing
	if (type->init != NULL) {
		type->init(ds, type, handle, mask, dq);
	}
	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
	if (!ds->ds_is_custom_source && (dk->dk_kevent.flags & EV_VANISHED)) {
		// see _dispatch_source_merge_kevent
		dispatch_assert(!(dk->dk_kevent.flags & EV_ONESHOT));
		dispatch_assert(dk->dk_kevent.flags & EV_DISPATCH);
		dispatch_assert(dk->dk_kevent.flags & EV_UDATA_SPECIFIC);
	}

	if (fastpath(!ds->ds_refs)) {
		ds->ds_refs = _dispatch_calloc(1ul,
				sizeof(struct dispatch_source_refs_s));
	}
	ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);

	if (slowpath(!dq)) {
		dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
	} else {
		_dispatch_retain(dq);
	}
	ds->do_targetq = dq;
	_dispatch_object_debug(ds, "%s", __func__);
	return ds;
}
void
_dispatch_source_dispose(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
	_dispatch_queue_destroy(ds->_as_dq);
}

void
_dispatch_source_xref_dispose(dispatch_source_t ds)
{
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_FLUSH);
}

long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
}

unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	unsigned long mask = ds->ds_pending_data_mask;
	if (ds->ds_vmpressure_override) {
		mask = NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	else if (ds->ds_memorypressure_override) {
		mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
	return mask;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	unsigned int handle = (unsigned int)ds->ds_ident_hack;
#if TARGET_IPHONE_SIMULATOR
	if (ds->ds_memorypressure_override) {
		handle = 0;
	}
#endif
	return handle;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
	unsigned long data = ds->ds_data;
	if (ds->ds_vmpressure_override) {
		data = NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	else if (ds->ds_memorypressure_override) {
		data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
	return data;
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_merge_data2(dispatch_source_t ds,
		pthread_priority_t pp, unsigned long val)
{
	_dispatch_kevent_qos_s kev = {
		.fflags = (typeof(kev.fflags))val,
		.data = (typeof(kev.data))val,
#if DISPATCH_USE_KEVENT_QOS
		.qos = (_dispatch_kevent_priority_t)pp,
#endif
	};
#if !DISPATCH_USE_KEVENT_QOS
	(void)pp;
#endif

	dispatch_assert(ds->ds_dkev == DISPATCH_KEV_CUSTOM_OR ||
			ds->ds_dkev == DISPATCH_KEV_CUSTOM_ADD);
	_dispatch_kevent_debug("synthetic data", &kev);
	_dispatch_source_merge_kevent(ds, &kev);
}

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	_dispatch_source_merge_data2(ds, 0, val);
}

void
_dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
		unsigned long val)
{
	_dispatch_source_merge_data2(ds, pp, val);
}
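/*
 * Illustrative sketch (not part of the original file): how the custom data
 * sources feed dispatch_source_merge_data() above. Assumes a serial queue
 * `q` created by the caller.
 *
 *	dispatch_source_t sum = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, q);
 *	dispatch_source_set_event_handler(sum, ^{
 *		// coalesced value of all merges since the last callout
 *		unsigned long pending = dispatch_source_get_data(sum);
 *		(void)pending;
 *	});
 *	dispatch_resume(sum);
 *	dispatch_source_merge_data(sum, 1); // ADD sources accumulate the values
 */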
#pragma mark dispatch_source_handler

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
{
	return os_atomic_load(&dr->ds_handler[kind], relaxed);
}
#define _dispatch_source_get_event_handler(dr) \
		_dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
#define _dispatch_source_get_cancel_handler(dr) \
		_dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
#define _dispatch_source_get_registration_handler(dr) \
		_dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
		bool block)
{
	// sources don't propagate priority by default
	const dispatch_block_flags_t flags =
			DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = 0;

	if (kind != DS_EVENT_HANDLER) {
		dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
	}
#ifdef __BLOCKS__
	if (block) {
		_dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
	} else
#endif /* __BLOCKS__ */
	{
		dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
		_dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
				0, flags, dc_flags);
	}
	_dispatch_trace_continuation_push(ds->_as_dq, dc);
	return dc;
}

static void
_dispatch_source_handler_dispose(dispatch_continuation_t dc)
{
#ifdef __BLOCKS__
	if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
		Block_release(dc->dc_ctxt);
	}
#endif /* __BLOCKS__ */
	if (dc->dc_voucher) {
		_voucher_release(dc->dc_voucher);
		dc->dc_voucher = VOUCHER_INVALID;
	}
	_dispatch_continuation_free(dc);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_take(dispatch_source_t ds, long kind)
{
	return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_free(dispatch_source_t ds, long kind)
{
	dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
	if (dc) _dispatch_source_handler_dispose(dc);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	if (!dc->dc_func) {
		_dispatch_continuation_free(dc);
		dc = NULL;
	} else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
	if (dc) _dispatch_source_handler_dispose(dc);
}

static void
_dispatch_source_set_handler_slow(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);

	dispatch_continuation_t dc = context;
	long kind = (long)dc->dc_data;
	dc->dc_data = NULL;
	_dispatch_source_handler_replace(ds, kind, dc);
}

static void
_dispatch_source_set_handler(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
	if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
		_dispatch_source_handler_replace(ds, kind, dc);
		return dx_vtable(ds)->do_resume(ds, false);
	}
	_dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
	if (kind == DS_REGISTN_HANDLER) {
		_dispatch_bug_deprecated("Setting registration handler after "
				"the source has been activated");
	}
	dc->dc_data = (void *)kind;
	_dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
			_dispatch_source_set_handler_slow);
}

#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}

void
_dispatch_source_set_event_handler_continuation(dispatch_source_t ds,
		dispatch_continuation_t dc)
{
	_dispatch_trace_continuation_push(ds->_as_dq, dc);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}

#ifdef __BLOCKS__
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}

#ifdef __BLOCKS__
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
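/*
 * Illustrative sketch (not part of the original file): the function-pointer
 * variants above mirror the block-based setters; the handler receives the
 * context installed with dispatch_set_context(). `my_ctxt`,
 * `my_event_handler` and `my_cancel_handler` are hypothetical caller names.
 *
 *	dispatch_set_context(src, my_ctxt);
 *	dispatch_source_set_event_handler_f(src, my_event_handler);
 *	dispatch_source_set_cancel_handler_f(src, my_cancel_handler);
 */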
#pragma mark dispatch_source_invoke

static void
_dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
	if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, cq, flags);
}

static void
_dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	if (!dc) {
		return;
	}
	if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, cq, flags);
}

static void
_dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	unsigned long prev;

	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
	prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	if (ds->ds_is_level) {
		ds->ds_data = ~prev;
	} else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
		ds->ds_data = _dispatch_source_timer_data(dr, prev);
	} else {
		ds->ds_data = prev;
	}
	if (!dispatch_assume(prev) || !dc) {
		return;
	}
	_dispatch_continuation_pop(dc, cq, flags);
	if (ds->ds_is_timer && (ds_timer(dr).flags & DISPATCH_TIMER_AFTER)) {
		_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
		dispatch_release(ds); // dispatch_after sources are one-shot
	}
}
static void
_dispatch_source_kevent_unregister(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	uint32_t flags = (uint32_t)ds->ds_pending_data_mask;
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if (ds->ds_is_custom_source) {
		ds->ds_dkev = NULL;
		goto done;
	}

	if (ds->ds_is_direct_kevent &&
			((dqf & DSF_DELETED) || !(ds->ds_is_installed))) {
		dk->dk_kevent.flags |= EV_DELETE; // already deleted
		dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE|EV_VANISHED);
	}
	if (dk->dk_kevent.filter == DISPATCH_EVFILT_TIMER) {
		if (ds->ds_is_installed) {
			_dispatch_timers_unregister(ds, dk);
		}
	} else if (!ds->ds_is_direct_kevent) {
		dispatch_assert((bool)ds->ds_is_installed);
		TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
		_dispatch_kevent_unregister(dk, flags, 0);
	} else {
		unsigned int dkev_dispose_options = 0;
		if (ds->ds_needs_rearm && !(dqf & DSF_ARMED)) {
			dkev_dispose_options |= DKEV_DISPOSE_IMMEDIATE_DELETE;
		} else if (dx_type(ds) == DISPATCH_MACH_CHANNEL_TYPE) {
			if (!ds->ds_is_direct_kevent) {
				dkev_dispose_options |= DKEV_DISPOSE_IMMEDIATE_DELETE;
			}
		}
		long r = _dispatch_kevent_unregister(dk, flags, dkev_dispose_options);
		if (r == EINPROGRESS) {
			_dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
					ds, dk);
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
			return; // deferred unregistration
#if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
		} else if (r == ENOENT) {
			_dispatch_debug("kevent-source[%p]: ENOENT delete kevent[%p]",
					ds, dk);
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
			return; // potential concurrent EV_DELETE delivery rdar://22047283
#endif
		} else {
			dispatch_assume_zero(r);
		}
		_TAILQ_TRASH_ENTRY(ds->ds_refs, dr_list);
	}
done:
	dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
			DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
	if (dqf & DSF_CANCEL_WAITER) {
		_dispatch_wake_by_address(&ds->dq_atomic_flags);
	}
	ds->ds_is_installed = true;
	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
	_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dk);
	_dispatch_release(ds); // the retain is done at creation time
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
			// the test is inside the loop because it's convenient but the
			// result should not change for the duration of the rmw_loop
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}

static bool
_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
{
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		_dispatch_timers_update(ds);
		_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
				ds->ds_dkev);
		return true;
	case EVFILT_MACHPORT:
		if ((ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) &&
				!ds->ds_is_direct_kevent) {
			new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
		}
		break;
	}
	if (unlikely(!_dispatch_source_tryarm(ds))) {
		return false;
	}
	if (unlikely(_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0))) {
		_dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq, DSF_DELETED,
				DSF_ARMED);
		return false;
	}
	_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
	return true;
}

static void
_dispatch_source_kevent_register(dispatch_source_t ds, pthread_priority_t pp)
{
	dispatch_assert_zero((bool)ds->ds_is_installed);
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		_dispatch_timers_update(ds);
		_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
		return;
	}
	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, pp, &flags);
	TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
	ds->ds_is_installed = true;
	if (do_resume || ds->ds_needs_rearm) {
		if (unlikely(!_dispatch_source_kevent_resume(ds, flags))) {
			_dispatch_source_kevent_unregister(ds);
		}
	} else {
		_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
	}
	_dispatch_object_debug(ds, "%s", __func__);
}
static void
_dispatch_source_set_event_handler_context(void *ctxt)
{
	dispatch_source_t ds = ctxt;
	dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);

	if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
		dc->dc_ctxt = ds->do_ctxt;
	}
}

static pthread_priority_t
_dispatch_source_compute_kevent_priority(dispatch_source_t ds)
{
	pthread_priority_t p = ds->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
	dispatch_queue_t tq = ds->do_targetq;
	pthread_priority_t tqp = tq->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;

	while (unlikely(tq->do_targetq)) {
		if (unlikely(tq == &_dispatch_mgr_q)) {
			return _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
		}
		if (unlikely(_dispatch_queue_is_thread_bound(tq))) {
			// thread bound hierarchies are weird, we need to install
			// from the context of the thread this hierarchy is bound to
			return 0;
		}
		if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(tq))) {
			// this queue may not be activated yet, so the queue graph may not
			// have stabilized yet
			_dispatch_ktrace1(DISPATCH_PERF_delayed_registration, ds);
			return 0;
		}
		if (unlikely(!_dispatch_queue_has_immutable_target(tq))) {
			if (!_dispatch_is_in_root_queues_array(tq->do_targetq)) {
				// we're not allowed to dereference tq->do_targetq
				_dispatch_ktrace1(DISPATCH_PERF_delayed_registration, ds);
				return 0;
			}
		}
		if (!(tq->dq_priority & _PTHREAD_PRIORITY_INHERIT_FLAG)) {
			if (p < tqp) p = tqp;
		}
		tq = tq->do_targetq;
		tqp = tq->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
	}

	if (unlikely(!tqp)) {
		// pthread root queues opt out of QoS
		return 0;
	}
	return _dispatch_priority_inherit_from_root_queue(p, tq);
}

void
_dispatch_source_finalize_activation(dispatch_source_t ds)
{
	dispatch_continuation_t dc;

	if (unlikely(ds->ds_is_direct_kevent &&
			(_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
		return _dispatch_source_kevent_unregister(ds);
	}

	dc = _dispatch_source_get_event_handler(ds->ds_refs);
	if (dc) {
		if (_dispatch_object_is_barrier(dc)) {
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
		}
		ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
			_dispatch_barrier_async_detached_f(ds->_as_dq, ds,
					_dispatch_source_set_event_handler_context);
		}
	}

	_dispatch_queue_finalize_activation(ds->_as_dq);

	if (ds->ds_is_direct_kevent && !ds->ds_is_installed) {
		pthread_priority_t pp = _dispatch_source_compute_kevent_priority(ds);
		if (pp) _dispatch_source_kevent_register(ds, pp);
	}
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_flags_t flags,
		uint64_t *owned, struct dispatch_object_s **dc_ptr DISPATCH_UNUSED)
{
	dispatch_source_t ds = dou._ds;
	dispatch_queue_t retq = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (_dispatch_queue_class_probe(ds)) {
		// Intentionally always drain even when on the manager queue
		// and not the source's regular target queue: we need to be able
		// to drain timer setting and the like there.
		retq = _dispatch_queue_serial_drain(ds->_as_dq, flags, owned, NULL);
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in wakeup should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_t dkq = &_dispatch_mgr_q;

	if (ds->ds_is_direct_kevent) {
		dkq = ds->do_targetq;
	}

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		_dispatch_source_kevent_register(ds, _dispatch_get_defaultpriority());
	}

	if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return ds->do_targetq;
	}

	if (_dispatch_source_get_registration_handler(dr)) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds, dq, flags);
	}

	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	bool prevent_starvation = false;

	if ((dqf & DSF_DEFERRED_DELETE) &&
			((dqf & DSF_DELETED) || !(dqf & DSF_ARMED))) {
unregister_event:
		// DSF_DELETE: Pending source kevent unregistration has been completed
		// !DSF_ARMED: event was delivered and can safely be unregistered
		_dispatch_source_kevent_unregister(ds);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && ds->ds_pending_data) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the kevent
		// queue after event delivery.
		if (dq == ds->do_targetq) {
			_dispatch_source_latch_and_call(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);

			// starvation avoidance: if the source triggers itself then force a
			// re-queue to give other things already queued on the target queue
			// a chance to run
			//
			// however, if the source is directly targetting an overcommit root
			// queue, this would requeue the source and ask for a new overcommit
			// thread right away.
			prevent_starvation = dq->do_targetq ||
					!(dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
			if (prevent_starvation && ds->ds_pending_data) {
				retq = ds->do_targetq;
			}
		} else {
			// there is no point trying to be eager, the next thing to do is
			// to deliver the event
			return ds->do_targetq;
		}
	}

	if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
		// The source has been cancelled and needs to be uninstalled from the
		// kevent queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (!(dqf & DSF_DELETED)) {
			_dispatch_source_kevent_unregister(ds);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
				if (!(dqf & DSF_ARMED)) {
					goto unregister_event;
				}
				// we need to wait for the EV_DELETE
				return retq;
			}
		}
		if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr))) {
			retq = ds->do_targetq;
		} else {
			_dispatch_source_cancel_callout(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		}
		prevent_starvation = false;
	}

	if (ds->ds_needs_rearm && !(dqf & DSF_ARMED)) {
		// The source needs to be rearmed on the kevent queue.
		if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
			// no need for resume when we can directly unregister the kevent
			goto unregister_event;
		}
		if (prevent_starvation) {
			// keep the old behavior to force re-enqueue to our target queue
			// for the rearm. It is inefficient though and we should
			// improve this <rdar://problem/24635615>.
			//
			// if the handler didn't run, or this is a pending delete
			// or our target queue is a global queue, then starvation is
			// not a concern and we can rearm right away.
			return ds->do_targetq;
		}
		if (unlikely(!_dispatch_source_kevent_resume(ds, 0))) {
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			goto unregister_event;
		}
	}

	return retq;
}
void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(ds->_as_dq, flags, _dispatch_source_invoke2);
}

void
_dispatch_source_wakeup(dispatch_source_t ds, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in wakeup and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
	dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);

	if (ds->ds_is_direct_kevent) {
		dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		tq = dkq;
	} else if (_dispatch_source_get_registration_handler(dr)) {
		// The registration handler needs to be delivered to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if (deferred_delete && ((dqf & DSF_DELETED) || !(dqf & DSF_ARMED))) {
		// Pending source kevent unregistration has been completed
		// or EV_ONESHOT event can be acknowledged
		tq = dkq;
	} else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && ds->ds_pending_data) {
		// The source has pending data to deliver to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
		// The source needs to be uninstalled from the kevent queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (!(dqf & DSF_DELETED)) {
			tq = dkq;
		} else if (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr)) {
			tq = DISPATCH_QUEUE_WAKEUP_TARGET;
		}
	} else if (ds->ds_needs_rearm && !(dqf & DSF_ARMED)) {
		// The source needs to be rearmed on the kevent queue.
		tq = dkq;
	}
	if (!tq && _dispatch_queue_class_probe(ds)) {
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if (tq) {
		return _dispatch_queue_class_wakeup(ds->_as_dq, pp, flags, tq);
	} else if (pp) {
		return _dispatch_queue_class_override_drainer(ds->_as_dq, pp, flags);
	} else if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(ds);
	}
}
void
dispatch_source_cancel(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	// Right after we set the cancel flag, someone else
	// could potentially invoke the source, do the cancellation,
	// unregister the source, and deallocate it. We would
	// need to therefore retain/release before setting the bit
	_dispatch_retain(ds);

	dispatch_queue_t q = ds->_as_dq;
	if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
		_dispatch_release_tailcall(ds);
	} else {
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_FLUSH | DISPATCH_WAKEUP_CONSUME);
	}
}
void
dispatch_source_cancel_and_wait(dispatch_source_t ds)
{
	dispatch_queue_flags_t old_dqf, dqf, new_dqf;
	pthread_priority_t pp;

	if (unlikely(_dispatch_source_get_cancel_handler(ds->ds_refs))) {
		DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
	}

	_dispatch_object_debug(ds, "%s", __func__);
	os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = old_dqf | DSF_CANCELED;
		if (old_dqf & DSF_CANCEL_WAITER) {
			os_atomic_rmw_loop_give_up(break);
		}
		if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
			// just add DSF_CANCELED
		} else if ((old_dqf & DSF_DEFERRED_DELETE) || !ds->ds_is_direct_kevent){
			new_dqf |= DSF_CANCEL_WAITER;
		}
	});
	dqf = new_dqf;

	if (old_dqf & DQF_RELEASED) {
		DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
	}
	if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
		return;
	}
	if (dqf & DSF_CANCEL_WAITER) {
		goto override;
	}

	// simplified version of _dispatch_queue_drain_try_lock
	// that also sets the DIRTY bit on failure to lock
	dispatch_lock_owner tid_self = _dispatch_tid_self();
	uint64_t xor_owner_and_set_full_width = tid_self |
			DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
		new_state = old_state;
		if (likely(_dq_state_is_runnable(old_state) &&
				!_dq_state_drain_locked(old_state))) {
			new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
			new_state ^= xor_owner_and_set_full_width;
		} else if (old_dqf & DSF_CANCELED) {
			os_atomic_rmw_loop_give_up(break);
		} else {
			// this case needs a release barrier, hence the seq_cst above
			new_state |= DISPATCH_QUEUE_DIRTY;
		}
	});

	if (unlikely(_dq_state_is_suspended(old_state))) {
		if (unlikely(_dq_state_suspend_cnt(old_state))) {
			DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
		}
		// inactive sources have never been registered and there is no need
		// to wait here because activation will notice and mark the source
		// as deleted without ever trying to use the fd or mach port.
		return dispatch_activate(ds);
	}

	if (likely(_dq_state_is_runnable(old_state) &&
			!_dq_state_drain_locked(old_state))) {
		// same thing _dispatch_source_invoke2() does when handling cancellation
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
			_dispatch_source_kevent_unregister(ds);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
				_dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
			}
		}
		_dispatch_try_lock_transfer_or_wakeup(ds->_as_dq);
	} else if (unlikely(_dq_state_drain_locked_by(old_state, tid_self))) {
		DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
				"called from a source handler");
	} else {
override:
		pp = _dispatch_get_priority() & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
		if (pp) dx_wakeup(ds, pp, DISPATCH_WAKEUP_OVERRIDING);
		dispatch_activate(ds);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
		if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
			if (!os_atomic_cmpxchgvw2o(ds, dq_atomic_flags,
					dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
				continue;
			}
			dqf |= DSF_CANCEL_WAITER;
		}
		_dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}
}
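/*
 * Illustrative sketch (not part of the original file): the usual cancellation
 * pattern for a descriptor-backed source. The cancel handler is the only safe
 * place to close the fd, since the kevent may still reference it until the
 * source is unregistered. Note that dispatch_source_cancel_and_wait() above
 * deliberately crashes if a cancel handler is installed.
 *
 *	dispatch_source_set_cancel_handler(src, ^{
 *		close(fd);
 *	});
 *	dispatch_source_cancel(src);
 *	dispatch_release(src);
 */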
static void
_dispatch_source_merge_kevent(dispatch_source_t ds,
		const _dispatch_kevent_qos_s *ke)
{
	_dispatch_object_debug(ds, "%s", __func__);
	dispatch_wakeup_flags_t flags = 0;
	dispatch_queue_flags_t dqf;
	pthread_priority_t pp = 0;

	if (ds->ds_needs_rearm || (ke->flags & (EV_DELETE | EV_ONESHOT))) {
		// once we modify the queue atomic flags below, it will allow concurrent
		// threads running _dispatch_source_invoke2 to dispose of the source,
		// so we can't safely borrow the reference we get from the knote udata
		// anymore, and need our own
		flags = DISPATCH_WAKEUP_CONSUME;
		_dispatch_retain(ds); // rdar://20382435
	}

	if ((ke->flags & EV_UDATA_SPECIFIC) && (ke->flags & EV_ONESHOT) &&
			!(ke->flags & EV_DELETE)) {
		dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
				DSF_DEFERRED_DELETE, DSF_ARMED);
		if (ke->flags & EV_VANISHED) {
			_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
					"monitored resource vanished before the source "
					"cancel handler was invoked", 0);
		}
		_dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
				(ke->flags & EV_VANISHED) ? "vanished" :
				"deferred delete oneshot", (void*)ke->udata);
	} else if ((ke->flags & EV_DELETE) || (ke->flags & EV_ONESHOT)) {
		dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
				DSF_DELETED, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: delete kevent[%p]",
				ds, (void*)ke->udata);
		if (ke->flags & EV_DELETE) goto done;
	} else if (ds->ds_needs_rearm) {
		dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p] ",
				ds, (void*)ke->udata);
	} else {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
		goto done; // rdar://20204025
	}

	if (ke->filter == EVFILT_MACHPORT &&
			dx_type(ds) == DISPATCH_MACH_CHANNEL_TYPE) {
		DISPATCH_INTERNAL_CRASH(ke->flags,"Unexpected kevent for mach channel");
	}

	unsigned long data;
	if ((ke->flags & EV_UDATA_SPECIFIC) && (ke->flags & EV_ONESHOT) &&
			(ke->flags & EV_VANISHED)) {
		// if the resource behind the ident vanished, the event handler can't
		// do anything useful anymore, so do not try to call it at all
		//
		// Note: if the kernel doesn't support EV_VANISHED we always get it
		// back unchanged from the flags passed at EV_ADD (registration) time
		// Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
		// if we get both bits it was a real EV_VANISHED delivery
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
	} else if (ke->filter == EVFILT_MACHPORT) {
		data = DISPATCH_MACH_RECV_MESSAGE;
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
	} else if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		dispatch_assert(ke->data >= 0l);
		data = ~(unsigned long)ke->data;
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
	} else if (ds->ds_is_adder) {
		data = (unsigned long)ke->data;
		os_atomic_add2o(ds, ds_pending_data, data, relaxed);
	} else if (ke->fflags & ds->ds_pending_data_mask) {
		data = ke->fflags & ds->ds_pending_data_mask;
		os_atomic_or2o(ds, ds_pending_data, data, relaxed);
	}

done:
#if DISPATCH_USE_KEVENT_QOS
	pp = ((pthread_priority_t)ke->qos) & ~_PTHREAD_PRIORITY_FLAGS_MASK;
#endif
	dx_wakeup(ds, pp, flags | DISPATCH_WAKEUP_FLUSH);
}
#pragma mark dispatch_kevent_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void _dispatch_kevent_guard(dispatch_kevent_t dk);
static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
#else
static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
#endif

#if !DISPATCH_USE_EV_UDATA_SPECIFIC
static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};
#endif // !DISPATCH_USE_EV_UDATA_SPECIFIC

#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))

DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];

static void
_dispatch_kevent_init()
{
	unsigned int i;
	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

#if !DISPATCH_USE_EV_UDATA_SPECIFIC
	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
			&_dispatch_kevent_data_or, dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
			&_dispatch_kevent_data_add, dk_list);
	_dispatch_kevent_data_or.dk_kevent.udata =
			(_dispatch_kevent_qos_udata_t)&_dispatch_kevent_data_or;
	_dispatch_kevent_data_add.dk_kevent.udata =
			(_dispatch_kevent_qos_udata_t)&_dispatch_kevent_data_add;
#endif // !DISPATCH_USE_EV_UDATA_SPECIFIC
}
static inline uintptr_t
_dispatch_kevent_hash(uint64_t ident, short filter)
{
	uint64_t value;
	value = (filter == EVFILT_MACHPORT ||
			filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
			MACH_PORT_INDEX(ident) : ident);
	return DSL_HASH((uintptr_t)value);
}

static dispatch_kevent_t
_dispatch_kevent_find(uint64_t ident, short filter)
{
	uintptr_t hash = _dispatch_kevent_hash(ident, filter);
	dispatch_kevent_t dki;

	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
			break;
		}
	}
	return dki;
}

static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
	if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) return;
	_dispatch_kevent_guard(dk);
	uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
			dk->dk_kevent.filter);
	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}
// Find existing kevents, and merge any new flags if necessary
static bool
_dispatch_kevent_register(dispatch_kevent_t *dkp, pthread_priority_t pp,
		uint32_t *flgp)
{
	dispatch_kevent_t dk = NULL, ds_dkev = *dkp;
	uint32_t new_flags;
	bool do_resume = false;

	if (!(ds_dkev->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
				ds_dkev->dk_kevent.filter);
	}
	if (dk) {
		// If an existing dispatch kevent is found, check to see if new flags
		// need to be added to the existing kevent
		new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
		dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
		free(ds_dkev);
		*dkp = dk;
		do_resume = new_flags;
	} else {
		dk = ds_dkev;
#if DISPATCH_USE_KEVENT_WORKQUEUE
		if (!_dispatch_kevent_workqueue_enabled) {
			// do nothing
		} else if (!(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
			dk->dk_kevent.qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
		} else {
			pp &= (~_PTHREAD_PRIORITY_FLAGS_MASK |
					_PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
			if (!pp) pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
			_dispatch_assert_is_valid_qos_class(pp);
			dk->dk_kevent.qos = (_dispatch_kevent_priority_t)pp;
		}
#endif
		_dispatch_kevent_insert(dk);
		new_flags = dk->dk_kevent.fflags;
		do_resume = true;
	}
	// Re-register the kevent with the kernel if new flags were added
	// by the dispatch kevent
	if (do_resume) {
		dk->dk_kevent.flags |= EV_ADD;
	}
	*flgp = new_flags;
	return do_resume;
}
static long
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	long r;
	bool oneshot;
	if (dk->dk_kevent.flags & EV_DELETE) {
		return 0;
	}
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these types not registered with kevent
		return 0;
	case DISPATCH_EVFILT_MACH_NOTIFICATION:
		return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
	case EVFILT_MACHPORT:
		if (!(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
			return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
		}
		// fall through
#endif
	default:
		// oneshot dk may be freed by the time we return from
		// _dispatch_kq_immediate_update if the event was delivered (and then
		// unregistered) concurrently.
		oneshot = (dk->dk_kevent.flags & EV_ONESHOT);
		r = _dispatch_kq_immediate_update(&dk->dk_kevent);
		if (r && (dk->dk_kevent.flags & EV_ADD) &&
				(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
			dk->dk_kevent.flags |= EV_DELETE;
			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE|EV_VANISHED);
		} else if (!oneshot && (dk->dk_kevent.flags & EV_DISPATCH)) {
			// we can safely skip doing this for ONESHOT events because
			// the next kq update we will do is _dispatch_kevent_dispose()
			// which also clears EV_ADD.
			dk->dk_kevent.flags &= ~(EV_ADD|EV_VANISHED);
		}
		return r;
	}
	(void)new_flags; (void)del_flags;
}
static long
_dispatch_kevent_dispose(dispatch_kevent_t dk, unsigned int options)
{
	long r = 0;
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) {
			free(dk);
		} else {
			// these sources live on statically allocated lists
		}
		return r;
	}
	if (!(dk->dk_kevent.flags & EV_DELETE)) {
		dk->dk_kevent.flags |= EV_DELETE;
		dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE|EV_VANISHED);
		if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
			dk->dk_kevent.flags |= EV_ENABLE;
		}
		switch (dk->dk_kevent.filter) {
		case DISPATCH_EVFILT_MACH_NOTIFICATION:
			r = _dispatch_kevent_mach_notify_resume(dk, 0,dk->dk_kevent.fflags);
			break;
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
		case EVFILT_MACHPORT:
			if (!(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
				r = _dispatch_kevent_machport_resume(dk,0,dk->dk_kevent.fflags);
				break;
			}
			// fall through
#endif
		default:
			if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
				_dispatch_kq_deferred_update(&dk->dk_kevent);
			} else {
				r = _dispatch_kq_immediate_update(&dk->dk_kevent);
			}
			break;
		}
		if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
			dk->dk_kevent.flags &= ~EV_ENABLE;
		}
	}
	if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) {
		bool deferred_delete = (r == EINPROGRESS);
#if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
		if (r == ENOENT) deferred_delete = true;
#endif
		if (deferred_delete) {
			// deferred EV_DELETE or concurrent concurrent EV_DELETE delivery
			dk->dk_kevent.flags &= ~EV_DELETE;
			dk->dk_kevent.flags |= EV_ENABLE;
			return r;
		}
	} else {
		uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
				dk->dk_kevent.filter);
		TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
		_dispatch_kevent_unguard(dk);
	}
	free(dk);
	return r;
}
static long
_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg,
		unsigned int options)
{
	dispatch_source_refs_t dri;
	uint32_t del_flags, fflags = 0;
	long r = 0;

	if (TAILQ_EMPTY(&dk->dk_sources) ||
			(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		r = _dispatch_kevent_dispose(dk, options);
	} else {
		TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
			dispatch_source_t dsi = _dispatch_source_from_refs(dri);
			uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
			fflags |= mask;
		}
		del_flags = flg & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags &= ~del_flags;
			r = _dispatch_kevent_resume(dk, 0, del_flags);
		}
	}
	return r;
}
static void
_dispatch_kevent_proc_exit(_dispatch_kevent_qos_s *ke)
{
	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
	// <rdar://problem/5067725>. As a workaround, we simulate an exit event for
	// any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
	_dispatch_kevent_qos_s fake;
	fake = *ke;
	fake.flags &= ~EV_ERROR;
	fake.flags |= EV_ONESHOT;
	fake.fflags = NOTE_EXIT;
	_dispatch_kevent_debug("synthetic NOTE_EXIT", ke);
	_dispatch_kevent_merge(&fake);
}
static void
_dispatch_kevent_error(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_qos_s *kev = NULL;

	if (ke->flags & EV_DELETE) {
		if (ke->flags & EV_UDATA_SPECIFIC) {
			if (ke->data == EINPROGRESS) {
				// deferred EV_DELETE
				return;
			}
#if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
			if (ke->data == ENOENT) {
				// deferred EV_DELETE
				return;
			}
#endif
		}
		// for EV_DELETE if the update was deferred we may have reclaimed
		// our dispatch_kevent_t, and it is unsafe to dereference it now.
	} else if (ke->udata) {
		kev = &((dispatch_kevent_t)ke->udata)->dk_kevent;
		ke->flags |= kev->flags;
	}

	if (ke->filter == EVFILT_MACHPORT && ke->data == ENOTSUP &&
			(ke->flags & EV_ADD) && _dispatch_evfilt_machport_direct_enabled &&
			kev && (kev->fflags & MACH_RCV_MSG)) {
		DISPATCH_INTERNAL_CRASH(ke->ident,
				"Missing EVFILT_MACHPORT support for ports");
	}

	if (ke->data) {
		// log the unexpected error
		_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
				ke->flags & EV_DELETE ? "delete" :
				ke->flags & EV_ADD ? "add" :
				ke->flags & EV_ENABLE ? "enable" : "monitor",
				(int)ke->data);
	}
}
static void
_dispatch_kevent_drain(_dispatch_kevent_qos_s *ke)
{
#if DISPATCH_DEBUG
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
#endif
	if (ke->filter == EVFILT_USER) {
		_dispatch_kevent_mgr_debug(ke);
		return;
	}
	if (slowpath(ke->flags & EV_ERROR)) {
		if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
			_dispatch_debug("kevent[0x%llx]: ESRCH from EVFILT_PROC: "
					"generating fake NOTE_EXIT", (unsigned long long)ke->udata);
			return _dispatch_kevent_proc_exit(ke);
		}
		_dispatch_debug("kevent[0x%llx]: handling error",
				(unsigned long long)ke->udata);
		return _dispatch_kevent_error(ke);
	}
	if (ke->filter == EVFILT_TIMER) {
		_dispatch_debug("kevent[0x%llx]: handling timer",
				(unsigned long long)ke->udata);
		return _dispatch_timers_kevent(ke);
	}
	if (ke->filter == EVFILT_MACHPORT) {
		_dispatch_debug("kevent[0x%llx]: handling mach port",
				(unsigned long long)ke->udata);
		return _dispatch_mach_kevent_merge(ke);
	}
	return _dispatch_kevent_merge(ke);
}
static void
_dispatch_kevent_merge(_dispatch_kevent_qos_s *ke)
{
	dispatch_kevent_t dk = (void*)ke->udata;
	dispatch_source_refs_t dri, dr_next;

	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
		_dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
	}
}
#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void
_dispatch_kevent_guard(dispatch_kevent_t dk)
{
	guardid_t guard;
	const unsigned int guard_flags = GUARD_CLOSE;
	int r, fd_flags = 0;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard = &dk->dk_kevent;
		r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
				&guard, guard_flags, &fd_flags);
		if (slowpath(r == -1)) {
			int err = errno;
			(void)dispatch_assume_zero(err);
			return;
		}
		dk->dk_kevent.ext[0] = guard_flags;
		dk->dk_kevent.ext[1] = fd_flags;
		break;
	}
}

static void
_dispatch_kevent_unguard(dispatch_kevent_t dk)
{
	guardid_t guard;
	unsigned int guard_flags;
	int r, fd_flags;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard_flags = (unsigned int)dk->dk_kevent.ext[0];
		if (!guard_flags) {
			return;
		}
		guard = &dk->dk_kevent;
		fd_flags = (int)dk->dk_kevent.ext[1];
		r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
				guard_flags, NULL, 0, &fd_flags);
		if (slowpath(r == -1)) {
			(void)dispatch_assume_zero(errno);
		}
		dk->dk_kevent.ext[0] = 0;
		break;
	}
}
#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
static dispatch_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
#define _dispatch_trace_next_timer_wake(q) \
		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_trace_next_timer_wake(q)
#endif

#define _dispatch_source_timer_telemetry_enabled() false

static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
		uintptr_t ident, struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled()) {
		_dispatch_trace_timer_configure(ds, ident, values);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
		struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled() ||
			_dispatch_source_timer_telemetry_enabled()) {
		_dispatch_source_timer_telemetry_slow(ds, ident, values);
		asm(""); // prevent tailcall
	}
}
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull

DISPATCH_ALWAYS_INLINE
static inline uint64_t
_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
{
	unsigned int tk = DISPATCH_TIMER_KIND(tidx);
	if (nows && fastpath(nows[tk] != 0)) {
		return nows[tk];
	}
	uint64_t now;
	switch (tk) {
	case DISPATCH_TIMER_KIND_MACH:
		now = _dispatch_absolute_time();
		break;
	case DISPATCH_TIMER_KIND_WALL:
		now = _dispatch_get_nanoseconds();
		break;
	}
	return now;
}

static inline unsigned long
_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
{
	// calculate the number of intervals since last fire
	unsigned long data, missed;
	uint64_t now;
	now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
	missed = (unsigned long)((now - ds_timer(dr).last_fire) /
			ds_timer(dr).interval);
	// correct for missed intervals already delivered last time
	data = prev - ds_timer(dr).missed + missed;
	ds_timer(dr).missed = missed;
	return data;
}
struct dispatch_set_timer_params {
	dispatch_source_t ds;
	uintptr_t ident;
	struct dispatch_timer_source_s values;
};

static void
_dispatch_source_set_timer3(void *context)
{
	// Called on the _dispatch_mgr_q
	struct dispatch_set_timer_params *params = context;
	dispatch_source_t ds = params->ds;
	ds->ds_ident_hack = params->ident;
	ds_timer(ds->ds_refs) = params->values;
	// Clear any pending data that might have accumulated on
	// older timer params <rdar://problem/8574886>
	ds->ds_pending_data = 0;
	// Re-arm in case we got disarmed because of pending set_timer suspension
	_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
	_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, ds->ds_dkev);
	dispatch_resume(ds);
	// Must happen after resume to avoid getting disarmed due to suspension
	_dispatch_timers_update(ds);
	dispatch_release(ds);
	if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
		_dispatch_mach_host_calendar_change_register();
	}
	free(params);
}

static void
_dispatch_source_set_timer2(void *context)
{
	// Called on the source queue
	struct dispatch_set_timer_params *params = context;
	dispatch_suspend(params->ds);
	_dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params,
			_dispatch_source_set_timer3);
}

static struct dispatch_set_timer_params *
_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	struct dispatch_set_timer_params *params;
	params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
	params->ds = ds;
	params->values.flags = ds_timer(ds->ds_refs).flags;

	if (interval == 0) {
		// we use zero internally to mean disabled
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}
	if ((int64_t)leeway < 0) {
		leeway = INT64_MAX;
	}
	if (start == DISPATCH_TIME_NOW) {
		start = _dispatch_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	if ((int64_t)start < 0) {
		// wall clock
		start = (dispatch_time_t)-((int64_t)start);
		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
	} else {
		// absolute clock
		interval = _dispatch_time_nano2mach(interval);
		// rdar://problem/7287561 interval must be at least one in
		// in order to avoid later division by zero when calculating
		// the missed interval count. (NOTE: the wall clock's
		// interval is already "fixed" to be 1 or more)
		if (interval < 1) interval = 1;
		leeway = _dispatch_time_nano2mach(leeway);
		params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
	}
	params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
	params->values.target = start;
	params->values.deadline = (start < UINT64_MAX - leeway) ?
			start + leeway : UINT64_MAX;
	params->values.interval = interval;
	params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
			leeway : interval / 2;
	return params;
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway, bool source_sync)
{
	if (slowpath(!ds->ds_is_timer) ||
			slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
		DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
	}

	struct dispatch_set_timer_params *params;
	params = _dispatch_source_timer_params(ds, start, interval, leeway);

	_dispatch_source_timer_telemetry(ds, params->ident, &params->values);
	// Suspend the source so that it doesn't fire with pending changes
	// The use of suspend/resume requires the external retain/release
	dispatch_retain(ds);
	if (source_sync) {
		return _dispatch_barrier_trysync_or_async_f(ds->_as_dq, params,
				_dispatch_source_set_timer2);
	} else {
		return _dispatch_source_set_timer2(params);
	}
}

void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	_dispatch_source_set_timer(ds, start, interval, leeway, true);
}

void
_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
		dispatch_time_t start, uint64_t interval, uint64_t leeway)
{
	// Don't serialize through the source queue for CF timers <rdar://13833190>
	_dispatch_source_set_timer(ds, start, interval, leeway, false);
}
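/*
 * Typical client-side use of the interface above (illustrative sketch only;
 * the handler name is hypothetical):
 *
 *	dispatch_source_t t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
 *			0, 0, dispatch_get_main_queue());
 *	dispatch_source_set_timer(t,
 *			dispatch_time(DISPATCH_TIME_NOW, (int64_t)NSEC_PER_SEC),
 *			NSEC_PER_SEC, 100 * NSEC_PER_MSEC); // 1s period, 100ms leeway
 *	dispatch_source_set_event_handler(t, ^{ handle_tick(); });
 *	dispatch_resume(t);
 *
 * A start time built with dispatch_walltime() is negative when viewed as
 * int64_t, which is what routes the source onto the WALL clock lists in
 * _dispatch_source_timer_params() above.
 */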
void
_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
	dispatch_source_refs_t dr = ds->ds_refs;
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
	const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
			FOREVER_NSEC/NSEC_PER_MSEC))) {
		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
	} else {
		interval = FOREVER_NSEC;
	}
	interval = _dispatch_time_nano2mach(interval);
	uint64_t target = _dispatch_absolute_time() + interval;
	target = (target / interval) * interval;
	const uint64_t leeway = animation ?
			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
	ds_timer(dr).target = target;
	ds_timer(dr).deadline = target + leeway;
	ds_timer(dr).interval = interval;
	ds_timer(dr).leeway = leeway;
	_dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
}
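/*
 * The interval path above phase-aligns the first fire: with interval I (in
 * mach ticks), target = ((now + I) / I) * I rounds down to a multiple of I,
 * so interval sources sharing a period fire on the same tick boundaries and
 * coalesce naturally. For UI-animation sources the unit is one frame
 * (NSEC_PER_SEC/60) with a single-frame leeway; otherwise the unit is
 * milliseconds with a leeway of half the interval.
 */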
#pragma mark dispatch_timers

#define DISPATCH_TIMER_STRUCT(refs) \
	uint64_t target, deadline; \
	TAILQ_HEAD(, refs) dt_sources

typedef struct dispatch_timer_s {
	DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
} *dispatch_timer_t;

#define DISPATCH_TIMER_INITIALIZER(tidx) \
	[tidx] = { \
		.target = UINT64_MAX, \
		.deadline = UINT64_MAX, \
		.dt_sources = TAILQ_HEAD_INITIALIZER( \
				_dispatch_timer[tidx].dt_sources), \
	}
#define DISPATCH_TIMER_INIT(kind, qos) \
		DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
				DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))

struct dispatch_timer_s _dispatch_timer[] =  {
	DISPATCH_TIMER_INIT(WALL, NORMAL),
	DISPATCH_TIMER_INIT(WALL, CRITICAL),
	DISPATCH_TIMER_INIT(WALL, BACKGROUND),
	DISPATCH_TIMER_INIT(MACH, NORMAL),
	DISPATCH_TIMER_INIT(MACH, CRITICAL),
	DISPATCH_TIMER_INIT(MACH, BACKGROUND),
};
#define DISPATCH_TIMER_COUNT \
		((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
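/*
 * _dispatch_timer[] is indexed by DISPATCH_TIMER_INDEX(kind, qos): two clock
 * kinds (WALL, MACH) times three QoS buckets (NORMAL, CRITICAL, BACKGROUND)
 * give the six deadline-ordered lists initialized above. The extra
 * DISPATCH_TIMER_INDEX_DISARM slot used by _dispatch_kevent_timer[] below
 * holds sources that are currently disabled, suspended, or still have
 * unconsumed pending data.
 */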
#if __LP64__
#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
		(void*)&_dispatch_kevent_timer[tidx]
#else
#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
		(uintptr_t)&_dispatch_kevent_timer[tidx]
#endif
#ifdef __LP64__
#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
		.udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
#else
// dynamic initialization in _dispatch_timers_init()
#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
		.udata = 0
#endif // __LP64__
#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
	[tidx] = { \
		.dk_kevent = { \
			.ident = tidx, \
			.filter = DISPATCH_EVFILT_TIMER, \
			DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
		}, \
		.dk_sources = TAILQ_HEAD_INITIALIZER( \
				_dispatch_kevent_timer[tidx].dk_sources), \
	}
#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
		DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
				DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))

struct dispatch_kevent_s _dispatch_kevent_timer[] = {
	DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
	DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
	DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
	DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
	DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
	DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
	DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
};
#define DISPATCH_KEVENT_TIMER_COUNT \
		((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))

#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(tidx, note) \
	[tidx] = { \
		.ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(tidx), \
		.filter = EVFILT_TIMER, \
		.flags = EV_ONESHOT, \
		.fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
	}
#define DISPATCH_KEVENT_TIMEOUT_INIT(kind, qos, note) \
		DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_INDEX( \
				DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos), note)

_dispatch_kevent_qos_s _dispatch_kevent_timeout[] = {
	DISPATCH_KEVENT_TIMEOUT_INIT(WALL, NORMAL, NOTE_MACH_CONTINUOUS_TIME),
	DISPATCH_KEVENT_TIMEOUT_INIT(WALL, CRITICAL,
			NOTE_MACH_CONTINUOUS_TIME | NOTE_CRITICAL),
	DISPATCH_KEVENT_TIMEOUT_INIT(WALL, BACKGROUND,
			NOTE_MACH_CONTINUOUS_TIME | NOTE_BACKGROUND),
	DISPATCH_KEVENT_TIMEOUT_INIT(MACH, NORMAL, 0),
	DISPATCH_KEVENT_TIMEOUT_INIT(MACH, CRITICAL, NOTE_CRITICAL),
	DISPATCH_KEVENT_TIMEOUT_INIT(MACH, BACKGROUND, NOTE_BACKGROUND),
};
#define DISPATCH_KEVENT_TIMEOUT_COUNT \
		((sizeof(_dispatch_kevent_timeout) / sizeof(_dispatch_kevent_timeout[0])))
static_assert(DISPATCH_KEVENT_TIMEOUT_COUNT == DISPATCH_TIMER_INDEX_COUNT - 1,
		"should have a kevent for everything but disarm (ddt assumes this)");
#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC

static const uint64_t _dispatch_kevent_coalescing_window[] = {
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
};
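/*
 * The windows above are twice the per-QoS value in milliseconds (e.g.
 * 2 * 75ms = 150ms for NORMAL). _dispatch_timers_get_delay() uses them for
 * timer pre-coalescing: starting from the earliest target in a bucket, the
 * programmed wake-up is advanced to the latest source target that still lies
 * at or before (earliest deadline - window), so one kernel wake-up can
 * service several sources whose targets cluster together instead of waking
 * once per source.
 */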
#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
	typeof(dr) dri = NULL; typeof(dt) dti; \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
			if (ds_timer(dr).target < ds_timer(dri).target) { \
				break; \
			} \
		} \
		TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
			if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
				break; \
			} \
		} \
		if (dti) { \
			TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
		} else { \
			TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
		} \
	} \
	if (dri) { \
		TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
	} else { \
		TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
	} \
	})

#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
	({ \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
	} \
	TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
			dr_list); })

#define _dispatch_timers_check(dra, dta) ({ \
	unsigned int timerm = _dispatch_timers_mask; \
	bool update = false; \
	unsigned int tidx; \
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
		if (!(timerm & (1 << tidx))){ \
			continue; \
		} \
		dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dra[tidx].dk_sources); \
		dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dta[tidx].dt_sources); \
		uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
		uint64_t deadline = dr ? ds_timer(dt).deadline : UINT64_MAX; \
		if (target != dta[tidx].target) { \
			dta[tidx].target = target; \
			update = true; \
		} \
		if (deadline != dta[tidx].deadline) { \
			dta[tidx].deadline = deadline; \
			update = true; \
		} \
	} \
	update; })

static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
static unsigned int _dispatch_timers_mask;
static bool _dispatch_timers_force_max_leeway;

static void
_dispatch_timers_init(void)
{
#ifndef __LP64__
	unsigned int tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		_dispatch_kevent_timer[tidx].dk_kevent.udata =
				DISPATCH_KEVENT_TIMER_UDATA(tidx);
	}
#endif // __LP64__
	if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
		_dispatch_timers_force_max_leeway = true;
	}
}
static void
_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	unsigned int tidx = (unsigned int)dk->dk_kevent.ident;

	if (slowpath(ds_timer_aggregate(ds))) {
		_dispatch_timer_aggregates_unregister(ds, tidx);
	}
	_dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
		_dispatch_timers_reconfigure = true;
		_dispatch_timers_mask |= 1 << tidx;
	}
}

// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timers_update(dispatch_source_t ds)
{
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_source_refs_t dr = ds->ds_refs;
	unsigned int tidx;

	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	// Do not reschedule timers unregistered with _dispatch_kevent_unregister()
	if (slowpath(!dk)) {
		return;
	}
	// Move timers that are disabled, suspended or have missed intervals to the
	// disarmed list, rearm after resume resp. source invoke will reenable them
	if (!ds_timer(dr).target || DISPATCH_QUEUE_IS_SUSPENDED(ds) ||
			ds->ds_pending_data) {
		tidx = DISPATCH_TIMER_INDEX_DISARM;
		_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds,
				ds->ds_dkev);
	} else {
		tidx = _dispatch_source_timer_idx(dr);
	}
	if (slowpath(ds_timer_aggregate(ds))) {
		_dispatch_timer_aggregates_register(ds);
	}
	if (slowpath(!ds->ds_is_installed)) {
		ds->ds_is_installed = true;
		if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
			_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
					ds->ds_dkev);
		}
		_dispatch_object_debug(ds, "%s", __func__);
		ds->ds_dkev = NULL;
		free(dk);
	} else {
		_dispatch_timers_unregister(ds, dk);
	}
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
		_dispatch_timers_reconfigure = true;
		_dispatch_timers_mask |= 1 << tidx;
	}
	if (dk != &_dispatch_kevent_timer[tidx]){
		ds->ds_dkev = &_dispatch_kevent_timer[tidx];
	}
	_dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
	if (slowpath(ds_timer_aggregate(ds))) {
		_dispatch_timer_aggregates_update(ds, tidx);
	}
}
static inline void
_dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
{
	dispatch_source_refs_t dr;
	dispatch_source_t ds;
	uint64_t now, missed;

	now = _dispatch_source_timer_now(nows, tidx);
	while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
		ds = _dispatch_source_from_refs(dr);
		// We may find timers on the wrong list due to a pending update from
		// dispatch_source_set_timer. Force an update of the list in that case.
		if (tidx != ds->ds_ident_hack) {
			_dispatch_timers_update(ds);
			continue;
		}
		if (!ds_timer(dr).target) {
			// No configured timers on the list
			break;
		}
		if (ds_timer(dr).target > now) {
			// Done running timers for now.
			break;
		}
		// Remove timers that are suspended or have missed intervals from the
		// list, rearm after resume resp. source invoke will reenable them
		if (DISPATCH_QUEUE_IS_SUSPENDED(ds) || ds->ds_pending_data) {
			_dispatch_timers_update(ds);
			continue;
		}
		// Calculate number of missed intervals.
		missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
		if (++missed > INT_MAX) {
			missed = INT_MAX;
		}
		if (ds_timer(dr).interval < INT64_MAX) {
			ds_timer(dr).target += missed * ds_timer(dr).interval;
			ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
		} else {
			ds_timer(dr).target = UINT64_MAX;
			ds_timer(dr).deadline = UINT64_MAX;
		}
		_dispatch_timers_update(ds);
		ds_timer(dr).last_fire = now;

		unsigned long data;
		data = os_atomic_add2o(ds, ds_pending_data,
				(unsigned long)missed, relaxed);
		_dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_FLUSH);
		if (ds_timer(dr).flags & DISPATCH_TIMER_AFTER) {
			_dispatch_source_kevent_unregister(ds);
		}
	}
}

static void
_dispatch_timers_run(uint64_t nows[])
{
	unsigned int tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
			_dispatch_timers_run2(nows, tidx);
		}
	}
}
static inline unsigned int
_dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
		uint64_t *delay, uint64_t *leeway, int qos, int kind)
{
	unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
	uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;

	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
			continue;
		}
		if (kind >= 0 && kind != DISPATCH_TIMER_KIND(tidx)){
			continue;
		}
		uint64_t target = timer[tidx].target;
		if (target == UINT64_MAX) {
			continue;
		}
		uint64_t deadline = timer[tidx].deadline;
		if (qos >= 0) {
			// Timer pre-coalescing <rdar://problem/13222034>
			uint64_t window = _dispatch_kevent_coalescing_window[qos];
			uint64_t latest = deadline > window ? deadline - window : 0;
			dispatch_source_refs_t dri;
			TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
					dr_list) {
				tmp = ds_timer(dri).target;
				if (tmp > latest) break;
				target = tmp;
			}
		}
		uint64_t now = _dispatch_source_timer_now(nows, tidx);
		if (target <= now) {
			delta = 0;
			break;
		}
		tmp = target - now;
		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
			tmp = _dispatch_time_mach2nano(tmp);
		}
		if (tmp < INT64_MAX && tmp < delta) {
			ridx = tidx;
			delta = tmp;
		}
		dispatch_assert(target <= deadline);
		tmp = deadline - now;
		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
			tmp = _dispatch_time_mach2nano(tmp);
		}
		if (tmp < INT64_MAX && tmp < dldelta) {
			dldelta = tmp;
		}
	}
	*delay = delta;
	*leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
	return ridx;
}
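/*
 * _dispatch_timers_get_delay() reports, via *delay and *leeway, the time in
 * nanoseconds from now until the earliest armed target and the extra slack
 * until its deadline, restricted to one (qos, kind) bucket when qos/kind are
 * >= 0, or across all buckets when both are -1 (as the timer-aggregate path
 * does). A zero *delay means a target has already passed and the caller
 * should poll; UINT64_MAX means nothing is armed. Mach-clock values are
 * converted with _dispatch_time_mach2nano() so both clocks are reported in
 * the same unit.
 */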
#ifdef __linux__
// On Linux we map _dispatch_kevent_qos_s to struct kevent instead of
// struct kevent64. We lose the kevent.ext[] members and the timeout is
// based on relative msec time vs. absolute nsec based time. For now we
// make the adjustments right here until the solution is to either extend
// libkqueue with a proper kevent64 API or to remove kevent altogether and
// move to a lower-level API (e.g. epoll or a kernel module).
// Also leeway is ignored.
static inline void
_dispatch_kevent_timer_set_delay(_dispatch_kevent_qos_s *ke, uint64_t delay,
		uint64_t leeway, uint64_t nows[])
{
	// call to update nows[]
	_dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
	// adjust nsec based delay to msec based and ignore leeway
	delay /= 1000000L;
	if ((int64_t)(delay) <= 0) {
		delay = 1; // if value <= 0 the dispatch will stop
	}
	ke->data = (int64_t)delay;
}
#else
static inline void
_dispatch_kevent_timer_set_delay(_dispatch_kevent_qos_s *ke, uint64_t delay,
		uint64_t leeway, uint64_t nows[])
{
	delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
	if (slowpath(_dispatch_timers_force_max_leeway)) {
		ke->data = (int64_t)(delay + leeway);
		ke->ext[1] = 0;
	} else {
		ke->data = (int64_t)delay;
		ke->ext[1] = leeway;
	}
}
#endif // __linux__
static bool
_dispatch_timers_program2(uint64_t nows[], _dispatch_kevent_qos_s *ke,
		unsigned int tidx)
{
	bool poll;
	uint64_t delay, leeway;

	_dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
			(int)DISPATCH_TIMER_QOS(tidx), (int)DISPATCH_TIMER_KIND(tidx));
	poll = (delay == 0);
	if (poll || delay == UINT64_MAX) {
		_dispatch_trace_next_timer_set(NULL, DISPATCH_TIMER_QOS(tidx));
		if (!ke->data) {
			return poll;
		}
		ke->data = 0;
		ke->flags |= EV_DELETE;
		ke->flags &= ~(EV_ADD|EV_ENABLE);
	} else {
		_dispatch_trace_next_timer_set(
				TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources),
				DISPATCH_TIMER_QOS(tidx));
		_dispatch_trace_next_timer_program(delay, DISPATCH_TIMER_QOS(tidx));
		_dispatch_kevent_timer_set_delay(ke, delay, leeway, nows);
		ke->flags |= EV_ADD|EV_ENABLE;
		ke->flags &= ~EV_DELETE;
#if DISPATCH_USE_KEVENT_WORKQUEUE
		if (_dispatch_kevent_workqueue_enabled) {
			ke->qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
		}
#endif
	}
	_dispatch_kq_deferred_update(ke);
	return poll;
}

static bool
_dispatch_timers_program(uint64_t nows[])
{
	bool poll = false;
	unsigned int tidx, timerm = _dispatch_timers_mask;
	for (tidx = 0; tidx < DISPATCH_KEVENT_TIMEOUT_COUNT; tidx++) {
		if (!(timerm & 1 << tidx)){
			continue;
		}
		poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[tidx],
				tidx);
	}
	return poll;
}

static bool
_dispatch_timers_configure(void)
{
	_dispatch_timer_aggregates_check();
	// Find out if there is a new target/deadline on the timer lists
	return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
}

static void
_dispatch_timers_calendar_change(void)
{
	unsigned int qos;

	// calendar change may have gone past the wallclock deadline
	_dispatch_timer_expired = true;
	for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
		_dispatch_timers_mask |=
				1 << DISPATCH_TIMER_INDEX(DISPATCH_TIMER_KIND_WALL, qos);
	}
}

static void
_dispatch_timers_kevent(_dispatch_kevent_qos_s *ke)
{
	dispatch_assert(ke->data > 0);
	dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
			DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
	unsigned int tidx = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
	dispatch_assert(tidx < DISPATCH_KEVENT_TIMEOUT_COUNT);
	dispatch_assert(_dispatch_kevent_timeout[tidx].data != 0);
	_dispatch_kevent_timeout[tidx].data = 0; // kevent deleted via EV_ONESHOT
	_dispatch_timer_expired = true;
	_dispatch_timers_mask |= 1 << tidx;
	_dispatch_trace_next_timer_wake(DISPATCH_TIMER_QOS(tidx));
}
static bool
_dispatch_mgr_timers(void)
{
	uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {};
	bool expired = slowpath(_dispatch_timer_expired);
	if (expired) {
		_dispatch_timers_run(nows);
	}
	bool reconfigure = slowpath(_dispatch_timers_reconfigure);
	if (reconfigure || expired) {
		if (reconfigure) {
			reconfigure = _dispatch_timers_configure();
			_dispatch_timers_reconfigure = false;
		}
		if (reconfigure || expired) {
			expired = _dispatch_timer_expired = _dispatch_timers_program(nows);
			expired = expired || _dispatch_mgr_q.dq_items_tail;
		}
		_dispatch_timers_mask = 0;
	}
	return expired;
}
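/*
 * _dispatch_mgr_timers() is the manager's once-per-wakeup timer pass: fire
 * whatever is due (_dispatch_timers_run), then, if a timer fired or a list
 * changed, recompute the per-bucket target/deadline
 * (_dispatch_timers_configure) and re-arm the EVFILT_TIMER kevents
 * (_dispatch_timers_program). The returned flag asks the caller to poll
 * instead of blocking in kevent when more work is already known to be
 * pending.
 */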
#pragma mark dispatch_timer_aggregate

typedef struct {
	TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
} dispatch_timer_aggregate_refs_s;

typedef struct dispatch_timer_aggregate_s {
	DISPATCH_QUEUE_HEADER(queue);
	TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
	dispatch_timer_aggregate_refs_s
			dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
	struct {
		DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
	} dta_timer[DISPATCH_TIMER_COUNT];
	struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
	unsigned int dta_refcount;
} DISPATCH_QUEUE_ALIGN dispatch_timer_aggregate_s;

typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
		TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);

dispatch_timer_aggregate_t
dispatch_timer_aggregate_create(void)
{
	unsigned int tidx;
	dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_timer_aggregate_s));
	_dispatch_queue_init(dta->_as_dq, DQF_NONE,
			DISPATCH_QUEUE_WIDTH_MAX, false);
	dta->do_targetq = _dispatch_get_root_queue(
			_DISPATCH_QOS_CLASS_USER_INITIATED, true);
	//FIXME: aggregates need custom vtable
	//dta->dq_label = "timer-aggregate";
	for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
		TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
	}
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
		dta->dta_timer[tidx].target = UINT64_MAX;
		dta->dta_timer[tidx].deadline = UINT64_MAX;
		dta->dta_timer_data[tidx].target = UINT64_MAX;
		dta->dta_timer_data[tidx].deadline = UINT64_MAX;
	}
	return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
			dta->_as_dq);
}

typedef struct dispatch_timer_delay_s {
	dispatch_timer_t timer;
	uint64_t delay, leeway;
} *dispatch_timer_delay_t;

static void
_dispatch_timer_aggregate_get_delay(void *ctxt)
{
	dispatch_timer_delay_t dtd = ctxt;
	struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
	_dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
			-1, -1);
}
uint64_t
dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
		uint64_t *leeway_ptr)
{
	struct dispatch_timer_delay_s dtd = {
		.timer = dta->dta_timer_data,
	};
	dispatch_sync_f(dta->_as_dq, &dtd, _dispatch_timer_aggregate_get_delay);
	if (leeway_ptr) {
		*leeway_ptr = dtd.leeway;
	}
	return dtd.delay;
}
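/*
 * Rough sketch of how the aggregate SPI above is meant to be driven
 * (illustrative only; the way timer sources get attached to an aggregate is
 * elided here):
 *
 *	dispatch_timer_aggregate_t dta = dispatch_timer_aggregate_create();
 *	// ... timer sources associated with dta are created and armed ...
 *	uint64_t leeway;
 *	uint64_t delay = dispatch_timer_aggregate_get_delay(dta, &leeway);
 *	// delay/leeway describe the earliest pending fire across every source
 *	// attached to the aggregate, in nanoseconds from now.
 *
 * The call serializes onto the aggregate's queue so it observes the snapshot
 * published by _dispatch_timer_aggregate_update() below.
 */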
static void
_dispatch_timer_aggregate_update(void *ctxt)
{
	dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
	dispatch_timer_t dtau = ctxt;
	unsigned int tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		dta->dta_timer_data[tidx].target = dtau[tidx].target;
		dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
	}
	free(dtau);
}

static void
_dispatch_timer_aggregates_configure(void)
{
	dispatch_timer_aggregate_t dta;
	dispatch_timer_t dtau;
	TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
		if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
			continue;
		}
		dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
		memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
		_dispatch_barrier_async_detached_f(dta->_as_dq, dtau,
				_dispatch_timer_aggregate_update);
	}
}

static inline void
_dispatch_timer_aggregates_check(void)
{
	if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
		return;
	}
	_dispatch_timer_aggregates_configure();
}

static void
_dispatch_timer_aggregates_register(dispatch_source_t ds)
{
	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
	if (!dta->dta_refcount++) {
		TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
	}
}

static void
_dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
{
	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
	dispatch_timer_source_aggregate_refs_t dr;
	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
	_dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
			dta->dta_timer, dr, dta_list);
}

static void
_dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
{
	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
	dispatch_timer_source_aggregate_refs_t dr;
	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
	_dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
			dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
	if (!--dta->dta_refcount) {
		TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
	}
}
#pragma mark dispatch_kqueue

static int _dispatch_kq;

#if DISPATCH_DEBUG_QOS && DISPATCH_USE_KEVENT_WORKQUEUE
#define _dispatch_kevent_assert_valid_qos(ke) ({ \
		if (_dispatch_kevent_workqueue_enabled) { \
			const _dispatch_kevent_qos_s *_ke = (ke); \
			if (_ke->flags & (EV_ADD|EV_ENABLE)) { \
				_dispatch_assert_is_valid_qos_class(\
						(pthread_priority_t)_ke->qos); \
				dispatch_assert(_ke->qos); \
			} \
		} \
	})
#else
#define _dispatch_kevent_assert_valid_qos(ke) ((void)ke)
#endif
static void
_dispatch_kq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_KEVENT_WORKQUEUE
	_dispatch_kevent_workqueue_init();
	if (_dispatch_kevent_workqueue_enabled) {
		int r;
		const _dispatch_kevent_qos_s kev[] = {
			[0] = {
				.ident = 1,
				.filter = EVFILT_USER,
				.flags = EV_ADD|EV_CLEAR,
				.qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG,
			},
			[1] = {
				.ident = 1,
				.filter = EVFILT_USER,
				.fflags = NOTE_TRIGGER,
			},
		};
retry:
		r = kevent_qos(-1, kev, 2, NULL, 0, NULL, NULL,
				KEVENT_FLAG_WORKQ|KEVENT_FLAG_IMMEDIATE);
		if (slowpath(r == -1)) {
			int err = errno;
			switch (err) {
			case EINTR:
				goto retry;
			default:
				DISPATCH_CLIENT_CRASH(err,
						"Failed to initialize workqueue kevent");
				break;
			}
		}
		return;
	}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
#if DISPATCH_USE_MGR_THREAD
	static const _dispatch_kevent_qos_s kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.flags = EV_ADD|EV_CLEAR,
	};

	_dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)&kev;
	_dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
#else
	_dispatch_kq = kqueue();
#endif
	if (_dispatch_kq == -1) {
		int err = errno;
		switch (err) {
		case EMFILE:
			DISPATCH_CLIENT_CRASH(err, "kqueue() failure: "
					"process is out of file descriptors");
			break;
		case ENFILE:
			DISPATCH_CLIENT_CRASH(err, "kqueue() failure: "
					"system is out of file descriptors");
			break;
		case ENOMEM:
			DISPATCH_CLIENT_CRASH(err, "kqueue() failure: "
					"kernel is out of memory");
			break;
		default:
			DISPATCH_INTERNAL_CRASH(err, "kqueue() failure");
			break;
		}
	}
	(void)dispatch_assume_zero(kevent_qos(_dispatch_kq, &kev, 1, NULL, 0, NULL,
			NULL, 0));
	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
#endif // DISPATCH_USE_MGR_THREAD
}
static long
_dispatch_kq_update(const _dispatch_kevent_qos_s *ke, int n)
{
	int i, r;
	_dispatch_kevent_qos_s kev_error[n];
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_kq_init);

	for (i = 0; i < n; i++) {
		if (ke[i].filter != EVFILT_USER || DISPATCH_MGR_QUEUE_DEBUG) {
			_dispatch_kevent_debug_n("updating", ke + i, i, n);
		}
	}

	unsigned int flags = KEVENT_FLAG_ERROR_EVENTS;
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		flags |= KEVENT_FLAG_WORKQ;
	}
#endif

retry:
	r = kevent_qos(_dispatch_kq, ke, n, kev_error, n, NULL, NULL, flags);
	if (slowpath(r == -1)) {
		int err = errno;
		switch (err) {
		case EINTR:
			goto retry;
		case EBADF:
			DISPATCH_CLIENT_CRASH(err, "Do not close random Unix descriptors");
			break;
		default:
			(void)dispatch_assume_zero(err);
			break;
		}
		return err;
	}
	for (i = 0, n = r; i < n; i++) {
		if (kev_error[i].flags & EV_ERROR) {
			_dispatch_kevent_debug("returned error", &kev_error[i]);
			_dispatch_kevent_drain(&kev_error[i]);
			r = (int)kev_error[i].data;
		} else {
			_dispatch_kevent_mgr_debug(&kev_error[i]);
			r = 0;
		}
	}
	return r;
}
DISPATCH_ALWAYS_INLINE
static void
_dispatch_kq_update_all(const _dispatch_kevent_qos_s *kev, int n)
{
	(void)_dispatch_kq_update(kev, n);
}

DISPATCH_ALWAYS_INLINE
static long
_dispatch_kq_update_one(const _dispatch_kevent_qos_s *kev)
{
	return _dispatch_kq_update(kev, 1);
}

static inline bool
_dispatch_kevent_maps_to_same_knote(const _dispatch_kevent_qos_s *e1,
		const _dispatch_kevent_qos_s *e2)
{
	return e1->filter == e2->filter &&
			e1->ident == e2->ident &&
			e1->udata == e2->udata;
}

static inline int
_dispatch_deferred_event_find_slot(dispatch_deferred_items_t ddi,
		const _dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_qos_s *events = ddi->ddi_eventlist;
	int i;

	for (i = 0; i < ddi->ddi_nevents; i++) {
		if (_dispatch_kevent_maps_to_same_knote(&events[i], ke)) {
			break;
		}
	}
	return i;
}
static void
_dispatch_kq_deferred_update(const _dispatch_kevent_qos_s *ke)
{
	dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
	int slot;

	_dispatch_kevent_assert_valid_qos(ke);
	if (ddi) {
		if (unlikely(ddi->ddi_nevents == ddi->ddi_maxevents)) {
			_dispatch_deferred_items_set(NULL);
			_dispatch_kq_update_all(ddi->ddi_eventlist, ddi->ddi_nevents);
			ddi->ddi_nevents = 0;
			_dispatch_deferred_items_set(ddi);
		}
		if (ke->filter != EVFILT_USER || DISPATCH_MGR_QUEUE_DEBUG) {
			_dispatch_kevent_debug("deferred", ke);
		}
		bool needs_enable = false;
		slot = _dispatch_deferred_event_find_slot(ddi, ke);
		if (slot == ddi->ddi_nevents) {
			ddi->ddi_nevents++;
		} else if (ke->flags & EV_DELETE) {
			// <rdar://problem/26202376> when deleting and an enable is pending,
			// we must merge EV_ENABLE to do an immediate deletion
			needs_enable = (ddi->ddi_eventlist[slot].flags & EV_ENABLE);
		}
		ddi->ddi_eventlist[slot] = *ke;
		if (needs_enable) {
			ddi->ddi_eventlist[slot].flags |= EV_ENABLE;
		}
	} else {
		_dispatch_kq_update_one(ke);
	}
}

static long
_dispatch_kq_immediate_update(_dispatch_kevent_qos_s *ke)
{
	dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
	int slot, last;

	_dispatch_kevent_assert_valid_qos(ke);
	if (ddi) {
		_dispatch_kevent_qos_s *events = ddi->ddi_eventlist;
		slot = _dispatch_deferred_event_find_slot(ddi, ke);
		if (slot < ddi->ddi_nevents) {
			// <rdar://problem/26202376> when deleting and an enable is pending,
			// we must merge EV_ENABLE to do an immediate deletion
			if ((ke->flags & EV_DELETE) && (events[slot].flags & EV_ENABLE)) {
				ke->flags |= EV_ENABLE;
			}
			last = --ddi->ddi_nevents;
			if (slot != last) {
				events[slot] = events[last];
			}
		}
	}
	return _dispatch_kq_update_one(ke);
}
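/*
 * The two entry points above implement the per-thread kevent batch: when a
 * dispatch_deferred_items_t is installed, _dispatch_kq_deferred_update()
 * coalesces changes into ddi_eventlist (one slot per filter/ident/udata
 * triple, flushing with _dispatch_kq_update_all() when the list is full) so a
 * single kevent_qos() call can push them to the kernel later, while
 * _dispatch_kq_immediate_update() removes any deferred slot for the same
 * knote and issues the syscall right away, merging EV_ENABLE into a pending
 * EV_DELETE so the deletion takes effect immediately (rdar://26202376).
 */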
#pragma mark dispatch_mgr

static void
_dispatch_mgr_queue_poke(dispatch_queue_t dq DISPATCH_UNUSED,
		pthread_priority_t pp DISPATCH_UNUSED)
{
	static const _dispatch_kevent_qos_s kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.fflags = NOTE_TRIGGER,
	};

#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
	_dispatch_debug("waking up the dispatch manager queue: %p", dq);
#endif
	_dispatch_kq_deferred_update(&kev);
}

void
_dispatch_mgr_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
	if (flags & DISPATCH_WAKEUP_FLUSH) {
		os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
	}

	if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
		return;
	}

	if (!_dispatch_queue_class_probe(&_dispatch_mgr_q)) {
		return;
	}

	_dispatch_mgr_queue_poke(dq, pp);
}

static void
_dispatch_event_init(void)
{
	_dispatch_kevent_init();
	_dispatch_timers_init();
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
	_dispatch_mach_recv_msg_buf_init();
#endif
	_dispatch_memorypressure_init();
	_voucher_activity_debug_channel_init();
}

#if DISPATCH_USE_MGR_THREAD
static void
_dispatch_mgr_init(void)
{
	uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	if (_dispatch_queue_drain_try_lock(&_dispatch_mgr_q,
			DISPATCH_INVOKE_STEALING, NULL) != owned) {
		DISPATCH_INTERNAL_CRASH(0, "Locking the manager should not fail");
	}
	_dispatch_mgr_priority_init();
	_dispatch_event_init();
}
static bool
_dispatch_mgr_wait_for_event(dispatch_deferred_items_t ddi, bool poll)
{
	int r;
	dispatch_assert((size_t)ddi->ddi_maxevents < countof(ddi->ddi_eventlist));

retry:
	r = kevent_qos(_dispatch_kq, ddi->ddi_eventlist, ddi->ddi_nevents,
			ddi->ddi_eventlist + ddi->ddi_maxevents, 1, NULL, NULL,
			poll ? KEVENT_FLAG_IMMEDIATE : KEVENT_FLAG_NONE);
	if (slowpath(r == -1)) {
		int err = errno;
		switch (err) {
		case EINTR:
			goto retry;
		case EBADF:
			DISPATCH_CLIENT_CRASH(err, "Do not close random Unix descriptors");
			break;
		default:
			(void)dispatch_assume_zero(err);
			break;
		}
	}
	ddi->ddi_nevents = 0;
	return r > 0;
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
	dispatch_deferred_items_s ddi;
	bool poll;

	ddi.ddi_magic = DISPATCH_DEFERRED_ITEMS_MAGIC;
	ddi.ddi_stashed_pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
	ddi.ddi_nevents = 0;
	ddi.ddi_maxevents = 1;

	_dispatch_deferred_items_set(&ddi);

	for (;;) {
		_dispatch_mgr_queue_drain();
		poll = _dispatch_mgr_timers();
		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
		if (_dispatch_mgr_wait_for_event(&ddi, poll)) {
			_dispatch_kevent_qos_s *ke = ddi.ddi_eventlist + ddi.ddi_maxevents;
			_dispatch_kevent_debug("received", ke);
			_dispatch_kevent_drain(ke);
		}
	}
}
#endif // DISPATCH_USE_MGR_THREAD

void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
		dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
				"kevent workqueue enabled");
	}
#endif
#if DISPATCH_USE_MGR_THREAD
	_dispatch_mgr_init();
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
#endif
}
#if DISPATCH_USE_KEVENT_WORKQUEUE

#define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((pthread_priority_t)(~0ul))

DISPATCH_ALWAYS_INLINE
static inline pthread_priority_t
_dispatch_kevent_worker_thread_init(dispatch_deferred_items_t ddi)
{
	uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;

	ddi->ddi_magic = DISPATCH_DEFERRED_ITEMS_MAGIC;
	ddi->ddi_nevents = 0;
	ddi->ddi_maxevents = countof(ddi->ddi_eventlist);
	ddi->ddi_stashed_pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;

	pthread_priority_t pp = _dispatch_get_priority();
	if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
		// If this thread does not have the event manager flag set, don't setup
		// as the dispatch manager and let the caller know to only process
		// the delivered events.
		//
		// Also add the NEEDS_UNBIND flag so that
		// _dispatch_priority_compute_update knows it has to unbind
		pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
		pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		_dispatch_thread_setspecific(dispatch_priority_key,
				(void *)(uintptr_t)pp);
		ddi->ddi_stashed_pp = 0;
		return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
	}

	if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
			!(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
		// When the pthread kext is delivering kevents to us, and pthread
		// root queues are in use, then the pthread priority TSD is set
		// to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
		//
		// Given that this isn't a valid QoS we need to fixup the TSD,
		// and the best option is to clear the qos/priority bits which tells
		// us to not do any QoS related calls on this thread.
		//
		// However, in that case the manager thread is opted out of QoS,
		// as far as pthread is concerned, and can't be turned into
		// something else, so we can't stash.
		pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
	}
	// Managers always park without mutating to a regular worker thread, and
	// hence never need to unbind from userland, and when draining a manager,
	// the NEEDS_UNBIND flag would cause the mutation to happen.
	// So we need to strip this flag
	pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);

	// ensure kevents registered from this thread are registered at manager QoS
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(
			(pthread_priority_t)_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG, NULL);
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	if (_dispatch_queue_drain_try_lock(&_dispatch_mgr_q,
			DISPATCH_INVOKE_STEALING, NULL) != owned) {
		DISPATCH_INTERNAL_CRASH(0, "Locking the manager should not fail");
	}
	static int event_thread_init;
	if (!event_thread_init) {
		event_thread_init = 1;
		_dispatch_event_init();
	}
	return old_dp;
}
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_kevent_worker_thread_reset(pthread_priority_t old_dp)
{
	dispatch_queue_t dq = &_dispatch_mgr_q;
	uint64_t orig_dq_state;

	_dispatch_queue_drain_unlock(dq, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED,
			&orig_dq_state);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_queue_set_current(NULL);
	return _dq_state_is_dirty(orig_dq_state);
}

void
_dispatch_kevent_worker_thread(_dispatch_kevent_qos_s **events, int *nevents)
{
	_dispatch_introspection_thread_add();

	if (!events && !nevents) {
		// events for worker thread request have already been delivered earlier
		return;
	}

	_dispatch_kevent_qos_s *ke = *events;
	int n = *nevents;
	if (!dispatch_assume(n) || !dispatch_assume(*events)) return;

	dispatch_deferred_items_s ddi;
	pthread_priority_t old_dp = _dispatch_kevent_worker_thread_init(&ddi);

	_dispatch_deferred_items_set(&ddi);
	for (int i = 0; i < n; i++) {
		_dispatch_kevent_debug("received", ke);
		_dispatch_kevent_drain(ke++);
	}

	if (old_dp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_mgr_queue_drain();
		bool poll = _dispatch_mgr_timers();
		if (_dispatch_kevent_worker_thread_reset(old_dp)) {
			poll = true;
		}
		if (poll) _dispatch_mgr_queue_poke(&_dispatch_mgr_q, 0);
	}
	_dispatch_deferred_items_set(NULL);

	if (ddi.ddi_stashed_pp & _PTHREAD_PRIORITY_PRIORITY_MASK) {
		if (ddi.ddi_nevents) {
			_dispatch_kq_update_all(ddi.ddi_eventlist, ddi.ddi_nevents);
		}
		ddi.ddi_stashed_pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
		return _dispatch_root_queue_drain_deferred_item(ddi.ddi_stashed_dq,
				ddi.ddi_stashed_dou, ddi.ddi_stashed_pp);
#ifndef WORKQ_KEVENT_EVENT_BUFFER_LEN
	} else if (ddi.ddi_nevents > *nevents) {
		_dispatch_kq_update_all(ddi.ddi_eventlist, ddi.ddi_nevents);
		ddi.ddi_nevents = 0;
#endif
	}
	*nevents = ddi.ddi_nevents;
	dispatch_static_assert(__builtin_types_compatible_p(typeof(**events),
			typeof(*ddi.ddi_eventlist)));
	memcpy(*events, ddi.ddi_eventlist,
			(size_t)ddi.ddi_nevents * sizeof(*ddi.ddi_eventlist));
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
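/*
 * Flow of _dispatch_kevent_worker_thread(): the pthread kext hands the thread
 * an array of delivered kevents; they are drained with the deferred-items
 * context installed so that any kevent changes they trigger are batched. If
 * the thread carries the event-manager flag it additionally drains the
 * manager queue and timers; otherwise it only processes the delivered events.
 * A stashed root-queue item (ddi_stashed_dq/ddi_stashed_dou) is drained last,
 * on this same thread, at the stashed priority; remaining deferred kevents
 * are either copied back to the kernel through *events/*nevents or flushed
 * with _dispatch_kq_update_all() when they do not fit.
 */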
#pragma mark dispatch_memorypressure

#if DISPATCH_USE_MEMORYPRESSURE_SOURCE
#define DISPATCH_MEMORYPRESSURE_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYPRESSURE
#define DISPATCH_MEMORYPRESSURE_SOURCE_MASK ( \
		DISPATCH_MEMORYPRESSURE_NORMAL | \
		DISPATCH_MEMORYPRESSURE_WARN | \
		DISPATCH_MEMORYPRESSURE_CRITICAL | \
		DISPATCH_MEMORYPRESSURE_PROC_LIMIT_WARN | \
		DISPATCH_MEMORYPRESSURE_PROC_LIMIT_CRITICAL)
#define DISPATCH_MEMORYPRESSURE_MALLOC_MASK ( \
		DISPATCH_MEMORYPRESSURE_WARN | \
		DISPATCH_MEMORYPRESSURE_CRITICAL | \
		DISPATCH_MEMORYPRESSURE_PROC_LIMIT_WARN | \
		DISPATCH_MEMORYPRESSURE_PROC_LIMIT_CRITICAL)
#elif DISPATCH_USE_VM_PRESSURE_SOURCE
#define DISPATCH_MEMORYPRESSURE_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
#define DISPATCH_MEMORYPRESSURE_SOURCE_MASK DISPATCH_VM_PRESSURE
#endif

#if DISPATCH_USE_MEMORYPRESSURE_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
static dispatch_source_t _dispatch_memorypressure_source;

static void
_dispatch_memorypressure_handler(void *context DISPATCH_UNUSED)
{
#if DISPATCH_USE_MEMORYPRESSURE_SOURCE
	unsigned long memorypressure;
	memorypressure = dispatch_source_get_data(_dispatch_memorypressure_source);

	if (memorypressure & DISPATCH_MEMORYPRESSURE_NORMAL) {
		_dispatch_memory_warn = false;
		_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
#if VOUCHER_USE_MACH_VOUCHER
		if (_firehose_task_buffer) {
			firehose_buffer_clear_bank_flags(_firehose_task_buffer,
					FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY);
		}
#endif
	}
	if (memorypressure & DISPATCH_MEMORYPRESSURE_WARN) {
		_dispatch_memory_warn = true;
		_dispatch_continuation_cache_limit =
				DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYPRESSURE_PRESSURE_WARN;
#if VOUCHER_USE_MACH_VOUCHER
		if (_firehose_task_buffer) {
			firehose_buffer_set_bank_flags(_firehose_task_buffer,
					FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY);
		}
#endif
	}
	if (memorypressure & DISPATCH_MEMORYPRESSURE_MALLOC_MASK) {
		malloc_memory_event_handler(memorypressure &
				DISPATCH_MEMORYPRESSURE_MALLOC_MASK);
	}
#elif DISPATCH_USE_VM_PRESSURE_SOURCE
	// we must have gotten DISPATCH_VM_PRESSURE
	malloc_zone_pressure_relief(0,0);
#endif
}
static void
_dispatch_memorypressure_init(void)
{
	_dispatch_memorypressure_source = dispatch_source_create(
			DISPATCH_MEMORYPRESSURE_SOURCE_TYPE, 0,
			DISPATCH_MEMORYPRESSURE_SOURCE_MASK,
			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true));
	dispatch_source_set_event_handler_f(_dispatch_memorypressure_source,
			_dispatch_memorypressure_handler);
	dispatch_activate(_dispatch_memorypressure_source);
}
#else
static inline void _dispatch_memorypressure_init(void) {}
#endif // DISPATCH_USE_MEMORYPRESSURE_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
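/*
 * The same source type is available to clients; a minimal sketch (the handler
 * body is hypothetical):
 *
 *	dispatch_source_t s = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_MEMORYPRESSURE, 0,
 *			DISPATCH_MEMORYPRESSURE_WARN | DISPATCH_MEMORYPRESSURE_CRITICAL,
 *			dispatch_get_main_queue());
 *	dispatch_source_set_event_handler(s, ^{
 *		unsigned long level = dispatch_source_get_data(s);
 *		if (level & DISPATCH_MEMORYPRESSURE_CRITICAL) trim_caches();
 *	});
 *	dispatch_activate(s);
 *
 * Internally the library keeps its own source (above) to shrink the
 * continuation cache and to notify malloc and the firehose buffer.
 */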
#pragma mark dispatch_mach

#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
#define _dispatch_debug_machport(name) \
		dispatch_debug_machport((name), __func__)
#else
#define _dispatch_debug_machport(name) ((void)(name))
#endif

// Flags for all notifications that are registered/unregistered when a
// send-possible notification is requested/delivered
#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
		DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)

#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
		(MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) \
		_DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)

#ifndef MACH_RCV_VOUCHER
#define MACH_RCV_VOUCHER 0x00000800
#endif
#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
#define DISPATCH_MACH_RCV_OPTIONS ( \
		MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
		MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \
		MACH_RCV_VOUCHER

#define DISPATCH_MACH_NOTIFICATION_ARMED(dk) ((dk)->dk_kevent.ext[0])

static void _dispatch_kevent_mach_msg_recv(_dispatch_kevent_qos_s *ke,
		mach_msg_header_t *hdr);
static void _dispatch_kevent_mach_msg_destroy(_dispatch_kevent_qos_s *ke,
		mach_msg_header_t *hdr);
static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
		dispatch_source_refs_t dr, dispatch_kevent_t dk,
		_dispatch_kevent_qos_s *ke, mach_msg_header_t *hdr,
		mach_msg_size_t siz);
static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags, uint32_t mask,
		mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, unsigned int options);
static void _dispatch_mach_notification_kevent_unregister(dispatch_mach_t dm);
static void _dispatch_mach_msg_recv(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, _dispatch_kevent_qos_s *ke,
		mach_msg_header_t *hdr, mach_msg_size_t siz);
static void _dispatch_mach_merge_notification_kevent(dispatch_mach_t dm,
		const _dispatch_kevent_qos_s *ke);
static inline mach_msg_option_t _dispatch_mach_checkin_options(void);

static const size_t _dispatch_mach_recv_msg_size =
		DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
static const size_t dispatch_mach_trailer_size =
		sizeof(dispatch_mach_trailer_t);
static mach_port_t _dispatch_mach_notify_port;
static dispatch_source_t _dispatch_mach_notify_source;

static inline void*
_dispatch_kevent_mach_msg_buf(_dispatch_kevent_qos_s *ke)
{
	return (void*)ke->ext[0];
}

static inline mach_msg_size_t
_dispatch_kevent_mach_msg_size(_dispatch_kevent_qos_s *ke)
{
	// buffer size in the successful receive case, but message size (like
	// msgh_size) in the MACH_RCV_TOO_LARGE case, i.e. add trailer size.
	return (mach_msg_size_t)ke->ext[1];
}
static void
_dispatch_source_type_mach_recv_direct_init(dispatch_source_t ds,
		dispatch_source_type_t type DISPATCH_UNUSED,
		uintptr_t handle DISPATCH_UNUSED,
		unsigned long mask DISPATCH_UNUSED,
		dispatch_queue_t q DISPATCH_UNUSED)
{
	ds->ds_pending_data_mask = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
	if (_dispatch_evfilt_machport_direct_enabled) return;
	ds->ds_dkev->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
	ds->ds_dkev->dk_kevent.flags &= ~(EV_UDATA_SPECIFIC|EV_VANISHED);
	ds->ds_is_direct_kevent = false;
#endif
}

static const
struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_VANISHED|EV_DISPATCH|EV_UDATA_SPECIFIC,
		.fflags = DISPATCH_MACH_RCV_OPTIONS,
	},
	.init = _dispatch_source_type_mach_recv_direct_init,
};

#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
static _dispatch_kevent_qos_s _dispatch_mach_recv_kevent = {
	.filter = EVFILT_MACHPORT,
	.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
	.fflags = DISPATCH_MACH_RCV_OPTIONS,
};

static void
_dispatch_mach_recv_msg_buf_init(void)
{
	if (_dispatch_evfilt_machport_direct_enabled) return;
	mach_vm_size_t vm_size = mach_vm_round_page(
			_dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
	mach_vm_address_t vm_addr = vm_page_size;
	kern_return_t kr;

	while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
			VM_FLAGS_ANYWHERE))) {
		if (kr != KERN_NO_SPACE) {
			DISPATCH_CLIENT_CRASH(kr,
					"Could not allocate mach msg receive buffer");
		}
		_dispatch_temporary_resource_shortage();
		vm_addr = vm_page_size;
	}
	_dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
	_dispatch_mach_recv_kevent.ext[1] = vm_size;
}
static void
_dispatch_source_merge_mach_msg_direct(dispatch_source_t ds,
		_dispatch_kevent_qos_s *ke, mach_msg_header_t *hdr)
{
	dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);
	dispatch_queue_t cq = _dispatch_queue_get_current();

	// see firehose_client_push_notify_async
	_dispatch_queue_set_current(ds->_as_dq);
	dc->dc_func(hdr);
	_dispatch_queue_set_current(cq);
	if (hdr != _dispatch_kevent_mach_msg_buf(ke)) {
		free(hdr);
	}
}

dispatch_source_t
_dispatch_source_create_mach_msg_direct_recv(mach_port_t recvp,
		const struct dispatch_continuation_s *dc)
{
	dispatch_source_t ds;
	ds = dispatch_source_create(&_dispatch_source_type_mach_recv_direct,
			recvp, 0, &_dispatch_mgr_q);
	os_atomic_store(&ds->ds_refs->ds_handler[DS_EVENT_HANDLER],
			(dispatch_continuation_t)dc, relaxed);
	return ds;
}
static void
_dispatch_mach_notify_port_init(void *context DISPATCH_UNUSED)
{
	kern_return_t kr;
#if HAVE_MACH_PORT_CONSTRUCT
	mach_port_options_t opts = { .flags = MPO_CONTEXT_AS_GUARD | MPO_STRICT };
#if __LP64__
	const mach_port_context_t guard = 0xfeed09071f1ca7edull;
#else
	const mach_port_context_t guard = 0xff1ca7edull;
#endif
	kr = mach_port_construct(mach_task_self(), &opts, guard,
			&_dispatch_mach_notify_port);
#else
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
			&_dispatch_mach_notify_port);
#endif
	DISPATCH_VERIFY_MIG(kr);
	if (slowpath(kr)) {
		DISPATCH_CLIENT_CRASH(kr,
				"mach_port_construct() failed: cannot create receive right");
	}

	static const struct dispatch_continuation_s dc = {
		.dc_func = (void*)_dispatch_mach_notify_source_invoke,
	};
	_dispatch_mach_notify_source = _dispatch_source_create_mach_msg_direct_recv(
			_dispatch_mach_notify_port, &dc);
	dispatch_assert(_dispatch_mach_notify_source);
	dispatch_activate(_dispatch_mach_notify_source);
}

static mach_port_t
_dispatch_get_mach_notify_port(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_notify_port_init);
	return _dispatch_mach_notify_port;
}
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
static void
_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
{
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
			&_dispatch_mach_recv_portset);
	DISPATCH_VERIFY_MIG(kr);
	if (slowpath(kr)) {
		DISPATCH_CLIENT_CRASH(kr,
				"mach_port_allocate() failed: cannot create port set");
	}
	_dispatch_kevent_qos_s *ke = &_dispatch_mach_recv_kevent;
	dispatch_assert(_dispatch_kevent_mach_msg_buf(ke));
	dispatch_assert(dispatch_mach_trailer_size ==
			REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
			DISPATCH_MACH_RCV_TRAILER)));
	ke->ident = _dispatch_mach_recv_portset;
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		ke->qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
	}
#endif
	_dispatch_kq_immediate_update(&_dispatch_mach_recv_kevent);
}

static mach_port_t
_dispatch_get_mach_recv_portset(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
	return _dispatch_mach_recv_portset;
}

static void
_dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
{
	_dispatch_kevent_qos_s kev = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD,
	};
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		kev.qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
	}
#endif

	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
			&_dispatch_mach_portset);
	DISPATCH_VERIFY_MIG(kr);
	if (slowpath(kr)) {
		DISPATCH_CLIENT_CRASH(kr,
				"mach_port_allocate() failed: cannot create port set");
	}
	kev.ident = _dispatch_mach_portset;
	_dispatch_kq_immediate_update(&kev);
}

static mach_port_t
_dispatch_get_mach_portset(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
	return _dispatch_mach_portset;
}
static kern_return_t
_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	_dispatch_debug_machport(mp);
	kr = mach_port_move_member(mach_task_self(), mp, mps);
	if (slowpath(kr)) {
		DISPATCH_VERIFY_MIG(kr);
		switch (kr) {
		case KERN_INVALID_RIGHT:
			if (mps) {
				_dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
						"mach_port_move_member() failed ", kr);
				break;
			}
			//fall through
		case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
			_dispatch_log("Corruption: Mach receive right 0x%x destroyed "
					"prematurely", mp);
#endif
			break;
		default:
			(void)dispatch_assume_zero(kr);
			break;
		}
	}
	return mps ? kr : 0;
}

static kern_return_t
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
			(del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
		mach_port_t mps;
		if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			mps = _dispatch_get_mach_recv_portset();
		} else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
				((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
				(dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
			mps = _dispatch_get_mach_portset();
		} else {
			mps = MACH_PORT_NULL;
		}
		kr = _dispatch_mach_portset_update(dk, mps);
	}
	return kr;
}
#endif // DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
static kern_return_t
_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
			(del_flags & _DISPATCH_MACH_SP_FLAGS)) {
		// Requesting a (delayed) non-sync send-possible notification
		// registers for both immediate dead-name notification and delayed-arm
		// send-possible notification for the port.
		// The send-possible notification is armed when a mach_msg() with
		// MACH_SEND_NOTIFY to the port times out.
		// If send-possible is unavailable, fall back to immediate dead-name
		// registration rdar://problem/2527840&9008724
		kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
				_DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
				MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
	}
	return kr;
}
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
static void
_dispatch_kevent_machport_drain(_dispatch_kevent_qos_s *ke)
{
	mach_port_t name = (mach_port_name_t)ke->data;
	dispatch_kevent_t dk;

	_dispatch_debug_machport(name);
	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH

	_dispatch_kevent_qos_s kev = {
		.ident = name,
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
		.fflags = DISPATCH_MACH_RECV_MESSAGE,
		.udata = (uintptr_t)dk,
	};
	_dispatch_kevent_debug("synthetic", &kev);
	_dispatch_kevent_merge(&kev);
}
#endif

static void
_dispatch_kevent_mach_msg_drain(_dispatch_kevent_qos_s *ke)
{
	mach_msg_header_t *hdr = _dispatch_kevent_mach_msg_buf(ke);
	mach_msg_size_t siz;
	mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;

	if (!fastpath(hdr)) {
		DISPATCH_INTERNAL_CRASH(kr, "EVFILT_MACHPORT with no message");
	}
	if (fastpath(!kr)) {
		_dispatch_kevent_mach_msg_recv(ke, hdr);
		goto out;
	} else if (kr != MACH_RCV_TOO_LARGE) {
		goto out;
	} else if (!ke->data) {
		DISPATCH_INTERNAL_CRASH(0, "MACH_RCV_LARGE_IDENTITY with no identity");
	}
	if (slowpath(ke->ext[1] > (UINT_MAX - dispatch_mach_trailer_size))) {
		DISPATCH_INTERNAL_CRASH(ke->ext[1],
				"EVFILT_MACHPORT with overlarge message");
	}
	siz = _dispatch_kevent_mach_msg_size(ke) + dispatch_mach_trailer_size;
	hdr = malloc(siz);
	if (!dispatch_assume(hdr)) {
		// Kernel will discard message too large to fit
		hdr = NULL;
		siz = 0;
	}
	mach_port_t name = (mach_port_name_t)ke->data;
	const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
			MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
	kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
			MACH_PORT_NULL);
	if (fastpath(!kr)) {
		_dispatch_kevent_mach_msg_recv(ke, hdr);
		goto out;
	} else if (kr == MACH_RCV_TOO_LARGE) {
		_dispatch_log("BUG in libdispatch client: "
				"_dispatch_kevent_mach_msg_drain: dropped message too "
				"large to fit in memory: id = 0x%x, size = %u",
				hdr->msgh_id, _dispatch_kevent_mach_msg_size(ke));
		kr = MACH_MSG_SUCCESS;
	}
	if (hdr != _dispatch_kevent_mach_msg_buf(ke)) {
		free(hdr);
	}
out:
	if (slowpath(kr)) {
		_dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
				"message reception failed", kr);
	}
}
static void
_dispatch_mach_kevent_merge(_dispatch_kevent_qos_s *ke)
{
	if (unlikely(!(ke->flags & EV_UDATA_SPECIFIC))) {
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
		if (ke->ident == _dispatch_mach_recv_portset) {
			_dispatch_kevent_mach_msg_drain(ke);
			return _dispatch_kq_deferred_update(&_dispatch_mach_recv_kevent);
		} else if (ke->ident == _dispatch_mach_portset) {
			return _dispatch_kevent_machport_drain(ke);
		}
#endif
		return _dispatch_kevent_error(ke);
	}

	dispatch_kevent_t dk = (dispatch_kevent_t)ke->udata;
	dispatch_source_refs_t dr = TAILQ_FIRST(&dk->dk_sources);
	bool is_reply = (dk->dk_kevent.flags & EV_ONESHOT);
	dispatch_source_t ds = _dispatch_source_from_refs(dr);

	if (_dispatch_kevent_mach_msg_size(ke)) {
		_dispatch_kevent_mach_msg_drain(ke);
		if (is_reply) {
			// _dispatch_kevent_mach_msg_drain() should have deleted this event
			dispatch_assert(ke->flags & EV_DELETE);
			return;
		}

		if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
			// re-arm the mach channel
			ke->fflags = DISPATCH_MACH_RCV_OPTIONS;
			ke->data = 0;
			ke->ext[0] = 0;
			ke->ext[1] = 0;
			return _dispatch_kq_deferred_update(ke);
		}
	} else if (is_reply) {
		DISPATCH_INTERNAL_CRASH(ke->flags, "Unexpected EVFILT_MACHPORT event");
	}
	if (unlikely((ke->flags & EV_VANISHED) &&
			(dx_type(ds) == DISPATCH_MACH_CHANNEL_TYPE))) {
		DISPATCH_CLIENT_CRASH(ke->flags,
				"Unexpected EV_VANISHED (do not destroy random mach ports)");
	}
	return _dispatch_kevent_merge(ke);
}
static void
_dispatch_kevent_mach_msg_recv(_dispatch_kevent_qos_s *ke,
		mach_msg_header_t *hdr)
{
	dispatch_source_refs_t dri;
	dispatch_kevent_t dk;
	mach_port_t name = hdr->msgh_local_port;
	mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;

	if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
			dispatch_mach_trailer_size)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received overlarge message");
		return _dispatch_kevent_mach_msg_destroy(ke, hdr);
	}
	if (!dispatch_assume(name)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with MACH_PORT_NULL port");
		return _dispatch_kevent_mach_msg_destroy(ke, hdr);
	}
	_dispatch_debug_machport(name);
	if (ke->flags & EV_UDATA_SPECIFIC) {
		dk = (void*)ke->udata;
	} else {
		dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	}
	if (!dispatch_assume(dk)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with unknown kevent");
		return _dispatch_kevent_mach_msg_destroy(ke, hdr);
	}
	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			return _dispatch_source_merge_mach_msg(dsi, dri, dk, ke, hdr, siz);
		}
	}
	_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
			"received message with no listeners");
	return _dispatch_kevent_mach_msg_destroy(ke, hdr);
}
static void
_dispatch_kevent_mach_msg_destroy(_dispatch_kevent_qos_s *ke,
		mach_msg_header_t *hdr)
{
	mach_msg_destroy(hdr);
	if (hdr != _dispatch_kevent_mach_msg_buf(ke)) {
		free(hdr);
	}
}
static void
_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
		dispatch_kevent_t dk, _dispatch_kevent_qos_s *ke,
		mach_msg_header_t *hdr, mach_msg_size_t siz)
{
	if (dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE) {
		return _dispatch_source_merge_mach_msg_direct(ds, ke, hdr);
	}
	dispatch_mach_reply_refs_t dmr = NULL;
	if (dk->dk_kevent.flags & EV_ONESHOT) {
		dmr = (dispatch_mach_reply_refs_t)dr;
	}
	return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, ke, hdr, siz);
}
static void
_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
{
	dispatch_source_refs_t dri, dr_next;
	dispatch_kevent_t dk;
	bool unreg;

	dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);

	// Update notification registration state.
	dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
	_dispatch_kevent_qos_s kev = {
		.ident = name,
		.filter = DISPATCH_EVFILT_MACH_NOTIFICATION,
		.flags = EV_ADD|EV_ENABLE,
		.fflags = flag,
		.udata = (uintptr_t)dk,
	};
	if (final) {
		// This can never happen again
		unreg = true;
	} else {
		// Re-register for notification before delivery
		unreg = _dispatch_kevent_resume(dk, flag, 0);
	}
	DISPATCH_MACH_NOTIFICATION_ARMED(dk) = 0;
	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
			dispatch_mach_t dm = (dispatch_mach_t)dsi;
			_dispatch_mach_merge_notification_kevent(dm, &kev);
			if (unreg && dm->dm_dkev) {
				_dispatch_mach_notification_kevent_unregister(dm);
			}
		} else {
			_dispatch_source_merge_kevent(dsi, &kev);
			if (unreg) {
				_dispatch_source_kevent_unregister(dsi);
			}
		}
		if (!dr_next || DISPATCH_MACH_NOTIFICATION_ARMED(dk)) {
			// current merge is last in list (dk might have been freed)
			// or it re-armed the notification
			return;
		}
	}
}
static kern_return_t
_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
		mach_port_mscount_t notify_sync)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
	kern_return_t kr, krr = 0;

	// Update notification registration state.
	dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
	dk->dk_kevent.data &= ~(del_flags & mask);

	_dispatch_debug_machport(port);
	if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
		_dispatch_debug("machport[0x%08x]: registering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		krr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, _dispatch_get_mach_notify_port(),
				MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(krr);

		switch (krr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			// Suppress errors & clear registration state
			dk->dk_kevent.data &= ~mask;
			break;
		default:
			// Else, we don't expect any errors from mach. Log any errors
			if (dispatch_assume_zero(krr)) {
				// log the error & clear registration state
				dk->dk_kevent.data &= ~mask;
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beat libdispatch to requesting the
				// specified Mach notification on this port. We should
				// technically cache the previous port and message it when the
				// kernel messages our port. Or we can just say screw those
				// subsystems and deallocate the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				(void)dispatch_assume_zero(kr);
				previous = MACH_PORT_NULL;
			}
			break;
		}
	} else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
		_dispatch_debug("machport[0x%08x]: unregistering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		kr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, MACH_PORT_NULL,
				MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			if (dispatch_assume_zero(kr)) {
			}
			break;
		}
	}
	if (slowpath(previous)) {
		// the kernel has not consumed the send-once right yet
		(void)dispatch_assume_zero(
				_dispatch_send_consume_send_once_right(previous));
	}
	return krr;
}
static void
_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
{
	static int notify_type = HOST_NOTIFY_CALENDAR_SET;
	kern_return_t kr;
	_dispatch_debug("registering for calendar-change notification");
retry:
	kr = host_request_notification(_dispatch_get_mach_host_port(),
			notify_type, _dispatch_get_mach_notify_port());
	// Fall back when the newer _SET variant is unsupported; the _CHANGE
	// notification fires strictly more often.
	if (kr == KERN_INVALID_ARGUMENT &&
			notify_type != HOST_NOTIFY_CALENDAR_CHANGE) {
		notify_type = HOST_NOTIFY_CALENDAR_CHANGE;
		goto retry;
	}
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}
static void
_dispatch_mach_host_calendar_change_register(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
}
static void
_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
{
	mig_reply_error_t reply;
	dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
			__ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
	dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
	boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
	if (!success && reply.RetCode == MIG_BAD_ID &&
			(hdr->msgh_id == HOST_CALENDAR_SET_REPLYID ||
			hdr->msgh_id == HOST_CALENDAR_CHANGED_REPLYID)) {
		_dispatch_debug("calendar-change notification");
		_dispatch_timers_calendar_change();
		_dispatch_mach_host_notify_update(NULL);
		success = TRUE;
		reply.RetCode = KERN_SUCCESS;
	}
	if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
		(void)dispatch_assume_zero(reply.RetCode);
	}
	if (!success || (reply.RetCode && reply.RetCode != MIG_NO_REPLY)) {
		mach_msg_destroy(hdr);
	}
}
kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
			"deleted prematurely", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	kern_return_t kr;

	_dispatch_debug("machport[0x%08x]: dead-name notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);

	// the act of receiving a dead name notification allocates a dead-name
	// right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//(void)dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	_dispatch_debug("machport[0x%08x]: send-possible notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
	return KERN_SUCCESS;
}
#pragma mark dispatch_mach_t

#define DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT 0x1
#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
#define DISPATCH_MACH_WAIT_FOR_REPLY 0x4
#define DISPATCH_MACH_OWNED_REPLY_PORT 0x8
#define DISPATCH_MACH_OPTIONS_MASK 0xffff

#define DM_SEND_STATUS_SUCCESS 0x1
#define DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT 0x2

DISPATCH_ENUM(dispatch_mach_send_invoke_flags, uint32_t,
	DM_SEND_INVOKE_NONE = 0x0,
	DM_SEND_INVOKE_FLUSH = 0x1,
	DM_SEND_INVOKE_NEEDS_BARRIER = 0x2,
	DM_SEND_INVOKE_CANCEL = 0x4,
	DM_SEND_INVOKE_CAN_RUN_BARRIER = 0x8,
	DM_SEND_INVOKE_IMMEDIATE_SEND = 0x10,
);
#define DM_SEND_INVOKE_IMMEDIATE_SEND_MASK \
		((dispatch_mach_send_invoke_flags_t)DM_SEND_INVOKE_IMMEDIATE_SEND)
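/*
 * Usage sketch (illustrative, derived from the callers later in this file):
 * the invoke flags above are combined with bitwise-or. A barrier drain
 * passes (DM_SEND_INVOKE_NEEDS_BARRIER | DM_SEND_INVOKE_CAN_RUN_BARRIER) to
 * _dispatch_mach_send_invoke(), while a synchronous send request adds
 * DM_SEND_INVOKE_IMMEDIATE_SEND so that _dispatch_mach_msg_send() can report
 * DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT to its caller.
 */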
static inline pthread_priority_t _dispatch_mach_priority_propagate(
		mach_msg_option_t options);
static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
static mach_port_t _dispatch_mach_msg_get_reply_port(dispatch_object_t dou);
static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
		mach_port_t local_port, mach_port_t remote_port);
static inline void _dispatch_mach_msg_reply_received(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, mach_port_t local_port);
static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
		dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
		dispatch_object_t dou);
static inline mach_msg_header_t *_dispatch_mach_msg_get_msg(
		dispatch_mach_msg_t dmsg);
static void _dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou,
		pthread_priority_t pp);
static dispatch_mach_t
_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler, bool handler_is_block)
{
	dispatch_mach_t dm;
	dispatch_mach_refs_t dr;

	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
			sizeof(struct dispatch_mach_s));
	_dispatch_queue_init(dm->_as_dq, DQF_NONE, 1, true);
	dm->dq_label = label;
	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds

	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
	dr->dr_source_wref = _dispatch_ptr2wref(dm);
	dr->dm_handler_func = handler;
	dr->dm_handler_ctxt = context;
	dm->ds_refs = dr;
	dm->dm_handler_is_block = handler_is_block;

	dm->dm_refs = _dispatch_calloc(1ul,
			sizeof(struct dispatch_mach_send_refs_s));
	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
	TAILQ_INIT(&dm->dm_refs->dm_replies);

	if (slowpath(!q)) {
		q = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
	} else {
		_dispatch_retain(q);
	}
	dm->do_targetq = q;
	_dispatch_object_debug(dm, "%s", __func__);
	return dm;
}
dispatch_mach_t
dispatch_mach_create(const char *label, dispatch_queue_t q,
		dispatch_mach_handler_t handler)
{
	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
	return _dispatch_mach_create(label, q, bb,
			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
}
dispatch_mach_t
dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler)
{
	return _dispatch_mach_create(label, q, context, handler, false);
}
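/*
 * For illustration only, a minimal sketch of creating a channel with the
 * block-based variant above (the label and handler body are hypothetical
 * examples, not part of this file):
 *
 *	dispatch_mach_t dm = dispatch_mach_create("com.example.peer",
 *			dispatch_get_main_queue(),
 *			^(dispatch_mach_reason_t reason, dispatch_mach_msg_t msg,
 *			mach_error_t error) {
 *		// inspect reason (e.g. DISPATCH_MACH_MESSAGE_RECEIVED) and msg here
 *	});
 *
 * dispatch_mach_create_f() takes the same label and target queue plus a
 * context pointer and a C function handler instead of a block.
 */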
void
_dispatch_mach_dispose(dispatch_mach_t dm)
{
	_dispatch_object_debug(dm, "%s", __func__);
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
		Block_release(dr->dm_handler_ctxt);
	}
	_dispatch_queue_destroy(dm->_as_dq);
}
void
dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
		mach_port_t send, dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_kevent_t dk;
	uint32_t disconnect_cnt;
	dispatch_source_type_t type = &_dispatch_source_type_mach_recv_direct;

	dm->ds_is_direct_kevent = (bool)_dispatch_evfilt_machport_direct_enabled;
	if (MACH_PORT_VALID(receive)) {
		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
		dk->dk_kevent = type->ke;
		dk->dk_kevent.ident = receive;
		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE|EV_VANISHED;
		dk->dk_kevent.udata = (uintptr_t)dk;
		TAILQ_INIT(&dk->dk_sources);
		dm->ds_dkev = dk;
		dm->ds_pending_data_mask = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
		dm->ds_needs_rearm = dm->ds_is_direct_kevent;
		if (!dm->ds_is_direct_kevent) {
			dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
			dk->dk_kevent.flags &= ~(EV_UDATA_SPECIFIC|EV_VANISHED);
		}
		_dispatch_retain(dm); // the reference the manager queue holds
	}
	dr->dm_send = send;
	if (MACH_PORT_VALID(send)) {
		if (checkin) {
			dispatch_retain(checkin);
			checkin->dmsg_options = _dispatch_mach_checkin_options();
			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
		}
		dr->dm_checkin = checkin;
	}
	// monitor message reply ports
	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	dispatch_assert(DISPATCH_MACH_NEVER_CONNECTED - 1 ==
			DISPATCH_MACH_NEVER_INSTALLED);
	disconnect_cnt = os_atomic_dec2o(dr, dm_disconnect_cnt, release);
	if (unlikely(disconnect_cnt != DISPATCH_MACH_NEVER_INSTALLED)) {
		DISPATCH_CLIENT_CRASH(disconnect_cnt, "Channel already connected");
	}
	_dispatch_object_debug(dm, "%s", __func__);
	return dispatch_activate(dm);
}
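/*
 * For illustration, a hedged sketch of connecting the channel created above:
 * `recv_port` and `send_port` are hypothetical ports owned by the caller and
 * `checkin` is an optional message that is sent first once the channel is
 * connected (its remote port is remembered as dm_checkin_port):
 *
 *	dispatch_mach_connect(dm, recv_port, send_port, checkin);
 *
 * Either port side may be MACH_PORT_NULL; connecting the same channel twice
 * is a client error and crashes with "Channel already connected" (see above).
 */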
// assumes low bit of mach port names is always set
#define DISPATCH_MACH_REPLY_PORT_UNOWNED 0x1u

static inline void
_dispatch_mach_reply_mark_reply_port_owned(dispatch_mach_reply_refs_t dmr)
{
	dmr->dmr_reply &= ~DISPATCH_MACH_REPLY_PORT_UNOWNED;
}

static inline bool
_dispatch_mach_reply_is_reply_port_owned(dispatch_mach_reply_refs_t dmr)
{
	mach_port_t reply_port = dmr->dmr_reply;
	return reply_port ? !(reply_port & DISPATCH_MACH_REPLY_PORT_UNOWNED) : false;
}

static inline mach_port_t
_dispatch_mach_reply_get_reply_port(dispatch_mach_reply_refs_t dmr)
{
	mach_port_t reply_port = dmr->dmr_reply;
	return reply_port ? (reply_port | DISPATCH_MACH_REPLY_PORT_UNOWNED) : 0;
}
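/*
 * Sketch of the tagging scheme above: mach port names are assumed to always
 * have their low bit set, so DISPATCH_MACH_REPLY_PORT_UNOWNED can be stored
 * in that bit of dmr_reply without losing the name. Clearing the bit marks
 * the reply port as owned by the channel, and
 * _dispatch_mach_reply_get_reply_port() ors the bit back in to recover the
 * original port name.
 */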
static bool
_dispatch_mach_reply_tryremove(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr)
{
	bool removed;
	_dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
	if ((removed = _TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
		TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
		_TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
	}
	_dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
	return removed;
}
static void
_dispatch_mach_reply_waiter_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, unsigned int options)
{
	dispatch_mach_msg_t dmsgr = NULL;
	bool disconnected = (options & DKEV_UNREGISTER_DISCONNECTED);
	if (options & DKEV_UNREGISTER_REPLY_REMOVE) {
		_dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
		if (unlikely(!_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
			DISPATCH_INTERNAL_CRASH(0, "Could not find reply registration");
		}
		TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
		_TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
		_dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
	}
	if (disconnected) {
		dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
	} else if (dmr->dmr_voucher) {
		_voucher_release(dmr->dmr_voucher);
		dmr->dmr_voucher = NULL;
	}
	_dispatch_debug("machport[0x%08x]: unregistering for sync reply%s, ctxt %p",
			_dispatch_mach_reply_get_reply_port(dmr),
			disconnected ? " (disconnected)" : "", dmr->dmr_ctxt);
	if (dmsgr) {
		return _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
	}
	dispatch_assert(!(options & DKEV_UNREGISTER_WAKEUP));
}
static void
_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, unsigned int options)
{
	dispatch_mach_msg_t dmsgr = NULL;
	bool replies_empty = false;
	bool disconnected = (options & DKEV_UNREGISTER_DISCONNECTED);
	if (options & DKEV_UNREGISTER_REPLY_REMOVE) {
		_dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
		if (unlikely(!_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
			DISPATCH_INTERNAL_CRASH(0, "Could not find reply registration");
		}
		TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
		_TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
		replies_empty = TAILQ_EMPTY(&dm->dm_refs->dm_replies);
		_dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
	}
	if (disconnected) {
		dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
	} else if (dmr->dmr_voucher) {
		_voucher_release(dmr->dmr_voucher);
		dmr->dmr_voucher = NULL;
	}
	uint32_t flags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	dispatch_kevent_t dk = dmr->dmr_dkev;
	_dispatch_debug("machport[0x%08x]: unregistering for reply%s, ctxt %p",
			(mach_port_t)dk->dk_kevent.ident,
			disconnected ? " (disconnected)" : "", dmr->dmr_ctxt);
	if (!dm->ds_is_direct_kevent) {
		dmr->dmr_dkev = NULL;
		TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
		_dispatch_kevent_unregister(dk, flags, 0);
	} else {
		long r = _dispatch_kevent_unregister(dk, flags, options);
		if (r == EINPROGRESS) {
			_dispatch_debug("machport[0x%08x]: deferred delete kevent[%p]",
					(mach_port_t)dk->dk_kevent.ident, dk);
			dispatch_assert(options == DKEV_UNREGISTER_DISCONNECTED);
			// dmr must be put back so that the event delivery finds it, the
			// replies lock is held by the caller.
			TAILQ_INSERT_HEAD(&dm->dm_refs->dm_replies, dmr, dmr_list);
			if (dmsgr) {
				dmr->dmr_voucher = dmsgr->dmsg_voucher;
				dmsgr->dmsg_voucher = NULL;
				dispatch_release(dmsgr);
			}
			return; // deferred unregistration
		}
		dispatch_assume_zero(r);
		dmr->dmr_dkev = NULL;
		_TAILQ_TRASH_ENTRY(dmr, dr_list);
	}
	if (dmsgr) {
		return _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
	}
	if ((options & DKEV_UNREGISTER_WAKEUP) && replies_empty &&
			(dm->dm_refs->dm_disconnect_cnt ||
			(dm->dq_atomic_flags & DSF_CANCELED))) {
		dx_wakeup(dm, 0, DISPATCH_WAKEUP_FLUSH);
	}
}
static void
_dispatch_mach_reply_waiter_register(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, mach_port_t reply_port,
		dispatch_mach_msg_t dmsg, mach_msg_option_t msg_opts)
{
	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
	dmr->dmr_dkev = NULL;
	dmr->dmr_reply = reply_port;
	if (msg_opts & DISPATCH_MACH_OWNED_REPLY_PORT) {
		_dispatch_mach_reply_mark_reply_port_owned(dmr);
	}
	if (dmsg->dmsg_voucher) {
		dmr->dmr_voucher = _voucher_retain(dmsg->dmsg_voucher);
	}
	dmr->dmr_priority = (dispatch_priority_t)dmsg->dmsg_priority;
	// make reply context visible to leaks rdar://11777199
	dmr->dmr_ctxt = dmsg->do_ctxt;

	_dispatch_debug("machport[0x%08x]: registering for sync reply, ctxt %p",
			reply_port, dmsg->do_ctxt);
	_dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
	if (unlikely(_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
		DISPATCH_INTERNAL_CRASH(dmr->dmr_list.tqe_prev,
				"Reply already registered");
	}
	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
	_dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
}
static void
_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply_port,
		dispatch_mach_msg_t dmsg)
{
	dispatch_kevent_t dk;
	dispatch_mach_reply_refs_t dmr;
	dispatch_source_type_t type = &_dispatch_source_type_mach_recv_direct;
	pthread_priority_t mp, pp;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = type->ke;
	dk->dk_kevent.ident = reply_port;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE|EV_ONESHOT;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);
	if (!dm->ds_is_direct_kevent) {
		dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
		dk->dk_kevent.flags &= ~(EV_UDATA_SPECIFIC|EV_VANISHED);
	}

	dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
	dmr->dmr_dkev = dk;
	dmr->dmr_reply = reply_port;
	if (dmsg->dmsg_voucher) {
		dmr->dmr_voucher = _voucher_retain(dmsg->dmsg_voucher);
	}
	dmr->dmr_priority = (dispatch_priority_t)dmsg->dmsg_priority;
	// make reply context visible to leaks rdar://11777199
	dmr->dmr_ctxt = dmsg->do_ctxt;

	pp = dm->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
	if (pp && dm->ds_is_direct_kevent) {
		mp = dmsg->dmsg_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (pp < mp) pp = mp;
		pp |= dm->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	} else {
		pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
	}

	_dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p",
			reply_port, dmsg->do_ctxt);
	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, pp, &flags);
	TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
			dr_list);
	_dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
	if (unlikely(_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
		DISPATCH_INTERNAL_CRASH(dmr->dmr_list.tqe_prev,
				"Reply already registered");
	}
	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
	_dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
	if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
		return _dispatch_mach_reply_kevent_unregister(dm, dmr,
				DKEV_UNREGISTER_DISCONNECTED|DKEV_UNREGISTER_REPLY_REMOVE);
	}
}
static void
_dispatch_mach_notification_kevent_unregister(dispatch_mach_t dm)
{
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
	dispatch_kevent_t dk = dm->dm_dkev;
	dm->dm_dkev = NULL;
	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
			dr_list);
	dm->ds_pending_data_mask &= ~(unsigned long)
			(DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
	_dispatch_kevent_unregister(dk,
			DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD, 0);
}
static void
_dispatch_mach_notification_kevent_register(dispatch_mach_t dm, mach_port_t send)
{
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
	dispatch_kevent_t dk;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = _dispatch_source_type_mach_send.ke;
	dk->dk_kevent.ident = send;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	dm->ds_pending_data_mask |= dk->dk_kevent.fflags;

	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&dk,
			_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG, &flags);
	TAILQ_INSERT_TAIL(&dk->dk_sources,
			(dispatch_source_refs_t)dm->dm_refs, dr_list);
	dm->dm_dkev = dk;
	if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
		_dispatch_mach_notification_kevent_unregister(dm);
	}
}
static mach_port_t
_dispatch_get_thread_reply_port(void)
{
	mach_port_t reply_port, mrp = _dispatch_get_thread_mig_reply_port();
	if (mrp) {
		reply_port = mrp;
		_dispatch_debug("machport[0x%08x]: borrowed thread sync reply port",
				reply_port);
	} else {
		reply_port = mach_reply_port();
		_dispatch_set_thread_mig_reply_port(reply_port);
		_dispatch_debug("machport[0x%08x]: allocated thread sync reply port",
				reply_port);
	}
	_dispatch_debug_machport(reply_port);
	return reply_port;
}
static void
_dispatch_clear_thread_reply_port(mach_port_t reply_port)
{
	mach_port_t mrp = _dispatch_get_thread_mig_reply_port();
	if (reply_port != mrp) {
		_dispatch_debug("machport[0x%08x]: did not clear thread sync reply "
				"port (found 0x%08x)", reply_port, mrp);
		return;
	}
	_dispatch_set_thread_mig_reply_port(MACH_PORT_NULL);
	_dispatch_debug_machport(reply_port);
	_dispatch_debug("machport[0x%08x]: cleared thread sync reply port",
			reply_port);
}
static void
_dispatch_set_thread_reply_port(mach_port_t reply_port)
{
	_dispatch_debug_machport(reply_port);
	mach_port_t mrp = _dispatch_get_thread_mig_reply_port();
	if (mrp) {
		kern_return_t kr = mach_port_mod_refs(mach_task_self(), reply_port,
				MACH_PORT_RIGHT_RECEIVE, -1);
		DISPATCH_VERIFY_MIG(kr);
		dispatch_assume_zero(kr);
		_dispatch_debug("machport[0x%08x]: deallocated sync reply port "
				"(found 0x%08x)", reply_port, mrp);
	} else {
		_dispatch_set_thread_mig_reply_port(reply_port);
		_dispatch_debug("machport[0x%08x]: restored thread sync reply port",
				reply_port);
	}
}
static inline mach_port_t
_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
{
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
	mach_port_t remote = hdr->msgh_remote_port;
	return remote;
}

static inline mach_port_t
_dispatch_mach_msg_get_reply_port(dispatch_object_t dou)
{
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
	mach_port_t local = hdr->msgh_local_port;
	if (!MACH_PORT_VALID(local) || MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) !=
			MACH_MSG_TYPE_MAKE_SEND_ONCE) return MACH_PORT_NULL;
	return local;
}
static inline void
_dispatch_mach_msg_set_reason(dispatch_mach_msg_t dmsg, mach_error_t err,
		unsigned long reason)
{
	dispatch_assert_zero(reason & ~(unsigned long)code_emask);
	dmsg->dmsg_error = ((err || !reason) ? err :
			err_local|err_sub(0x3e0)|(mach_error_t)reason);
}

static inline unsigned long
_dispatch_mach_msg_get_reason(dispatch_mach_msg_t dmsg, mach_error_t *err_ptr)
{
	mach_error_t err = dmsg->dmsg_error;

	dmsg->dmsg_error = 0;
	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
		*err_ptr = 0;
		return err_get_code(err);
	}
	*err_ptr = err;
	return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
}
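/*
 * Sketch of the encoding used above: a non-zero dispatch reason is packed
 * into the mach_error_t as err_local|err_sub(0x3e0)|reason, so that
 * _dispatch_mach_msg_get_reason() can tell a locally generated reason apart
 * from a genuine kernel send error, which is instead reported as
 * DISPATCH_MACH_MESSAGE_SEND_FAILED with the raw error in *err_ptr.
 */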
static void
_dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
		_dispatch_kevent_qos_s *ke, mach_msg_header_t *hdr, mach_msg_size_t siz)
{
	_dispatch_debug_machport(hdr->msgh_remote_port);
	_dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
			hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
	bool canceled = (dm->dq_atomic_flags & DSF_CANCELED);
	if (!dmr && canceled) {
		// message received after cancellation, _dispatch_mach_kevent_merge is
		// responsible for mach channel source state (e.g. deferred deletion)
		return _dispatch_kevent_mach_msg_destroy(ke, hdr);
	}
	dispatch_mach_msg_t dmsg;
	voucher_t voucher;
	pthread_priority_t priority;
	void *ctxt = NULL;
	if (dmr) {
		_voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
		voucher = dmr->dmr_voucher;
		dmr->dmr_voucher = NULL; // transfer reference
		priority = dmr->dmr_priority;
		ctxt = dmr->dmr_ctxt;
		unsigned int options = DKEV_DISPOSE_IMMEDIATE_DELETE;
		options |= DKEV_UNREGISTER_REPLY_REMOVE;
		options |= DKEV_UNREGISTER_WAKEUP;
		if (canceled) options |= DKEV_UNREGISTER_DISCONNECTED;
		_dispatch_mach_reply_kevent_unregister(dm, dmr, options);
		ke->flags |= EV_DELETE; // remember that unregister deleted the event
		if (canceled) return;
	} else {
		voucher = voucher_create_with_mach_msg(hdr);
		priority = _voucher_get_priority(voucher);
	}
	dispatch_mach_msg_destructor_t destructor;
	destructor = (hdr == _dispatch_kevent_mach_msg_buf(ke)) ?
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
	if (hdr == _dispatch_kevent_mach_msg_buf(ke)) {
		_dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr,
				(uint64_t)dmsg->dmsg_buf);
	}
	dmsg->dmsg_voucher = voucher;
	dmsg->dmsg_priority = priority;
	dmsg->do_ctxt = ctxt;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
	_dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
	_dispatch_voucher_ktrace_dmsg_push(dmsg);
	return _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_mach_msg_t
_dispatch_mach_msg_reply_recv(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, mach_port_t reply_port)
{
	if (slowpath(!MACH_PORT_VALID(reply_port))) {
		DISPATCH_CLIENT_CRASH(reply_port, "Invalid reply port");
	}
	void *ctxt = dmr->dmr_ctxt;
	mach_msg_header_t *hdr, *hdr2 = NULL;
	void *hdr_copyout_addr;
	mach_msg_size_t siz, msgsiz = 0;
	mach_msg_return_t kr;
	mach_msg_option_t options;
	siz = mach_vm_round_page(_dispatch_mach_recv_msg_size +
			dispatch_mach_trailer_size);
	hdr = alloca(siz);
	for (mach_vm_address_t p = mach_vm_trunc_page(hdr + vm_page_size);
			p < (mach_vm_address_t)hdr + siz; p += vm_page_size) {
		*(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
	}
	options = DISPATCH_MACH_RCV_OPTIONS & (~MACH_RCV_VOUCHER);
retry:
	_dispatch_debug_machport(reply_port);
	_dispatch_debug("machport[0x%08x]: MACH_RCV_MSG %s", reply_port,
			(options & MACH_RCV_TIMEOUT) ? "poll" : "wait");
	kr = mach_msg(hdr, options, 0, siz, reply_port, MACH_MSG_TIMEOUT_NONE,
			MACH_PORT_NULL);
	hdr_copyout_addr = hdr;
	_dispatch_debug_machport(reply_port);
	_dispatch_debug("machport[0x%08x]: MACH_RCV_MSG (size %u, opts 0x%x) "
			"returned: %s - 0x%x", reply_port, siz, options,
			mach_error_string(kr), kr);
	switch (kr) {
	case MACH_RCV_TOO_LARGE:
		if (!fastpath(hdr->msgh_size <= UINT_MAX -
				dispatch_mach_trailer_size)) {
			DISPATCH_CLIENT_CRASH(hdr->msgh_size, "Overlarge message");
		}
		if (options & MACH_RCV_LARGE) {
			msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
			hdr2 = malloc(msgsiz);
			if (dispatch_assume(hdr2)) {
				hdr = hdr2;
				siz = msgsiz;
			}
			options |= MACH_RCV_TIMEOUT;
			options &= ~MACH_RCV_LARGE;
			goto retry;
		}
		_dispatch_log("BUG in libdispatch client: "
				"dispatch_mach_send_and_wait_for_reply: dropped message too "
				"large to fit in memory: id = 0x%x, size = %u", hdr->msgh_id,
				hdr->msgh_size);
		break;
	case MACH_RCV_INVALID_NAME: // rdar://problem/21963848
	case MACH_RCV_PORT_CHANGED: // rdar://problem/21885327
	case MACH_RCV_PORT_DIED:
		// channel was disconnected/canceled and reply port destroyed
		_dispatch_debug("machport[0x%08x]: sync reply port destroyed, ctxt %p: "
				"%s - 0x%x", reply_port, ctxt, mach_error_string(kr), kr);
		goto out;
	case MACH_MSG_SUCCESS:
		if (hdr->msgh_remote_port) {
			_dispatch_debug_machport(hdr->msgh_remote_port);
		}
		_dispatch_debug("machport[0x%08x]: received msg id 0x%x, size = %u, "
				"reply on 0x%08x", hdr->msgh_local_port, hdr->msgh_id,
				hdr->msgh_size, hdr->msgh_remote_port);
		siz = hdr->msgh_size + dispatch_mach_trailer_size;
		if (hdr2 && siz < msgsiz) {
			void *shrink = realloc(hdr2, msgsiz);
			if (shrink) hdr = hdr2 = shrink;
		}
		break;
	default:
		dispatch_assume_zero(kr);
		break;
	}
	_dispatch_mach_msg_reply_received(dm, dmr, hdr->msgh_local_port);
	hdr->msgh_local_port = MACH_PORT_NULL;
	if (slowpath((dm->dq_atomic_flags & DSF_CANCELED) || kr)) {
		if (!kr) mach_msg_destroy(hdr);
		goto out;
	}
	dispatch_mach_msg_t dmsg;
	dispatch_mach_msg_destructor_t destructor = (!hdr2) ?
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
	if (!hdr2 || hdr != hdr_copyout_addr) {
		_dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr_copyout_addr,
				(uint64_t)_dispatch_mach_msg_get_msg(dmsg));
	}
	dmsg->do_ctxt = ctxt;
	return dmsg;
out:
	free(hdr2);
	return NULL;
}
static inline void
_dispatch_mach_msg_reply_received(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, mach_port_t local_port)
{
	bool removed = _dispatch_mach_reply_tryremove(dm, dmr);
	if (!MACH_PORT_VALID(local_port) || !removed) {
		// port moved/destroyed during receive, or reply waiter was never
		// registered or already removed (disconnected)
		return;
	}
	mach_port_t reply_port = _dispatch_mach_reply_get_reply_port(dmr);
	_dispatch_debug("machport[0x%08x]: unregistered for sync reply, ctxt %p",
			reply_port, dmr->dmr_ctxt);
	if (_dispatch_mach_reply_is_reply_port_owned(dmr)) {
		_dispatch_set_thread_reply_port(reply_port);
		if (local_port != reply_port) {
			DISPATCH_CLIENT_CRASH(local_port,
					"Reply received on unexpected port");
		}
		return;
	}
	mach_msg_header_t *hdr;
	dispatch_mach_msg_t dmsg;
	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	hdr->msgh_local_port = local_port;
	dmsg->dmsg_voucher = dmr->dmr_voucher;
	dmr->dmr_voucher = NULL; // transfer reference
	dmsg->dmsg_priority = dmr->dmr_priority;
	dmsg->do_ctxt = dmr->dmr_ctxt;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_REPLY_RECEIVED);
	return _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
}
static void
_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
		mach_port_t remote_port)
{
	mach_msg_header_t *hdr;
	dispatch_mach_msg_t dmsg;
	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	if (local_port) hdr->msgh_local_port = local_port;
	if (remote_port) hdr->msgh_remote_port = remote_port;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
	_dispatch_debug("machport[0x%08x]: %s right disconnected", local_port ?
			local_port : remote_port, local_port ? "receive" : "send");
	return _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
}
static inline dispatch_mach_msg_t
_dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
		dispatch_mach_reply_refs_t dmr)
{
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
	mach_port_t reply_port = dmsg ? dmsg->dmsg_reply :
			_dispatch_mach_reply_get_reply_port(dmr);
	voucher_t v;

	if (!reply_port) {
		if (!dmsg) {
			v = dmr->dmr_voucher;
			dmr->dmr_voucher = NULL; // transfer reference
			if (v) _voucher_release(v);
		}
		return NULL;
	}

	if (dmsg) {
		v = dmsg->dmsg_voucher;
		if (v) _voucher_retain(v);
	} else {
		v = dmr->dmr_voucher;
		dmr->dmr_voucher = NULL; // transfer reference
	}

	if ((dmsg && (dmsg->dmsg_options & DISPATCH_MACH_WAIT_FOR_REPLY) &&
			(dmsg->dmsg_options & DISPATCH_MACH_OWNED_REPLY_PORT)) ||
			(dmr && !dmr->dmr_dkev &&
			_dispatch_mach_reply_is_reply_port_owned(dmr))) {
		if (v) _voucher_release(v);
		// deallocate owned reply port to break _dispatch_mach_msg_reply_recv
		// out of waiting in mach_msg(MACH_RCV_MSG)
		kern_return_t kr = mach_port_mod_refs(mach_task_self(), reply_port,
				MACH_PORT_RIGHT_RECEIVE, -1);
		DISPATCH_VERIFY_MIG(kr);
		dispatch_assume_zero(kr);
		return NULL;
	}

	mach_msg_header_t *hdr;
	dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	dmsgr->dmsg_voucher = v;
	hdr->msgh_local_port = reply_port;
	if (dmsg) {
		dmsgr->dmsg_priority = dmsg->dmsg_priority;
		dmsgr->do_ctxt = dmsg->do_ctxt;
	} else {
		dmsgr->dmsg_priority = dmr->dmr_priority;
		dmsgr->do_ctxt = dmr->dmr_ctxt;
	}
	_dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
	_dispatch_debug("machport[0x%08x]: reply disconnected, ctxt %p",
			hdr->msgh_local_port, dmsgr->do_ctxt);
	return dmsgr;
}
static void
_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
{
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	mach_msg_option_t msg_opts = dmsg->dmsg_options;
	_dispatch_debug("machport[0x%08x]: not sent msg id 0x%x, ctxt %p, "
			"msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x",
			msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
			msg_opts, msg->msgh_voucher_port, dmsg->dmsg_reply);
	unsigned long reason = (msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY) ?
			0 : DISPATCH_MACH_MESSAGE_NOT_SENT;
	dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
	_dispatch_mach_msg_set_reason(dmsg, 0, reason);
	_dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
	if (dmsgr) _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
}
static uint32_t
_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou,
		dispatch_mach_reply_refs_t dmr, pthread_priority_t pp,
		dispatch_mach_send_invoke_flags_t send_flags)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
	voucher_t voucher = dmsg->dmsg_voucher;
	mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
	uint32_t send_status = 0;
	bool clear_voucher = false, kvoucher_move_send = false;
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
			MACH_MSG_TYPE_MOVE_SEND_ONCE);
	mach_port_t reply_port = dmsg->dmsg_reply;
	if (!is_reply) {
		dr->dm_needs_mgr = 0;
		if (unlikely(dr->dm_checkin && dmsg != dr->dm_checkin)) {
			// send initial checkin message
			if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
					&_dispatch_mgr_q)) {
				// send kevent must be uninstalled on the manager queue
				dr->dm_needs_mgr = 1;
				goto out;
			}
			if (unlikely(!_dispatch_mach_msg_send(dm,
					dr->dm_checkin, NULL, pp, DM_SEND_INVOKE_NONE))) {
				goto out;
			}
			dr->dm_checkin = NULL;
		}
	}
	mach_msg_return_t kr = 0;
	mach_msg_option_t opts = 0, msg_opts = dmsg->dmsg_options;
	if (!(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
		mach_msg_priority_t msg_priority = MACH_MSG_PRIORITY_UNSPECIFIED;
		opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
		if (!is_reply) {
			if (dmsg != dr->dm_checkin) {
				msg->msgh_remote_port = dr->dm_send;
			}
			if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
				if (slowpath(!dm->dm_dkev)) {
					_dispatch_mach_notification_kevent_register(dm,
							msg->msgh_remote_port);
				}
				if (fastpath(dm->dm_dkev)) {
					if (DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev)) {
						goto out;
					}
					opts |= MACH_SEND_NOTIFY;
				}
			}
			opts |= MACH_SEND_TIMEOUT;
			if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
				ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
						voucher, dmsg->dmsg_priority);
			}
			_dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
			if (ipc_kvoucher) {
				kvoucher_move_send = true;
				clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
						ipc_kvoucher, kvoucher_move_send);
			} else {
				clear_voucher = _voucher_mach_msg_set(msg, voucher);
			}
			if (pp && _dispatch_evfilt_machport_direct_enabled) {
				opts |= MACH_SEND_OVERRIDE;
				msg_priority = (mach_msg_priority_t)pp;
			}
		}
		_dispatch_debug_machport(msg->msgh_remote_port);
		if (reply_port) _dispatch_debug_machport(reply_port);
		if (msg_opts & DISPATCH_MACH_WAIT_FOR_REPLY) {
			if (msg_opts & DISPATCH_MACH_OWNED_REPLY_PORT) {
				_dispatch_clear_thread_reply_port(reply_port);
			}
			_dispatch_mach_reply_waiter_register(dm, dmr, reply_port, dmsg,
					msg_opts);
		}
		kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
				msg_priority);
		_dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
				"opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
				"%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
				opts, msg_opts, msg->msgh_voucher_port, reply_port,
				mach_error_string(kr), kr);
		if (unlikely(kr && (msg_opts & DISPATCH_MACH_WAIT_FOR_REPLY))) {
			_dispatch_mach_reply_waiter_unregister(dm, dmr,
					DKEV_UNREGISTER_REPLY_REMOVE);
		}
		if (clear_voucher) {
			if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
				DISPATCH_CLIENT_CRASH(kr, "Voucher port corruption");
			}
			mach_voucher_t kv;
			kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
			if (kvoucher_move_send) ipc_kvoucher = kv;
		}
	}
	if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
		if (opts & MACH_SEND_NOTIFY) {
			_dispatch_debug("machport[0x%08x]: send-possible notification "
					"armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
			DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev) = 1;
		} else {
			// send kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
		}
		if (ipc_kvoucher) {
			_dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
			voucher_t ipc_voucher;
			ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
					voucher, dmsg->dmsg_priority, ipc_kvoucher);
			_dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
					ipc_voucher, dmsg, voucher);
			if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
			dmsg->dmsg_voucher = ipc_voucher;
		}
		goto out;
	} else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
		_voucher_dealloc_mach_voucher(ipc_kvoucher);
	}
	if (!(msg_opts & DISPATCH_MACH_WAIT_FOR_REPLY) && !kr && reply_port &&
			!(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply_port)) {
		if (!dm->ds_is_direct_kevent &&
				_dispatch_queue_get_current() != &_dispatch_mgr_q) {
			// reply receive kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
			dmsg->dmsg_options = msg_opts | DISPATCH_MACH_REGISTER_FOR_REPLY;
			goto out;
		}
		_dispatch_mach_reply_kevent_register(dm, reply_port, dmsg);
	}
	if (unlikely(!is_reply && dmsg == dr->dm_checkin && dm->dm_dkev)) {
		_dispatch_mach_notification_kevent_unregister(dm);
	}
	if (slowpath(kr)) {
		// Send failed, so reply was never registered <rdar://problem/14309159>
		dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
	}
	_dispatch_mach_msg_set_reason(dmsg, kr, 0);
	if ((send_flags & DM_SEND_INVOKE_IMMEDIATE_SEND) &&
			(msg_opts & DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT)) {
		// Return sent message synchronously <rdar://problem/25947334>
		send_status |= DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT;
	} else {
		_dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
	}
	if (dmsgr) _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
	send_status |= DM_SEND_STATUS_SUCCESS;
out:
	return send_status;
}
#pragma mark dispatch_mach_send_refs_t

static void _dispatch_mach_cancel(dispatch_mach_t dm);
static void _dispatch_mach_send_barrier_drain_push(dispatch_mach_t dm,
		pthread_priority_t pp);

DISPATCH_ALWAYS_INLINE
static inline pthread_priority_t
_dm_state_get_override(uint64_t dm_state)
{
	dm_state &= DISPATCH_MACH_STATE_OVERRIDE_MASK;
	return (pthread_priority_t)(dm_state >> 32);
}

DISPATCH_ALWAYS_INLINE
static inline uint64_t
_dm_state_override_from_priority(pthread_priority_t pp)
{
	uint64_t pp_state = pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	return pp_state << 32;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dm_state_needs_override(uint64_t dm_state, uint64_t pp_state)
{
	return (pp_state > (dm_state & DISPATCH_MACH_STATE_OVERRIDE_MASK));
}

DISPATCH_ALWAYS_INLINE
static inline uint64_t
_dm_state_merge_override(uint64_t dm_state, uint64_t pp_state)
{
	if (_dm_state_needs_override(dm_state, pp_state)) {
		dm_state &= ~DISPATCH_MACH_STATE_OVERRIDE_MASK;
		dm_state |= pp_state;
		dm_state |= DISPATCH_MACH_STATE_DIRTY;
		dm_state |= DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
	}
	return dm_state;
}
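/*
 * Sketch of the dm_state word used by the helpers above: the low 32 bits
 * carry the send-queue lock state (owner tid plus the DIRTY, PENDING_BARRIER
 * and RECEIVED_OVERRIDE bits), while the override QoS of the most demanding
 * waiter lives in the high 32 bits, which is why
 * _dm_state_override_from_priority() shifts the QoS class left by 32 and
 * _dm_state_get_override() shifts it back down.
 */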
#define _dispatch_mach_send_push_update_tail(dr, tail) \
		os_mpsc_push_update_tail(dr, dm, tail, do_next)
#define _dispatch_mach_send_push_update_head(dr, head) \
		os_mpsc_push_update_head(dr, dm, head)
#define _dispatch_mach_send_get_head(dr) \
		os_mpsc_get_head(dr, dm)
#define _dispatch_mach_send_unpop_head(dr, dc, dc_next) \
		os_mpsc_undo_pop_head(dr, dm, dc, dc_next, do_next)
#define _dispatch_mach_send_pop_head(dr, head) \
		os_mpsc_pop_head(dr, dm, head, do_next)
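/*
 * The macros above specialize the generic os_mpsc_* helpers to the send
 * queue embedded in dispatch_mach_send_refs_t (the dm_head/dm_tail fields,
 * linked through do_next). Push may be called concurrently from any thread;
 * pop and unpop are only used by the single drainer that holds the dm_state
 * lock, which is the usual multi-producer/single-consumer contract.
 */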
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_mach_send_push_inline(dispatch_mach_send_refs_t dr,
		dispatch_object_t dou)
{
	if (_dispatch_mach_send_push_update_tail(dr, dou._do)) {
		_dispatch_mach_send_push_update_head(dr, dou._do);
		return true;
	}
	return false;
}
static bool
_dispatch_mach_send_drain(dispatch_mach_t dm, dispatch_invoke_flags_t flags,
		dispatch_mach_send_invoke_flags_t send_flags)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_mach_reply_refs_t dmr;
	dispatch_mach_msg_t dmsg;
	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	pthread_priority_t pp = _dm_state_get_override(dr->dm_state);
	uint64_t old_state, new_state;
	uint32_t send_status;
	bool needs_mgr, disconnecting, returning_send_result = false;

again:
	needs_mgr = false; disconnecting = false;
	while (dr->dm_tail) {
		dc = _dispatch_mach_send_get_head(dr);
		do {
			dispatch_mach_send_invoke_flags_t sf = send_flags;
			// Only request immediate send result for the first message
			send_flags &= ~DM_SEND_INVOKE_IMMEDIATE_SEND_MASK;
			next_dc = _dispatch_mach_send_pop_head(dr, dc);
			if (_dispatch_object_has_type(dc,
					DISPATCH_CONTINUATION_TYPE(MACH_SEND_BARRIER))) {
				if (!(send_flags & DM_SEND_INVOKE_CAN_RUN_BARRIER)) {
					goto partial_drain;
				}
				_dispatch_continuation_pop(dc, dm->_as_dq, flags);
				continue;
			}
			if (_dispatch_object_is_slow_item(dc)) {
				dmsg = ((dispatch_continuation_t)dc)->dc_data;
				dmr = ((dispatch_continuation_t)dc)->dc_other;
			} else if (_dispatch_object_has_vtable(dc)) {
				dmsg = (dispatch_mach_msg_t)dc;
				dmr = NULL;
			} else {
				if ((dm->dm_dkev || !dm->ds_is_direct_kevent) &&
						(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
					// send kevent must be uninstalled on the manager queue
					needs_mgr = true;
					goto partial_drain;
				}
				if (unlikely(!_dispatch_mach_reconnect_invoke(dm, dc))) {
					disconnecting = true;
					goto partial_drain;
				}
				continue;
			}
			_dispatch_voucher_ktrace_dmsg_pop(dmsg);
			if (unlikely(dr->dm_disconnect_cnt ||
					(dm->dq_atomic_flags & DSF_CANCELED))) {
				_dispatch_mach_msg_not_sent(dm, dmsg);
				continue;
			}
			send_status = _dispatch_mach_msg_send(dm, dmsg, dmr, pp, sf);
			if (unlikely(!send_status)) {
				goto partial_drain;
			}
			if (send_status & DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT) {
				returning_send_result = true;
			}
		} while ((dc = next_dc));
	}

	os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
		if (old_state & DISPATCH_MACH_STATE_DIRTY) {
			new_state = old_state;
			new_state &= ~DISPATCH_MACH_STATE_DIRTY;
			new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
			new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
		} else {
			// unlock
			new_state = 0;
		}
	});
	goto out;

partial_drain:
	// if this is not a complete drain, we must undo some things
	_dispatch_mach_send_unpop_head(dr, dc, next_dc);

	if (_dispatch_object_has_type(dc,
			DISPATCH_CONTINUATION_TYPE(MACH_SEND_BARRIER))) {
		os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
			new_state = old_state;
			new_state |= DISPATCH_MACH_STATE_DIRTY;
			new_state |= DISPATCH_MACH_STATE_PENDING_BARRIER;
			new_state &= ~DISPATCH_MACH_STATE_UNLOCK_MASK;
		});
	} else {
		os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
			new_state = old_state;
			if (old_state & (DISPATCH_MACH_STATE_DIRTY |
					DISPATCH_MACH_STATE_RECEIVED_OVERRIDE)) {
				new_state &= ~DISPATCH_MACH_STATE_DIRTY;
				new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
				new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
			} else {
				new_state |= DISPATCH_MACH_STATE_DIRTY;
				new_state &= ~DISPATCH_MACH_STATE_UNLOCK_MASK;
			}
		});
	}

out:
	if (old_state & DISPATCH_MACH_STATE_RECEIVED_OVERRIDE) {
		// Ensure that the root queue sees that this thread was overridden.
		_dispatch_set_defaultpriority_override();
	}

	if (unlikely(new_state & DISPATCH_MACH_STATE_UNLOCK_MASK)) {
		os_atomic_thread_fence(acquire);
		pp = _dm_state_get_override(new_state);
		goto again;
	}

	if (new_state & DISPATCH_MACH_STATE_PENDING_BARRIER) {
		pp = _dm_state_get_override(new_state);
		_dispatch_mach_send_barrier_drain_push(dm, pp);
	} else {
		if (needs_mgr) {
			pp = _dm_state_get_override(new_state);
		} else {
			pp = 0;
		}
		if (!disconnecting) dx_wakeup(dm, pp, DISPATCH_WAKEUP_FLUSH);
	}
	return returning_send_result;
}
static void
_dispatch_mach_send_invoke(dispatch_mach_t dm,
		dispatch_invoke_flags_t flags,
		dispatch_mach_send_invoke_flags_t send_flags)
{
	dispatch_lock_owner tid_self = _dispatch_tid_self();
	uint64_t old_state, new_state;
	pthread_priority_t pp_floor;

	uint64_t canlock_mask = DISPATCH_MACH_STATE_UNLOCK_MASK;
	uint64_t canlock_state = 0;

	if (send_flags & DM_SEND_INVOKE_NEEDS_BARRIER) {
		canlock_mask |= DISPATCH_MACH_STATE_PENDING_BARRIER;
		canlock_state = DISPATCH_MACH_STATE_PENDING_BARRIER;
	} else if (!(send_flags & DM_SEND_INVOKE_CAN_RUN_BARRIER)) {
		canlock_mask |= DISPATCH_MACH_STATE_PENDING_BARRIER;
	}

	if (flags & DISPATCH_INVOKE_MANAGER_DRAIN) {
		pp_floor = 0;
	} else {
		// _dispatch_queue_class_invoke will have applied the queue override
		// (if any) before we get here. Else use the default base priority
		// as an estimation of the priority we already asked for.
		pp_floor = dm->_as_dq->dq_override;
		if (!pp_floor) {
			pp_floor = _dispatch_get_defaultpriority();
			pp_floor &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
		}
	}

retry:
	os_atomic_rmw_loop2o(dm->dm_refs, dm_state, old_state, new_state, acquire, {
		new_state = old_state;
		if (unlikely((old_state & canlock_mask) != canlock_state)) {
			if (!(send_flags & DM_SEND_INVOKE_FLUSH)) {
				os_atomic_rmw_loop_give_up(break);
			}
			new_state |= DISPATCH_MACH_STATE_DIRTY;
		} else {
			if (likely(pp_floor)) {
				pthread_priority_t pp = _dm_state_get_override(old_state);
				if (unlikely(pp > pp_floor)) {
					os_atomic_rmw_loop_give_up({
						_dispatch_wqthread_override_start(tid_self, pp);
						// Ensure that the root queue sees
						// that this thread was overridden.
						_dispatch_set_defaultpriority_override();
						pp_floor = pp;
						goto retry;
					});
				}
			}
			new_state |= tid_self;
			new_state &= ~DISPATCH_MACH_STATE_DIRTY;
			new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
			new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
		}
	});

	if (unlikely((old_state & canlock_mask) != canlock_state)) {
		return;
	}
	if (send_flags & DM_SEND_INVOKE_CANCEL) {
		_dispatch_mach_cancel(dm);
	}
	_dispatch_mach_send_drain(dm, flags, send_flags);
}
void
_dispatch_mach_send_barrier_drain_invoke(dispatch_continuation_t dc,
		dispatch_invoke_flags_t flags)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
	dispatch_thread_frame_s dtf;

	DISPATCH_COMPILER_CAN_ASSUME(dc->dc_priority == DISPATCH_NO_PRIORITY);
	DISPATCH_COMPILER_CAN_ASSUME(dc->dc_voucher == DISPATCH_NO_VOUCHER);
	// hide the mach channel (see _dispatch_mach_barrier_invoke comment)
	_dispatch_thread_frame_stash(&dtf);
	_dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER, dc_flags, {
		_dispatch_mach_send_invoke(dm, flags,
				DM_SEND_INVOKE_NEEDS_BARRIER | DM_SEND_INVOKE_CAN_RUN_BARRIER);
	});
	_dispatch_thread_frame_unstash(&dtf);
}
static void
_dispatch_mach_send_barrier_drain_push(dispatch_mach_t dm,
		pthread_priority_t pp)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dc->do_vtable = DC_VTABLE(MACH_SEND_BARRRIER_DRAIN);
	dc->dc_voucher = DISPATCH_NO_VOUCHER;
	dc->dc_priority = DISPATCH_NO_PRIORITY;
	return _dispatch_queue_push(dm->_as_dq, dc, pp);
}
static void
_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_continuation_t dc,
		pthread_priority_t pp)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	uint64_t pp_state, old_state, new_state, state_flags = 0;
	dispatch_lock_owner owner;
	bool wakeup;

	// <rdar://problem/25896179> when pushing a send barrier that destroys
	// the last reference to this channel, and the send queue is already
	// draining on another thread, the send barrier may run as soon as
	// _dispatch_mach_send_push_inline() returns.
	_dispatch_retain(dm);
	pp_state = _dm_state_override_from_priority(pp);

	wakeup = _dispatch_mach_send_push_inline(dr, dc);
	if (wakeup) {
		state_flags = DISPATCH_MACH_STATE_DIRTY;
		if (dc->do_vtable == DC_VTABLE(MACH_SEND_BARRIER)) {
			state_flags |= DISPATCH_MACH_STATE_PENDING_BARRIER;
		}
	}

	if (state_flags) {
		os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
			new_state = _dm_state_merge_override(old_state, pp_state);
			new_state |= state_flags;
		});
	} else {
		os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, relaxed, {
			new_state = _dm_state_merge_override(old_state, pp_state);
			if (old_state == new_state) {
				os_atomic_rmw_loop_give_up(break);
			}
		});
	}

	pp = _dm_state_get_override(new_state);
	owner = _dispatch_lock_owner((dispatch_lock)old_state);
	if (owner) {
		if (_dm_state_needs_override(old_state, pp_state)) {
			_dispatch_wqthread_override_start_check_owner(owner, pp,
					&dr->dm_state_lock.dul_lock);
		}
		return _dispatch_release_tailcall(dm);
	}

	dispatch_wakeup_flags_t wflags = 0;
	if (state_flags & DISPATCH_MACH_STATE_PENDING_BARRIER) {
		_dispatch_mach_send_barrier_drain_push(dm, pp);
	} else if (wakeup || dr->dm_disconnect_cnt ||
			(dm->dq_atomic_flags & DSF_CANCELED)) {
		wflags = DISPATCH_WAKEUP_FLUSH | DISPATCH_WAKEUP_CONSUME;
	} else if (old_state & DISPATCH_MACH_STATE_PENDING_BARRIER) {
		wflags = DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_CONSUME;
	}
	if (wflags) {
		return dx_wakeup(dm, pp, wflags);
	}
	return _dispatch_release_tailcall(dm);
}
static bool
_dispatch_mach_send_push_and_trydrain(dispatch_mach_t dm,
		dispatch_object_t dou, pthread_priority_t pp,
		dispatch_mach_send_invoke_flags_t send_flags)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_lock_owner tid_self = _dispatch_tid_self();
	uint64_t pp_state, old_state, new_state, canlock_mask, state_flags = 0;
	dispatch_lock_owner owner;

	pp_state = _dm_state_override_from_priority(pp);
	bool wakeup = _dispatch_mach_send_push_inline(dr, dou);
	if (wakeup) {
		state_flags = DISPATCH_MACH_STATE_DIRTY;
	}

	if (unlikely(dr->dm_disconnect_cnt ||
			(dm->dq_atomic_flags & DSF_CANCELED))) {
		os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
			new_state = _dm_state_merge_override(old_state, pp_state);
			new_state |= state_flags;
		});
		dx_wakeup(dm, pp, DISPATCH_WAKEUP_FLUSH);
		return false;
	}

	canlock_mask = DISPATCH_MACH_STATE_UNLOCK_MASK |
			DISPATCH_MACH_STATE_PENDING_BARRIER;
	if (state_flags) {
		os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, seq_cst, {
			new_state = _dm_state_merge_override(old_state, pp_state);
			new_state |= state_flags;
			if (likely((old_state & canlock_mask) == 0)) {
				new_state |= tid_self;
				new_state &= ~DISPATCH_MACH_STATE_DIRTY;
				new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
				new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
			}
		});
	} else {
		os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, acquire, {
			new_state = _dm_state_merge_override(old_state, pp_state);
			if (new_state == old_state) {
				os_atomic_rmw_loop_give_up(return false);
			}
			if (likely((old_state & canlock_mask) == 0)) {
				new_state |= tid_self;
				new_state &= ~DISPATCH_MACH_STATE_DIRTY;
				new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
				new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
			}
		});
	}

	owner = _dispatch_lock_owner((dispatch_lock)old_state);
	if (owner) {
		if (_dm_state_needs_override(old_state, pp_state)) {
			_dispatch_wqthread_override_start_check_owner(owner, pp,
					&dr->dm_state_lock.dul_lock);
		}
		return false;
	}

	if (old_state & DISPATCH_MACH_STATE_PENDING_BARRIER) {
		dx_wakeup(dm, pp, DISPATCH_WAKEUP_OVERRIDING);
		return false;
	}

	// Ensure our message is still at the head of the queue and has not already
	// been dequeued by another thread that raced us to the send queue lock.
	// A plain load of the head and comparison against our object pointer is
	// sufficient.
	if (unlikely(!(wakeup && dou._do == dr->dm_head))) {
		// Don't request immediate send result for messages we don't own
		send_flags &= ~DM_SEND_INVOKE_IMMEDIATE_SEND_MASK;
	}
	return _dispatch_mach_send_drain(dm, DISPATCH_INVOKE_NONE, send_flags);
}
static void
_dispatch_mach_merge_notification_kevent(dispatch_mach_t dm,
		const _dispatch_kevent_qos_s *ke)
{
	if (!(ke->fflags & dm->ds_pending_data_mask)) {
		return;
	}
	_dispatch_mach_send_invoke(dm, DISPATCH_INVOKE_MANAGER_DRAIN,
			DM_SEND_INVOKE_FLUSH);
}
#pragma mark dispatch_mach_t

static inline mach_msg_option_t
_dispatch_mach_checkin_options(void)
{
	mach_msg_option_t options = 0;
#if DISPATCH_USE_CHECKIN_NOIMPORTANCE
	options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
#endif
	return options;
}

static inline mach_msg_option_t
_dispatch_mach_send_options(void)
{
	mach_msg_option_t options = 0;
	return options;
}

DISPATCH_ALWAYS_INLINE
static inline pthread_priority_t
_dispatch_mach_priority_propagate(mach_msg_option_t options)
{
#if DISPATCH_USE_NOIMPORTANCE_QOS
	if (options & MACH_SEND_NOIMPORTANCE) return 0;
#endif
	return _dispatch_priority_propagate();
}
static bool
_dispatch_mach_send_msg(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
		dispatch_continuation_t dc_wait, mach_msg_option_t options)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
		DISPATCH_CLIENT_CRASH(dmsg->do_next, "Message already enqueued");
	}
	dispatch_retain(dmsg);
	pthread_priority_t priority = _dispatch_mach_priority_propagate(options);
	options |= _dispatch_mach_send_options();
	dmsg->dmsg_options = options;
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	dmsg->dmsg_reply = _dispatch_mach_msg_get_reply_port(dmsg);
	bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
			MACH_MSG_TYPE_MOVE_SEND_ONCE);
	dmsg->dmsg_priority = priority;
	dmsg->dmsg_voucher = _voucher_copy();
	_dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);

	uint32_t send_status;
	bool returning_send_result = false;
	dispatch_mach_send_invoke_flags_t send_flags = DM_SEND_INVOKE_NONE;
	if (options & DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT) {
		send_flags = DM_SEND_INVOKE_IMMEDIATE_SEND;
	}
	if (is_reply && !dmsg->dmsg_reply && !dr->dm_disconnect_cnt &&
			!(dm->dq_atomic_flags & DSF_CANCELED)) {
		// replies are sent to a send-once right and don't need the send queue
		dispatch_assert(!dc_wait);
		send_status = _dispatch_mach_msg_send(dm, dmsg, NULL, 0, send_flags);
		dispatch_assert(send_status);
		returning_send_result = !!(send_status &
				DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT);
	} else {
		_dispatch_voucher_ktrace_dmsg_push(dmsg);
		priority &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
		dispatch_object_t dou = { ._dmsg = dmsg };
		if (dc_wait) dou._dc = dc_wait;
		returning_send_result = _dispatch_mach_send_push_and_trydrain(dm, dou,
				priority, send_flags);
	}
	if (returning_send_result) {
		_dispatch_voucher_debug("mach-msg[%p] clear", dmsg->dmsg_voucher, dmsg);
		if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
		dmsg->dmsg_voucher = NULL;
		dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
		dispatch_release(dmsg);
	}
	return returning_send_result;
}
void
dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
		mach_msg_option_t options)
{
	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
	options &= ~DISPATCH_MACH_OPTIONS_MASK;
	bool returned_send_result = _dispatch_mach_send_msg(dm, dmsg, NULL, options);
	dispatch_assert(!returned_send_result);
}
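
/*
 * Usage sketch (not part of libdispatch): a client of the mach channel SPI
 * typically creates a channel, connects it, and then fires messages at it
 * with dispatch_mach_send(). Port setup and the message below are placeholders.
 *
 *	dispatch_mach_t dm = dispatch_mach_create("com.example.channel",
 *			dispatch_get_main_queue(),
 *			^(dispatch_mach_reason_t reason, dispatch_mach_msg_t msg,
 *			mach_error_t error) {
 *		// handle DISPATCH_MACH_MESSAGE_RECEIVED, _SENT, _NOT_SENT, ...
 *	});
 *	dispatch_mach_connect(dm, recv_port, send_port, NULL);
 *	dispatch_mach_send(dm, dmsg, 0); // dmsg: a dispatch_mach_msg_t built earlier
 */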
void
dispatch_mach_send_with_result(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
		mach_msg_option_t options, dispatch_mach_send_flags_t send_flags,
		dispatch_mach_reason_t *send_result, mach_error_t *send_error)
{
	if (unlikely(send_flags != DISPATCH_MACH_SEND_DEFAULT)) {
		DISPATCH_CLIENT_CRASH(send_flags, "Invalid send flags");
	}
	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
	options &= ~DISPATCH_MACH_OPTIONS_MASK;
	options |= DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT;
	bool returned_send_result = _dispatch_mach_send_msg(dm, dmsg, NULL, options);
	unsigned long reason = DISPATCH_MACH_NEEDS_DEFERRED_SEND;
	mach_error_t err = 0;
	if (returned_send_result) {
		reason = _dispatch_mach_msg_get_reason(dmsg, &err);
	}
	*send_result = reason;
	*send_error = err;
}
static dispatch_mach_msg_t
_dispatch_mach_send_and_wait_for_reply(dispatch_mach_t dm,
		dispatch_mach_msg_t dmsg, mach_msg_option_t options,
		bool *returned_send_result)
{
	mach_port_t reply_port = _dispatch_mach_msg_get_reply_port(dmsg);
	if (!reply_port) {
		// use per-thread mach reply port <rdar://24597802>
		reply_port = _dispatch_get_thread_reply_port();
		mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
		dispatch_assert(MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) ==
				MACH_MSG_TYPE_MAKE_SEND_ONCE);
		hdr->msgh_local_port = reply_port;
		options |= DISPATCH_MACH_OWNED_REPLY_PORT;
	}

	dispatch_mach_reply_refs_t dmr;
#if DISPATCH_DEBUG
	dmr = _dispatch_calloc(1, sizeof(*dmr));
#else
	struct dispatch_mach_reply_refs_s dmr_buf = { };
	dmr = &dmr_buf;
#endif
	struct dispatch_continuation_s dc_wait = {
		.dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
		.dc_data = dmsg,
		.dc_other = dmr,
		.dc_priority = DISPATCH_NO_PRIORITY,
		.dc_voucher = DISPATCH_NO_VOUCHER,
	};
	dmr->dmr_ctxt = dmsg->do_ctxt;
	*returned_send_result = _dispatch_mach_send_msg(dm, dmsg, &dc_wait, options);
	if (options & DISPATCH_MACH_OWNED_REPLY_PORT) {
		_dispatch_clear_thread_reply_port(reply_port);
	}
	dmsg = _dispatch_mach_msg_reply_recv(dm, dmr, reply_port);
#if DISPATCH_DEBUG
	free(dmr);
#endif
	return dmsg;
}
dispatch_mach_msg_t
dispatch_mach_send_and_wait_for_reply(dispatch_mach_t dm,
		dispatch_mach_msg_t dmsg, mach_msg_option_t options)
{
	bool returned_send_result;
	dispatch_mach_msg_t reply;
	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
	options &= ~DISPATCH_MACH_OPTIONS_MASK;
	options |= DISPATCH_MACH_WAIT_FOR_REPLY;
	reply = _dispatch_mach_send_and_wait_for_reply(dm, dmsg, options,
			&returned_send_result);
	dispatch_assert(!returned_send_result);
	return reply;
}
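
/*
 * Usage sketch (not part of libdispatch): the synchronous variant blocks the
 * calling thread on the (possibly per-thread) reply port until the reply, or
 * a send failure, comes back. Names below are placeholders.
 *
 *	dispatch_mach_msg_t reply =
 *			dispatch_mach_send_and_wait_for_reply(dm, request_msg, 0);
 *	if (reply) {
 *		size_t size;
 *		mach_msg_header_t *hdr = dispatch_mach_msg_get_msg(reply, &size);
 *		// inspect the reply header/body, then release the wrapper
 *		(void)hdr;
 *		dispatch_release(reply);
 *	}
 */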
dispatch_mach_msg_t
dispatch_mach_send_with_result_and_wait_for_reply(dispatch_mach_t dm,
		dispatch_mach_msg_t dmsg, mach_msg_option_t options,
		dispatch_mach_send_flags_t send_flags,
		dispatch_mach_reason_t *send_result, mach_error_t *send_error)
{
	if (unlikely(send_flags != DISPATCH_MACH_SEND_DEFAULT)) {
		DISPATCH_CLIENT_CRASH(send_flags, "Invalid send flags");
	}
	bool returned_send_result;
	dispatch_mach_msg_t reply;
	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
	options &= ~DISPATCH_MACH_OPTIONS_MASK;
	options |= DISPATCH_MACH_WAIT_FOR_REPLY;
	options |= DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT;
	reply = _dispatch_mach_send_and_wait_for_reply(dm, dmsg, options,
			&returned_send_result);
	unsigned long reason = DISPATCH_MACH_NEEDS_DEFERRED_SEND;
	mach_error_t err = 0;
	if (returned_send_result) {
		reason = _dispatch_mach_msg_get_reason(dmsg, &err);
	}
	*send_result = reason;
	*send_error = err;
	return reply;
}
static bool
_dispatch_mach_disconnect(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	bool disconnected;
	if (dm->dm_dkev) {
		_dispatch_mach_notification_kevent_unregister(dm);
	}
	if (MACH_PORT_VALID(dr->dm_send)) {
		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
	}
	dr->dm_send = MACH_PORT_NULL;
	if (dr->dm_checkin) {
		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
		dr->dm_checkin = NULL;
	}
	_dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
	dispatch_mach_reply_refs_t dmr, tmp;
	TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp) {
		TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
		_TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
		if (dmr->dmr_dkev) {
			_dispatch_mach_reply_kevent_unregister(dm, dmr,
					DKEV_UNREGISTER_DISCONNECTED);
		} else {
			_dispatch_mach_reply_waiter_unregister(dm, dmr,
					DKEV_UNREGISTER_DISCONNECTED);
		}
	}
	disconnected = TAILQ_EMPTY(&dm->dm_refs->dm_replies);
	_dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
	return disconnected;
}
static void
_dispatch_mach_cancel(dispatch_mach_t dm)
{
	_dispatch_object_debug(dm, "%s", __func__);
	if (!_dispatch_mach_disconnect(dm)) return;
	if (dm->ds_dkev) {
		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
		_dispatch_source_kevent_unregister(dm->_as_ds);
		if ((dm->dq_atomic_flags & DSF_STATE_MASK) == DSF_DELETED) {
			_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
		}
	} else {
		_dispatch_queue_atomic_flags_set_and_clear(dm->_as_dq, DSF_DELETED,
				DSF_ARMED | DSF_DEFERRED_DELETE);
	}
}
static bool
_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
{
	if (!_dispatch_mach_disconnect(dm)) return false;
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dr->dm_checkin = dou._dc->dc_data;
	dr->dm_send = (mach_port_t)dou._dc->dc_other;
	_dispatch_continuation_free(dou._dc);
	(void)os_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_release(dm); // <rdar://problem/26266265>
	return true;
}
void
dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
		dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	(void)os_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
	if (MACH_PORT_VALID(send) && checkin) {
		dispatch_retain(checkin);
		checkin->dmsg_options = _dispatch_mach_checkin_options();
		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
	} else {
		checkin = NULL;
		dr->dm_checkin_port = MACH_PORT_NULL;
	}
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT;
	// actually called manually in _dispatch_mach_send_drain
	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = checkin;
	dc->dc_other = (void*)(uintptr_t)send;
	dc->dc_voucher = DISPATCH_NO_VOUCHER;
	dc->dc_priority = DISPATCH_NO_PRIORITY;
	_dispatch_retain(dm); // <rdar://problem/26266265>
	return _dispatch_mach_send_push(dm, dc, 0);
}
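
/*
 * Usage sketch (not part of libdispatch): after the peer dies, a client
 * usually looks up a fresh send right and hands it back to the channel
 * together with a new check-in message; the actual reconnection happens
 * asynchronously in _dispatch_mach_reconnect_invoke() on the send queue.
 *
 *	mach_port_t new_send = lookup_service_port();     // placeholder
 *	dispatch_mach_msg_t checkin = make_checkin_msg(); // placeholder
 *	dispatch_mach_reconnect(dm, new_send, checkin);
 */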
mach_port_t
dispatch_mach_get_checkin_port(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dm->dq_atomic_flags & DSF_CANCELED)) {
		return MACH_PORT_DEAD;
	}
	return dr->dm_checkin_port;
}
static void
_dispatch_mach_connect_invoke(dispatch_mach_t dm)
{
	dispatch_mach_refs_t dr = dm->ds_refs;
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
	dm->dm_connect_handler_called = 1;
}
void
_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg,
		dispatch_invoke_flags_t flags)
{
	dispatch_thread_frame_s dtf;
	dispatch_mach_refs_t dr;
	dispatch_mach_t dm;
	mach_error_t err;
	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);
	_dispatch_thread_set_self_t adopt_flags = DISPATCH_PRIORITY_ENFORCE |
			DISPATCH_VOUCHER_CONSUME | DISPATCH_VOUCHER_REPLACE;

	// hide mach channel
	dm = (dispatch_mach_t)_dispatch_thread_frame_stash(&dtf);
	dr = dm->ds_refs;
	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	_dispatch_voucher_ktrace_dmsg_pop(dmsg);
	_dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
	(void)_dispatch_adopt_priority_and_set_voucher(dmsg->dmsg_priority,
			dmsg->dmsg_voucher, adopt_flags);
	dmsg->dmsg_voucher = NULL;
	dispatch_invoke_with_autoreleasepool(flags, {
		if (slowpath(!dm->dm_connect_handler_called)) {
			_dispatch_mach_connect_invoke(dm);
		}
		_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
				dr->dm_handler_func);
	});
	_dispatch_thread_frame_unstash(&dtf);
	_dispatch_introspection_queue_item_complete(dmsg);
	dispatch_release(dmsg);
}
void
_dispatch_mach_barrier_invoke(dispatch_continuation_t dc,
		dispatch_invoke_flags_t flags)
{
	dispatch_thread_frame_s dtf;
	dispatch_mach_t dm = dc->dc_other;
	dispatch_mach_refs_t dr;
	uintptr_t dc_flags = (uintptr_t)dc->dc_data;
	unsigned long type = dc_type(dc);

	// hide mach channel from clients
	if (type == DISPATCH_CONTINUATION_TYPE(MACH_RECV_BARRIER)) {
		// on the send queue, the mach channel isn't the current queue
		// its target queue is the current one already
		_dispatch_thread_frame_stash(&dtf);
	}
	dr = dm->ds_refs;
	DISPATCH_COMPILER_CAN_ASSUME(dc_flags & DISPATCH_OBJ_CONSUME_BIT);
	_dispatch_continuation_pop_forwarded(dc, dm->dq_override_voucher, dc_flags, {
		dispatch_invoke_with_autoreleasepool(flags, {
			if (slowpath(!dm->dm_connect_handler_called)) {
				_dispatch_mach_connect_invoke(dm);
			}
			_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
			_dispatch_client_callout4(dr->dm_handler_ctxt,
					DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0,
					dr->dm_handler_func);
		});
	});
	if (type == DISPATCH_CONTINUATION_TYPE(MACH_RECV_BARRIER)) {
		_dispatch_thread_frame_unstash(&dtf);
	}
}
void
dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
	pthread_priority_t pp;

	_dispatch_continuation_init_f(dc, dm, context, func, 0, 0, dc_flags);
	dc->dc_data = (void *)dc->dc_flags;
	dc->dc_other = dm;
	dc->do_vtable = DC_VTABLE(MACH_SEND_BARRIER);
	_dispatch_trace_continuation_push(dm->_as_dq, dc);
	pp = _dispatch_continuation_get_override_priority(dm->_as_dq, dc);
	return _dispatch_mach_send_push(dm, dc, pp);
}
void
dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
	pthread_priority_t pp;

	_dispatch_continuation_init(dc, dm, barrier, 0, 0, dc_flags);
	dc->dc_data = (void *)dc->dc_flags;
	dc->dc_other = dm;
	dc->do_vtable = DC_VTABLE(MACH_SEND_BARRIER);
	_dispatch_trace_continuation_push(dm->_as_dq, dc);
	pp = _dispatch_continuation_get_override_priority(dm->_as_dq, dc);
	return _dispatch_mach_send_push(dm, dc, pp);
}
void
dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

	_dispatch_continuation_init_f(dc, dm, context, func, 0, 0, dc_flags);
	dc->dc_data = (void *)dc->dc_flags;
	dc->dc_other = dm;
	dc->do_vtable = DC_VTABLE(MACH_RECV_BARRIER);
	return _dispatch_continuation_async(dm->_as_dq, dc);
}

void
dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

	_dispatch_continuation_init(dc, dm, barrier, 0, 0, dc_flags);
	dc->dc_data = (void *)dc->dc_flags;
	dc->dc_other = dm;
	dc->do_vtable = DC_VTABLE(MACH_RECV_BARRIER);
	return _dispatch_continuation_async(dm->_as_dq, dc);
}
static void
_dispatch_mach_cancel_invoke(dispatch_mach_t dm, dispatch_invoke_flags_t flags)
{
	dispatch_mach_refs_t dr = dm->ds_refs;

	dispatch_invoke_with_autoreleasepool(flags, {
		if (slowpath(!dm->dm_connect_handler_called)) {
			_dispatch_mach_connect_invoke(dm);
		}
		_dispatch_client_callout4(dr->dm_handler_ctxt,
				DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
	});
	dm->dm_cancel_handler_called = 1;
	_dispatch_release(dm); // the retain is done at creation time
}

void
dispatch_mach_cancel(dispatch_mach_t dm)
{
	dispatch_source_cancel(dm->_as_ds);
}
static void
_dispatch_mach_install(dispatch_mach_t dm, pthread_priority_t pp)
{
	uint32_t disconnect_cnt;

	if (dm->ds_dkev) {
		_dispatch_source_kevent_register(dm->_as_ds, pp);
	}
	if (dm->ds_is_direct_kevent) {
		pp &= (~_PTHREAD_PRIORITY_FLAGS_MASK |
				_PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG |
				_PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
		// _dispatch_mach_reply_kevent_register assumes this has been done
		// which is unlike regular sources or queues, the DEFAULTQUEUE flag
		// is used so that the priority of that channel doesn't act as a floor
		// QoS for incoming messages (26761457)
		dm->dq_priority = (dispatch_priority_t)pp;
	}
	dm->ds_is_installed = true;
	if (unlikely(!os_atomic_cmpxchgv2o(dm->dm_refs, dm_disconnect_cnt,
			DISPATCH_MACH_NEVER_INSTALLED, 0, &disconnect_cnt, release))) {
		DISPATCH_INTERNAL_CRASH(disconnect_cnt, "Channel already installed");
	}
}
void
_dispatch_mach_finalize_activation(dispatch_mach_t dm)
{
	if (dm->ds_is_direct_kevent && !dm->ds_is_installed) {
		dispatch_source_t ds = dm->_as_ds;
		pthread_priority_t pp = _dispatch_source_compute_kevent_priority(ds);
		if (pp) _dispatch_mach_install(dm, pp);
	}

	// call "super"
	_dispatch_queue_finalize_activation(dm->_as_dq);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_mach_invoke2(dispatch_object_t dou, dispatch_invoke_flags_t flags,
		uint64_t *owned, struct dispatch_object_s **dc_ptr DISPATCH_UNUSED)
{
	dispatch_mach_t dm = dou._dm;
	dispatch_queue_t retq = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	// This function performs all mach channel actions. Each action is
	// responsible for verifying that it takes place on the appropriate queue.
	// If the current queue is not the correct queue for this action, the
	// correct queue will be returned and the invoke will be re-driven on that
	// queue.

	// The order of tests here in invoke and in wakeup should be consistent.

	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_queue_t dkq = &_dispatch_mgr_q;

	if (dm->ds_is_direct_kevent) {
		dkq = dm->do_targetq;
	}

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_mach_install(dm, _dispatch_get_defaultpriority());
	}

	if (_dispatch_queue_class_probe(dm)) {
		if (dq == dm->do_targetq) {
			retq = _dispatch_queue_serial_drain(dm->_as_dq, flags, owned, NULL);
		} else {
			retq = dm->do_targetq;
		}
	}

	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(dm->_as_dq);

	if (dr->dm_tail) {
		bool requires_mgr = dr->dm_needs_mgr || (dr->dm_disconnect_cnt &&
				(dm->dm_dkev || !dm->ds_is_direct_kevent));
		if (!(dm->dm_dkev && DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev)) ||
				(dqf & DSF_CANCELED) || dr->dm_disconnect_cnt) {
			// The channel has pending messages to send.
			if (unlikely(requires_mgr && dq != &_dispatch_mgr_q)) {
				return retq ? retq : &_dispatch_mgr_q;
			}
			dispatch_mach_send_invoke_flags_t send_flags = DM_SEND_INVOKE_NONE;
			if (dq != &_dispatch_mgr_q) {
				send_flags |= DM_SEND_INVOKE_CAN_RUN_BARRIER;
			}
			_dispatch_mach_send_invoke(dm, flags, send_flags);
		}
	} else if (dqf & DSF_CANCELED) {
		// The channel has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if ((dqf & DSF_STATE_MASK) == (DSF_ARMED | DSF_DEFERRED_DELETE)) {
			// waiting for the delivery of a deferred delete event
			return retq;
		}
		if ((dqf & DSF_STATE_MASK) != DSF_DELETED) {
			if (dq != &_dispatch_mgr_q) {
				return retq ? retq : &_dispatch_mgr_q;
			}
			_dispatch_mach_send_invoke(dm, flags, DM_SEND_INVOKE_CANCEL);
			dqf = _dispatch_queue_atomic_flags(dm->_as_dq);
			if (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
				// waiting for the delivery of a deferred delete event
				// or deletion didn't happen because send_invoke couldn't
				// acquire the send lock
				return retq;
			}
		}
		if (!dm->dm_cancel_handler_called) {
			if (dq != dm->do_targetq) {
				return retq ? retq : dm->do_targetq;
			}
			_dispatch_mach_cancel_invoke(dm, flags);
		}
	}

	return retq;
}

void
_dispatch_mach_invoke(dispatch_mach_t dm, dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(dm, flags, _dispatch_mach_invoke2);
}
void
_dispatch_mach_wakeup(dispatch_mach_t dm, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
	// This function determines whether the mach channel needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
	dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(dm->_as_dq);

	if (dm->ds_is_direct_kevent) {
		dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if (!dm->ds_is_installed) {
		// The channel needs to be installed on the kevent queue.
		tq = dkq;
		goto done;
	}

	if (_dispatch_queue_class_probe(dm)) {
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
		goto done;
	}

	if (_dispatch_lock_is_locked(dr->dm_state_lock.dul_lock)) {
		// Sending and uninstallation below require the send lock, the channel
		// will be woken up when the lock is dropped <rdar://15132939&15203957>
		_dispatch_queue_reinstate_override_priority(dm, (dispatch_priority_t)pp);
		goto done;
	}

	if (dr->dm_tail) {
		bool requires_mgr = dr->dm_needs_mgr || (dr->dm_disconnect_cnt &&
				(dm->dm_dkev || !dm->ds_is_direct_kevent));
		if (!(dm->dm_dkev && DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev)) ||
				(dqf & DSF_CANCELED) || dr->dm_disconnect_cnt) {
			if (unlikely(requires_mgr)) {
				tq = DISPATCH_QUEUE_WAKEUP_MGR;
			} else {
				tq = DISPATCH_QUEUE_WAKEUP_TARGET;
			}
		} else {
			// can happen when we can't send because the port is full
			// but we should not lose the override
			_dispatch_queue_reinstate_override_priority(dm,
					(dispatch_priority_t)pp);
		}
	} else if (dqf & DSF_CANCELED) {
		if ((dqf & DSF_STATE_MASK) == (DSF_ARMED | DSF_DEFERRED_DELETE)) {
			// waiting for the delivery of a deferred delete event
		} else if ((dqf & DSF_STATE_MASK) != DSF_DELETED) {
			// The channel needs to be uninstalled from the manager queue
			tq = DISPATCH_QUEUE_WAKEUP_MGR;
		} else if (!dm->dm_cancel_handler_called) {
			// the cancellation handler needs to be delivered to the target
			// queue
			tq = DISPATCH_QUEUE_WAKEUP_TARGET;
		}
	}

done:
	if (tq) {
		return _dispatch_queue_class_wakeup(dm->_as_dq, pp, flags, tq);
	} else if (pp) {
		return _dispatch_queue_class_override_drainer(dm->_as_dq, pp, flags);
	} else if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dm);
	}
}

#pragma mark dispatch_mach_msg_t
dispatch_mach_msg_t
dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
		dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
{
	if (slowpath(size < sizeof(mach_msg_header_t)) ||
			slowpath(destructor && !msg)) {
		DISPATCH_CLIENT_CRASH(size, "Empty message");
	}
	dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
			sizeof(struct dispatch_mach_msg_s) +
			(destructor ? 0 : size - sizeof(dmsg->dmsg_msg)));
	if (destructor) {
		dmsg->dmsg_msg = msg;
	} else if (msg) {
		memcpy(dmsg->dmsg_buf, msg, size);
	}
	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
	dmsg->dmsg_destructor = destructor;
	dmsg->dmsg_size = size;
	if (msg_ptr) {
		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
	}
	return dmsg;
}
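
/*
 * Usage sketch (not part of libdispatch): with the DEFAULT destructor and a
 * NULL msg pointer, the message lives in the object's inline buffer and the
 * caller fills the header in place through *msg_ptr. Field values below are
 * placeholders.
 *
 *	mach_msg_header_t *hdr;
 *	dispatch_mach_msg_t dmsg = dispatch_mach_msg_create(NULL,
 *			sizeof(mach_msg_header_t),
 *			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
 *	hdr->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
 *	hdr->msgh_size = sizeof(mach_msg_header_t);
 *	hdr->msgh_remote_port = dest_port;   // placeholder
 *	hdr->msgh_id = 0x1234;               // placeholder
 *	dispatch_mach_send(dm, dmsg, 0);
 *	dispatch_release(dmsg);
 */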
void
_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
{
	if (dmsg->dmsg_voucher) {
		_voucher_release(dmsg->dmsg_voucher);
		dmsg->dmsg_voucher = NULL;
	}
	switch (dmsg->dmsg_destructor) {
	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
		free(dmsg->dmsg_msg);
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
		mach_vm_size_t vm_size = dmsg->dmsg_size;
		mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
				vm_addr, vm_size));
		break;
	}
	}
}
static inline mach_msg_header_t *
_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
{
	return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
			(mach_msg_header_t *)dmsg->dmsg_buf;
}

mach_msg_header_t *
dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
{
	if (size_ptr) {
		*size_ptr = dmsg->dmsg_size;
	}
	return _dispatch_mach_msg_get_msg(dmsg);
}
size_t
_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dmsg), dmsg);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
			"msgh[%p] = { ", dmsg->dmsg_options, dmsg->dmsg_buf);
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
	if (hdr->msgh_id) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
				hdr->msgh_id);
	}
	if (hdr->msgh_size) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
				hdr->msgh_size);
	}
	if (hdr->msgh_bits) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
		}
		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
	}
	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
	} else if (hdr->msgh_local_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
				hdr->msgh_local_port);
	} else if (hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
				hdr->msgh_remote_port);
	} else {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
	return offset;
}
#pragma mark dispatch_mig_server

mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
		dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
	mach_msg_options_t tmp_options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	uint64_t assertion_token = 0;
	unsigned int cnt = 1000; // do not stall out serial queues
	boolean_t demux_success;
	bool received = false;
	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;

	bufRequest = alloca(rcv_size);
	bufRequest->RetCode = 0;
	for (mach_vm_address_t p = mach_vm_trunc_page(bufRequest + vm_page_size);
			p < (mach_vm_address_t)bufRequest + rcv_size; p += vm_page_size) {
		*(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
	}

	bufReply = alloca(rcv_size);
	bufReply->Head.msgh_size = 0;
	for (mach_vm_address_t p = mach_vm_trunc_page(bufReply + vm_page_size);
			p < (mach_vm_address_t)bufReply + rcv_size; p += vm_page_size) {
		*(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
	}

#if DISPATCH_DEBUG
	options |= MACH_RCV_LARGE; // rdar://problem/8422992
#endif
	tmp_options = options;
	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_QUEUE_IS_SUSPENDED(ds) || (--cnt == 0)) {
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				goto out;
			}
		}
		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0, 0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
				// Don't return an error if a message was sent this time or
				// a message was successfully received previously
				// rdar://problems/7363620&7791738
				if (bufReply->Head.msgh_remote_port || received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			case MACH_RCV_INVALID_NAME:
				break;
#if DISPATCH_DEBUG
			case MACH_RCV_TOO_LARGE:
				// receive messages that are too large and log their id and size
				// rdar://problem/8422992
				tmp_options &= ~MACH_RCV_LARGE;
				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
				void *large_buf = malloc(large_size);
				if (large_buf) {
					rcv_size = large_size;
					bufReply = large_buf;
				}
				if (!mach_msg(&bufReply->Head, tmp_options, 0,
						(mach_msg_size_t)rcv_size,
						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
					_dispatch_log("BUG in libdispatch client: "
							"dispatch_mig_server received message larger than "
							"requested size %zd: id = 0x%x, size = %d",
							maxmsgsz, bufReply->Head.msgh_id,
							bufReply->Head.msgh_size);
				}
				if (large_buf) {
					free(large_buf);
				}
				// fall through
#endif
			default:
				_dispatch_bug_mach_client(
						"dispatch_mig_server: mach_msg() failed", kr);
				break;
			}
			goto out;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			goto out;
		}
		received = true;

		if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
			int r = proc_importance_assertion_complete(assertion_token);
			(void)dispatch_assume_zero(r);
#endif
			assertion_token = 0;
		}

		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

#if DISPATCH_USE_IMPORTANCE_ASSERTION
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
				NULL, &assertion_token);
		if (r && slowpath(r != EIO)) {
			(void)dispatch_assume_zero(r);
		}
#pragma clang diagnostic pop
#endif
		_voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
			// is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}
				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

out:
	if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
		int r = proc_importance_assertion_complete(assertion_token);
		(void)dispatch_assume_zero(r);
#endif
	}

	return kr;
}

#endif /* HAVE_MACH */
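
/*
 * Usage sketch (not part of libdispatch): dispatch_mig_server() is intended
 * to be called from the event handler of a MACH_RECV dispatch source, with
 * the MIG-generated demux routine as the callback. The subsystem names below
 * are placeholders for whatever the client's MIG-generated header provides.
 *
 *	dispatch_source_t ds = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_MACH_RECV, service_port, 0, queue);
 *	dispatch_source_set_event_handler(ds, ^{
 *		dispatch_mig_server(ds,
 *				sizeof(union __RequestUnion__example_subsystem), // placeholder
 *				example_server); // MIG demux routine, placeholder
 *	});
 *	dispatch_resume(ds);
 */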
#pragma mark dispatch_source_debug

static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);

	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);

	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);
	_evfilt2(EVFILT_VM);
	_evfilt2(EVFILT_SOCK);
#ifdef EVFILT_MEMORYSTATUS
	_evfilt2(EVFILT_MEMORYSTATUS);
#endif

	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}
static const char *
_evflagstr2(uint16_t *flagsp)
{
#define _evflag2(f) \
	if ((*flagsp & (f)) == (f) && (f)) { \
		*flagsp &= ~(f); \
		return #f "|"; \
	}
	_evflag2(EV_ADD);
	_evflag2(EV_DELETE);
	_evflag2(EV_ENABLE);
	_evflag2(EV_DISABLE);
	_evflag2(EV_ONESHOT);
	_evflag2(EV_CLEAR);
	_evflag2(EV_RECEIPT);
	_evflag2(EV_DISPATCH);
	_evflag2(EV_UDATA_SPECIFIC);
	_evflag2(EV_OOBAND);
	_evflag2(EV_VANISHED);
	*flagsp = 0;
	return "EV_UNKNOWN ";
}

static char *
_evflagstr(uint16_t flags, char *str, size_t strsize)
{
	str[0] = 0;
	while (flags) {
		strlcat(str, _evflagstr2(&flags), strsize);
	}
	size_t sz = strlen(str);
	if (sz) str[sz-1] = 0;
	return str;
}
static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
			"mask = 0x%lx, pending_data = 0x%lx, registered = %d, "
			"armed = %d, deleted = %d%s, canceled = %d, ",
			target && target->dq_label ? target->dq_label : "", target,
			ds->ds_ident_hack, ds->ds_pending_data_mask, ds->ds_pending_data,
			ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
			(bool)(ds->dq_atomic_flags & DSF_DELETED),
			(ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
			(bool)(ds->dq_atomic_flags & DSF_CANCELED));
}

static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
			", last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
			(unsigned long long)ds_timer(dr).target,
			(unsigned long long)ds_timer(dr).deadline,
			(unsigned long long)ds_timer(dr).last_fire,
			(unsigned long long)ds_timer(dr).interval, ds_timer(dr).flags);
}
size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (ds->ds_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	const char *filter;
	if (!ds->ds_dkev) {
		filter = "????";
	} else if (ds->ds_is_custom_source) {
		filter = _evfiltstr((int16_t)(uintptr_t)ds->ds_dkev);
	} else {
		filter = _evfiltstr(ds->ds_dkev->dk_kevent.filter);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
			"filter = %s }", ds->ds_dkev, ds->ds_is_direct_kevent ? " (direct)"
			: "", filter);
	return offset;
}
static size_t
_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = dm->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
			"send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
			"send state = %016llx, disconnected = %d, canceled = %d ",
			target && target->dq_label ? target->dq_label : "", target,
			dm->ds_dkev ? (mach_port_t)dm->ds_dkev->dk_kevent.ident : 0,
			dm->dm_refs->dm_send,
			dm->dm_dkev ? (mach_port_t)dm->dm_dkev->dk_kevent.ident : 0,
			dm->dm_dkev && DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev) ?
			" (armed)" : "", dm->dm_refs->dm_checkin_port,
			dm->dm_refs->dm_checkin ? " (pending)" : "",
			dm->dm_refs->dm_state, dm->dm_refs->dm_disconnect_cnt,
			(bool)(dm->dq_atomic_flags & DSF_CANCELED));
}

size_t
_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
			dx_kind(dm), dm);
	offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
static void
dispatch_kevent_debug(const char *verb, const _dispatch_kevent_qos_s *kev,
		int i, int n, const char *function, unsigned int line)
{
	char flagstr[256];
	char i_n[31];

	if (n > 1) {
		snprintf(i_n, sizeof(i_n), "%d/%d ", i + 1, n);
	} else {
		i_n[0] = '\0';
	}
#if DISPATCH_USE_KEVENT_QOS
	_dispatch_debug("%s kevent[%p] %s= { ident = 0x%llx, filter = %s, "
			"flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
			"qos = 0x%x, ext[0] = 0x%llx, ext[1] = 0x%llx, ext[2] = 0x%llx, "
			"ext[3] = 0x%llx }: %s #%u", verb, kev, i_n, kev->ident,
			_evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
			sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
			kev->qos, kev->ext[0], kev->ext[1], kev->ext[2], kev->ext[3],
			function, line);
#else
	_dispatch_debug("%s kevent[%p] %s= { ident = 0x%llx, filter = %s, "
			"flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
			"ext[0] = 0x%llx, ext[1] = 0x%llx }: %s #%u", verb, kev, i_n,
			kev->ident, _evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
			sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
#ifndef IGNORE_KEVENT64_EXT
			kev->ext[0], kev->ext[1],
#endif
			function, line);
#endif
}
static void
_dispatch_kevent_debugger2(void *context)
{
	struct sockaddr sa;
	socklen_t sa_len = sizeof(sa);
	int c, fd = (int)(long)context;
	unsigned int i;
	dispatch_kevent_t dk;
	dispatch_source_t ds;
	dispatch_source_refs_t dr;
	FILE *debug_stream;

	c = accept(fd, &sa, &sa_len);
	if (c == -1) {
		if (errno != EAGAIN) {
			(void)dispatch_assume_zero(errno);
		}
		return;
	}
	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
	}

	debug_stream = fdopen(c, "a");
	if (!dispatch_assume(debug_stream)) {
		close(c);
		return;
	}

	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
	fprintf(debug_stream, "Content-type: text/html\r\n");
	fprintf(debug_stream, "Pragma: nocache\r\n");
	fprintf(debug_stream, "\r\n");
	fprintf(debug_stream, "<html>\n");
	fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
	fprintf(debug_stream, "<body>\n<ul>\n");

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
			continue;
		}
		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
					"0x%hx fflags 0x%x data 0x%lx udata %p\n",
					dk, (unsigned long)dk->dk_kevent.ident,
					_evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
					dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
					(void*)dk->dk_kevent.udata);
			fprintf(debug_stream, "\t\t<ul>\n");
			TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
				ds = _dispatch_source_from_refs(dr);
				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x state "
						"0x%llx data 0x%lx mask 0x%lx flags 0x%x</li>\n",
						ds, ds->do_ref_cnt + 1, ds->dq_state,
						ds->ds_pending_data, ds->ds_pending_data_mask,
						ds->dq_atomic_flags);
				if (_dq_state_is_enqueued(ds->dq_state)) {
					dispatch_queue_t dq = ds->do_targetq;
					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x state "
							"0x%llx label: %s\n", dq, dq->do_ref_cnt + 1,
							dq->dq_state, dq->dq_label ?: "");
				}
			}
			fprintf(debug_stream, "\t\t</ul>\n");
			fprintf(debug_stream, "\t</li>\n");
		}
	}
	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
	fflush(debug_stream);
	fclose(debug_stream);
}
static void
_dispatch_kevent_debugger2_cancel(void *context)
{
	int ret, fd = (int)(long)context;

	ret = close(fd);
	if (ret == -1) {
		(void)dispatch_assume_zero(errno);
	}
}
static void
_dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
{
	union {
		struct sockaddr_in sa_in;
		struct sockaddr sa;
	} sa_u = {
		.sa_in = {
			.sin_family = AF_INET,
			.sin_addr = { htonl(INADDR_LOOPBACK), },
		},
	};
	dispatch_source_t ds;
	const char *valstr;
	int val, r, fd, sock_opt = 1;
	socklen_t slen = sizeof(sa_u);

	valstr = getenv("LIBDISPATCH_DEBUGGER");
	if (!valstr) {
		return;
	}
	val = atoi(valstr);
	if (val == 2) {
		sa_u.sa_in.sin_addr.s_addr = 0;
	}
	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		(void)dispatch_assume_zero(errno);
		return;
	}
	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
			(socklen_t) sizeof sock_opt);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = fcntl(fd, F_SETFL, O_NONBLOCK);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = bind(fd, &sa_u.sa, sizeof(sa_u));
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = listen(fd, SOMAXCONN);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = getsockname(fd, &sa_u.sa, &slen);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}

	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
			&_dispatch_mgr_q);
	if (dispatch_assume(ds)) {
		_dispatch_log("LIBDISPATCH: debug port: %hu",
				(in_port_t)ntohs(sa_u.sa_in.sin_port));

		/* ownership of fd transfers to ds */
		dispatch_set_context(ds, (void *)(long)fd);
		dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
		dispatch_source_set_cancel_handler_f(ds,
				_dispatch_kevent_debugger2_cancel);
		dispatch_resume(ds);
		return;
	}

out_bad:
	close(fd);
}
6774 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
6779 dispatch_debug_machport(mach_port_t name
, const char* str
)
6781 mach_port_type_t type
;
6782 mach_msg_bits_t ns
= 0, nr
= 0, nso
= 0, nd
= 0;
6783 unsigned int dnreqs
= 0, dnrsiz
;
6784 kern_return_t kr
= mach_port_type(mach_task_self(), name
, &type
);
6786 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name
,
6787 kr
, mach_error_string(kr
), str
);
6790 if (type
& MACH_PORT_TYPE_SEND
) {
6791 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name
,
6792 MACH_PORT_RIGHT_SEND
, &ns
));
6794 if (type
& MACH_PORT_TYPE_SEND_ONCE
) {
6795 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name
,
6796 MACH_PORT_RIGHT_SEND_ONCE
, &nso
));
6798 if (type
& MACH_PORT_TYPE_DEAD_NAME
) {
6799 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name
,
6800 MACH_PORT_RIGHT_DEAD_NAME
, &nd
));
6802 if (type
& (MACH_PORT_TYPE_RECEIVE
|MACH_PORT_TYPE_SEND
)) {
6803 kr
= mach_port_dnrequest_info(mach_task_self(), name
, &dnrsiz
, &dnreqs
);
6804 if (kr
!= KERN_INVALID_RIGHT
) (void)dispatch_assume_zero(kr
);
6806 if (type
& MACH_PORT_TYPE_RECEIVE
) {
6807 mach_port_status_t status
= { .mps_pset
= 0, };
6808 mach_msg_type_number_t cnt
= MACH_PORT_RECEIVE_STATUS_COUNT
;
6809 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name
,
6810 MACH_PORT_RIGHT_RECEIVE
, &nr
));
6811 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
6812 name
, MACH_PORT_RECEIVE_STATUS
, (void*)&status
, &cnt
));
6813 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
6814 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
6815 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
6816 "seqno(%03u) }: %s", name
, nr
, ns
, nso
, nd
, dnreqs
,
6817 type
& MACH_PORT_TYPE_SPREQUEST
? "Y":"N",
6818 status
.mps_nsrequest
? "Y":"N", status
.mps_pdrequest
? "Y":"N",
6819 status
.mps_srights
? "Y":"N", status
.mps_sorights
,
6820 status
.mps_qlimit
, status
.mps_msgcount
, status
.mps_mscount
,
6821 status
.mps_seqno
, str
);
6822 } else if (type
& (MACH_PORT_TYPE_SEND
|MACH_PORT_TYPE_SEND_ONCE
|
6823 MACH_PORT_TYPE_DEAD_NAME
)) {
6824 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
6825 "dnreqs(%03u) spreq(%s) }: %s", name
, nr
, ns
, nso
, nd
, dnreqs
,
6826 type
& MACH_PORT_TYPE_SPREQUEST
? "Y":"N", str
);
6828 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name
, type
,
6835 #endif // DISPATCH_DEBUG
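
/*
 * Usage sketch (not part of libdispatch): in DISPATCH_DEBUG builds,
 * dispatch_debug_machport() can be dropped into client or library code to log
 * the rights currently held on a port at a given call site:
 *
 *	dispatch_debug_machport(reply_port, __func__);
 */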