/*
 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
static void _dispatch_source_set_interval(dispatch_source_t ds,
		uint64_t interval);

#define DISPATCH_TIMERS_UNREGISTER 0x1
#define DISPATCH_TIMERS_RETAIN_2 0x2
static void _dispatch_timers_update(dispatch_unote_t du, uint32_t flags);
static void _dispatch_timers_unregister(dispatch_timer_source_refs_t dt);

static void _dispatch_source_timer_configure(dispatch_source_t ds);
static inline unsigned long _dispatch_source_timer_data(
		dispatch_source_t ds, dispatch_unote_t du);
#pragma mark dispatch_source_t

dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
		unsigned long mask, dispatch_queue_t dq)
{
	dispatch_source_refs_t dr;
	dispatch_source_t ds;

	dr = dux_create(dst, handle, mask)._dr;
	if (unlikely(!dr)) {
		return DISPATCH_BAD_INPUT;
	}

	ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
			sizeof(struct dispatch_source_s));
	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
			DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
	ds->dq_label = "source";
	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->ds_refs = dr;
	dr->du_owner_wref = _dispatch_ptr2wref(ds);

	if (!dq) {
		dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
	} else {
		_dispatch_retain((dispatch_queue_t _Nonnull)dq);
	}
	ds->do_targetq = dq;
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
		_dispatch_source_set_interval(ds, handle);
	}
	_dispatch_object_debug(ds, "%s", __func__);
	return ds;
}
void
_dispatch_source_dispose(dispatch_source_t ds, bool *allow_free)
{
	_dispatch_object_debug(ds, "%s", __func__);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
	_dispatch_unote_dispose(ds->ds_refs);
	ds->ds_refs = NULL;
	_dispatch_queue_destroy(ds->_as_dq, allow_free);
}
void
_dispatch_source_xref_dispose(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if (unlikely(!(dqf & (DQF_LEGACY|DSF_CANCELED)))) {
		DISPATCH_CLIENT_CRASH(ds, "Release of a source that has not been "
				"cancelled, but has a mandatory cancel handler");
	}
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
}
unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (ds->dq_atomic_flags & DSF_CANCELED) {
		return 0;
	}
#if DISPATCH_USE_MEMORYSTATUS
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	return dr->du_fflags;
}
uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return 0;
	}
#endif
	return dr->du_ident;
}
unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
#if DISPATCH_USE_MEMORYSTATUS
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	uint64_t value = os_atomic_load2o(ds, ds_data, relaxed);
	return (unsigned long)(
			ds->ds_refs->du_data_action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET
			? DISPATCH_SOURCE_GET_DATA(value) : value);
}
size_t
dispatch_source_get_extended_data(dispatch_source_t ds,
		dispatch_source_extended_data_t edata, size_t size)
{
	size_t target_size = MIN(size,
			sizeof(struct dispatch_source_extended_data_s));
	if (size > 0) {
		unsigned long data, status = 0;
		if (ds->ds_refs->du_data_action
				== DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
			uint64_t combined = os_atomic_load(&ds->ds_data, relaxed);
			data = DISPATCH_SOURCE_GET_DATA(combined);
			status = DISPATCH_SOURCE_GET_STATUS(combined);
		} else {
			data = dispatch_source_get_data(ds);
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, data)
				+ sizeof(edata->data)) {
			edata->data = data;
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, status)
				+ sizeof(edata->status)) {
			edata->status = status;
		}
		if (size > sizeof(struct dispatch_source_extended_data_s)) {
			memset(
				(char *)edata + sizeof(struct dispatch_source_extended_data_s),
				0, size - sizeof(struct dispatch_source_extended_data_s));
		}
	}
	return target_size;
}
void
_dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
		unsigned long val)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	int filter = ds->ds_refs->du_filter;

	if (unlikely(dqf & (DSF_CANCELED | DSF_DELETED))) {
		return;
	}

	switch (filter) {
	case DISPATCH_EVFILT_CUSTOM_ADD:
		os_atomic_add2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_OR:
		os_atomic_or2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_REPLACE:
		os_atomic_store2o(ds, ds_pending_data, val, relaxed);
		break;
	default:
		DISPATCH_CLIENT_CRASH(filter, "Invalid source type");
	}

	dx_wakeup(ds, _dispatch_qos_from_pp(pp), DISPATCH_WAKEUP_MAKE_DIRTY);
}

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	_dispatch_source_merge_data(ds, 0, val);
}
#pragma mark dispatch_source_handler

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
{
	return os_atomic_load(&dr->ds_handler[kind], relaxed);
}
#define _dispatch_source_get_event_handler(dr) \
		_dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
#define _dispatch_source_get_cancel_handler(dr) \
		_dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
#define _dispatch_source_get_registration_handler(dr) \
		_dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)
DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
		bool block)
{
	// sources don't propagate priority by default
	const dispatch_block_flags_t flags =
			DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (func) {
		uintptr_t dc_flags = 0;

		if (kind != DS_EVENT_HANDLER) {
			dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
		}
		if (block) {
#ifdef __BLOCKS__
			_dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
#endif /* __BLOCKS__ */
		} else {
			dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
			_dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
					0, flags, dc_flags);
		}
		_dispatch_trace_continuation_push(ds->_as_dq, dc);
	} else {
		dc->dc_flags = 0;
		dc->dc_func = NULL;
	}
	return dc;
}
static void
_dispatch_source_handler_dispose(dispatch_continuation_t dc)
{
#ifdef __BLOCKS__
	if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
		Block_release(dc->dc_ctxt);
	}
#endif /* __BLOCKS__ */
	if (dc->dc_voucher) {
		_voucher_release(dc->dc_voucher);
		dc->dc_voucher = VOUCHER_INVALID;
	}
	_dispatch_continuation_free(dc);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_take(dispatch_source_t ds, long kind)
{
	return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_free(dispatch_source_t ds, long kind)
{
	dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
	if (dc) _dispatch_source_handler_dispose(dc);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	if (!dc->dc_func) {
		_dispatch_continuation_free(dc);
		dc = NULL;
	} else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
	if (dc) _dispatch_source_handler_dispose(dc);
}
static void
_dispatch_source_set_handler_slow(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);

	dispatch_continuation_t dc = context;
	long kind = (long)dc->dc_data;
	dc->dc_data = NULL;
	_dispatch_source_handler_replace(ds, kind, dc);
}
static void
_dispatch_source_set_handler(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
	if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
		_dispatch_source_handler_replace(ds, kind, dc);
		return dx_vtable(ds)->do_resume(ds, false);
	}
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
				"after it has been activated");
	}
	_dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
	if (kind == DS_REGISTN_HANDLER) {
		_dispatch_bug_deprecated("Setting registration handler after "
				"the source has been activated");
	}
	dc->dc_data = (void *)kind;
	_dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
			_dispatch_source_set_handler_slow, 0);
}
#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
#ifdef __BLOCKS__
static void
_dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}

void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler(ds, handler);
}

void
dispatch_source_set_mandatory_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler(ds, handler);
}
#endif /* __BLOCKS__ */
static void
_dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}

void
dispatch_source_set_mandatory_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}
#ifdef __BLOCKS__
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
#pragma mark dispatch_source_invoke

bool
_dispatch_source_will_reenable_kevent_4NW(dispatch_source_t ds)
{
	uint64_t dq_state = os_atomic_load2o(ds, dq_state, relaxed);
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);

	if (unlikely(!_dq_state_drain_locked_by_self(dq_state))) {
		DISPATCH_CLIENT_CRASH(0, "_dispatch_source_will_reenable_kevent_4NW "
				"not called from within the event handler");
	}

	return _dispatch_unote_needs_rearm(ds->ds_refs) && !(dqf & DSF_ARMED);
}
static void
_dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
	if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}
static void
_dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
	ds->ds_pending_data = 0;
	ds->ds_data = 0;
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	if (!dc) {
		return;
	}
	if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}
static void
_dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = _dispatch_source_get_handler(dr,
			DS_EVENT_HANDLER);
	uint64_t prev;

	if (dr->du_is_timer && !(dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		prev = _dispatch_source_timer_data(ds, dr);
	} else {
		prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	}
	if (dr->du_data_action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (!dispatch_assume(prev != 0) || !dc) {
		return;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
		dispatch_release(ds); // dispatch_after sources are one-shot
	}
}
static void
_dispatch_source_refs_finalize_unregistration(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
			DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
	if (dqf & DSF_CANCEL_WAITER) {
		_dispatch_wake_by_address(&ds->dq_atomic_flags);
	}
	_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	_dispatch_release_tailcall(ds); // the retain is done at creation time
}
void
_dispatch_source_refs_unregister(dispatch_source_t ds, uint32_t options)
{
	_dispatch_object_debug(ds, "%s", __func__);
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	dispatch_source_refs_t dr = ds->ds_refs;

	if (dr->du_is_timer) {
		// Because of the optimization to unregister fired oneshot timers
		// from the target queue, we can't trust _dispatch_unote_registered()
		// to tell the truth, it may not have happened yet
		if (dqf & DSF_ARMED) {
			_dispatch_timers_unregister(ds->ds_timer_refs);
			_dispatch_release_2(ds);
		}
		dr->du_ident = DISPATCH_TIMER_IDENT_CANCELED;
	} else {
		if (_dispatch_unote_needs_rearm(dr) && !(dqf & DSF_ARMED)) {
			options |= DU_UNREGISTER_IMMEDIATE_DELETE;
		}
		if (!_dispatch_unote_unregister(dr, options)) {
			_dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
					ds, dr);
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
			return; // deferred unregistration
		}
	}

	ds->ds_is_installed = true;
	_dispatch_source_refs_finalize_unregistration(ds);
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
			// the test is inside the loop because it's convenient but the
			// result should not change for the duration of the rmw_loop
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_refs_resume(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_is_timer) {
		_dispatch_timers_update(dr, 0);
		return true;
	}
	if (unlikely(!_dispatch_source_tryarm(ds))) {
		return false;
	}
	_dispatch_unote_resume(dr);
	_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dr);
	return true;
}
void
_dispatch_source_refs_register(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t kbp;

	dispatch_assert(!ds->ds_is_installed);

	if (dr->du_is_timer) {
		dispatch_queue_t dq = ds->_as_dq;
		kbp = _dispatch_queue_compute_priority_and_wlh(dq, NULL);
		// aggressively coalesce background/maintenance QoS timers
		// <rdar://problem/12200216&27342536>
		if (_dispatch_qos_is_background(_dispatch_priority_qos(kbp))) {
			if (dr->du_fflags & DISPATCH_TIMER_STRICT) {
				_dispatch_ktrace1(DISPATCH_PERF_strict_bg_timer, ds);
			} else {
				dr->du_fflags |= DISPATCH_TIMER_BACKGROUND;
				dr->du_ident = _dispatch_source_timer_idx(dr);
			}
		}
		_dispatch_timers_update(dr, 0);
		return;
	}

	if (unlikely(!_dispatch_source_tryarm(ds) ||
			!_dispatch_unote_register(dr, wlh, pri))) {
		// Do the parts of dispatch_source_refs_unregister() that
		// are required after this partial initialization.
		_dispatch_source_refs_finalize_unregistration(ds);
	} else {
		_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, dr);
	}
	_dispatch_object_debug(ds, "%s", __func__);
}
static void
_dispatch_source_set_event_handler_context(void *ctxt)
{
	dispatch_source_t ds = ctxt;
	dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);

	if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
		dc->dc_ctxt = ds->do_ctxt;
	}
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_install(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	_dispatch_source_refs_register(ds, wlh, pri);
	ds->ds_is_installed = true;
}
void
_dispatch_source_finalize_activation(dispatch_source_t ds, bool *allow_resume)
{
	dispatch_continuation_t dc;
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t pri;
	dispatch_wlh_t wlh;

	if (unlikely(dr->du_is_direct &&
			(_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
		return _dispatch_source_refs_unregister(ds, 0);
	}

	dc = _dispatch_source_get_event_handler(dr);
	if (dc) {
		if (_dispatch_object_is_barrier(dc)) {
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
		}
		ds->dq_priority =
				_dispatch_priority_from_pp_strip_flags(dc->dc_priority);
		if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
			_dispatch_barrier_async_detached_f(ds->_as_dq, ds,
					_dispatch_source_set_event_handler_context);
		}
	}

	_dispatch_queue_finalize_activation(ds->_as_dq, allow_resume);

	if (dr->du_is_direct && !ds->ds_is_installed) {
		dispatch_queue_t dq = ds->_as_dq;
		pri = _dispatch_queue_compute_priority_and_wlh(dq, &wlh);
		if (pri) _dispatch_source_install(ds, wlh, pri);
	}
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned)
{
	dispatch_source_t ds = dou._ds;
	dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_flags_t dqf;

	if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN) &&
			_dispatch_unote_wlh_changed(dr, _dispatch_get_wlh())) {
		dqf = _dispatch_queue_atomic_flags_set_orig(ds->_as_dq,
				DSF_WLH_CHANGED);
		if (!(dqf & DSF_WLH_CHANGED)) {
			_dispatch_bug_deprecated("Changing target queue "
					"hierarchy after source was activated");
		}
	}

	if (_dispatch_queue_class_probe(ds)) {
		// Intentionally always drain even when on the manager queue
		// and not the source's regular target queue: we need to be able
		// to drain timer setting and the like there.
		dispatch_with_disabled_narrowing(dic, {
			retq = _dispatch_queue_serial_drain(ds->_as_dq, dic, flags, owned);
		});
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.
	//
	// The order of tests here in invoke and in wakeup should be consistent.

	dispatch_queue_t dkq = &_dispatch_mgr_q;
	bool prevent_starvation = false;

	if (dr->du_is_direct) {
		dkq = ds->do_targetq;
	}

	if (dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
			// timer has to be configured on the kevent queue
			if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_timer_configure(ds);
		}
	}

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_install(ds, _dispatch_get_wlh(),
				_dispatch_get_basepri());
	}

	if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return ds->do_targetq;
	}

	if (_dispatch_source_get_registration_handler(dr)) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds, dq, flags);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if ((dqf & DSF_DEFERRED_DELETE) && !(dqf & DSF_ARMED)) {
unregister_event:
		// DSF_DELETE: Pending source kevent unregistration has been completed
		// !DSF_ARMED: event was delivered and can safely be unregistered
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_IMMEDIATE_DELETE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the kevent
		// queue after event delivery.
		if (dq == ds->do_targetq) {
			_dispatch_source_latch_and_call(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);

			// starvation avoidance: if the source triggers itself then force a
			// re-queue to give other things already queued on the target queue
			// a chance to run
			//
			// however, if the source is directly targeting an overcommit root
			// queue, this would requeue the source and ask for a new overcommit
			// thread right away.
			prevent_starvation = dq->do_targetq ||
					!(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
			if (prevent_starvation &&
					os_atomic_load2o(ds, ds_pending_data, relaxed)) {
				retq = ds->do_targetq;
			}
		} else {
			// there is no point trying to be eager, the next thing to do is
			// to deliver the event
			return ds->do_targetq;
		}
	}

	if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
		// The source has been cancelled and needs to be uninstalled from the
		// kevent queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
			} else if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
				if (!(dqf & DSF_ARMED)) {
					goto unregister_event;
				}
				// we need to wait for the EV_DELETE
				return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
			}
		}
		if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr))) {
			retq = ds->do_targetq;
		} else {
			_dispatch_source_cancel_callout(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		}
		prevent_starvation = false;
	}

	if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
			// no need for resume when we can directly unregister the kevent
			goto unregister_event;
		}
		if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
			// do not try to rearm the kevent if the source is suspended
			// from the source handler
			return ds->do_targetq;
		}
		if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
			// keep the old behavior to force re-enqueue to our target queue
			// for the rearm.
			//
			// if the handler didn't run, or this is a pending delete
			// or our target queue is a global queue, then starvation is
			// not a concern and we can rearm right away.
			return ds->do_targetq;
		}
		if (unlikely(!_dispatch_source_refs_resume(ds))) {
			goto unregister_event;
		}
		if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
			// try to redrive the drain from under the lock for sources
			// targeting an overcommit root queue to avoid parking
			// when the next event has already fired
			_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
		}
	}

	return retq;
}
void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(ds, dic, flags,
			DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
}
void
_dispatch_source_wakeup(dispatch_source_t ds, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in wakeup and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
	dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);

	if (dr->du_is_direct) {
		dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		// timer has to be configured on the kevent queue
		tq = dkq;
	} else if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		tq = dkq;
	} else if (_dispatch_source_get_registration_handler(dr)) {
		// The registration handler needs to be delivered to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if (deferred_delete && !(dqf & DSF_ARMED)) {
		// Pending source kevent unregistration has been completed
		// or EV_ONESHOT event can be acknowledged
		tq = dkq;
	} else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
		// The source needs to be uninstalled from the kevent queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
				tq = DISPATCH_QUEUE_WAKEUP_TARGET;
			} else {
				tq = dkq;
			}
		} else if (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr)) {
			tq = DISPATCH_QUEUE_WAKEUP_TARGET;
		}
	} else if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		tq = dkq;
	}
	if (!tq && _dispatch_queue_class_probe(ds)) {
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if ((tq == DISPATCH_QUEUE_WAKEUP_TARGET) &&
			ds->do_targetq == &_dispatch_mgr_q) {
		tq = DISPATCH_QUEUE_WAKEUP_MGR;
	}

	return _dispatch_queue_class_wakeup(ds->_as_dq, qos, flags, tq);
}
void
dispatch_source_cancel(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	// Right after we set the cancel flag, someone else
	// could potentially invoke the source, do the cancellation,
	// unregister the source, and deallocate it. We would
	// need to therefore retain/release before setting the bit
	_dispatch_retain_2(ds);

	dispatch_queue_t q = ds->_as_dq;
	if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
		_dispatch_release_2_tailcall(ds);
	} else {
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}
void
dispatch_source_cancel_and_wait(dispatch_source_t ds)
{
	dispatch_queue_flags_t old_dqf, dqf, new_dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	if (unlikely(_dispatch_source_get_cancel_handler(dr))) {
		DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
	}

	_dispatch_object_debug(ds, "%s", __func__);
	os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = old_dqf | DSF_CANCELED;
		if (old_dqf & DSF_CANCEL_WAITER) {
			os_atomic_rmw_loop_give_up(break);
		}
		if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
			// just add DSF_CANCELED
		} else if ((old_dqf & DSF_DEFERRED_DELETE) || !dr->du_is_direct) {
			new_dqf |= DSF_CANCEL_WAITER;
		}
	});
	dqf = new_dqf;

	if (old_dqf & DQF_RELEASED) {
		DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
	}
	if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
		return;
	}
	if (dqf & DSF_CANCEL_WAITER) {
		goto wakeup;
	}

	// simplified version of _dispatch_queue_drain_try_lock
	// that also sets the DIRTY bit on failure to lock
	uint64_t set_owner_and_set_full_width = _dispatch_lock_value_for_self() |
			DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
		new_state = old_state;
		if (likely(_dq_state_is_runnable(old_state) &&
				!_dq_state_drain_locked(old_state))) {
			new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
			new_state |= set_owner_and_set_full_width;
		} else if (old_dqf & DSF_CANCELED) {
			os_atomic_rmw_loop_give_up(break);
		} else {
			// this case needs a release barrier, hence the seq_cst above
			new_state |= DISPATCH_QUEUE_DIRTY;
		}
	});

	if (unlikely(_dq_state_is_suspended(old_state))) {
		if (unlikely(_dq_state_suspend_cnt(old_state))) {
			DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
		}
		// inactive sources have never been registered and there is no need
		// to wait here because activation will notice and mark the source
		// as deleted without ever trying to use the fd or mach port.
		return dispatch_activate(ds);
	}

	if (likely(_dq_state_is_runnable(old_state) &&
			!_dq_state_drain_locked(old_state))) {
		// same thing _dispatch_source_invoke2() does when handling cancellation
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
				_dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
			}
		}
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
	} else if (unlikely(_dq_state_drain_locked_by_self(old_state))) {
		DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
				"called from a source handler");
	} else {
		dispatch_qos_t qos;
wakeup:
		qos = _dispatch_qos_from_pp(_dispatch_get_priority());
		dx_wakeup(ds, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
		dispatch_activate(ds);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
		if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
			if (!os_atomic_cmpxchgv2o(ds, dq_atomic_flags,
					dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
				continue;
			}
			dqf |= DSF_CANCEL_WAITER;
		}
		_dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}
}
void
_dispatch_source_merge_evt(dispatch_unote_t du, uint32_t flags, uintptr_t data,
		uintptr_t status, pthread_priority_t pp)
{
	dispatch_source_refs_t dr = du._dr;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	dispatch_wakeup_flags_t wflags = 0;
	dispatch_queue_flags_t dqf;

	if (_dispatch_unote_needs_rearm(dr) || (flags & (EV_DELETE | EV_ONESHOT))) {
		// once we modify the queue atomic flags below, it will allow concurrent
		// threads running _dispatch_source_invoke2 to dispose of the source,
		// so we can't safely borrow the reference we get from the muxnote udata
		// anymore, and need our own
		wflags = DISPATCH_WAKEUP_CONSUME_2;
		_dispatch_retain_2(ds); // rdar://20382435
	}

	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			!(flags & EV_DELETE)) {
		dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
				DSF_DEFERRED_DELETE, DSF_ARMED);
		if (flags & EV_VANISHED) {
			_dispatch_bug_kevent_client("kevent", dr->du_type->dst_kind,
					"monitored resource vanished before the source "
					"cancel handler was invoked", 0);
		}
		_dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
				(flags & EV_VANISHED) ? "vanished" :
				"deferred delete oneshot", dr);
	} else if (flags & (EV_DELETE | EV_ONESHOT)) {
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_ALREADY_DELETED);
		_dispatch_debug("kevent-source[%p]: deleted kevent[%p]", ds, dr);
		if (flags & EV_DELETE) goto done;
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	} else if (_dispatch_unote_needs_rearm(dr)) {
		dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	} else {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
		goto done; // rdar://20204025
	}

	dispatch_unote_action_t action = dr->du_data_action;
	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			(flags & EV_VANISHED)) {
		// if the resource behind the ident vanished, the event handler can't
		// do anything useful anymore, so do not try to call it at all
		//
		// Note: if the kernel doesn't support EV_VANISHED we always get it
		// back unchanged from the flags passed at EV_ADD (registration) time
		// Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
		// if we get both bits it was a real EV_VANISHED delivery
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
#if HAVE_MACH
	} else if (dr->du_filter == EVFILT_MACHPORT) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
#endif
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_ADD) {
		os_atomic_add2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR) {
		os_atomic_or2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
		// We combine the data and status into a single 64-bit value.
		uint64_t odata, ndata;
		uint64_t value = DISPATCH_SOURCE_COMBINE_DATA_AND_STATUS(data, status);
		os_atomic_rmw_loop2o(ds, ds_pending_data, odata, ndata, relaxed, {
			ndata = DISPATCH_SOURCE_GET_DATA(odata) | value;
		});
	} else if (data) {
		DISPATCH_INTERNAL_CRASH(action, "Unexpected source action value");
	}
	_dispatch_debug("kevent-source[%p]: merged kevent[%p]", ds, dr);

done:
	_dispatch_object_debug(ds, "%s", __func__);
	dx_wakeup(ds, _dispatch_qos_from_pp(pp), wflags | DISPATCH_WAKEUP_MAKE_DIRTY);
}
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
static dispatch_timer_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mgr_trace_timers_wakes(void)
{
	uint32_t qos;

	if (_dispatch_timers_will_wake) {
		if (slowpath(DISPATCH_TIMER_WAKE_ENABLED())) {
			for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
				if (_dispatch_timers_will_wake & (1 << qos)) {
					_dispatch_trace_timer_wake(_dispatch_trace_next_timer[qos]);
				}
			}
		}
		_dispatch_timers_will_wake = 0;
	}
}
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_mgr_trace_timers_wakes()
#endif

#define _dispatch_source_timer_telemetry_enabled() false
static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
		dispatch_clock_t clock, struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled()) {
		_dispatch_trace_timer_configure(ds, clock, values);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock,
		struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled() ||
			_dispatch_source_timer_telemetry_enabled()) {
		_dispatch_source_timer_telemetry_slow(ds, clock, values);
		asm(""); // prevent tailcall
	}
}
static void
_dispatch_source_timer_configure(dispatch_source_t ds)
{
	dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
	dispatch_timer_config_t dtc;

	dtc = os_atomic_xchg2o(dt, dt_pending_config, NULL, dependency);
	if (dtc->dtc_clock == DISPATCH_CLOCK_MACH) {
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
	} else {
		dt->du_fflags &= ~(uint32_t)DISPATCH_TIMER_CLOCK_MACH;
	}
	dt->dt_timer = dtc->dtc_timer;
	free(dtc);
	if (ds->ds_is_installed) {
		// Clear any pending data that might have accumulated on
		// older timer params <rdar://problem/8574886>
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
		_dispatch_timers_update(dt, 0);
	}
}
static dispatch_timer_config_t
_dispatch_source_timer_config_create(dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	dispatch_timer_config_t dtc;
	dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s));
	if (unlikely(interval == 0)) {
		if (start != DISPATCH_TIME_FOREVER) {
			_dispatch_bug_deprecated("Setting timer interval to 0 requests "
					"a 1ns timer, did you mean FOREVER (a one-shot timer)?");
		}
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}
	if ((int64_t)leeway < 0) {
		leeway = INT64_MAX;
	}
	if (start == DISPATCH_TIME_NOW) {
		start = _dispatch_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	if ((int64_t)start < 0) {
		// wall clock
		start = (dispatch_time_t)-((int64_t)start);
		dtc->dtc_clock = DISPATCH_CLOCK_WALL;
	} else {
		// absolute clock
		interval = _dispatch_time_nano2mach(interval);
		if (interval < 1) {
			// rdar://problem/7287561 interval must be at least one in
			// in order to avoid later division by zero when calculating
			// the missed interval count. (NOTE: the wall clock's
			// interval is already "fixed" to be 1 or more)
			interval = 1;
		}
		leeway = _dispatch_time_nano2mach(leeway);
		dtc->dtc_clock = DISPATCH_CLOCK_MACH;
	}
	if (interval < INT64_MAX && leeway > interval / 2) {
		leeway = interval / 2;
	}

	dtc->dtc_timer.target = start;
	dtc->dtc_timer.interval = interval;
	if (start + leeway < INT64_MAX) {
		dtc->dtc_timer.deadline = start + leeway;
	} else {
		dtc->dtc_timer.deadline = INT64_MAX;
	}
	return dtc;
}
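
/*
 * Worked example of the clamping above: a mach-clock timer requested with
 * interval == 1s and leeway == 2s ends up with leeway == interval / 2
 * (500ms), and its deadline is target + leeway unless that sum would
 * exceed INT64_MAX, in which case the deadline saturates at INT64_MAX.
 */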
void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
	dispatch_timer_config_t dtc;

	if (unlikely(!dt->du_is_timer || (dt->du_fflags & DISPATCH_TIMER_INTERVAL))) {
		DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
	}

	dtc = _dispatch_source_timer_config_create(start, interval, leeway);
	_dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
	dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
	if (dtc) free(dtc);
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
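
/*
 * Usage sketch (illustrative, not part of this file): a typical client
 * configures a timer source through the public API roughly as follows; the
 * queue, interval and leeway below are assumptions for the example only.
 *
 *	dispatch_source_t timer = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
 *			dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
 *	dispatch_source_set_event_handler(timer, ^{
 *		// fires roughly once per second, with up to 100ms of leeway
 *	});
 *	dispatch_source_set_timer(timer,
 *			dispatch_time(DISPATCH_TIME_NOW, 0),
 *			1 * NSEC_PER_SEC, 100 * NSEC_PER_MSEC);
 *	dispatch_resume(timer);
 */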
static void
_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull

	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION;
	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
			FOREVER_NSEC/NSEC_PER_MSEC))) {
		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
	} else {
		interval = FOREVER_NSEC;
	}
	interval = _dispatch_time_nano2mach(interval);
	uint64_t target = _dispatch_absolute_time() + interval;
	target -= (target % interval);
	const uint64_t leeway = animation ?
			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
	dr->dt_timer.target = target;
	dr->dt_timer.deadline = target + leeway;
	dr->dt_timer.interval = interval;
	_dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer);
}
#pragma mark dispatch_after

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		void *ctxt, void *handler, bool block)
{
	dispatch_timer_source_refs_t dt;
	dispatch_source_t ds;
	uint64_t leeway, delta;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		if (block) {
			return dispatch_async(queue, handler);
		}
		return dispatch_async_f(queue, ctxt, handler);
	}
	leeway = delta / 10; // <rdar://problem/13447496>

	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
	dt = ds->ds_timer_refs;

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (block) {
		_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
	} else {
		_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
	}
	// reference `ds` so that it doesn't show up as a leak
	dc->dc_data = ds;
	_dispatch_trace_continuation_push(ds->_as_dq, dc);
	os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);

	if ((int64_t)when < 0) {
		// wall clock
		when = (dispatch_time_t)-((int64_t)when);
	} else {
		// absolute clock
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
		leeway = _dispatch_time_nano2mach(leeway);
	}
	dt->dt_timer.target = when;
	dt->dt_timer.interval = UINT64_MAX;
	dt->dt_timer.deadline = when + leeway;
	dispatch_activate(ds);
}
DISPATCH_NOINLINE
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_after(when, queue, ctxt, func, false);
}

#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	_dispatch_after(when, queue, NULL, work, true);
}
#endif
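
/*
 * Usage sketch (illustrative, not part of this file): both entry points
 * funnel into _dispatch_after() above; the 500ms delay and target queue
 * below are assumptions for the example only.
 *
 *	dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC),
 *			dispatch_get_main_queue(), ^{
 *		// runs on the main queue roughly 500ms from now, with leeway
 *		// computed as delta / 10 and clamped to [1ms, 60s] as above
 *	});
 */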
#pragma mark dispatch_timers

/*
 * The dispatch_timer_heap_t structure is a double min-heap of timers,
 * interleaving the by-target min-heap in the even slots, and the by-deadline
 * one in the odd slots.
 *
 * The min element of these is held inline in the dispatch_timer_heap_t
 * structure, and further entries are held in segments.
 *
 * dth_segments is the number of allocated segments.
 *
 * Segment 0 has a size of `DISPATCH_HEAP_INIT_SEGMENT_CAPACITY` pointers
 * Segment k has a size of (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (k - 1))
 *
 * Segment n (dth_segments - 1) is the last segment and points its final n
 * entries to previous segments. Its address is held in the `dth_heap` field.
 *
 * segment n   [ regular timer pointers | n-1 | k | 0 ]
 *                                         |    |   |
 * segment n-1 <---------------------------'    |   |
 * segment k   <--------------------------------'   |
 * segment 0   <------------------------------------'
 */
#define DISPATCH_HEAP_INIT_SEGMENT_CAPACITY 8u
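
/*
 * Worked example of the segment sizes described above, with the 8u initial
 * capacity: segments 0 and 1 each hold 8 pointers, segment 2 holds 16,
 * segment 3 holds 32, doubling from there on. The last allocated segment
 * reserves its final (dth_segments - 1) entries as back-pointers to the
 * earlier segments, as shown in the diagram.
 */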
/*
 * There are two min-heaps stored interleaved in a single array,
 * even indices are for the by-target min-heap, and odd indices for
 * the by-deadline one.
 */
#define DTH_HEAP_ID_MASK (DTH_ID_COUNT - 1)
#define DTH_HEAP_ID(idx) ((idx) & DTH_HEAP_ID_MASK)
#define DTH_IDX_FOR_HEAP_ID(idx, heap_id) \
		(((idx) & ~DTH_HEAP_ID_MASK) | (heap_id))
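
/*
 * Worked example (assuming DTH_ID_COUNT == 2, so DTH_HEAP_ID_MASK == 1):
 * slot 6 belongs to the by-target heap (DTH_HEAP_ID(6) == 0) and slot 7 is
 * the by-deadline entry at the same logical position, since
 * DTH_IDX_FOR_HEAP_ID(6, 1) == 7 and DTH_IDX_FOR_HEAP_ID(7, 0) == 6.
 */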
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_capacity(uint32_t segments)
{
	if (segments == 0) return 2;
	uint32_t seg_no = segments - 1;
	// for C = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY,
	// 2 + C + SUM(C << (i-1), i = 1..seg_no) - seg_no
	return 2 + (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << seg_no) - seg_no;
}
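
/*
 * Worked example of the formula above, assuming
 * DISPATCH_HEAP_INIT_SEGMENT_CAPACITY == 8u:
 *   segments == 1: 2 + (8 << 0) - 0 == 10 entries
 *   segments == 2: 2 + (8 << 1) - 1 == 17 entries
 *   segments == 3: 2 + (8 << 2) - 2 == 32 entries
 * i.e. the two inline dth_min slots, plus every segment's pointers, minus
 * the seg_no back-pointers held in the last segment.
 */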
static void
_dispatch_timer_heap_grow(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = dth->dth_segments++;
	void **heap, **heap_prev = dth->dth_heap;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
	}
	heap = _dispatch_calloc(seg_capacity, sizeof(void *));
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap[seg_capacity - prev_seg_no],
				&heap_prev[prev_seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	if (seg_no > 0) {
		heap[seg_capacity - seg_no] = heap_prev;
	}
	dth->dth_heap = heap;
}
static void
_dispatch_timer_heap_shrink(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = --dth->dth_segments;
	void **heap = dth->dth_heap, **heap_prev = NULL;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
		heap_prev = heap[seg_capacity - seg_no];
	}
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap_prev[prev_seg_capacity - prev_seg_no],
				&heap[seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	dth->dth_heap = heap_prev;
	free(heap);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_timer_source_refs_t *
_dispatch_timer_heap_get_slot(dispatch_timer_heap_t dth, uint32_t idx)
{
	uint32_t seg_no, segments = dth->dth_segments;
	void **segment;

	if (idx < DTH_ID_COUNT) {
		return &dth->dth_min[idx];
	}
	idx -= DTH_ID_COUNT;

	// Derive the segment number from the index. Naming
	// DISPATCH_HEAP_INIT_SEGMENT_CAPACITY `C`, the segments index ranges are:
	// 0: 0 .. C - 1
	// 1: C .. 2 * C - 1
	// k: 2^(k-1) * C .. 2^k * C - 1
	// so `k` can be derived from the first bit set in `idx`
	seg_no = (uint32_t)(__builtin_clz(DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1) -
			__builtin_clz(idx | (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1)));
	if (seg_no + 1 == segments) {
		segment = dth->dth_heap;
	} else {
		uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
		seg_capacity <<= (segments - 2);
		segment = dth->dth_heap[seg_capacity - seg_no - 1];
	}
	if (seg_no) {
		idx -= DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (seg_no - 1);
	}
	return (dispatch_timer_source_refs_t *)(segment + idx);
}
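
/*
 * Worked example (assuming DISPATCH_HEAP_INIT_SEGMENT_CAPACITY == 8u and
 * DTH_ID_COUNT == 2): for a raw index of 22, the two inline slots are
 * skipped first (idx becomes 20), then seg_no = clz(7) - clz(20 | 7)
 * == 29 - 27 == 2, which matches the 16..31 range covered by segment 2;
 * finally idx -= 8 << 1, leaving slot 4 within that segment.
 */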
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_timer_heap_set(dispatch_timer_source_refs_t *slot,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	*slot = dt;
	dt->dt_heap_entry[DTH_HEAP_ID(idx)] = idx;
}
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_parent(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	idx = (idx - DTH_ID_COUNT) / 2; // go to the parent
	return DTH_IDX_FOR_HEAP_ID(idx, heap_id);
}
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_left_child(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	// 2 * (idx - heap_id) + DTH_ID_COUNT + heap_id
	return 2 * idx + DTH_ID_COUNT - heap_id;
}
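
/*
 * Worked example (assuming DTH_ID_COUNT == 2): in the by-target heap
 * (heap_id 0) the root lives at index 0, its children at 2 and 4, and the
 * children of 2 at 6 and 8; _dispatch_timer_heap_parent(6) gives
 * (6 - 2) / 2 == 2 back. The by-deadline heap (heap_id 1) mirrors this in
 * the odd slots: its root is 1, and the left child of 1 is 2 * 1 + 2 - 1 == 3.
 */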
#if DISPATCH_HAVE_TIMER_COALESCING
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_skip(uint32_t idx, uint32_t count)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);

	idx -= heap_id;
	if (unlikely(idx + DTH_ID_COUNT == count)) {
		// reaching `count` doesn't mean we're done, but there is a weird
		// corner case if the last item of the heap is a left child:
		//
		// The formula below would return the sibling of `idx` which is
		// out of bounds. Fortunately, the correct answer is the same
		// as for idx's parent
		idx = _dispatch_timer_heap_parent(idx);
	}

	//
	// When considering the index in a non interleaved, 1-based array
	// representation of a heap, hence looking at (idx / DTH_ID_COUNT + 1)
	// for a given idx in our dual-heaps, that index is in one of two forms:
	//
	//     (a) 1xxxx011111    or    (b) 111111111
	//
	// The first bit set is the row of the binary tree node (0-based).
	// The following digits from most to least significant represent the path
	// to that node, where `0` is a left turn and `1` a right turn.
	//
	// For example 0b0101 (5) is a node on row 2 accessed going left then right:
	//
	// row 0          1
	// row 1      2       3
	// row 2    4   5   6   7
	//
	// Skipping a sub-tree in walk order means going to the sibling of the last
	// node reached after we turned left. If the node was of the form (a),
	// this node is 1xxxx1, which for the above example is 0b0011 (3).
	// If the node was of the form (b) then we never took a left, meaning
	// we reached the last element in traversal order.
	//

	//
	// The index of that sibling can be computed from:
	// - the least significant bit set to 0 in (idx / DTH_ID_COUNT + 1)
	// - which is offset by log_2(DTH_ID_COUNT) from the position of the least
	//   significant 0 in (idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//   since idx is a multiple of DTH_ID_COUNT and DTH_ID_COUNT a power of 2.
	// - which in turn is the same as the position of the least significant 1 in
	//   ~(idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//
	dispatch_static_assert(powerof2(DTH_ID_COUNT));
	idx += DTH_ID_COUNT + DTH_ID_COUNT - 1;
	idx >>= __builtin_ctz(~idx);

	//
	// `idx` is now either:
	// - 0 if it was the (b) case above, in which case the walk is done
	// - 1xxxx0 as the position in a 0 based array representation of a non
	//   interleaved heap, so we just have to compute the interleaved index.
	//
	return likely(idx) ? DTH_ID_COUNT * idx + heap_id : UINT32_MAX;
}
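
/*
 * Worked example of the bit trick above (assuming DTH_ID_COUNT == 2): for
 * idx == 8 in the by-target heap, idx + 3 == 11 == 0b1011, so
 * __builtin_ctz(~idx) == 2 and 11 >> 2 == 2; the function returns
 * 2 * 2 + 0 == 4, i.e. the sibling of the last left turn on the path to
 * slot 8, which is exactly where a prefix-order walk resumes after
 * skipping that subtree.
 */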
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_next(uint32_t idx, uint32_t count)
{
	//
	// Goes to the next element in heap walk order, which is the prefix ordered
	// walk of the tree.
	//
	// From a given node, the next item to return is the left child if it
	// exists, else the first right sibling we find by walking our parent chain,
	// which is exactly what _dispatch_timer_heap_walk_skip() returns.
	//
	uint32_t lchild = _dispatch_timer_heap_left_child(idx);
	if (lchild < count) {
		return lchild;
	}
	return _dispatch_timer_heap_walk_skip(idx, count);
}
static uint64_t
_dispatch_timer_heap_max_target_before(dispatch_timer_heap_t dth, uint64_t limit)
{
	dispatch_timer_source_refs_t dri;
	uint32_t idx = _dispatch_timer_heap_left_child(DTH_TARGET_ID);
	uint32_t count = dth->dth_count;
	uint64_t tmp, target = dth->dth_min[DTH_TARGET_ID]->dt_timer.target;

	while (idx < count) {
		dri = *_dispatch_timer_heap_get_slot(dth, idx);
		tmp = dri->dt_timer.target;
		if (tmp > limit) {
			// skip subtree since none of the targets below can be before limit
			idx = _dispatch_timer_heap_walk_skip(idx, count);
		} else {
			target = tmp;
			idx = _dispatch_timer_heap_walk_next(idx, count);
		}
	}
	return target;
}
#endif // DISPATCH_HAVE_TIMER_COALESCING
static void
_dispatch_timer_heap_resift(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, target) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_TARGET_ID]));
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, deadline) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_DEADLINE_ID]));
#define dth_cmp(hid, dt1, op, dt2) \
		(((dt1)->dt_timer.heap_key)[hid] op ((dt2)->dt_timer.heap_key)[hid])

	dispatch_timer_source_refs_t *pslot, pdt;
	dispatch_timer_source_refs_t *cslot, cdt;
	dispatch_timer_source_refs_t *rslot, rdt;
	uint32_t cidx, dth_count = dth->dth_count;
	dispatch_timer_source_refs_t *slot;
	int heap_id = DTH_HEAP_ID(idx);
	bool sifted_up = false;

	// sift up

	slot = _dispatch_timer_heap_get_slot(dth, idx);
	while (idx >= DTH_ID_COUNT) {
		uint32_t pidx = _dispatch_timer_heap_parent(idx);
		pslot = _dispatch_timer_heap_get_slot(dth, pidx);
		pdt = *pslot;
		if (dth_cmp(heap_id, pdt, <=, dt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, pdt, idx);
		slot = pslot;
		idx = pidx;
		sifted_up = true;
	}
	if (sifted_up) {
		goto done;
	}

	// sift down

	while ((cidx = _dispatch_timer_heap_left_child(idx)) < dth_count) {
		uint32_t ridx = cidx + DTH_ID_COUNT;
		cslot = _dispatch_timer_heap_get_slot(dth, cidx);
		cdt = *cslot;
		if (ridx < dth_count) {
			rslot = _dispatch_timer_heap_get_slot(dth, ridx);
			rdt = *rslot;
			if (dth_cmp(heap_id, cdt, >, rdt)) {
				cidx = ridx;
				cdt = rdt;
				cslot = rslot;
			}
		}
		if (dth_cmp(heap_id, dt, <=, cdt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, cdt, idx);
		slot = cslot;
		idx = cidx;
	}

done:
	_dispatch_timer_heap_set(slot, dt, idx);
#undef dth_cmp
}
DISPATCH_ALWAYS_INLINE
static void
_dispatch_timer_heap_insert(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count += DTH_ID_COUNT) - DTH_ID_COUNT;

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], ==,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], ==,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		dt->dt_heap_entry[DTH_TARGET_ID] = DTH_TARGET_ID;
		dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_DEADLINE_ID;
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = dt;
		return;
	}

	if (unlikely(idx + DTH_ID_COUNT >
			_dispatch_timer_heap_capacity(dth->dth_segments))) {
		_dispatch_timer_heap_grow(dth);
	}
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_TARGET_ID);
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_DEADLINE_ID);
}
static void
_dispatch_timer_heap_remove(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count -= DTH_ID_COUNT);

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_TARGET_ID], ==, dt,
				"target slot");
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_DEADLINE_ID], ==, dt,
				"deadline slot");
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = NULL;
		goto clear_heap_entry;
	}

	for (uint32_t heap_id = 0; heap_id < DTH_ID_COUNT; heap_id++) {
		dispatch_timer_source_refs_t *slot, last_dt;
		slot = _dispatch_timer_heap_get_slot(dth, idx + heap_id);
		last_dt = *slot; *slot = NULL;
		if (last_dt != dt) {
			uint32_t removed_idx = dt->dt_heap_entry[heap_id];
			_dispatch_timer_heap_resift(dth, last_dt, removed_idx);
		}
	}
	if (unlikely(idx <= _dispatch_timer_heap_capacity(dth->dth_segments - 1))) {
		_dispatch_timer_heap_shrink(dth);
	}

clear_heap_entry:
	dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
	dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_timer_heap_update(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");

	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_TARGET_ID]);
	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_DEADLINE_ID]);
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_timer_heap_has_new_min(dispatch_timer_heap_t dth,
		uint32_t count, uint32_t mask)
{
	dispatch_timer_source_refs_t dt;
	bool changed = false;
	uint64_t tmp;
	uint32_t tidx;

	for (tidx = 0; tidx < count; tidx++) {
		if (!(mask & (1u << tidx))) {
			continue;
		}

		dt = dth[tidx].dth_min[DTH_TARGET_ID];
		tmp = dt ? dt->dt_timer.target : UINT64_MAX;
		if (dth[tidx].dth_target != tmp) {
			dth[tidx].dth_target = tmp;
			changed = true;
		}
		dt = dth[tidx].dth_min[DTH_DEADLINE_ID];
		tmp = dt ? dt->dt_timer.deadline : UINT64_MAX;
		if (dth[tidx].dth_deadline != tmp) {
			dth[tidx].dth_deadline = tmp;
			changed = true;
		}
	}
	return changed;
}

static void
_dispatch_timers_unregister(dispatch_timer_source_refs_t dt)
{
	uint32_t tidx = dt->du_ident;
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];

	_dispatch_timer_heap_remove(heap, dt);
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = NULL;
}

static void
_dispatch_timers_register(dispatch_timer_source_refs_t dt, uint32_t tidx)
{
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	if (_dispatch_unote_registered(dt)) {
		DISPATCH_TIMER_ASSERT(dt->du_ident, ==, tidx, "tidx");
		_dispatch_timer_heap_update(heap, dt);
	} else {
		dt->du_ident = tidx;
		_dispatch_timer_heap_insert(heap, dt);
	}
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = DISPATCH_WLH_ANON;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_timer_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_CANCELED | DQF_RELEASED)) {
			// do not install a cancelled timer
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}

// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timers_update(dispatch_unote_t du, uint32_t flags)
{
	dispatch_timer_source_refs_t dr = du._dt;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	const char *verb = "updated";
	bool will_register, disarm = false;

	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	if (unlikely(dr->du_ident == DISPATCH_TIMER_IDENT_CANCELED)) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		return;
	}

	// Unregister timers that are unconfigured, disabled, suspended or have
	// missed intervals. Rearm after dispatch_set_timer(), resume or source
	// invoke will reenable them
	will_register = !(flags & DISPATCH_TIMERS_UNREGISTER) &&
			dr->dt_timer.target < INT64_MAX &&
			!os_atomic_load2o(ds, ds_pending_data, relaxed) &&
			!DISPATCH_QUEUE_IS_SUSPENDED(ds) &&
			!os_atomic_load2o(dr, dt_pending_config, relaxed);
	if (likely(!_dispatch_unote_registered(dr))) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		if (unlikely(!will_register || !_dispatch_source_timer_tryarm(ds))) {
			return;
		}
		verb = "armed";
	} else if (unlikely(!will_register)) {
		disarm = true;
		verb = "disarmed";
	}

	// The heap owns a +2 on dispatch sources it references
	//
	// _dispatch_timers_run2() also sometimes passes DISPATCH_TIMERS_RETAIN_2
	// when it wants to take over this +2 at the same time we are unregistering
	// the timer from the heap.
	//
	// Compute our refcount balance according to these rules, if our balance
	// would become negative we retain the source upfront, if it is positive, we
	// get rid of the extraneous refcounts after we're done touching the source.
	int refs = will_register ? -2 : 0;
	if (_dispatch_unote_registered(dr) && !(flags & DISPATCH_TIMERS_RETAIN_2)) {
		refs += 2;
	}
	if (refs < 0) {
		dispatch_assert(refs == -2);
		_dispatch_retain_2(ds);
	}

	uint32_t tidx = _dispatch_source_timer_idx(dr);
	if (unlikely(_dispatch_unote_registered(dr) &&
			(!will_register || dr->du_ident != tidx))) {
		_dispatch_timers_unregister(dr);
	}
	if (likely(will_register)) {
		_dispatch_timers_register(dr, tidx);
	}

	if (disarm) {
		_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	}
	_dispatch_debug("kevent-source[%p]: %s timer[%p]", ds, verb, dr);
	_dispatch_object_debug(ds, "%s", __func__);

	if (refs > 0) {
		dispatch_assert(refs == 2);
		_dispatch_release_2_tailcall(ds);
	}
}
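
// Worked example of the refcount balance above (illustrative):
//  - timer stays registered, no DISPATCH_TIMERS_RETAIN_2: refs = -2 + 2 = 0,
//    so no refcount traffic at all;
//  - timer gets unregistered, no DISPATCH_TIMERS_RETAIN_2: refs = 0 + 2 = +2,
//    and the heap's +2 is dropped at the end via _dispatch_release_2_tailcall();
//  - first-time registration: refs = -2, so the +2 the heap will own is taken
//    upfront with _dispatch_retain_2() before the heap can reference ds.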

#define DISPATCH_TIMER_MISSED_MARKER 1ul

DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_compute_missed(dispatch_timer_source_refs_t dt,
		uint64_t now, unsigned long prev)
{
	uint64_t missed = (now - dt->dt_timer.target) / dt->dt_timer.interval;
	if (++missed + prev > LONG_MAX) {
		missed = LONG_MAX - prev;
	}
	if (dt->dt_timer.interval < INT64_MAX) {
		uint64_t push_by = missed * dt->dt_timer.interval;
		dt->dt_timer.target += push_by;
		dt->dt_timer.deadline += push_by;
	} else {
		dt->dt_timer.target = UINT64_MAX;
		dt->dt_timer.deadline = UINT64_MAX;
	}
	prev += missed;
	return prev;
}
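
// Numerical example (hypothetical values): with target == 100, interval == 25
// and now == 180, missed = (180 - 100) / 25 = 3 and is bumped to 4 so that the
// rescheduled fire time lands strictly in the future:
//
//     push_by = 4 * 25 = 100   =>   target = 200, deadline += 100
//
// and the caller is told about prev + 4 fires.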

DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_data(dispatch_source_t ds, dispatch_unote_t du)
{
	dispatch_timer_source_refs_t dr = du._dt;
	unsigned long data, prev, clear_prev = 0;

	os_atomic_rmw_loop2o(ds, ds_pending_data, prev, clear_prev, relaxed, {
		data = prev >> 1;
		if (unlikely(prev & DISPATCH_TIMER_MISSED_MARKER)) {
			os_atomic_rmw_loop_give_up(goto handle_missed_intervals);
		}
	});
	return data;

handle_missed_intervals:
	// The timer may be in _dispatch_source_invoke2() already for other
	// reasons such as running the registration handler when ds_pending_data
	// is changed by _dispatch_timers_run2() without holding the drain lock.
	//
	// We hence need dependency ordering to pair with the release barrier
	// done by _dispatch_timers_run2() when setting the MISSED_MARKER bit.
	os_atomic_thread_fence(dependency);
	dr = os_atomic_force_dependency_on(dr, data);

	uint64_t now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(dr->du_ident));
	if (now >= dr->dt_timer.target) {
		OS_COMPILER_CAN_ASSUME(dr->dt_timer.interval < INT64_MAX);
		data = _dispatch_source_timer_compute_missed(dr, now, data);
	}

	// When we see the MISSED_MARKER the manager has given up on this timer
	// and expects the handler to call "resume".
	//
	// However, it may not have reflected this into the atomic flags yet
	// so make sure _dispatch_source_invoke2() sees the timer is disarmed
	//
	// The subsequent _dispatch_source_refs_resume() will enqueue the source
	// on the manager and make the changes to `ds_timer` above visible.
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
	return data;
}
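
// For timer sources ds_pending_data is not a plain counter: the fire count is
// stored shifted left by one and bit 0 (DISPATCH_TIMER_MISSED_MARKER) means
// the manager has given up on the timer. Sketch of the decoding above
// (illustrative values):
//
//     prev = 7 << 1;                       // 7 pending fires, marker clear
//     data = prev >> 1;                    // handler observes 7
//
//     prev = (7 << 1) | DISPATCH_TIMER_MISSED_MARKER;
//     // slow path: recompute the missed intervals against "now", clear
//     // DSF_ARMED, then zero ds_pending_data outside of the rmw loop.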

static void
_dispatch_timers_run2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	dispatch_timer_source_refs_t dr;
	dispatch_source_t ds;
	uint64_t data, pending_data;
	uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);

	while ((dr = _dispatch_timers_heap[tidx].dth_min[DTH_TARGET_ID])) {
		DISPATCH_TIMER_ASSERT(dr->du_filter, ==, DISPATCH_EVFILT_TIMER,
				"invalid filter");
		DISPATCH_TIMER_ASSERT(dr->du_ident, ==, tidx, "tidx");
		DISPATCH_TIMER_ASSERT(dr->dt_timer.target, !=, 0, "missing target");
		ds = _dispatch_source_from_refs(dr);
		if (dr->dt_timer.target > now) {
			// Done running timers for now.
			break;
		}
		if (dr->du_fflags & DISPATCH_TIMER_AFTER) {
			_dispatch_trace_timer_fire(dr, 1, 1);
			_dispatch_source_merge_evt(dr, EV_ONESHOT, 1, 0, 0);
			_dispatch_debug("kevent-source[%p]: fired after timer[%p]", ds, dr);
			_dispatch_object_debug(ds, "%s", __func__);
			continue;
		}

		data = os_atomic_load2o(ds, ds_pending_data, relaxed);
		if (unlikely(data)) {
			// the release barrier is required to make the changes
			// to `ds_timer` visible to _dispatch_source_timer_data()
			if (os_atomic_cmpxchg2o(ds, ds_pending_data, data,
					data | DISPATCH_TIMER_MISSED_MARKER, release)) {
				_dispatch_timers_update(dr, DISPATCH_TIMERS_UNREGISTER);
				continue;
			}
		}

		data = _dispatch_source_timer_compute_missed(dr, now, 0);
		_dispatch_timers_update(dr, DISPATCH_TIMERS_RETAIN_2);
		pending_data = data << 1;
		if (!_dispatch_unote_registered(dr) && dr->dt_timer.target < INT64_MAX) {
			// if we unregistered because of suspension we have to fake we
			// missed events
			pending_data |= DISPATCH_TIMER_MISSED_MARKER;
			os_atomic_store2o(ds, ds_pending_data, pending_data, release);
		} else {
			os_atomic_store2o(ds, ds_pending_data, pending_data, relaxed);
		}
		_dispatch_trace_timer_fire(dr, data, data);
		_dispatch_debug("kevent-source[%p]: fired timer[%p]", ds, dr);
		_dispatch_object_debug(ds, "%s", __func__);
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}
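
// Producer-side sketch of the MISSED_MARKER handoff above (illustrative):
// when a timer fires while its previous fire count was never consumed, the
// manager only tags the existing value instead of rescheduling it:
//
//     data = ds_pending_data;                               // e.g. 3 << 1
//     os_atomic_cmpxchg2o(ds, ds_pending_data, data,
//             data | DISPATCH_TIMER_MISSED_MARKER, release);
//
// The release pairs with the dependency ordering in
// _dispatch_source_timer_data(); the timer is unregistered here and only
// comes back once the source invoke has drained the pending data.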

static void
_dispatch_timers_run(dispatch_clock_now_cache_t nows)
{
	uint32_t tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (_dispatch_timers_heap[tidx].dth_count) {
			_dispatch_timers_run2(nows, tidx);
		}
	}
}

#if DISPATCH_HAVE_TIMER_COALESCING
#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC

static const uint64_t _dispatch_kevent_coalescing_window[] = {
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
#if DISPATCH_HAVE_TIMER_QOS
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
#endif
};
#endif // DISPATCH_HAVE_TIMER_COALESCING

static inline dispatch_timer_delay_s
_dispatch_timers_get_delay(dispatch_timer_heap_t dth, dispatch_clock_t clock,
		uint32_t qos, dispatch_clock_now_cache_t nows)
{
	uint64_t target = dth->dth_target, deadline = dth->dth_deadline;
	uint64_t delta = INT64_MAX, dldelta = INT64_MAX;
	dispatch_timer_delay_s rc;

	dispatch_assert(target <= deadline);
	if (delta == 0 || target >= INT64_MAX) {
		goto done;
	}

	if (qos < DISPATCH_TIMER_QOS_COUNT && dth->dth_count > 2) {
#if DISPATCH_HAVE_TIMER_COALESCING
		// Timer pre-coalescing <rdar://problem/13222034>
		// When we have several timers with this target/deadline bracket:
		//
		//      Target        window  Deadline
		//        V           <-------V
		// t1:    [...........|.................]
		// t2:         [......|.......]
		// t3:             [..|..........]
		// t4:                | [.............]
		//                 ^
		//              Optimal Target
		//
		// Coalescing works better if the Target is delayed to "Optimal", by
		// picking the latest target that isn't too close to the deadline.
		uint64_t window = _dispatch_kevent_coalescing_window[qos];
		if (target + window < deadline) {
			uint64_t latest = deadline - window;
			target = _dispatch_timer_heap_max_target_before(dth, latest);
		}
#endif
	}

	uint64_t now = _dispatch_time_now_cached(clock, nows);
	if (target <= now) {
		delta = 0;
		dldelta = 0;
		goto done;
	}

	uint64_t tmp = target - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < delta) {
		delta = tmp;
	}

	tmp = deadline - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < dldelta) {
		dldelta = tmp;
	}

done:
	rc.delay = delta;
	rc.leeway = delta < INT64_MAX ? dldelta - delta : INT64_MAX;
	return rc;
}
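
// Numerical example of the pre-coalescing above (hypothetical values, wall
// clock so no mach->nano conversion, now == 0): with dth_target == 10ms,
// dth_deadline == 500ms and a 150ms window, target + window < deadline, so
// the target may slide as late as latest == 350ms. If the latest heap target
// before that point is 340ms, the timer is programmed with delay ~= 340ms and
// leeway ~= 160ms (dldelta - delta), which lets the kernel batch every timer
// whose window covers that instant.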

static bool
_dispatch_timers_program2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	uint32_t qos = DISPATCH_TIMER_QOS(tidx);
	dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	dispatch_timer_delay_s range;

	range = _dispatch_timers_get_delay(heap, clock, qos, nows);
	if (range.delay == 0 || range.delay >= INT64_MAX) {
		_dispatch_trace_next_timer_set(NULL, qos);
		if (heap->dth_flags & DTH_ARMED) {
			_dispatch_event_loop_timer_delete(tidx);
		}
		return range.delay == 0;
	}

	_dispatch_trace_next_timer_set(heap->dth_min[DTH_TARGET_ID], qos);
	_dispatch_trace_next_timer_program(range.delay, qos);
	_dispatch_event_loop_timer_arm(tidx, range, nows);
	return false;
}

static bool
_dispatch_timers_program(dispatch_clock_now_cache_t nows)
{
	bool poll = false;
	uint32_t tidx, timerm = _dispatch_timers_processing_mask;

	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (timerm & (1 << tidx)) {
			poll |= _dispatch_timers_program2(nows, tidx);
		}
	}
	return poll;
}

static bool
_dispatch_timers_configure(void)
{
	// Find out if there is a new target/deadline on the timer lists
	return _dispatch_timer_heap_has_new_min(_dispatch_timers_heap,
			countof(_dispatch_timers_heap), _dispatch_timers_processing_mask);
}

static bool
_dispatch_mgr_timers(void)
{
	dispatch_clock_now_cache_s nows = { };
	bool expired = _dispatch_timers_expired;
	if (unlikely(expired)) {
		_dispatch_timers_run(&nows);
	}
	_dispatch_mgr_trace_timers_wakes();
	bool reconfigure = _dispatch_timers_reconfigure;
	if (unlikely(reconfigure || expired)) {
		if (reconfigure) {
			reconfigure = _dispatch_timers_configure();
			_dispatch_timers_reconfigure = false;
		}
		if (reconfigure || expired) {
			expired = _dispatch_timers_expired = _dispatch_timers_program(&nows);
		}
		_dispatch_timers_processing_mask = 0;
	}
	return expired;
}
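
// Rough call pattern (illustrative): both manager flavors below drive the
// timer subsystem the same way on each pass of their event loop:
//
//     poll = _dispatch_mgr_timers();
//     // poll == true means a timer is already due right now, so the caller
//     // drains the event loop with KEVENT_FLAG_IMMEDIATE (or pokes the
//     // manager) instead of blocking in the kernel.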

#pragma mark dispatch_mgr

void
_dispatch_mgr_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		DISPATCH_UNUSED dispatch_qos_t qos)
{
	uint64_t dq_state;
	_dispatch_trace_continuation_push(dq, dou._do);
	if (unlikely(_dispatch_queue_push_update_tail(dq, dou._do))) {
		_dispatch_queue_push_update_head(dq, dou._do);
		dq_state = os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
		if (!_dq_state_drain_locked_by_self(dq_state)) {
			_dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
		}
	}
}

void
_dispatch_mgr_queue_wakeup(DISPATCH_UNUSED dispatch_queue_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos,
		DISPATCH_UNUSED dispatch_wakeup_flags_t flags)
{
	DISPATCH_INTERNAL_CRASH(0, "Don't try to wake up or override the manager");
}

#if DISPATCH_USE_MGR_THREAD
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
#if DISPATCH_EVENT_BACKEND_KEVENT
	dispatch_kevent_s evbuf[DISPATCH_DEFERRED_ITEMS_EVENT_COUNT];
#endif
	dispatch_deferred_items_s ddi = {
#if DISPATCH_EVENT_BACKEND_KEVENT
		.ddi_maxevents = DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		.ddi_eventlist = evbuf,
#endif
	};
	bool poll;

	_dispatch_deferred_items_set(&ddi);
	for (;;) {
		_dispatch_mgr_queue_drain();
		poll = _dispatch_mgr_timers();
		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
		_dispatch_event_loop_drain(poll ? KEVENT_FLAG_IMMEDIATE : 0);
	}
}
#endif // DISPATCH_USE_MGR_THREAD

void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
		dispatch_invoke_context_t dic DISPATCH_UNUSED,
		dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
				"kevent workqueue enabled");
	}
#endif
#if DISPATCH_USE_MGR_THREAD
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_mgr_priority_init();
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
#endif
}

#if DISPATCH_USE_KEVENT_WORKQUEUE

#define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((dispatch_priority_t)~0u)

_Static_assert(WORKQ_KEVENT_EVENT_BUFFER_LEN >=
		DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		"our list should not be longer than the kernel's");

DISPATCH_ALWAYS_INLINE
static inline dispatch_priority_t
_dispatch_wlh_worker_thread_init(dispatch_wlh_t wlh,
		dispatch_deferred_items_t ddi)
{
	dispatch_assert(wlh);
	dispatch_priority_t old_dbp;

	pthread_priority_t pp = _dispatch_get_priority();
	if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
		// If this thread does not have the event manager flag set, don't setup
		// as the dispatch manager and let the caller know to only process
		// the delivered events.
		//
		// Also add the NEEDS_UNBIND flag so that
		// _dispatch_priority_compute_update knows it has to unbind
		pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (wlh == DISPATCH_WLH_ANON) {
			pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		} else {
			// pthread sets the flag when it is an event delivery thread
			// so we need to explicitly clear it
			pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		}
		_dispatch_thread_setspecific(dispatch_priority_key,
				(void *)(uintptr_t)pp);
		if (wlh != DISPATCH_WLH_ANON) {
			_dispatch_debug("wlh[%p]: handling events", wlh);
		} else {
			ddi->ddi_can_stash = true;
		}
		return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
	}

	if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
			!(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
		// When the pthread kext is delivering kevents to us, and pthread
		// root queues are in use, then the pthread priority TSD is set
		// to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
		//
		// Given that this isn't a valid QoS we need to fixup the TSD,
		// and the best option is to clear the qos/priority bits which tells
		// us to not do any QoS related calls on this thread.
		//
		// However, in that case the manager thread is opted out of QoS,
		// as far as pthread is concerned, and can't be turned into
		// something else, so we can't stash.
		pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
	}
	// Managers always park without mutating to a regular worker thread, and
	// hence never need to unbind from userland, and when draining a manager,
	// the NEEDS_UNBIND flag would cause the mutation to happen.
	// So we need to strip this flag
	pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);

	// ensure kevents registered from this thread are registered at manager QoS
	old_dbp = _dispatch_set_basepri(DISPATCH_PRIORITY_FLAG_MANAGER);
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	return old_dbp;
}
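
// Decision sketch for the function above (illustrative): the pthread priority
// TSD decides what this kevent-delivery thread is allowed to become.
//
//     EVENT_MANAGER flag clear -> stay a regular event-delivery thread: fix
//         up NEEDS_UNBIND, allow stashing only for DISPATCH_WLH_ANON, and
//         return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
//     EVENT_MANAGER flag set   -> become the manager: strip NEEDS_UNBIND,
//         raise the base priority to MANAGER, lock _dispatch_mgr_q and return
//         the previous base priority for the later reset.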

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_wlh_worker_thread_reset(dispatch_priority_t old_dbp)
{
	bool needs_poll = _dispatch_queue_mgr_unlock(&_dispatch_mgr_q);
	_dispatch_reset_basepri(old_dbp);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);
	return needs_poll;
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_wlh_worker_thread(dispatch_wlh_t wlh, dispatch_kevent_t events,
		int *nevents)
{
	_dispatch_introspection_thread_add();
	DISPATCH_PERF_MON_VAR_INIT

	dispatch_deferred_items_s ddi = {
		.ddi_eventlist = events,
	};
	dispatch_priority_t old_dbp;

	old_dbp = _dispatch_wlh_worker_thread_init(wlh, &ddi);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_perfmon_start_impl(true);
	} else {
		dispatch_assert(wlh == DISPATCH_WLH_ANON);
		wlh = DISPATCH_WLH_ANON;
	}
	_dispatch_deferred_items_set(&ddi);
	_dispatch_event_loop_merge(events, *nevents);

	if (old_dbp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_mgr_queue_drain();
		bool poll = _dispatch_mgr_timers();
		if (_dispatch_wlh_worker_thread_reset(old_dbp)) {
			poll = true;
		}
		if (poll) _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
	} else if (ddi.ddi_stashed_dou._do) {
		_dispatch_debug("wlh[%p]: draining deferred item %p", wlh,
				ddi.ddi_stashed_dou._do);
		if (wlh == DISPATCH_WLH_ANON) {
			dispatch_assert(ddi.ddi_nevents == 0);
			_dispatch_deferred_items_set(NULL);
			_dispatch_root_queue_drain_deferred_item(&ddi
					DISPATCH_PERF_MON_ARGS);
		} else {
			_dispatch_root_queue_drain_deferred_wlh(&ddi
					DISPATCH_PERF_MON_ARGS);
		}
	}

	_dispatch_deferred_items_set(NULL);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER &&
			!ddi.ddi_stashed_dou._do) {
		_dispatch_perfmon_end(perfmon_thread_event_no_steal);
	}
	_dispatch_debug("returning %d deferred kevents", ddi.ddi_nevents);
	*nevents = ddi.ddi_nevents;
}

void
_dispatch_kevent_worker_thread(dispatch_kevent_t *events, int *nevents)
{
	if (!events && !nevents) {
		// events for worker thread request have already been delivered earlier
		return;
	}
	if (!dispatch_assume(*nevents && *events)) return;
	_dispatch_adopt_wlh_anon();
	_dispatch_wlh_worker_thread(DISPATCH_WLH_ANON, *events, nevents);
	_dispatch_reset_wlh();
}

#endif // DISPATCH_USE_KEVENT_WORKQUEUE

#pragma mark dispatch_source_debug

static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%x, "
			"mask = 0x%x, pending_data = 0x%llx, registered = %d, "
			"armed = %d, deleted = %d%s, canceled = %d, ",
			target && target->dq_label ? target->dq_label : "", target,
			dr->du_ident, dr->du_fflags, (unsigned long long)ds->ds_pending_data,
			ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
			(bool)(ds->dq_atomic_flags & DSF_DELETED),
			(ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
			(bool)(ds->dq_atomic_flags & DSF_CANCELED));
}

static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
			", interval = 0x%llx, flags = 0x%x }, ",
			(unsigned long long)dr->dt_timer.target,
			(unsigned long long)dr->dt_timer.deadline,
			(unsigned long long)dr->dt_timer.interval, dr->du_fflags);
}

size_t
_dispatch_source_debug(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (dr->du_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
			"filter = %s }", dr, dr->du_is_direct ? " (direct)" : "",
			dr->du_type->dst_kind);
	return offset;
}
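
// Example of the string assembled above for a timer source (values are
// hypothetical and the object-level attributes are elided):
//
//     source[0x7f8a4b004000] = { ..., target = com.apple.main-thread[0x10c0b7100],
//         ident = 0x3, mask = 0x0, pending_data = 0x2, registered = 1,
//         armed = 1, deleted = 0, canceled = 0, timer = { target = 0x16e36c4da00,
//         deadline = 0x16e3a2c5600, interval = 0x3b9aca00, flags = 0x0 },
//         kevent = 0x7f8a4b004190 (direct), filter = timer }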