/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "protocolServer.h"
#include <sys/mount.h>

#define DKEV_DISPOSE_IMMEDIATE_DELETE 0x1
#define DKEV_DISPOSE_IGNORE_ENOENT 0x2
static void _dispatch_source_merge_kevent(dispatch_source_t ds,
		const _dispatch_kevent_qos_s *ke);
static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
static long _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg,
		int options);
static long _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags);
static void _dispatch_kevent_drain(_dispatch_kevent_qos_s *ke);
static void _dispatch_kevent_merge(_dispatch_kevent_qos_s *ke);
static void _dispatch_timers_kevent(_dispatch_kevent_qos_s *ke);
static void _dispatch_timers_unregister(dispatch_source_t ds,
		dispatch_kevent_t dk);
static void _dispatch_timers_update(dispatch_source_t ds);
static void _dispatch_timer_aggregates_check(void);
static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
		unsigned int tidx);
static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
		unsigned int tidx);
static inline unsigned long _dispatch_source_timer_data(
		dispatch_source_refs_t dr, unsigned long prev);
static long _dispatch_kq_update(const _dispatch_kevent_qos_s *);
static void _dispatch_memorystatus_init(void);
#if HAVE_MACH
static void _dispatch_mach_host_calendar_change_register(void);
static void _dispatch_mach_recv_msg_buf_init(void);
static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags);
static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags);
static inline void _dispatch_kevent_mach_portset(_dispatch_kevent_qos_s *ke);
#else
static inline void _dispatch_mach_host_calendar_change_register(void) {}
static inline void _dispatch_mach_recv_msg_buf_init(void) {}
#endif
static const char * _evfiltstr(short filt);
#if DISPATCH_DEBUG
static void _dispatch_kevent_debug(const _dispatch_kevent_qos_s *kev,
		const char *str);
static void _dispatch_kevent_debugger(void *context);
#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
		dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
#else
static inline void
_dispatch_kevent_debug(const _dispatch_kevent_qos_s *kev DISPATCH_UNUSED,
		const char *str DISPATCH_UNUSED) {}
#define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
#endif
#ifndef DISPATCH_MGR_QUEUE_DEBUG
#define DISPATCH_MGR_QUEUE_DEBUG 0
#endif
#if DISPATCH_MGR_QUEUE_DEBUG
#define _dispatch_kevent_mgr_debug _dispatch_kevent_debug
#else
static inline void
_dispatch_kevent_mgr_debug(_dispatch_kevent_qos_s *kev DISPATCH_UNUSED,
		const char *str DISPATCH_UNUSED) {}
#endif
#pragma mark dispatch_source_t

dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
	uintptr_t handle,
	unsigned long mask,
	dispatch_queue_t dq)
{
	const _dispatch_kevent_qos_s *proto_kev = &type->ke;
	dispatch_source_t ds;
	dispatch_kevent_t dk;

	// input validation
	if (type == NULL || (mask & ~type->mask)) {
		return NULL;
	}

	switch (type->ke.filter) {
	case EVFILT_SIGNAL:
		if (handle >= NSIG) {
			return NULL;
		}
		break;
#if DISPATCH_USE_VM_PRESSURE
	case EVFILT_VM:
#endif
#if DISPATCH_USE_MEMORYSTATUS
	case EVFILT_MEMORYSTATUS:
#endif
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		if (handle) {
			return NULL;
		}
		break;
	case DISPATCH_EVFILT_TIMER:
		if (!!handle ^ !!type->ke.ident) {
			return NULL;
		}
		break;
	default:
		break;
	}

	ds = _dispatch_alloc(DISPATCH_VTABLE(source),
			sizeof(struct dispatch_source_s));
	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init((dispatch_queue_t)ds);
	ds->dq_label = "source";

	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->do_ref_cnt++; // since source is created suspended
	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = *proto_kev;
	dk->dk_kevent.ident = handle;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags |= (uint32_t)mask;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	ds->ds_dkev = dk;
	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
	ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
		ds->ds_is_level = true;
		ds->ds_needs_rearm = true;
	} else if (!(EV_CLEAR & proto_kev->flags)) {
		// we cheat and use EV_CLEAR to mean a "flag thingy"
		ds->ds_is_adder = true;
	}
	if (EV_UDATA_SPECIFIC & proto_kev->flags) {
		dispatch_assert(!(EV_ONESHOT & proto_kev->flags));
		dk->dk_kevent.flags |= EV_DISPATCH;
		ds->ds_is_direct_kevent = true;
		ds->ds_needs_rearm = true;
	}
	// Some sources require special processing
	if (type->init != NULL) {
		type->init(ds, type, handle, mask, dq);
	}
	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));

	if (fastpath(!ds->ds_refs)) {
		ds->ds_refs = _dispatch_calloc(1ul,
				sizeof(struct dispatch_source_refs_s));
	}
	ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);

	if (!ds->ds_is_direct_kevent) {
		// The initial target queue is the manager queue, in order to get
		// the source installed. <rdar://problem/8928171>
		ds->do_targetq = &_dispatch_mgr_q;
		// First item on the queue sets the user-specified target queue
		dispatch_set_target_queue(ds, dq);
	} else {
		if (slowpath(!dq)) {
			dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
		} else {
			_dispatch_retain(dq);
		}
		ds->do_targetq = dq;
		_dispatch_queue_priority_inherit_from_target((dispatch_queue_t)ds, dq);
		_dispatch_queue_set_override_priority(dq);
	}
	_dispatch_object_debug(ds, "%s", __func__);
	return ds;
}
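/*
 * Illustrative usage sketch (not part of this file): how a client typically
 * drives dispatch_source_create() as implemented above. The queue label and
 * signal choice are hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <signal.h>
#include <stdio.h>

static dispatch_source_t
example_signal_source(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.sighup", NULL);
	// handle must be < NSIG for a signal source, per the validation above
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL,
			SIGHUP, 0, q);
	if (!ds) return NULL; // e.g. invalid mask for the source type
	signal(SIGHUP, SIG_IGN); // let the source observe the signal instead
	dispatch_source_set_event_handler(ds, ^{
		printf("SIGHUP delivered %lu time(s)\n", dispatch_source_get_data(ds));
	});
	dispatch_resume(ds); // sources are created suspended (do_suspend_cnt above)
	return ds;
}
#endif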
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_source_get_kevent_queue(dispatch_source_t ds)
{
	if (ds->ds_is_direct_kevent) {
		return ds->do_targetq;
	}
	return &_dispatch_mgr_q;
}
void
_dispatch_source_dispose(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	free(ds->ds_refs);
	_dispatch_queue_destroy(ds);
}

void
_dispatch_source_xref_dispose(dispatch_source_t ds)
{
	_dispatch_wakeup(ds);
}

void
dispatch_source_cancel(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	// Right after we set the cancel flag, someone else
	// could potentially invoke the source, do the cancelation,
	// unregister the source, and deallocate it. We would
	// need to therefore retain/release before setting the bit

	_dispatch_retain(ds);
	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}

long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
}
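/*
 * Illustrative sketch (not part of this file): the retain/release dance in
 * dispatch_source_cancel() above is what makes this common client pattern
 * safe. Names are hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <unistd.h>

static void
example_cancel(dispatch_source_t ds, int fd)
{
	dispatch_source_set_cancel_handler(ds, ^{
		close(fd); // only safe once the kevent is unregistered
	});
	dispatch_source_cancel(ds); // sets DSF_CANCELED and wakes the source
	// dispatch_source_testcancel(ds) now returns nonzero
}
#endif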
unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	unsigned long mask = ds->ds_pending_data_mask;
	if (ds->ds_vmpressure_override) {
		mask = NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	else if (ds->ds_memorystatus_override) {
		mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
	return mask;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	unsigned int handle = (unsigned int)ds->ds_ident_hack;
#if TARGET_IPHONE_SIMULATOR
	if (ds->ds_memorystatus_override) {
		handle = 0;
	}
#endif
	return handle;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
	unsigned long data = ds->ds_data;
	if (ds->ds_vmpressure_override) {
		data = NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	else if (ds->ds_memorystatus_override) {
		data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
	return data;
}

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	_dispatch_kevent_qos_s kev = {
		.fflags = (typeof(kev.fflags))val,
		.data = (typeof(kev.data))val,
	};

	dispatch_assert(
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);

	_dispatch_source_merge_kevent(ds, &kev);
}
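/*
 * Illustrative sketch (not part of this file): dispatch_source_merge_data()
 * above is the client entry point for the DATA_ADD/DATA_OR custom source
 * types. Names are hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static void
example_data_add(void)
{
	dispatch_source_t ds = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0,
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
	dispatch_source_set_event_handler(ds, ^{
		// merges are coalesced: data is the sum since the last callout
		printf("accumulated %lu\n", dispatch_source_get_data(ds));
	});
	dispatch_resume(ds);
	dispatch_source_merge_data(ds, 1); // becomes kev.data/kev.fflags above
	dispatch_source_merge_data(ds, 41);
}
#endif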
#pragma mark dispatch_source_handler

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds, void *handler, long kind,
		bool block)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (handler) {
		dc->do_vtable = (void *)((block ? DISPATCH_OBJ_BLOCK_RELEASE_BIT :
				DISPATCH_OBJ_CTXT_FETCH_BIT) | (kind != DS_EVENT_HANDLER ?
				DISPATCH_OBJ_ASYNC_BIT : 0l));
		dc->dc_priority = 0;
		dc->dc_voucher = NULL;
		if (block) {
#ifdef __BLOCKS__
			if (slowpath(_dispatch_block_has_private_data(handler))) {
				// sources don't propagate priority by default
				dispatch_block_flags_t flags = DISPATCH_BLOCK_NO_QOS_CLASS;
				flags |= _dispatch_block_get_flags(handler);
				_dispatch_continuation_priority_set(dc,
						_dispatch_block_get_priority(handler), flags);
			}
			if (kind != DS_EVENT_HANDLER) {
				dc->dc_func = _dispatch_call_block_and_release;
			} else {
				dc->dc_func = _dispatch_Block_invoke(handler);
			}
			dc->dc_ctxt = _dispatch_Block_copy(handler);
#endif /* __BLOCKS__ */
		} else {
			dc->dc_func = handler;
			dc->dc_ctxt = ds->do_ctxt;
		}
		_dispatch_trace_continuation_push((dispatch_queue_t)ds, dc);
	} else {
		dc->dc_func = NULL;
	}
	dc->dc_data = (void*)kind;
	return dc;
}
static inline void
_dispatch_source_handler_replace(dispatch_source_refs_t dr, long kind,
		dispatch_continuation_t dc_new)
{
	dispatch_continuation_t dc = dr->ds_handler[kind];
	if (dc) {
#ifdef __BLOCKS__
		if ((long)dc->do_vtable & DISPATCH_OBJ_BLOCK_RELEASE_BIT) {
			Block_release(dc->dc_ctxt);
		}
#endif /* __BLOCKS__ */
		if (dc->dc_voucher) {
			_voucher_release(dc->dc_voucher);
			dc->dc_voucher = NULL;
		}
		_dispatch_continuation_free(dc);
	}
	dr->ds_handler[kind] = dc_new;
}

static inline void
_dispatch_source_handler_free(dispatch_source_refs_t dr, long kind)
{
	_dispatch_source_handler_replace(dr, kind, NULL);
}
static void
_dispatch_source_set_handler(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
	dispatch_continuation_t dc = context;
	long kind = (long)dc->dc_data;
	dc->dc_data = 0;
	if (!dc->dc_func) {
		_dispatch_continuation_free(dc);
		dc = NULL;
	} else if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_source_handler_replace(ds->ds_refs, kind, dc);
	if (kind == DS_EVENT_HANDLER && dc && dc->dc_priority) {
#if HAVE_PTHREAD_WORKQUEUE_QOS
		ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
		_dispatch_queue_set_override_priority((dispatch_queue_t)ds);
#endif
	}
}
#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}

void
_dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds,
		void *ctxt, dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	dc->do_vtable = (void *)((long)dc->do_vtable & ~DISPATCH_OBJ_CTXT_FETCH_BIT);
	dc->dc_other = dc->dc_ctxt;
	dc->dc_ctxt = ctxt;
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}

#ifdef __BLOCKS__
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}

#ifdef __BLOCKS__
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
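/*
 * Illustrative sketch (not part of this file): the _f setters above take a
 * plain function; DISPATCH_OBJ_CTXT_FETCH_BIT makes the callout fetch the
 * source's context at invoke time. Names are hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static void example_handler(void *ctxt)
{
	printf("called with context %p\n", ctxt);
}

static void example_set_handlers(dispatch_source_t ds, void *ctxt)
{
	dispatch_set_context(ds, ctxt); // fetched into dc_ctxt at callout time
	dispatch_source_set_event_handler_f(ds, example_handler);
	dispatch_source_set_cancel_handler_f(ds, example_handler);
}
#endif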
#pragma mark dispatch_source_invoke

static void
_dispatch_source_registration_callout(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_REGISTN_HANDLER];
	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc);
	dr->ds_handler[DS_REGISTN_HANDLER] = NULL;
	_dispatch_reset_defaultpriority(old_dp);
}
static void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_CANCEL_HANDLER];
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;
	ds->ds_data = 0;
	_dispatch_source_handler_free(dr, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
	if (!dc) {
		return;
	}
	if (!(ds->ds_atomic_flags & DSF_CANCELED)) {
		return _dispatch_source_handler_free(dr, DS_CANCEL_HANDLER);
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc);
	dr->ds_handler[DS_CANCEL_HANDLER] = NULL;
	_dispatch_reset_defaultpriority(old_dp);
}
static void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
	unsigned long prev;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		return;
	}
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_EVENT_HANDLER];
	prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	if (ds->ds_is_level) {
		ds->ds_data = ~prev;
	} else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
		ds->ds_data = _dispatch_source_timer_data(dr, prev);
	} else {
		ds->ds_data = prev;
	}
	if (!dispatch_assume(prev) || !dc) {
		return;
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	voucher_t voucher = dc->dc_voucher ? _voucher_retain(dc->dc_voucher) : NULL;
	_dispatch_continuation_voucher_adopt(dc); // consumes voucher reference
	_dispatch_continuation_pop(dc);
	if (voucher) dc->dc_voucher = voucher;
	_dispatch_reset_defaultpriority(old_dp);
}
static void
_dispatch_source_kevent_unregister(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	uint32_t flags = (uint32_t)ds->ds_pending_data_mask;
	dispatch_kevent_t dk = ds->ds_dkev;
	if (ds->ds_atomic_flags & DSF_DELETED) {
		dk->dk_kevent.flags |= EV_DELETE; // already deleted
		dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
	}
	if (dk->dk_kevent.filter == DISPATCH_EVFILT_TIMER) {
		ds->ds_dkev = NULL;
		_dispatch_timers_unregister(ds, dk);
	} else if (!ds->ds_is_direct_kevent) {
		ds->ds_dkev = NULL;
		TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
		_dispatch_kevent_unregister(dk, flags, 0);
	} else {
		int dkev_dispose_options = 0;
		if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
			dkev_dispose_options |= DKEV_DISPOSE_IMMEDIATE_DELETE;
		}
		if (ds->ds_needs_mgr) {
			dkev_dispose_options |= DKEV_DISPOSE_IGNORE_ENOENT;
			ds->ds_needs_mgr = false;
		}
		long r = _dispatch_kevent_unregister(dk, flags, dkev_dispose_options);
		if (r == EINPROGRESS) {
			_dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
					ds, dk);
			ds->ds_pending_delete = true;
			return; // deferred unregistration
		} else if (r == ENOENT) {
			_dispatch_debug("kevent-source[%p]: ENOENT delete kevent[%p]",
					ds, dk);
			ds->ds_needs_mgr = true;
			return; // potential concurrent EV_DELETE delivery rdar://22047283
		}
		ds->ds_dkev = NULL;
		_TAILQ_TRASH_ENTRY(ds->ds_refs, dr_list);
	}
	(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
	_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, ds->ds_dkev);
	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
	_dispatch_release(ds); // the retain is done at creation time
}
static void
_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
{
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		_dispatch_timers_update(ds);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
				ds->ds_dkev);
		return;
#if HAVE_MACH
	case EVFILT_MACHPORT:
		if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
			new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
		}
		break;
#endif
	}
	if ((ds->ds_atomic_flags & DSF_DELETED) ||
			_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
		_dispatch_source_kevent_unregister(ds);
	}
}
static void
_dispatch_source_kevent_register(dispatch_source_t ds)
{
	dispatch_assert_zero(ds->ds_is_installed);
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		_dispatch_timers_update(ds);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
		return;
	}
	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
	TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
	_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
	if (do_resume || ds->ds_needs_rearm) {
		_dispatch_source_kevent_resume(ds, flags);
	}
	_dispatch_object_debug(ds, "%s", __func__);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_source_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
{
	dispatch_source_t ds = dou._ds;
	if (_dispatch_queue_class_probe(ds)) {
		if (slowpath(_dispatch_queue_drain(ds))) {
			DISPATCH_CLIENT_CRASH("Sync onto source");
		}
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct
	// queue will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_queue_t dkq = _dispatch_source_get_kevent_queue(ds);
	dispatch_source_refs_t dr = ds->ds_refs;

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_kevent_register(ds);
		ds->ds_is_installed = true;
		if (dr->ds_handler[DS_REGISTN_HANDLER]) {
			return ds->do_targetq;
		}
		if (slowpath(ds->do_xref_cnt == -1)) {
			return dkq; // rdar://problem/9558246
		}
	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return NULL;
	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds);
		if (slowpath(ds->do_xref_cnt == -1)) {
			return dkq; // rdar://problem/9558246
		}
	} else if ((ds->ds_atomic_flags & DSF_DELETED) && (ds->ds_pending_delete ||
			(ds->ds_atomic_flags & DSF_ONESHOT))) {
		// Pending source kevent unregistration has been completed
		if (ds->ds_needs_mgr) {
			dkq = &_dispatch_mgr_q;
		}
		if (dq != dkq) {
			return dkq;
		}
		ds->ds_pending_delete = false;
		if (ds->ds_atomic_flags & DSF_ONESHOT) {
			(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ONESHOT,
					relaxed);
		}
		if (ds->ds_dkev) {
			_dispatch_source_kevent_unregister(ds);
			if (ds->ds_needs_mgr) {
				return &_dispatch_mgr_q;
			}
		}
		if (dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			return ds->do_targetq;
		}
	} else if (((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1))
			&& !ds->ds_pending_delete) {
		// The source has been cancelled and needs to be uninstalled from the
		// kevent queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (ds->ds_dkev) {
			if (ds->ds_needs_mgr) {
				dkq = &_dispatch_mgr_q;
			}
			if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_kevent_unregister(ds);
			if (ds->ds_needs_mgr) {
				return &_dispatch_mgr_q;
			}
			if (ds->ds_pending_delete) {
				// deferred unregistration
				if (ds->ds_needs_rearm) {
					return dkq;
				}
				return NULL;
			}
		}
		if (dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			if (dq != ds->do_targetq) {
				return ds->do_targetq;
			}
		}
		_dispatch_source_cancel_callout(ds);
	} else if (ds->ds_pending_data && !ds->ds_pending_delete) {
		// The source has pending data to deliver via the event handler
		// callback on the target queue. Some sources need to be rearmed on
		// the kevent queue after event delivery.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		_dispatch_source_latch_and_call(ds);
		if (ds->ds_needs_rearm) {
			return dkq;
		}
	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
		// The source needs to be rearmed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
				ds->ds_dkev);
		_dispatch_source_kevent_resume(ds, 0);
	}

	return NULL;
}

void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_object_t dou,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(ds, dou._dc, flags, _dispatch_source_invoke2);
}
bool
_dispatch_source_probe(dispatch_source_t ds)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		return true;
	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
		// The registration handler needs to be delivered to the target queue.
		return true;
	} else if ((ds->ds_atomic_flags & DSF_DELETED) && (ds->ds_pending_delete ||
			(ds->ds_atomic_flags & DSF_ONESHOT))) {
		// Pending source kevent unregistration has been completed
		return true;
	} else if (((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1))
			&& !ds->ds_pending_delete) {
		// The source needs to be uninstalled from the kevent queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (ds->ds_dkev || dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			return true;
		}
	} else if (ds->ds_pending_data && !ds->ds_pending_delete) {
		// The source has pending data to deliver to the target queue.
		return true;
	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
		// The source needs to be rearmed on the kevent queue.
		return true;
	}
	return _dispatch_queue_class_probe(ds);
}
static void
_dispatch_source_merge_kevent(dispatch_source_t ds,
		const _dispatch_kevent_qos_s *ke)
{
	_dispatch_object_debug(ds, "%s", __func__);
	bool retained = false;
	if ((ke->flags & EV_UDATA_SPECIFIC) && (ke->flags & EV_ONESHOT) &&
			!(ke->flags & EV_DELETE)) {
		_dispatch_debug("kevent-source[%p]: deferred delete oneshot kevent[%p]",
				ds, (void*)ke->udata);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ONESHOT, relaxed);
	} else if ((ke->flags & EV_DELETE) || (ke->flags & EV_ONESHOT)) {
		_dispatch_debug("kevent-source[%p]: delete kevent[%p]",
				ds, (void*)ke->udata);
		retained = true;
		_dispatch_retain(ds);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_DELETED, relaxed);
		if (ke->flags & EV_DELETE) goto done;
	}
	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		goto done; // rdar://20204025
	}
	if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
		dispatch_assert(ke->data >= 0l);
		dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
				relaxed);
	} else if (ds->ds_is_adder) {
		(void)dispatch_atomic_add2o(ds, ds_pending_data,
				(unsigned long)ke->data, relaxed);
	} else if (ke->fflags & ds->ds_pending_data_mask) {
		(void)dispatch_atomic_or2o(ds, ds_pending_data,
				ke->fflags & ds->ds_pending_data_mask, relaxed);
	}
done:
	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
	if (ds->ds_needs_rearm) {
		if (!retained) {
			retained = true;
			_dispatch_retain(ds); // rdar://20382435
		}
		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p] ",
				ds, (void*)ke->udata);
	}
	if (retained) {
		_dispatch_queue_wakeup_and_release((dispatch_queue_t)ds);
	} else {
		_dispatch_queue_wakeup((dispatch_queue_t)ds);
	}
}
#pragma mark dispatch_kevent_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void _dispatch_kevent_guard(dispatch_kevent_t dk);
static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
#else
static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
#endif

#if !DISPATCH_USE_EV_UDATA_SPECIFIC
static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};
#endif // !DISPATCH_USE_EV_UDATA_SPECIFIC

#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))

DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];

static void
_dispatch_kevent_init()
{
	unsigned int i;
	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

#if !DISPATCH_USE_EV_UDATA_SPECIFIC
	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
			&_dispatch_kevent_data_or, dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
			&_dispatch_kevent_data_add, dk_list);
	_dispatch_kevent_data_or.dk_kevent.udata =
			(uintptr_t)&_dispatch_kevent_data_or;
	_dispatch_kevent_data_add.dk_kevent.udata =
			(uintptr_t)&_dispatch_kevent_data_add;
#endif // !DISPATCH_USE_EV_UDATA_SPECIFIC
}
static inline uintptr_t
_dispatch_kevent_hash(uint64_t ident, short filter)
{
	uint64_t value;
#if HAVE_MACH
	value = (filter == EVFILT_MACHPORT ||
			filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
			MACH_PORT_INDEX(ident) : ident);
#else
	value = ident;
#endif
	return DSL_HASH((uintptr_t)value);
}
static dispatch_kevent_t
_dispatch_kevent_find(uint64_t ident, short filter)
{
	uintptr_t hash = _dispatch_kevent_hash(ident, filter);
	dispatch_kevent_t dki;

	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
			break;
		}
	}
	return dki;
}

static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
	if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) return;
	_dispatch_kevent_guard(dk);
	uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
			dk->dk_kevent.filter);
	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}
// Find existing kevents, and merge any new flags if necessary
static bool
_dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
{
	dispatch_kevent_t dk = NULL, ds_dkev = *dkp;
	uint32_t new_flags;
	bool do_resume = false;

	if (!(ds_dkev->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
				ds_dkev->dk_kevent.filter);
	}
	if (dk) {
		// If an existing dispatch kevent is found, check to see if new flags
		// need to be added to the existing kevent
		new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
		dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
		free(ds_dkev);
		*dkp = dk;
		do_resume = new_flags;
	} else {
		dk = ds_dkev;
		_dispatch_kevent_insert(dk);
		new_flags = dk->dk_kevent.fflags;
		do_resume = true;
	}
	// Re-register the kevent with the kernel if new flags were added
	// by the dispatch kevent
	if (do_resume) {
		dk->dk_kevent.flags |= EV_ADD;
	}
	*flgp = new_flags;
	return do_resume;
}
static long
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	long r;
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these types not registered with kevent
		return 0;
#if HAVE_MACH
	case EVFILT_MACHPORT:
		return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
	case DISPATCH_EVFILT_MACH_NOTIFICATION:
		return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
#endif
	default:
		break;
	}
	if (dk->dk_kevent.flags & EV_DELETE) {
		return 0;
	}
	r = _dispatch_kq_update(&dk->dk_kevent);
	if (r && (dk->dk_kevent.flags & EV_ADD) &&
			(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		dk->dk_kevent.flags |= EV_DELETE;
		dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
	} else if (dk->dk_kevent.flags & EV_DISPATCH) {
		dk->dk_kevent.flags &= ~EV_ADD;
	}
	return r;
}
static long
_dispatch_kevent_dispose(dispatch_kevent_t dk, int options)
{
	long r = 0;
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) {
			free(dk);
		} else {
			// these sources live on statically allocated lists
		}
		return r;
#if HAVE_MACH
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
		break;
	case DISPATCH_EVFILT_MACH_NOTIFICATION:
		_dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
		break;
#endif
	default:
		if (~dk->dk_kevent.flags & EV_DELETE) {
			dk->dk_kevent.flags |= EV_DELETE;
			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
			if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
				dk->dk_kevent.flags |= EV_ENABLE;
			}
			r = _dispatch_kq_update(&dk->dk_kevent);
			if (r == ENOENT && (options & DKEV_DISPOSE_IGNORE_ENOENT)) {
				r = 0;
			}
			if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
				dk->dk_kevent.flags &= ~EV_ENABLE;
			}
		}
		break;
	}
	if ((r == EINPROGRESS || r == ENOENT) &&
			(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		// deferred EV_DELETE or concurrent EV_DELETE delivery
		dk->dk_kevent.flags &= ~EV_DELETE;
		dk->dk_kevent.flags |= EV_ENABLE;
	} else {
		if ((dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
			// zero/trash dr linkage
			dispatch_source_refs_t dr = TAILQ_FIRST(&dk->dk_sources);
			TAILQ_REMOVE(&dk->dk_sources, dr, dr_list);
			_TAILQ_TRASH_ENTRY(dr, dr_list);
		} else {
			uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
					dk->dk_kevent.filter);
			TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
		}
		_dispatch_kevent_unguard(dk);
		free(dk);
	}
	return r;
}
static long
_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg, int options)
{
	dispatch_source_refs_t dri;
	uint32_t del_flags, fflags = 0;
	long r = 0;

	if (TAILQ_EMPTY(&dk->dk_sources) ||
			(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		r = _dispatch_kevent_dispose(dk, options);
	} else {
		TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
			dispatch_source_t dsi = _dispatch_source_from_refs(dri);
			uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
			fflags |= mask;
		}
		del_flags = flg & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags &= ~del_flags;
			r = _dispatch_kevent_resume(dk, 0, del_flags);
		}
	}
	return r;
}
static void
_dispatch_kevent_proc_exit(_dispatch_kevent_qos_s *ke)
{
	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
	// <rdar://problem/5067725>. As a workaround, we simulate an exit event for
	// any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
	_dispatch_kevent_qos_s fake;
	fake = *ke;
	fake.flags &= ~EV_ERROR;
	fake.fflags = NOTE_EXIT;
	fake.data = 0;
	_dispatch_kevent_drain(&fake);
}
static void
_dispatch_kevent_error(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	if (ke->data) {
		// log the unexpected error
		_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
				ke->flags & EV_DELETE ? "delete" :
				ke->flags & EV_ADD ? "add" :
				ke->flags & EV_ENABLE ? "enable" : "monitor",
				(long)ke->data);
	}
}
static void
_dispatch_kevent_drain(_dispatch_kevent_qos_s *ke)
{
#if DISPATCH_DEBUG
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
#endif
	if (ke->filter == EVFILT_USER) {
		_dispatch_kevent_mgr_debug(ke, __func__);
		return;
	}
	if (slowpath(ke->flags & EV_ERROR)) {
		if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
			ke->data = 0; // don't return error from caller
			if (ke->flags & EV_DELETE) {
				_dispatch_debug("kevent[0x%llx]: ignoring ESRCH from "
						"EVFILT_PROC EV_DELETE", ke->udata);
				return;
			}
			_dispatch_debug("kevent[0x%llx]: ESRCH from EVFILT_PROC: "
					"generating fake NOTE_EXIT", ke->udata);
			return _dispatch_kevent_proc_exit(ke);
		}
		return _dispatch_kevent_error(ke);
	}
	if (ke->filter == EVFILT_TIMER) {
		return _dispatch_timers_kevent(ke);
	}
#if HAVE_MACH
	if (ke->filter == EVFILT_MACHPORT) {
		return _dispatch_kevent_mach_portset(ke);
	}
#endif
	return _dispatch_kevent_merge(ke);
}
static void
_dispatch_kevent_merge(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	dispatch_kevent_t dk;
	dispatch_source_refs_t dri, dr_next;

	dk = (void*)ke->udata;
	dispatch_assert(dk);

	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
		_dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
	}
}
#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void
_dispatch_kevent_guard(dispatch_kevent_t dk)
{
	guardid_t guard;
	const unsigned int guard_flags = GUARD_CLOSE;
	int r, fd_flags = 0;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard = &dk->dk_kevent;
		r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
				&guard, guard_flags, &fd_flags);
		if (slowpath(r == -1)) {
			int err = errno;
			if (err != EPERM) {
				(void)dispatch_assume_zero(err);
			}
			return;
		}
		dk->dk_kevent.ext[0] = guard_flags;
		dk->dk_kevent.ext[1] = fd_flags;
		break;
	}
}

static void
_dispatch_kevent_unguard(dispatch_kevent_t dk)
{
	guardid_t guard;
	unsigned int guard_flags;
	int r, fd_flags;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard_flags = (unsigned int)dk->dk_kevent.ext[0];
		if (!guard_flags) {
			return;
		}
		guard = &dk->dk_kevent;
		fd_flags = (int)dk->dk_kevent.ext[1];
		r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
				guard_flags, NULL, 0, &fd_flags);
		if (slowpath(r == -1)) {
			(void)dispatch_assume_zero(errno);
			return;
		}
		dk->dk_kevent.ext[0] = 0;
		break;
	}
}
#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
static dispatch_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
#define _dispatch_trace_next_timer_wake(q) \
		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_trace_next_timer_wake(q)
#endif

#define _dispatch_source_timer_telemetry_enabled() false

DISPATCH_NOINLINE
static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
		uintptr_t ident, struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled()) {
		_dispatch_trace_timer_configure(ds, ident, values);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
		struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled() ||
			_dispatch_source_timer_telemetry_enabled()) {
		_dispatch_source_timer_telemetry_slow(ds, ident, values);
		asm(""); // prevent tailcall
	}
}

// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull
DISPATCH_ALWAYS_INLINE
static inline uint64_t
_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
{
	unsigned int tk = DISPATCH_TIMER_KIND(tidx);
	if (nows && fastpath(nows[tk])) {
		return nows[tk];
	}
	uint64_t now;
	switch (tk) {
	case DISPATCH_TIMER_KIND_MACH:
		now = _dispatch_absolute_time();
		break;
	case DISPATCH_TIMER_KIND_WALL:
		now = _dispatch_get_nanoseconds();
		break;
	}
	if (nows) {
		nows[tk] = now;
	}
	return now;
}
static inline unsigned long
_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
{
	// calculate the number of intervals since last fire
	unsigned long data, missed;
	uint64_t now;
	now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
	missed = (unsigned long)((now - ds_timer(dr).last_fire) /
			ds_timer(dr).interval);
	// correct for missed intervals already delivered last time
	data = prev - ds_timer(dr).missed + missed;
	ds_timer(dr).missed = missed;
	return data;
}
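/*
 * Worked example (illustrative numbers only): with last_fire = 100,
 * interval = 10 and now = 147, missed = (147 - 100) / 10 = 4. If prev = 3
 * fires were latched and the previous callout had already reported
 * ds_timer(dr).missed = 1 of those intervals, the handler observes
 * data = 3 - 1 + 4 = 6, and missed is recorded as 4 for the next correction.
 */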
struct dispatch_set_timer_params {
	dispatch_source_t ds;
	uintptr_t ident;
	struct dispatch_timer_source_s values;
};

static void
_dispatch_source_set_timer3(void *context)
{
	// Called on the _dispatch_mgr_q
	struct dispatch_set_timer_params *params = context;
	dispatch_source_t ds = params->ds;
	ds->ds_ident_hack = params->ident;
	ds_timer(ds->ds_refs) = params->values;
	// Clear any pending data that might have accumulated on
	// older timer params <rdar://problem/8574886>
	ds->ds_pending_data = 0;
	// Re-arm in case we got disarmed because of pending set_timer suspension
	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release);
	_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, ds->ds_dkev);
	dispatch_resume(ds);
	// Must happen after resume to avoid getting disarmed due to suspension
	_dispatch_timers_update(ds);
	dispatch_release(ds);
	if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
		_dispatch_mach_host_calendar_change_register();
	}
	free(params);
}

static void
_dispatch_source_set_timer2(void *context)
{
	// Called on the source queue
	struct dispatch_set_timer_params *params = context;
	dispatch_suspend(params->ds);
	_dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params,
			_dispatch_source_set_timer3);
}
static struct dispatch_set_timer_params *
_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	struct dispatch_set_timer_params *params;
	params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
	params->ds = ds;
	params->values.flags = ds_timer(ds->ds_refs).flags;

	if (interval == 0) {
		// we use zero internally to mean disabled
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}
	if ((int64_t)leeway < 0) {
		leeway = INT64_MAX;
	}
	if (start == DISPATCH_TIME_NOW) {
		start = _dispatch_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	if ((int64_t)start < 0) {
		// wall clock
		start = (dispatch_time_t)-((int64_t)start);
		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
	} else {
		// absolute clock
		interval = _dispatch_time_nano2mach(interval);
		if (interval < 1) {
			// rdar://problem/7287561 interval must be at least one
			// in order to avoid later division by zero when calculating
			// the missed interval count. (NOTE: the wall clock's
			// interval is already "fixed" to be 1 or more)
			interval = 1;
		}
		leeway = _dispatch_time_nano2mach(leeway);
		params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
	}
	params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
	params->values.target = start;
	params->values.deadline = (start < UINT64_MAX - leeway) ?
			start + leeway : UINT64_MAX;
	params->values.interval = interval;
	params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
			leeway : interval / 2;
	return params;
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway, bool source_sync)
{
	if (slowpath(!ds->ds_is_timer) ||
			slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
		DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
	}

	struct dispatch_set_timer_params *params;
	params = _dispatch_source_timer_params(ds, start, interval, leeway);

	_dispatch_source_timer_telemetry(ds, params->ident, &params->values);
	// Suspend the source so that it doesn't fire with pending changes
	// The use of suspend/resume requires the external retain/release
	dispatch_retain(ds);
	if (source_sync) {
		return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params,
				_dispatch_source_set_timer2);
	} else {
		return _dispatch_source_set_timer2(params);
	}
}
void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	_dispatch_source_set_timer(ds, start, interval, leeway, true);
}

void
_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
		dispatch_time_t start, uint64_t interval, uint64_t leeway)
{
	// Don't serialize through the source queue for CF timers <rdar://13833190>
	_dispatch_source_set_timer(ds, start, interval, leeway, false);
}
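/*
 * Illustrative sketch (not part of this file): a repeating timer driven
 * through dispatch_source_set_timer() above. Names are hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static dispatch_source_t
example_timer(void)
{
	dispatch_source_t t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
			0, 0, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
	// fire every second, allowing 100ms of leeway for coalescing
	dispatch_source_set_timer(t, dispatch_time(DISPATCH_TIME_NOW, 0),
			1ull * NSEC_PER_SEC, 100ull * NSEC_PER_MSEC);
	dispatch_source_set_event_handler(t, ^{
		// missed intervals are folded into the data value
		printf("tick x%lu\n", dispatch_source_get_data(t));
	});
	dispatch_resume(t);
	return t;
}
#endif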
void
_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
	dispatch_source_refs_t dr = ds->ds_refs;
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
	const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
			FOREVER_NSEC/NSEC_PER_MSEC))) {
		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
	} else {
		interval = FOREVER_NSEC;
	}
	interval = _dispatch_time_nano2mach(interval);
	uint64_t target = _dispatch_absolute_time() + interval;
	target = (target / interval) * interval;
	const uint64_t leeway = animation ?
			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
	ds_timer(dr).target = target;
	ds_timer(dr).deadline = target + leeway;
	ds_timer(dr).interval = interval;
	ds_timer(dr).leeway = leeway;
	_dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
}
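/*
 * Worked example (illustrative numbers, pretending mach ticks == ns): for a
 * non-animation interval of 500 (ms units), interval becomes
 * 500 * NSEC_PER_MSEC = 5e8. With _dispatch_absolute_time() = 7.3e8,
 * target = (7.3e8 + 5e8) rounded down to a multiple of 5e8 = 1.0e9, so
 * firings align to interval boundaries, and leeway = interval / 2 = 2.5e8.
 */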
#pragma mark dispatch_timers

#define DISPATCH_TIMER_STRUCT(refs) \
	uint64_t target, deadline; \
	TAILQ_HEAD(, refs) dt_sources

typedef struct dispatch_timer_s {
	DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
} *dispatch_timer_t;

#define DISPATCH_TIMER_INITIALIZER(tidx) \
	[tidx] = { \
		.target = UINT64_MAX, \
		.deadline = UINT64_MAX, \
		.dt_sources = TAILQ_HEAD_INITIALIZER( \
				_dispatch_timer[tidx].dt_sources), \
	}
#define DISPATCH_TIMER_INIT(kind, qos) \
		DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))

struct dispatch_timer_s _dispatch_timer[] =  {
	DISPATCH_TIMER_INIT(WALL, NORMAL),
	DISPATCH_TIMER_INIT(WALL, CRITICAL),
	DISPATCH_TIMER_INIT(WALL, BACKGROUND),
	DISPATCH_TIMER_INIT(MACH, NORMAL),
	DISPATCH_TIMER_INIT(MACH, CRITICAL),
	DISPATCH_TIMER_INIT(MACH, BACKGROUND),
};
#define DISPATCH_TIMER_COUNT \
		((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))

#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
		(uintptr_t)&_dispatch_kevent_timer[tidx]
#ifdef __LP64__
#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
		.udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
#else
// dynamic initialization in _dispatch_timers_init()
#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
		.udata = 0
#endif
#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
	[tidx] = { \
		.dk_kevent = { \
			.ident = tidx, \
			.filter = DISPATCH_EVFILT_TIMER, \
			DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
		}, \
		.dk_sources = TAILQ_HEAD_INITIALIZER( \
				_dispatch_kevent_timer[tidx].dk_sources), \
	}
#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
		DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))

struct dispatch_kevent_s _dispatch_kevent_timer[] = {
	DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
	DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
	DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
	DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
	DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
	DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
	DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
};
#define DISPATCH_KEVENT_TIMER_COUNT \
		((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))

#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
	[qos] = { \
		.ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
		.filter = EVFILT_TIMER, \
		.flags = EV_ONESHOT, \
		.fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
	}
#define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
		DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)

_dispatch_kevent_qos_s _dispatch_kevent_timeout[] = {
	DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0),
	DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL),
	DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND),
};

#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC

static const uint64_t _dispatch_kevent_coalescing_window[] = {
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
};
#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
	typeof(dr) dri = NULL; typeof(dt) dti; \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
			if (ds_timer(dr).target < ds_timer(dri).target) { \
				break; \
			} \
		} \
		TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
			if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
				break; \
			} \
		} \
		if (dti) { \
			TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
		} else { \
			TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
		} \
	} \
	if (dri) { \
		TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
	} else { \
		TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
	} \
	})

#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
	({ \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
	} \
	TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
			dr_list); })

#define _dispatch_timers_check(dra, dta) ({ \
	unsigned int qosm = _dispatch_timers_qos_mask; \
	bool update = false; \
	unsigned int tidx; \
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
		if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
			continue; \
		} \
		dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dra[tidx].dk_sources); \
		dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dta[tidx].dt_sources); \
		uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
		uint64_t deadline = dt ? ds_timer(dt).deadline : UINT64_MAX; \
		if (target != dta[tidx].target) { \
			dta[tidx].target = target; \
			update = true; \
		} \
		if (deadline != dta[tidx].deadline) { \
			dta[tidx].deadline = deadline; \
			update = true; \
		} \
	} \
	update; })

static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
static unsigned int _dispatch_timers_qos_mask;
static bool _dispatch_timers_force_max_leeway;
static void
_dispatch_timers_init(void)
{
#ifndef __LP64__
	unsigned int tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		_dispatch_kevent_timer[tidx].dk_kevent.udata =
				DISPATCH_KEVENT_TIMER_UDATA(tidx);
	}
#endif // __LP64__
	if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
		_dispatch_timers_force_max_leeway = true;
	}
}
static inline void
_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	unsigned int tidx = (unsigned int)dk->dk_kevent.ident;

	if (slowpath(ds_timer_aggregate(ds))) {
		_dispatch_timer_aggregates_unregister(ds, tidx);
	}
	_dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
		_dispatch_timers_reconfigure = true;
		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
	}
}
// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timers_update(dispatch_source_t ds)
{
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_source_refs_t dr = ds->ds_refs;
	unsigned int tidx;

	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	// Do not reschedule timers unregistered with _dispatch_kevent_unregister()
	if (slowpath(!dk)) {
		return;
	}
	// Move timers that are disabled, suspended or have missed intervals to the
	// disarmed list, rearm after resume resp. source invoke will reenable them
	if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
			ds->ds_pending_data) {
		tidx = DISPATCH_TIMER_INDEX_DISARM;
		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds,
				ds->ds_dkev);
	} else {
		tidx = _dispatch_source_timer_idx(dr);
	}
	if (slowpath(ds_timer_aggregate(ds))) {
		_dispatch_timer_aggregates_register(ds);
	}
	if (slowpath(!ds->ds_is_installed)) {
		ds->ds_is_installed = true;
		if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
			(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
			_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
					ds->ds_dkev);
		}
		_dispatch_object_debug(ds, "%s", __func__);
	} else {
		_dispatch_timers_unregister(ds, dk);
	}
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
		_dispatch_timers_reconfigure = true;
		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
	}
	if (dk != &_dispatch_kevent_timer[tidx]){
		ds->ds_dkev = &_dispatch_kevent_timer[tidx];
	}
	_dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
	if (slowpath(ds_timer_aggregate(ds))) {
		_dispatch_timer_aggregates_update(ds, tidx);
	}
}
static inline void
_dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
{
	dispatch_source_refs_t dr;
	dispatch_source_t ds;
	uint64_t now, missed;

	now = _dispatch_source_timer_now(nows, tidx);
	while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
		ds = _dispatch_source_from_refs(dr);
		// We may find timers on the wrong list due to a pending update from
		// dispatch_source_set_timer. Force an update of the list in that case.
		if (tidx != ds->ds_ident_hack) {
			_dispatch_timers_update(ds);
			continue;
		}
		if (!ds_timer(dr).target) {
			// No configured timers on the list
			break;
		}
		if (ds_timer(dr).target > now) {
			// Done running timers for now.
			break;
		}
		// Remove timers that are suspended or have missed intervals from the
		// list, rearm after resume resp. source invoke will reenable them
		if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
			_dispatch_timers_update(ds);
			continue;
		}
		// Calculate number of missed intervals.
		missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
		if (++missed > INT_MAX) {
			missed = INT_MAX;
		}
		if (ds_timer(dr).interval < INT64_MAX) {
			ds_timer(dr).target += missed * ds_timer(dr).interval;
			ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
		} else {
			ds_timer(dr).target = UINT64_MAX;
			ds_timer(dr).deadline = UINT64_MAX;
		}
		_dispatch_timers_update(ds);
		ds_timer(dr).last_fire = now;

		unsigned long data;
		data = dispatch_atomic_add2o(ds, ds_pending_data,
				(unsigned long)missed, relaxed);
		_dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
		_dispatch_wakeup(ds);
	}
}
static void
_dispatch_timers_run(uint64_t nows[])
{
	unsigned int tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
			_dispatch_timers_run2(nows, tidx);
		}
	}
}
static inline unsigned int
_dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
		uint64_t *delay, uint64_t *leeway, int qos)
{
	unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
	uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;

	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
			continue;
		}
		uint64_t target = timer[tidx].target;
		if (target == UINT64_MAX) {
			continue;
		}
		uint64_t deadline = timer[tidx].deadline;
		if (qos >= 0) {
			// Timer pre-coalescing <rdar://problem/13222034>
			uint64_t window = _dispatch_kevent_coalescing_window[qos];
			uint64_t latest = deadline > window ? deadline - window : 0;
			dispatch_source_refs_t dri;
			TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
					dr_list) {
				tmp = ds_timer(dri).target;
				if (tmp > latest) break;
				target = tmp;
			}
		}
		uint64_t now = _dispatch_source_timer_now(nows, tidx);
		if (target <= now) {
			delta = 0;
			break;
		}
		tmp = target - now;
		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
			tmp = _dispatch_time_mach2nano(tmp);
		}
		if (tmp < INT64_MAX && tmp < delta) {
			ridx = tidx;
			delta = tmp;
		}
		dispatch_assert(target <= deadline);
		tmp = deadline - now;
		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
			tmp = _dispatch_time_mach2nano(tmp);
		}
		if (tmp < INT64_MAX && tmp < dldelta) {
			dldelta = tmp;
		}
	}
	*delay = delta;
	*leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
	return ridx;
}
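/*
 * Worked example (illustrative numbers) of the pre-coalescing above: with
 * deadline = 1000 and a coalescing window of 150, latest = 850; walking the
 * target-ordered dk_sources list pulls the programmed target forward to the
 * last timer whose target is <= 850, so several timers can share one wakeup.
 * *delay is then that target minus now, and *leeway is the remaining slack
 * up to the earliest deadline.
 */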
static bool
_dispatch_timers_program2(uint64_t nows[], _dispatch_kevent_qos_s *ke,
		unsigned int qos)
{
	unsigned int tidx;
	bool poll;
	uint64_t delay, leeway;

	tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
			(int)qos);
	poll = (delay == 0);
	if (poll || delay == UINT64_MAX) {
		_dispatch_trace_next_timer_set(NULL, qos);
		if (!ke->data) {
			return poll;
		}
		ke->data = 0;
		ke->flags |= EV_DELETE;
		ke->flags &= ~(EV_ADD|EV_ENABLE);
	} else {
		_dispatch_trace_next_timer_set(
				TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos);
		_dispatch_trace_next_timer_program(delay, qos);
		delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
		if (slowpath(_dispatch_timers_force_max_leeway)) {
			ke->data = (int64_t)(delay + leeway);
			ke->ext[1] = 0;
		} else {
			ke->data = (int64_t)delay;
			ke->ext[1] = leeway;
		}
		ke->flags |= EV_ADD|EV_ENABLE;
		ke->flags &= ~EV_DELETE;
	}
	_dispatch_kq_update(ke);
	return poll;
}
static bool
_dispatch_timers_program(uint64_t nows[])
{
	bool poll = false;
	unsigned int qos, qosm = _dispatch_timers_qos_mask;
	for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
		if (!(qosm & 1 << qos)){
			continue;
		}
		poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos],
				qos);
	}
	return poll;
}
static bool
_dispatch_timers_configure(void)
{
	_dispatch_timer_aggregates_check();
	// Find out if there is a new target/deadline on the timer lists
	return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
}

static void
_dispatch_timers_calendar_change(void)
{
	// calendar change may have gone past the wallclock deadline
	_dispatch_timer_expired = true;
	_dispatch_timers_qos_mask = ~0u;
}
static void
_dispatch_timers_kevent(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	dispatch_assert(ke->data > 0);
	dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
			DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
	unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
	dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT);
	dispatch_assert(_dispatch_kevent_timeout[qos].data);
	_dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT
	_dispatch_timer_expired = true;
	_dispatch_timers_qos_mask |= 1 << qos;
	_dispatch_trace_next_timer_wake(qos);
}
1949 _dispatch_mgr_timers(void)
1951 uint64_t nows
[DISPATCH_TIMER_KIND_COUNT
] = {};
1952 bool expired
= slowpath(_dispatch_timer_expired
);
1954 _dispatch_timers_run(nows
);
1956 bool reconfigure
= slowpath(_dispatch_timers_reconfigure
);
1957 if (reconfigure
|| expired
) {
1959 reconfigure
= _dispatch_timers_configure();
1960 _dispatch_timers_reconfigure
= false;
1962 if (reconfigure
|| expired
) {
1963 expired
= _dispatch_timer_expired
= _dispatch_timers_program(nows
);
1964 expired
= expired
|| _dispatch_mgr_q
.dq_items_tail
;
1966 _dispatch_timers_qos_mask
= 0;
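
/*
 * Usage sketch (public dispatch API, illustrative only): the per-QoS
 * kevent programming above is what ultimately services a client timer
 * like this one; the last argument to dispatch_source_set_timer() is the
 * leeway that feeds the coalescing window.
 */
static void
_example_timer_tick(void *ctxt DISPATCH_UNUSED)
{
	// periodic work would go here
}

static dispatch_source_t
_example_timer_create(dispatch_queue_t q)
{
	dispatch_source_t t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
			0, 0, q);
	dispatch_source_set_timer(t, dispatch_time(DISPATCH_TIME_NOW, 0),
			NSEC_PER_SEC, 100 * NSEC_PER_MSEC); // 1s period, 100ms leeway
	dispatch_source_set_event_handler_f(t, _example_timer_tick);
	dispatch_resume(t);
	return t;
}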
#pragma mark dispatch_timer_aggregate

typedef struct {
	TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
} dispatch_timer_aggregate_refs_s;

typedef struct dispatch_timer_aggregate_s {
	DISPATCH_STRUCT_HEADER(queue);
	DISPATCH_QUEUE_HEADER;
	TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
	dispatch_timer_aggregate_refs_s
			dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
	struct {
		DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
	} dta_timer[DISPATCH_TIMER_COUNT];
	struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
	unsigned int dta_refcount;
} dispatch_timer_aggregate_s;

typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
		TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);

dispatch_timer_aggregate_t
dispatch_timer_aggregate_create(void)
{
	unsigned int tidx;
	dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_timer_aggregate_s));
	_dispatch_queue_init((dispatch_queue_t)dta);
	dta->do_targetq = _dispatch_get_root_queue(
			_DISPATCH_QOS_CLASS_USER_INITIATED, true);
	dta->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
	//FIXME: aggregates need custom vtable
	//dta->dq_label = "timer-aggregate";
	for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
		TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
	}
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
		dta->dta_timer[tidx].target = UINT64_MAX;
		dta->dta_timer[tidx].deadline = UINT64_MAX;
		dta->dta_timer_data[tidx].target = UINT64_MAX;
		dta->dta_timer_data[tidx].deadline = UINT64_MAX;
	}
	return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
			(dispatch_queue_t)dta);
}
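
/*
 * SPI usage sketch (hypothetical caller): an aggregate collects the timers
 * of the sources registered with it so a client can query the combined
 * next-fire delay and leeway, e.g. to align its own wakeups with pending
 * dispatch timers.
 */
static void
_example_timer_aggregate_query(dispatch_timer_aggregate_t dta)
{
	uint64_t leeway;
	uint64_t delay = dispatch_timer_aggregate_get_delay(dta, &leeway);
	(void)delay; (void)leeway; // e.g. feed a custom run loop timeout
}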
typedef struct dispatch_timer_delay_s {
	dispatch_timer_t timer;
	uint64_t delay, leeway;
} *dispatch_timer_delay_t;

static void
_dispatch_timer_aggregate_get_delay(void *ctxt)
{
	dispatch_timer_delay_t dtd = ctxt;
	struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
	_dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
			-1);
}

uint64_t
dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
		uint64_t *leeway_ptr)
{
	struct dispatch_timer_delay_s dtd = {
		.timer = dta->dta_timer_data,
	};
	dispatch_sync_f((dispatch_queue_t)dta, &dtd,
			_dispatch_timer_aggregate_get_delay);
	if (leeway_ptr) {
		*leeway_ptr = dtd.leeway;
	}
	return dtd.delay;
}

static void
_dispatch_timer_aggregate_update(void *ctxt)
{
	dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
	dispatch_timer_t dtau = ctxt;
	unsigned int tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		dta->dta_timer_data[tidx].target = dtau[tidx].target;
		dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
	}
	free(dtau);
}

static void
_dispatch_timer_aggregates_configure(void)
{
	dispatch_timer_aggregate_t dta;
	dispatch_timer_t dtau;
	TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
		if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
			continue;
		}
		dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
		memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
		_dispatch_barrier_async_detached_f((dispatch_queue_t)dta, dtau,
				_dispatch_timer_aggregate_update);
	}
}

static inline void
_dispatch_timer_aggregates_check(void)
{
	if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
		return;
	}
	_dispatch_timer_aggregates_configure();
}

static void
_dispatch_timer_aggregates_register(dispatch_source_t ds)
{
	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
	if (!dta->dta_refcount++) {
		TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
	}
}

static void
_dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
{
	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
	dispatch_timer_source_aggregate_refs_t dr;
	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
	_dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
			dta->dta_timer, dr, dta_list);
}

static void
_dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
{
	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
	dispatch_timer_source_aggregate_refs_t dr;
	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
	_dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
			dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
	if (!--dta->dta_refcount) {
		TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
	}
}
#pragma mark dispatch_select

static int _dispatch_kq;

#if DISPATCH_USE_SELECT_FALLBACK

static unsigned int _dispatch_select_workaround;
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
static uint64_t*_dispatch_rfd_ptrs;
static uint64_t*_dispatch_wfd_ptrs;

static bool
_dispatch_select_register(const _dispatch_kevent_qos_s *kev)
{
	// Must execute on manager queue
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	// If an EINVAL or ENOENT error occurred while adding/enabling a read or
	// write kevent, assume it was due to a type of filedescriptor not
	// supported by kqueue and fall back to select
	switch (kev->filter) {
	case EVFILT_READ:
		if ((kev->data == EINVAL || kev->data == ENOENT) &&
				dispatch_assume(kev->ident < FD_SETSIZE)) {
			FD_SET((int)kev->ident, &_dispatch_rfds);
			if (slowpath(!_dispatch_rfd_ptrs)) {
				_dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
						sizeof(*_dispatch_rfd_ptrs));
			}
			if (!_dispatch_rfd_ptrs[kev->ident]) {
				_dispatch_rfd_ptrs[kev->ident] = kev->udata;
				_dispatch_select_workaround++;
				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
						(int)kev->ident, (long)kev->data);
			}
			return true;
		}
		break;
	case EVFILT_WRITE:
		if ((kev->data == EINVAL || kev->data == ENOENT) &&
				dispatch_assume(kev->ident < FD_SETSIZE)) {
			FD_SET((int)kev->ident, &_dispatch_wfds);
			if (slowpath(!_dispatch_wfd_ptrs)) {
				_dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
						sizeof(*_dispatch_wfd_ptrs));
			}
			if (!_dispatch_wfd_ptrs[kev->ident]) {
				_dispatch_wfd_ptrs[kev->ident] = kev->udata;
				_dispatch_select_workaround++;
				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
						(int)kev->ident, (long)kev->data);
			}
			return true;
		}
		break;
	}
	return false;
}

static bool
_dispatch_select_unregister(const _dispatch_kevent_qos_s *kev)
{
	// Must execute on manager queue
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	switch (kev->filter) {
	case EVFILT_READ:
		if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
				_dispatch_rfd_ptrs[kev->ident]) {
			FD_CLR((int)kev->ident, &_dispatch_rfds);
			_dispatch_rfd_ptrs[kev->ident] = 0;
			_dispatch_select_workaround--;
			return true;
		}
		break;
	case EVFILT_WRITE:
		if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
				_dispatch_wfd_ptrs[kev->ident]) {
			FD_CLR((int)kev->ident, &_dispatch_wfds);
			_dispatch_wfd_ptrs[kev->ident] = 0;
			_dispatch_select_workaround--;
			return true;
		}
		break;
	}
	return false;
}

static bool
_dispatch_mgr_select(bool poll)
{
	static const struct timeval timeout_immediately = { 0, 0 };
	fd_set tmp_rfds, tmp_wfds;
	int err, i, r;
	bool kevent_avail = false;

	FD_COPY(&_dispatch_rfds, &tmp_rfds);
	FD_COPY(&_dispatch_wfds, &tmp_wfds);

	r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
			poll ? (struct timeval*)&timeout_immediately : NULL);
	if (slowpath(r == -1)) {
		err = errno;
		if (err != EBADF) {
			if (err != EINTR) {
				(void)dispatch_assume_zero(err);
			}
			return false;
		}
		// Find the bad file descriptor(s) and remove them from the sets
		for (i = 0; i < FD_SETSIZE; i++) {
			if (i == _dispatch_kq) {
				continue;
			}
			if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
				continue;
			}
			r = dup(i);
			if (dispatch_assume(r != -1)) {
				close(r);
			} else {
				if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
					FD_CLR(i, &_dispatch_rfds);
					_dispatch_rfd_ptrs[i] = 0;
					_dispatch_select_workaround--;
				}
				if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
					FD_CLR(i, &_dispatch_wfds);
					_dispatch_wfd_ptrs[i] = 0;
					_dispatch_select_workaround--;
				}
			}
		}
		return false;
	}
	for (i = 0; i < FD_SETSIZE; i++) {
		if (FD_ISSET(i, &tmp_rfds)) {
			if (i == _dispatch_kq) {
				kevent_avail = true;
				continue;
			}
			FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
			_dispatch_kevent_qos_s kev = {
				.ident = (uint64_t)i,
				.filter = EVFILT_READ,
				.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
				.data = 1,
				.udata = _dispatch_rfd_ptrs[i],
			};
			_dispatch_kevent_drain(&kev);
		}
		if (FD_ISSET(i, &tmp_wfds)) {
			FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
			_dispatch_kevent_qos_s kev = {
				.ident = (uint64_t)i,
				.filter = EVFILT_WRITE,
				.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
				.data = 1,
				.udata = _dispatch_wfd_ptrs[i],
			};
			_dispatch_kevent_drain(&kev);
		}
	}
	return kevent_avail;
}

#endif // DISPATCH_USE_SELECT_FALLBACK
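
/*
 * Background sketch (standalone, hypothetical helper): kqueue cannot
 * monitor every descriptor type; adding an EVFILT_READ kevent for e.g. a
 * legacy device or pty fd can fail with EINVAL or ENOENT. This probe shows
 * the failure mode the fallback above keys off of.
 */
static bool
_example_fd_needs_select_fallback(int kq, int fd)
{
	struct kevent ke;
	EV_SET(&ke, (uintptr_t)fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &ke, 1, NULL, 0, NULL) == -1) {
		// with an empty eventlist, changelist errors come back via errno
		return (errno == EINVAL || errno == ENOENT);
	}
	return false;
}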
#pragma mark dispatch_kqueue

static void
_dispatch_kq_init(void *context DISPATCH_UNUSED)
{
	static const _dispatch_kevent_qos_s kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.flags = EV_ADD|EV_CLEAR,
	};

	_dispatch_safe_fork = false;
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)&kev;
	_dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
#else
	_dispatch_kq = kqueue();
#endif
	if (_dispatch_kq == -1) {
		int err = errno;
		switch (err) {
		case EMFILE:
			DISPATCH_CLIENT_CRASH("kqueue() failure: "
					"process is out of file descriptors");
			break;
		case ENFILE:
			DISPATCH_CLIENT_CRASH("kqueue() failure: "
					"system is out of file descriptors");
			break;
		case ENOMEM:
			DISPATCH_CLIENT_CRASH("kqueue() failure: "
					"kernel is out of memory");
			break;
		default:
			(void)dispatch_assume_zero(err);
			DISPATCH_CRASH("kqueue() failure");
			break;
		}
	}
#if DISPATCH_USE_SELECT_FALLBACK
	else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
		// in case we fall back to select()
		FD_SET(_dispatch_kq, &_dispatch_rfds);
	}
#endif // DISPATCH_USE_SELECT_FALLBACK

	(void)dispatch_assume_zero(kevent_qos(_dispatch_kq, &kev, 1, NULL, 0, NULL,
			NULL, 0));
	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
}

static int
_dispatch_get_kq(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_kq_init);

	return _dispatch_kq;
}
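
/*
 * Standalone sketch (hypothetical helper, standard kqueue API): EVFILT_USER
 * is how one thread wakes a kevent loop parked in another, which is what
 * _dispatch_mgr_wakeup does below: register once with EV_ADD|EV_CLEAR, then
 * trigger later with NOTE_TRIGGER.
 */
static void
_example_kq_wakeup(int kq)
{
	struct kevent ke;
	// one-time registration (the ident value is arbitrary but fixed)
	EV_SET(&ke, 1, EVFILT_USER, EV_ADD|EV_CLEAR, 0, 0, NULL);
	(void)kevent(kq, &ke, 1, NULL, 0, NULL);
	// from any thread: fire the event to wake the loop
	EV_SET(&ke, 1, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
	(void)kevent(kq, &ke, 1, NULL, 0, NULL);
}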
static long
_dispatch_kq_update(const _dispatch_kevent_qos_s *kev)
{
	int r;
	_dispatch_kevent_qos_s kev_error;

#if DISPATCH_USE_SELECT_FALLBACK
	if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
		if (_dispatch_select_unregister(kev)) {
			return 0;
		}
	}
#endif // DISPATCH_USE_SELECT_FALLBACK
	if (kev->filter != EVFILT_USER || DISPATCH_MGR_QUEUE_DEBUG) {
		_dispatch_kevent_debug(kev, __func__);
	}
retry:
	r = kevent_qos(_dispatch_get_kq(), kev, 1, &kev_error,
			1, NULL, NULL, KEVENT_FLAG_ERROR_EVENTS);
	if (slowpath(r == -1)) {
		int err = errno;
		switch (err) {
		case EINTR:
			goto retry;
		case EBADF:
			DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
			break;
		default:
			(void)dispatch_assume_zero(err);
			break;
		}
		return err;
	}
	if (r == 0) {
		return 0;
	}
	if (kev_error.flags & EV_ERROR && kev_error.data) {
		_dispatch_kevent_debug(&kev_error, __func__);
	}
	r = (int)kev_error.data;
	switch (r) {
	case 0:
		_dispatch_kevent_mgr_debug(&kev_error, __func__);
		break;
	case EINPROGRESS:
		// deferred EV_DELETE
		break;
	case ENOENT:
		if ((kev->flags & EV_DELETE) && (kev->flags & EV_UDATA_SPECIFIC)) {
			// potential concurrent EV_DELETE delivery
			break;
		}
		// fall through
	case EINVAL:
		if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
#if DISPATCH_USE_SELECT_FALLBACK
			if (_dispatch_select_register(&kev_error)) {
				r = 0;
				break;
			}
#elif DISPATCH_DEBUG
			if (kev->filter == EVFILT_READ || kev->filter == EVFILT_WRITE) {
				DISPATCH_CRASH("Unsupported fd for EVFILT_READ or EVFILT_WRITE "
						"kevent");
			}
#endif // DISPATCH_USE_SELECT_FALLBACK
		}
		// fall through
	default:
		kev_error.flags |= kev->flags;
		_dispatch_kevent_drain(&kev_error);
		r = (int)kev_error.data;
		break;
	}
	return r;
}
#pragma mark dispatch_mgr

static _dispatch_kevent_qos_s *_dispatch_kevent_enable;

static void
_dispatch_mgr_kevent_reenable(_dispatch_kevent_qos_s *ke)
{
	dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke);
	_dispatch_kevent_enable = ke;
}

bool
_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
{
	if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
		return false;
	}

	static const _dispatch_kevent_qos_s kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.fflags = NOTE_TRIGGER,
	};

#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
	_dispatch_debug("waking up the dispatch manager queue: %p", dq);
#endif

	_dispatch_kq_update(&kev);

	return false;
}

static void
_dispatch_mgr_init(void)
{
	(void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
	_dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
	_dispatch_mgr_priority_init();
	_dispatch_kevent_init();
	_dispatch_timers_init();
	_dispatch_mach_recv_msg_buf_init();
	_dispatch_memorystatus_init();
}

DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
	_dispatch_kevent_qos_s kev;
	bool poll;
	int r;

	for (;;) {
		_dispatch_mgr_queue_drain();
		poll = _dispatch_mgr_timers();
#if DISPATCH_USE_SELECT_FALLBACK
		if (slowpath(_dispatch_select_workaround)) {
			poll = _dispatch_mgr_select(poll);
			if (!poll) continue;
		}
#endif // DISPATCH_USE_SELECT_FALLBACK
		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
		r = kevent_qos(_dispatch_kq, _dispatch_kevent_enable,
				_dispatch_kevent_enable ? 1 : 0, &kev, 1, NULL, NULL,
				poll ? KEVENT_FLAG_IMMEDIATE : KEVENT_FLAG_NONE);
		_dispatch_kevent_enable = NULL;
		if (slowpath(r == -1)) {
			int err = errno;
			switch (err) {
			case EINTR:
				break;
			case EBADF:
				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
				break;
			default:
				(void)dispatch_assume_zero(err);
				break;
			}
		} else if (r) {
			_dispatch_kevent_drain(&kev);
		}
	}
}

DISPATCH_NORETURN
void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
		dispatch_object_t dou DISPATCH_UNUSED,
		dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
	_dispatch_mgr_init();
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
}
#pragma mark dispatch_memorystatus

#if DISPATCH_USE_MEMORYSTATUS_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
		DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
		DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
#elif DISPATCH_USE_VM_PRESSURE_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
#endif

#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
static dispatch_source_t _dispatch_memorystatus_source;

static void
_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
{
#if DISPATCH_USE_MEMORYSTATUS_SOURCE
	unsigned long memorystatus;
	memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source);
	if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
		_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
		_voucher_activity_heap_pressure_normal();
		return;
	}
	_dispatch_continuation_cache_limit =
			DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
	_voucher_activity_heap_pressure_warn();
#endif
	malloc_zone_pressure_relief(0,0);
}

static void
_dispatch_memorystatus_init(void)
{
	_dispatch_memorystatus_source = dispatch_source_create(
			DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
			DISPATCH_MEMORYSTATUS_SOURCE_MASK,
			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true));
	dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
			_dispatch_memorystatus_handler);
	dispatch_resume(_dispatch_memorystatus_source);
}
#else
static inline void _dispatch_memorystatus_init(void) {}
#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
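
/*
 * Public-API analogue (usage sketch, not part of this file): clients
 * receive the same signal through DISPATCH_SOURCE_TYPE_MEMORYPRESSURE and
 * can trim caches the way the handler above trims the continuation cache.
 */
static dispatch_source_t
_example_memory_pressure_source(void)
{
	dispatch_source_t s = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_MEMORYPRESSURE, 0,
			DISPATCH_MEMORYPRESSURE_NORMAL | DISPATCH_MEMORYPRESSURE_WARN,
			dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
	dispatch_source_set_event_handler(s, ^{
		if (dispatch_source_get_data(s) & DISPATCH_MEMORYPRESSURE_WARN) {
			(void)malloc_zone_pressure_relief(NULL, 0); // return free pages
		}
	});
	dispatch_resume(s);
	return s;
}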
#pragma mark dispatch_mach

#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
#define _dispatch_debug_machport(name) \
		dispatch_debug_machport((name), __func__)
#else
#define _dispatch_debug_machport(name) ((void)(name))
#endif

// Flags for all notifications that are registered/unregistered when a
// send-possible notification is requested/delivered
#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
		DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)

#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
		(MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) \
		_DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)

#ifndef MACH_RCV_LARGE_IDENTITY
#define MACH_RCV_LARGE_IDENTITY 0x00000008
#endif
#ifndef MACH_RCV_VOUCHER
#define MACH_RCV_VOUCHER 0x00000800
#endif
#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
#define DISPATCH_MACH_RCV_OPTIONS ( \
		MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
		MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \
		MACH_RCV_VOUCHER

#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])

static void _dispatch_kevent_machport_drain(_dispatch_kevent_qos_s *ke);
static void _dispatch_kevent_mach_msg_drain(_dispatch_kevent_qos_s *ke);
static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
		dispatch_source_refs_t dr, dispatch_kevent_t dk,
		mach_msg_header_t *hdr, mach_msg_size_t siz);
static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags, uint32_t mask,
		mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, bool disconnected);
static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);
static inline void _dispatch_mach_msg_set_options(dispatch_object_t dou,
		mach_msg_option_t options);
static void _dispatch_mach_msg_recv(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, mach_msg_header_t *hdr,
		mach_msg_size_t siz);
static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
		const _dispatch_kevent_qos_s *ke);
static inline mach_msg_option_t _dispatch_mach_checkin_options(void);

static const size_t _dispatch_mach_recv_msg_size =
		DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
static const size_t dispatch_mach_trailer_size =
		sizeof(dispatch_mach_trailer_t);
static mach_msg_size_t _dispatch_mach_recv_msg_buf_size;
static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
static mach_port_t _dispatch_mach_notify_port;
static _dispatch_kevent_qos_s _dispatch_mach_recv_kevent = {
	.filter = EVFILT_MACHPORT,
	.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
	.fflags = DISPATCH_MACH_RCV_OPTIONS,
};
static dispatch_source_t _dispatch_mach_notify_source;

struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
	},
};
static void
_dispatch_mach_recv_msg_buf_init(void)
{
	mach_vm_size_t vm_size = mach_vm_round_page(
			_dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
	_dispatch_mach_recv_msg_buf_size = (mach_msg_size_t)vm_size;
	mach_vm_address_t vm_addr = vm_page_size;
	kern_return_t kr;

	while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
			VM_FLAGS_ANYWHERE))) {
		if (kr != KERN_NO_SPACE) {
			(void)dispatch_assume_zero(kr);
			DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
		}
		_dispatch_temporary_resource_shortage();
		vm_addr = vm_page_size;
	}
	_dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
	_dispatch_mach_recv_kevent.ext[1] = vm_size;
}

static inline void*
_dispatch_get_mach_recv_msg_buf(void)
{
	return (void*)_dispatch_mach_recv_kevent.ext[0];
}

static void
_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
{
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
			&_dispatch_mach_recv_portset);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		DISPATCH_CLIENT_CRASH(
				"mach_port_allocate() failed: cannot create port set");
	}
	dispatch_assert(_dispatch_get_mach_recv_msg_buf());
	dispatch_assert(dispatch_mach_trailer_size ==
			REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
			DISPATCH_MACH_RCV_TRAILER)));
	_dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
	_dispatch_kq_update(&_dispatch_mach_recv_kevent);

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
			&_dispatch_mach_notify_port);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		DISPATCH_CLIENT_CRASH(
				"mach_port_allocate() failed: cannot create receive right");
	}
	_dispatch_mach_notify_source = dispatch_source_create(
			&_dispatch_source_type_mach_recv_direct,
			_dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
	static const struct dispatch_continuation_s dc = {
		.dc_func = (void*)_dispatch_mach_notify_source_invoke,
	};
	_dispatch_mach_notify_source->ds_refs->ds_handler[DS_EVENT_HANDLER] =
			(dispatch_continuation_t)&dc;
	dispatch_assert(_dispatch_mach_notify_source);
	dispatch_resume(_dispatch_mach_notify_source);
}

static mach_port_t
_dispatch_get_mach_recv_portset(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
	return _dispatch_mach_recv_portset;
}
static void
_dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
{
	_dispatch_kevent_qos_s kev = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD,
	};
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
			&_dispatch_mach_portset);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		DISPATCH_CLIENT_CRASH(
				"mach_port_allocate() failed: cannot create port set");
	}
	kev.ident = _dispatch_mach_portset;
	_dispatch_kq_update(&kev);
}

static mach_port_t
_dispatch_get_mach_portset(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
	return _dispatch_mach_portset;
}

static kern_return_t
_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	_dispatch_debug_machport(mp);
	kr = mach_port_move_member(mach_task_self(), mp, mps);
	if (slowpath(kr)) {
		DISPATCH_VERIFY_MIG(kr);
		switch (kr) {
		case KERN_INVALID_RIGHT:
			if (mps) {
				_dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
						"mach_port_move_member() failed ", kr);
				break;
			}
			//fall through
		case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
			_dispatch_log("Corruption: Mach receive right 0x%x destroyed "
					"prematurely", mp);
#endif
			break;
		default:
			(void)dispatch_assume_zero(kr);
			break;
		}
	}
	return mps ? kr : 0;
}

static void
_dispatch_kevent_mach_recv_reenable(_dispatch_kevent_qos_s *ke DISPATCH_UNUSED)
{
#if (TARGET_IPHONE_SIMULATOR && \
		IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
		(!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
	// delete and re-add kevent to workaround <rdar://problem/13924256>
	if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
		_dispatch_kevent_qos_s kev = _dispatch_mach_recv_kevent;
		kev.flags = EV_DELETE;
		_dispatch_kq_update(&kev);
	}
#endif
	_dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
}
static kern_return_t
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
			(del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
		mach_port_t mps;
		if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			mps = _dispatch_get_mach_recv_portset();
		} else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
				((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
				(dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
			mps = _dispatch_get_mach_portset();
		} else {
			mps = MACH_PORT_NULL;
		}
		kr = _dispatch_mach_portset_update(dk, mps);
	}
	return kr;
}

static kern_return_t
_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
			(del_flags & _DISPATCH_MACH_SP_FLAGS)) {
		// Requesting a (delayed) non-sync send-possible notification
		// registers for both immediate dead-name notification and delayed-arm
		// send-possible notification for the port.
		// The send-possible notification is armed when a mach_msg() with the
		// the MACH_SEND_NOTIFY to the port times out.
		// If send-possible is unavailable, fall back to immediate dead-name
		// registration rdar://problem/2527840&9008724
		kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
				_DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
				MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
	}
	return kr;
}

static inline void
_dispatch_kevent_mach_portset(_dispatch_kevent_qos_s *ke)
{
	if (ke->ident == _dispatch_mach_recv_portset) {
		return _dispatch_kevent_mach_msg_drain(ke);
	} else if (ke->ident == _dispatch_mach_portset) {
		return _dispatch_kevent_machport_drain(ke);
	} else {
		return _dispatch_kevent_error(ke);
	}
}
DISPATCH_NOINLINE
static void
_dispatch_kevent_machport_drain(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	mach_port_t name = (mach_port_name_t)ke->data;
	dispatch_kevent_t dk;

	_dispatch_debug_machport(name);
	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH

	_dispatch_kevent_qos_s kev = {
		.ident = name,
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
		.fflags = DISPATCH_MACH_RECV_MESSAGE,
		.udata = (uintptr_t)dk,
	};
	_dispatch_kevent_debug(&kev, __func__);
	_dispatch_kevent_merge(&kev);
}

DISPATCH_NOINLINE
static void
_dispatch_kevent_mach_msg_drain(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
	mach_msg_size_t siz, msgsiz;
	mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;

	_dispatch_kevent_mach_recv_reenable(ke);
	if (!dispatch_assume(hdr)) {
		DISPATCH_CRASH("EVFILT_MACHPORT with no message");
	}
	if (fastpath(!kr)) {
		return _dispatch_kevent_mach_msg_recv(hdr);
	} else if (kr != MACH_RCV_TOO_LARGE) {
		goto out;
	}
	if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
			dispatch_mach_trailer_size)) {
		DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
	}
	siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
	hdr = malloc(siz);
	if (ke->data) {
		if (!dispatch_assume(hdr)) {
			// Kernel will discard message too large to fit
			hdr = _dispatch_get_mach_recv_msg_buf();
			siz = _dispatch_mach_recv_msg_buf_size;
		}
		mach_port_t name = (mach_port_name_t)ke->data;
		const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
				MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
		kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
				MACH_PORT_NULL);
		if (fastpath(!kr)) {
			return _dispatch_kevent_mach_msg_recv(hdr);
		} else if (kr == MACH_RCV_TOO_LARGE) {
			_dispatch_log("BUG in libdispatch client: "
					"_dispatch_kevent_mach_msg_drain: dropped message too "
					"large to fit in memory: id = 0x%x, size = %lld",
					hdr->msgh_id, ke->ext[1]);
			kr = MACH_MSG_SUCCESS;
		}
	} else {
		// We don't know which port in the portset contains the large message,
		// so need to receive all messages pending on the portset to ensure the
		// large message is drained. <rdar://problem/13950432>
		bool received = false;
		for (;;) {
			if (!dispatch_assume(hdr)) {
				DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
			}
			const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
					MACH_RCV_TIMEOUT);
			kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
					MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
			if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
					hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
				DISPATCH_CRASH("Overlarge message");
			}
			if (fastpath(!kr)) {
				msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
				if (msgsiz < siz) {
					void *shrink = realloc(hdr, msgsiz);
					if (shrink) hdr = shrink;
				}
				_dispatch_kevent_mach_msg_recv(hdr);
				hdr = NULL;
				received = true;
			} else if (kr == MACH_RCV_TOO_LARGE) {
				siz = hdr->msgh_size + dispatch_mach_trailer_size;
			} else {
				if (kr == MACH_RCV_TIMED_OUT && received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			}
			hdr = reallocf(hdr, siz);
		}
	}
	if (hdr != _dispatch_get_mach_recv_msg_buf()) {
		free(hdr);
	}
out:
	if (slowpath(kr)) {
		_dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
				"message reception failed", kr);
	}
}
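
/*
 * Standalone sketch of the resize-and-retry receive idiom used above
 * (hypothetical helper, standard Mach API): with MACH_RCV_LARGE the kernel
 * keeps an oversized message queued and reports its size in msgh_size, so
 * the buffer can be grown and the receive retried instead of dropping it.
 */
static mach_msg_return_t
_example_recv_grow(mach_port_t port, mach_msg_header_t **hdr_ptr,
		mach_msg_size_t *siz_ptr)
{
	mach_msg_header_t *hdr = *hdr_ptr;
	mach_msg_size_t siz = *siz_ptr;
	mach_msg_return_t kr;
	for (;;) {
		kr = mach_msg(hdr, MACH_RCV_MSG|MACH_RCV_LARGE|MACH_RCV_TIMEOUT, 0,
				siz, port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
		if (kr != MACH_RCV_TOO_LARGE) break;
		// grow to the reported size plus trailer space and retry
		siz = hdr->msgh_size + (mach_msg_size_t)sizeof(mach_msg_trailer_t);
		if (!(hdr = reallocf(hdr, siz))) return MACH_RCV_TOO_LARGE;
	}
	*hdr_ptr = hdr;
	*siz_ptr = siz;
	return kr;
}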
DISPATCH_NOINLINE
static void
_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
{
	dispatch_source_refs_t dri;
	dispatch_kevent_t dk;
	mach_port_t name = hdr->msgh_local_port;
	mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;

	if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
			dispatch_mach_trailer_size)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received overlarge message");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	if (!dispatch_assume(name)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with MACH_PORT_NULL port");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	_dispatch_debug_machport(name);
	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with unknown kevent");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	_dispatch_kevent_debug(&dk->dk_kevent, __func__);
	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
		}
	}
	_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
			"received message with no listeners");
	return _dispatch_kevent_mach_msg_destroy(hdr);
}

static void
_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
{
	if (hdr) {
		mach_msg_destroy(hdr);
		if (hdr != _dispatch_get_mach_recv_msg_buf()) {
			free(hdr);
		}
	}
}

static void
_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
		dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
{
	if (ds == _dispatch_mach_notify_source) {
		_dispatch_mach_notify_source_invoke(hdr);
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	dispatch_mach_reply_refs_t dmr = NULL;
	if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
		dmr = (dispatch_mach_reply_refs_t)dr;
	}
	return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, hdr, siz);
}
DISPATCH_NOINLINE
static void
_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
{
	dispatch_source_refs_t dri, dr_next;
	dispatch_kevent_t dk;
	bool unreg;

	dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
	if (!dk) {
		return;
	}

	// Update notification registration state.
	dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
	_dispatch_kevent_qos_s kev = {
		.ident = name,
		.filter = DISPATCH_EVFILT_MACH_NOTIFICATION,
		.flags = EV_ADD|EV_ENABLE,
		.fflags = flag,
		.udata = (uintptr_t)dk,
	};
	if (final) {
		// This can never happen again
		unreg = true;
	} else {
		// Re-register for notification before delivery
		unreg = _dispatch_kevent_resume(dk, flag, 0);
	}
	DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
			dispatch_mach_t dm = (dispatch_mach_t)dsi;
			_dispatch_mach_merge_kevent(dm, &kev);
			if (unreg && dm->dm_dkev) {
				_dispatch_mach_kevent_unregister(dm);
			}
		} else {
			_dispatch_source_merge_kevent(dsi, &kev);
			if (unreg) {
				_dispatch_source_kevent_unregister(dsi);
			}
		}
		if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
			// current merge is last in list (dk might have been freed)
			// or it re-armed the notification
			return;
		}
	}
}
static kern_return_t
_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
		mach_port_mscount_t notify_sync)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
	kern_return_t kr, krr = 0;

	// Update notification registration state.
	dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
	dk->dk_kevent.data &= ~(del_flags & mask);

	_dispatch_debug_machport(port);
	if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
		// initialize _dispatch_mach_notify_port:
		(void)_dispatch_get_mach_recv_portset();
		_dispatch_debug("machport[0x%08x]: registering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		krr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, _dispatch_mach_notify_port,
				MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(krr);

		switch(krr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			// Supress errors & clear registration state
			dk->dk_kevent.data &= ~mask;
			break;
		default:
			// Else, we dont expect any errors from mach. Log any errors
			if (dispatch_assume_zero(krr)) {
				// log the error & clear registration state
				dk->dk_kevent.data &= ~mask;
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beat libdispatch to requesting the
				// specified Mach notification on this port. We should
				// technically cache the previous port and message it when the
				// kernel messages our port. Or we can just say screw those
				// subsystems and deallocate the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				(void)dispatch_assume_zero(kr);
				previous = MACH_PORT_NULL;
			}
		}
	} else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
		_dispatch_debug("machport[0x%08x]: unregistering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		kr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, MACH_PORT_NULL,
				MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			(void)dispatch_assume_zero(kr);
		}
	} else {
		return 0;
	}
	if (slowpath(previous)) {
		// the kernel has not consumed the send-once right yet
		(void)dispatch_assume_zero(
				_dispatch_send_consume_send_once_right(previous));
	}
	return krr;
}
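
/*
 * Standalone sketch (standard Mach API, hypothetical helper): this is the
 * primitive the function above wraps, asking the kernel to send a
 * dead-name notification for `port` to `notify_port` once the receive
 * right backing it is destroyed.
 */
static kern_return_t
_example_request_dead_name(mach_port_t port, mach_port_t notify_port)
{
	mach_port_t previous = MACH_PORT_NULL;
	kern_return_t kr = mach_port_request_notification(mach_task_self(),
			port, MACH_NOTIFY_DEAD_NAME, 0 /* sync */, notify_port,
			MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
	if (!kr && previous != MACH_PORT_NULL) {
		// somebody else had a notification registered; drop their right
		(void)mach_port_deallocate(mach_task_self(), previous);
	}
	return kr;
}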
static void
_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
{
	(void)_dispatch_get_mach_recv_portset();
	_dispatch_debug("registering for calendar-change notification");
	kern_return_t kr = host_request_notification(_dispatch_get_mach_host_port(),
			HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}

static void
_dispatch_mach_host_calendar_change_register(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
}

static void
_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
{
	mig_reply_error_t reply;
	dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
			__ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
	dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
	boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
	if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
		// host_notify_reply.defs: host_calendar_changed
		_dispatch_debug("calendar-change notification");
		_dispatch_timers_calendar_change();
		_dispatch_mach_host_notify_update(NULL);
		success = TRUE;
		reply.RetCode = KERN_SUCCESS;
	}
	if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
		(void)dispatch_assume_zero(reply.RetCode);
	}
}

kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
#if DISPATCH_DEBUG
	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
			"deleted prematurely", name);
#endif

	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);

	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	kern_return_t kr;

	_dispatch_debug("machport[0x%08x]: dead-name notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);

	// the act of receiving a dead name notification allocates a dead-name
	// right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//(void)dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	_dispatch_debug("machport[0x%08x]: send-possible notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);

	return KERN_SUCCESS;
}
#pragma mark dispatch_mach_t

#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
#define DISPATCH_MACH_OPTIONS_MASK 0xffff

static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
		mach_port_t local_port, mach_port_t remote_port);
static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
		dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
		dispatch_object_t dou);
static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
		dispatch_mach_msg_t dmsg);
static void _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
		pthread_priority_t pp);

static dispatch_mach_t
_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler, bool handler_is_block)
{
	dispatch_mach_t dm;
	dispatch_mach_refs_t dr;

	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
			sizeof(struct dispatch_mach_s));
	_dispatch_queue_init((dispatch_queue_t)dm);
	dm->dq_label = label;

	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
	dm->do_ref_cnt++; // since channel is created suspended
	dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	dm->do_targetq = &_dispatch_mgr_q;

	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
	dr->dr_source_wref = _dispatch_ptr2wref(dm);
	dr->dm_handler_func = handler;
	dr->dm_handler_ctxt = context;
	dm->ds_refs = dr;
	dm->dm_handler_is_block = handler_is_block;

	dm->dm_refs = _dispatch_calloc(1ul,
			sizeof(struct dispatch_mach_send_refs_s));
	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
	TAILQ_INIT(&dm->dm_refs->dm_replies);

	// First item on the channel sets the user-specified target queue
	dispatch_set_target_queue(dm, q);
	_dispatch_object_debug(dm, "%s", __func__);
	return dm;
}

dispatch_mach_t
dispatch_mach_create(const char *label, dispatch_queue_t q,
		dispatch_mach_handler_t handler)
{
	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
	return _dispatch_mach_create(label, q, bb,
			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
}

dispatch_mach_t
dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler)
{
	return _dispatch_mach_create(label, q, context, handler, false);
}
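
/*
 * SPI usage sketch (hypothetical client code): a channel is created
 * suspended; dispatch_mach_connect() below supplies the receive/send
 * rights and is what actually arms the kevents.
 */
static void
_example_mach_handler(void *ctxt, dispatch_mach_reason_t reason,
		dispatch_mach_msg_t msg, mach_error_t err);

static dispatch_mach_t
_example_make_channel(mach_port_t receive, mach_port_t send)
{
	dispatch_mach_t dm = dispatch_mach_create_f("com.example.channel",
			dispatch_get_main_queue(), NULL, _example_mach_handler);
	dispatch_mach_connect(dm, receive, send, NULL); // no checkin message
	return dm;
}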
void
_dispatch_mach_dispose(dispatch_mach_t dm)
{
	_dispatch_object_debug(dm, "%s", __func__);
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
		Block_release(dr->dm_handler_ctxt);
	}
	free(dr);
	free(dm->dm_refs);
	_dispatch_queue_destroy(dm);
}

void
dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
		mach_port_t send, dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_kevent_t dk;

	if (MACH_PORT_VALID(receive)) {
		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
		dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
		dk->dk_kevent.ident = receive;
		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
		dk->dk_kevent.udata = (uintptr_t)dk;
		TAILQ_INIT(&dk->dk_sources);
		dm->ds_dkev = dk;
		dm->ds_pending_data_mask = dk->dk_kevent.fflags;
		_dispatch_retain(dm); // the reference the manager queue holds
	}
	dr->dm_send = send;
	if (MACH_PORT_VALID(send)) {
		if (checkin) {
			dispatch_retain(checkin);
			mach_msg_option_t options = _dispatch_mach_checkin_options();
			_dispatch_mach_msg_set_options(checkin, options);
			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
		}
		dr->dm_checkin = checkin;
	}
	// monitor message reply ports
	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
			DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
		DISPATCH_CLIENT_CRASH("Channel already connected");
	}
	_dispatch_object_debug(dm, "%s", __func__);
	return dispatch_resume(dm);
}

DISPATCH_NOINLINE
static void
_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, bool disconnected)
{
	dispatch_mach_msg_t dmsgr = NULL;
	if (disconnected) {
		dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
	}
	dispatch_kevent_t dk = dmr->dmr_dkev;
	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
	_dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE, 0);
	TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
	if (dmr->dmr_voucher) _voucher_release(dmr->dmr_voucher);
	free(dmr);
	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
}
static void
_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
		dispatch_mach_msg_t dmsg)
{
	dispatch_kevent_t dk;
	dispatch_mach_reply_refs_t dmr;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
	dk->dk_kevent.ident = reply;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
	dmr->dmr_dkev = dk;
	if (dmsg->dmsg_voucher) {
		dmr->dmr_voucher =_voucher_retain(dmsg->dmsg_voucher);
	}
	dmr->dmr_priority = dmsg->dmsg_priority;
	// make reply context visible to leaks rdar://11777199
	dmr->dmr_ctxt = dmsg->do_ctxt;

	_dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
			dmsg->do_ctxt);
	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, &flags);
	TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
			dr_list);
	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
	if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
		_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
	}
}

static void
_dispatch_mach_kevent_unregister(dispatch_mach_t dm)
{
	dispatch_kevent_t dk = dm->dm_dkev;
	dm->dm_dkev = NULL;
	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
			dr_list);
	dm->ds_pending_data_mask &= ~(unsigned long)
			(DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
	_dispatch_kevent_unregister(dk,
			DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD, 0);
}

static void
_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
{
	dispatch_kevent_t dk;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = _dispatch_source_type_mach_send.ke;
	dk->dk_kevent.ident = send;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	dm->ds_pending_data_mask |= dk->dk_kevent.fflags;

	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&dk, &flags);
	TAILQ_INSERT_TAIL(&dk->dk_sources,
			(dispatch_source_refs_t)dm->dm_refs, dr_list);
	dm->dm_dkev = dk;
	if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
		_dispatch_mach_kevent_unregister(dm);
	}
}
static inline void
_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
		pthread_priority_t pp)
{
	return _dispatch_queue_push(dm._dq, dou, pp);
}

static inline void
_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
{
	dou._do->do_suspend_cnt = (unsigned int)options;
}

static inline mach_msg_option_t
_dispatch_mach_msg_get_options(dispatch_object_t dou)
{
	mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
	return options;
}

static inline void
_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
		unsigned long reason)
{
	dispatch_assert_zero(reason & ~(unsigned long)code_emask);
	dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err :
			err_local|err_sub(0x3e0)|(mach_error_t)reason);
}

static inline unsigned long
_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
{
	mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
	dou._do->do_suspend_cnt = 0;
	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
		*err_ptr = 0;
		return err_get_code(err);
	}
	*err_ptr = err;
	return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
}
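
/*
 * Illustrative sketch (hypothetical helpers): the pair of functions above
 * smuggles either a mach_error_t or a delivery reason through the unused
 * do_suspend_cnt field by tagging reasons with the mach error subsystem
 * code 0x3e0, using the <mach/error.h> encoding macros.
 */
static unsigned int
_example_encode_reason(unsigned long reason)
{
	// decodes as: system err_local, subsystem 0x3e0, code == reason
	return (unsigned int)(err_local | err_sub(0x3e0) | (mach_error_t)reason);
}

static unsigned long
_example_decode_reason(unsigned int word)
{
	mach_error_t err = (mach_error_t)word;
	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
		return err_get_code(err); // a tagged reason, not a real error
	}
	return 0; // a genuine mach error (or zero)
}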
static void
_dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
		mach_msg_header_t *hdr, mach_msg_size_t siz)
{
	_dispatch_debug_machport(hdr->msgh_remote_port);
	_dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
			hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	dispatch_mach_msg_t dmsg;
	voucher_t voucher;
	pthread_priority_t priority;
	void *ctxt = NULL;
	if (dmr) {
		_voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
		voucher = dmr->dmr_voucher;
		dmr->dmr_voucher = NULL; // transfer reference
		priority = dmr->dmr_priority;
		ctxt = dmr->dmr_ctxt;
		_dispatch_mach_reply_kevent_unregister(dm, dmr, false);
	} else {
		voucher = voucher_create_with_mach_msg(hdr);
		priority = _voucher_get_priority(voucher);
	}
	dispatch_mach_msg_destructor_t destructor;
	destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
	dmsg->dmsg_voucher = voucher;
	dmsg->dmsg_priority = priority;
	dmsg->do_ctxt = ctxt;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
	_dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
	_dispatch_voucher_ktrace_dmsg_push(dmsg);
	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
}

static inline mach_port_t
_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
{
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
	mach_port_t remote = hdr->msgh_remote_port;
	return remote;
}

static void
_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
		mach_port_t remote_port)
{
	mach_msg_header_t *hdr;
	dispatch_mach_msg_t dmsg;
	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	if (local_port) hdr->msgh_local_port = local_port;
	if (remote_port) hdr->msgh_remote_port = remote_port;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
}

static inline dispatch_mach_msg_t
_dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
		dispatch_mach_reply_refs_t dmr)
{
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
	if (dmsg && !dmsg->dmsg_reply) return NULL;
	mach_msg_header_t *hdr;
	dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	if (dmsg) {
		hdr->msgh_local_port = dmsg->dmsg_reply;
		if (dmsg->dmsg_voucher) {
			dmsgr->dmsg_voucher = _voucher_retain(dmsg->dmsg_voucher);
		}
		dmsgr->dmsg_priority = dmsg->dmsg_priority;
		dmsgr->do_ctxt = dmsg->do_ctxt;
	} else {
		hdr->msgh_local_port = (mach_port_t)dmr->dmr_dkev->dk_kevent.ident;
		dmsgr->dmsg_voucher = dmr->dmr_voucher;
		dmr->dmr_voucher = NULL; // transfer reference
		dmsgr->dmsg_priority = dmr->dmr_priority;
		dmsgr->do_ctxt = dmr->dmr_ctxt;
	}
	_dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
	return dmsgr;
}

DISPATCH_NOINLINE
static void
_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
{
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
	dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
}
DISPATCH_NOINLINE
static dispatch_object_t
_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
	voucher_t voucher = dmsg->dmsg_voucher;
	mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
	bool clear_voucher = false, kvoucher_move_send = false;
	dr->dm_needs_mgr = 0;
	if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
		// send initial checkin message
		if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
				&_dispatch_mgr_q)) {
			// send kevent must be uninstalled on the manager queue
			dr->dm_needs_mgr = 1;
			goto out;
		}
		dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
		if (slowpath(dr->dm_checkin)) {
			goto out;
		}
	}
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	mach_msg_return_t kr = 0;
	mach_port_t reply = dmsg->dmsg_reply;
	mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
	if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
		opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
		if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
				MACH_MSG_TYPE_MOVE_SEND_ONCE) {
			if (dmsg != dr->dm_checkin) {
				msg->msgh_remote_port = dr->dm_send;
			}
			if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
				if (slowpath(!dm->dm_dkev)) {
					_dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
				}
				if (fastpath(dm->dm_dkev)) {
					if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
						goto out;
					}
					opts |= MACH_SEND_NOTIFY;
				}
			}
			opts |= MACH_SEND_TIMEOUT;
			if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
				ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
						voucher, dmsg->dmsg_priority);
			}
			_dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
			if (ipc_kvoucher) {
				kvoucher_move_send = true;
				clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
						ipc_kvoucher, kvoucher_move_send);
			} else {
				clear_voucher = _voucher_mach_msg_set(msg, voucher);
			}
		}
		_voucher_activity_trace_msg(voucher, msg, send);
		_dispatch_debug_machport(msg->msgh_remote_port);
		if (reply) _dispatch_debug_machport(reply);
		kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
				MACH_PORT_NULL);
		_dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
				"opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
				"%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
				opts, msg_opts, msg->msgh_voucher_port, reply,
				mach_error_string(kr), kr);
		if (clear_voucher) {
			if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
				DISPATCH_CRASH("Voucher port corruption");
			}
			mach_voucher_t kv;
			kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
			if (kvoucher_move_send) ipc_kvoucher = kv;
		}
	}
	if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
		if (opts & MACH_SEND_NOTIFY) {
			_dispatch_debug("machport[0x%08x]: send-possible notification "
					"armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
			DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
		} else {
			// send kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
		}
		if (ipc_kvoucher) {
			_dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
			voucher_t ipc_voucher;
			ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
					voucher, dmsg->dmsg_priority, ipc_kvoucher);
			_dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
					ipc_voucher, dmsg, voucher);
			if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
			dmsg->dmsg_voucher = ipc_voucher;
		}
		goto out;
	} else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
		_voucher_dealloc_mach_voucher(ipc_kvoucher);
	}
	if (fastpath(!kr) && reply &&
			!(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply)) {
		if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
			// reply receive kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
			_dispatch_mach_msg_set_options(dmsg, msg_opts |
					DISPATCH_MACH_REGISTER_FOR_REPLY);
			goto out;
		}
		_dispatch_mach_reply_kevent_register(dm, reply, dmsg);
	}
	if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
		_dispatch_mach_kevent_unregister(dm);
	}
	if (slowpath(kr)) {
		// Send failed, so reply was never connected <rdar://problem/14309159>
		dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
	}
	_dispatch_mach_msg_set_reason(dmsg, kr, 0);
	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
	dmsg = NULL;
out:
	return (dispatch_object_t)dmsg;
}
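
/*
 * Design note with a standalone sketch (hypothetical helper): sending with
 * MACH_SEND_TIMEOUT and a zero timeout turns a full kernel message queue
 * into an immediate MACH_SEND_TIMED_OUT instead of blocking, and adding
 * MACH_SEND_NOTIFY arms a send-possible notification so the channel can
 * retry once space frees up. That is the state machine driven above.
 */
static mach_msg_return_t
_example_try_send(mach_msg_header_t *msg)
{
	return mach_msg(msg, MACH_SEND_MSG|MACH_SEND_TIMEOUT|MACH_SEND_NOTIFY,
			msg->msgh_size, 0, MACH_PORT_NULL, 0 /* immediate */,
			MACH_PORT_NULL);
}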

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mach_send_push_wakeup(dispatch_mach_t dm, dispatch_object_t dou,
		bool wakeup)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	struct dispatch_object_s *prev, *dc = dou._do;
	dc->do_next = NULL;

	prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
	if (fastpath(prev)) {
		prev->do_next = dc;
	} else {
		dr->dm_head = dc;
	}
	if (wakeup || !prev) {
		_dispatch_wakeup(dm);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
{
	return _dispatch_mach_send_push_wakeup(dm, dou, false);
}
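
/*
 * The enqueue above is the classic wait-free MPSC pattern: producers publish
 * via an unconditional atomic exchange on the tail and then link the previous
 * tail. A minimal standalone sketch in C11 atomics (illustrative only, not
 * libdispatch code; `node_t` and `mpsc_queue_t` are hypothetical):
 *
 *	#include <stdatomic.h>
 *	#include <stddef.h>
 *
 *	typedef struct node_s {
 *		struct node_s *_Atomic next;
 *	} node_t;
 *
 *	typedef struct {
 *		node_t *_Atomic tail; // swapped by any producer
 *		node_t *head;         // consumed by the single drainer
 *	} mpsc_queue_t;
 *
 *	static void
 *	mpsc_push(mpsc_queue_t *q, node_t *n)
 *	{
 *		atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
 *		node_t *prev = atomic_exchange_explicit(&q->tail, n,
 *				memory_order_release);
 *		if (prev) {
 *			// link the new node behind the previous tail
 *			atomic_store_explicit(&prev->next, n, memory_order_relaxed);
 *		} else {
 *			// queue was empty: publish the head for the drainer
 *			q->head = n;
 *		}
 *	}
 *
 * Between the exchange and the store to prev->next the list is transiently
 * unlinked, which is why the drain loop below must _dispatch_wait_until()
 * the producer finishes linking.
 */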

DISPATCH_NOINLINE
static void
_dispatch_mach_send_drain(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	while (dr->dm_tail) {
		_dispatch_wait_until(dc = fastpath(dr->dm_head));
		do {
			next_dc = fastpath(dc->do_next);
			dr->dm_head = next_dc;
			if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
					relaxed)) {
				_dispatch_wait_until(next_dc = fastpath(dc->do_next));
				dr->dm_head = next_dc;
			}
			if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
				if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
					// leave send queue locked until barrier has completed
					return _dispatch_mach_push(dm, dc,
							((dispatch_continuation_t)dc)->dc_priority);
				}
#if DISPATCH_MACH_SEND_SYNC
				if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
					_dispatch_thread_semaphore_signal(
							(_dispatch_thread_semaphore_t)dc->do_ctxt);
					continue;
				}
#endif // DISPATCH_MACH_SEND_SYNC
				if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
					goto out;
				}
				continue;
			}
			_dispatch_voucher_ktrace_dmsg_pop((dispatch_mach_msg_t)dc);
			if (slowpath(dr->dm_disconnect_cnt) ||
					slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
				_dispatch_mach_msg_not_sent(dm, dc);
				continue;
			}
			if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
				goto out;
			}
		} while ((dc = next_dc));
	}
out:
	// if this is not a complete drain, we must undo some things
	if (slowpath(dc)) {
		if (!next_dc &&
				!dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
			// wait for enqueue slow path to finish
			_dispatch_wait_until(next_dc = fastpath(dr->dm_head));
			dc->do_next = next_dc;
		}
		dr->dm_head = dc;
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	_dispatch_wakeup(dm);
}

static inline void
_dispatch_mach_send(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return;
	}
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_mach_send_drain(dm);
}

static void
_dispatch_mach_merge_kevent(dispatch_mach_t dm,
		const _dispatch_kevent_qos_s *ke)
{
	if (!(ke->fflags & dm->ds_pending_data_mask)) {
		return;
	}
	_dispatch_mach_send(dm);
}

static inline mach_msg_option_t
_dispatch_mach_checkin_options(void)
{
	mach_msg_option_t options = 0;
#if DISPATCH_USE_CHECKIN_NOIMPORTANCE
	options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
#endif
	return options;
}

static inline mach_msg_option_t
_dispatch_mach_send_options(void)
{
	mach_msg_option_t options = 0;
	return options;
}

DISPATCH_ALWAYS_INLINE
static inline pthread_priority_t
_dispatch_mach_priority_propagate(mach_msg_option_t options)
{
#if DISPATCH_USE_NOIMPORTANCE_QOS
	if (options & MACH_SEND_NOIMPORTANCE) return 0;
#else
	(void)options;
#endif
	return _dispatch_priority_propagate();
}
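
/*
 * Illustrative client-side note (not libdispatch code): a sender can opt a
 * message out of importance donation and priority propagation by passing
 * MACH_SEND_NOIMPORTANCE in the options to dispatch_mach_send():
 *
 *	dispatch_mach_send(dm, dmsg, MACH_SEND_NOIMPORTANCE);
 *
 * When DISPATCH_USE_NOIMPORTANCE_QOS is set, such messages are sent with
 * priority 0 instead of the propagated pthread priority computed above.
 */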

DISPATCH_NOINLINE
void
dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
		mach_msg_option_t options)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
		DISPATCH_CLIENT_CRASH("Message already enqueued");
	}
	dispatch_retain(dmsg);
	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
	pthread_priority_t priority = _dispatch_mach_priority_propagate(options);
	options |= _dispatch_mach_send_options();
	_dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	dmsg->dmsg_reply = (MACH_MSGH_BITS_LOCAL(msg->msgh_bits) ==
			MACH_MSG_TYPE_MAKE_SEND_ONCE &&
			MACH_PORT_VALID(msg->msgh_local_port) ? msg->msgh_local_port :
			MACH_PORT_NULL);
	bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
			MACH_MSG_TYPE_MOVE_SEND_ONCE);
	dmsg->dmsg_priority = priority;
	dmsg->dmsg_voucher = _voucher_copy();
	_dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);
	if ((!is_reply && slowpath(dr->dm_tail)) ||
			slowpath(dr->dm_disconnect_cnt) ||
			slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
			slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
					acquire))) {
		_dispatch_voucher_ktrace_dmsg_push(dmsg);
		return _dispatch_mach_send_push(dm, dmsg);
	}
	if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
		(void)dispatch_atomic_dec2o(dr, dm_sending, release);
		_dispatch_voucher_ktrace_dmsg_push(dmsg);
		return _dispatch_mach_send_push_wakeup(dm, dmsg, true);
	}
	if (!is_reply && slowpath(dr->dm_tail)) {
		return _dispatch_mach_send_drain(dm);
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	_dispatch_wakeup(dm);
}
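
/*
 * Client-side usage sketch for the send path above (illustrative only;
 * assumes the dispatch_mach_* SPI from <dispatch/mach_private.h> and a valid
 * send right `server_port`; the message id and queue label are made up):
 *
 *	dispatch_mach_t dm = dispatch_mach_create("com.example.channel",
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
 *			^(dispatch_mach_reason_t reason, dispatch_mach_msg_t msg,
 *			mach_error_t error) {
 *		// replies, send results and cancellation all arrive here
 *	});
 *	dispatch_mach_connect(dm, MACH_PORT_NULL, server_port, NULL);
 *
 *	mach_msg_header_t *hdr;
 *	dispatch_mach_msg_t dmsg = dispatch_mach_msg_create(NULL,
 *			sizeof(*hdr), DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
 *	hdr->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
 *	hdr->msgh_remote_port = server_port;
 *	hdr->msgh_size = sizeof(*hdr);
 *	hdr->msgh_id = 0x1000; // hypothetical request id
 *	dispatch_mach_send(dm, dmsg, 0);
 *	dispatch_release(dmsg); // the channel holds its own retain
 *
 * Depending on the fast-path checks above, the message is either sent inline
 * on the caller's thread or pushed onto the channel's send queue.
 */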

static void
_dispatch_mach_disconnect(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (dm->dm_dkev) {
		_dispatch_mach_kevent_unregister(dm);
	}
	if (MACH_PORT_VALID(dr->dm_send)) {
		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
	}
	dr->dm_send = MACH_PORT_NULL;
	if (dr->dm_checkin) {
		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
		dr->dm_checkin = NULL;
	}
	if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		dispatch_mach_reply_refs_t dmr, tmp;
		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp){
			_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
		}
	}
}

static bool
_dispatch_mach_cancel(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
		return false;
	}
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_mach_disconnect(dm);
	if (dm->ds_dkev) {
		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
		_dispatch_source_kevent_unregister((dispatch_source_t)dm);
		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	return true;
}

static bool
_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
{
	if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
			// send/reply kevents must be uninstalled on the manager queue
			return false;
		}
	}
	_dispatch_mach_disconnect(dm);
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dr->dm_checkin = dou._dc->dc_data;
	dr->dm_send = (mach_port_t)dou._dc->dc_other;
	_dispatch_continuation_free(dou._dc);
	(void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
	_dispatch_object_debug(dm, "%s", __func__);
	return true;
}

void
dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
		dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	(void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
	if (MACH_PORT_VALID(send) && checkin) {
		dispatch_retain(checkin);
		mach_msg_option_t options = _dispatch_mach_checkin_options();
		_dispatch_mach_msg_set_options(checkin, options);
		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
	} else {
		checkin = NULL;
		dr->dm_checkin_port = MACH_PORT_NULL;
	}
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = checkin;
	dc->dc_other = (void*)(uintptr_t)send;
	return _dispatch_mach_send_push(dm, dc);
}
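
/*
 * Reconnection sketch (illustrative): when the peer exits, the channel
 * handler observes a disconnect; the client can then hand the channel a
 * fresh send right plus an optional checkin message. `new_send` and
 * `make_checkin_msg()` are hypothetical:
 *
 *	dispatch_mach_msg_t checkin = make_checkin_msg();
 *	dispatch_mach_reconnect(dm, new_send, checkin);
 *	dispatch_release(checkin);
 *
 * The disconnect itself happens asynchronously via
 * _dispatch_mach_reconnect_invoke() above; messages that were queued but
 * never sent come back through the handler as "not sent".
 */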

#if DISPATCH_MACH_SEND_SYNC
DISPATCH_NOINLINE
static void
_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
{
	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_object_s dc = {
		.do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
		.do_ctxt = (void*)sema,
	};
	_dispatch_mach_send_push(dm, &dc);
	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}
#endif // DISPATCH_MACH_SEND_SYNC

mach_port_t
dispatch_mach_get_checkin_port(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
		return MACH_PORT_DEAD;
	}
	return dr->dm_checkin_port;
}
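
/*
 * Illustrative sketch: once a channel has been canceled, the checkin port
 * accessor returns MACH_PORT_DEAD rather than the last checkin port:
 *
 *	mach_port_t cp = dispatch_mach_get_checkin_port(dm);
 *	if (cp == MACH_PORT_DEAD) {
 *		// channel canceled; stop advertising the port
 *	}
 */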

static void
_dispatch_mach_connect_invoke(dispatch_mach_t dm)
{
	dispatch_mach_refs_t dr = dm->ds_refs;
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
	dm->dm_connect_handler_called = 1;
}

DISPATCH_NOINLINE
void
_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg,
		dispatch_object_t dou DISPATCH_UNUSED,
		dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	mach_error_t err;
	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);

	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	_dispatch_voucher_ktrace_dmsg_pop(dmsg);
	_dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
	_dispatch_adopt_priority_and_replace_voucher(dmsg->dmsg_priority,
			dmsg->dmsg_voucher, DISPATCH_PRIORITY_ENFORCE);
	dmsg->dmsg_voucher = NULL;
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
			dr->dm_handler_func);
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	_dispatch_introspection_queue_item_complete(dmsg);
	dispatch_release(dmsg);
}

DISPATCH_NOINLINE
void
_dispatch_mach_barrier_invoke(void *ctxt)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	struct dispatch_continuation_s *dc = ctxt;
	void *context = dc->dc_data;
	dispatch_function_t barrier = dc->dc_other;
	bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);

	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout(context, barrier);
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	if (send_barrier) {
		(void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
	}
}

DISPATCH_NOINLINE
void
dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;
	_dispatch_continuation_voucher_set(dc, 0);
	_dispatch_continuation_priority_set(dc, 0, 0);

	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return _dispatch_mach_send_push(dm, dc);
	}
	// leave send queue locked until barrier has completed
	return _dispatch_mach_push(dm, dc, dc->dc_priority);
}

DISPATCH_NOINLINE
void
dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;
	_dispatch_continuation_voucher_set(dc, 0);
	_dispatch_continuation_priority_set(dc, 0, 0);

	return _dispatch_mach_push(dm, dc, dc->dc_priority);
}

void
dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}

void
dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}
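
/*
 * Barrier usage sketch (illustrative): a send barrier runs once everything
 * previously passed to dispatch_mach_send() has been handed to the kernel
 * (or returned through the handler); a receive barrier orders against
 * messages already delivered to the target queue:
 *
 *	dispatch_mach_send_barrier(dm, ^{
 *		// all earlier sends have been processed
 *	});
 *	dispatch_mach_receive_barrier(dm, ^{
 *		// all earlier received messages have been delivered
 *	});
 *
 * In both cases the channel handler additionally observes
 * DISPATCH_MACH_BARRIER_COMPLETED when the barrier finishes (see
 * _dispatch_mach_barrier_invoke above).
 */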

DISPATCH_NOINLINE
static void
_dispatch_mach_cancel_invoke(dispatch_mach_t dm)
{
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
	dm->dm_cancel_handler_called = 1;
	_dispatch_release(dm); // the retain is done at creation time
}

void
dispatch_mach_cancel(dispatch_mach_t dm)
{
	dispatch_source_cancel((dispatch_source_t)dm);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_mach_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
{
	dispatch_mach_t dm = dou._dm;

	// This function performs all mach channel actions. Each action is
	// responsible for verifying that it takes place on the appropriate queue.
	// If the current queue is not the correct queue for this action, the
	// correct queue will be returned and the invoke will be re-driven on that
	// queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		if (dm->ds_dkev) {
			_dispatch_source_kevent_register((dispatch_source_t)dm);
		}
		dm->ds_is_installed = true;
		_dispatch_mach_send(dm);
		// Apply initial target queue change
		_dispatch_queue_drain(dou);
		if (dm->dq_items_tail) {
			return dm->do_targetq;
		}
	} else if (dm->dq_items_tail) {
		// The channel has pending messages to deliver to the target queue.
		if (dq != dm->do_targetq) {
			return dm->do_targetq;
		}
		dispatch_queue_t tq = dm->do_targetq;
		if (slowpath(_dispatch_queue_drain(dou))) {
			DISPATCH_CLIENT_CRASH("Sync onto mach channel");
		}
		if (slowpath(tq != dm->do_targetq)) {
			// An item on the channel changed the target queue
			return dm->do_targetq;
		}
	} else if (dr->dm_sending) {
		// Sending and uninstallation below require the send lock, the channel
		// will be woken up when the lock is dropped <rdar://15132939&15203957>
	} else if (dr->dm_tail) {
		if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
				(dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
			// Send/reply kevents need to be installed or uninstalled
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
		}
		if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
				(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
			// The channel has pending messages to send.
			_dispatch_mach_send(dm);
		}
	} else if (dm->ds_atomic_flags & DSF_CANCELED){
		// The channel has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			if (!_dispatch_mach_cancel(dm)) {
				return NULL;
			}
		}
		if (!dm->dm_cancel_handler_called) {
			if (dq != dm->do_targetq) {
				return dm->do_targetq;
			}
			_dispatch_mach_cancel_invoke(dm);
		}
	}
	return NULL;
}

DISPATCH_NOINLINE
void
_dispatch_mach_invoke(dispatch_mach_t dm, dispatch_object_t dou,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(dm, dou._dc, flags, _dispatch_mach_invoke2);
}

unsigned long
_dispatch_mach_probe(dispatch_mach_t dm)
{
	// This function determines whether the mach channel needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		return true;
	} else if (_dispatch_queue_class_probe(dm)) {
		// The source has pending messages to deliver to the target queue.
		return true;
	} else if (dr->dm_sending) {
		// Sending and uninstallation below require the send lock, the channel
		// will be woken up when the lock is dropped <rdar://15132939&15203957>
		return false;
	} else if (dr->dm_tail &&
			(!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
			(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
		// The channel has pending messages to send.
		return true;
	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
				!dm->dm_cancel_handler_called) {
			// The channel needs to be uninstalled from the manager queue, or
			// the cancellation handler needs to be delivered to the target
			// queue.
			return true;
		}
	}
	return false;
}

#pragma mark dispatch_mach_msg_t

dispatch_mach_msg_t
dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
		dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
{
	if (slowpath(size < sizeof(mach_msg_header_t)) ||
			slowpath(destructor && !msg)) {
		DISPATCH_CLIENT_CRASH("Empty message");
	}
	dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
			sizeof(struct dispatch_mach_msg_s) +
			(destructor ? 0 : size - sizeof(dmsg->dmsg_msg)));
	if (destructor) {
		dmsg->dmsg_msg = msg;
	} else if (msg) {
		memcpy(dmsg->dmsg_buf, msg, size);
	}
	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
	dmsg->dmsg_destructor = destructor;
	dmsg->dmsg_size = size;
	if (msg_ptr) {
		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
	}
	return dmsg;
}
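
/*
 * Allocation-mode sketch (illustrative): with the default destructor the
 * message is copied into (or composed in) an inline buffer; with an explicit
 * destructor the object adopts the caller's buffer. `msg_size` is a
 * hypothetical size assumed to be at least sizeof(mach_msg_header_t):
 *
 *	// inline copy: compose the header in place via msg_ptr
 *	mach_msg_header_t *hdr;
 *	dispatch_mach_msg_t m1 = dispatch_mach_msg_create(NULL, msg_size,
 *			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
 *
 *	// adopted buffer: free(buf) is called when m2 is disposed
 *	mach_msg_header_t *buf = malloc(msg_size);
 *	dispatch_mach_msg_t m2 = dispatch_mach_msg_create(buf, msg_size,
 *			DISPATCH_MACH_MSG_DESTRUCTOR_FREE, NULL);
 */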

void
_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
{
	if (dmsg->dmsg_voucher) {
		_voucher_release(dmsg->dmsg_voucher);
		dmsg->dmsg_voucher = NULL;
	}
	switch (dmsg->dmsg_destructor) {
	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
		free(dmsg->dmsg_msg);
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
		mach_vm_size_t vm_size = dmsg->dmsg_size;
		mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
				vm_addr, vm_size));
		break;
	}}
}

static inline mach_msg_header_t*
_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
{
	return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
			(mach_msg_header_t*)dmsg->dmsg_buf;
}

mach_msg_header_t*
dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
{
	if (size_ptr) {
		*size_ptr = dmsg->dmsg_size;
	}
	return _dispatch_mach_msg_get_msg(dmsg);
}

size_t
_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dmsg), dmsg);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
			"msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->dmsg_buf);
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
	if (hdr->msgh_id) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
				hdr->msgh_id);
	}
	if (hdr->msgh_size) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
				hdr->msgh_size);
	}
	if (hdr->msgh_bits) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
		}
		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
	}
	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
	} else if (hdr->msgh_local_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
				hdr->msgh_local_port);
	} else if (hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
				hdr->msgh_remote_port);
	} else {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
	return offset;
}

#pragma mark dispatch_mig_server

mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
		dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
	mach_msg_options_t tmp_options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	uint64_t assertion_token = 0;
	unsigned int cnt = 1000; // do not stall out serial queues
	boolean_t demux_success;
	bool received = false;
	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;

	// XXX FIXME -- allocate these elsewhere
	bufRequest = alloca(rcv_size);
	bufReply = alloca(rcv_size);
	bufReply->Head.msgh_size = 0;
	bufRequest->RetCode = 0;

#if DISPATCH_DEBUG
	options |= MACH_RCV_LARGE; // rdar://problem/8422992
#endif
	tmp_options = options;
	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				goto out;
			}
		}
		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
				// Don't return an error if a message was sent this time or
				// a message was successfully received previously
				// rdar://problems/7363620&7791738
				if(bufReply->Head.msgh_remote_port || received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			case MACH_RCV_INVALID_NAME:
				break;
#if DISPATCH_DEBUG
			case MACH_RCV_TOO_LARGE:
				// receive messages that are too large and log their id and size
				// rdar://problem/8422992
				tmp_options &= ~MACH_RCV_LARGE;
				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
				void *large_buf = malloc(large_size);
				if (large_buf) {
					rcv_size = large_size;
					bufReply = large_buf;
				}
				if (!mach_msg(&bufReply->Head, tmp_options, 0,
						(mach_msg_size_t)rcv_size,
						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
					_dispatch_log("BUG in libdispatch client: "
							"dispatch_mig_server received message larger than "
							"requested size %zd: id = 0x%x, size = %d",
							maxmsgsz, bufReply->Head.msgh_id,
							bufReply->Head.msgh_size);
				}
				if (large_buf) {
					free(large_buf);
				}
				// fall through
#endif
			default:
				_dispatch_bug_mach_client(
						"dispatch_mig_server: mach_msg() failed", kr);
				break;
			}
			goto out;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			goto out;
		}

		if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
			int r = proc_importance_assertion_complete(assertion_token);
			(void)dispatch_assume_zero(r);
#endif
			assertion_token = 0;
		}
		received = true;

		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

#if DISPATCH_USE_IMPORTANCE_ASSERTION
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
				NULL, &assertion_token);
		if (r && slowpath(r != EIO)) {
			(void)dispatch_assume_zero(r);
		}
#pragma clang diagnostic pop
#endif
		_voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
			// is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}

				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

out:
	if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
		int r = proc_importance_assertion_complete(assertion_token);
		(void)dispatch_assume_zero(r);
#endif
	}

	return kr;
}

#endif /* HAVE_MACH */
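
/*
 * Server wiring sketch (illustrative): dispatch_mig_server() is driven from
 * a MACH_RECV source on the service's receive right. `my_subsystem_server`
 * stands in for the MIG-generated demux routine and MY_MAX_MSG_SIZE for the
 * subsystem's maximum message size; both names are hypothetical:
 *
 *	dispatch_source_t ds = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_MACH_RECV, recv_right, 0,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
 *	dispatch_source_set_event_handler(ds, ^{
 *		(void)dispatch_mig_server(ds, MY_MAX_MSG_SIZE, my_subsystem_server);
 *	});
 *	dispatch_resume(ds);
 *
 * Note the cnt limit above: each invocation drains at most ~1000 messages
 * before returning so that serial target queues are not starved.
 */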

#pragma mark dispatch_source_debug

static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
#if HAVE_MACH
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
#endif
	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);
#ifdef EVFILT_VM
	_evfilt2(EVFILT_VM);
#endif
#ifdef EVFILT_SOCK
	_evfilt2(EVFILT_SOCK);
#endif
#ifdef EVFILT_MEMORYSTATUS
	_evfilt2(EVFILT_MEMORYSTATUS);
#endif

	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}

#if DISPATCH_DEBUG
static const char *
_evflagstr2(uint16_t *flagsp)
{
#define _evflag2(f) \
	if ((*flagsp & (f)) == (f) && (f)) { \
		*flagsp &= ~(f); \
		return #f " "; \
	}
	_evflag2(EV_ADD);
	_evflag2(EV_DELETE);
	_evflag2(EV_ENABLE);
	_evflag2(EV_DISABLE);
	_evflag2(EV_ONESHOT);
	_evflag2(EV_CLEAR);
	_evflag2(EV_RECEIPT);
	_evflag2(EV_DISPATCH);
	_evflag2(EV_UDATA_SPECIFIC);
	_evflag2(EV_OOBAND);
	_evflag2(EV_ERROR);
	_evflag2(EV_EOF);
	*flagsp = 0;
	return "EV_UNKNOWN ";
}

static char *
_evflagstr(uint16_t flags, char *str, size_t strsize)
{
	str[0] = 0;
	while (flags) {
		strlcat(str, _evflagstr2(&flags), strsize);
	}
	size_t sz = strlen(str);
	if (sz) str[sz-1] = 0;
	return str;
}
#endif

static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
			"mask = 0x%lx, pending_data = 0x%lx, registered = %d, "
			"armed = %d, deleted = %d%s%s, canceled = %d, needs_mgr = %d, ",
			target && target->dq_label ? target->dq_label : "", target,
			ds->ds_ident_hack, ds->ds_pending_data_mask, ds->ds_pending_data,
			ds->ds_is_installed, (bool)(ds->ds_atomic_flags & DSF_ARMED),
			(bool)(ds->ds_atomic_flags & DSF_DELETED), ds->ds_pending_delete ?
			" (pending)" : "", (ds->ds_atomic_flags & DSF_ONESHOT) ?
			" (oneshot)" : "", (bool)(ds->ds_atomic_flags & DSF_CANCELED),
			ds->ds_needs_mgr);
}

static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx,"
			" last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
			ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
			ds_timer(dr).interval, ds_timer(dr).flags);
}

size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (ds->ds_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
			"filter = %s }", ds->ds_dkev, ds->ds_is_direct_kevent ? " (direct)"
			: "", ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) :
			"");
	return offset;
}

static size_t
_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = dm->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
			"send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
			"sending = %d, disconnected = %d, canceled = %d ",
			target && target->dq_label ? target->dq_label : "", target,
			dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
			dm->dm_refs->dm_send,
			dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
			dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
			" (armed)" : "", dm->dm_refs->dm_checkin_port,
			dm->dm_refs->dm_checkin ? " (pending)" : "",
			dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
			(bool)(dm->ds_atomic_flags & DSF_CANCELED));
}

size_t
_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
			dx_kind(dm), dm);
	offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

#if DISPATCH_DEBUG
static void
_dispatch_kevent_debug(const _dispatch_kevent_qos_s* kev, const char* str)
{
	char flagstr[256];
	_dispatch_debug("kevent[%p] = { ident = 0x%llx, filter = %s, "
			"flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
			"ext[0] = 0x%llx, ext[1] = 0x%llx }: %s", kev, kev->ident,
			_evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
			sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
			kev->ext[0], kev->ext[1], str);
}

static void
_dispatch_kevent_debugger2(void *context)
{
	struct sockaddr sa;
	socklen_t sa_len = sizeof(sa);
	int c, fd = (int)(long)context;
	unsigned int i;
	dispatch_kevent_t dk;
	dispatch_source_t ds;
	dispatch_source_refs_t dr;
	FILE *debug_stream;

	c = accept(fd, &sa, &sa_len);
	if (c == -1) {
		if (errno != EAGAIN) {
			(void)dispatch_assume_zero(errno);
		}
		return;
	}
	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
	}
	debug_stream = fdopen(c, "a");
	if (!dispatch_assume(debug_stream)) {
		close(c);
		return;
	}

	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
	fprintf(debug_stream, "Content-type: text/html\r\n");
	fprintf(debug_stream, "Pragma: nocache\r\n");
	fprintf(debug_stream, "\r\n");
	fprintf(debug_stream, "<html>\n");
	fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
	fprintf(debug_stream, "<body>\n<ul>\n");

	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
	//		"<td>DK</td><td>DK</td><td>DK</td></tr>\n");

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
			continue;
		}
		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
					"0x%hx fflags 0x%x data 0x%lx udata %p\n",
					dk, (unsigned long)dk->dk_kevent.ident,
					_evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
					dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
					(void*)dk->dk_kevent.udata);
			fprintf(debug_stream, "\t\t<ul>\n");
			TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
				ds = _dispatch_source_from_refs(dr);
				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
						"0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
						ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
						ds->ds_pending_data, ds->ds_pending_data_mask,
						ds->ds_atomic_flags);
				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
					dispatch_queue_t dq = ds->do_targetq;
					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
							"0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
							dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:"");
				}
			}
			fprintf(debug_stream, "\t\t</ul>\n");
			fprintf(debug_stream, "\t</li>\n");
		}
	}
	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
	fflush(debug_stream);
	fclose(debug_stream);
}

static void
_dispatch_kevent_debugger2_cancel(void *context)
{
	int ret, fd = (int)(long)context;

	ret = close(fd);
	if (ret == -1) {
		(void)dispatch_assume_zero(errno);
	}
}

static void
_dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
{
	union {
		struct sockaddr_in sa_in;
		struct sockaddr sa;
	} sa_u = {
		.sa_in = {
			.sin_family = AF_INET,
			.sin_addr = { htonl(INADDR_LOOPBACK), },
		},
	};
	dispatch_source_t ds;
	const char *valstr;
	int val, r, fd, sock_opt = 1;
	socklen_t slen = sizeof(sa_u);

	if (issetugid()) {
		return;
	}
	valstr = getenv("LIBDISPATCH_DEBUGGER");
	if (!valstr) {
		return;
	}
	val = atoi(valstr);
	if (val == 2) {
		sa_u.sa_in.sin_addr.s_addr = 0;
	}
	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		(void)dispatch_assume_zero(errno);
		return;
	}
	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
			(socklen_t) sizeof sock_opt);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = fcntl(fd, F_SETFL, O_NONBLOCK);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = bind(fd, &sa_u.sa, sizeof(sa_u));
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = listen(fd, SOMAXCONN);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = getsockname(fd, &sa_u.sa, &slen);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}

	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
			&_dispatch_mgr_q);
	if (dispatch_assume(ds)) {
		_dispatch_log("LIBDISPATCH: debug port: %hu",
				(in_port_t)ntohs(sa_u.sa_in.sin_port));

		/* ownership of fd transfers to ds */
		dispatch_set_context(ds, (void *)(long)fd);
		dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
		dispatch_source_set_cancel_handler_f(ds,
				_dispatch_kevent_debugger2_cancel);
		dispatch_resume(ds);

		return;
	}

out_bad:
	close(fd);
}

#if HAVE_MACH

#ifndef MACH_PORT_TYPE_SPREQUEST
#define MACH_PORT_TYPE_SPREQUEST 0x40000000
#endif
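
/*
 * Debug-build usage sketch (illustrative) for dispatch_debug_machport()
 * below: dump the rights and queue status of a port name in the current
 * task, tagging the log line with the caller:
 *
 *	dispatch_debug_machport(port, __func__);
 *
 * For a receive right this logs reference counts plus qlimit/msgcount; for
 * pure send or dead names, just the right counts. Only available when built
 * with DISPATCH_DEBUG.
 */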

void
dispatch_debug_machport(mach_port_t name, const char* str)
{
	mach_port_type_t type;
	mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
	unsigned int dnreqs = 0, dnrsiz;
	kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
	if (kr) {
		_dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
				kr, mach_error_string(kr), str);
		return;
	}
	if (type & MACH_PORT_TYPE_SEND) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_SEND, &ns));
	}
	if (type & MACH_PORT_TYPE_SEND_ONCE) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_SEND_ONCE, &nso));
	}
	if (type & MACH_PORT_TYPE_DEAD_NAME) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_DEAD_NAME, &nd));
	}
	if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
		kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs);
		if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr);
	}
	if (type & MACH_PORT_TYPE_RECEIVE) {
		mach_port_status_t status = { .mps_pset = 0, };
		mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_RECEIVE, &nr));
		(void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
				name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
				"dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
				"sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
				"seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
				status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
				status.mps_srights ? "Y":"N", status.mps_sorights,
				status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
				status.mps_seqno, str);
	} else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
			MACH_PORT_TYPE_DEAD_NAME)) {
		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
				"dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
	} else {
		_dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
				str);
	}
}

#endif // HAVE_MACH

#endif // DISPATCH_DEBUG