// apple/libdispatch, src/source.c (libdispatch-913.30.4)
/*
 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
static void _dispatch_source_set_interval(dispatch_source_t ds,
		uint64_t interval);

#define DISPATCH_TIMERS_UNREGISTER 0x1
#define DISPATCH_TIMERS_RETAIN_2 0x2
static void _dispatch_timers_update(dispatch_unote_t du, uint32_t flags);
static void _dispatch_timers_unregister(dispatch_timer_source_refs_t dt);

static void _dispatch_source_timer_configure(dispatch_source_t ds);
static inline unsigned long _dispatch_source_timer_data(
		dispatch_source_t ds, dispatch_unote_t du);

#pragma mark -
#pragma mark dispatch_source_t

dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
		unsigned long mask, dispatch_queue_t dq)
{
	dispatch_source_refs_t dr;
	dispatch_source_t ds;

	dr = dux_create(dst, handle, mask)._dr;
	if (unlikely(!dr)) {
		return DISPATCH_BAD_INPUT;
	}

	ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
			sizeof(struct dispatch_source_s));
	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
			DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
	ds->dq_label = "source";
	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->ds_refs = dr;
	dr->du_owner_wref = _dispatch_ptr2wref(ds);

	if (slowpath(!dq)) {
		dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
	} else {
		_dispatch_retain((dispatch_queue_t _Nonnull)dq);
	}
	ds->do_targetq = dq;
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
		_dispatch_source_set_interval(ds, handle);
	}
	_dispatch_object_debug(ds, "%s", __func__);
	return ds;
}
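
/*
 * Illustrative sketch (not part of this file): typical client-side use of
 * the public API above. A SIGHUP monitor on a private queue; the queue
 * label is made up for the example.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.sig", NULL);
 *	dispatch_source_t ds = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0, q);
 *	dispatch_source_set_event_handler(ds, ^{
 *		// dispatch_source_get_data() returns the number of signals
 *		// received since the last handler invocation
 *		printf("SIGHUP x%lu\n", dispatch_source_get_data(ds));
 *	});
 *	dispatch_activate(ds); // sources are created inactive
 */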

void
_dispatch_source_dispose(dispatch_source_t ds, bool *allow_free)
{
	_dispatch_object_debug(ds, "%s", __func__);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
	_dispatch_unote_dispose(ds->ds_refs);
	ds->ds_refs = NULL;
	_dispatch_queue_destroy(ds->_as_dq, allow_free);
}

void
_dispatch_source_xref_dispose(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if (unlikely(!(dqf & (DQF_LEGACY|DSF_CANCELED)))) {
		DISPATCH_CLIENT_CRASH(ds, "Release of a source that has not been "
				"cancelled, but has a mandatory cancel handler");
	}
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}

long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
}

unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (ds->dq_atomic_flags & DSF_CANCELED) {
		return 0;
	}
#if DISPATCH_USE_MEMORYSTATUS
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	return dr->du_fflags;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return 0;
	}
#endif
	return dr->du_ident;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
#if DISPATCH_USE_MEMORYSTATUS
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	uint64_t value = os_atomic_load2o(ds, ds_data, relaxed);
	return (unsigned long)(
		ds->ds_refs->du_data_action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET
		? DISPATCH_SOURCE_GET_DATA(value) : value);
}

size_t
dispatch_source_get_extended_data(dispatch_source_t ds,
		dispatch_source_extended_data_t edata, size_t size)
{
	size_t target_size = MIN(size,
		sizeof(struct dispatch_source_extended_data_s));
	if (size > 0) {
		unsigned long data, status = 0;
		if (ds->ds_refs->du_data_action
				== DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
			uint64_t combined = os_atomic_load(&ds->ds_data, relaxed);
			data = DISPATCH_SOURCE_GET_DATA(combined);
			status = DISPATCH_SOURCE_GET_STATUS(combined);
		} else {
			data = dispatch_source_get_data(ds);
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, data)
				+ sizeof(edata->data)) {
			edata->data = data;
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, status)
				+ sizeof(edata->status)) {
			edata->status = status;
		}
		if (size > sizeof(struct dispatch_source_extended_data_s)) {
			memset(
				(char *)edata + sizeof(struct dispatch_source_extended_data_s),
				0, size - sizeof(struct dispatch_source_extended_data_s));
		}
	}
	return target_size;
}
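
/*
 * Illustrative sketch (not part of this file): reading data and status in
 * one call. The caller passes the size it was compiled against, which is
 * what lets the implementation above truncate or zero-fill as needed.
 *
 *	struct dispatch_source_extended_data_s ed;
 *	size_t n = dispatch_source_get_extended_data(ds, &ed, sizeof(ed));
 *	if (n >= sizeof(ed)) {
 *		// ed.data and ed.status are both valid here
 *	}
 */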

DISPATCH_NOINLINE
void
_dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
		unsigned long val)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	int filter = ds->ds_refs->du_filter;

	if (unlikely(dqf & (DSF_CANCELED | DSF_DELETED))) {
		return;
	}

	switch (filter) {
	case DISPATCH_EVFILT_CUSTOM_ADD:
		os_atomic_add2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_OR:
		os_atomic_or2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_REPLACE:
		os_atomic_store2o(ds, ds_pending_data, val, relaxed);
		break;
	default:
		DISPATCH_CLIENT_CRASH(filter, "Invalid source type");
	}

	dx_wakeup(ds, _dispatch_qos_from_pp(pp), DISPATCH_WAKEUP_MAKE_DIRTY);
}

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	_dispatch_source_merge_data(ds, 0, val);
}
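
/*
 * Illustrative sketch (not part of this file): a custom DATA_ADD source
 * lets arbitrary threads coalesce counts into a single handler invocation.
 *
 *	dispatch_source_t ds = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
 *	dispatch_source_set_event_handler(ds, ^{
 *		// the pending data is latched and reset before the callout, so
 *		// this observes the sum of all merges since the last callout
 *		process_events(dispatch_source_get_data(ds)); // hypothetical helper
 *	});
 *	dispatch_activate(ds);
 *	// ... later, from any thread:
 *	dispatch_source_merge_data(ds, 1);
 */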

#pragma mark -
#pragma mark dispatch_source_handler

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
{
	return os_atomic_load(&dr->ds_handler[kind], relaxed);
}
#define _dispatch_source_get_event_handler(dr) \
	_dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
#define _dispatch_source_get_cancel_handler(dr) \
	_dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
#define _dispatch_source_get_registration_handler(dr) \
	_dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
		bool block)
{
	// sources don't propagate priority by default
	const dispatch_block_flags_t flags =
			DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (func) {
		uintptr_t dc_flags = 0;

		if (kind != DS_EVENT_HANDLER) {
			dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
		}
		if (block) {
#ifdef __BLOCKS__
			_dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
#endif /* __BLOCKS__ */
		} else {
			dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
			_dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
					0, flags, dc_flags);
		}
		_dispatch_trace_continuation_push(ds->_as_dq, dc);
	} else {
		dc->dc_flags = 0;
		dc->dc_func = NULL;
	}
	return dc;
}

DISPATCH_NOINLINE
static void
_dispatch_source_handler_dispose(dispatch_continuation_t dc)
{
#ifdef __BLOCKS__
	if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
		Block_release(dc->dc_ctxt);
	}
#endif /* __BLOCKS__ */
	if (dc->dc_voucher) {
		_voucher_release(dc->dc_voucher);
		dc->dc_voucher = VOUCHER_INVALID;
	}
	_dispatch_continuation_free(dc);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_take(dispatch_source_t ds, long kind)
{
	return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_free(dispatch_source_t ds, long kind)
{
	dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
	if (dc) _dispatch_source_handler_dispose(dc);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	if (!dc->dc_func) {
		_dispatch_continuation_free(dc);
		dc = NULL;
	} else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
	if (dc) _dispatch_source_handler_dispose(dc);
}

DISPATCH_NOINLINE
static void
_dispatch_source_set_handler_slow(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);

	dispatch_continuation_t dc = context;
	long kind = (long)dc->dc_data;
	dc->dc_data = NULL;
	_dispatch_source_handler_replace(ds, kind, dc);
}

DISPATCH_NOINLINE
static void
_dispatch_source_set_handler(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
	if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
		_dispatch_source_handler_replace(ds, kind, dc);
		return dx_vtable(ds)->do_resume(ds, false);
	}
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
				"after it has been activated");
	}
	_dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
	if (kind == DS_REGISTN_HANDLER) {
		_dispatch_bug_deprecated("Setting registration handler after "
				"the source has been activated");
	}
	dc->dc_data = (void *)kind;
	_dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
			_dispatch_source_set_handler_slow, 0);
}

#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}

#ifdef __BLOCKS__
DISPATCH_NOINLINE
static void
_dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}

void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler(ds, handler);
}

void
dispatch_source_set_mandatory_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler(ds, handler);
}
#endif /* __BLOCKS__ */

DISPATCH_NOINLINE
static void
_dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}

void
dispatch_source_set_mandatory_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}

#ifdef __BLOCKS__
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}

#pragma mark -
#pragma mark dispatch_source_invoke

bool
_dispatch_source_will_reenable_kevent_4NW(dispatch_source_t ds)
{
	uint64_t dq_state = os_atomic_load2o(ds, dq_state, relaxed);
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);

	if (unlikely(!_dq_state_drain_locked_by_self(dq_state))) {
		DISPATCH_CLIENT_CRASH(0, "_dispatch_source_will_reenable_kevent_4NW "
				"not called from within the event handler");
	}

	return _dispatch_unote_needs_rearm(ds->ds_refs) && !(dqf & DSF_ARMED);
}

static void
_dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
	if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}

static void
_dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
	ds->ds_pending_data = 0;
	ds->ds_data = 0;
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	if (!dc) {
		return;
	}
	if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}

static void
_dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
	uint64_t prev;

	if (dr->du_is_timer && !(dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		prev = _dispatch_source_timer_data(ds, dr);
	} else {
		prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	}
	if (dr->du_data_action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (!dispatch_assume(prev != 0) || !dc) {
		return;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
		dispatch_release(ds); // dispatch_after sources are one-shot
	}
}

DISPATCH_NOINLINE
static void
_dispatch_source_refs_finalize_unregistration(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
			DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
	if (dqf & DSF_CANCEL_WAITER) {
		_dispatch_wake_by_address(&ds->dq_atomic_flags);
	}
	_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	_dispatch_release_tailcall(ds); // the retain is done at creation time
}

void
_dispatch_source_refs_unregister(dispatch_source_t ds, uint32_t options)
{
	_dispatch_object_debug(ds, "%s", __func__);
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	dispatch_source_refs_t dr = ds->ds_refs;

	if (dr->du_is_timer) {
		// Because of the optimization that unregisters fired oneshot timers
		// from the target queue, we can't trust _dispatch_unote_registered()
		// to tell the truth; the unregistration may not have happened yet.
		if (dqf & DSF_ARMED) {
			_dispatch_timers_unregister(ds->ds_timer_refs);
			_dispatch_release_2(ds);
		}
		dr->du_ident = DISPATCH_TIMER_IDENT_CANCELED;
	} else {
		if (_dispatch_unote_needs_rearm(dr) && !(dqf & DSF_ARMED)) {
			options |= DU_UNREGISTER_IMMEDIATE_DELETE;
		}
		if (!_dispatch_unote_unregister(dr, options)) {
			_dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
					ds, dr);
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
			return; // deferred unregistration
		}
	}

	ds->ds_is_installed = true;
	_dispatch_source_refs_finalize_unregistration(ds);
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
			// the test is inside the loop because it's convenient but the
			// result should not change for the duration of the rmw_loop
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_refs_resume(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_is_timer) {
		_dispatch_timers_update(dr, 0);
		return true;
	}
	if (unlikely(!_dispatch_source_tryarm(ds))) {
		return false;
	}
	_dispatch_unote_resume(dr);
	_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dr);
	return true;
}

void
_dispatch_source_refs_register(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t kbp;

	dispatch_assert(!ds->ds_is_installed);

	if (dr->du_is_timer) {
		dispatch_queue_t dq = ds->_as_dq;
		kbp = _dispatch_queue_compute_priority_and_wlh(dq, NULL);
		// aggressively coalesce background/maintenance QoS timers
		// <rdar://problem/12200216&27342536>
		if (_dispatch_qos_is_background(_dispatch_priority_qos(kbp))) {
			if (dr->du_fflags & DISPATCH_TIMER_STRICT) {
				_dispatch_ktrace1(DISPATCH_PERF_strict_bg_timer, ds);
			} else {
				dr->du_fflags |= DISPATCH_TIMER_BACKGROUND;
				dr->du_ident = _dispatch_source_timer_idx(dr);
			}
		}
		_dispatch_timers_update(dr, 0);
		return;
	}

	if (unlikely(!_dispatch_source_tryarm(ds) ||
			!_dispatch_unote_register(dr, wlh, pri))) {
		// Do the parts of dispatch_source_refs_unregister() that
		// are required after this partial initialization.
		_dispatch_source_refs_finalize_unregistration(ds);
	} else {
		_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, dr);
	}
	_dispatch_object_debug(ds, "%s", __func__);
}

static void
_dispatch_source_set_event_handler_context(void *ctxt)
{
	dispatch_source_t ds = ctxt;
	dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);

	if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
		dc->dc_ctxt = ds->do_ctxt;
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_install(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	_dispatch_source_refs_register(ds, wlh, pri);
	ds->ds_is_installed = true;
}

void
_dispatch_source_finalize_activation(dispatch_source_t ds, bool *allow_resume)
{
	dispatch_continuation_t dc;
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t pri;
	dispatch_wlh_t wlh;

	if (unlikely(dr->du_is_direct &&
			(_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
		return _dispatch_source_refs_unregister(ds, 0);
	}

	dc = _dispatch_source_get_event_handler(dr);
	if (dc) {
		if (_dispatch_object_is_barrier(dc)) {
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
		}
		ds->dq_priority = _dispatch_priority_from_pp_strip_flags(dc->dc_priority);
		if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
			_dispatch_barrier_async_detached_f(ds->_as_dq, ds,
					_dispatch_source_set_event_handler_context);
		}
	}

	// call "super"
	_dispatch_queue_finalize_activation(ds->_as_dq, allow_resume);

	if (dr->du_is_direct && !ds->ds_is_installed) {
		dispatch_queue_t dq = ds->_as_dq;
		pri = _dispatch_queue_compute_priority_and_wlh(dq, &wlh);
		if (pri) _dispatch_source_install(ds, wlh, pri);
	}
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned)
{
	dispatch_source_t ds = dou._ds;
	dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_flags_t dqf;

	if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN) &&
			_dispatch_unote_wlh_changed(dr, _dispatch_get_wlh())) {
		dqf = _dispatch_queue_atomic_flags_set_orig(ds->_as_dq,
				DSF_WLH_CHANGED);
		if (!(dqf & DSF_WLH_CHANGED)) {
			_dispatch_bug_deprecated("Changing target queue "
					"hierarchy after source was activated");
		}
	}

	if (_dispatch_queue_class_probe(ds)) {
		// Intentionally always drain even when on the manager queue
		// and not the source's regular target queue: we need to be able
		// to drain timer setting and the like there.
		dispatch_with_disabled_narrowing(dic, {
			retq = _dispatch_queue_serial_drain(ds->_as_dq, dic, flags, owned);
		});
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in wakeup should be consistent.

	dispatch_queue_t dkq = &_dispatch_mgr_q;
	bool prevent_starvation = false;

	if (dr->du_is_direct) {
		dkq = ds->do_targetq;
	}

	if (dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
			// timer has to be configured on the kevent queue
			if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_timer_configure(ds);
		}
	}

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_install(ds, _dispatch_get_wlh(),
				_dispatch_get_basepri());
	}

	if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return ds->do_targetq;
	}

	if (_dispatch_source_get_registration_handler(dr)) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds, dq, flags);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if ((dqf & DSF_DEFERRED_DELETE) && !(dqf & DSF_ARMED)) {
unregister_event:
		// DSF_DEFERRED_DELETE: pending source kevent unregistration
		// can now be completed
		// !DSF_ARMED: event was delivered and can safely be unregistered
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_IMMEDIATE_DELETE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the kevent
		// queue after event delivery.
		if (dq == ds->do_targetq) {
			_dispatch_source_latch_and_call(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);

			// starvation avoidance: if the source triggers itself then force a
			// re-queue to give other things already queued on the target queue
			// a chance to run.
			//
			// however, if the source is directly targeting an overcommit root
			// queue, this would requeue the source and ask for a new overcommit
			// thread right away.
			prevent_starvation = dq->do_targetq ||
					!(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
			if (prevent_starvation &&
					os_atomic_load2o(ds, ds_pending_data, relaxed)) {
				retq = ds->do_targetq;
			}
		} else {
			// there is no point trying to be eager, the next thing to do is
			// to deliver the event
			return ds->do_targetq;
		}
	}

	if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
		// The source has been cancelled and needs to be uninstalled from the
		// kevent queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
			} else if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
				if (!(dqf & DSF_ARMED)) {
					goto unregister_event;
				}
				// we need to wait for the EV_DELETE
				return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
			}
		}
		if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr))) {
			retq = ds->do_targetq;
		} else {
			_dispatch_source_cancel_callout(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		}
		prevent_starvation = false;
	}

	if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
			// no need for resume when we can directly unregister the kevent
			goto unregister_event;
		}
		if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
			// do not try to rearm the kevent if the source is suspended
			// from the source handler
			return ds->do_targetq;
		}
		if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
			// keep the old behavior to force re-enqueue to our target queue
			// for the rearm.
			//
			// if the handler didn't run, or this is a pending delete
			// or our target queue is a global queue, then starvation is
			// not a concern and we can rearm right away.
			return ds->do_targetq;
		}
		if (unlikely(!_dispatch_source_refs_resume(ds))) {
			goto unregister_event;
		}
		if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
			// try to redrive the drain from under the lock for sources
			// targeting an overcommit root queue to avoid parking
			// when the next event has already fired
			_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
		}
	}

	return retq;
}

DISPATCH_NOINLINE
void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(ds, dic, flags,
			DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
}

void
_dispatch_source_wakeup(dispatch_source_t ds, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in wakeup and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
	dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);

	if (dr->du_is_direct) {
		dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		// timer has to be configured on the kevent queue
		tq = dkq;
	} else if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		tq = dkq;
	} else if (_dispatch_source_get_registration_handler(dr)) {
		// The registration handler needs to be delivered to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if (deferred_delete && !(dqf & DSF_ARMED)) {
		// Pending source kevent unregistration has been completed
		// or EV_ONESHOT event can be acknowledged
		tq = dkq;
	} else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
		// The source needs to be uninstalled from the kevent queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
				tq = DISPATCH_QUEUE_WAKEUP_TARGET;
			} else {
				tq = dkq;
			}
		} else if (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr)) {
			tq = DISPATCH_QUEUE_WAKEUP_TARGET;
		}
	} else if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		tq = dkq;
	}
	if (!tq && _dispatch_queue_class_probe(ds)) {
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if ((tq == DISPATCH_QUEUE_WAKEUP_TARGET) &&
			ds->do_targetq == &_dispatch_mgr_q) {
		tq = DISPATCH_QUEUE_WAKEUP_MGR;
	}

	return _dispatch_queue_class_wakeup(ds->_as_dq, qos, flags, tq);
}

void
dispatch_source_cancel(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	// Right after we set the cancel flag, someone else
	// could potentially invoke the source, do the cancellation,
	// unregister the source, and deallocate it. We therefore
	// need to retain/release around setting the bit.
	_dispatch_retain_2(ds);

	dispatch_queue_t q = ds->_as_dq;
	if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
		_dispatch_release_2_tailcall(ds);
	} else {
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}
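
/*
 * Illustrative sketch (not part of this file): cancellation is asynchronous,
 * so resources owned by a source are torn down in the cancel handler, never
 * right after dispatch_source_cancel() returns.
 *
 *	dispatch_source_t ds = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_READ, fd, 0, q);
 *	dispatch_source_set_cancel_handler(ds, ^{
 *		close(fd); // safe: the kevent has been unregistered by now
 *	});
 *	...
 *	dispatch_source_cancel(ds); // fd may still be open at this point
 */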

void
dispatch_source_cancel_and_wait(dispatch_source_t ds)
{
	dispatch_queue_flags_t old_dqf, dqf, new_dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	if (unlikely(_dispatch_source_get_cancel_handler(dr))) {
		DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
	}

	_dispatch_object_debug(ds, "%s", __func__);
	os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = old_dqf | DSF_CANCELED;
		if (old_dqf & DSF_CANCEL_WAITER) {
			os_atomic_rmw_loop_give_up(break);
		}
		if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
			// just add DSF_CANCELED
		} else if ((old_dqf & DSF_DEFERRED_DELETE) || !dr->du_is_direct) {
			new_dqf |= DSF_CANCEL_WAITER;
		}
	});
	dqf = new_dqf;

	if (old_dqf & DQF_RELEASED) {
		DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
	}
	if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
		return;
	}
	if (dqf & DSF_CANCEL_WAITER) {
		goto wakeup;
	}

	// simplified version of _dispatch_queue_drain_try_lock
	// that also sets the DIRTY bit on failure to lock
	uint64_t set_owner_and_set_full_width = _dispatch_lock_value_for_self() |
			DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
		new_state = old_state;
		if (likely(_dq_state_is_runnable(old_state) &&
				!_dq_state_drain_locked(old_state))) {
			new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
			new_state |= set_owner_and_set_full_width;
		} else if (old_dqf & DSF_CANCELED) {
			os_atomic_rmw_loop_give_up(break);
		} else {
			// this case needs a release barrier, hence the seq_cst above
			new_state |= DISPATCH_QUEUE_DIRTY;
		}
	});

	if (unlikely(_dq_state_is_suspended(old_state))) {
		if (unlikely(_dq_state_suspend_cnt(old_state))) {
			DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
		}
		// inactive sources have never been registered and there is no need
		// to wait here because activation will notice and mark the source
		// as deleted without ever trying to use the fd or mach port.
		return dispatch_activate(ds);
	}

	if (likely(_dq_state_is_runnable(old_state) &&
			!_dq_state_drain_locked(old_state))) {
		// same thing _dispatch_source_invoke2() does when handling cancellation
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
				_dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
			}
		}
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
	} else if (unlikely(_dq_state_drain_locked_by_self(old_state))) {
		DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
				"called from a source handler");
	} else {
		dispatch_qos_t qos;
wakeup:
		qos = _dispatch_qos_from_pp(_dispatch_get_priority());
		dx_wakeup(ds, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
		dispatch_activate(ds);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
		if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
			if (!os_atomic_cmpxchgv2o(ds, dq_atomic_flags,
					dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
				continue;
			}
			dqf |= DSF_CANCEL_WAITER;
		}
		_dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}
}

void
_dispatch_source_merge_evt(dispatch_unote_t du, uint32_t flags, uintptr_t data,
		uintptr_t status, pthread_priority_t pp)
{
	dispatch_source_refs_t dr = du._dr;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	dispatch_wakeup_flags_t wflags = 0;
	dispatch_queue_flags_t dqf;

	if (_dispatch_unote_needs_rearm(dr) || (flags & (EV_DELETE | EV_ONESHOT))) {
		// once we modify the queue atomic flags below, it will allow concurrent
		// threads running _dispatch_source_invoke2 to dispose of the source,
		// so we can't safely borrow the reference we get from the muxnote udata
		// anymore, and need our own
		wflags = DISPATCH_WAKEUP_CONSUME_2;
		_dispatch_retain_2(ds); // rdar://20382435
	}

	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			!(flags & EV_DELETE)) {
		dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
				DSF_DEFERRED_DELETE, DSF_ARMED);
		if (flags & EV_VANISHED) {
			_dispatch_bug_kevent_client("kevent", dr->du_type->dst_kind,
					"monitored resource vanished before the source "
					"cancel handler was invoked", 0);
		}
		_dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
				(flags & EV_VANISHED) ? "vanished" :
				"deferred delete oneshot", dr);
	} else if (flags & (EV_DELETE | EV_ONESHOT)) {
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_ALREADY_DELETED);
		_dispatch_debug("kevent-source[%p]: deleted kevent[%p]", ds, dr);
		if (flags & EV_DELETE) goto done;
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	} else if (_dispatch_unote_needs_rearm(dr)) {
		dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	} else {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
		goto done; // rdar://20204025
	}

	dispatch_unote_action_t action = dr->du_data_action;
	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			(flags & EV_VANISHED)) {
		// if the resource behind the ident vanished, the event handler can't
		// do anything useful anymore, so do not try to call it at all
		//
		// Note: if the kernel doesn't support EV_VANISHED we always get it
		// back unchanged from the flags passed at EV_ADD (registration) time
		// Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
		// if we get both bits it was a real EV_VANISHED delivery
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
#if HAVE_MACH
	} else if (dr->du_filter == EVFILT_MACHPORT) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
#endif
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_ADD) {
		os_atomic_add2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR) {
		os_atomic_or2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
		// We combine the data and status into a single 64-bit value.
		uint64_t odata, ndata;
		uint64_t value = DISPATCH_SOURCE_COMBINE_DATA_AND_STATUS(data, status);
		os_atomic_rmw_loop2o(ds, ds_pending_data, odata, ndata, relaxed, {
			ndata = DISPATCH_SOURCE_GET_DATA(odata) | value;
		});
	} else if (data) {
		DISPATCH_INTERNAL_CRASH(action, "Unexpected source action value");
	}
	_dispatch_debug("kevent-source[%p]: merged kevent[%p]", ds, dr);

done:
	_dispatch_object_debug(ds, "%s", __func__);
	dx_wakeup(ds, _dispatch_qos_from_pp(pp), wflags | DISPATCH_WAKEUP_MAKE_DIRTY);
}

#pragma mark -
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
static dispatch_timer_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mgr_trace_timers_wakes(void)
{
	uint32_t qos;

	if (_dispatch_timers_will_wake) {
		if (slowpath(DISPATCH_TIMER_WAKE_ENABLED())) {
			for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
				if (_dispatch_timers_will_wake & (1 << qos)) {
					_dispatch_trace_timer_wake(_dispatch_trace_next_timer[qos]);
				}
			}
		}
		_dispatch_timers_will_wake = 0;
	}
}
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_mgr_trace_timers_wakes()
#endif

#define _dispatch_source_timer_telemetry_enabled() false

DISPATCH_NOINLINE
static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
		dispatch_clock_t clock, struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled()) {
		_dispatch_trace_timer_configure(ds, clock, values);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock,
		struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled() ||
			_dispatch_source_timer_telemetry_enabled()) {
		_dispatch_source_timer_telemetry_slow(ds, clock, values);
		asm(""); // prevent tailcall
	}
}

DISPATCH_NOINLINE
static void
_dispatch_source_timer_configure(dispatch_source_t ds)
{
	dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
	dispatch_timer_config_t dtc;

	dtc = os_atomic_xchg2o(dt, dt_pending_config, NULL, dependency);
	if (dtc->dtc_clock == DISPATCH_CLOCK_MACH) {
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
	} else {
		dt->du_fflags &= ~(uint32_t)DISPATCH_TIMER_CLOCK_MACH;
	}
	dt->dt_timer = dtc->dtc_timer;
	free(dtc);
	if (ds->ds_is_installed) {
		// Clear any pending data that might have accumulated on
		// older timer params <rdar://problem/8574886>
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
		_dispatch_timers_update(dt, 0);
	}
}

static dispatch_timer_config_t
_dispatch_source_timer_config_create(dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	dispatch_timer_config_t dtc;
	dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s));
	if (unlikely(interval == 0)) {
		if (start != DISPATCH_TIME_FOREVER) {
			_dispatch_bug_deprecated("Setting timer interval to 0 requests "
					"a 1ns timer, did you mean FOREVER (a one-shot timer)?");
		}
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}
	if ((int64_t)leeway < 0) {
		leeway = INT64_MAX;
	}
	if (start == DISPATCH_TIME_NOW) {
		start = _dispatch_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	if ((int64_t)start < 0) {
		// wall clock
		start = (dispatch_time_t)-((int64_t)start);
		dtc->dtc_clock = DISPATCH_CLOCK_WALL;
	} else {
		// absolute clock
		interval = _dispatch_time_nano2mach(interval);
		if (interval < 1) {
			// rdar://problem/7287561 interval must be at least one
			// in order to avoid later division by zero when calculating
			// the missed interval count. (NOTE: the wall clock's
			// interval is already "fixed" to be 1 or more)
			interval = 1;
		}
		leeway = _dispatch_time_nano2mach(leeway);
		dtc->dtc_clock = DISPATCH_CLOCK_MACH;
	}
	if (interval < INT64_MAX && leeway > interval / 2) {
		leeway = interval / 2;
	}

	dtc->dtc_timer.target = start;
	dtc->dtc_timer.interval = interval;
	if (start + leeway < INT64_MAX) {
		dtc->dtc_timer.deadline = start + leeway;
	} else {
		dtc->dtc_timer.deadline = INT64_MAX;
	}
	return dtc;
}
0ab74447 1319
6b746eb4 1320DISPATCH_NOINLINE
517da941
A
1321void
1322dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1323 uint64_t interval, uint64_t leeway)
1324{
6b746eb4
A
1325 dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
1326 dispatch_timer_config_t dtc;
0ab74447 1327
6b746eb4
A
1328 if (unlikely(!dt->du_is_timer || (dt->du_fflags&DISPATCH_TIMER_INTERVAL))) {
1329 DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
1330 }
1331
1332 dtc = _dispatch_source_timer_config_create(start, interval, leeway);
1333 _dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
1334 dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
1335 if (dtc) free(dtc);
1336 dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
517da941 1337}

static void
_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull

	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION;
	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
			FOREVER_NSEC/NSEC_PER_MSEC))) {
		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
	} else {
		interval = FOREVER_NSEC;
	}
	interval = _dispatch_time_nano2mach(interval);
	uint64_t target = _dispatch_absolute_time() + interval;
	target -= (target % interval);
	const uint64_t leeway = animation ?
			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
	dr->dt_timer.target = target;
	dr->dt_timer.deadline = target + leeway;
	dr->dt_timer.interval = interval;
	_dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer);
}

#pragma mark -
#pragma mark dispatch_after

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		void *ctxt, void *handler, bool block)
{
	dispatch_timer_source_refs_t dt;
	dispatch_source_t ds;
	uint64_t leeway, delta;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		if (block) {
			return dispatch_async(queue, handler);
		}
		return dispatch_async_f(queue, ctxt, handler);
	}
	leeway = delta / 10; // <rdar://problem/13447496>

	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
	dt = ds->ds_timer_refs;

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (block) {
		_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
	} else {
		_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
	}
	// reference `ds` so that it doesn't show up as a leak
	dc->dc_data = ds;
	_dispatch_trace_continuation_push(ds->_as_dq, dc);
	os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);

	if ((int64_t)when < 0) {
		// wall clock
		when = (dispatch_time_t)-((int64_t)when);
	} else {
		// absolute clock
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
		leeway = _dispatch_time_nano2mach(leeway);
	}
	dt->dt_timer.target = when;
	dt->dt_timer.interval = UINT64_MAX;
	dt->dt_timer.deadline = when + leeway;
	dispatch_activate(ds);
}

DISPATCH_NOINLINE
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_after(when, queue, ctxt, func, false);
}

#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	_dispatch_after(when, queue, NULL, work, true);
}
#endif
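
/*
 * Illustrative sketch (not part of this file): both entry points funnel into
 * _dispatch_after() above, which builds a one-shot DISPATCH_TIMER_AFTER
 * source with a 10% (clamped) leeway.
 *
 *	dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 3 * NSEC_PER_SEC),
 *			dispatch_get_main_queue(), ^{
 *		puts("three seconds later");
 *	});
 */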

#pragma mark -
#pragma mark dispatch_timers

/*
 * The dispatch_timer_heap_t structure is a double min-heap of timers,
 * interleaving the by-target min-heap in the even slots, and the by-deadline
 * one in the odd slots.
 *
 * The min element of each of these is held inline in the
 * dispatch_timer_heap_t structure, and further entries are held in segments.
 *
 * dth_segments is the number of allocated segments.
 *
 * Segment 0 has a size of `DISPATCH_HEAP_INIT_SEGMENT_CAPACITY` pointers
 * Segment k has a size of (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (k - 1))
 *
 * Segment n (dth_segments - 1) is the last segment and points its final n
 * entries to previous segments. Its address is held in the `dth_heap` field.
 *
 * segment n [ regular timer pointers | n-1 | k | 0 ]
 *                                       |     |   |
 * segment n-1 <-------------------------'     |   |
 * segment k <---------------------------------'   |
 * segment 0 <-------------------------------------'
 */
#define DISPATCH_HEAP_INIT_SEGMENT_CAPACITY 8u
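
/*
 * Worked example (assuming C = 8): with dth_segments == 3, the heap holds
 * 2 inline entries in dth_min, segment 0 (8 slots), segment 1 (8 slots),
 * and segment 2 (16 slots). Segment 2 is the last segment, so its final
 * 2 slots hold not timers but the addresses of segments 1 and 0, and
 * dth_heap points at segment 2.
 */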
1468
1469/*
1470 * There are two min-heaps stored interleaved in a single array,
1471 * even indices are for the by-target min-heap, and odd indices for
1472 * the by-deadline one.
1473 */
1474#define DTH_HEAP_ID_MASK (DTH_ID_COUNT - 1)
1475#define DTH_HEAP_ID(idx) ((idx) & DTH_HEAP_ID_MASK)
1476#define DTH_IDX_FOR_HEAP_ID(idx, heap_id) \
1477 (((idx) & ~DTH_HEAP_ID_MASK) | (heap_id))
1478
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_capacity(uint32_t segments)
{
	if (segments == 0) return 2;
	uint32_t seg_no = segments - 1;
	// for C = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY,
	// 2 + C + SUM(C << (i-1), i = 1..seg_no) - seg_no
	return 2 + (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << seg_no) - seg_no;
}
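
/*
 * Worked values of the capacity formula above (illustrative), with C == 8:
 *
 *   segments == 0: 2                (just the two inline dth_min slots)
 *   segments == 1: 2 + 8      = 10
 *   segments == 2: 2 + 16 - 1 = 17  (one slot of segment 1 is a link)
 *   segments == 3: 2 + 32 - 2 = 32  (two slots of segment 2 are links)
 */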

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_grow(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = dth->dth_segments++;
	void **heap, **heap_prev = dth->dth_heap;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
	}
	heap = _dispatch_calloc(seg_capacity, sizeof(void *));
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap[seg_capacity - prev_seg_no],
				&heap_prev[prev_seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	if (seg_no > 0) {
		heap[seg_capacity - seg_no] = heap_prev;
	}
	dth->dth_heap = heap;
}

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_shrink(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = --dth->dth_segments;
	void **heap = dth->dth_heap, **heap_prev = NULL;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
		heap_prev = heap[seg_capacity - seg_no];
	}
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap_prev[prev_seg_capacity - prev_seg_no],
				&heap[seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	dth->dth_heap = heap_prev;
	free(heap);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_timer_source_refs_t *
_dispatch_timer_heap_get_slot(dispatch_timer_heap_t dth, uint32_t idx)
{
	uint32_t seg_no, segments = dth->dth_segments;
	void **segment;

	if (idx < DTH_ID_COUNT) {
		return &dth->dth_min[idx];
	}
	idx -= DTH_ID_COUNT;

	// Derive the segment number from the index. Naming
	// DISPATCH_HEAP_INIT_SEGMENT_CAPACITY `C`, the segment index ranges are:
	//   0: 0 .. (C - 1)
	//   1: C .. 2 * C - 1
	//   k: 2^(k-1) * C .. 2^k * C - 1
	// so `k` can be derived from the first bit set in `idx`
	seg_no = (uint32_t)(__builtin_clz(DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1) -
			__builtin_clz(idx | (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1)));
	if (seg_no + 1 == segments) {
		segment = dth->dth_heap;
	} else {
		uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
		seg_capacity <<= (segments - 2);
		segment = dth->dth_heap[seg_capacity - seg_no - 1];
	}
	if (seg_no) {
		idx -= DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (seg_no - 1);
	}
	return (dispatch_timer_source_refs_t *)(segment + idx);
}
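
/*
 * Worked lookup (illustrative), assuming DTH_ID_COUNT == 2, C == 8 and
 * dth_segments == 3. For interleaved index 22: it is >= DTH_ID_COUNT, so
 * idx becomes 20; seg_no = clz(7) - clz(20 | 7) = 29 - 27 = 2, since 20 is
 * in segment 2's range 16..31; seg_no + 1 == segments, so `segment` is
 * dth_heap itself; finally idx -= (8 << 1) selects slot 4 of that segment.
 */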

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_timer_heap_set(dispatch_timer_source_refs_t *slot,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	*slot = dt;
	dt->dt_heap_entry[DTH_HEAP_ID(idx)] = idx;
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_parent(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	idx = (idx - DTH_ID_COUNT) / 2; // go to the parent
	return DTH_IDX_FOR_HEAP_ID(idx, heap_id);
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_left_child(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	// 2 * (idx - heap_id) + DTH_ID_COUNT + heap_id
	return 2 * idx + DTH_ID_COUNT - heap_id;
}
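
/*
 * Worked index arithmetic for the two helpers above (illustrative), assuming
 * DTH_ID_COUNT == 2. Interleaved index 6 is the by-target entry of
 * non-interleaved node 3 (6 / 2). Parent: (6 - 2) / 2 == 2, re-tagged to the
 * target heap, i.e. non-interleaved node 1 == (3 - 1) / 2. Left child:
 * 2 * 6 + 2 - 0 == 14, i.e. non-interleaved node 7 == 2 * 3 + 1.
 */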

#if DISPATCH_HAVE_TIMER_COALESCING
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_skip(uint32_t idx, uint32_t count)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);

	idx -= heap_id;
	if (unlikely(idx + DTH_ID_COUNT == count)) {
		// reaching `count` doesn't mean we're done, but there is a weird
		// corner case if the last item of the heap is a left child:
		//
		//      /\
		//     /  \
		//    /  __\
		//   /__/
		//      ^
		//
		// The formula below would return the sibling of `idx` which is
		// out of bounds. Fortunately, the correct answer is the same
		// as for idx's parent.
		idx = _dispatch_timer_heap_parent(idx);
	}

	//
	// When considering the index in a non-interleaved, 1-based array
	// representation of a heap, hence looking at (idx / DTH_ID_COUNT + 1)
	// for a given idx in our dual-heaps, that index is in one of two forms:
	//
	//     (a) 1xxxx011111    or    (b) 111111111
	//         d    i    0          d         0
	//
	// The first bit set is the row of the binary tree node (0-based).
	// The following digits from most to least significant represent the path
	// to that node, where `0` is a left turn and `1` a right turn.
	//
	// For example 0b0101 (5) is a node on row 2 accessed going left then
	// right:
	//
	//   row 0           1
	//                 /   .
	//   row 1       2       3
	//              . \     . .
	//   row 2     4   5   6   7
	//            : : : : : : : :
	//
	// Skipping a sub-tree in walk order means going to the sibling of the
	// last node reached after we turned left. If the node was of the form
	// (a), this node is 1xxxx1, which for the above example is 0b0011 (3).
	// If the node was of the form (b) then we never took a left, meaning
	// we reached the last element in traversal order.
	//

	//
	// We want to find:
	// - the least significant bit set to 0 in (idx / DTH_ID_COUNT + 1)
	// - which is offset by log_2(DTH_ID_COUNT) from the position of the least
	//   significant 0 in (idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//   since idx is a multiple of DTH_ID_COUNT and DTH_ID_COUNT a power of 2.
	// - which in turn is the same as the position of the least significant 1
	//   in ~(idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//
	dispatch_static_assert(powerof2(DTH_ID_COUNT));
	idx += DTH_ID_COUNT + DTH_ID_COUNT - 1;
	idx >>= __builtin_ctz(~idx);

	//
	// `idx` is now either:
	// - 0 if it was the (b) case above, in which case the walk is done
	// - 1xxxx0 as the position in a 0-based array representation of a
	//   non-interleaved heap, so we just have to compute the interleaved
	//   index.
	//
	return likely(idx) ? DTH_ID_COUNT * idx + heap_id : UINT32_MAX;
}
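
/*
 * Worked skip (illustrative), assuming DTH_ID_COUNT == 2 and a heap of 8
 * nodes (count == 16). Skipping the subtree under by-target index 8, i.e.
 * 1-based node 8 / 2 + 1 == 5 (0b101): idx + 3 == 11 (0b1011); ~11 has its
 * least significant 1 at bit 2, so 11 >> 2 == 2; the result is
 * 2 * 2 + 0 == 4, i.e. 1-based node 3 (0b11), the sibling of node 5's
 * parent, exactly the "1xxxx1" node described above.
 */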

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_next(uint32_t idx, uint32_t count)
{
	//
	// Goes to the next element in heap walk order, which is the
	// prefix-ordered walk of the tree.
	//
	// From a given node, the next item to return is the left child if it
	// exists, else the first right sibling we find by walking our parent
	// chain, which is exactly what _dispatch_timer_heap_walk_skip() returns.
	//
	uint32_t lchild = _dispatch_timer_heap_left_child(idx);
	if (lchild < count) {
		return lchild;
	}
	return _dispatch_timer_heap_walk_skip(idx, count);
}

DISPATCH_NOINLINE
static uint64_t
_dispatch_timer_heap_max_target_before(dispatch_timer_heap_t dth, uint64_t limit)
{
	dispatch_timer_source_refs_t dri;
	uint32_t idx = _dispatch_timer_heap_left_child(DTH_TARGET_ID);
	uint32_t count = dth->dth_count;
	uint64_t tmp, target = dth->dth_min[DTH_TARGET_ID]->dt_timer.target;

	while (idx < count) {
		dri = *_dispatch_timer_heap_get_slot(dth, idx);
		tmp = dri->dt_timer.target;
		if (tmp > limit) {
			// skip subtree since none of the targets below can be before limit
			idx = _dispatch_timer_heap_walk_skip(idx, count);
		} else {
			target = tmp;
			idx = _dispatch_timer_heap_walk_next(idx, count);
		}
	}
	return target;
}
#endif // DISPATCH_HAVE_TIMER_COALESCING

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_resift(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, target) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_TARGET_ID]));
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, deadline) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_DEADLINE_ID]));
#define dth_cmp(hid, dt1, op, dt2) \
		(((dt1)->dt_timer.heap_key)[hid] op ((dt2)->dt_timer.heap_key)[hid])

	dispatch_timer_source_refs_t *pslot, pdt;
	dispatch_timer_source_refs_t *cslot, cdt;
	dispatch_timer_source_refs_t *rslot, rdt;
	uint32_t cidx, dth_count = dth->dth_count;
	dispatch_timer_source_refs_t *slot;
	int heap_id = DTH_HEAP_ID(idx);
	bool sifted_up = false;

	// try to sift up

	slot = _dispatch_timer_heap_get_slot(dth, idx);
	while (idx >= DTH_ID_COUNT) {
		uint32_t pidx = _dispatch_timer_heap_parent(idx);
		pslot = _dispatch_timer_heap_get_slot(dth, pidx);
		pdt = *pslot;
		if (dth_cmp(heap_id, pdt, <=, dt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, pdt, idx);
		slot = pslot;
		idx = pidx;
		sifted_up = true;
	}
	if (sifted_up) {
		goto done;
	}

	// try to sift down

	while ((cidx = _dispatch_timer_heap_left_child(idx)) < dth_count) {
		uint32_t ridx = cidx + DTH_ID_COUNT;
		cslot = _dispatch_timer_heap_get_slot(dth, cidx);
		cdt = *cslot;
		if (ridx < dth_count) {
			rslot = _dispatch_timer_heap_get_slot(dth, ridx);
			rdt = *rslot;
			if (dth_cmp(heap_id, cdt, >, rdt)) {
				cidx = ridx;
				cdt = rdt;
				cslot = rslot;
			}
		}
		if (dth_cmp(heap_id, dt, <=, cdt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, cdt, idx);
		slot = cslot;
		idx = cidx;
	}

done:
	_dispatch_timer_heap_set(slot, dt, idx);
#undef dth_cmp
}

DISPATCH_ALWAYS_INLINE
static void
_dispatch_timer_heap_insert(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count += DTH_ID_COUNT) - DTH_ID_COUNT;

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], ==,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], ==,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		dt->dt_heap_entry[DTH_TARGET_ID] = DTH_TARGET_ID;
		dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_DEADLINE_ID;
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = dt;
		return;
	}

	if (unlikely(idx + DTH_ID_COUNT >
			_dispatch_timer_heap_capacity(dth->dth_segments))) {
		_dispatch_timer_heap_grow(dth);
	}
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_TARGET_ID);
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_DEADLINE_ID);
}

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_remove(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count -= DTH_ID_COUNT);

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_TARGET_ID], ==, dt,
				"target slot");
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_DEADLINE_ID], ==, dt,
				"deadline slot");
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = NULL;
		goto clear_heap_entry;
	}

	for (uint32_t heap_id = 0; heap_id < DTH_ID_COUNT; heap_id++) {
		dispatch_timer_source_refs_t *slot, last_dt;
		slot = _dispatch_timer_heap_get_slot(dth, idx + heap_id);
		last_dt = *slot; *slot = NULL;
		if (last_dt != dt) {
			uint32_t removed_idx = dt->dt_heap_entry[heap_id];
			_dispatch_timer_heap_resift(dth, last_dt, removed_idx);
		}
	}
	if (unlikely(idx <= _dispatch_timer_heap_capacity(dth->dth_segments - 1))) {
		_dispatch_timer_heap_shrink(dth);
	}

clear_heap_entry:
	dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
	dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_timer_heap_update(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");

	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_TARGET_ID]);
	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_DEADLINE_ID]);
}

DISPATCH_ALWAYS_INLINE
static bool
_dispatch_timer_heap_has_new_min(dispatch_timer_heap_t dth,
		uint32_t count, uint32_t mask)
{
	dispatch_timer_source_refs_t dt;
	bool changed = false;
	uint64_t tmp;
	uint32_t tidx;

	for (tidx = 0; tidx < count; tidx++) {
		if (!(mask & (1u << tidx))) {
			continue;
		}

		dt = dth[tidx].dth_min[DTH_TARGET_ID];
		tmp = dt ? dt->dt_timer.target : UINT64_MAX;
		if (dth[tidx].dth_target != tmp) {
			dth[tidx].dth_target = tmp;
			changed = true;
		}
		dt = dth[tidx].dth_min[DTH_DEADLINE_ID];
		tmp = dt ? dt->dt_timer.deadline : UINT64_MAX;
		if (dth[tidx].dth_deadline != tmp) {
			dth[tidx].dth_deadline = tmp;
			changed = true;
		}
	}
	return changed;
}

static inline void
_dispatch_timers_unregister(dispatch_timer_source_refs_t dt)
{
	uint32_t tidx = dt->du_ident;
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];

	_dispatch_timer_heap_remove(heap, dt);
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = NULL;
}

static inline void
_dispatch_timers_register(dispatch_timer_source_refs_t dt, uint32_t tidx)
{
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	if (_dispatch_unote_registered(dt)) {
		DISPATCH_TIMER_ASSERT(dt->du_ident, ==, tidx, "tidx");
		_dispatch_timer_heap_update(heap, dt);
	} else {
		dt->du_ident = tidx;
		_dispatch_timer_heap_insert(heap, dt);
	}
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = DISPATCH_WLH_ANON;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_timer_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_CANCELED | DQF_RELEASED)) {
			// do not install a canceled timer
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}

// Updates the ordered list of timers based on next fire date for changes to
// ds. Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timers_update(dispatch_unote_t du, uint32_t flags)
{
	dispatch_timer_source_refs_t dr = du._dt;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	const char *verb = "updated";
	bool will_register, disarm = false;

	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	if (unlikely(dr->du_ident == DISPATCH_TIMER_IDENT_CANCELED)) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		return;
	}

	// Unregister timers that are unconfigured, disabled, suspended or have
	// missed intervals; a later dispatch_set_timer(), resume or source
	// invoke will rearm them.
	will_register = !(flags & DISPATCH_TIMERS_UNREGISTER) &&
			dr->dt_timer.target < INT64_MAX &&
			!os_atomic_load2o(ds, ds_pending_data, relaxed) &&
			!DISPATCH_QUEUE_IS_SUSPENDED(ds) &&
			!os_atomic_load2o(dr, dt_pending_config, relaxed);
	if (likely(!_dispatch_unote_registered(dr))) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		if (unlikely(!will_register || !_dispatch_source_timer_tryarm(ds))) {
			return;
		}
		verb = "armed";
	} else if (unlikely(!will_register)) {
		disarm = true;
		verb = "disarmed";
	}

	// The heap owns a +2 on dispatch sources it references.
	//
	// _dispatch_timers_run2() also sometimes passes DISPATCH_TIMERS_RETAIN_2
	// when it wants to take over this +2 at the same time we are unregistering
	// the timer from the heap.
	//
	// Compute our refcount balance according to these rules; if our balance
	// would become negative we retain the source upfront, and if it is
	// positive, we drop the extraneous references after we're done touching
	// the source.
	int refs = will_register ? -2 : 0;
	if (_dispatch_unote_registered(dr) && !(flags & DISPATCH_TIMERS_RETAIN_2)) {
		refs += 2;
	}
	if (refs < 0) {
		dispatch_assert(refs == -2);
		_dispatch_retain_2(ds);
	}

	uint32_t tidx = _dispatch_source_timer_idx(dr);
	if (unlikely(_dispatch_unote_registered(dr) &&
			(!will_register || dr->du_ident != tidx))) {
		_dispatch_timers_unregister(dr);
	}
	if (likely(will_register)) {
		_dispatch_timers_register(dr, tidx);
	}

	if (disarm) {
		_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	}
	_dispatch_debug("kevent-source[%p]: %s timer[%p]", ds, verb, dr);
	_dispatch_object_debug(ds, "%s", __func__);
	if (refs > 0) {
		dispatch_assert(refs == 2);
		_dispatch_release_2_tailcall(ds);
	}
}
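
/*
 * The refcount balance above, enumerated (illustrative; derived from the
 * rules in the comment):
 *
 *   registered  will_register  RETAIN_2   refs  action
 *       no          yes           no       -2   retain upfront for the heap
 *       yes         yes           no        0   heap keeps its +2
 *       yes         no            no       +2   drop the heap's +2 at the end
 *       yes         no            yes       0   caller inherits the +2
 *       yes         yes           yes      -2   retain a fresh +2 for the heap
 */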

#define DISPATCH_TIMER_MISSED_MARKER 1ul
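
// For timers, ds_pending_data packs the fired-interval count above bit 0 and
// this marker in bit 0. For example (illustrative), 3 pending fires are
// stored as (3 << 1) == 6, and 6 | DISPATCH_TIMER_MISSED_MARKER == 7
// additionally records missed intervals; readers recover the count with
// (value >> 1), as _dispatch_source_timer_data() does below.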

DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_compute_missed(dispatch_timer_source_refs_t dt,
		uint64_t now, unsigned long prev)
{
	uint64_t missed = (now - dt->dt_timer.target) / dt->dt_timer.interval;
	if (++missed + prev > LONG_MAX) {
		missed = LONG_MAX - prev;
	}
	if (dt->dt_timer.interval < INT64_MAX) {
		uint64_t push_by = missed * dt->dt_timer.interval;
		dt->dt_timer.target += push_by;
		dt->dt_timer.deadline += push_by;
	} else {
		dt->dt_timer.target = UINT64_MAX;
		dt->dt_timer.deadline = UINT64_MAX;
	}
	prev += missed;
	return prev;
}
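
/*
 * Worked numbers for the computation above (illustrative). With
 * target == 10s, interval == 3s, now == 20s and prev == 0: the intervals at
 * 10s, 13s, 16s and 19s have all passed, and indeed
 * missed == (20 - 10) / 3 == 3, incremented to 4 to count the interval that
 * triggered us; target and deadline are pushed forward by 4 * 3 == 12s, so
 * the next target is 22s (> now), and the returned count is 4.
 */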

DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_data(dispatch_source_t ds, dispatch_unote_t du)
{
	dispatch_timer_source_refs_t dr = du._dt;
	unsigned long data, prev, clear_prev = 0;

	os_atomic_rmw_loop2o(ds, ds_pending_data, prev, clear_prev, relaxed, {
		data = prev >> 1;
		if (unlikely(prev & DISPATCH_TIMER_MISSED_MARKER)) {
			os_atomic_rmw_loop_give_up(goto handle_missed_intervals);
		}
	});
	return data;

handle_missed_intervals:
	// The timer may be in _dispatch_source_invoke2() already for other
	// reasons such as running the registration handler when ds_pending_data
	// is changed by _dispatch_timers_run2() without holding the drain lock.
	//
	// We hence need dependency ordering to pair with the release barrier
	// done by _dispatch_timers_run2() when setting the MISSED_MARKER bit.
	os_atomic_thread_fence(dependency);
	dr = os_atomic_force_dependency_on(dr, data);

	uint64_t now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(dr->du_ident));
	if (now >= dr->dt_timer.target) {
		OS_COMPILER_CAN_ASSUME(dr->dt_timer.interval < INT64_MAX);
		data = _dispatch_source_timer_compute_missed(dr, now, data);
	}

	// When we see the MISSED_MARKER the manager has given up on this timer
	// and expects the handler to call "resume".
	//
	// However, it may not have reflected this into the atomic flags yet,
	// so make sure _dispatch_source_invoke2() sees the timer as disarmed.
	//
	// The subsequent _dispatch_source_refs_resume() will enqueue the source
	// on the manager and make the changes to `ds_timer` above visible.
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
	return data;
}

static inline void
_dispatch_timers_run2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	dispatch_timer_source_refs_t dr;
	dispatch_source_t ds;
	uint64_t data, pending_data;
	uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);

	while ((dr = _dispatch_timers_heap[tidx].dth_min[DTH_TARGET_ID])) {
		DISPATCH_TIMER_ASSERT(dr->du_filter, ==, DISPATCH_EVFILT_TIMER,
				"invalid filter");
		DISPATCH_TIMER_ASSERT(dr->du_ident, ==, tidx, "tidx");
		DISPATCH_TIMER_ASSERT(dr->dt_timer.target, !=, 0, "missing target");
		ds = _dispatch_source_from_refs(dr);
		if (dr->dt_timer.target > now) {
			// Done running timers for now.
			break;
		}
		if (dr->du_fflags & DISPATCH_TIMER_AFTER) {
			_dispatch_trace_timer_fire(dr, 1, 1);
			_dispatch_source_merge_evt(dr, EV_ONESHOT, 1, 0, 0);
			_dispatch_debug("kevent-source[%p]: fired after timer[%p]", ds, dr);
			_dispatch_object_debug(ds, "%s", __func__);
			continue;
		}

		data = os_atomic_load2o(ds, ds_pending_data, relaxed);
		if (unlikely(data)) {
			// the release barrier is required to make the changes
			// to `ds_timer` visible to _dispatch_source_timer_data()
			if (os_atomic_cmpxchg2o(ds, ds_pending_data, data,
					data | DISPATCH_TIMER_MISSED_MARKER, release)) {
				_dispatch_timers_update(dr, DISPATCH_TIMERS_UNREGISTER);
				continue;
			}
		}

		data = _dispatch_source_timer_compute_missed(dr, now, 0);
		_dispatch_timers_update(dr, DISPATCH_TIMERS_RETAIN_2);
		pending_data = data << 1;
		if (!_dispatch_unote_registered(dr) && dr->dt_timer.target < INT64_MAX) {
			// if we unregistered because of suspension we have to fake
			// that we missed events.
			pending_data |= DISPATCH_TIMER_MISSED_MARKER;
			os_atomic_store2o(ds, ds_pending_data, pending_data, release);
		} else {
			os_atomic_store2o(ds, ds_pending_data, pending_data, relaxed);
		}
		_dispatch_trace_timer_fire(dr, data, data);
		_dispatch_debug("kevent-source[%p]: fired timer[%p]", ds, dr);
		_dispatch_object_debug(ds, "%s", __func__);
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_timers_run(dispatch_clock_now_cache_t nows)
{
	uint32_t tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (_dispatch_timers_heap[tidx].dth_count) {
			_dispatch_timers_run2(nows, tidx);
		}
	}
}

#if DISPATCH_HAVE_TIMER_COALESCING
#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC

static const uint64_t _dispatch_kevent_coalescing_window[] = {
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
#if DISPATCH_HAVE_TIMER_QOS
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
#endif
};
#endif // DISPATCH_HAVE_TIMER_COALESCING
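
/*
 * With the 2x multiplier above, the resulting windows are 150ms for NORMAL,
 * and, when DISPATCH_HAVE_TIMER_QOS is set, 2ms for CRITICAL and 200ms for
 * BACKGROUND.
 */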

static inline dispatch_timer_delay_s
_dispatch_timers_get_delay(dispatch_timer_heap_t dth, dispatch_clock_t clock,
		uint32_t qos, dispatch_clock_now_cache_t nows)
{
	uint64_t target = dth->dth_target, deadline = dth->dth_deadline;
	uint64_t delta = INT64_MAX, dldelta = INT64_MAX;
	dispatch_timer_delay_s rc;

	dispatch_assert(target <= deadline);
	if (delta == 0 || target >= INT64_MAX) {
		goto done;
	}

	if (qos < DISPATCH_TIMER_QOS_COUNT && dth->dth_count > 2) {
#if DISPATCH_HAVE_TIMER_COALESCING
		// Timer pre-coalescing <rdar://problem/13447496>
		// When we have several timers with this target/deadline bracket:
		//
		//       Target        window  Deadline
		//         V           <-------V
		// t1:     [...........|.................]
		// t2:          [......|.......]
		// t3:              [..|..........]
		// t4:                 | [.............]
		//                     ^
		//                Optimal Target
		//
		// Coalescing works better if the Target is delayed to "Optimal", by
		// picking the latest target that isn't too close to the deadline.
		uint64_t window = _dispatch_kevent_coalescing_window[qos];
		if (target + window < deadline) {
			uint64_t latest = deadline - window;
			target = _dispatch_timer_heap_max_target_before(dth, latest);
		}
#endif
	}

	uint64_t now = _dispatch_time_now_cached(clock, nows);
	if (target <= now) {
		delta = 0;
		dldelta = 0;
		goto done;
	}

	uint64_t tmp = target - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < delta) {
		delta = tmp;
	}

	tmp = deadline - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < dldelta) {
		dldelta = tmp;
	}

done:
	rc.delay = delta;
	rc.leeway = delta < INT64_MAX ? dldelta - delta : INT64_MAX;
	return rc;
}
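
/*
 * Worked example for the delay computation above (illustrative), ignoring
 * pre-coalescing and assuming a wall clock so no mach-to-nano conversion
 * applies. With now == 100ms, target == 140ms and deadline == 250ms:
 * delta == 40ms and dldelta == 150ms, so the function returns
 * { .delay = 40ms, .leeway = 150ms - 40ms == 110ms }, i.e. the event-loop
 * timer may fire anywhere in [140ms, 250ms].
 */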

static bool
_dispatch_timers_program2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	uint32_t qos = DISPATCH_TIMER_QOS(tidx);
	dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	dispatch_timer_delay_s range;

	range = _dispatch_timers_get_delay(heap, clock, qos, nows);
	if (range.delay == 0 || range.delay >= INT64_MAX) {
		_dispatch_trace_next_timer_set(NULL, qos);
		if (heap->dth_flags & DTH_ARMED) {
			_dispatch_event_loop_timer_delete(tidx);
		}
		return range.delay == 0;
	}

	_dispatch_trace_next_timer_set(heap->dth_min[DTH_TARGET_ID], qos);
	_dispatch_trace_next_timer_program(range.delay, qos);
	_dispatch_event_loop_timer_arm(tidx, range, nows);
	return false;
}

DISPATCH_NOINLINE
static bool
_dispatch_timers_program(dispatch_clock_now_cache_t nows)
{
	bool poll = false;
	uint32_t tidx, timerm = _dispatch_timers_processing_mask;

	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (timerm & (1 << tidx)) {
			poll |= _dispatch_timers_program2(nows, tidx);
		}
	}
	return poll;
}

DISPATCH_NOINLINE
static bool
_dispatch_timers_configure(void)
{
	// Find out if there is a new target/deadline on the timer lists
	return _dispatch_timer_heap_has_new_min(_dispatch_timers_heap,
			countof(_dispatch_timers_heap), _dispatch_timers_processing_mask);
}

static inline bool
_dispatch_mgr_timers(void)
{
	dispatch_clock_now_cache_s nows = { };
	bool expired = _dispatch_timers_expired;
	if (unlikely(expired)) {
		_dispatch_timers_run(&nows);
	}
	_dispatch_mgr_trace_timers_wakes();
	bool reconfigure = _dispatch_timers_reconfigure;
	if (unlikely(reconfigure || expired)) {
		if (reconfigure) {
			reconfigure = _dispatch_timers_configure();
			_dispatch_timers_reconfigure = false;
		}
		if (reconfigure || expired) {
			expired = _dispatch_timers_expired = _dispatch_timers_program(&nows);
		}
		_dispatch_timers_processing_mask = 0;
	}
	return expired;
}

#pragma mark -
#pragma mark dispatch_mgr

void
_dispatch_mgr_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		DISPATCH_UNUSED dispatch_qos_t qos)
{
	uint64_t dq_state;
	_dispatch_trace_continuation_push(dq, dou._do);
	if (unlikely(_dispatch_queue_push_update_tail(dq, dou._do))) {
		_dispatch_queue_push_update_head(dq, dou._do);
		dq_state = os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
		if (!_dq_state_drain_locked_by_self(dq_state)) {
			_dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
		}
	}
}

DISPATCH_NORETURN
void
_dispatch_mgr_queue_wakeup(DISPATCH_UNUSED dispatch_queue_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos,
		DISPATCH_UNUSED dispatch_wakeup_flags_t flags)
{
	DISPATCH_INTERNAL_CRASH(0, "Don't try to wake up or override the manager");
}

#if DISPATCH_USE_MGR_THREAD
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
#if DISPATCH_EVENT_BACKEND_KEVENT
	dispatch_kevent_s evbuf[DISPATCH_DEFERRED_ITEMS_EVENT_COUNT];
#endif
	dispatch_deferred_items_s ddi = {
#if DISPATCH_EVENT_BACKEND_KEVENT
		.ddi_maxevents = DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		.ddi_eventlist = evbuf,
#endif
	};
	bool poll;

	_dispatch_deferred_items_set(&ddi);
	for (;;) {
		_dispatch_mgr_queue_drain();
		poll = _dispatch_mgr_timers();
		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
		_dispatch_event_loop_drain(poll ? KEVENT_FLAG_IMMEDIATE : 0);
	}
}
#endif // DISPATCH_USE_MGR_THREAD

DISPATCH_NORETURN
void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
		dispatch_invoke_context_t dic DISPATCH_UNUSED,
		dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
				"kevent workqueue enabled");
	}
#endif
#if DISPATCH_USE_MGR_THREAD
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_mgr_priority_init();
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
#endif
}

#if DISPATCH_USE_KEVENT_WORKQUEUE

#define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((dispatch_priority_t)~0u)

_Static_assert(WORKQ_KEVENT_EVENT_BUFFER_LEN >=
		DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		"our list should not be longer than the kernel's");

DISPATCH_ALWAYS_INLINE
static inline dispatch_priority_t
_dispatch_wlh_worker_thread_init(dispatch_wlh_t wlh,
		dispatch_deferred_items_t ddi)
{
	dispatch_assert(wlh);
	dispatch_priority_t old_dbp;

	pthread_priority_t pp = _dispatch_get_priority();
	if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
		// If this thread does not have the event manager flag set, don't set
		// up as the dispatch manager and let the caller know to only process
		// the delivered events.
		//
		// Also add the NEEDS_UNBIND flag so that
		// _dispatch_priority_compute_update knows it has to unbind
		pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (wlh == DISPATCH_WLH_ANON) {
			pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		} else {
			// pthread sets the flag when it is an event delivery thread
			// so we need to explicitly clear it
			pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		}
		_dispatch_thread_setspecific(dispatch_priority_key,
				(void *)(uintptr_t)pp);
		if (wlh != DISPATCH_WLH_ANON) {
			_dispatch_debug("wlh[%p]: handling events", wlh);
		} else {
			ddi->ddi_can_stash = true;
		}
		return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
	}

	if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
			!(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
		// When the pthread kext is delivering kevents to us, and pthread
		// root queues are in use, then the pthread priority TSD is set
		// to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
		//
		// Given that this isn't a valid QoS we need to fix up the TSD,
		// and the best option is to clear the qos/priority bits which tells
		// us to not do any QoS related calls on this thread.
		//
		// However, in that case the manager thread is opted out of QoS,
		// as far as pthread is concerned, and can't be turned into
		// something else, so we can't stash.
		pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
	}
	// Managers always park without mutating to a regular worker thread, and
	// hence never need to unbind from userland, and when draining a manager,
	// the NEEDS_UNBIND flag would cause the mutation to happen.
	// So we need to strip this flag.
	pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);

	// ensure kevents registered from this thread are registered at manager QoS
	old_dbp = _dispatch_set_basepri(DISPATCH_PRIORITY_FLAG_MANAGER);
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	return old_dbp;
}

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_wlh_worker_thread_reset(dispatch_priority_t old_dbp)
{
	bool needs_poll = _dispatch_queue_mgr_unlock(&_dispatch_mgr_q);
	_dispatch_reset_basepri(old_dbp);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);
	return needs_poll;
}

DISPATCH_ALWAYS_INLINE
static void
_dispatch_wlh_worker_thread(dispatch_wlh_t wlh, dispatch_kevent_t events,
		int *nevents)
{
	_dispatch_introspection_thread_add();
	DISPATCH_PERF_MON_VAR_INIT

	dispatch_deferred_items_s ddi = {
		.ddi_eventlist = events,
	};
	dispatch_priority_t old_dbp;

	old_dbp = _dispatch_wlh_worker_thread_init(wlh, &ddi);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_perfmon_start_impl(true);
	} else {
		dispatch_assert(wlh == DISPATCH_WLH_ANON);
		wlh = DISPATCH_WLH_ANON;
	}
	_dispatch_deferred_items_set(&ddi);
	_dispatch_event_loop_merge(events, *nevents);

	if (old_dbp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_mgr_queue_drain();
		bool poll = _dispatch_mgr_timers();
		if (_dispatch_wlh_worker_thread_reset(old_dbp)) {
			poll = true;
		}
		if (poll) _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
	} else if (ddi.ddi_stashed_dou._do) {
		_dispatch_debug("wlh[%p]: draining deferred item %p", wlh,
				ddi.ddi_stashed_dou._do);
		if (wlh == DISPATCH_WLH_ANON) {
			dispatch_assert(ddi.ddi_nevents == 0);
			_dispatch_deferred_items_set(NULL);
			_dispatch_root_queue_drain_deferred_item(&ddi
					DISPATCH_PERF_MON_ARGS);
		} else {
			_dispatch_root_queue_drain_deferred_wlh(&ddi
					DISPATCH_PERF_MON_ARGS);
		}
	}

	_dispatch_deferred_items_set(NULL);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER &&
			!ddi.ddi_stashed_dou._do) {
		_dispatch_perfmon_end(perfmon_thread_event_no_steal);
	}
	_dispatch_debug("returning %d deferred kevents", ddi.ddi_nevents);
	*nevents = ddi.ddi_nevents;
}

DISPATCH_NOINLINE
void
_dispatch_kevent_worker_thread(dispatch_kevent_t *events, int *nevents)
{
	if (!events && !nevents) {
		// events for worker thread request have already been delivered earlier
		return;
	}
	if (!dispatch_assume(*nevents && *events)) return;
	_dispatch_adopt_wlh_anon();
	_dispatch_wlh_worker_thread(DISPATCH_WLH_ANON, *events, nevents);
	_dispatch_reset_wlh();
}

#endif // DISPATCH_USE_KEVENT_WORKQUEUE
#pragma mark -
#pragma mark dispatch_source_debug

static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%x, "
			"mask = 0x%x, pending_data = 0x%llx, registered = %d, "
			"armed = %d, deleted = %d%s, canceled = %d, ",
			target && target->dq_label ? target->dq_label : "", target,
			dr->du_ident, dr->du_fflags, (unsigned long long)ds->ds_pending_data,
			ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
			(bool)(ds->dq_atomic_flags & DSF_DELETED),
			(ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
			(bool)(ds->dq_atomic_flags & DSF_CANCELED));
}

static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
			", interval = 0x%llx, flags = 0x%x }, ",
			(unsigned long long)dr->dt_timer.target,
			(unsigned long long)dr->dt_timer.deadline,
			(unsigned long long)dr->dt_timer.interval, dr->du_fflags);
}

size_t
_dispatch_source_debug(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (dr->du_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
			"filter = %s }", dr, dr->du_is_direct ? " (direct)" : "",
			dr->du_type->dst_kind);
	return offset;
}