/*
 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
static void _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval);

#define DISPATCH_TIMERS_UNREGISTER 0x1
#define DISPATCH_TIMERS_RETAIN_2 0x2
static void _dispatch_timers_update(dispatch_unote_t du, uint32_t flags);
static void _dispatch_timers_unregister(dispatch_timer_source_refs_t dt);

static void _dispatch_source_timer_configure(dispatch_source_t ds);
static inline unsigned long _dispatch_source_timer_data(
		dispatch_source_t ds, dispatch_unote_t du);
#pragma mark -
#pragma mark dispatch_source_t

dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
		unsigned long mask, dispatch_queue_t dq)
{
	dispatch_source_refs_t dr;
	dispatch_source_t ds;

	dr = dux_create(dst, handle, mask)._dr;
	if (unlikely(!dr)) {
		return DISPATCH_BAD_INPUT;
	}

	ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
			sizeof(struct dispatch_source_s));
	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
			DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
	ds->dq_label = "source";
	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->ds_refs = dr;
	dr->du_owner_wref = _dispatch_ptr2wref(ds);

	if (slowpath(!dq)) {
		dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
	} else {
		_dispatch_retain((dispatch_queue_t _Nonnull)dq);
	}
	ds->do_targetq = dq;
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
		_dispatch_source_set_interval(ds, handle);
	}
	_dispatch_object_debug(ds, "%s", __func__);
	return ds;
}
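
/*
 * Illustrative sketch (not part of this file): the client-side sequence that
 * exercises dispatch_source_create() above. `fd` and `queue` are assumptions
 * for the example; only public API is used.
 *
 *	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
 *			(uintptr_t)fd, 0, queue); // the target queue is retained here
 *	dispatch_source_set_event_handler(ds, ^{
 *		// dispatch_source_get_data(ds) estimates the bytes available
 *	});
 *	dispatch_activate(ds); // sources are created inactive
 */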
void
_dispatch_source_dispose(dispatch_source_t ds, bool *allow_free)
{
	_dispatch_object_debug(ds, "%s", __func__);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
	_dispatch_unote_dispose(ds->ds_refs);
	ds->ds_refs = NULL;
	_dispatch_queue_destroy(ds->_as_dq, allow_free);
}
void
_dispatch_source_xref_dispose(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if (unlikely(!(dqf & (DQF_LEGACY|DSF_CANCELED)))) {
		DISPATCH_CLIENT_CRASH(ds, "Release of a source that has not been "
				"cancelled, but has a mandatory cancel handler");
	}
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
bool
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
}
unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (ds->dq_atomic_flags & DSF_CANCELED) {
		return 0;
	}
#if DISPATCH_USE_MEMORYSTATUS
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	return dr->du_fflags;
}
uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return 0;
	}
#endif
	return dr->du_ident;
}
unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
#if DISPATCH_USE_MEMORYSTATUS
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	uint64_t value = os_atomic_load2o(ds, ds_data, relaxed);
	return (unsigned long)(
		ds->ds_refs->du_data_action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET
		? DISPATCH_SOURCE_GET_DATA(value) : value);
}
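
/*
 * Sketch of the client-visible semantics of dispatch_source_get_data() for an
 * ADD-style source (illustrative only; SIGHUP and `q` are assumptions):
 *
 *	dispatch_source_t sig = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0, q);
 *	dispatch_source_set_event_handler(sig, ^{
 *		// number of SIGHUPs coalesced since the last callout
 *		unsigned long n = dispatch_source_get_data(sig);
 *		(void)n;
 *	});
 *	dispatch_activate(sig);
 */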
size_t
dispatch_source_get_extended_data(dispatch_source_t ds,
		dispatch_source_extended_data_t edata, size_t size)
{
	size_t target_size = MIN(size,
		sizeof(struct dispatch_source_extended_data_s));
	if (size > 0) {
		unsigned long data, status = 0;
		if (ds->ds_refs->du_data_action
				== DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
			uint64_t combined = os_atomic_load(&ds->ds_data, relaxed);
			data = DISPATCH_SOURCE_GET_DATA(combined);
			status = DISPATCH_SOURCE_GET_STATUS(combined);
		} else {
			data = dispatch_source_get_data(ds);
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, data)
				+ sizeof(edata->data)) {
			edata->data = data;
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, status)
				+ sizeof(edata->status)) {
			edata->status = status;
		}
		if (size > sizeof(struct dispatch_source_extended_data_s)) {
			memset(
				(char *)edata + sizeof(struct dispatch_source_extended_data_s),
				0, size - sizeof(struct dispatch_source_extended_data_s));
		}
	}
	return target_size;
}
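
/*
 * Usage sketch for the extended-data call above (illustrative; assumes the
 * caller sees the same dispatch_source_extended_data_s declaration):
 *
 *	struct dispatch_source_extended_data_s ed;
 *	size_t n = dispatch_source_get_extended_data(ds, &ed, sizeof(ed));
 *	// n == MIN(sizeof(ed), sizeof(struct dispatch_source_extended_data_s))
 *	// ed.data and ed.status are filled only if `size` covered their offsets
 */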
DISPATCH_NOINLINE
void
_dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
		unsigned long val)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	int filter = ds->ds_refs->du_filter;

	if (unlikely(dqf & (DSF_CANCELED | DSF_DELETED))) {
		return;
	}

	switch (filter) {
	case DISPATCH_EVFILT_CUSTOM_ADD:
		os_atomic_add2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_OR:
		os_atomic_or2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_REPLACE:
		os_atomic_store2o(ds, ds_pending_data, val, relaxed);
		break;
	default:
		DISPATCH_CLIENT_CRASH(filter, "Invalid source type");
	}

	dx_wakeup(ds, _dispatch_qos_from_pp(pp), DISPATCH_WAKEUP_MAKE_DIRTY);
}

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	_dispatch_source_merge_data(ds, 0, val);
}
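
/*
 * Client-side sketch of the custom-source path above (example only; `q` is an
 * assumption): a DATA_ADD source accumulates merged values until the handler
 * latches them.
 *
 *	dispatch_source_t counter = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, q);
 *	dispatch_source_set_event_handler(counter, ^{
 *		// sum of all values merged since the last callout
 *		unsigned long pending = dispatch_source_get_data(counter);
 *		(void)pending;
 *	});
 *	dispatch_activate(counter);
 *	dispatch_source_merge_data(counter, 1); // DISPATCH_EVFILT_CUSTOM_ADD path
 */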
#pragma mark -
#pragma mark dispatch_source_handler

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
{
	return os_atomic_load(&dr->ds_handler[kind], relaxed);
}
#define _dispatch_source_get_event_handler(dr) \
	_dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
#define _dispatch_source_get_cancel_handler(dr) \
	_dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
#define _dispatch_source_get_registration_handler(dr) \
	_dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)
DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
		bool block)
{
	// sources don't propagate priority by default
	const dispatch_block_flags_t flags =
			DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (func) {
		uintptr_t dc_flags = 0;

		if (kind != DS_EVENT_HANDLER) {
			dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
		}
		if (block) {
#ifdef __BLOCKS__
			_dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
#endif /* __BLOCKS__ */
		} else {
			dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
			_dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
					0, flags, dc_flags);
		}
		_dispatch_trace_continuation_push(ds->_as_dq, dc);
	} else {
		dc->dc_flags = 0;
		dc->dc_func = NULL;
	}
	return dc;
}
DISPATCH_NOINLINE
static void
_dispatch_source_handler_dispose(dispatch_continuation_t dc)
{
#ifdef __BLOCKS__
	if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
		Block_release(dc->dc_ctxt);
	}
#endif /* __BLOCKS__ */
	if (dc->dc_voucher) {
		_voucher_release(dc->dc_voucher);
		dc->dc_voucher = VOUCHER_INVALID;
	}
	_dispatch_continuation_free(dc);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_take(dispatch_source_t ds, long kind)
{
	return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_free(dispatch_source_t ds, long kind)
{
	dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
	if (dc) _dispatch_source_handler_dispose(dc);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	if (!dc->dc_func) {
		_dispatch_continuation_free(dc);
		dc = NULL;
	} else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
	if (dc) _dispatch_source_handler_dispose(dc);
}
DISPATCH_NOINLINE
static void
_dispatch_source_set_handler_slow(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);

	dispatch_continuation_t dc = context;
	long kind = (long)dc->dc_data;
	dc->dc_data = NULL;
	_dispatch_source_handler_replace(ds, kind, dc);
}
DISPATCH_NOINLINE
static void
_dispatch_source_set_handler(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
	if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
		_dispatch_source_handler_replace(ds, kind, dc);
		return dx_vtable(ds)->do_resume(ds, false);
	}
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
				"after it has been activated");
	}
	_dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
	if (kind == DS_REGISTN_HANDLER) {
		_dispatch_bug_deprecated("Setting registration handler after "
				"the source has been activated");
	}
	dc->dc_data = (void *)kind;
	_dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
			_dispatch_source_set_handler_slow, 0);
}
#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
#endif /* __BLOCKS__ */
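
/*
 * Note on the path above (sketch, not normative): replacing a handler on an
 * inactive source takes the fast _dispatch_queue_try_inactive_suspend() path
 * in _dispatch_source_set_handler(); on an already-activated legacy source
 * the swap is deferred to the source queue via
 * _dispatch_source_set_handler_slow(), e.g.:
 *
 *	dispatch_source_set_event_handler(ds, ^{ ... }); // inactive: synchronous
 *	dispatch_activate(ds);
 *	dispatch_source_set_event_handler(ds, ^{ ... }); // active: async barrier
 */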
void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
#ifdef __BLOCKS__
DISPATCH_NOINLINE
static void
_dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler(ds, handler);
}
void
dispatch_source_set_mandatory_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler(ds, handler);
}
#endif /* __BLOCKS__ */
DISPATCH_NOINLINE
static void
_dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}
void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}
void
dispatch_source_set_mandatory_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}
#ifdef __BLOCKS__
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
#endif /* __BLOCKS__ */
void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
#pragma mark -
#pragma mark dispatch_source_invoke

static void
_dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
	if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}
static void
_dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
	ds->ds_pending_data = 0;
	ds->ds_data = 0;
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	if (!dc) {
		return;
	}
	if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}
static void
_dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
	uint64_t prev;

	if (dr->du_is_timer && !(dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		prev = _dispatch_source_timer_data(ds, dr);
	} else {
		prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	}
	if (dr->du_data_action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (!dispatch_assume(prev != 0) || !dc) {
		return;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
		dispatch_release(ds); // dispatch_after sources are one-shot
	}
}
DISPATCH_NOINLINE
static void
_dispatch_source_refs_finalize_unregistration(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
			DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
	if (dqf & DSF_CANCEL_WAITER) {
		_dispatch_wake_by_address(&ds->dq_atomic_flags);
	}
	_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	_dispatch_release_tailcall(ds); // the retain is done at creation time
}
void
_dispatch_source_refs_unregister(dispatch_source_t ds, uint32_t options)
{
	_dispatch_object_debug(ds, "%s", __func__);
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	dispatch_source_refs_t dr = ds->ds_refs;

	if (dr->du_is_timer) {
		// Because of the optimization to unregister fired oneshot timers
		// from the target queue, we can't trust _dispatch_unote_registered()
		// to tell the truth, it may not have happened yet
		if (dqf & DSF_ARMED) {
			_dispatch_timers_unregister(ds->ds_timer_refs);
			_dispatch_release_2(ds);
		}
		dr->du_ident = DISPATCH_TIMER_IDENT_CANCELED;
	} else {
		if (_dispatch_unote_needs_rearm(dr) && !(dqf & DSF_ARMED)) {
			options |= DU_UNREGISTER_IMMEDIATE_DELETE;
		}
		if (!_dispatch_unote_unregister(dr, options)) {
			_dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
					ds, dr);
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
			return; // deferred unregistration
		}
	}

	ds->ds_is_installed = true;
	_dispatch_source_refs_finalize_unregistration(ds);
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
			// the test is inside the loop because it's convenient but the
			// result should not change for the duration of the rmw_loop
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_refs_resume(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_is_timer) {
		_dispatch_timers_update(dr, 0);
		return true;
	}
	if (unlikely(!_dispatch_source_tryarm(ds))) {
		return false;
	}
	_dispatch_unote_resume(dr);
	_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dr);
	return true;
}
void
_dispatch_source_refs_register(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t kbp;

	dispatch_assert(!ds->ds_is_installed);

	if (dr->du_is_timer) {
		dispatch_queue_t dq = ds->_as_dq;
		kbp = _dispatch_queue_compute_priority_and_wlh(dq, NULL);
		// aggressively coalesce background/maintenance QoS timers
		// <rdar://problem/12200216&27342536>
		if (_dispatch_qos_is_background(_dispatch_priority_qos(kbp))) {
			if (dr->du_fflags & DISPATCH_TIMER_STRICT) {
				_dispatch_ktrace1(DISPATCH_PERF_strict_bg_timer, ds);
			} else {
				dr->du_fflags |= DISPATCH_TIMER_BACKGROUND;
				dr->du_ident = _dispatch_source_timer_idx(dr);
			}
		}
		_dispatch_timers_update(dr, 0);
		return;
	}

	if (unlikely(!_dispatch_source_tryarm(ds) ||
			!_dispatch_unote_register(dr, wlh, pri))) {
		// Do the parts of dispatch_source_refs_unregister() that
		// are required after this partial initialization.
		_dispatch_source_refs_finalize_unregistration(ds);
	} else {
		_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, dr);
	}
	_dispatch_object_debug(ds, "%s", __func__);
}
static void
_dispatch_source_set_event_handler_context(void *ctxt)
{
	dispatch_source_t ds = ctxt;
	dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);

	if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
		dc->dc_ctxt = ds->do_ctxt;
	}
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_install(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	_dispatch_source_refs_register(ds, wlh, pri);
	ds->ds_is_installed = true;
}
void
_dispatch_source_finalize_activation(dispatch_source_t ds, bool *allow_resume)
{
	dispatch_continuation_t dc;
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t pri;
	dispatch_wlh_t wlh;

	if (unlikely(dr->du_is_direct &&
			(_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
		return _dispatch_source_refs_unregister(ds, 0);
	}

	dc = _dispatch_source_get_event_handler(dr);
	if (dc) {
		if (_dispatch_object_is_barrier(dc)) {
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
		}
		ds->dq_priority = _dispatch_priority_from_pp_strip_flags(dc->dc_priority);
		if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
			_dispatch_barrier_async_detached_f(ds->_as_dq, ds,
					_dispatch_source_set_event_handler_context);
		}
	}

	// call "super"
	_dispatch_queue_finalize_activation(ds->_as_dq, allow_resume);

	if (dr->du_is_direct && !ds->ds_is_installed) {
		dispatch_queue_t dq = ds->_as_dq;
		pri = _dispatch_queue_compute_priority_and_wlh(dq, &wlh);
		if (pri) _dispatch_source_install(ds, wlh, pri);
	}
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned)
{
	dispatch_source_t ds = dou._ds;
	dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_flags_t dqf;

	if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN) &&
			_dispatch_unote_wlh_changed(dr, _dispatch_get_wlh())) {
		dqf = _dispatch_queue_atomic_flags_set_orig(ds->_as_dq,
				DSF_WLH_CHANGED);
		if (!(dqf & DSF_WLH_CHANGED)) {
			_dispatch_bug_deprecated("Changing target queue "
					"hierarchy after source was activated");
		}
	}

	if (_dispatch_queue_class_probe(ds)) {
		// Intentionally always drain even when on the manager queue
		// and not the source's regular target queue: we need to be able
		// to drain timer setting and the like there.
		dispatch_with_disabled_narrowing(dic, {
			retq = _dispatch_queue_serial_drain(ds->_as_dq, dic, flags, owned);
		});
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in wakeup should be consistent.

	dispatch_queue_t dkq = &_dispatch_mgr_q;
	bool prevent_starvation = false;

	if (dr->du_is_direct) {
		dkq = ds->do_targetq;
	}

	if (dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
			// timer has to be configured on the kevent queue
			if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_timer_configure(ds);
		}
	}

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_install(ds, _dispatch_get_wlh(),
				_dispatch_get_basepri());
	}

	if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return ds->do_targetq;
	}

	if (_dispatch_source_get_registration_handler(dr)) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds, dq, flags);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if ((dqf & DSF_DEFERRED_DELETE) && !(dqf & DSF_ARMED)) {
unregister_event:
		// DSF_DELETE: Pending source kevent unregistration has been completed
		// !DSF_ARMED: event was delivered and can safely be unregistered
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_IMMEDIATE_DELETE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the kevent
		// queue after event delivery.
		if (dq == ds->do_targetq) {
			_dispatch_source_latch_and_call(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);

			// starvation avoidance: if the source triggers itself then force a
			// re-queue to give other things already queued on the target queue
			// a chance to run
			//
			// however, if the source is directly targeting an overcommit root
			// queue, this would requeue the source and ask for a new overcommit
			// thread right away.
			prevent_starvation = dq->do_targetq ||
					!(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
			if (prevent_starvation &&
					os_atomic_load2o(ds, ds_pending_data, relaxed)) {
				retq = ds->do_targetq;
			}
		} else {
			// there is no point trying to be eager, the next thing to do is
			// to deliver the event
			return ds->do_targetq;
		}
	}

	if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
		// The source has been cancelled and needs to be uninstalled from the
		// kevent queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
			} else if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
				if (!(dqf & DSF_ARMED)) {
					goto unregister_event;
				}
				// we need to wait for the EV_DELETE
				return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
			}
		}
		if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr))) {
			retq = ds->do_targetq;
		} else {
			_dispatch_source_cancel_callout(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		}
		prevent_starvation = false;
	}

	if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
			// no need for resume when we can directly unregister the kevent
			goto unregister_event;
		}
		if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
			// do not try to rearm the kevent if the source is suspended
			// from the source handler
			return ds->do_targetq;
		}
		if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
			// keep the old behavior to force re-enqueue to our target queue
			// for the rearm.
			//
			// if the handler didn't run, or this is a pending delete
			// or our target queue is a global queue, then starvation is
			// not a concern and we can rearm right away.
			return ds->do_targetq;
		}
		if (unlikely(!_dispatch_source_refs_resume(ds))) {
			goto unregister_event;
		}
		if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
			// try to redrive the drain from under the lock for sources
			// targeting an overcommit root queue to avoid parking
			// when the next event has already fired
			_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
		}
	}

	return retq;
}
DISPATCH_NOINLINE
void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(ds, dic, flags,
			DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
}
void
_dispatch_source_wakeup(dispatch_source_t ds, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in wakeup and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
	dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);

	if (dr->du_is_direct) {
		dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		// timer has to be configured on the kevent queue
		tq = dkq;
	} else if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		tq = dkq;
	} else if (_dispatch_source_get_registration_handler(dr)) {
		// The registration handler needs to be delivered to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if (deferred_delete && !(dqf & DSF_ARMED)) {
		// Pending source kevent unregistration has been completed
		// or EV_ONESHOT event can be acknowledged
		tq = dkq;
	} else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
		// The source needs to be uninstalled from the kevent queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
				tq = DISPATCH_QUEUE_WAKEUP_TARGET;
			} else {
				tq = dkq;
			}
		} else if (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr)) {
			tq = DISPATCH_QUEUE_WAKEUP_TARGET;
		}
	} else if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		tq = dkq;
	}
	if (!tq && _dispatch_queue_class_probe(ds)) {
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if ((tq == DISPATCH_QUEUE_WAKEUP_TARGET) &&
			ds->do_targetq == &_dispatch_mgr_q) {
		tq = DISPATCH_QUEUE_WAKEUP_MGR;
	}

	return _dispatch_queue_class_wakeup(ds->_as_dq, qos, flags, tq);
}
void
dispatch_source_cancel(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	// Right after we set the cancel flag, someone else
	// could potentially invoke the source, do the cancellation,
	// unregister the source, and deallocate it. We would
	// need to therefore retain/release before setting the bit
	_dispatch_retain_2(ds);

	dispatch_queue_t q = ds->_as_dq;
	if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
		_dispatch_release_2_tailcall(ds);
	} else {
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}
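
/*
 * Typical cancellation sequence against the implementation above (sketch;
 * `fd` and `q` are assumptions). The cancel handler is the only safe place
 * to close the monitored fd, since no event handler runs after it:
 *
 *	dispatch_source_t rs = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
 *			(uintptr_t)fd, 0, q);
 *	dispatch_source_set_cancel_handler(rs, ^{ close(fd); });
 *	dispatch_activate(rs);
 *	// ... later ...
 *	dispatch_source_cancel(rs);
 *	dispatch_release(rs);
 */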
void
dispatch_source_cancel_and_wait(dispatch_source_t ds)
{
	dispatch_queue_flags_t old_dqf, dqf, new_dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	if (unlikely(_dispatch_source_get_cancel_handler(dr))) {
		DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
	}

	_dispatch_object_debug(ds, "%s", __func__);
	os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = old_dqf | DSF_CANCELED;
		if (old_dqf & DSF_CANCEL_WAITER) {
			os_atomic_rmw_loop_give_up(break);
		}
		if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
			// just add DSF_CANCELED
		} else if ((old_dqf & DSF_DEFERRED_DELETE) || !dr->du_is_direct) {
			new_dqf |= DSF_CANCEL_WAITER;
		}
	});
	dqf = new_dqf;

	if (old_dqf & DQF_RELEASED) {
		DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
	}
	if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
		return;
	}
	if (dqf & DSF_CANCEL_WAITER) {
		goto wakeup;
	}

	// simplified version of _dispatch_queue_drain_try_lock
	// that also sets the DIRTY bit on failure to lock
	uint64_t set_owner_and_set_full_width = _dispatch_lock_value_for_self() |
			DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
		new_state = old_state;
		if (likely(_dq_state_is_runnable(old_state) &&
				!_dq_state_drain_locked(old_state))) {
			new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
			new_state |= set_owner_and_set_full_width;
		} else if (old_dqf & DSF_CANCELED) {
			os_atomic_rmw_loop_give_up(break);
		} else {
			// this case needs a release barrier, hence the seq_cst above
			new_state |= DISPATCH_QUEUE_DIRTY;
		}
	});

	if (unlikely(_dq_state_is_suspended(old_state))) {
		if (unlikely(_dq_state_suspend_cnt(old_state))) {
			DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
		}
		// inactive sources have never been registered and there is no need
		// to wait here because activation will notice and mark the source
		// as deleted without ever trying to use the fd or mach port.
		return dispatch_activate(ds);
	}

	if (likely(_dq_state_is_runnable(old_state) &&
			!_dq_state_drain_locked(old_state))) {
		// same thing _dispatch_source_invoke2() does when handling cancellation
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
				_dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
			}
		}
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
	} else if (unlikely(_dq_state_drain_locked_by_self(old_state))) {
		DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
				"called from a source handler");
	} else {
		dispatch_qos_t qos;
wakeup:
		qos = _dispatch_qos_from_pp(_dispatch_get_priority());
		dx_wakeup(ds, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
		dispatch_activate(ds);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
		if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
			if (!os_atomic_cmpxchgv2o(ds, dq_atomic_flags,
					dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
				continue;
			}
			dqf |= DSF_CANCEL_WAITER;
		}
		_dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}
}
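
/*
 * Constraints encoded above, restated as a usage sketch (`q` is an
 * assumption): the source must not have a cancel handler, must not be
 * suspended, and must not be canceled from one of its own handlers,
 * otherwise the client crashes above fire.
 *
 *	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
 *			0, 0, q); // no cancel handler installed
 *	dispatch_activate(ds);
 *	dispatch_source_cancel_and_wait(ds); // returns once DSF_DELETED is set
 *	dispatch_release(ds);
 */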
void
_dispatch_source_merge_evt(dispatch_unote_t du, uint32_t flags, uintptr_t data,
		uintptr_t status, pthread_priority_t pp)
{
	dispatch_source_refs_t dr = du._dr;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	dispatch_wakeup_flags_t wflags = 0;
	dispatch_queue_flags_t dqf;

	if (_dispatch_unote_needs_rearm(dr) || (flags & (EV_DELETE | EV_ONESHOT))) {
		// once we modify the queue atomic flags below, it will allow concurrent
		// threads running _dispatch_source_invoke2 to dispose of the source,
		// so we can't safely borrow the reference we get from the muxnote udata
		// anymore, and need our own
		wflags = DISPATCH_WAKEUP_CONSUME_2;
		_dispatch_retain_2(ds); // rdar://20382435
	}

	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			!(flags & EV_DELETE)) {
		dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
				DSF_DEFERRED_DELETE, DSF_ARMED);
		if (flags & EV_VANISHED) {
			_dispatch_bug_kevent_client("kevent", dr->du_type->dst_kind,
					"monitored resource vanished before the source "
					"cancel handler was invoked", 0);
		}
		_dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
				(flags & EV_VANISHED) ? "vanished" :
				"deferred delete oneshot", dr);
	} else if (flags & (EV_DELETE | EV_ONESHOT)) {
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_ALREADY_DELETED);
		_dispatch_debug("kevent-source[%p]: deleted kevent[%p]", ds, dr);
		if (flags & EV_DELETE) goto done;
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	} else if (_dispatch_unote_needs_rearm(dr)) {
		dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	} else {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
		goto done; // rdar://20204025
	}

	dispatch_unote_action_t action = dr->du_data_action;
	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			(flags & EV_VANISHED)) {
		// if the resource behind the ident vanished, the event handler can't
		// do anything useful anymore, so do not try to call it at all
		//
		// Note: if the kernel doesn't support EV_VANISHED we always get it
		// back unchanged from the flags passed at EV_ADD (registration) time
		// Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
		// if we get both bits it was a real EV_VANISHED delivery
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
#if HAVE_MACH
	} else if (dr->du_filter == EVFILT_MACHPORT) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
#endif
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_ADD) {
		os_atomic_add2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR) {
		os_atomic_or2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
		// We combine the data and status into a single 64-bit value.
		uint64_t odata, ndata;
		uint64_t value = DISPATCH_SOURCE_COMBINE_DATA_AND_STATUS(data, status);
		os_atomic_rmw_loop2o(ds, ds_pending_data, odata, ndata, relaxed, {
			ndata = DISPATCH_SOURCE_GET_DATA(odata) | value;
		});
	} else if (data) {
		DISPATCH_INTERNAL_CRASH(action, "Unexpected source action value");
	}
	_dispatch_debug("kevent-source[%p]: merged kevent[%p]", ds, dr);

done:
	_dispatch_object_debug(ds, "%s", __func__);
	dx_wakeup(ds, _dispatch_qos_from_pp(pp), wflags | DISPATCH_WAKEUP_MAKE_DIRTY);
}
#pragma mark -
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
static dispatch_timer_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mgr_trace_timers_wakes(void)
{
	uint32_t qos;

	if (_dispatch_timers_will_wake) {
		if (slowpath(DISPATCH_TIMER_WAKE_ENABLED())) {
			for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
				if (_dispatch_timers_will_wake & (1 << qos)) {
					_dispatch_trace_timer_wake(_dispatch_trace_next_timer[qos]);
				}
			}
		}
		_dispatch_timers_will_wake = 0;
	}
}
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_mgr_trace_timers_wakes()
#endif // DISPATCH_USE_DTRACE

#define _dispatch_source_timer_telemetry_enabled() false

DISPATCH_NOINLINE
static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
		dispatch_clock_t clock, struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled()) {
		_dispatch_trace_timer_configure(ds, clock, values);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock,
		struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled() ||
			_dispatch_source_timer_telemetry_enabled()) {
		_dispatch_source_timer_telemetry_slow(ds, clock, values);
		asm(""); // prevent tailcall
	}
}
DISPATCH_NOINLINE
static void
_dispatch_source_timer_configure(dispatch_source_t ds)
{
	dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
	dispatch_timer_config_t dtc;

	dtc = os_atomic_xchg2o(dt, dt_pending_config, NULL, dependency);
	if (dtc->dtc_clock == DISPATCH_CLOCK_MACH) {
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
	} else {
		dt->du_fflags &= ~(uint32_t)DISPATCH_TIMER_CLOCK_MACH;
	}
	dt->dt_timer = dtc->dtc_timer;
	free(dtc);
	if (ds->ds_is_installed) {
		// Clear any pending data that might have accumulated on
		// older timer params <rdar://problem/8574886>
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
		_dispatch_timers_update(dt, 0);
	}
}
static dispatch_timer_config_t
_dispatch_source_timer_config_create(dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	dispatch_timer_config_t dtc;
	dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s));
	if (unlikely(interval == 0)) {
		if (start != DISPATCH_TIME_FOREVER) {
			_dispatch_bug_deprecated("Setting timer interval to 0 requests "
					"a 1ns timer, did you mean FOREVER (a one-shot timer)?");
		}
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}
	if ((int64_t)leeway < 0) {
		leeway = INT64_MAX;
	}
	if (start == DISPATCH_TIME_NOW) {
		start = _dispatch_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	if ((int64_t)start < 0) {
		// wall clock
		start = (dispatch_time_t)-((int64_t)start);
		dtc->dtc_clock = DISPATCH_CLOCK_WALL;
	} else {
		// absolute clock
		interval = _dispatch_time_nano2mach(interval);
		if (interval < 1) {
			// rdar://problem/7287561 interval must be at least one
			// in order to avoid later division by zero when calculating
			// the missed interval count. (NOTE: the wall clock's
			// interval is already "fixed" to be 1 or more)
			interval = 1;
		}
		leeway = _dispatch_time_nano2mach(leeway);
		dtc->dtc_clock = DISPATCH_CLOCK_MACH;
	}
	if (interval < INT64_MAX && leeway > interval / 2) {
		leeway = interval / 2;
	}

	dtc->dtc_timer.target = start;
	dtc->dtc_timer.interval = interval;
	if (start + leeway < INT64_MAX) {
		dtc->dtc_timer.deadline = start + leeway;
	} else {
		dtc->dtc_timer.deadline = INT64_MAX;
	}
	return dtc;
}
DISPATCH_NOINLINE
void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
	dispatch_timer_config_t dtc;

	if (unlikely(!dt->du_is_timer || (dt->du_fflags & DISPATCH_TIMER_INTERVAL))) {
		DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
	}

	dtc = _dispatch_source_timer_config_create(start, interval, leeway);
	_dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
	dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
	if (dtc) free(dtc);
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
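
/*
 * Two common configurations for the call above (sketch; `t` is an existing
 * timer source). Per _dispatch_source_timer_config_create(), an interval of
 * DISPATCH_TIME_FOREVER means one-shot, and an interval of 0 is deprecated:
 *
 *	// repeat every second, 10ms leeway
 *	dispatch_source_set_timer(t, dispatch_time(DISPATCH_TIME_NOW, 0),
 *			NSEC_PER_SEC, 10 * NSEC_PER_MSEC);
 *	// fire once, 5s from now
 *	dispatch_source_set_timer(t,
 *			dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC),
 *			DISPATCH_TIME_FOREVER, 0);
 */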
static void
_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull

	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION;
	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
			FOREVER_NSEC/NSEC_PER_MSEC))) {
		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
	} else {
		interval = FOREVER_NSEC;
	}
	interval = _dispatch_time_nano2mach(interval);
	uint64_t target = _dispatch_absolute_time() + interval;
	target -= (target % interval);
	const uint64_t leeway = animation ?
			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
	dr->dt_timer.target = target;
	dr->dt_timer.deadline = target + leeway;
	dr->dt_timer.interval = interval;
	_dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer);
}
#pragma mark -
#pragma mark dispatch_after

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		void *ctxt, void *handler, bool block)
{
	dispatch_timer_source_refs_t dt;
	dispatch_source_t ds;
	uint64_t leeway, delta;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		if (block) {
			return dispatch_async(queue, handler);
		}
		return dispatch_async_f(queue, ctxt, handler);
	}
	leeway = delta / 10; // <rdar://problem/13447496>

	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
	dt = ds->ds_timer_refs;

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (block) {
		_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
	} else {
		_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
	}
	// reference `ds` so that it doesn't show up as a leak
	dc->dc_data = ds;
	_dispatch_trace_continuation_push(ds->_as_dq, dc);
	os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);

	if ((int64_t)when < 0) {
		// wall clock
		when = (dispatch_time_t)-((int64_t)when);
	} else {
		// absolute clock
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
		leeway = _dispatch_time_nano2mach(leeway);
	}
	dt->dt_timer.target = when;
	dt->dt_timer.interval = UINT64_MAX;
	dt->dt_timer.deadline = when + leeway;
	dispatch_activate(ds);
}

DISPATCH_NOINLINE
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_after(when, queue, ctxt, func, false);
}

#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	_dispatch_after(when, queue, NULL, work, true);
}
#endif
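
/*
 * Usage sketch for the wrappers above (`q` is an assumption). The leeway is
 * derived in _dispatch_after(): delta/10, clamped to [1ms, 60s]:
 *
 *	dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC), q, ^{
 *		// runs on q roughly 2s from now, with ~200ms leeway
 *	});
 */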
#pragma mark -
#pragma mark dispatch_timers

/*
 * The dispatch_timer_heap_t structure is a double min-heap of timers,
 * interleaving the by-target min-heap in the even slots, and the by-deadline
 * min-heap in the odd ones.
 *
 * The min element of these is held inline in the dispatch_timer_heap_t
 * structure, and further entries are held in segments.
 *
 * dth_segments is the number of allocated segments.
 *
 * Segment 0 has a size of `DISPATCH_HEAP_INIT_SEGMENT_CAPACITY` pointers
 * Segment k has a size of (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (k - 1))
 *
 * Segment n (dth_segments - 1) is the last segment and points its final n
 * entries to previous segments. Its address is held in the `dth_heap` field.
 *
 * segment n   [ regular timer pointers | n-1 | k | 0 ]
 *                                         |    |   |
 * segment n-1 <---------------------------'    |   |
 * segment k   <--------------------------------'   |
 * segment 0   <------------------------------------'
 */
#define DISPATCH_HEAP_INIT_SEGMENT_CAPACITY 8u

/*
 * There are two min-heaps stored interleaved in a single array,
 * even indices are for the by-target min-heap, and odd indices for
 * the by-deadline one.
 */
#define DTH_HEAP_ID_MASK (DTH_ID_COUNT - 1)
#define DTH_HEAP_ID(idx) ((idx) & DTH_HEAP_ID_MASK)
#define DTH_IDX_FOR_HEAP_ID(idx, heap_id) \
		(((idx) & ~DTH_HEAP_ID_MASK) | (heap_id))
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_capacity(uint32_t segments)
{
	if (segments == 0) return 2;
	uint32_t seg_no = segments - 1;
	// for C = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY,
	// 2 + C + SUM(C << (i-1), i = 1..seg_no) - seg_no
	return 2 + (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << seg_no) - seg_no;
}
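
/*
 * Worked values for the capacity formula above, with C = 8:
 *   segments == 0 -> 2 (just the inline dth_min pair)
 *   segments == 1 -> 2 + (8 << 0) - 0 = 10
 *   segments == 2 -> 2 + (8 << 1) - 1 = 17
 *   segments == 3 -> 2 + (8 << 2) - 2 = 32
 * i.e. each new segment doubles the tail capacity, minus the slots the last
 * segment spends on back-pointers to previous segments.
 */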
DISPATCH_NOINLINE
static void
_dispatch_timer_heap_grow(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = dth->dth_segments++;
	void **heap, **heap_prev = dth->dth_heap;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
	}
	heap = _dispatch_calloc(seg_capacity, sizeof(void *));
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap[seg_capacity - prev_seg_no],
				&heap_prev[prev_seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	if (seg_no > 0) {
		heap[seg_capacity - seg_no] = heap_prev;
	}
	dth->dth_heap = heap;
}
DISPATCH_NOINLINE
static void
_dispatch_timer_heap_shrink(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = --dth->dth_segments;
	void **heap = dth->dth_heap, **heap_prev = NULL;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
		heap_prev = heap[seg_capacity - seg_no];
	}
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap_prev[prev_seg_capacity - prev_seg_no],
				&heap[seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	dth->dth_heap = heap_prev;
	free(heap);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_timer_source_refs_t *
_dispatch_timer_heap_get_slot(dispatch_timer_heap_t dth, uint32_t idx)
{
	uint32_t seg_no, segments = dth->dth_segments;
	void **segment;

	if (idx < DTH_ID_COUNT) {
		return &dth->dth_min[idx];
	}
	idx -= DTH_ID_COUNT;

	// Derive the segment number from the index. Naming
	// DISPATCH_HEAP_INIT_SEGMENT_CAPACITY `C`, the segments index ranges are:
	// 0: 0 .. C - 1
	// 1: C .. 2 * C - 1
	// k: 2^(k-1) * C .. 2^k * C - 1
	// so `k` can be derived from the first bit set in `idx`
	seg_no = (uint32_t)(__builtin_clz(DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1) -
			__builtin_clz(idx | (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1)));
	if (seg_no + 1 == segments) {
		segment = dth->dth_heap;
	} else {
		uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
		seg_capacity <<= (segments - 2);
		segment = dth->dth_heap[seg_capacity - seg_no - 1];
	}
	if (seg_no) {
		idx -= DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (seg_no - 1);
	}
	return (dispatch_timer_source_refs_t *)(segment + idx);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_timer_heap_set(dispatch_timer_source_refs_t *slot,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	*slot = dt;
	dt->dt_heap_entry[DTH_HEAP_ID(idx)] = idx;
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_parent(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	idx = (idx - DTH_ID_COUNT) / 2; // go to the parent
	return DTH_IDX_FOR_HEAP_ID(idx, heap_id);
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_left_child(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	// 2 * (idx - heap_id) + DTH_ID_COUNT + heap_id
	return 2 * idx + DTH_ID_COUNT - heap_id;
}
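
/*
 * Worked example for the index helpers above, assuming DTH_ID_COUNT == 2
 * (target heap on even indices, deadline heap on odd ones):
 *   left child of 0 (target root)   = 2 * 0 + 2 - 0 = 2
 *   left child of 1 (deadline root) = 2 * 1 + 2 - 1 = 3
 *   parent of 2: heap_id = 0, (2 - 2) / 2 = 0 -> index 0
 *   parent of 5: heap_id = 1, (5 - 2) / 2 = 1 -> DTH_IDX_FOR_HEAP_ID(1, 1) = 1
 * so a timer's two heap positions never collide across the two heaps.
 */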
#if DISPATCH_HAVE_TIMER_COALESCING
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_skip(uint32_t idx, uint32_t count)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);

	idx -= heap_id;
	if (unlikely(idx + DTH_ID_COUNT == count)) {
		// reaching `count` doesn't mean we're done, but there is a weird
		// corner case if the last item of the heap is a left child:
		//
		//          o
		//        /   \
		//       o     o
		//      / \   /
		//     o   o o    <- last item is a left child
		//
		// The formula below would return the sibling of `idx` which is
		// out of bounds. Fortunately, the correct answer is the same
		// as for idx's parent
		idx = _dispatch_timer_heap_parent(idx);
	}

	//
	// When considering the index in a non-interleaved, 1-based array
	// representation of a heap, hence looking at (idx / DTH_ID_COUNT + 1)
	// for a given idx in our dual-heaps, that index is in one of two forms:
	//
	//     (a) 1xxxx011111    or    (b) 111111111
	//
	// The first bit set is the row of the binary tree node (0-based).
	// The following digits from most to least significant represent the path
	// to that node, where `0` is a left turn and `1` a right turn.
	//
	// For example 0b0101 (5) is a node on row 2 accessed going left then right:
	//
	//   row 0:        1
	//   row 1:    10     11
	//   row 2: 100 101 110 111
	//                ^
	//
	// Skipping a sub-tree in walk order means going to the sibling of the last
	// node reached after we turned left. If the node was of the form (a),
	// this node is 1xxxx1, which for the above example is 0b0011 (3).
	// If the node was of the form (b) then we never took a left, meaning
	// we reached the last element in traversal order.
	//

	//
	// we want to find
	// - the least significant bit set to 0 in (idx / DTH_ID_COUNT + 1)
	// - which is offset by log_2(DTH_ID_COUNT) from the position of the least
	//   significant 0 in (idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//   since idx is a multiple of DTH_ID_COUNT and DTH_ID_COUNT a power of 2.
	// - which in turn is the same as the position of the least significant 1 in
	//   ~(idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//
	dispatch_static_assert(powerof2(DTH_ID_COUNT));
	idx += DTH_ID_COUNT + DTH_ID_COUNT - 1;
	idx >>= __builtin_ctz(~idx);

	//
	// `idx` is now either:
	// - 0 if it was the (b) case above, in which case the walk is done
	// - 1xxxx0 as the position in a 0 based array representation of a non
	//   interleaved heap, so we just have to compute the interleaved index.
	//
	return likely(idx) ? DTH_ID_COUNT * idx + heap_id : UINT32_MAX;
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_next(uint32_t idx, uint32_t count)
{
	//
	// Goes to the next element in heap walk order, which is the prefix ordered
	// walk of the tree.
	//
	// From a given node, the next item to return is the left child if it
	// exists, else the first right sibling we find by walking our parent chain,
	// which is exactly what _dispatch_timer_heap_walk_skip() returns.
	//
	uint32_t lchild = _dispatch_timer_heap_left_child(idx);
	if (lchild < count) {
		return lchild;
	}
	return _dispatch_timer_heap_walk_skip(idx, count);
}
DISPATCH_NOINLINE
static uint64_t
_dispatch_timer_heap_max_target_before(dispatch_timer_heap_t dth, uint64_t limit)
{
	dispatch_timer_source_refs_t dri;
	uint32_t idx = _dispatch_timer_heap_left_child(DTH_TARGET_ID);
	uint32_t count = dth->dth_count;
	uint64_t tmp, target = dth->dth_min[DTH_TARGET_ID]->dt_timer.target;

	while (idx < count) {
		dri = *_dispatch_timer_heap_get_slot(dth, idx);
		tmp = dri->dt_timer.target;
		if (tmp > limit) {
			// skip subtree since none of the targets below can be before limit
			idx = _dispatch_timer_heap_walk_skip(idx, count);
		} else {
			target = tmp;
			idx = _dispatch_timer_heap_walk_next(idx, count);
		}
	}
	return target;
}
#endif // DISPATCH_HAVE_TIMER_COALESCING
DISPATCH_NOINLINE
static void
_dispatch_timer_heap_resift(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, target) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_TARGET_ID]));
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, deadline) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_DEADLINE_ID]));
#define dth_cmp(hid, dt1, op, dt2) \
		(((dt1)->dt_timer.heap_key)[hid] op ((dt2)->dt_timer.heap_key)[hid])

	dispatch_timer_source_refs_t *pslot, pdt;
	dispatch_timer_source_refs_t *cslot, cdt;
	dispatch_timer_source_refs_t *rslot, rdt;
	uint32_t cidx, dth_count = dth->dth_count;
	dispatch_timer_source_refs_t *slot;
	int heap_id = DTH_HEAP_ID(idx);
	bool sifted_up = false;

	// try to sift up

	slot = _dispatch_timer_heap_get_slot(dth, idx);
	while (idx >= DTH_ID_COUNT) {
		uint32_t pidx = _dispatch_timer_heap_parent(idx);
		pslot = _dispatch_timer_heap_get_slot(dth, pidx);
		pdt = *pslot;
		if (dth_cmp(heap_id, pdt, <=, dt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, pdt, idx);
		slot = pslot;
		idx = pidx;
		sifted_up = true;
	}
	if (sifted_up) {
		goto done;
	}

	// try to sift down

	while ((cidx = _dispatch_timer_heap_left_child(idx)) < dth_count) {
		uint32_t ridx = cidx + DTH_ID_COUNT;
		cslot = _dispatch_timer_heap_get_slot(dth, cidx);
		cdt = *cslot;
		if (ridx < dth_count) {
			rslot = _dispatch_timer_heap_get_slot(dth, ridx);
			rdt = *rslot;
			if (dth_cmp(heap_id, cdt, >, rdt)) {
				cidx = ridx;
				cdt = rdt;
				cslot = rslot;
			}
		}
		if (dth_cmp(heap_id, dt, <=, cdt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, cdt, idx);
		slot = cslot;
		idx = cidx;
	}

done:
	_dispatch_timer_heap_set(slot, dt, idx);
#undef dth_cmp
}
DISPATCH_ALWAYS_INLINE
static void
_dispatch_timer_heap_insert(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count += DTH_ID_COUNT) - DTH_ID_COUNT;

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], ==,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], ==,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		dt->dt_heap_entry[DTH_TARGET_ID] = DTH_TARGET_ID;
		dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_DEADLINE_ID;
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = dt;
		return;
	}

	if (unlikely(idx + DTH_ID_COUNT >
			_dispatch_timer_heap_capacity(dth->dth_segments))) {
		_dispatch_timer_heap_grow(dth);
	}
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_TARGET_ID);
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_DEADLINE_ID);
}
static void
_dispatch_timer_heap_remove(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count -= DTH_ID_COUNT);

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_TARGET_ID], ==, dt,
				"target slot");
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_DEADLINE_ID], ==, dt,
				"deadline slot");
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = NULL;
		goto clear_heap_entry;
	}

	for (uint32_t heap_id = 0; heap_id < DTH_ID_COUNT; heap_id++) {
		dispatch_timer_source_refs_t *slot, last_dt;
		slot = _dispatch_timer_heap_get_slot(dth, idx + heap_id);
		last_dt = *slot; *slot = NULL;
		if (last_dt != dt) {
			uint32_t removed_idx = dt->dt_heap_entry[heap_id];
			_dispatch_timer_heap_resift(dth, last_dt, removed_idx);
		}
	}
	if (unlikely(idx <= _dispatch_timer_heap_capacity(dth->dth_segments - 1))) {
		_dispatch_timer_heap_shrink(dth);
	}

clear_heap_entry:
	dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
	dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
}
DISPATCH_ALWAYS_INLINE
static void
_dispatch_timer_heap_update(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");

	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_TARGET_ID]);
	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_DEADLINE_ID]);
}
DISPATCH_ALWAYS_INLINE
static bool
_dispatch_timer_heap_has_new_min(dispatch_timer_heap_t dth,
		uint32_t count, uint32_t mask)
{
	dispatch_timer_source_refs_t dt;
	bool changed = false;
	uint64_t tmp;
	uint32_t tidx;

	for (tidx = 0; tidx < count; tidx++) {
		if (!(mask & (1u << tidx))) {
			continue;
		}

		// Compare the cached min target/deadline against the heap roots,
		// refreshing the cache when they moved (UINT64_MAX for empty heaps).
		dt = dth[tidx].dth_min[DTH_TARGET_ID];
		tmp = dt ? dt->dt_timer.target : UINT64_MAX;
		if (dth[tidx].dth_target != tmp) {
			dth[tidx].dth_target = tmp;
			changed = true;
		}
		dt = dth[tidx].dth_min[DTH_DEADLINE_ID];
		tmp = dt ? dt->dt_timer.deadline : UINT64_MAX;
		if (dth[tidx].dth_deadline != tmp) {
			dth[tidx].dth_deadline = tmp;
			changed = true;
		}
	}
	return changed;
}
static void
_dispatch_timers_unregister(dispatch_timer_source_refs_t dt)
{
	uint32_t tidx = dt->du_ident;
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];

	_dispatch_timer_heap_remove(heap, dt);
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = NULL;
}
static void
_dispatch_timers_register(dispatch_timer_source_refs_t dt, uint32_t tidx)
{
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	if (_dispatch_unote_registered(dt)) {
		DISPATCH_TIMER_ASSERT(dt->du_ident, ==, tidx, "tidx");
		_dispatch_timer_heap_update(heap, dt);
	} else {
		dt->du_ident = tidx;
		_dispatch_timer_heap_insert(heap, dt);
	}
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = DISPATCH_WLH_ANON;
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_timer_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_CANCELED | DQF_RELEASED)) {
			// do not install a cancelled timer
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}
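
// Note: os_atomic_rmw_loop2o() evaluates to true when the updated flags were
// successfully stored, so the caller sees true once DSF_ARMED is set, and
// false when the loop gives up because the source was already cancelled or
// released.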
// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timers_update(dispatch_unote_t du, uint32_t flags)
{
	dispatch_timer_source_refs_t dr = du._dt;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	const char *verb = "updated";
	bool will_register, disarm = false;

	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	if (unlikely(dr->du_ident == DISPATCH_TIMER_IDENT_CANCELED)) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		return;
	}

	// Unregister timers that are unconfigured, disabled, suspended or have
	// missed intervals. Rearm after dispatch_set_timer(), resume or source
	// invoke will reenable them
	will_register = !(flags & DISPATCH_TIMERS_UNREGISTER) &&
			dr->dt_timer.target < INT64_MAX &&
			!os_atomic_load2o(ds, ds_pending_data, relaxed) &&
			!DISPATCH_QUEUE_IS_SUSPENDED(ds) &&
			!os_atomic_load2o(dr, dt_pending_config, relaxed);
	if (likely(!_dispatch_unote_registered(dr))) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		if (unlikely(!will_register || !_dispatch_source_timer_tryarm(ds))) {
			return;
		}
		verb = "armed";
	} else if (unlikely(!will_register)) {
		disarm = true;
		verb = "disarmed";
	}

	// The heap owns a +2 on dispatch sources it references
	//
	// _dispatch_timers_run2() also sometimes passes DISPATCH_TIMERS_RETAIN_2
	// when it wants to take over this +2 at the same time we are unregistering
	// the timer from the heap.
	//
	// Compute our refcount balance according to these rules, if our balance
	// would become negative we retain the source upfront, if it is positive,
	// we get rid of the extraneous refcounts after we're done touching the
	// source.
	int refs = will_register ? -2 : 0;
	if (_dispatch_unote_registered(dr) && !(flags & DISPATCH_TIMERS_RETAIN_2)) {
		refs += 2;
	}
	if (refs < 0) {
		dispatch_assert(refs == -2);
		_dispatch_retain_2(ds);
	}

	uint32_t tidx = _dispatch_source_timer_idx(dr);
	if (unlikely(_dispatch_unote_registered(dr) &&
			(!will_register || dr->du_ident != tidx))) {
		_dispatch_timers_unregister(dr);
	}
	if (likely(will_register)) {
		_dispatch_timers_register(dr, tidx);
	}

	if (disarm) {
		_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	}
	_dispatch_debug("kevent-source[%p]: %s timer[%p]", ds, verb, dr);
	_dispatch_object_debug(ds, "%s", __func__);
	if (refs > 0) {
		dispatch_assert(refs == 2);
		_dispatch_release_2_tailcall(ds);
	}
}
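
// Illustration of the balance rules above:
//   not registered -> registered:          refs = -2, retain_2() upfront
//                                          (the heap now owns a +2)
//   registered -> still registered:        refs = -2 + 2 = 0, no-op
//   registered -> unregistered:            refs = 0 + 2 = +2, release_2() last
//   registered -> unregistered, RETAIN_2:  refs = 0, the caller takes over
//                                          the heap's +2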
#define DISPATCH_TIMER_MISSED_MARKER 1ul

DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_compute_missed(dispatch_timer_source_refs_t dt,
		uint64_t now, unsigned long prev)
{
	uint64_t missed = (now - dt->dt_timer.target) / dt->dt_timer.interval;
	if (++missed + prev > LONG_MAX) {
		missed = LONG_MAX - prev;
	}
	if (dt->dt_timer.interval < INT64_MAX) {
		uint64_t push_by = missed * dt->dt_timer.interval;
		dt->dt_timer.target += push_by;
		dt->dt_timer.deadline += push_by;
	} else {
		dt->dt_timer.target = UINT64_MAX;
		dt->dt_timer.deadline = UINT64_MAX;
	}
	prev += missed;
	return prev;
}
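
// Worked example (illustrative): with target = 10, interval = 5, now = 23
// and prev = 0, missed = (23 - 10) / 5 = 2, bumped to 3 to count the fire
// that is due right now; the timer is pushed by 3 * 5 = 15 so the next
// target becomes 25 (> now), and 3 is returned as the number of fires to
// merge into the pending data.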
DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_data(dispatch_source_t ds, dispatch_unote_t du)
{
	dispatch_timer_source_refs_t dr = du._dt;
	unsigned long data, prev, clear_prev = 0;

	os_atomic_rmw_loop2o(ds, ds_pending_data, prev, clear_prev, relaxed, {
		data = prev >> 1;
		if (unlikely(prev & DISPATCH_TIMER_MISSED_MARKER)) {
			os_atomic_rmw_loop_give_up(goto handle_missed_intervals);
		}
	});
	return data;

handle_missed_intervals:
	// The timer may be in _dispatch_source_invoke2() already for other
	// reasons such as running the registration handler when ds_pending_data
	// is changed by _dispatch_timers_run2() without holding the drain lock.
	//
	// We hence need dependency ordering to pair with the release barrier
	// done by _dispatch_timers_run2() when setting the MISSED_MARKER bit.
	os_atomic_thread_fence(dependency);
	dr = os_atomic_force_dependency_on(dr, data);

	uint64_t now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(dr->du_ident));
	if (now >= dr->dt_timer.target) {
		OS_COMPILER_CAN_ASSUME(dr->dt_timer.interval < INT64_MAX);
		data = _dispatch_source_timer_compute_missed(dr, now, data);
	}

	// When we see the MISSED_MARKER the manager has given up on this timer
	// and expects the handler to call "resume".
	//
	// However, it may not have reflected this into the atomic flags yet
	// so make sure _dispatch_source_invoke2() sees the timer is disarmed
	//
	// The subsequent _dispatch_source_refs_resume() will enqueue the source
	// on the manager and make the changes to `ds_timer` above visible.
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
	return data;
}
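
// ds_pending_data encoding for timers (as used above and in
// _dispatch_timers_run2() below): bit 0 is DISPATCH_TIMER_MISSED_MARKER and
// the remaining bits hold the pending fire count, stored as `data << 1`.
// For example, 3 pending fires with the marker set reads as 0b111 == 7.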
DISPATCH_NOINLINE
static void
_dispatch_timers_run2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	dispatch_timer_source_refs_t dr;
	dispatch_source_t ds;
	uint64_t data, pending_data;
	uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);

	while ((dr = _dispatch_timers_heap[tidx].dth_min[DTH_TARGET_ID])) {
		DISPATCH_TIMER_ASSERT(dr->du_filter, ==, DISPATCH_EVFILT_TIMER,
				"invalid filter");
		DISPATCH_TIMER_ASSERT(dr->du_ident, ==, tidx, "tidx");
		DISPATCH_TIMER_ASSERT(dr->dt_timer.target, !=, 0, "missing target");
		ds = _dispatch_source_from_refs(dr);
		if (dr->dt_timer.target > now) {
			// Done running timers for now.
			break;
		}

		if (dr->du_fflags & DISPATCH_TIMER_AFTER) {
			_dispatch_trace_timer_fire(dr, 1, 1);
			_dispatch_source_merge_evt(dr, EV_ONESHOT, 1, 0, 0);
			_dispatch_debug("kevent-source[%p]: fired after timer[%p]",
					ds, dr);
			_dispatch_object_debug(ds, "%s", __func__);
			continue;
		}

		data = os_atomic_load2o(ds, ds_pending_data, relaxed);
		if (unlikely(data)) {
			// the release barrier is required to make the changes
			// to `ds_timer` visible to _dispatch_source_timer_data()
			if (os_atomic_cmpxchg2o(ds, ds_pending_data, data,
					data | DISPATCH_TIMER_MISSED_MARKER, release)) {
				_dispatch_timers_update(dr, DISPATCH_TIMERS_UNREGISTER);
				continue;
			}
		}

		data = _dispatch_source_timer_compute_missed(dr, now, 0);
		_dispatch_timers_update(dr, DISPATCH_TIMERS_RETAIN_2);
		pending_data = data << 1;
		if (!_dispatch_unote_registered(dr) &&
				dr->dt_timer.target < INT64_MAX) {
			// if we unregistered because of suspension we have to fake we
			// missed events.
			pending_data |= DISPATCH_TIMER_MISSED_MARKER;
			os_atomic_store2o(ds, ds_pending_data, pending_data, release);
		} else {
			os_atomic_store2o(ds, ds_pending_data, pending_data, relaxed);
		}
		_dispatch_trace_timer_fire(dr, data, data);
		_dispatch_debug("kevent-source[%p]: fired timer[%p]", ds, dr);
		_dispatch_object_debug(ds, "%s", __func__);
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}
DISPATCH_NOINLINE
static void
_dispatch_timers_run(dispatch_clock_now_cache_t nows)
{
	uint32_t tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (_dispatch_timers_heap[tidx].dth_count) {
			_dispatch_timers_run2(nows, tidx);
		}
	}
}
#if DISPATCH_HAVE_TIMER_COALESCING
#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC

static const uint64_t _dispatch_kevent_coalescing_window[] = {
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
#if DISPATCH_HAVE_TIMER_QOS
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
#endif
};
#endif // DISPATCH_HAVE_TIMER_COALESCING
static inline dispatch_timer_delay_s
_dispatch_timers_get_delay(dispatch_timer_heap_t dth, dispatch_clock_t clock,
		uint32_t qos, dispatch_clock_now_cache_t nows)
{
	uint64_t target = dth->dth_target, deadline = dth->dth_deadline;
	uint64_t delta = INT64_MAX, dldelta = INT64_MAX;
	dispatch_timer_delay_s rc;

	dispatch_assert(target <= deadline);
	if (delta == 0 || target >= INT64_MAX) {
		goto done;
	}

	if (qos < DISPATCH_TIMER_QOS_COUNT && dth->dth_count > 2) {
#if DISPATCH_HAVE_TIMER_COALESCING
		// Timer pre-coalescing <rdar://problem/13222034>
		// When we have several timers with this target/deadline bracket:
		//
		//      Target        window  Deadline
		//        V           <-------V
		// t1:    [...........|.................]
		// t2:        [......|.......]
		// t3:            [..|..........]
		// t4:                | [.............]
		//                    ^
		//             Optimal Target
		//
		// Coalescing works better if the Target is delayed to "Optimal", by
		// picking the latest target that isn't too close to the deadline.
		uint64_t window = _dispatch_kevent_coalescing_window[qos];
		if (target + window < deadline) {
			uint64_t latest = deadline - window;
			target = _dispatch_timer_heap_max_target_before(dth, latest);
		}
#endif
	}

	uint64_t now = _dispatch_time_now_cached(clock, nows);
	if (target <= now) {
		delta = 0;
		dldelta = 0;
		goto done;
	}

	uint64_t tmp = target - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < delta) {
		delta = tmp;
	}

	tmp = deadline - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < dldelta) {
		dldelta = tmp;
	}

done:
	rc.delay = delta;
	rc.leeway = delta < INT64_MAX ? dldelta - delta : INT64_MAX;
	return rc;
}
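
// Worked example (illustrative): if the earliest target is 10ms away and the
// earliest deadline 25ms away, the result is { .delay = 10ms, .leeway = 15ms }
// -- the leeway is the slack between target and deadline that the kernel may
// use to coalesce this wakeup with others.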
DISPATCH_NOINLINE
static bool
_dispatch_timers_program2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	uint32_t qos = DISPATCH_TIMER_QOS(tidx);
	dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	dispatch_timer_delay_s range;

	range = _dispatch_timers_get_delay(heap, clock, qos, nows);
	if (range.delay == 0 || range.delay >= INT64_MAX) {
		_dispatch_trace_next_timer_set(NULL, qos);
		if (heap->dth_flags & DTH_ARMED) {
			_dispatch_event_loop_timer_delete(tidx);
		}
		// a zero delay means some timer is already due: ask the caller to poll
		return range.delay == 0;
	}

	_dispatch_trace_next_timer_set(heap->dth_min[DTH_TARGET_ID], qos);
	_dispatch_trace_next_timer_program(range.delay, qos);
	_dispatch_event_loop_timer_arm(tidx, range, nows);
	return false;
}
DISPATCH_NOINLINE
static bool
_dispatch_timers_program(dispatch_clock_now_cache_t nows)
{
	bool poll = false;
	uint32_t tidx, timerm = _dispatch_timers_processing_mask;

	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (timerm & (1 << tidx)) {
			poll |= _dispatch_timers_program2(nows, tidx);
		}
	}
	return poll;
}

DISPATCH_NOINLINE
static bool
_dispatch_timers_configure(void)
{
	// Find out if there is a new target/deadline on the timer lists
	return _dispatch_timer_heap_has_new_min(_dispatch_timers_heap,
			countof(_dispatch_timers_heap), _dispatch_timers_processing_mask);
}
static bool
_dispatch_mgr_timers(void)
{
	dispatch_clock_now_cache_s nows = { };
	bool expired = _dispatch_timers_expired;
	if (unlikely(expired)) {
		_dispatch_timers_run(&nows);
	}
	_dispatch_mgr_trace_timers_wakes();
	bool reconfigure = _dispatch_timers_reconfigure;
	if (unlikely(reconfigure || expired)) {
		if (reconfigure) {
			reconfigure = _dispatch_timers_configure();
			_dispatch_timers_reconfigure = false;
		}
		if (reconfigure || expired) {
			expired = _dispatch_timers_expired = _dispatch_timers_program(&nows);
		}
		_dispatch_timers_processing_mask = 0;
	}
	return expired;
}
#pragma mark dispatch_mgr

void
_dispatch_mgr_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		DISPATCH_UNUSED dispatch_qos_t qos)
{
	uint64_t dq_state;
	_dispatch_trace_continuation_push(dq, dou._do);
	if (unlikely(_dispatch_queue_push_update_tail(dq, dou._do))) {
		_dispatch_queue_push_update_head(dq, dou._do);
		dq_state = os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
		if (!_dq_state_drain_locked_by_self(dq_state)) {
			_dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
		}
	}
}
DISPATCH_NORETURN
void
_dispatch_mgr_queue_wakeup(DISPATCH_UNUSED dispatch_queue_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos,
		DISPATCH_UNUSED dispatch_wakeup_flags_t flags)
{
	DISPATCH_INTERNAL_CRASH(0, "Don't try to wake up or override the manager");
}
#if DISPATCH_USE_MGR_THREAD
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
#if DISPATCH_EVENT_BACKEND_KEVENT
	dispatch_kevent_s evbuf[DISPATCH_DEFERRED_ITEMS_EVENT_COUNT];
#endif
	dispatch_deferred_items_s ddi = {
#if DISPATCH_EVENT_BACKEND_KEVENT
		.ddi_maxevents = DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		.ddi_eventlist = evbuf,
#endif
	};
	bool poll;

	_dispatch_deferred_items_set(&ddi);
	for (;;) {
		_dispatch_mgr_queue_drain();
		poll = _dispatch_mgr_timers();
		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
		_dispatch_event_loop_drain(poll ? KEVENT_FLAG_IMMEDIATE : 0);
	}
}
#endif // DISPATCH_USE_MGR_THREAD
void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
		dispatch_invoke_context_t dic DISPATCH_UNUSED,
		dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
				"kevent workqueue enabled");
	}
#endif
#if DISPATCH_USE_MGR_THREAD
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_mgr_priority_init();
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
#endif
}
#if DISPATCH_USE_KEVENT_WORKQUEUE

#define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((dispatch_priority_t)~0u)

_Static_assert(WORKQ_KEVENT_EVENT_BUFFER_LEN >=
		DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		"our list should not be longer than the kernel's");
DISPATCH_ALWAYS_INLINE
static inline dispatch_priority_t
_dispatch_wlh_worker_thread_init(dispatch_wlh_t wlh,
		dispatch_deferred_items_t ddi)
{
	dispatch_assert(wlh);
	dispatch_priority_t old_dbp;

	pthread_priority_t pp = _dispatch_get_priority();
	if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
		// If this thread does not have the event manager flag set, don't setup
		// as the dispatch manager and let the caller know to only process
		// the delivered events.
		//
		// Also add the NEEDS_UNBIND flag so that
		// _dispatch_priority_compute_update knows it has to unbind
		pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (wlh == DISPATCH_WLH_ANON) {
			pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		} else {
			// pthread sets the flag when it is an event delivery thread
			// so we need to explicitly clear it
			pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		}
		_dispatch_thread_setspecific(dispatch_priority_key,
				(void *)(uintptr_t)pp);
		if (wlh != DISPATCH_WLH_ANON) {
			_dispatch_debug("wlh[%p]: handling events", wlh);
		} else {
			ddi->ddi_can_stash = true;
		}
		return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
	}

	if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
			!(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
		// When the pthread kext is delivering kevents to us, and pthread
		// root queues are in use, then the pthread priority TSD is set
		// to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
		//
		// Given that this isn't a valid QoS we need to fixup the TSD,
		// and the best option is to clear the qos/priority bits which tells
		// us to not do any QoS related calls on this thread.
		//
		// However, in that case the manager thread is opted out of QoS,
		// as far as pthread is concerned, and can't be turned into
		// something else, so we can't stash.
		pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
	}
	// Managers always park without mutating to a regular worker thread, and
	// hence never need to unbind from userland, and when draining a manager,
	// the NEEDS_UNBIND flag would cause the mutation to happen.
	// So we need to strip this flag
	pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);

	// ensure kevents registered from this thread are registered at manager QoS
	old_dbp = _dispatch_set_basepri(DISPATCH_PRIORITY_FLAG_MANAGER);
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	return old_dbp;
}
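
// In short, summarizing the two paths above: threads without the event
// manager flag keep their worker identity and get back the sentinel
// DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER, while event manager threads adopt
// the manager queue and return the previous base priority so that
// _dispatch_wlh_worker_thread_reset() can restore it.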
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_wlh_worker_thread_reset(dispatch_priority_t old_dbp)
{
	bool needs_poll = _dispatch_queue_mgr_unlock(&_dispatch_mgr_q);
	_dispatch_reset_basepri(old_dbp);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);
	return needs_poll;
}
DISPATCH_ALWAYS_INLINE
static void
_dispatch_wlh_worker_thread(dispatch_wlh_t wlh, dispatch_kevent_t events,
		int *nevents)
{
	_dispatch_introspection_thread_add();
	DISPATCH_PERF_MON_VAR_INIT

	dispatch_deferred_items_s ddi = {
		.ddi_eventlist = events,
	};
	dispatch_priority_t old_dbp;

	old_dbp = _dispatch_wlh_worker_thread_init(wlh, &ddi);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_perfmon_start_impl(true);
	} else {
		dispatch_assert(wlh == DISPATCH_WLH_ANON);
		wlh = DISPATCH_WLH_ANON;
	}
	_dispatch_deferred_items_set(&ddi);
	_dispatch_event_loop_merge(events, *nevents);

	if (old_dbp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_mgr_queue_drain();
		bool poll = _dispatch_mgr_timers();
		if (_dispatch_wlh_worker_thread_reset(old_dbp)) {
			poll = true;
		}
		if (poll) _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
	} else if (ddi.ddi_stashed_dou._do) {
		_dispatch_debug("wlh[%p]: draining deferred item %p", wlh,
				ddi.ddi_stashed_dou._do);
		if (wlh == DISPATCH_WLH_ANON) {
			dispatch_assert(ddi.ddi_nevents == 0);
			_dispatch_deferred_items_set(NULL);
			_dispatch_root_queue_drain_deferred_item(&ddi
					DISPATCH_PERF_MON_ARGS);
		} else {
			_dispatch_root_queue_drain_deferred_wlh(&ddi
					DISPATCH_PERF_MON_ARGS);
		}
	}

	_dispatch_deferred_items_set(NULL);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER &&
			!ddi.ddi_stashed_dou._do) {
		_dispatch_perfmon_end(perfmon_thread_event_no_steal);
	}
	_dispatch_debug("returning %d deferred kevents", ddi.ddi_nevents);
	*nevents = ddi.ddi_nevents;
}
DISPATCH_NOINLINE
void
_dispatch_kevent_worker_thread(dispatch_kevent_t *events, int *nevents)
{
	if (!events && !nevents) {
		// events for worker thread request have already been delivered earlier
		return;
	}
	if (!dispatch_assume(*nevents && *events)) return;
	_dispatch_adopt_wlh_anon();
	_dispatch_wlh_worker_thread(DISPATCH_WLH_ANON, *events, nevents);
	_dispatch_reset_wlh();
}

#endif // DISPATCH_USE_KEVENT_WORKQUEUE
#pragma mark dispatch_source_debug

static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%x, "
			"mask = 0x%x, pending_data = 0x%llx, registered = %d, "
			"armed = %d, deleted = %d%s, canceled = %d, ",
			target && target->dq_label ? target->dq_label : "", target,
			dr->du_ident, dr->du_fflags,
			(unsigned long long)ds->ds_pending_data,
			ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
			(bool)(ds->dq_atomic_flags & DSF_DELETED),
			(ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
			(bool)(ds->dq_atomic_flags & DSF_CANCELED));
}
static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
			", interval = 0x%llx, flags = 0x%x }, ",
			(unsigned long long)dr->dt_timer.target,
			(unsigned long long)dr->dt_timer.deadline,
			(unsigned long long)dr->dt_timer.interval, dr->du_fflags);
}
size_t
_dispatch_source_debug(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (dr->du_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
			"filter = %s }", dr, dr->du_is_direct ? " (direct)" : "",
			dr->du_type->dst_kind);
	return offset;
}