[apple/libdispatch.git] / src / source.c (libdispatch-913.1.6)
1 /*
2 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
24 static void _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval);
25
26 #define DISPATCH_TIMERS_UNREGISTER 0x1
27 #define DISPATCH_TIMERS_RETAIN_2 0x2
28 static void _dispatch_timers_update(dispatch_unote_t du, uint32_t flags);
29 static void _dispatch_timers_unregister(dispatch_timer_source_refs_t dt);
30
31 static void _dispatch_source_timer_configure(dispatch_source_t ds);
32 static inline unsigned long _dispatch_source_timer_data(
33 dispatch_source_t ds, dispatch_unote_t du);
34
35 #pragma mark -
36 #pragma mark dispatch_source_t
37
38 dispatch_source_t
39 dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
40 unsigned long mask, dispatch_queue_t dq)
41 {
42 dispatch_source_refs_t dr;
43 dispatch_source_t ds;
44
45 dr = dux_create(dst, handle, mask)._dr;
46 if (unlikely(!dr)) {
47 return DISPATCH_BAD_INPUT;
48 }
49
50 ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
51 sizeof(struct dispatch_source_s));
52 // Initialize as a queue first, then override some settings below.
53 _dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
54 DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
55 ds->dq_label = "source";
56 ds->do_ref_cnt++; // the reference the manager queue holds
57 ds->ds_refs = dr;
58 dr->du_owner_wref = _dispatch_ptr2wref(ds);
59
60 if (slowpath(!dq)) {
61 dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
62 } else {
63 _dispatch_retain((dispatch_queue_t _Nonnull)dq);
64 }
65 ds->do_targetq = dq;
66 if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
67 _dispatch_source_set_interval(ds, handle);
68 }
69 _dispatch_object_debug(ds, "%s", __func__);
70 return ds;
71 }
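/*
 * Illustrative usage sketch (not part of libdispatch): how a client typically
 * drives the constructor above. The names `fd` and `q` are assumptions for the
 * example, and the code is compiled out with #if 0.
 */
#if 0
static dispatch_source_t
example_create_read_source(int fd, dispatch_queue_t q)
{
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
			(uintptr_t)fd, 0, q);
	dispatch_source_set_event_handler(ds, ^{
		// an estimate of the bytes available for reading on fd
		unsigned long estimated = dispatch_source_get_data(ds);
		(void)estimated;
	});
	dispatch_source_set_cancel_handler(ds, ^{
		close(fd); // needs <unistd.h>; only safe once cancellation completed
	});
	dispatch_activate(ds); // sources are created inactive
	return ds;
}
#endif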
72
73 void
74 _dispatch_source_dispose(dispatch_source_t ds, bool *allow_free)
75 {
76 _dispatch_object_debug(ds, "%s", __func__);
77 _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
78 _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
79 _dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
80 _dispatch_unote_dispose(ds->ds_refs);
81 ds->ds_refs = NULL;
82 _dispatch_queue_destroy(ds->_as_dq, allow_free);
83 }
84
85 void
86 _dispatch_source_xref_dispose(dispatch_source_t ds)
87 {
88 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
89 if (unlikely(!(dqf & (DQF_LEGACY|DSF_CANCELED)))) {
90 DISPATCH_CLIENT_CRASH(ds, "Release of a source that has not been "
91 "cancelled, but has a mandatory cancel handler");
92 }
93 dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
94 }
95
96 long
97 dispatch_source_testcancel(dispatch_source_t ds)
98 {
99 return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
100 }
101
102 unsigned long
103 dispatch_source_get_mask(dispatch_source_t ds)
104 {
105 dispatch_source_refs_t dr = ds->ds_refs;
106 if (ds->dq_atomic_flags & DSF_CANCELED) {
107 return 0;
108 }
109 #if DISPATCH_USE_MEMORYSTATUS
110 if (dr->du_vmpressure_override) {
111 return NOTE_VM_PRESSURE;
112 }
113 #if TARGET_IPHONE_SIMULATOR
114 if (dr->du_memorypressure_override) {
115 return NOTE_MEMORYSTATUS_PRESSURE_WARN;
116 }
117 #endif
118 #endif // DISPATCH_USE_MEMORYSTATUS
119 return dr->du_fflags;
120 }
121
122 uintptr_t
123 dispatch_source_get_handle(dispatch_source_t ds)
124 {
125 dispatch_source_refs_t dr = ds->ds_refs;
126 #if TARGET_IPHONE_SIMULATOR
127 if (dr->du_memorypressure_override) {
128 return 0;
129 }
130 #endif
131 return dr->du_ident;
132 }
133
134 unsigned long
135 dispatch_source_get_data(dispatch_source_t ds)
136 {
137 #if DISPATCH_USE_MEMORYSTATUS
138 dispatch_source_refs_t dr = ds->ds_refs;
139 if (dr->du_vmpressure_override) {
140 return NOTE_VM_PRESSURE;
141 }
142 #if TARGET_IPHONE_SIMULATOR
143 if (dr->du_memorypressure_override) {
144 return NOTE_MEMORYSTATUS_PRESSURE_WARN;
145 }
146 #endif
147 #endif // DISPATCH_USE_MEMORYSTATUS
148 uint64_t value = os_atomic_load2o(ds, ds_data, relaxed);
149 return (unsigned long)(
150 ds->ds_refs->du_data_action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET
151 ? DISPATCH_SOURCE_GET_DATA(value) : value);
152 }
153
154 size_t
155 dispatch_source_get_extended_data(dispatch_source_t ds,
156 dispatch_source_extended_data_t edata, size_t size)
157 {
158 size_t target_size = MIN(size,
159 sizeof(struct dispatch_source_extended_data_s));
160 if (size > 0) {
161 unsigned long data, status = 0;
162 if (ds->ds_refs->du_data_action
163 == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
164 uint64_t combined = os_atomic_load(&ds->ds_data, relaxed);
165 data = DISPATCH_SOURCE_GET_DATA(combined);
166 status = DISPATCH_SOURCE_GET_STATUS(combined);
167 } else {
168 data = dispatch_source_get_data(ds);
169 }
170 if (size >= offsetof(struct dispatch_source_extended_data_s, data)
171 + sizeof(edata->data)) {
172 edata->data = data;
173 }
174 if (size >= offsetof(struct dispatch_source_extended_data_s, status)
175 + sizeof(edata->status)) {
176 edata->status = status;
177 }
178 if (size > sizeof(struct dispatch_source_extended_data_s)) {
179 memset(
180 (char *)edata + sizeof(struct dispatch_source_extended_data_s),
181 0, size - sizeof(struct dispatch_source_extended_data_s));
182 }
183 }
184 return target_size;
185 }
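/*
 * Illustrative sketch (not part of libdispatch): calling the accessor above
 * with a full-sized buffer. `ds` is an assumed source; `status` is only
 * meaningful for source types whose unote action is
 * DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET, otherwise it reads back as 0,
 * as the code above shows.
 */
#if 0
	struct dispatch_source_extended_data_s edata;
	size_t filled = dispatch_source_get_extended_data(ds, &edata, sizeof(edata));
	if (filled == sizeof(edata)) {
		// both edata.data and edata.status were written
	}
#endif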
186
187 DISPATCH_NOINLINE
188 void
189 _dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
190 unsigned long val)
191 {
192 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
193 int filter = ds->ds_refs->du_filter;
194
195 if (unlikely(dqf & (DSF_CANCELED | DSF_DELETED))) {
196 return;
197 }
198
199 switch (filter) {
200 case DISPATCH_EVFILT_CUSTOM_ADD:
201 os_atomic_add2o(ds, ds_pending_data, val, relaxed);
202 break;
203 case DISPATCH_EVFILT_CUSTOM_OR:
204 os_atomic_or2o(ds, ds_pending_data, val, relaxed);
205 break;
206 case DISPATCH_EVFILT_CUSTOM_REPLACE:
207 os_atomic_store2o(ds, ds_pending_data, val, relaxed);
208 break;
209 default:
210 DISPATCH_CLIENT_CRASH(filter, "Invalid source type");
211 }
212
213 dx_wakeup(ds, _dispatch_qos_from_pp(pp), DISPATCH_WAKEUP_MAKE_DIRTY);
214 }
215
216 void
217 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
218 {
219 _dispatch_source_merge_data(ds, 0, val);
220 }
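/*
 * Illustrative sketch (not part of libdispatch): dispatch_source_merge_data()
 * above is the public entry point for custom DATA_ADD / DATA_OR / DATA_REPLACE
 * sources. `q` and `on_count` are assumptions for the example.
 */
#if 0
static dispatch_source_t
example_counter_source(dispatch_queue_t q, void (^on_count)(unsigned long))
{
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD,
			0, 0, q);
	dispatch_source_set_event_handler(ds, ^{
		// merges are coalesced; the handler observes the summed count
		on_count(dispatch_source_get_data(ds));
	});
	dispatch_activate(ds);
	// later, from any thread:
	// dispatch_source_merge_data(ds, 1);
	return ds;
}
#endif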
221
222 #pragma mark -
223 #pragma mark dispatch_source_handler
224
225 DISPATCH_ALWAYS_INLINE
226 static inline dispatch_continuation_t
227 _dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
228 {
229 return os_atomic_load(&dr->ds_handler[kind], relaxed);
230 }
231 #define _dispatch_source_get_event_handler(dr) \
232 _dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
233 #define _dispatch_source_get_cancel_handler(dr) \
234 _dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
235 #define _dispatch_source_get_registration_handler(dr) \
236 _dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)
237
238 DISPATCH_ALWAYS_INLINE
239 static inline dispatch_continuation_t
240 _dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
241 bool block)
242 {
243 // sources don't propagate priority by default
244 const dispatch_block_flags_t flags =
245 DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
246 dispatch_continuation_t dc = _dispatch_continuation_alloc();
247 if (func) {
248 uintptr_t dc_flags = 0;
249
250 if (kind != DS_EVENT_HANDLER) {
251 dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
252 }
253 if (block) {
254 #ifdef __BLOCKS__
255 _dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
256 #endif /* __BLOCKS__ */
257 } else {
258 dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
259 _dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
260 0, flags, dc_flags);
261 }
262 _dispatch_trace_continuation_push(ds->_as_dq, dc);
263 } else {
264 dc->dc_flags = 0;
265 dc->dc_func = NULL;
266 }
267 return dc;
268 }
269
270 DISPATCH_NOINLINE
271 static void
272 _dispatch_source_handler_dispose(dispatch_continuation_t dc)
273 {
274 #ifdef __BLOCKS__
275 if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
276 Block_release(dc->dc_ctxt);
277 }
278 #endif /* __BLOCKS__ */
279 if (dc->dc_voucher) {
280 _voucher_release(dc->dc_voucher);
281 dc->dc_voucher = VOUCHER_INVALID;
282 }
283 _dispatch_continuation_free(dc);
284 }
285
286 DISPATCH_ALWAYS_INLINE
287 static inline dispatch_continuation_t
288 _dispatch_source_handler_take(dispatch_source_t ds, long kind)
289 {
290 return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
291 }
292
293 DISPATCH_ALWAYS_INLINE
294 static inline void
295 _dispatch_source_handler_free(dispatch_source_t ds, long kind)
296 {
297 dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
298 if (dc) _dispatch_source_handler_dispose(dc);
299 }
300
301 DISPATCH_ALWAYS_INLINE
302 static inline void
303 _dispatch_source_handler_replace(dispatch_source_t ds, long kind,
304 dispatch_continuation_t dc)
305 {
306 if (!dc->dc_func) {
307 _dispatch_continuation_free(dc);
308 dc = NULL;
309 } else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
310 dc->dc_ctxt = ds->do_ctxt;
311 }
312 dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
313 if (dc) _dispatch_source_handler_dispose(dc);
314 }
315
316 DISPATCH_NOINLINE
317 static void
318 _dispatch_source_set_handler_slow(void *context)
319 {
320 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
321 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
322
323 dispatch_continuation_t dc = context;
324 long kind = (long)dc->dc_data;
325 dc->dc_data = NULL;
326 _dispatch_source_handler_replace(ds, kind, dc);
327 }
328
329 DISPATCH_NOINLINE
330 static void
331 _dispatch_source_set_handler(dispatch_source_t ds, long kind,
332 dispatch_continuation_t dc)
333 {
334 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
335 if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
336 _dispatch_source_handler_replace(ds, kind, dc);
337 return dx_vtable(ds)->do_resume(ds, false);
338 }
339 if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
340 DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
341 "after it has been activated");
342 }
343 _dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
344 if (kind == DS_REGISTN_HANDLER) {
345 _dispatch_bug_deprecated("Setting registration handler after "
346 "the source has been activated");
347 }
348 dc->dc_data = (void *)kind;
349 _dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
350 _dispatch_source_set_handler_slow, 0);
351 }
352
353 #ifdef __BLOCKS__
354 void
355 dispatch_source_set_event_handler(dispatch_source_t ds,
356 dispatch_block_t handler)
357 {
358 dispatch_continuation_t dc;
359 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
360 _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
361 }
362 #endif /* __BLOCKS__ */
363
364 void
365 dispatch_source_set_event_handler_f(dispatch_source_t ds,
366 dispatch_function_t handler)
367 {
368 dispatch_continuation_t dc;
369 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
370 _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
371 }
372
373 #ifdef __BLOCKS__
374 DISPATCH_NOINLINE
375 static void
376 _dispatch_source_set_cancel_handler(dispatch_source_t ds,
377 dispatch_block_t handler)
378 {
379 dispatch_continuation_t dc;
380 dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
381 _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
382 }
383
384 void
385 dispatch_source_set_cancel_handler(dispatch_source_t ds,
386 dispatch_block_t handler)
387 {
388 if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
389 DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
390 "this source");
391 }
392 return _dispatch_source_set_cancel_handler(ds, handler);
393 }
394
395 void
396 dispatch_source_set_mandatory_cancel_handler(dispatch_source_t ds,
397 dispatch_block_t handler)
398 {
399 _dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
400 return _dispatch_source_set_cancel_handler(ds, handler);
401 }
402 #endif /* __BLOCKS__ */
403
404 DISPATCH_NOINLINE
405 static void
406 _dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
407 dispatch_function_t handler)
408 {
409 dispatch_continuation_t dc;
410 dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
411 _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
412 }
413
414 void
415 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
416 dispatch_function_t handler)
417 {
418 if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
419 DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
420 "this source");
421 }
422 return _dispatch_source_set_cancel_handler_f(ds, handler);
423 }
424
425 void
426 dispatch_source_set_mandatory_cancel_handler_f(dispatch_source_t ds,
427 dispatch_function_t handler)
428 {
429 _dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
430 return _dispatch_source_set_cancel_handler_f(ds, handler);
431 }
432
433 #ifdef __BLOCKS__
434 void
435 dispatch_source_set_registration_handler(dispatch_source_t ds,
436 dispatch_block_t handler)
437 {
438 dispatch_continuation_t dc;
439 dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
440 _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
441 }
442 #endif /* __BLOCKS__ */
443
444 void
445 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
446 dispatch_function_t handler)
447 {
448 dispatch_continuation_t dc;
449 dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
450 _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
451 }
452
453 #pragma mark -
454 #pragma mark dispatch_source_invoke
455
456 static void
457 _dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
458 dispatch_invoke_flags_t flags)
459 {
460 dispatch_continuation_t dc;
461
462 dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
463 if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
464 // no registration callout if source is canceled rdar://problem/8955246
465 return _dispatch_source_handler_dispose(dc);
466 }
467 if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
468 dc->dc_ctxt = ds->do_ctxt;
469 }
470 _dispatch_continuation_pop(dc, NULL, flags, cq);
471 }
472
473 static void
474 _dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
475 dispatch_invoke_flags_t flags)
476 {
477 dispatch_continuation_t dc;
478
479 dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
480 ds->ds_pending_data = 0;
481 ds->ds_data = 0;
482 _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
483 _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
484 if (!dc) {
485 return;
486 }
487 if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
488 return _dispatch_source_handler_dispose(dc);
489 }
490 if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
491 dc->dc_ctxt = ds->do_ctxt;
492 }
493 _dispatch_continuation_pop(dc, NULL, flags, cq);
494 }
495
496 static void
497 _dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
498 dispatch_invoke_flags_t flags)
499 {
500 dispatch_source_refs_t dr = ds->ds_refs;
501 dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
502 uint64_t prev;
503
504 if (dr->du_is_timer && !(dr->du_fflags & DISPATCH_TIMER_AFTER)) {
505 prev = _dispatch_source_timer_data(ds, dr);
506 } else {
507 prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
508 }
509 if (dr->du_data_action == DISPATCH_UNOTE_ACTION_DATA_SET) {
510 ds->ds_data = ~prev;
511 } else {
512 ds->ds_data = prev;
513 }
514 if (!dispatch_assume(prev != 0) || !dc) {
515 return;
516 }
517 _dispatch_continuation_pop(dc, NULL, flags, cq);
518 if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_AFTER)) {
519 _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
520 dispatch_release(ds); // dispatch_after sources are one-shot
521 }
522 }
523
524 DISPATCH_NOINLINE
525 static void
526 _dispatch_source_refs_finalize_unregistration(dispatch_source_t ds)
527 {
528 dispatch_queue_flags_t dqf;
529 dispatch_source_refs_t dr = ds->ds_refs;
530
531 dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
532 DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
533 if (dqf & DSF_CANCEL_WAITER) {
534 _dispatch_wake_by_address(&ds->dq_atomic_flags);
535 }
536 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
537 _dispatch_release_tailcall(ds); // the retain is done at creation time
538 }
539
540 void
541 _dispatch_source_refs_unregister(dispatch_source_t ds, uint32_t options)
542 {
543 _dispatch_object_debug(ds, "%s", __func__);
544 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
545 dispatch_source_refs_t dr = ds->ds_refs;
546
547 if (dr->du_is_timer) {
548 // Because of the optimization to unregister fired oneshot timers
549 // from the target queue, we can't trust _dispatch_unote_registered()
550 // to tell the truth, it may not have happened yet
551 if (dqf & DSF_ARMED) {
552 _dispatch_timers_unregister(ds->ds_timer_refs);
553 _dispatch_release_2(ds);
554 }
555 dr->du_ident = DISPATCH_TIMER_IDENT_CANCELED;
556 } else {
557 if (_dispatch_unote_needs_rearm(dr) && !(dqf & DSF_ARMED)) {
558 options |= DU_UNREGISTER_IMMEDIATE_DELETE;
559 }
560 if (!_dispatch_unote_unregister(dr, options)) {
561 _dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
562 ds, dr);
563 _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
564 return; // deferred unregistration
565 }
566 }
567
568 ds->ds_is_installed = true;
569 _dispatch_source_refs_finalize_unregistration(ds);
570 }
571
572 DISPATCH_ALWAYS_INLINE
573 static inline bool
574 _dispatch_source_tryarm(dispatch_source_t ds)
575 {
576 dispatch_queue_flags_t oqf, nqf;
577 return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
578 if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
579 // the test is inside the loop because it's convenient but the
580 // result should not change for the duration of the rmw_loop
581 os_atomic_rmw_loop_give_up(break);
582 }
583 nqf = oqf | DSF_ARMED;
584 });
585 }
586
587 DISPATCH_ALWAYS_INLINE
588 static inline bool
589 _dispatch_source_refs_resume(dispatch_source_t ds)
590 {
591 dispatch_source_refs_t dr = ds->ds_refs;
592 if (dr->du_is_timer) {
593 _dispatch_timers_update(dr, 0);
594 return true;
595 }
596 if (unlikely(!_dispatch_source_tryarm(ds))) {
597 return false;
598 }
599 _dispatch_unote_resume(dr);
600 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dr);
601 return true;
602 }
603
604 void
605 _dispatch_source_refs_register(dispatch_source_t ds, dispatch_wlh_t wlh,
606 dispatch_priority_t pri)
607 {
608 dispatch_source_refs_t dr = ds->ds_refs;
609 dispatch_priority_t kbp;
610
611 dispatch_assert(!ds->ds_is_installed);
612
613 if (dr->du_is_timer) {
614 dispatch_queue_t dq = ds->_as_dq;
615 kbp = _dispatch_queue_compute_priority_and_wlh(dq, NULL);
616 // aggressively coalesce background/maintenance QoS timers
617 // <rdar://problem/12200216&27342536>
618 if (_dispatch_qos_is_background(_dispatch_priority_qos(kbp))) {
619 if (dr->du_fflags & DISPATCH_TIMER_STRICT) {
620 _dispatch_ktrace1(DISPATCH_PERF_strict_bg_timer, ds);
621 } else {
622 dr->du_fflags |= DISPATCH_TIMER_BACKGROUND;
623 dr->du_ident = _dispatch_source_timer_idx(dr);
624 }
625 }
626 _dispatch_timers_update(dr, 0);
627 return;
628 }
629
630 if (unlikely(!_dispatch_source_tryarm(ds) ||
631 !_dispatch_unote_register(dr, wlh, pri))) {
632 // Do the parts of dispatch_source_refs_unregister() that
633 // are required after this partial initialization.
634 _dispatch_source_refs_finalize_unregistration(ds);
635 } else {
636 _dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, dr);
637 }
638 _dispatch_object_debug(ds, "%s", __func__);
639 }
640
641 static void
642 _dispatch_source_set_event_handler_context(void *ctxt)
643 {
644 dispatch_source_t ds = ctxt;
645 dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);
646
647 if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
648 dc->dc_ctxt = ds->do_ctxt;
649 }
650 }
651
652 DISPATCH_ALWAYS_INLINE
653 static inline void
654 _dispatch_source_install(dispatch_source_t ds, dispatch_wlh_t wlh,
655 dispatch_priority_t pri)
656 {
657 _dispatch_source_refs_register(ds, wlh, pri);
658 ds->ds_is_installed = true;
659 }
660
661 void
662 _dispatch_source_finalize_activation(dispatch_source_t ds, bool *allow_resume)
663 {
664 dispatch_continuation_t dc;
665 dispatch_source_refs_t dr = ds->ds_refs;
666 dispatch_priority_t pri;
667 dispatch_wlh_t wlh;
668
669 if (unlikely(dr->du_is_direct &&
670 (_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
671 return _dispatch_source_refs_unregister(ds, 0);
672 }
673
674 dc = _dispatch_source_get_event_handler(dr);
675 if (dc) {
676 if (_dispatch_object_is_barrier(dc)) {
677 _dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
678 }
679 ds->dq_priority = _dispatch_priority_from_pp_strip_flags(dc->dc_priority);
680 if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
681 _dispatch_barrier_async_detached_f(ds->_as_dq, ds,
682 _dispatch_source_set_event_handler_context);
683 }
684 }
685
686 // call "super"
687 _dispatch_queue_finalize_activation(ds->_as_dq, allow_resume);
688
689 if (dr->du_is_direct && !ds->ds_is_installed) {
690 dispatch_queue_t dq = ds->_as_dq;
691 pri = _dispatch_queue_compute_priority_and_wlh(dq, &wlh);
692 if (pri) _dispatch_source_install(ds, wlh, pri);
693 }
694 }
695
696 DISPATCH_ALWAYS_INLINE
697 static inline dispatch_queue_wakeup_target_t
698 _dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
699 dispatch_invoke_flags_t flags, uint64_t *owned)
700 {
701 dispatch_source_t ds = dou._ds;
702 dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
703 dispatch_queue_t dq = _dispatch_queue_get_current();
704 dispatch_source_refs_t dr = ds->ds_refs;
705 dispatch_queue_flags_t dqf;
706
707 if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN) &&
708 _dispatch_unote_wlh_changed(dr, _dispatch_get_wlh())) {
709 dqf = _dispatch_queue_atomic_flags_set_orig(ds->_as_dq,
710 DSF_WLH_CHANGED);
711 if (!(dqf & DSF_WLH_CHANGED)) {
712 _dispatch_bug_deprecated("Changing target queue "
713 "hierarchy after source was activated");
714 }
715 }
716
717 if (_dispatch_queue_class_probe(ds)) {
718 // Intentionally always drain even when on the manager queue
719 // and not the source's regular target queue: we need to be able
720 // to drain timer setting and the like there.
721 dispatch_with_disabled_narrowing(dic, {
722 retq = _dispatch_queue_serial_drain(ds->_as_dq, dic, flags, owned);
723 });
724 }
725
726 // This function performs all source actions. Each action is responsible
727 // for verifying that it takes place on the appropriate queue. If the
728 // current queue is not the correct queue for this action, the correct queue
729 // will be returned and the invoke will be re-driven on that queue.
730
731 // The order of tests here in invoke and in wakeup should be consistent.
732
733 dispatch_queue_t dkq = &_dispatch_mgr_q;
734 bool prevent_starvation = false;
735
736 if (dr->du_is_direct) {
737 dkq = ds->do_targetq;
738 }
739
740 if (dr->du_is_timer &&
741 os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
742 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
743 if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
744 // timer has to be configured on the kevent queue
745 if (dq != dkq) {
746 return dkq;
747 }
748 _dispatch_source_timer_configure(ds);
749 }
750 }
751
752 if (!ds->ds_is_installed) {
753 // The source needs to be installed on the kevent queue.
754 if (dq != dkq) {
755 return dkq;
756 }
757 _dispatch_source_install(ds, _dispatch_get_wlh(),
758 _dispatch_get_basepri());
759 }
760
761 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
762 // Source suspended by an item drained from the source queue.
763 return ds->do_targetq;
764 }
765
766 if (_dispatch_source_get_registration_handler(dr)) {
767 // The source has been registered and the registration handler needs
768 // to be delivered on the target queue.
769 if (dq != ds->do_targetq) {
770 return ds->do_targetq;
771 }
772 // clears ds_registration_handler
773 _dispatch_source_registration_callout(ds, dq, flags);
774 }
775
776 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
777 if ((dqf & DSF_DEFERRED_DELETE) && !(dqf & DSF_ARMED)) {
778 unregister_event:
779 // DSF_DELETE: Pending source kevent unregistration has been completed
780 // !DSF_ARMED: event was delivered and can safely be unregistered
781 if (dq != dkq) {
782 return dkq;
783 }
784 _dispatch_source_refs_unregister(ds, DU_UNREGISTER_IMMEDIATE_DELETE);
785 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
786 }
787
788 if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
789 os_atomic_load2o(ds, ds_pending_data, relaxed)) {
790 // The source has pending data to deliver via the event handler callback
791 // on the target queue. Some sources need to be rearmed on the kevent
792 // queue after event delivery.
793 if (dq == ds->do_targetq) {
794 _dispatch_source_latch_and_call(ds, dq, flags);
795 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
796
797 // starvation avoidance: if the source triggers itself then force a
798 // re-queue to give other things already queued on the target queue
799 // a chance to run.
800 //
801 // however, if the source is directly targeting an overcommit root
802 // queue, this would requeue the source and ask for a new overcommit
803 // thread right away.
804 prevent_starvation = dq->do_targetq ||
805 !(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
806 if (prevent_starvation &&
807 os_atomic_load2o(ds, ds_pending_data, relaxed)) {
808 retq = ds->do_targetq;
809 }
810 } else {
811 // there is no point trying to be eager, the next thing to do is
812 // to deliver the event
813 return ds->do_targetq;
814 }
815 }
816
817 if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
818 // The source has been cancelled and needs to be uninstalled from the
819 // kevent queue. After uninstallation, the cancellation handler needs
820 // to be delivered to the target queue.
821 if (!(dqf & DSF_DELETED)) {
822 if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
823 // timers can cheat if not armed because there's nothing left
824 // to do on the manager queue and unregistration can happen
825 // on the regular target queue
826 } else if (dq != dkq) {
827 return dkq;
828 }
829 _dispatch_source_refs_unregister(ds, 0);
830 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
831 if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
832 if (!(dqf & DSF_ARMED)) {
833 goto unregister_event;
834 }
835 // we need to wait for the EV_DELETE
836 return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
837 }
838 }
839 if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
840 _dispatch_source_get_cancel_handler(dr) ||
841 _dispatch_source_get_registration_handler(dr))) {
842 retq = ds->do_targetq;
843 } else {
844 _dispatch_source_cancel_callout(ds, dq, flags);
845 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
846 }
847 prevent_starvation = false;
848 }
849
850 if (_dispatch_unote_needs_rearm(dr) &&
851 !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
852 // The source needs to be rearmed on the kevent queue.
853 if (dq != dkq) {
854 return dkq;
855 }
856 if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
857 // no need for resume when we can directly unregister the kevent
858 goto unregister_event;
859 }
860 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
861 // do not try to rearm the kevent if the source is suspended
862 // from the source handler
863 return ds->do_targetq;
864 }
865 if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
866 // keep the old behavior to force re-enqueue to our target queue
867 // for the rearm.
868 //
869 // if the handler didn't run, or this is a pending delete
870 // or our target queue is a global queue, then starvation is
871 // not a concern and we can rearm right away.
872 return ds->do_targetq;
873 }
874 if (unlikely(!_dispatch_source_refs_resume(ds))) {
875 goto unregister_event;
876 }
877 if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
878 // try to redrive the drain from under the lock for sources
879 // targeting an overcommit root queue to avoid parking
880 // when the next event has already fired
881 _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
882 }
883 }
884
885 return retq;
886 }
887
888 DISPATCH_NOINLINE
889 void
890 _dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
891 dispatch_invoke_flags_t flags)
892 {
893 _dispatch_queue_class_invoke(ds, dic, flags,
894 DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
895 }
896
897 void
898 _dispatch_source_wakeup(dispatch_source_t ds, dispatch_qos_t qos,
899 dispatch_wakeup_flags_t flags)
900 {
901 // This function determines whether the source needs to be invoked.
902 // The order of tests here in wakeup and in invoke should be consistent.
903
904 dispatch_source_refs_t dr = ds->ds_refs;
905 dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
906 dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
907 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
908 bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);
909
910 if (dr->du_is_direct) {
911 dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
912 }
913
914 if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && dr->du_is_timer &&
915 os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
916 // timer has to be configured on the kevent queue
917 tq = dkq;
918 } else if (!ds->ds_is_installed) {
919 // The source needs to be installed on the kevent queue.
920 tq = dkq;
921 } else if (_dispatch_source_get_registration_handler(dr)) {
922 // The registration handler needs to be delivered to the target queue.
923 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
924 } else if (deferred_delete && !(dqf & DSF_ARMED)) {
925 // Pending source kevent unregistration has been completed
926 // or EV_ONESHOT event can be acknowledged
927 tq = dkq;
928 } else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
929 os_atomic_load2o(ds, ds_pending_data, relaxed)) {
930 // The source has pending data to deliver to the target queue.
931 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
932 } else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
933 // The source needs to be uninstalled from the kevent queue, or the
934 // cancellation handler needs to be delivered to the target queue.
935 // Note: cancellation assumes installation.
936 if (!(dqf & DSF_DELETED)) {
937 if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
938 // timers can cheat if not armed because there's nothing left
939 // to do on the manager queue and unregistration can happen
940 // on the regular target queue
941 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
942 } else {
943 tq = dkq;
944 }
945 } else if (_dispatch_source_get_event_handler(dr) ||
946 _dispatch_source_get_cancel_handler(dr) ||
947 _dispatch_source_get_registration_handler(dr)) {
948 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
949 }
950 } else if (_dispatch_unote_needs_rearm(dr) &&
951 !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
952 // The source needs to be rearmed on the kevent queue.
953 tq = dkq;
954 }
955 if (!tq && _dispatch_queue_class_probe(ds)) {
956 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
957 }
958
959 if ((tq == DISPATCH_QUEUE_WAKEUP_TARGET) &&
960 ds->do_targetq == &_dispatch_mgr_q) {
961 tq = DISPATCH_QUEUE_WAKEUP_MGR;
962 }
963
964 return _dispatch_queue_class_wakeup(ds->_as_dq, qos, flags, tq);
965 }
966
967 void
968 dispatch_source_cancel(dispatch_source_t ds)
969 {
970 _dispatch_object_debug(ds, "%s", __func__);
971 // Right after we set the cancel flag, someone else
972 // could potentially invoke the source, do the cancellation,
973 // unregister the source, and deallocate it. We would
974 // need to therefore retain/release before setting the bit
975 _dispatch_retain_2(ds);
976
977 dispatch_queue_t q = ds->_as_dq;
978 if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
979 _dispatch_release_2_tailcall(ds);
980 } else {
981 dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
982 }
983 }
984
985 void
986 dispatch_source_cancel_and_wait(dispatch_source_t ds)
987 {
988 dispatch_queue_flags_t old_dqf, dqf, new_dqf;
989 dispatch_source_refs_t dr = ds->ds_refs;
990
991 if (unlikely(_dispatch_source_get_cancel_handler(dr))) {
992 DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
993 }
994
995 _dispatch_object_debug(ds, "%s", __func__);
996 os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
997 new_dqf = old_dqf | DSF_CANCELED;
998 if (old_dqf & DSF_CANCEL_WAITER) {
999 os_atomic_rmw_loop_give_up(break);
1000 }
1001 if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
1002 // just add DSF_CANCELED
1003 } else if ((old_dqf & DSF_DEFERRED_DELETE) || !dr->du_is_direct) {
1004 new_dqf |= DSF_CANCEL_WAITER;
1005 }
1006 });
1007 dqf = new_dqf;
1008
1009 if (old_dqf & DQF_RELEASED) {
1010 DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
1011 }
1012 if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
1013 return;
1014 }
1015 if (dqf & DSF_CANCEL_WAITER) {
1016 goto wakeup;
1017 }
1018
1019 // simplified version of _dispatch_queue_drain_try_lock
1020 // that also sets the DIRTY bit on failure to lock
1021 uint64_t set_owner_and_set_full_width = _dispatch_lock_value_for_self() |
1022 DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
1023 uint64_t old_state, new_state;
1024
1025 os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
1026 new_state = old_state;
1027 if (likely(_dq_state_is_runnable(old_state) &&
1028 !_dq_state_drain_locked(old_state))) {
1029 new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
1030 new_state |= set_owner_and_set_full_width;
1031 } else if (old_dqf & DSF_CANCELED) {
1032 os_atomic_rmw_loop_give_up(break);
1033 } else {
1034 // this case needs a release barrier, hence the seq_cst above
1035 new_state |= DISPATCH_QUEUE_DIRTY;
1036 }
1037 });
1038
1039 if (unlikely(_dq_state_is_suspended(old_state))) {
1040 if (unlikely(_dq_state_suspend_cnt(old_state))) {
1041 DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
1042 }
1043 // inactive sources have never been registered and there is no need
1044 // to wait here because activation will notice and mark the source
1045 // as deleted without ever trying to use the fd or mach port.
1046 return dispatch_activate(ds);
1047 }
1048
1049 if (likely(_dq_state_is_runnable(old_state) &&
1050 !_dq_state_drain_locked(old_state))) {
1051 // same thing _dispatch_source_invoke2() does when handling cancellation
1052 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1053 if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
1054 _dispatch_source_refs_unregister(ds, 0);
1055 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1056 if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
1057 _dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
1058 }
1059 }
1060 dx_wakeup(ds, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
1061 } else if (unlikely(_dq_state_drain_locked_by_self(old_state))) {
1062 DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
1063 "called from a source handler");
1064 } else {
1065 dispatch_qos_t qos;
1066 wakeup:
1067 qos = _dispatch_qos_from_pp(_dispatch_get_priority());
1068 dx_wakeup(ds, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
1069 dispatch_activate(ds);
1070 }
1071
1072 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1073 while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
1074 if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
1075 if (!os_atomic_cmpxchgv2o(ds, dq_atomic_flags,
1076 dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
1077 continue;
1078 }
1079 dqf |= DSF_CANCEL_WAITER;
1080 }
1081 _dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
1082 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1083 }
1084 }
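/*
 * Illustrative sketch (not part of libdispatch): the synchronous cancellation
 * above lets the caller tear down the monitored resource immediately
 * afterwards. `ds` and `fd` are assumed; note the preconditions enforced by
 * the code above (no cancel handler, not suspended, not called from one of
 * the source's own handlers).
 */
#if 0
	dispatch_source_cancel_and_wait(ds);
	close(fd); // the kevent is already deleted, so the fd can be reused safely
#endif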
1085
1086 void
1087 _dispatch_source_merge_evt(dispatch_unote_t du, uint32_t flags, uintptr_t data,
1088 uintptr_t status, pthread_priority_t pp)
1089 {
1090 dispatch_source_refs_t dr = du._dr;
1091 dispatch_source_t ds = _dispatch_source_from_refs(dr);
1092 dispatch_wakeup_flags_t wflags = 0;
1093 dispatch_queue_flags_t dqf;
1094
1095 if (_dispatch_unote_needs_rearm(dr) || (flags & (EV_DELETE | EV_ONESHOT))) {
1096 // once we modify the queue atomic flags below, it will allow concurrent
1097 // threads running _dispatch_source_invoke2 to dispose of the source,
1098 // so we can't safely borrow the reference we get from the muxnote udata
1099 // anymore, and need our own
1100 wflags = DISPATCH_WAKEUP_CONSUME_2;
1101 _dispatch_retain_2(ds); // rdar://20382435
1102 }
1103
1104 if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
1105 !(flags & EV_DELETE)) {
1106 dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
1107 DSF_DEFERRED_DELETE, DSF_ARMED);
1108 if (flags & EV_VANISHED) {
1109 _dispatch_bug_kevent_client("kevent", dr->du_type->dst_kind,
1110 "monitored resource vanished before the source "
1111 "cancel handler was invoked", 0);
1112 }
1113 _dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
1114 (flags & EV_VANISHED) ? "vanished" :
1115 "deferred delete oneshot", dr);
1116 } else if (flags & (EV_DELETE | EV_ONESHOT)) {
1117 _dispatch_source_refs_unregister(ds, DU_UNREGISTER_ALREADY_DELETED);
1118 _dispatch_debug("kevent-source[%p]: deleted kevent[%p]", ds, dr);
1119 if (flags & EV_DELETE) goto done;
1120 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1121 } else if (_dispatch_unote_needs_rearm(dr)) {
1122 dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
1123 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
1124 } else {
1125 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1126 }
1127
1128 if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
1129 goto done; // rdar://20204025
1130 }
1131
1132 dispatch_unote_action_t action = dr->du_data_action;
1133 if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
1134 (flags & EV_VANISHED)) {
1135 // if the resource behind the ident vanished, the event handler can't
1136 // do anything useful anymore, so do not try to call it at all
1137 //
1138 // Note: if the kernel doesn't support EV_VANISHED we always get it
1139 // back unchanged from the flags passed at EV_ADD (registration) time
1140 // Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
1141 // if we get both bits it was a real EV_VANISHED delivery
1142 os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
1143 #if HAVE_MACH
1144 } else if (dr->du_filter == EVFILT_MACHPORT) {
1145 os_atomic_store2o(ds, ds_pending_data, data, relaxed);
1146 #endif
1147 } else if (action == DISPATCH_UNOTE_ACTION_DATA_SET) {
1148 os_atomic_store2o(ds, ds_pending_data, data, relaxed);
1149 } else if (action == DISPATCH_UNOTE_ACTION_DATA_ADD) {
1150 os_atomic_add2o(ds, ds_pending_data, data, relaxed);
1151 } else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR) {
1152 os_atomic_or2o(ds, ds_pending_data, data, relaxed);
1153 } else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
1154 // We combine the data and status into a single 64-bit value.
1155 uint64_t odata, ndata;
1156 uint64_t value = DISPATCH_SOURCE_COMBINE_DATA_AND_STATUS(data, status);
1157 os_atomic_rmw_loop2o(ds, ds_pending_data, odata, ndata, relaxed, {
1158 ndata = DISPATCH_SOURCE_GET_DATA(odata) | value;
1159 });
1160 } else if (data) {
1161 DISPATCH_INTERNAL_CRASH(action, "Unexpected source action value");
1162 }
1163 _dispatch_debug("kevent-source[%p]: merged kevent[%p]", ds, dr);
1164
1165 done:
1166 _dispatch_object_debug(ds, "%s", __func__);
1167 dx_wakeup(ds, _dispatch_qos_from_pp(pp), wflags | DISPATCH_WAKEUP_MAKE_DIRTY);
1168 }
1169
1170 #pragma mark -
1171 #pragma mark dispatch_source_timer
1172
1173 #if DISPATCH_USE_DTRACE
1174 static dispatch_timer_source_refs_t
1175 _dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
1176 #define _dispatch_trace_next_timer_set(x, q) \
1177 _dispatch_trace_next_timer[(q)] = (x)
1178 #define _dispatch_trace_next_timer_program(d, q) \
1179 _dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
1180 DISPATCH_ALWAYS_INLINE
1181 static inline void
1182 _dispatch_mgr_trace_timers_wakes(void)
1183 {
1184 uint32_t qos;
1185
1186 if (_dispatch_timers_will_wake) {
1187 if (slowpath(DISPATCH_TIMER_WAKE_ENABLED())) {
1188 for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
1189 if (_dispatch_timers_will_wake & (1 << qos)) {
1190 _dispatch_trace_timer_wake(_dispatch_trace_next_timer[qos]);
1191 }
1192 }
1193 }
1194 _dispatch_timers_will_wake = 0;
1195 }
1196 }
1197 #else
1198 #define _dispatch_trace_next_timer_set(x, q)
1199 #define _dispatch_trace_next_timer_program(d, q)
1200 #define _dispatch_mgr_trace_timers_wakes()
1201 #endif
1202
1203 #define _dispatch_source_timer_telemetry_enabled() false
1204
1205 DISPATCH_NOINLINE
1206 static void
1207 _dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
1208 dispatch_clock_t clock, struct dispatch_timer_source_s *values)
1209 {
1210 if (_dispatch_trace_timer_configure_enabled()) {
1211 _dispatch_trace_timer_configure(ds, clock, values);
1212 }
1213 }
1214
1215 DISPATCH_ALWAYS_INLINE
1216 static inline void
1217 _dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock,
1218 struct dispatch_timer_source_s *values)
1219 {
1220 if (_dispatch_trace_timer_configure_enabled() ||
1221 _dispatch_source_timer_telemetry_enabled()) {
1222 _dispatch_source_timer_telemetry_slow(ds, clock, values);
1223 asm(""); // prevent tailcall
1224 }
1225 }
1226
1227 DISPATCH_NOINLINE
1228 static void
1229 _dispatch_source_timer_configure(dispatch_source_t ds)
1230 {
1231 dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
1232 dispatch_timer_config_t dtc;
1233
1234 dtc = os_atomic_xchg2o(dt, dt_pending_config, NULL, dependency);
1235 if (dtc->dtc_clock == DISPATCH_CLOCK_MACH) {
1236 dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
1237 } else {
1238 dt->du_fflags &= ~(uint32_t)DISPATCH_TIMER_CLOCK_MACH;
1239 }
1240 dt->dt_timer = dtc->dtc_timer;
1241 free(dtc);
1242 if (ds->ds_is_installed) {
1243 // Clear any pending data that might have accumulated on
1244 // older timer params <rdar://problem/8574886>
1245 os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
1246 _dispatch_timers_update(dt, 0);
1247 }
1248 }
1249
1250 static dispatch_timer_config_t
1251 _dispatch_source_timer_config_create(dispatch_time_t start,
1252 uint64_t interval, uint64_t leeway)
1253 {
1254 dispatch_timer_config_t dtc;
1255 dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s));
1256 if (unlikely(interval == 0)) {
1257 if (start != DISPATCH_TIME_FOREVER) {
1258 _dispatch_bug_deprecated("Setting timer interval to 0 requests "
1259 "a 1ns timer, did you mean FOREVER (a one-shot timer)?");
1260 }
1261 interval = 1;
1262 } else if ((int64_t)interval < 0) {
1263 // 6866347 - make sure nanoseconds won't overflow
1264 interval = INT64_MAX;
1265 }
1266 if ((int64_t)leeway < 0) {
1267 leeway = INT64_MAX;
1268 }
1269 if (start == DISPATCH_TIME_NOW) {
1270 start = _dispatch_absolute_time();
1271 } else if (start == DISPATCH_TIME_FOREVER) {
1272 start = INT64_MAX;
1273 }
1274
1275 if ((int64_t)start < 0) {
1276 // wall clock
1277 start = (dispatch_time_t)-((int64_t)start);
1278 dtc->dtc_clock = DISPATCH_CLOCK_WALL;
1279 } else {
1280 // absolute clock
1281 interval = _dispatch_time_nano2mach(interval);
1282 if (interval < 1) {
1283                 // rdar://problem/7287561 interval must be at least one
1284 // in order to avoid later division by zero when calculating
1285 // the missed interval count. (NOTE: the wall clock's
1286 // interval is already "fixed" to be 1 or more)
1287 interval = 1;
1288 }
1289 leeway = _dispatch_time_nano2mach(leeway);
1290 dtc->dtc_clock = DISPATCH_CLOCK_MACH;
1291 }
1292 if (interval < INT64_MAX && leeway > interval / 2) {
1293 leeway = interval / 2;
1294 }
1295
1296 dtc->dtc_timer.target = start;
1297 dtc->dtc_timer.interval = interval;
1298 if (start + leeway < INT64_MAX) {
1299 dtc->dtc_timer.deadline = start + leeway;
1300 } else {
1301 dtc->dtc_timer.deadline = INT64_MAX;
1302 }
1303 return dtc;
1304 }
1305
1306 DISPATCH_NOINLINE
1307 void
1308 dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1309 uint64_t interval, uint64_t leeway)
1310 {
1311 dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
1312 dispatch_timer_config_t dtc;
1313
1314 if (unlikely(!dt->du_is_timer || (dt->du_fflags&DISPATCH_TIMER_INTERVAL))) {
1315 DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
1316 }
1317
1318 dtc = _dispatch_source_timer_config_create(start, interval, leeway);
1319 _dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
1320 dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
1321 if (dtc) free(dtc);
1322 dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
1323 }
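/*
 * Illustrative sketch (not part of libdispatch): configuring a repeating
 * one-second timer with 100ms of leeway. `q` and `tick` are assumptions.
 */
#if 0
static dispatch_source_t
example_periodic_timer(dispatch_queue_t q, dispatch_block_t tick)
{
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
			0, 0, q);
	dispatch_source_set_event_handler(ds, tick);
	dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, NSEC_PER_SEC),
			NSEC_PER_SEC, 100 * NSEC_PER_MSEC);
	dispatch_activate(ds);
	return ds;
}
#endif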
1324
1325 static void
1326 _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
1327 {
1328 #define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1329 // approx 1 year (60s * 60m * 24h * 365d)
1330 #define FOREVER_NSEC 31536000000000000ull
1331
1332 dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
1333 const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION;
1334 if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
1335 FOREVER_NSEC/NSEC_PER_MSEC))) {
1336 interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
1337 } else {
1338 interval = FOREVER_NSEC;
1339 }
1340 interval = _dispatch_time_nano2mach(interval);
1341 uint64_t target = _dispatch_absolute_time() + interval;
1342 target -= (target % interval);
1343 const uint64_t leeway = animation ?
1344 _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
1345 dr->dt_timer.target = target;
1346 dr->dt_timer.deadline = target + leeway;
1347 dr->dt_timer.interval = interval;
1348 _dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer);
1349 }
1350
1351 #pragma mark -
1352 #pragma mark dispatch_after
1353
1354 DISPATCH_ALWAYS_INLINE
1355 static inline void
1356 _dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
1357 void *ctxt, void *handler, bool block)
1358 {
1359 dispatch_timer_source_refs_t dt;
1360 dispatch_source_t ds;
1361 uint64_t leeway, delta;
1362
1363 if (when == DISPATCH_TIME_FOREVER) {
1364 #if DISPATCH_DEBUG
1365 DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
1366 #endif
1367 return;
1368 }
1369
1370 delta = _dispatch_timeout(when);
1371 if (delta == 0) {
1372 if (block) {
1373 return dispatch_async(queue, handler);
1374 }
1375 return dispatch_async_f(queue, ctxt, handler);
1376 }
1377 leeway = delta / 10; // <rdar://problem/13447496>
1378
1379 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
1380 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
1381
1382 // this function can and should be optimized to not use a dispatch source
1383 ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
1384 dt = ds->ds_timer_refs;
1385
1386 dispatch_continuation_t dc = _dispatch_continuation_alloc();
1387 if (block) {
1388 _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
1389 } else {
1390 _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
1391 }
1392 // reference `ds` so that it doesn't show up as a leak
1393 dc->dc_data = ds;
1394 _dispatch_trace_continuation_push(ds->_as_dq, dc);
1395 os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
1396
1397 if ((int64_t)when < 0) {
1398 // wall clock
1399 when = (dispatch_time_t)-((int64_t)when);
1400 } else {
1401 // absolute clock
1402 dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
1403 leeway = _dispatch_time_nano2mach(leeway);
1404 }
1405 dt->dt_timer.target = when;
1406 dt->dt_timer.interval = UINT64_MAX;
1407 dt->dt_timer.deadline = when + leeway;
1408 dispatch_activate(ds);
1409 }
1410
1411 DISPATCH_NOINLINE
1412 void
1413 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
1414 dispatch_function_t func)
1415 {
1416 _dispatch_after(when, queue, ctxt, func, false);
1417 }
1418
1419 #ifdef __BLOCKS__
1420 void
1421 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
1422 dispatch_block_t work)
1423 {
1424 _dispatch_after(when, queue, NULL, work, true);
1425 }
1426 #endif
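/*
 * Illustrative sketch (not part of libdispatch): a typical dispatch_after
 * call against the helper above. As _dispatch_after() shows, a deadline that
 * has already passed degenerates into a plain dispatch_async, and the leeway
 * is about 10% of the delay, clamped between 1ms and 60s. `q` is assumed.
 */
#if 0
	dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), q, ^{
		// runs on q roughly half a second from now
	});
#endif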
1427
1428 #pragma mark -
1429 #pragma mark dispatch_timers
1430
1431 /*
1432 * The dispatch_timer_heap_t structure is a double min-heap of timers,
1433 * interleaving the by-target min-heap in the even slots, and the by-deadline
1434 * in the odd ones.
1435 *
1436 * The min element of these is held inline in the dispatch_timer_heap_t
1437 * structure, and further entries are held in segments.
1438 *
1439 * dth_segments is the number of allocated segments.
1440 *
1441 * Segment 0 has a size of `DISPATCH_HEAP_INIT_SEGMENT_CAPACITY` pointers
1442 * Segment k has a size of (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (k - 1))
1443 *
1444 * Segment n (dth_segments - 1) is the last segment and points its final n
1445 * entries to previous segments. Its address is held in the `dth_heap` field.
1446 *
1447 * segment n [ regular timer pointers | n-1 | k | 0 ]
1448 * | | |
1449 * segment n-1 <---------------------------' | |
1450 * segment k <--------------------------------' |
1451 * segment 0 <------------------------------------'
1452 */
1453 #define DISPATCH_HEAP_INIT_SEGMENT_CAPACITY 8u
1454
1455 /*
1456 * There are two min-heaps stored interleaved in a single array,
1457 * even indices are for the by-target min-heap, and odd indices for
1458 * the by-deadline one.
1459 */
1460 #define DTH_HEAP_ID_MASK (DTH_ID_COUNT - 1)
1461 #define DTH_HEAP_ID(idx) ((idx) & DTH_HEAP_ID_MASK)
1462 #define DTH_IDX_FOR_HEAP_ID(idx, heap_id) \
1463 (((idx) & ~DTH_HEAP_ID_MASK) | (heap_id))
1464
1465 DISPATCH_ALWAYS_INLINE
1466 static inline uint32_t
1467 _dispatch_timer_heap_capacity(uint32_t segments)
1468 {
1469 if (segments == 0) return 2;
1470 uint32_t seg_no = segments - 1;
1471 // for C = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY,
1472 // 2 + C + SUM(C << (i-1), i = 1..seg_no) - seg_no
1473 return 2 + (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << seg_no) - seg_no;
1474 }
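/*
 * Worked example (with DISPATCH_HEAP_INIT_SEGMENT_CAPACITY == 8, as defined
 * above): 1 segment gives 2 + 8 = 10 slots (2 inline in dth_min plus 8 in
 * segment 0); 2 segments give 2 + 16 - 1 = 17, because segment 1's final
 * pointer links back to segment 0; 3 segments give 2 + 32 - 2 = 32, since
 * segment 2 spends its last two pointers linking to segments 1 and 0.
 */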
1475
1476 DISPATCH_NOINLINE
1477 static void
1478 _dispatch_timer_heap_grow(dispatch_timer_heap_t dth)
1479 {
1480 uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
1481 uint32_t seg_no = dth->dth_segments++;
1482 void **heap, **heap_prev = dth->dth_heap;
1483
1484 if (seg_no > 0) {
1485 seg_capacity <<= (seg_no - 1);
1486 }
1487 heap = _dispatch_calloc(seg_capacity, sizeof(void *));
1488 if (seg_no > 1) {
1489 uint32_t prev_seg_no = seg_no - 1;
1490 uint32_t prev_seg_capacity = seg_capacity >> 1;
1491 memcpy(&heap[seg_capacity - prev_seg_no],
1492 &heap_prev[prev_seg_capacity - prev_seg_no],
1493 prev_seg_no * sizeof(void *));
1494 }
1495 if (seg_no > 0) {
1496 heap[seg_capacity - seg_no] = heap_prev;
1497 }
1498 dth->dth_heap = heap;
1499 }
1500
1501 DISPATCH_NOINLINE
1502 static void
1503 _dispatch_timer_heap_shrink(dispatch_timer_heap_t dth)
1504 {
1505 uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
1506 uint32_t seg_no = --dth->dth_segments;
1507 void **heap = dth->dth_heap, **heap_prev = NULL;
1508
1509 if (seg_no > 0) {
1510 seg_capacity <<= (seg_no - 1);
1511 heap_prev = heap[seg_capacity - seg_no];
1512 }
1513 if (seg_no > 1) {
1514 uint32_t prev_seg_no = seg_no - 1;
1515 uint32_t prev_seg_capacity = seg_capacity >> 1;
1516 memcpy(&heap_prev[prev_seg_capacity - prev_seg_no],
1517 &heap[seg_capacity - prev_seg_no],
1518 prev_seg_no * sizeof(void *));
1519 }
1520 dth->dth_heap = heap_prev;
1521 free(heap);
1522 }
1523
1524 DISPATCH_ALWAYS_INLINE
1525 static inline dispatch_timer_source_refs_t *
1526 _dispatch_timer_heap_get_slot(dispatch_timer_heap_t dth, uint32_t idx)
1527 {
1528 uint32_t seg_no, segments = dth->dth_segments;
1529 void **segment;
1530
1531 if (idx < DTH_ID_COUNT) {
1532 return &dth->dth_min[idx];
1533 }
1534 idx -= DTH_ID_COUNT;
1535
1536 // Derive the segment number from the index. Naming
1537 // DISPATCH_HEAP_INIT_SEGMENT_CAPACITY `C`, the segments index ranges are:
1538 // 0: 0 .. (C - 1)
1539 // 1: C .. 2 * C - 1
1540 // k: 2^(k-1) * C .. 2^k * C - 1
1541 // so `k` can be derived from the first bit set in `idx`
1542 seg_no = (uint32_t)(__builtin_clz(DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1) -
1543 __builtin_clz(idx | (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1)));
1544 if (seg_no + 1 == segments) {
1545 segment = dth->dth_heap;
1546 } else {
1547 uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
1548 seg_capacity <<= (segments - 2);
1549 segment = dth->dth_heap[seg_capacity - seg_no - 1];
1550 }
1551 if (seg_no) {
1552 idx -= DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (seg_no - 1);
1553 }
1554 return (dispatch_timer_source_refs_t *)(segment + idx);
1555 }
1556
1557 DISPATCH_ALWAYS_INLINE
1558 static inline void
1559 _dispatch_timer_heap_set(dispatch_timer_source_refs_t *slot,
1560 dispatch_timer_source_refs_t dt, uint32_t idx)
1561 {
1562 *slot = dt;
1563 dt->dt_heap_entry[DTH_HEAP_ID(idx)] = idx;
1564 }
1565
1566 DISPATCH_ALWAYS_INLINE
1567 static inline uint32_t
1568 _dispatch_timer_heap_parent(uint32_t idx)
1569 {
1570 uint32_t heap_id = DTH_HEAP_ID(idx);
1571 idx = (idx - DTH_ID_COUNT) / 2; // go to the parent
1572 return DTH_IDX_FOR_HEAP_ID(idx, heap_id);
1573 }
1574
1575 DISPATCH_ALWAYS_INLINE
1576 static inline uint32_t
1577 _dispatch_timer_heap_left_child(uint32_t idx)
1578 {
1579 uint32_t heap_id = DTH_HEAP_ID(idx);
1580 // 2 * (idx - heap_id) + DTH_ID_COUNT + heap_id
1581 return 2 * idx + DTH_ID_COUNT - heap_id;
1582 }
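/*
 * Worked example (assuming DTH_ID_COUNT == 2, i.e. the two interleaved heaps
 * described above): the by-target heap sits at even indices with its root at
 * index 0, the by-deadline heap at odd indices with its root at index 1.
 * The children of target index 0 are 2 and 4, those of 2 are 6 and 8, and the
 * children of deadline index 1 are 3 and 5; conversely parent(6) == 2,
 * parent(4) == 0, and parent(3) == 1.
 */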
1583
1584 #if DISPATCH_HAVE_TIMER_COALESCING
1585 DISPATCH_ALWAYS_INLINE
1586 static inline uint32_t
1587 _dispatch_timer_heap_walk_skip(uint32_t idx, uint32_t count)
1588 {
1589 uint32_t heap_id = DTH_HEAP_ID(idx);
1590
1591 idx -= heap_id;
1592 if (unlikely(idx + DTH_ID_COUNT == count)) {
1593 // reaching `count` doesn't mean we're done, but there is a weird
1594 // corner case if the last item of the heap is a left child:
1595 //
1596 // /\
1597 // / \
1598 // / __\
1599 // /__/
1600 // ^
1601 //
1602 // The formula below would return the sibling of `idx` which is
1603 // out of bounds. Fortunately, the correct answer is the same
1604 // as for idx's parent
1605 idx = _dispatch_timer_heap_parent(idx);
1606 }
1607
1608 //
1609 // When considering the index in a non interleaved, 1-based array
1610 // representation of a heap, hence looking at (idx / DTH_ID_COUNT + 1)
1611 // for a given idx in our dual-heaps, that index is in one of two forms:
1612 //
1613 // (a) 1xxxx011111 or (b) 111111111
1614 // d i 0 d 0
1615 //
1616 // The first bit set is the row of the binary tree node (0-based).
1617 // The following digits from most to least significant represent the path
1618 // to that node, where `0` is a left turn and `1` a right turn.
1619 //
1620 // For example 0b0101 (5) is a node on row 2 accessed going left then right:
1621 //
1622 // row 0 1
1623 // / .
1624 // row 1 2 3
1625 // . \ . .
1626 // row 2 4 5 6 7
1627 // : : : : : : : :
1628 //
1629 // Skipping a sub-tree in walk order means going to the sibling of the last
1630 // node reached after we turned left. If the node was of the form (a),
1631 // this node is 1xxxx1, which for the above example is 0b0011 (3).
1632 // If the node was of the form (b) then we never took a left, meaning
1633 // we reached the last element in traversal order.
1634 //
1635
1636 //
1637 // we want to find
1638 // - the least significant bit set to 0 in (idx / DTH_ID_COUNT + 1)
1639 // - which is offset by log_2(DTH_ID_COUNT) from the position of the least
1640 // significant 0 in (idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
1641 // since idx is a multiple of DTH_ID_COUNT and DTH_ID_COUNT a power of 2.
1642 // - which in turn is the same as the position of the least significant 1 in
1643 // ~(idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
1644 //
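// Worked example (added commentary), assuming DTH_ID_COUNT == 2 and
// heap_id == 0: for idx == 6 in a heap with count == 14, 6 + 3 == 9
// (0b1001); __builtin_ctz(~9) == 1, so 9 >> 1 == 4 (0b100); the result,
// 2 * 4 + 0 == 8, is the interleaved index of idx's right sibling.
//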
1645 dispatch_static_assert(powerof2(DTH_ID_COUNT));
1646 idx += DTH_ID_COUNT + DTH_ID_COUNT - 1;
1647 idx >>= __builtin_ctz(~idx);
1648
1649 //
1650 // `idx` is now either:
1651 // - 0 if it was the (b) case above, in which case the walk is done
1652 // - 1xxxx0 as the position in a 0-based array representation of a
1653 // non-interleaved heap, so we just have to compute the interleaved index.
1654 //
1655 return likely(idx) ? DTH_ID_COUNT * idx + heap_id : UINT32_MAX;
1656 }
1657
1658 DISPATCH_ALWAYS_INLINE
1659 static inline uint32_t
1660 _dispatch_timer_heap_walk_next(uint32_t idx, uint32_t count)
1661 {
1662 //
1663 // Goes to the next element in heap walk order, which is the prefix ordered
1664 // walk of the tree.
1665 //
1666 // From a given node, the next item to return is the left child if it
1667 // exists, else the first right sibling we find by walking our parent chain,
1668 // which is exactly what _dispatch_timer_heap_walk_skip() returns.
1669 //
1670 uint32_t lchild = _dispatch_timer_heap_left_child(idx);
1671 if (lchild < count) {
1672 return lchild;
1673 }
1674 return _dispatch_timer_heap_walk_skip(idx, count);
1675 }
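
// Added commentary (not part of the original source): assuming
// DTH_ID_COUNT == 2 and DTH_TARGET_ID == 0, a heap holding 7 timers
// (dth_count == 14) keeps its target heap at the even indices, with the
// children of 0 at (2, 4), of 2 at (6, 8) and of 4 at (10, 12).
// Starting from the root, repeated calls to
// _dispatch_timer_heap_walk_next(idx, 14) visit
//
//     0, 2, 6, 8, 4, 10, 12
//
// i.e. a prefix-order walk; the call made from the last element (12)
// returns UINT32_MAX, ending the walk.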
1676
1677 DISPATCH_NOINLINE
1678 static uint64_t
1679 _dispatch_timer_heap_max_target_before(dispatch_timer_heap_t dth, uint64_t limit)
1680 {
1681 dispatch_timer_source_refs_t dri;
1682 uint32_t idx = _dispatch_timer_heap_left_child(DTH_TARGET_ID);
1683 uint32_t count = dth->dth_count;
1684 uint64_t tmp, target = dth->dth_min[DTH_TARGET_ID]->dt_timer.target;
1685
1686 while (idx < count) {
1687 dri = *_dispatch_timer_heap_get_slot(dth, idx);
1688 tmp = dri->dt_timer.target;
1689 if (tmp > limit) {
1690 // skip subtree since none of the targets below can be before limit
1691 idx = _dispatch_timer_heap_walk_skip(idx, count);
1692 } else {
1693 target = tmp;
1694 idx = _dispatch_timer_heap_walk_next(idx, count);
1695 }
1696 }
1697 return target;
1698 }
1699 #endif // DISPATCH_HAVE_TIMER_COALESCING
1700
1701 DISPATCH_NOINLINE
1702 static void
1703 _dispatch_timer_heap_resift(dispatch_timer_heap_t dth,
1704 dispatch_timer_source_refs_t dt, uint32_t idx)
1705 {
1706 dispatch_static_assert(offsetof(struct dispatch_timer_source_s, target) ==
1707 offsetof(struct dispatch_timer_source_s, heap_key[DTH_TARGET_ID]));
1708 dispatch_static_assert(offsetof(struct dispatch_timer_source_s, deadline) ==
1709 offsetof(struct dispatch_timer_source_s, heap_key[DTH_DEADLINE_ID]));
1710 #define dth_cmp(hid, dt1, op, dt2) \
1711 (((dt1)->dt_timer.heap_key)[hid] op ((dt2)->dt_timer.heap_key)[hid])
1712
1713 dispatch_timer_source_refs_t *pslot, pdt;
1714 dispatch_timer_source_refs_t *cslot, cdt;
1715 dispatch_timer_source_refs_t *rslot, rdt;
1716 uint32_t cidx, dth_count = dth->dth_count;
1717 dispatch_timer_source_refs_t *slot;
1718 int heap_id = DTH_HEAP_ID(idx);
1719 bool sifted_up = false;
1720
1721 // try to sift up
1722
1723 slot = _dispatch_timer_heap_get_slot(dth, idx);
1724 while (idx >= DTH_ID_COUNT) {
1725 uint32_t pidx = _dispatch_timer_heap_parent(idx);
1726 pslot = _dispatch_timer_heap_get_slot(dth, pidx);
1727 pdt = *pslot;
1728 if (dth_cmp(heap_id, pdt, <=, dt)) {
1729 break;
1730 }
1731 _dispatch_timer_heap_set(slot, pdt, idx);
1732 slot = pslot;
1733 idx = pidx;
1734 sifted_up = true;
1735 }
1736 if (sifted_up) {
1737 goto done;
1738 }
1739
1740 // try to sift down
1741
1742 while ((cidx = _dispatch_timer_heap_left_child(idx)) < dth_count) {
1743 uint32_t ridx = cidx + DTH_ID_COUNT;
1744 cslot = _dispatch_timer_heap_get_slot(dth, cidx);
1745 cdt = *cslot;
1746 if (ridx < dth_count) {
1747 rslot = _dispatch_timer_heap_get_slot(dth, ridx);
1748 rdt = *rslot;
1749 if (dth_cmp(heap_id, cdt, >, rdt)) {
1750 cidx = ridx;
1751 cdt = rdt;
1752 cslot = rslot;
1753 }
1754 }
1755 if (dth_cmp(heap_id, dt, <=, cdt)) {
1756 break;
1757 }
1758 _dispatch_timer_heap_set(slot, cdt, idx);
1759 slot = cslot;
1760 idx = cidx;
1761 }
1762
1763 done:
1764 _dispatch_timer_heap_set(slot, dt, idx);
1765 #undef dth_cmp
1766 }
1767
1768 DISPATCH_ALWAYS_INLINE
1769 static void
1770 _dispatch_timer_heap_insert(dispatch_timer_heap_t dth,
1771 dispatch_timer_source_refs_t dt)
1772 {
1773 uint32_t idx = (dth->dth_count += DTH_ID_COUNT) - DTH_ID_COUNT;
1774
1775 DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], ==,
1776 DTH_INVALID_ID, "target idx");
1777 DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], ==,
1778 DTH_INVALID_ID, "deadline idx");
1779
1780 if (idx == 0) {
1781 dt->dt_heap_entry[DTH_TARGET_ID] = DTH_TARGET_ID;
1782 dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_DEADLINE_ID;
1783 dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = dt;
1784 return;
1785 }
1786
1787 if (unlikely(idx + DTH_ID_COUNT >
1788 _dispatch_timer_heap_capacity(dth->dth_segments))) {
1789 _dispatch_timer_heap_grow(dth);
1790 }
1791 _dispatch_timer_heap_resift(dth, dt, idx + DTH_TARGET_ID);
1792 _dispatch_timer_heap_resift(dth, dt, idx + DTH_DEADLINE_ID);
1793 }
1794
1795 DISPATCH_NOINLINE
1796 static void
1797 _dispatch_timer_heap_remove(dispatch_timer_heap_t dth,
1798 dispatch_timer_source_refs_t dt)
1799 {
1800 uint32_t idx = (dth->dth_count -= DTH_ID_COUNT);
1801
1802 DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
1803 DTH_INVALID_ID, "target idx");
1804 DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
1805 DTH_INVALID_ID, "deadline idx");
1806
1807 if (idx == 0) {
1808 DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_TARGET_ID], ==, dt,
1809 "target slot");
1810 DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_DEADLINE_ID], ==, dt,
1811 "deadline slot");
1812 dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = NULL;
1813 goto clear_heap_entry;
1814 }
1815
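// Added commentary: this is the usual binary-heap removal, performed once
// per heap: the element pair that now sits past the shrunken count is
// popped, and any popped element that is not `dt` itself is resifted into
// the slot `dt` vacated in that heap.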
1816 for (uint32_t heap_id = 0; heap_id < DTH_ID_COUNT; heap_id++) {
1817 dispatch_timer_source_refs_t *slot, last_dt;
1818 slot = _dispatch_timer_heap_get_slot(dth, idx + heap_id);
1819 last_dt = *slot; *slot = NULL;
1820 if (last_dt != dt) {
1821 uint32_t removed_idx = dt->dt_heap_entry[heap_id];
1822 _dispatch_timer_heap_resift(dth, last_dt, removed_idx);
1823 }
1824 }
1825 if (unlikely(idx <= _dispatch_timer_heap_capacity(dth->dth_segments - 1))) {
1826 _dispatch_timer_heap_shrink(dth);
1827 }
1828
1829 clear_heap_entry:
1830 dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
1831 dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
1832 }
1833
1834 DISPATCH_ALWAYS_INLINE
1835 static inline void
1836 _dispatch_timer_heap_update(dispatch_timer_heap_t dth,
1837 dispatch_timer_source_refs_t dt)
1838 {
1839 DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
1840 DTH_INVALID_ID, "target idx");
1841 DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
1842 DTH_INVALID_ID, "deadline idx");
1843
1844
1845 _dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_TARGET_ID]);
1846 _dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_DEADLINE_ID]);
1847 }
1848
1849 DISPATCH_ALWAYS_INLINE
1850 static bool
1851 _dispatch_timer_heap_has_new_min(dispatch_timer_heap_t dth,
1852 uint32_t count, uint32_t mask)
1853 {
1854 dispatch_timer_source_refs_t dt;
1855 bool changed = false;
1856 uint64_t tmp;
1857 uint32_t tidx;
1858
1859 for (tidx = 0; tidx < count; tidx++) {
1860 if (!(mask & (1u << tidx))) {
1861 continue;
1862 }
1863
1864 dt = dth[tidx].dth_min[DTH_TARGET_ID];
1865 tmp = dt ? dt->dt_timer.target : UINT64_MAX;
1866 if (dth[tidx].dth_target != tmp) {
1867 dth[tidx].dth_target = tmp;
1868 changed = true;
1869 }
1870 dt = dth[tidx].dth_min[DTH_DEADLINE_ID];
1871 tmp = dt ? dt->dt_timer.deadline : UINT64_MAX;
1872 if (dth[tidx].dth_deadline != tmp) {
1873 dth[tidx].dth_deadline = tmp;
1874 changed = true;
1875 }
1876 }
1877 return changed;
1878 }
1879
1880 static inline void
1881 _dispatch_timers_unregister(dispatch_timer_source_refs_t dt)
1882 {
1883 uint32_t tidx = dt->du_ident;
1884 dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
1885
1886 _dispatch_timer_heap_remove(heap, dt);
1887 _dispatch_timers_reconfigure = true;
1888 _dispatch_timers_processing_mask |= 1 << tidx;
1889 dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
1890 dt->du_wlh = NULL;
1891 }
1892
1893 static inline void
1894 _dispatch_timers_register(dispatch_timer_source_refs_t dt, uint32_t tidx)
1895 {
1896 dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
1897 if (_dispatch_unote_registered(dt)) {
1898 DISPATCH_TIMER_ASSERT(dt->du_ident, ==, tidx, "tidx");
1899 _dispatch_timer_heap_update(heap, dt);
1900 } else {
1901 dt->du_ident = tidx;
1902 _dispatch_timer_heap_insert(heap, dt);
1903 }
1904 _dispatch_timers_reconfigure = true;
1905 _dispatch_timers_processing_mask |= 1 << tidx;
1906 dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
1907 dt->du_wlh = DISPATCH_WLH_ANON;
1908 }
1909
1910 DISPATCH_ALWAYS_INLINE
1911 static inline bool
1912 _dispatch_source_timer_tryarm(dispatch_source_t ds)
1913 {
1914 dispatch_queue_flags_t oqf, nqf;
1915 return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
1916 if (oqf & (DSF_CANCELED | DQF_RELEASED)) {
1917 // do not install a cancelled timer
1918 os_atomic_rmw_loop_give_up(break);
1919 }
1920 nqf = oqf | DSF_ARMED;
1921 });
1922 }
1923
1924 // Updates the ordered list of timers based on next fire date for changes to ds.
1925 // Should only be called from the context of _dispatch_mgr_q.
1926 static void
1927 _dispatch_timers_update(dispatch_unote_t du, uint32_t flags)
1928 {
1929 dispatch_timer_source_refs_t dr = du._dt;
1930 dispatch_source_t ds = _dispatch_source_from_refs(dr);
1931 const char *verb = "updated";
1932 bool will_register, disarm = false;
1933
1934 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1935
1936 if (unlikely(dr->du_ident == DISPATCH_TIMER_IDENT_CANCELED)) {
1937 dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
1938 return;
1939 }
1940
1941 // Unregister timers that are unconfigured, disabled, suspended or have
1942 // missed intervals; dispatch_set_timer(), resume or source invoke will
1943 // rearm and re-enable them.
1944 will_register = !(flags & DISPATCH_TIMERS_UNREGISTER) &&
1945 dr->dt_timer.target < INT64_MAX &&
1946 !os_atomic_load2o(ds, ds_pending_data, relaxed) &&
1947 !DISPATCH_QUEUE_IS_SUSPENDED(ds) &&
1948 !os_atomic_load2o(dr, dt_pending_config, relaxed);
1949 if (likely(!_dispatch_unote_registered(dr))) {
1950 dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
1951 if (unlikely(!will_register || !_dispatch_source_timer_tryarm(ds))) {
1952 return;
1953 }
1954 verb = "armed";
1955 } else if (unlikely(!will_register)) {
1956 disarm = true;
1957 verb = "disarmed";
1958 }
1959
1960 // The heap owns a +2 on dispatch sources it references
1961 //
1962 // _dispatch_timers_run2() also sometimes passes DISPATCH_TIMERS_RETAIN_2
1963 // when it wants to take over this +2 at the same time we are unregistering
1964 // the timer from the heap.
1965 //
1966 // Compute our refcount balance according to these rules: if our balance
1967 // would become negative, we retain the source upfront; if it is positive, we
1968 // get rid of the extraneous refcounts after we're done touching the source.
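//
// Added commentary, enumerating the cases the code below can reach (when
// the timer is neither registered nor about to be, the function has
// already returned above):
//
//   registered | will_register | RETAIN_2 passed | refs
//   -----------+---------------+-----------------+------------------------
//       no     |      yes      |       no        |  -2  retain up front
//      yes     |      yes      |       no        |   0  heap keeps its +2
//      yes     |      yes      |       yes       |  -2  retain up front
//      yes     |      no       |       no        |  +2  release when done
//      yes     |      no       |       yes       |   0  caller takes the +2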
1969 int refs = will_register ? -2 : 0;
1970 if (_dispatch_unote_registered(dr) && !(flags & DISPATCH_TIMERS_RETAIN_2)) {
1971 refs += 2;
1972 }
1973 if (refs < 0) {
1974 dispatch_assert(refs == -2);
1975 _dispatch_retain_2(ds);
1976 }
1977
1978 uint32_t tidx = _dispatch_source_timer_idx(dr);
1979 if (unlikely(_dispatch_unote_registered(dr) &&
1980 (!will_register || dr->du_ident != tidx))) {
1981 _dispatch_timers_unregister(dr);
1982 }
1983 if (likely(will_register)) {
1984 _dispatch_timers_register(dr, tidx);
1985 }
1986
1987 if (disarm) {
1988 _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
1989 }
1990 _dispatch_debug("kevent-source[%p]: %s timer[%p]", ds, verb, dr);
1991 _dispatch_object_debug(ds, "%s", __func__);
1992 if (refs > 0) {
1993 dispatch_assert(refs == 2);
1994 _dispatch_release_2_tailcall(ds);
1995 }
1996 }
1997
1998 #define DISPATCH_TIMER_MISSED_MARKER 1ul
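// Added commentary: for timers handled by _dispatch_timers_run2(),
// ds_pending_data packs the number of observed firings in its upper bits
// (value << 1, see _dispatch_timers_run2() and
// _dispatch_source_timer_data()) and reserves the low bit for
// DISPATCH_TIMER_MISSED_MARKER, which tells the handler path that the
// timer was unregistered with intervals outstanding and that it must
// account for the missed intervals itself.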
1999
2000 DISPATCH_ALWAYS_INLINE
2001 static inline unsigned long
2002 _dispatch_source_timer_compute_missed(dispatch_timer_source_refs_t dt,
2003 uint64_t now, unsigned long prev)
2004 {
2005 uint64_t missed = (now - dt->dt_timer.target) / dt->dt_timer.interval;
2006 if (++missed + prev > LONG_MAX) {
2007 missed = LONG_MAX - prev;
2008 }
2009 if (dt->dt_timer.interval < INT64_MAX) {
2010 uint64_t push_by = missed * dt->dt_timer.interval;
2011 dt->dt_timer.target += push_by;
2012 dt->dt_timer.deadline += push_by;
2013 } else {
2014 dt->dt_timer.target = UINT64_MAX;
2015 dt->dt_timer.deadline = UINT64_MAX;
2016 }
2017 prev += missed;
2018 return prev;
2019 }
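
// Worked example (added commentary, hypothetical values): for a repeating
// timer with interval == 100 and target == 1000, calling
// _dispatch_source_timer_compute_missed(dt, 1350, 0) computes
// missed == (1350 - 1000) / 100 == 3, then ++missed == 4, pushes target to
// 1400 and deadline forward by the same 400 units, and returns 4: the
// number of fire times (1000, 1100, 1200, 1300) that have already passed.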
2020
2021 DISPATCH_ALWAYS_INLINE
2022 static inline unsigned long
2023 _dispatch_source_timer_data(dispatch_source_t ds, dispatch_unote_t du)
2024 {
2025 dispatch_timer_source_refs_t dr = du._dt;
2026 unsigned long data, prev, clear_prev = 0;
2027
2028 os_atomic_rmw_loop2o(ds, ds_pending_data, prev, clear_prev, relaxed, {
2029 data = prev >> 1;
2030 if (unlikely(prev & DISPATCH_TIMER_MISSED_MARKER)) {
2031 os_atomic_rmw_loop_give_up(goto handle_missed_intervals);
2032 }
2033 });
2034 return data;
2035
2036 handle_missed_intervals:
2037 // The timer may be in _dispatch_source_invoke2() already for other
2038 // reasons such as running the registration handler when ds_pending_data
2039 // is changed by _dispatch_timers_run2() without holding the drain lock.
2040 //
2041 // We hence need dependency ordering to pair with the release barrier
2042 // done by _dispatch_timers_run2() when setting the MISSED_MARKER bit.
2043 os_atomic_thread_fence(dependency);
2044 dr = os_atomic_force_dependency_on(dr, data);
2045
2046 uint64_t now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(dr->du_ident));
2047 if (now >= dr->dt_timer.target) {
2048 OS_COMPILER_CAN_ASSUME(dr->dt_timer.interval < INT64_MAX);
2049 data = _dispatch_source_timer_compute_missed(dr, now, data);
2050 }
2051
2052 // When we see the MISSED_MARKER the manager has given up on this timer
2053 // and expects the handler to call "resume".
2054 //
2055 // However, it may not have reflected this into the atomic flags yet,
2056 // so make sure _dispatch_source_invoke2() sees the timer is disarmed.
2057 //
2058 // The subsequent _dispatch_source_refs_resume() will enqueue the source
2059 // on the manager and make the changes to `ds_timer` above visible.
2060 _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
2061 os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
2062 return data;
2063 }
2064
2065 static inline void
2066 _dispatch_timers_run2(dispatch_clock_now_cache_t nows, uint32_t tidx)
2067 {
2068 dispatch_timer_source_refs_t dr;
2069 dispatch_source_t ds;
2070 uint64_t data, pending_data;
2071 uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
2072
2073 while ((dr = _dispatch_timers_heap[tidx].dth_min[DTH_TARGET_ID])) {
2074 DISPATCH_TIMER_ASSERT(dr->du_filter, ==, DISPATCH_EVFILT_TIMER,
2075 "invalid filter");
2076 DISPATCH_TIMER_ASSERT(dr->du_ident, ==, tidx, "tidx");
2077 DISPATCH_TIMER_ASSERT(dr->dt_timer.target, !=, 0, "missing target");
2078 ds = _dispatch_source_from_refs(dr);
2079 if (dr->dt_timer.target > now) {
2080 // Done running timers for now.
2081 break;
2082 }
2083 if (dr->du_fflags & DISPATCH_TIMER_AFTER) {
2084 _dispatch_trace_timer_fire(dr, 1, 1);
2085 _dispatch_source_merge_evt(dr, EV_ONESHOT, 1, 0, 0);
2086 _dispatch_debug("kevent-source[%p]: fired after timer[%p]", ds, dr);
2087 _dispatch_object_debug(ds, "%s", __func__);
2088 continue;
2089 }
2090
2091 data = os_atomic_load2o(ds, ds_pending_data, relaxed);
2092 if (unlikely(data)) {
2093 // the release barrier is required to make the changes
2094 // to `ds_timer` visible to _dispatch_source_timer_data()
2095 if (os_atomic_cmpxchg2o(ds, ds_pending_data, data,
2096 data | DISPATCH_TIMER_MISSED_MARKER, release)) {
2097 _dispatch_timers_update(dr, DISPATCH_TIMERS_UNREGISTER);
2098 continue;
2099 }
2100 }
2101
2102 data = _dispatch_source_timer_compute_missed(dr, now, 0);
2103 _dispatch_timers_update(dr, DISPATCH_TIMERS_RETAIN_2);
2104 pending_data = data << 1;
2105 if (!_dispatch_unote_registered(dr) && dr->dt_timer.target < INT64_MAX){
2106 // if we unregistered because of suspension we have to pretend that
2107 // we missed events.
2108 pending_data |= DISPATCH_TIMER_MISSED_MARKER;
2109 os_atomic_store2o(ds, ds_pending_data, pending_data, release);
2110 } else {
2111 os_atomic_store2o(ds, ds_pending_data, pending_data, relaxed);
2112 }
2113 _dispatch_trace_timer_fire(dr, data, data);
2114 _dispatch_debug("kevent-source[%p]: fired timer[%p]", ds, dr);
2115 _dispatch_object_debug(ds, "%s", __func__);
2116 dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
2117 }
2118 }
2119
2120 DISPATCH_NOINLINE
2121 static void
2122 _dispatch_timers_run(dispatch_clock_now_cache_t nows)
2123 {
2124 uint32_t tidx;
2125 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2126 if (_dispatch_timers_heap[tidx].dth_count) {
2127 _dispatch_timers_run2(nows, tidx);
2128 }
2129 }
2130 }
2131
2132 #if DISPATCH_HAVE_TIMER_COALESCING
2133 #define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
2134 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
2135
2136 static const uint64_t _dispatch_kevent_coalescing_window[] = {
2137 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
2138 #if DISPATCH_HAVE_TIMER_QOS
2139 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
2140 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
2141 #endif
2142 };
2143 #endif // DISPATCH_HAVE_TIMER_COALESCING
2144
2145 static inline dispatch_timer_delay_s
2146 _dispatch_timers_get_delay(dispatch_timer_heap_t dth, dispatch_clock_t clock,
2147 uint32_t qos, dispatch_clock_now_cache_t nows)
2148 {
2149 uint64_t target = dth->dth_target, deadline = dth->dth_deadline;
2150 uint64_t delta = INT64_MAX, dldelta = INT64_MAX;
2151 dispatch_timer_delay_s rc;
2152
2153 dispatch_assert(target <= deadline);
2154 if (delta == 0 || target >= INT64_MAX) {
2155 goto done;
2156 }
2157
2158 if (qos < DISPATCH_TIMER_QOS_COUNT && dth->dth_count > 2) {
2159 #if DISPATCH_HAVE_TIMER_COALESCING
2160 // Timer pre-coalescing <rdar://problem/13222034>
2161 // When we have several timers with this target/deadline bracket:
2162 //
2163 //      Target        window  Deadline
2164 //        V           <-------V
2165 // t1:    [...........|.................]
2166 // t2:         [......|.......]
2167 // t3:             [..|..........]
2168 // t4:                | [.............]
2169 //                    ^
2170 //             Optimal Target
2171 //
2172 // Coalescing works better if the Target is delayed to "Optimal", by
2173 // picking the latest target that isn't too close to the deadline.
2174 uint64_t window = _dispatch_kevent_coalescing_window[qos];
2175 if (target + window < deadline) {
2176 uint64_t latest = deadline - window;
2177 target = _dispatch_timer_heap_max_target_before(dth, latest);
2178 }
2179 #endif
2180 }
2181
2182 uint64_t now = _dispatch_time_now_cached(clock, nows);
2183 if (target <= now) {
2184 delta = 0;
2185 dldelta = 0;
2186 goto done;
2187 }
2188
2189 uint64_t tmp = target - now;
2190 if (clock != DISPATCH_CLOCK_WALL) {
2191 tmp = _dispatch_time_mach2nano(tmp);
2192 }
2193 if (tmp < delta) {
2194 delta = tmp;
2195 }
2196
2197 tmp = deadline - now;
2198 if (clock != DISPATCH_CLOCK_WALL) {
2199 tmp = _dispatch_time_mach2nano(tmp);
2200 }
2201 if (tmp < dldelta) {
2202 dldelta = tmp;
2203 }
2204
2205 done:
2206 rc.delay = delta;
2207 rc.leeway = delta < INT64_MAX ? dldelta - delta : INT64_MAX;
2208 return rc;
2209 }
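
// Worked example (added commentary, hypothetical values): with the NORMAL
// QoS coalescing window of 2 * 75ms == 150ms, a heap with several timers
// whose earliest target is 10ms away and earliest deadline is 200ms away
// satisfies target + window < deadline, so the target may be delayed to a
// later heap target that is still at least 150ms before the deadline. If
// the heap walk settles on a target 40ms from now, the function returns
// delay == 40ms and leeway == 200ms - 40ms == 160ms.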
2210
2211 static bool
2212 _dispatch_timers_program2(dispatch_clock_now_cache_t nows, uint32_t tidx)
2213 {
2214 uint32_t qos = DISPATCH_TIMER_QOS(tidx);
2215 dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
2216 dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
2217 dispatch_timer_delay_s range;
2218
2219 range = _dispatch_timers_get_delay(heap, clock, qos, nows);
2220 if (range.delay == 0 || range.delay >= INT64_MAX) {
2221 _dispatch_trace_next_timer_set(NULL, qos);
2222 if (heap->dth_flags & DTH_ARMED) {
2223 _dispatch_event_loop_timer_delete(tidx);
2224 }
2225 return range.delay == 0;
2226 }
2227
2228 _dispatch_trace_next_timer_set(heap->dth_min[DTH_TARGET_ID], qos);
2229 _dispatch_trace_next_timer_program(range.delay, qos);
2230 _dispatch_event_loop_timer_arm(tidx, range, nows);
2231 return false;
2232 }
2233
2234 DISPATCH_NOINLINE
2235 static bool
2236 _dispatch_timers_program(dispatch_clock_now_cache_t nows)
2237 {
2238 bool poll = false;
2239 uint32_t tidx, timerm = _dispatch_timers_processing_mask;
2240
2241 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2242 if (timerm & (1 << tidx)) {
2243 poll |= _dispatch_timers_program2(nows, tidx);
2244 }
2245 }
2246 return poll;
2247 }
2248
2249 DISPATCH_NOINLINE
2250 static bool
2251 _dispatch_timers_configure(void)
2252 {
2253 // Find out if there is a new target/deadline on the timer lists
2254 return _dispatch_timer_heap_has_new_min(_dispatch_timers_heap,
2255 countof(_dispatch_timers_heap), _dispatch_timers_processing_mask);
2256 }
2257
2258 static inline bool
2259 _dispatch_mgr_timers(void)
2260 {
2261 dispatch_clock_now_cache_s nows = { };
2262 bool expired = _dispatch_timers_expired;
2263 if (unlikely(expired)) {
2264 _dispatch_timers_run(&nows);
2265 }
2266 _dispatch_mgr_trace_timers_wakes();
2267 bool reconfigure = _dispatch_timers_reconfigure;
2268 if (unlikely(reconfigure || expired)) {
2269 if (reconfigure) {
2270 reconfigure = _dispatch_timers_configure();
2271 _dispatch_timers_reconfigure = false;
2272 }
2273 if (reconfigure || expired) {
2274 expired = _dispatch_timers_expired = _dispatch_timers_program(&nows);
2275 }
2276 _dispatch_timers_processing_mask = 0;
2277 }
2278 return expired;
2279 }
2280
2281 #pragma mark -
2282 #pragma mark dispatch_mgr
2283
2284 void
2285 _dispatch_mgr_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
2286 DISPATCH_UNUSED dispatch_qos_t qos)
2287 {
2288 uint64_t dq_state;
2289 _dispatch_trace_continuation_push(dq, dou._do);
2290 if (unlikely(_dispatch_queue_push_update_tail(dq, dou._do))) {
2291 _dispatch_queue_push_update_head(dq, dou._do);
2292 dq_state = os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
2293 if (!_dq_state_drain_locked_by_self(dq_state)) {
2294 _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
2295 }
2296 }
2297 }
2298
2299 DISPATCH_NORETURN
2300 void
2301 _dispatch_mgr_queue_wakeup(DISPATCH_UNUSED dispatch_queue_t dq,
2302 DISPATCH_UNUSED dispatch_qos_t qos,
2303 DISPATCH_UNUSED dispatch_wakeup_flags_t flags)
2304 {
2305 DISPATCH_INTERNAL_CRASH(0, "Don't try to wake up or override the manager");
2306 }
2307
2308 #if DISPATCH_USE_MGR_THREAD
2309 DISPATCH_NOINLINE DISPATCH_NORETURN
2310 static void
2311 _dispatch_mgr_invoke(void)
2312 {
2313 #if DISPATCH_EVENT_BACKEND_KEVENT
2314 dispatch_kevent_s evbuf[DISPATCH_DEFERRED_ITEMS_EVENT_COUNT];
2315 #endif
2316 dispatch_deferred_items_s ddi = {
2317 #if DISPATCH_EVENT_BACKEND_KEVENT
2318 .ddi_maxevents = DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
2319 .ddi_eventlist = evbuf,
2320 #endif
2321 };
2322 bool poll;
2323
2324 _dispatch_deferred_items_set(&ddi);
2325 for (;;) {
2326 _dispatch_mgr_queue_drain();
2327 poll = _dispatch_mgr_timers();
2328 poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
2329 _dispatch_event_loop_drain(poll ? KEVENT_FLAG_IMMEDIATE : 0);
2330 }
2331 }
2332 #endif // DISPATCH_USE_MGR_THREAD
2333
2334 DISPATCH_NORETURN
2335 void
2336 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
2337 dispatch_invoke_context_t dic DISPATCH_UNUSED,
2338 dispatch_invoke_flags_t flags DISPATCH_UNUSED)
2339 {
2340 #if DISPATCH_USE_KEVENT_WORKQUEUE
2341 if (_dispatch_kevent_workqueue_enabled) {
2342 DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
2343 "kevent workqueue enabled");
2344 }
2345 #endif
2346 #if DISPATCH_USE_MGR_THREAD
2347 _dispatch_queue_set_current(&_dispatch_mgr_q);
2348 _dispatch_mgr_priority_init();
2349 _dispatch_queue_mgr_lock(&_dispatch_mgr_q);
2350 // never returns, so burn bridges behind us & clear stack 2k ahead
2351 _dispatch_clear_stack(2048);
2352 _dispatch_mgr_invoke();
2353 #endif
2354 }
2355
2356 #if DISPATCH_USE_KEVENT_WORKQUEUE
2357
2358 #define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((dispatch_priority_t)~0u)
2359
2360 _Static_assert(WORKQ_KEVENT_EVENT_BUFFER_LEN >=
2361 DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
2362 "our list should not be longer than the kernel's");
2363
2364 DISPATCH_ALWAYS_INLINE
2365 static inline dispatch_priority_t
2366 _dispatch_wlh_worker_thread_init(dispatch_wlh_t wlh,
2367 dispatch_deferred_items_t ddi)
2368 {
2369 dispatch_assert(wlh);
2370 dispatch_priority_t old_dbp;
2371
2372 pthread_priority_t pp = _dispatch_get_priority();
2373 if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
2374 // If this thread does not have the event manager flag set, don't set up
2375 // as the dispatch manager and let the caller know to only process
2376 // the delivered events.
2377 //
2378 // Also add the NEEDS_UNBIND flag so that
2379 // _dispatch_priority_compute_update knows it has to unbind
2380 pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
2381 if (wlh == DISPATCH_WLH_ANON) {
2382 pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
2383 } else {
2384 // pthread sets the flag when it is an event delivery thread
2385 // so we need to explicitly clear it
2386 pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
2387 }
2388 _dispatch_thread_setspecific(dispatch_priority_key,
2389 (void *)(uintptr_t)pp);
2390 if (wlh != DISPATCH_WLH_ANON) {
2391 _dispatch_debug("wlh[%p]: handling events", wlh);
2392 } else {
2393 ddi->ddi_can_stash = true;
2394 }
2395 return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
2396 }
2397
2398 if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
2399 !(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
2400 // When the pthread kext is delivering kevents to us, and pthread
2401 // root queues are in use, then the pthread priority TSD is set
2402 // to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
2403 //
2404 // Given that this isn't a valid QoS, we need to fix up the TSD,
2405 // and the best option is to clear the qos/priority bits, which tells
2406 // us not to do any QoS-related calls on this thread.
2407 //
2408 // However, in that case the manager thread is opted out of QoS,
2409 // as far as pthread is concerned, and can't be turned into
2410 // something else, so we can't stash.
2411 pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
2412 }
2413 // Managers always park without mutating to a regular worker thread, and
2414 // hence never need to unbind from userland. When draining a manager, the
2415 // NEEDS_UNBIND flag would cause that mutation to happen, so we need to
2416 // strip this flag.
2417 pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
2418 _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
2419
2420 // ensure kevents registered from this thread are registered at manager QoS
2421 old_dbp = _dispatch_set_basepri(DISPATCH_PRIORITY_FLAG_MANAGER);
2422 _dispatch_queue_set_current(&_dispatch_mgr_q);
2423 _dispatch_queue_mgr_lock(&_dispatch_mgr_q);
2424 return old_dbp;
2425 }
2426
2427 DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
2428 static inline bool
2429 _dispatch_wlh_worker_thread_reset(dispatch_priority_t old_dbp)
2430 {
2431 bool needs_poll = _dispatch_queue_mgr_unlock(&_dispatch_mgr_q);
2432 _dispatch_reset_basepri(old_dbp);
2433 _dispatch_reset_basepri_override();
2434 _dispatch_queue_set_current(NULL);
2435 return needs_poll;
2436 }
2437
2438 DISPATCH_ALWAYS_INLINE
2439 static void
2440 _dispatch_wlh_worker_thread(dispatch_wlh_t wlh, dispatch_kevent_t events,
2441 int *nevents)
2442 {
2443 _dispatch_introspection_thread_add();
2444 DISPATCH_PERF_MON_VAR_INIT
2445
2446 dispatch_deferred_items_s ddi = {
2447 .ddi_eventlist = events,
2448 };
2449 dispatch_priority_t old_dbp;
2450
2451 old_dbp = _dispatch_wlh_worker_thread_init(wlh, &ddi);
2452 if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
2453 _dispatch_perfmon_start_impl(true);
2454 } else {
2455 dispatch_assert(wlh == DISPATCH_WLH_ANON);
2456 wlh = DISPATCH_WLH_ANON;
2457 }
2458 _dispatch_deferred_items_set(&ddi);
2459 _dispatch_event_loop_merge(events, *nevents);
2460
2461 if (old_dbp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
2462 _dispatch_mgr_queue_drain();
2463 bool poll = _dispatch_mgr_timers();
2464 if (_dispatch_wlh_worker_thread_reset(old_dbp)) {
2465 poll = true;
2466 }
2467 if (poll) _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
2468 } else if (ddi.ddi_stashed_dou._do) {
2469 _dispatch_debug("wlh[%p]: draining deferred item %p", wlh,
2470 ddi.ddi_stashed_dou._do);
2471 if (wlh == DISPATCH_WLH_ANON) {
2472 dispatch_assert(ddi.ddi_nevents == 0);
2473 _dispatch_deferred_items_set(NULL);
2474 _dispatch_root_queue_drain_deferred_item(&ddi
2475 DISPATCH_PERF_MON_ARGS);
2476 } else {
2477 _dispatch_root_queue_drain_deferred_wlh(&ddi
2478 DISPATCH_PERF_MON_ARGS);
2479 }
2480 }
2481
2482 _dispatch_deferred_items_set(NULL);
2483 if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER &&
2484 !ddi.ddi_stashed_dou._do) {
2485 _dispatch_perfmon_end(perfmon_thread_event_no_steal);
2486 }
2487 _dispatch_debug("returning %d deferred kevents", ddi.ddi_nevents);
2488 *nevents = ddi.ddi_nevents;
2489 }
2490
2491 DISPATCH_NOINLINE
2492 void
2493 _dispatch_kevent_worker_thread(dispatch_kevent_t *events, int *nevents)
2494 {
2495 if (!events && !nevents) {
2496 // events for the worker thread request have already been delivered earlier
2497 return;
2498 }
2499 if (!dispatch_assume(*nevents && *events)) return;
2500 _dispatch_adopt_wlh_anon();
2501 _dispatch_wlh_worker_thread(DISPATCH_WLH_ANON, *events, nevents);
2502 _dispatch_reset_wlh();
2503 }
2504
2505
2506 #endif // DISPATCH_USE_KEVENT_WORKQUEUE
2507 #pragma mark -
2508 #pragma mark dispatch_source_debug
2509
2510 static size_t
2511 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
2512 {
2513 dispatch_queue_t target = ds->do_targetq;
2514 dispatch_source_refs_t dr = ds->ds_refs;
2515 return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%x, "
2516 "mask = 0x%x, pending_data = 0x%llx, registered = %d, "
2517 "armed = %d, deleted = %d%s, canceled = %d, ",
2518 target && target->dq_label ? target->dq_label : "", target,
2519 dr->du_ident, dr->du_fflags, (unsigned long long)ds->ds_pending_data,
2520 ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
2521 (bool)(ds->dq_atomic_flags & DSF_DELETED),
2522 (ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
2523 (bool)(ds->dq_atomic_flags & DSF_CANCELED));
2524 }
2525
2526 static size_t
2527 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
2528 {
2529 dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
2530 return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
2531 ", interval = 0x%llx, flags = 0x%x }, ",
2532 (unsigned long long)dr->dt_timer.target,
2533 (unsigned long long)dr->dt_timer.deadline,
2534 (unsigned long long)dr->dt_timer.interval, dr->du_fflags);
2535 }
2536
2537 size_t
2538 _dispatch_source_debug(dispatch_source_t ds, char *buf, size_t bufsiz)
2539 {
2540 dispatch_source_refs_t dr = ds->ds_refs;
2541 size_t offset = 0;
2542 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2543 dx_kind(ds), ds);
2544 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
2545 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
2546 if (dr->du_is_timer) {
2547 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
2548 }
2549 offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
2550 "filter = %s }", dr, dr->du_is_direct ? " (direct)" : "",
2551 dr->du_type->dst_kind);
2552 return offset;
2553 }