/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "internal.h"
#if HAVE_MACH
#include "protocol.h" // _dispatch_send_wakeup_runloop_thread
#endif
#if HAVE_PTHREAD_WORKQUEUES || DISPATCH_USE_INTERNAL_WORKQUEUE
#define DISPATCH_USE_WORKQUEUES 1
#endif
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || \
		DISPATCH_DEBUG) && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && (DISPATCH_DEBUG || \
		(!DISPATCH_USE_KEVENT_WORKQUEUE && !HAVE_PTHREAD_WORKQUEUE_QOS)) && \
		!defined(DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP)
#define DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 1
#endif
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || \
		DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || \
		DISPATCH_USE_INTERNAL_WORKQUEUE
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#define DISPATCH_USE_WORKQ_PRIORITY 1
#endif
#define DISPATCH_USE_WORKQ_OPTIONS 1
#endif

#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_sig_thread(void *ctxt);
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static void _dispatch_wlh_cleanup(void *ctxt);
static void _dispatch_deferred_items_cleanup(void *ctxt);
static void _dispatch_frame_cleanup(void *ctxt);
static void _dispatch_context_cleanup(void *ctxt);
static void _dispatch_queue_barrier_complete(dispatch_queue_t dq,
		dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
static void _dispatch_queue_non_barrier_complete(dispatch_queue_t dq);
static void _dispatch_queue_push_sync_waiter(dispatch_queue_t dq,
		dispatch_sync_context_t dsc, dispatch_qos_t qos);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
		dispatch_queue_t dq, dispatch_qos_t qos);
static inline void _dispatch_queue_class_wakeup_with_override(dispatch_queue_t,
		uint64_t dq_state, dispatch_wakeup_flags_t flags);
#endif
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread4(void *context);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_worker_thread3(pthread_priority_t priority);
#endif
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
#endif
#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_handle_pred;
static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
		dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
static void _dispatch_runloop_queue_handle_init(void *ctxt);
static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
#endif
#pragma mark dispatch_root_queue
struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
	dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;
#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_pthread_root_queue_context_s
		_dispatch_pthread_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
};
#endif

#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
#endif
struct dispatch_root_queue_context_s {
	union {
		struct {
			int volatile dgq_pending;
#if DISPATCH_USE_WORKQUEUES
			qos_class_t dgq_qos;
#if DISPATCH_USE_WORKQ_PRIORITY
			int dgq_wq_priority;
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
			int dgq_wq_options;
#endif
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			int32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;

#define WORKQ_PRIO_INVALID (-1)
#ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
#ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_MAINTENANCE,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_MAINTENANCE,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_BACKGROUND,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_BACKGROUND,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_UTILITY,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_UTILITY,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_DEFAULT,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_DEFAULT,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_USER_INITIATED,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_USER_INITIATED,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
		.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
#if DISPATCH_USE_WORKQ_PRIORITY
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
#endif
	}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
	((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
		DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
		DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
	[_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
		DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
		.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
		.do_ctxt = &_dispatch_root_queue_contexts[ \
				_DISPATCH_ROOT_QUEUE_IDX(n, flags)], \
		.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
		.dq_priority = _dispatch_priority_make(DISPATCH_QOS_##n, 0) | flags | \
				DISPATCH_PRIORITY_FLAG_ROOTQUEUE | \
				((flags & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE) ? 0 : \
				DISPATCH_QOS_##n << DISPATCH_PRIORITY_OVERRIDE_SHIFT), \
		__VA_ARGS__ \
	}
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
		.dq_label = "com.apple.root.maintenance-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.maintenance-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
		.dq_label = "com.apple.root.background-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.background-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
		.dq_label = "com.apple.root.utility-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.utility-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE,
		.dq_label = "com.apple.root.default-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
			DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.default-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
		.dq_label = "com.apple.root.user-initiated-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.user-initiated-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
		.dq_label = "com.apple.root.user-interactive-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.user-interactive-qos.overcommit",
	),
};
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
};
#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue _dispatch_root_queues[\
		DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
#endif

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
			DISPATCH_QUEUE_ROLE_BASE_ANON,
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_atomic_flags = DQF_WIDTH(1),
	.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
			DISPATCH_PRIORITY_SATURATED_OVERRIDE,
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
	if (qos == QOS_CLASS_MAINTENANCE) {
		qos = DISPATCH_QOS_BACKGROUND;
	} else if (qos == QOS_CLASS_USER_INTERACTIVE) {
		qos = DISPATCH_QOS_USER_INITIATED;
	}
#endif
	if (qos == DISPATCH_QOS_UNSPECIFIED) {
		return DISPATCH_BAD_INPUT;
	}
	return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
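
/*
 * Illustrative usage sketch (not part of this file): a client obtains a global
 * concurrent root queue by QoS and submits work to it. Everything referenced
 * here is public API; the block contents are made up for the example.
 *
 *     dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *     dispatch_async(q, ^{
 *         // long-running, energy-efficient work
 *     });
 *
 * Any flag other than 0 or DISPATCH_QUEUE_OVERCOMMIT, or an unrecognized
 * priority, makes the call fail with DISPATCH_BAD_INPUT, as checked above.
 */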
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?:
			_dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	_dispatch_client_assert_fail(
			"Block was %sexpected to execute on queue [%s]",
			expected ? "" : "not ", dq->dq_label ?: "");
}

DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
{
	_dispatch_client_assert_fail(
			"Block was expected to act as a barrier on queue [%s]",
			dq->dq_label ?: "");
}

void
dispatch_assert_queue(dispatch_queue_t dq)
{
	unsigned long metatype = dx_metatype(dq);
	if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
		DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
				"dispatch_assert_queue()");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (likely(_dq_state_drain_locked_by_self(dq_state))) {
		return;
	}
	// we can look at the width: if it is changing while we read it,
	// it means that a barrier is running on `dq` concurrently, which
	// proves that we're not on `dq`. Hence reading a stale '1' is ok.
	//
	// However if we can have thread bound queues, these mess with lock
	// ownership and we always have to take the slowpath
	if (likely(DISPATCH_COCOA_COMPAT || dq->dq_width > 1)) {
		if (likely(_dispatch_thread_frame_find_queue(dq))) {
			return;
		}
	}
	_dispatch_assert_queue_fail(dq, true);
}

void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	unsigned long metatype = dx_metatype(dq);
	if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
		DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (likely(!_dq_state_drain_locked_by_self(dq_state))) {
		// we can look at the width: if it is changing while we read it,
		// it means that a barrier is running on `dq` concurrently, which
		// proves that we're not on `dq`. Hence reading a stale '1' is ok.
		//
		// However if we can have thread bound queues, these mess with lock
		// ownership and we always have to take the slowpath
		if (likely(!DISPATCH_COCOA_COMPAT && dq->dq_width == 1)) {
			return;
		}
		if (likely(!_dispatch_thread_frame_find_queue(dq))) {
			return;
		}
	}
	_dispatch_assert_queue_fail(dq, false);
}

void
dispatch_assert_queue_barrier(dispatch_queue_t dq)
{
	dispatch_assert_queue(dq);

	if (likely(dq->dq_width == 1)) {
		return;
	}

	if (likely(dq->do_targetq)) {
		uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
		if (likely(_dq_state_is_in_barrier(dq_state))) {
			return;
		}
	}

	_dispatch_assert_queue_barrier_fail(dq);
}
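
/*
 * Illustrative usage sketch (not part of this file): the assert APIs are meant
 * to be used as preconditions in code that must only run on a known queue.
 * Everything below is public API; `state_queue` is a hypothetical serial or
 * concurrent queue owned by the caller.
 *
 *     static void update_state(void *ctx) {
 *         dispatch_assert_queue(state_queue);          // crash if not on state_queue
 *         dispatch_assert_queue_barrier(state_queue);  // also require barrier width
 *         // ... mutate state owned by state_queue ...
 *     }
 */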
#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif
#pragma mark dispatch_init

static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
	int r; (void)r;
	bool result = false;
	*wq_supported = 0;
#if DISPATCH_USE_WORKQUEUES
	bool disable_wq = false; (void)disable_wq;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
	bool disable_qos = false;
#if DISPATCH_DEBUG
	disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE
	bool disable_kevent_wq = false;
#if DISPATCH_DEBUG || DISPATCH_PROFILE
	disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
#endif
#endif
	if (!disable_wq && !disable_qos) {
		*wq_supported = _pthread_workqueue_supported();
#if DISPATCH_USE_KEVENT_WORKQUEUE
		if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
			r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
					(pthread_workqueue_function_kevent_t)
					_dispatch_kevent_worker_thread,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#if DISPATCH_USE_MGR_THREAD
			_dispatch_kevent_workqueue_enabled = !r;
#endif
			result = !r;
		} else
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
		if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
#if DISPATCH_USE_MGR_THREAD
			r = _pthread_workqueue_init(_dispatch_worker_thread3,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
			result = !r;
#endif
		}
		if (!(*wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
			DISPATCH_INTERNAL_CRASH(*wq_supported,
					"QoS Maintenance support required");
		}
	}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!result && !disable_wq) {
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (pwq) {
				qc->dgq_kworkqueue = pwq;
			} else {
				qc->dgq_kworkqueue = (void*)(~0ul);
				// because the fastpath of _dispatch_global_queue_poke didn't
				// know yet that we're using the internal pool implementation
				// we have to undo its setting of dgq_pending
				qc->dgq_pending = 0;
			}
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // DISPATCH_USE_WORKQUEUES
	return result;
}
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		int32_t pool_size, bool overcommit)
{
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	int32_t thread_pool_size = overcommit ? DISPATCH_WORKQ_MAX_PTHREAD_COUNT :
			(int32_t)dispatch_hw_config(active_cpus);
	if (slowpath(pool_size) && pool_size < thread_pool_size) {
		thread_pool_size = pool_size;
	}
	qc->dgq_thread_pool_size = thread_pool_size;
#if DISPATCH_USE_WORKQUEUES
	if (qc->dgq_qos) {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
		(void)dispatch_assume_zero(pthread_attr_setdetachstate(
				&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
#if HAVE_PTHREAD_WORKQUEUE_QOS
		(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
				&pqc->dpq_thread_attr, qc->dgq_qos, 0));
#endif
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	_dispatch_sema4_t *sema = &pqc->dpq_thread_mediator.dsema_sema;
	_dispatch_sema4_init(sema, _DSEMA4_POLICY_LIFO);
	_dispatch_sema4_create(sema, _DSEMA4_POLICY_LIFO);
}
#endif // DISPATCH_USE_PTHREAD_POOL
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
	int wq_supported;
	_dispatch_fork_becomes_unsafe();
	if (!_dispatch_root_queues_init_workq(&wq_supported)) {
#if DISPATCH_ENABLE_THREAD_POOL
		size_t i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING)
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
#else
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], 0, overcommit);
		}
#else
		DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
				"Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}

void
_dispatch_root_queues_init(void)
{
	static dispatch_once_t _dispatch_root_queues_pred;
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 2 * DISPATCH_QOS_MAX);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			WORKQ_NUM_PRIOQUEUE * 2);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
			offsetof(struct dispatch_object_s, do_next));
	dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
			offsetof(struct dispatch_object_s, do_vtable));
	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) %
			_Alignof(uint64_t) == 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

#if HAVE_PTHREAD_WORKQUEUE_QOS
	dispatch_qos_t qos = _dispatch_qos_from_qos_class(qos_class_main());
	dispatch_priority_t pri = _dispatch_priority_make(qos, 0);
	_dispatch_main_q.dq_priority = _dispatch_priority_with_override_qos(pri, qos);
	if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
		_dispatch_set_qos_class_enabled = 1;
	}
#endif

#if DISPATCH_USE_THREAD_LOCAL_STORAGE
	_dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
#else
	_dispatch_thread_key_create(&dispatch_priority_key, NULL);
	_dispatch_thread_key_create(&dispatch_r2k_key, NULL);
	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
	_dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
			NULL);
	_dispatch_thread_key_create(&dispatch_basepri_key, NULL);
#if DISPATCH_INTROSPECTION
	_dispatch_thread_key_create(&dispatch_introspection_key, NULL);
#elif DISPATCH_PERF_MON
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
	_dispatch_thread_key_create(&dispatch_wlh_key, _dispatch_wlh_cleanup);
	_dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
	_dispatch_thread_key_create(&dispatch_deferred_items_key,
			_dispatch_deferred_items_cleanup);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif

	_dispatch_queue_set_current(&_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif
	_dispatch_hw_config_init();
	_dispatch_time_init();
	_dispatch_vtable_init();
	_dispatch_introspection_init();
}
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
#include <unistd.h>
#include <sys/syscall.h>

#ifndef __ANDROID__
#ifdef SYS_gettid
DISPATCH_ALWAYS_INLINE
static inline pid_t
gettid(void)
{
	return (pid_t) syscall(SYS_gettid);
}
#else
#error "SYS_gettid unavailable on this system"
#endif /* SYS_gettid */
#endif /* ! __ANDROID__ */
#define _tsd_call_cleanup(k, f) do { \
		if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
	} while (0)

static void (*_dispatch_thread_detach_callback)(void);

void
_dispatch_install_thread_detach_callback(dispatch_function_t cb)
{
	if (os_atomic_xchg(&_dispatch_thread_detach_callback, cb, relaxed)) {
		DISPATCH_CLIENT_CRASH(0, "Installing a thread detach callback twice");
	}
}

static void
_libdispatch_tsd_cleanup(void *ctx)
{
	struct dispatch_tsd *tsd = (struct dispatch_tsd *) ctx;

	_tsd_call_cleanup(dispatch_priority_key, NULL);
	_tsd_call_cleanup(dispatch_r2k_key, NULL);

	_tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
	_tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
	_tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
	_tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
	_tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
			NULL);
	_tsd_call_cleanup(dispatch_basepri_key, NULL);
#if DISPATCH_INTROSPECTION
	_tsd_call_cleanup(dispatch_introspection_key, NULL);
#elif DISPATCH_PERF_MON
	_tsd_call_cleanup(dispatch_bcounter_key, NULL);
#endif
	_tsd_call_cleanup(dispatch_wlh_key, _dispatch_wlh_cleanup);
	_tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
	_tsd_call_cleanup(dispatch_deferred_items_key,
			_dispatch_deferred_items_cleanup);
	if (_dispatch_thread_detach_callback) {
		_dispatch_thread_detach_callback();
	}
}

void
libdispatch_tsd_init(void)
{
	pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
	__dispatch_tsd.tid = gettid();
}
#endif // DISPATCH_USE_THREAD_LOCAL_STORAGE
void
_dispatch_queue_atfork_child(void)
{
	dispatch_queue_t main_q = &_dispatch_main_q;
	void *crash = (void *)0x100;
	size_t i;

	if (_dispatch_queue_is_thread_bound(main_q)) {
		_dispatch_queue_set_bound_thread(main_q);
	}

	if (!_dispatch_is_multithreaded_inline()) return;

	main_q->dq_items_head = crash;
	main_q->dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}

void
_dispatch_fork_becomes_unsafe_slow(void)
{
	uint8_t value = os_atomic_or(&_dispatch_unsafe_fork,
			_DISPATCH_UNSAFE_FORK_MULTITHREADED, relaxed);
	if (value & _DISPATCH_UNSAFE_FORK_PROHIBIT) {
		DISPATCH_CLIENT_CRASH(0, "Transition to multithreaded is prohibited");
	}
}

void
_dispatch_prohibit_transition_to_multithreaded(bool prohibit)
{
	if (prohibit) {
		uint8_t value = os_atomic_or(&_dispatch_unsafe_fork,
				_DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
		if (value & _DISPATCH_UNSAFE_FORK_MULTITHREADED) {
			DISPATCH_CLIENT_CRASH(0, "The executable is already multithreaded");
		}
	} else {
		os_atomic_and(&_dispatch_unsafe_fork,
				(uint8_t)~_DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
	}
}
#pragma mark dispatch_queue_attr_t

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
{
	qos_class_t qos = (qos_class_t)qos_class;
	switch (qos) {
	case QOS_CLASS_MAINTENANCE:
	case QOS_CLASS_BACKGROUND:
	case QOS_CLASS_UTILITY:
	case QOS_CLASS_DEFAULT:
	case QOS_CLASS_USER_INITIATED:
	case QOS_CLASS_USER_INTERACTIVE:
	case QOS_CLASS_UNSPECIFIED:
		break;
	default:
		return false;
	}
	if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
		return false;
	}
	return true;
}

#define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
		((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
		DQA_INDEX_NON_OVERCOMMIT : \
		((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
		DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))

#define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
		((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)

#define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
		((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)

#define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
		(frequency)

#define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))

#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (qos)

static inline dispatch_queue_attr_t
_dispatch_get_queue_attr(dispatch_qos_t qos, int prio,
		_dispatch_queue_attr_overcommit_t overcommit,
		dispatch_autorelease_frequency_t frequency,
		bool concurrent, bool inactive)
{
	return (dispatch_queue_attr_t)&_dispatch_queue_attrs
			[DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
			[DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
			[DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
			[DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
			[DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
			[DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
}

dispatch_queue_attr_t
_dispatch_get_default_queue_attr(void)
{
	return _dispatch_get_queue_attr(DISPATCH_QOS_UNSPECIFIED, 0,
			_dispatch_queue_attr_overcommit_unspecified,
			DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
		dispatch_qos_class_t qos_class, int relpri)
{
	if (!_dispatch_qos_class_valid(qos_class, relpri)) {
		return DISPATCH_BAD_INPUT;
	}
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(_dispatch_qos_from_qos_class(qos_class),
			relpri, dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
			dqa->dqa_concurrent, dqa->dqa_inactive);
}
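
/*
 * Illustrative usage sketch (not part of this file): building an attribute
 * that pins a queue's QoS, using only public API. The label and relative
 * priority are made up for the example.
 *
 *     dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *             DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -1);
 *     dispatch_queue_t q = dispatch_queue_create("com.example.downloads", attr);
 *
 * An out-of-range relative priority (> 0 or < QOS_MIN_RELATIVE_PRIORITY) makes
 * the call return DISPATCH_BAD_INPUT, as validated above.
 */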
dispatch_queue_attr_t
dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
	return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
			_dispatch_priority_relpri(pri), dqa->dqa_overcommit,
			dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
}
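
/*
 * Illustrative usage sketch (not part of this file): an initially inactive
 * queue accepts work but does not run it until dispatch_activate() is called,
 * which lets the owner finish configuring it first. All identifiers below are
 * public API; the label and `some_target_queue` are made up.
 *
 *     dispatch_queue_attr_t attr =
 *             dispatch_queue_attr_make_initially_inactive(DISPATCH_QUEUE_SERIAL);
 *     dispatch_queue_t q = dispatch_queue_create("com.example.lazy", attr);
 *     dispatch_set_target_queue(q, some_target_queue); // hypothetical target
 *     dispatch_activate(q);
 */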
dispatch_queue_attr_t
dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
		bool overcommit)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
	return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
			_dispatch_priority_relpri(pri), overcommit ?
			_dispatch_queue_attr_overcommit_enabled :
			_dispatch_queue_attr_overcommit_disabled,
			dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
			dqa->dqa_inactive);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
		dispatch_autorelease_frequency_t frequency)
{
	switch (frequency) {
	case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
	case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
	case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
		break;
	default:
		return DISPATCH_BAD_INPUT;
	}
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
	return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
			_dispatch_priority_relpri(pri), dqa->dqa_overcommit,
			frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
}
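
/*
 * Illustrative usage sketch (not part of this file): asking the queue to wrap
 * each work item in its own autorelease pool. Public API only; the label is
 * made up for the example.
 *
 *     dispatch_queue_attr_t attr =
 *             dispatch_queue_attr_make_with_autorelease_frequency(
 *                     DISPATCH_QUEUE_SERIAL,
 *                     DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM);
 *     dispatch_queue_t q = dispatch_queue_create("com.example.objc-work", attr);
 */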
#pragma mark dispatch_queue_t

void
dispatch_queue_set_label_nocopy(dispatch_queue_t dq, const char *label)
{
	if (dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
		return;
	}
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(dq);
	if (unlikely(dqf & DQF_LABEL_NEEDS_FREE)) {
		DISPATCH_CLIENT_CRASH(dq, "Cannot change label for this queue");
	}
	dq->dq_label = label;
}
static inline bool
_dispatch_base_queue_is_wlh(dispatch_queue_t dq, dispatch_queue_t tq)
{
	(void)dq; (void)tq;
	return false;
}
static void
_dispatch_queue_inherit_wlh_from_target(dispatch_queue_t dq,
		dispatch_queue_t tq)
{
	uint64_t old_state, new_state, role;

	if (!dx_hastypeflag(tq, QUEUE_ROOT)) {
		role = DISPATCH_QUEUE_ROLE_INNER;
	} else if (_dispatch_base_queue_is_wlh(dq, tq)) {
		role = DISPATCH_QUEUE_ROLE_BASE_WLH;
	} else {
		role = DISPATCH_QUEUE_ROLE_BASE_ANON;
	}

	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
		new_state = old_state & ~DISPATCH_QUEUE_ROLE_MASK;
		new_state |= role;
		if (old_state == new_state) {
			os_atomic_rmw_loop_give_up(break);
		}
	});

	dispatch_wlh_t cur_wlh = _dispatch_get_wlh();
	if (cur_wlh == (dispatch_wlh_t)dq && !_dq_state_is_base_wlh(new_state)) {
		_dispatch_event_loop_leave_immediate(cur_wlh, new_state);
	}
	if (!dx_hastypeflag(tq, QUEUE_ROOT)) {
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
		_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
#else
		_dispatch_queue_atomic_flags_set_and_clear(tq, DQF_TARGETED, DQF_LEGACY);
#endif
	}
}

unsigned long volatile _dispatch_queue_serial_numbers =
		DISPATCH_QUEUE_SERIAL_NUMBER_INIT;
dispatch_priority_t
_dispatch_queue_compute_priority_and_wlh(dispatch_queue_t dq,
		dispatch_wlh_t *wlh_out)
{
	dispatch_priority_t p = dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
	dispatch_queue_t tq = dq->do_targetq;
	dispatch_priority_t tqp = tq->dq_priority &DISPATCH_PRIORITY_REQUESTED_MASK;
	dispatch_wlh_t wlh = DISPATCH_WLH_ANON;

	if (_dq_state_is_base_wlh(dq->dq_state)) {
		wlh = (dispatch_wlh_t)dq;
	}

	while (unlikely(!dx_hastypeflag(tq, QUEUE_ROOT))) {
		if (unlikely(tq == &_dispatch_mgr_q)) {
			if (wlh_out) *wlh_out = DISPATCH_WLH_ANON;
			return DISPATCH_PRIORITY_FLAG_MANAGER;
		}
		if (unlikely(_dispatch_queue_is_thread_bound(tq))) {
			// thread-bound hierarchies are weird, we need to install
			// from the context of the thread this hierarchy is bound to
			if (wlh_out) *wlh_out = NULL;
			return 0;
		}
		if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(tq))) {
			// this queue may not be activated yet, so the queue graph may not
			// have stabilized yet
			_dispatch_ktrace1(DISPATCH_PERF_delayed_registration, dq);
			if (wlh_out) *wlh_out = NULL;
			return 0;
		}

		if (_dq_state_is_base_wlh(tq->dq_state)) {
			wlh = (dispatch_wlh_t)tq;
		} else if (unlikely(_dispatch_queue_is_legacy(tq))) {
			// we're not allowed to dereference tq->do_targetq
			_dispatch_ktrace1(DISPATCH_PERF_delayed_registration, dq);
			if (wlh_out) *wlh_out = NULL;
			return 0;
		}

		if (!(tq->dq_priority & DISPATCH_PRIORITY_FLAG_INHERIT)) {
			if (p < tqp) p = tqp;
		}
		tq = tq->do_targetq;
		tqp = tq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
	}

	if (unlikely(!tqp)) {
		// pthread root queues opt out of QoS
		if (wlh_out) *wlh_out = DISPATCH_WLH_ANON;
		return DISPATCH_PRIORITY_FLAG_MANAGER;
	}
	if (wlh_out) *wlh_out = wlh;
	return _dispatch_priority_inherit_from_root_queue(p, tq);
}
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq, bool legacy)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}

	//
	// Step 1: Normalize arguments (qos, overcommit, tq)
	//

	dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
	if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
		qos = DISPATCH_QOS_USER_INITIATED;
	}
	if (qos == DISPATCH_QOS_MAINTENANCE) {
		qos = DISPATCH_QOS_BACKGROUND;
	}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS

	_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
	if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
		if (tq->do_targetq) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
					"a non-global target queue");
		}
	}

	if (tq && !tq->do_targetq &&
			tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
		// Handle discrepancies between attr and target queue, attributes win
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
				overcommit = _dispatch_queue_attr_overcommit_enabled;
			} else {
				overcommit = _dispatch_queue_attr_overcommit_disabled;
			}
		}
		if (qos == DISPATCH_QOS_UNSPECIFIED) {
			dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
			tq = _dispatch_get_root_queue(tq_qos,
					overcommit == _dispatch_queue_attr_overcommit_enabled);
		} else {
			tq = NULL;
		}
	} else if (tq && !tq->do_targetq) {
		// target is a pthread or runloop root queue, setting QoS or overcommit
		// is disallowed
		if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
					"and use this kind of target queue");
		}
		if (qos != DISPATCH_QOS_UNSPECIFIED) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
					"and use this kind of target queue");
		}
	} else {
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			// Serial queues default to overcommit!
			overcommit = dqa->dqa_concurrent ?
					_dispatch_queue_attr_overcommit_disabled :
					_dispatch_queue_attr_overcommit_enabled;
		}
	}
	if (!tq) {
		tq = _dispatch_get_root_queue(
				qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
				overcommit == _dispatch_queue_attr_overcommit_enabled);
		if (slowpath(!tq)) {
			DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
		}
	}

	//
	// Step 2: Initialize the queue
	//

	if (legacy) {
		// if any of these attributes is specified, use non legacy classes
		if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
			legacy = false;
		}
	}

	const void *vtable;
	dispatch_queue_flags_t dqf = 0;
	if (legacy) {
		vtable = DISPATCH_VTABLE(queue);
	} else if (dqa->dqa_concurrent) {
		vtable = DISPATCH_VTABLE(queue_concurrent);
	} else {
		vtable = DISPATCH_VTABLE(queue_serial);
	}
	switch (dqa->dqa_autorelease_frequency) {
	case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
		dqf |= DQF_AUTORELEASE_NEVER;
		break;
	case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
		dqf |= DQF_AUTORELEASE_ALWAYS;
		break;
	}
	if (legacy) {
		dqf |= DQF_LEGACY;
	}
	if (label) {
		const char *tmp = _dispatch_strdup_if_mutable(label);
		if (tmp != label) {
			dqf |= DQF_LABEL_NEEDS_FREE;
			label = tmp;
		}
	}

	dispatch_queue_t dq = _dispatch_object_alloc(vtable,
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
	_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
			DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
			(dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

	dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dq->dq_priority = dqa->dqa_qos_and_relpri;
	if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
		dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	}
#endif
	_dispatch_retain(tq);
	if (qos == QOS_CLASS_UNSPECIFIED) {
		// legacy way of inheriting the QoS from the target
		_dispatch_queue_priority_inherit_from_target(dq, tq);
	}
	if (!dqa->dqa_inactive) {
		_dispatch_queue_inherit_wlh_from_target(dq, tq);
	}
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq)
{
	return _dispatch_queue_create_with_target(label, dqa, tq, false);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return _dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
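
/*
 * Illustrative usage sketch (not part of this file): the client-facing entry
 * points above both funnel into _dispatch_queue_create_with_target(). Public
 * API only; the labels are made up.
 *
 *     dispatch_queue_t serial = dispatch_queue_create("com.example.serial",
 *             DISPATCH_QUEUE_SERIAL);
 *     dispatch_queue_t conc = dispatch_queue_create_with_target(
 *             "com.example.concurrent", DISPATCH_QUEUE_CONCURRENT,
 *             dispatch_get_global_queue(QOS_CLASS_UTILITY, 0));
 *
 * With no explicit target and no QoS attribute, a serial queue ends up
 * targeting an overcommit root queue, per the normalization logic above.
 */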
dispatch_queue_t
dispatch_queue_create_with_accounting_override_voucher(const char *label,
		dispatch_queue_attr_t attr, voucher_t voucher)
{
	(void)label; (void)attr; (void)voucher;
	DISPATCH_CLIENT_CRASH(0, "Unsupported interface");
}
void
_dispatch_queue_destroy(dispatch_queue_t dq, bool *allow_free)
{
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);

	if (dx_hastypeflag(dq, QUEUE_ROOT)) {
		initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
	}
	dq_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
	dq_state &= ~DISPATCH_QUEUE_DIRTY;
	dq_state &= ~DISPATCH_QUEUE_ROLE_MASK;
	if (slowpath(dq_state != initial_state)) {
		if (_dq_state_drain_locked(dq_state)) {
			DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
					"Release of a locked queue");
		}
		DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
				"Release of a queue with corrupt state");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
				"Release of a queue while items are enqueued");
	}

	// trash the queue so that use after free will crash
	dq->dq_items_head = (void *)0x200;
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}

	// fastpath for queues that never got their storage retained
	if (likely(os_atomic_load2o(dq, dq_sref_cnt, relaxed) == 0)) {
		// poison the state with something that is suspended and is easy to spot
		dq->dq_state = 0xdead000000000000;
		return;
	}

	// Take over freeing the memory from _dispatch_object_dealloc()
	//
	// As soon as we call _dispatch_queue_release_storage(), we forfeit
	// the possibility for the caller of dx_dispose() to finalize the object
	// so that responsibility is ours.
	_dispatch_object_finalize(dq);
	*allow_free = false;
	dq->dq_label = "<released queue, pending free>";
	dq->do_targetq = NULL;
	dq->do_finalizer = NULL;
	return _dispatch_queue_release_storage(dq);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq, allow_free);
}
void
_dispatch_queue_xref_dispose(dispatch_queue_t dq)
{
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_is_suspended(dq_state))) {
		long state = (long)dq_state;
		if (sizeof(long) < sizeof(uint64_t)) state = (long)(dq_state >> 32);
		if (unlikely(_dq_state_is_inactive(dq_state))) {
			// Arguments for and against this assert are within 6705399
			DISPATCH_CLIENT_CRASH(state, "Release of an inactive object");
		}
		DISPATCH_CLIENT_CRASH(dq_state, "Release of a suspended object");
	}
	os_atomic_or2o(dq, dq_atomic_flags, DQF_RELEASED, relaxed);
}
static void
_dispatch_queue_suspend_slow(dispatch_queue_t dq)
{
	uint64_t dq_state, value, delta;

	_dispatch_queue_sidelock_lock(dq);

	// what we want to transfer (remove from dq_state)
	delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
	// but this is a suspend so add a suspend count at the same time
	delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
	if (dq->dq_side_suspend_cnt == 0) {
		// we subtract delta from dq_state, and we want to set this bit
		delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
	}

	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		// unsigned underflow of the subtraction can happen because other
		// threads could have touched this value while we were trying to acquire
		// the lock, or because another thread raced us to do the same operation
		// and got to the lock first.
		if (unlikely(os_sub_overflow(dq_state, delta, &value))) {
			os_atomic_rmw_loop_give_up(goto retry);
		}
	});
	if (unlikely(os_add_overflow(dq->dq_side_suspend_cnt,
			DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
		DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
	}
	return _dispatch_queue_sidelock_unlock(dq);

retry:
	_dispatch_queue_sidelock_unlock(dq);
	return dx_vtable(dq)->do_suspend(dq);
}
void
_dispatch_queue_suspend(dispatch_queue_t dq)
{
	dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);

	uint64_t dq_state, value;

	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
		if (unlikely(os_add_overflow(dq_state, value, &value))) {
			os_atomic_rmw_loop_give_up({
				return _dispatch_queue_suspend_slow(dq);
			});
		}
	});

	if (!_dq_state_is_suspended(dq_state)) {
		// rdar://8181908 we need to extend the queue life for the duration
		// of the call to wakeup at _dispatch_queue_resume() time.
		_dispatch_retain_2(dq);
	}
}
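
/*
 * Illustrative usage sketch (not part of this file): suspension is counted, so
 * every dispatch_suspend() must be balanced by a dispatch_resume() before the
 * last reference to the queue is released. Public API only; reconfigure() is a
 * hypothetical caller-side helper.
 *
 *     dispatch_suspend(q);   // already-running blocks finish; new ones wait
 *     reconfigure(q);
 *     dispatch_resume(q);    // drops the suspend count taken above
 */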
static void
_dispatch_queue_resume_slow(dispatch_queue_t dq)
{
	uint64_t dq_state, value, delta;

	_dispatch_queue_sidelock_lock(dq);

	// what we want to transfer
	delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
	// but this is a resume so consume a suspend count at the same time
	delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
	switch (dq->dq_side_suspend_cnt) {
	case 0:
		goto retry;
	case DISPATCH_QUEUE_SUSPEND_HALF:
		// we will transition the side count to 0, so we want to clear this bit
		delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
		break;
	}
	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		// unsigned overflow of the addition can happen because other
		// threads could have touched this value while we were trying to acquire
		// the lock, or because another thread raced us to do the same operation
		// and got to the lock first.
		if (unlikely(os_add_overflow(dq_state, delta, &value))) {
			os_atomic_rmw_loop_give_up(goto retry);
		}
	});
	dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
	return _dispatch_queue_sidelock_unlock(dq);

retry:
	_dispatch_queue_sidelock_unlock(dq);
	return dx_vtable(dq)->do_resume(dq, false);
}
void
_dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
{
	bool allow_resume = true;
	// Step 2: run the activation finalizer
	if (dx_vtable(dq)->do_finalize_activation) {
		dx_vtable(dq)->do_finalize_activation(dq, &allow_resume);
	}
	// Step 3: consume the suspend count
	if (allow_resume) {
		return dx_vtable(dq)->do_resume(dq, false);
	}
}
1656 _dispatch_queue_resume(dispatch_queue_t dq
, bool activate
)
1658 // covers all suspend and inactive bits, including side suspend bit
1659 const uint64_t suspend_bits
= DISPATCH_QUEUE_SUSPEND_BITS_MASK
;
1660 uint64_t pending_barrier_width
=
1661 (dq
->dq_width
- 1) * DISPATCH_QUEUE_WIDTH_INTERVAL
;
1662 uint64_t set_owner_and_set_full_width_and_in_barrier
=
1663 _dispatch_lock_value_for_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT
|
1664 DISPATCH_QUEUE_IN_BARRIER
;
1666 // backward compatibility: only dispatch sources can abuse
1667 // dispatch_resume() to really mean dispatch_activate()
1668 bool is_source
= (dx_metatype(dq
) == _DISPATCH_SOURCE_TYPE
);
1669 uint64_t dq_state
, value
;
1671 dispatch_assert(dq
->do_ref_cnt
!= DISPATCH_OBJECT_GLOBAL_REFCNT
);
1673 // Activation is a bit tricky as it needs to finalize before the wakeup.
1675 // If after doing its updates to the suspend count and/or inactive bit,
1676 // the last suspension related bit that would remain is the
	// NEEDS_ACTIVATION one, then this function:
	//
	// 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
	//    a suspend count)
	// 2. runs the activation finalizer
	// 3. consumes the suspend count set in (1), and finishes the resume flow
	//
	// Concurrently, some property setters such as setting dispatch source
	// handlers or _dispatch_queue_set_target_queue try to do in-place changes
	// before activation. These protect their action by taking a suspend count.
	// Step (1) above cannot happen if such a setter has locked the object.
	if (activate) {
		// relaxed atomic because this doesn't publish anything, this is only
		// about picking the thread that gets to finalize the activation
		os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
			if ((dq_state & suspend_bits) ==
					DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
				// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_INACTIVE
						- DISPATCH_QUEUE_NEEDS_ACTIVATION
						+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
			} else if (_dq_state_is_inactive(dq_state)) {
				// { sc:>0 i:1 na:1 } -> { i:0 na:1 }
				// simple activation because sc is not 0
				// resume will deal with na:1 later
				value = dq_state - DISPATCH_QUEUE_INACTIVE;
			} else {
				// object already active, this is a no-op, just exit
				os_atomic_rmw_loop_give_up(return);
			}
		});
	} else {
		// release barrier needed to publish the effect of
		// - dispatch_set_target_queue()
		// - dispatch_set_*_handler()
		// - do_finalize_activation()
		os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
			if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
					+ DISPATCH_QUEUE_NEEDS_ACTIVATION) {
				// { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
			} else if (is_source && (dq_state & suspend_bits) ==
					DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
				// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_INACTIVE
						- DISPATCH_QUEUE_NEEDS_ACTIVATION
						+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
			} else if (unlikely(os_sub_overflow(dq_state,
					DISPATCH_QUEUE_SUSPEND_INTERVAL, &value))) {
				// underflow means over-resume or a suspend count transfer
				// to the side count is needed
				os_atomic_rmw_loop_give_up({
					if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
						goto over_resume;
					}
					return _dispatch_queue_resume_slow(dq);
				});
				//
				// below this, value = dq_state - DISPATCH_QUEUE_SUSPEND_INTERVAL
				//
			} else if (!_dq_state_is_runnable(value)) {
				// Out of width or still suspended.
				// For the former, force _dispatch_queue_non_barrier_complete
				// to reconsider whether it has work to do
				value |= DISPATCH_QUEUE_DIRTY;
			} else if (_dq_state_drain_locked(value)) {
				// still locked by someone else, make drain_try_unlock() fail
				// and reconsider whether it has work to do
				value |= DISPATCH_QUEUE_DIRTY;
			} else if (!is_source && (_dq_state_has_pending_barrier(value) ||
					value + pending_barrier_width <
					DISPATCH_QUEUE_WIDTH_FULL_BIT)) {
				// if we can, acquire the full width drain lock
				// and then perform a lock transfer
				//
				// However this is never useful for a source where there are no
				// sync waiters, so never take the lock and do a plain wakeup
				value &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
				value |= set_owner_and_set_full_width_and_in_barrier;
			} else {
				// clear overrides and force a wakeup
				value &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
				value &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
			}
		});
	}

	if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
		// we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
		return _dispatch_queue_resume_finalize_activation(dq);
	}

	if (activate) {
		// if we're still in an activate codepath here we should have
		// { sc:>0 na:1 }, if not we've got a corrupt state
		if (unlikely(!_dq_state_is_suspended(value))) {
			DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
		}
		return;
	}

	if (_dq_state_is_suspended(value)) {
		return;
	}

	if (_dq_state_is_dirty(dq_state)) {
		// <rdar://problem/14637483>
		// dependency ordering for dq state changes that were flushed
		// and not acted upon
		os_atomic_thread_fence(dependency);
		dq = os_atomic_force_dependency_on(dq, dq_state);
	}
	// Balancing the retain_2 done in suspend() for rdar://8181908
	dispatch_wakeup_flags_t flags = DISPATCH_WAKEUP_CONSUME_2;
	if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
		flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
	} else if (!_dq_state_is_runnable(value)) {
		if (_dq_state_is_base_wlh(dq_state)) {
			_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
		}
		return _dispatch_release_2(dq);
	}
	dispatch_assert(!_dq_state_received_sync_wait(dq_state));
	dispatch_assert(!_dq_state_in_sync_transfer(dq_state));
	return dx_wakeup(dq, _dq_state_max_qos(dq_state), flags);

over_resume:
	if (unlikely(_dq_state_is_inactive(dq_state))) {
		DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
	}
	DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
}

const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}

dispatch_qos_class_t
dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relpri_ptr)
{
	dispatch_qos_class_t qos = _dispatch_priority_qos(dq->dq_priority);
	if (relpri_ptr) {
		*relpri_ptr = qos ? _dispatch_priority_relpri(dq->dq_priority) : 0;
	}
	return _dispatch_qos_to_qos_class(qos);
}

static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w >= 0) {
		tmp = w ? (unsigned int)w : 1;
	} else {
		dispatch_qos_t qos = _dispatch_qos_from_pp(_dispatch_get_priority());
		switch (w) {
		case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
			tmp = _dispatch_qos_max_parallelism(qos,
					DISPATCH_MAX_PARALLELISM_PHYSICAL);
			break;
		case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
			tmp = _dispatch_qos_max_parallelism(qos,
					DISPATCH_MAX_PARALLELISM_ACTIVE);
			break;
		case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		default:
			tmp = _dispatch_qos_max_parallelism(qos, 0);
			break;
		}
	}
	if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
		tmp = DISPATCH_QUEUE_WIDTH_MAX;
	}

	dispatch_queue_flags_t old_dqf, new_dqf;
	os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = (old_dqf & DQF_FLAGS_MASK) | DQF_WIDTH(tmp);
	});
	_dispatch_queue_inherit_wlh_from_target(dq, dq->do_targetq);
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (unlikely(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT ||
			dx_hastypeflag(dq, QUEUE_ROOT) ||
			dx_hastypeflag(dq, QUEUE_BASE))) {
		return;
	}

	unsigned long type = dx_type(dq);
	switch (type) {
	case DISPATCH_QUEUE_LEGACY_TYPE:
	case DISPATCH_QUEUE_CONCURRENT_TYPE:
		break;
	case DISPATCH_QUEUE_SERIAL_TYPE:
		DISPATCH_CLIENT_CRASH(type, "Cannot set width of a serial queue");
	default:
		DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
	}

	if (likely((int)width >= 0)) {
		_dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
				_dispatch_queue_set_width2, DISPATCH_BARRIER_TRYSYNC_SUSPEND);
	} else {
		// The negative width constants need to execute on the queue to
		// query the queue QoS
		_dispatch_barrier_async_detached_f(dq, (void*)(intptr_t)width,
				_dispatch_queue_set_width2);
	}
}

static void
_dispatch_queue_legacy_set_target_queue(void *ctxt)
{
	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_queue_t tq = ctxt;
	dispatch_queue_t otq = dq->do_targetq;

	if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
		_dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
		_dispatch_bug_deprecated("Changing the target of a queue "
				"already targeted by other dispatch objects");
#else
		DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
				"already targeted by other dispatch objects");
#endif
	}

	_dispatch_queue_priority_inherit_from_target(dq, tq);
	_dispatch_queue_inherit_wlh_from_target(dq, tq);
#if HAVE_PTHREAD_WORKQUEUE_QOS
	// see _dispatch_queue_class_wakeup()
	_dispatch_queue_sidelock_lock(dq);
#endif
	dq->do_targetq = tq;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	// see _dispatch_queue_class_wakeup()
	_dispatch_queue_sidelock_unlock(dq);
#endif

	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_target_queue_changed(dq);
	_dispatch_release_tailcall(otq);
}

void
_dispatch_queue_set_target_queue(dispatch_queue_t dq, dispatch_queue_t tq)
{
	dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
			dq->do_targetq);

	if (unlikely(!tq)) {
		bool is_concurrent_q = (dq->dq_width > 1);
		tq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, !is_concurrent_q);
	}

	if (_dispatch_queue_try_inactive_suspend(dq)) {
		_dispatch_object_set_target_queue_inline(dq, tq);
		return dx_vtable(dq)->do_resume(dq, false);
	}

#if !DISPATCH_ALLOW_NON_LEAF_RETARGET
	if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
		DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
				"already targeted by other dispatch objects");
	}
#endif

	if (unlikely(!_dispatch_queue_is_legacy(dq))) {
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
		if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
			DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
					"already targeted by other dispatch objects");
		}
#endif
		DISPATCH_CLIENT_CRASH(0, "Cannot change the target of this object "
				"after it has been activated");
	}

	unsigned long type = dx_type(dq);
	switch (type) {
	case DISPATCH_QUEUE_LEGACY_TYPE:
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
		if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
			_dispatch_bug_deprecated("Changing the target of a queue "
					"already targeted by other dispatch objects");
		}
#endif
		break;
	case DISPATCH_SOURCE_KEVENT_TYPE:
	case DISPATCH_MACH_CHANNEL_TYPE:
		_dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget, dq);
		_dispatch_bug_deprecated("Changing the target of a source "
				"after it has been activated");
		break;
	default:
		DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
	}

	_dispatch_retain(tq);
	return _dispatch_barrier_trysync_or_async_f(dq, tq,
			_dispatch_queue_legacy_set_target_queue,
			DISPATCH_BARRIER_TRYSYNC_SUSPEND);
}
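
/*
 * Illustrative sketch (public API): retargeting before activation takes the
 * _dispatch_queue_try_inactive_suspend() fast path above and applies the
 * change in place; retargeting an active legacy queue is deferred through the
 * barrier to _dispatch_queue_legacy_set_target_queue().
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q",
 *			dispatch_queue_attr_make_initially_inactive(DISPATCH_QUEUE_SERIAL));
 *	dispatch_set_target_queue(q,
 *			dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0));
 *	dispatch_activate(q);
 */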

#pragma mark -
#pragma mark dispatch_mgr_queue

#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
		_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
		_dispatch_mgr_root_queue_context = {{{
#if DISPATCH_USE_WORKQUEUES
	.dgq_kworkqueue = (void*)(~0ul),
#endif
	.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dgq_thread_pool_size = 1,
}}};

static struct dispatch_queue_s _dispatch_mgr_root_queue = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
	.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
	.do_ctxt = &_dispatch_mgr_root_queue_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
	.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
			DISPATCH_PRIORITY_SATURATED_OVERRIDE,
};
#endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
static struct {
	volatile int prio;
	volatile qos_class_t qos;
	int default_prio;
	int policy;
	pthread_t tid;
} _dispatch_mgr_sched;

static dispatch_once_t _dispatch_mgr_sched_pred;

#if HAVE_PTHREAD_WORKQUEUE_QOS
// TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
// Must be kept in sync with list of qos classes in sys/qos.h
static const int _dispatch_mgr_sched_qos2prio[] = {
	[QOS_CLASS_MAINTENANCE] = 4,
	[QOS_CLASS_BACKGROUND] = 4,
	[QOS_CLASS_UTILITY] = 20,
	[QOS_CLASS_DEFAULT] = 31,
	[QOS_CLASS_USER_INITIATED] = 37,
	[QOS_CLASS_USER_INTERACTIVE] = 47,
};
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
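
/*
 * Worked example (illustrative): if qos_class_main() reports
 * QOS_CLASS_UTILITY, _dispatch_mgr_sched_init() below records that QoS and a
 * scheduler priority of _dispatch_mgr_sched_qos2prio[QOS_CLASS_UTILITY] == 20.
 */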

static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
	struct sched_param param;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
#else
	pthread_attr_t a, *attr = &a;
#endif
	(void)dispatch_assume_zero(pthread_attr_init(attr));
	(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
			&_dispatch_mgr_sched.policy));
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t qos = qos_class_main();
	if (qos == QOS_CLASS_DEFAULT) {
		qos = QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
	}
	_dispatch_mgr_sched.qos = qos;
	param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
#endif
	_dispatch_mgr_sched.default_prio = param.sched_priority;
	_dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE

#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
			PTHREAD_CREATE_DETACHED));
#if !DISPATCH_DEBUG
	(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t qos = _dispatch_mgr_sched.qos;
	if (qos) {
		if (_dispatch_set_qos_class_enabled) {
			(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
					qos, 0));
		}
	}
#endif
	param.sched_priority = _dispatch_mgr_sched.prio;
	if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
		(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
	}
	return &_dispatch_mgr_sched.tid;
}

static void
_dispatch_mgr_priority_apply(void)
{
	struct sched_param param;
	do {
		param.sched_priority = _dispatch_mgr_sched.prio;
		if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
			(void)dispatch_assume_zero(pthread_setschedparam(
					_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
					&param));
		}
	} while (_dispatch_mgr_sched.prio > param.sched_priority);
}

void
_dispatch_mgr_priority_init(void)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t qos = 0;
	(void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
	if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
		(void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
		int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
		if (p > param.sched_priority) {
			param.sched_priority = p;
		}
	}
#endif
	if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
		return _dispatch_mgr_priority_apply();
	}
}
#endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t q, qos = 0;
	(void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
	if (qos) {
		param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
		os_atomic_rmw_loop2o(&_dispatch_mgr_sched, qos, q, qos, relaxed, {
			if (q >= qos) os_atomic_rmw_loop_give_up(break);
		});
	}
#endif
	int p, prio = param.sched_priority;
	os_atomic_rmw_loop2o(&_dispatch_mgr_sched, prio, p, prio, relaxed, {
		if (p >= prio) os_atomic_rmw_loop_give_up(return);
	});
#if DISPATCH_USE_KEVENT_WORKQUEUE
	_dispatch_root_queues_init();
	if (_dispatch_kevent_workqueue_enabled) {
		pthread_priority_t pp = 0;
		if (prio > _dispatch_mgr_sched.default_prio) {
			// The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
			// _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
			// problematic in this case, since the second one is only ever
			// used on dq_priority fields.
			// We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall,
			// it is meaningful to libdispatch only.
			pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
		} else if (qos) {
			pp = _pthread_qos_class_encode(qos, 0, 0);
		}
		if (pp) {
			int r = _pthread_workqueue_set_event_manager_priority(pp);
			(void)dispatch_assume_zero(r);
		}
		return;
	}
#endif
#if DISPATCH_USE_MGR_THREAD
	if (_dispatch_mgr_sched.tid) {
		return _dispatch_mgr_priority_apply();
	}
#endif
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES

#if DISPATCH_USE_KEVENT_WORKQUEUE
void
_dispatch_kevent_workqueue_init(void)
{
	// Initialize kevent workqueue support
	_dispatch_root_queues_init();
	if (!_dispatch_kevent_workqueue_enabled) return;
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	qos_class_t qos = _dispatch_mgr_sched.qos;
	int prio = _dispatch_mgr_sched.prio;
	pthread_priority_t pp = 0;
	if (qos) {
		pp = _pthread_qos_class_encode(qos, 0, 0);
	}
	if (prio > _dispatch_mgr_sched.default_prio) {
		pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
	}
	if (pp) {
		int r = _pthread_workqueue_set_event_manager_priority(pp);
		(void)dispatch_assume_zero(r);
	}
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
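
/*
 * Note (descriptive, hedged): in the common case where the manager priority
 * was never raised above its default, the value handed to
 * _pthread_workqueue_set_event_manager_priority() is the plain QoS encoding
 * from _pthread_qos_class_encode(qos, 0, 0); only an explicit raise switches
 * to the _PTHREAD_PRIORITY_SCHED_PRI_FLAG form used above.
 */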
2219 #pragma mark dispatch_pthread_root_queue
2221 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2222 static dispatch_queue_t
2223 _dispatch_pthread_root_queue_create(const char *label
, unsigned long flags
,
2224 const pthread_attr_t
*attr
, dispatch_block_t configure
,
2225 dispatch_pthread_root_queue_observer_hooks_t observer_hooks
)
2227 dispatch_queue_t dq
;
2228 dispatch_root_queue_context_t qc
;
2229 dispatch_pthread_root_queue_context_t pqc
;
2230 dispatch_queue_flags_t dqf
= 0;
2232 int32_t pool_size
= flags
& _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE
?
2233 (int8_t)(flags
& ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE
) : 0;
2235 dqs
= sizeof(struct dispatch_queue_s
) - DISPATCH_QUEUE_CACHELINE_PAD
;
2236 dqs
= roundup(dqs
, _Alignof(struct dispatch_root_queue_context_s
));
2237 dq
= _dispatch_object_alloc(DISPATCH_VTABLE(queue_root
), dqs
+
2238 sizeof(struct dispatch_root_queue_context_s
) +
2239 sizeof(struct dispatch_pthread_root_queue_context_s
));
2240 qc
= (void*)dq
+ dqs
;
2241 dispatch_assert((uintptr_t)qc
% _Alignof(typeof(*qc
)) == 0);
2242 pqc
= (void*)qc
+ sizeof(struct dispatch_root_queue_context_s
);
2243 dispatch_assert((uintptr_t)pqc
% _Alignof(typeof(*pqc
)) == 0);
2245 const char *tmp
= _dispatch_strdup_if_mutable(label
);
2247 dqf
|= DQF_LABEL_NEEDS_FREE
;
2252 _dispatch_queue_init(dq
, dqf
, DISPATCH_QUEUE_WIDTH_POOL
, 0);
2253 dq
->dq_label
= label
;
2254 dq
->dq_state
= DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
;
2256 dq
->dq_priority
= DISPATCH_PRIORITY_SATURATED_OVERRIDE
;
2258 pqc
->dpq_thread_mediator
.do_vtable
= DISPATCH_VTABLE(semaphore
);
2260 #if DISPATCH_USE_WORKQUEUES
2261 qc
->dgq_kworkqueue
= (void*)(~0ul);
2263 _dispatch_root_queue_init_pthread_pool(qc
, pool_size
, true);
2266 memcpy(&pqc
->dpq_thread_attr
, attr
, sizeof(pthread_attr_t
));
2267 _dispatch_mgr_priority_raise(&pqc
->dpq_thread_attr
);
2269 (void)dispatch_assume_zero(pthread_attr_init(&pqc
->dpq_thread_attr
));
2271 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
2272 &pqc
->dpq_thread_attr
, PTHREAD_CREATE_DETACHED
));
2274 pqc
->dpq_thread_configure
= _dispatch_Block_copy(configure
);
2276 if (observer_hooks
) {
2277 pqc
->dpq_observer_hooks
= *observer_hooks
;
2279 _dispatch_object_debug(dq
, "%s", __func__
);
2280 return _dispatch_introspection_queue_create(dq
);
2284 dispatch_pthread_root_queue_create(const char *label
, unsigned long flags
,
2285 const pthread_attr_t
*attr
, dispatch_block_t configure
)
2287 return _dispatch_pthread_root_queue_create(label
, flags
, attr
, configure
,
2291 #if DISPATCH_IOHID_SPI
2293 _dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label
,
2294 unsigned long flags
, const pthread_attr_t
*attr
,
2295 dispatch_pthread_root_queue_observer_hooks_t observer_hooks
,
2296 dispatch_block_t configure
)
2298 if (!observer_hooks
->queue_will_execute
||
2299 !observer_hooks
->queue_did_execute
) {
2300 DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
2302 return _dispatch_pthread_root_queue_create(label
, flags
, attr
, configure
,
2308 dispatch_pthread_root_queue_copy_current(void)
2310 dispatch_queue_t dq
= _dispatch_queue_get_current();
2311 if (!dq
) return NULL
;
2312 while (unlikely(dq
->do_targetq
)) {
2313 dq
= dq
->do_targetq
;
2315 if (dx_type(dq
) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE
||
2316 dq
->do_xref_cnt
== DISPATCH_OBJECT_GLOBAL_REFCNT
) {
2319 return (dispatch_queue_t
)_os_object_retain_with_resurrect(dq
->_as_os_obj
);
2322 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES

void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
	}
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	pthread_attr_destroy(&pqc->dpq_thread_attr);
	_dispatch_semaphore_dispose(&pqc->dpq_thread_mediator, NULL);
	if (pqc->dpq_thread_configure) {
		Block_release(pqc->dpq_thread_configure);
	}
	dq->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
#endif
	if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq, allow_free);
}

#pragma mark -
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_QUEUE_HEADER(queue_specific_queue);
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
} DISPATCH_ATOMIC64_ALIGN;

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);

void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq,
		bool *allow_free)
{
	dispatch_queue_specific_t dqs, tmp;
	dispatch_queue_t rq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(rq, dqs->dqs_ctxt, dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_destroy(dqsq->_as_dq, allow_free);
}

static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init(dqsq->_as_dq, DQF_NONE, DISPATCH_QUEUE_WIDTH_MAX,
			DISPATCH_QUEUE_ROLE_BASE_ANON);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(
			DISPATCH_QOS_USER_INITIATED, true);
	dqsq->dq_label = "queue-specific";
	TAILQ_INIT(&dqsq->dqsq_contexts);
	if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			dqsq->_as_dq, release))) {
		_dispatch_release(dqsq->_as_dq);
	}
}

static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						DISPATCH_QOS_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			return free(dqsn);
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}

DISPATCH_NOINLINE
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	_dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific, 0);
}

static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}

DISPATCH_ALWAYS_INLINE
static inline void *
_dispatch_queue_get_specific_inline(dispatch_queue_t dq, const void *key)
{
	void *ctxt = NULL;
	if (fastpath(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE && dq->dq_specific_q)){
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}

DISPATCH_NOINLINE
void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	return _dispatch_queue_get_specific_inline(dq, key);
}

DISPATCH_NOINLINE
void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		ctxt = _dispatch_queue_get_specific_inline(dq, key);
		if (ctxt) break;
		dq = dq->do_targetq;
	}
	return ctxt;
}
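
/*
 * Illustrative sketch (public API; example_key is the hypothetical key from
 * the sketch above): dispatch_get_specific() walks do_targetq, so a value set
 * on a target queue is visible from queues that target it.
 *
 *	dispatch_queue_t target = dispatch_queue_create("com.example.target", NULL);
 *	dispatch_queue_t q = dispatch_queue_create_with_target("com.example.q",
 *			DISPATCH_QUEUE_SERIAL, target);
 *	dispatch_queue_set_specific(target, &example_key, (void *)1, NULL);
 *	dispatch_sync(q, ^{
 *		void *v = dispatch_get_specific(&example_key);	// (void *)1
 *	});
 */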

#if DISPATCH_IOHID_SPI
bool
_dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
		dispatch_queue_t dq) // rdar://problem/18033810
{
	if (dq->dq_width != 1) {
		DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	return _dq_state_drain_locked_by_self(dq_state);
}
#endif // DISPATCH_IOHID_SPI

#pragma mark -
2526 #pragma mark dispatch_queue_debug
2529 _dispatch_queue_debug_attr(dispatch_queue_t dq
, char* buf
, size_t bufsiz
)
2532 dispatch_queue_t target
= dq
->do_targetq
;
2533 const char *tlabel
= target
&& target
->dq_label
? target
->dq_label
: "";
2534 uint64_t dq_state
= os_atomic_load2o(dq
, dq_state
, relaxed
);
2536 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "sref = %d, "
2537 "target = %s[%p], width = 0x%x, state = 0x%016llx",
2538 dq
->dq_sref_cnt
+ 1, tlabel
, target
, dq
->dq_width
,
2539 (unsigned long long)dq_state
);
2540 if (_dq_state_is_suspended(dq_state
)) {
2541 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", suspended = %d",
2542 _dq_state_suspend_cnt(dq_state
));
2544 if (_dq_state_is_inactive(dq_state
)) {
2545 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", inactive");
2546 } else if (_dq_state_needs_activation(dq_state
)) {
2547 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", needs-activation");
2549 if (_dq_state_is_enqueued(dq_state
)) {
2550 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", enqueued");
2552 if (_dq_state_is_dirty(dq_state
)) {
2553 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", dirty");
2555 dispatch_qos_t qos
= _dq_state_max_qos(dq_state
);
2557 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", max qos %d", qos
);
2559 mach_port_t owner
= _dq_state_drain_owner(dq_state
);
2560 if (!_dispatch_queue_is_thread_bound(dq
) && owner
) {
2561 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", draining on 0x%x",
2564 if (_dq_state_is_in_barrier(dq_state
)) {
2565 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", in-barrier");
2567 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", in-flight = %d",
2568 _dq_state_used_width(dq_state
, dq
->dq_width
));
2570 if (_dq_state_has_pending_barrier(dq_state
)) {
2571 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", pending-barrier");
2573 if (_dispatch_queue_is_thread_bound(dq
)) {
2574 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", thread = 0x%x ",
2581 dispatch_queue_debug(dispatch_queue_t dq
, char* buf
, size_t bufsiz
)
2584 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "%s[%p] = { ",
2585 dq
->dq_label
? dq
->dq_label
: dx_kind(dq
), dq
);
2586 offset
+= _dispatch_object_debug_attr(dq
, &buf
[offset
], bufsiz
- offset
);
2587 offset
+= _dispatch_queue_debug_attr(dq
, &buf
[offset
], bufsiz
- offset
);
2588 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "}");
2594 dispatch_debug_queue(dispatch_queue_t dq
, const char* str
) {
2596 _dispatch_object_debug(dq
, "%s", str
);
2598 _dispatch_log("queue[NULL]: %s", str
);
2603 #if DISPATCH_PERF_MON
2605 #define DISPATCH_PERF_MON_BUCKETS 8
2608 uint64_t volatile time_total
;
2609 uint64_t volatile count_total
;
2610 uint64_t volatile thread_total
;
2611 } _dispatch_stats
[DISPATCH_PERF_MON_BUCKETS
];
2612 DISPATCH_USED
static size_t _dispatch_stat_buckets
= DISPATCH_PERF_MON_BUCKETS
;
2615 _dispatch_queue_merge_stats(uint64_t start
, bool trace
, perfmon_thread_type type
)
2617 uint64_t delta
= _dispatch_absolute_time() - start
;
2618 unsigned long count
;
2620 count
= (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key
);
2621 _dispatch_thread_setspecific(dispatch_bcounter_key
, NULL
);
2624 if (trace
) _dispatch_ktrace1(DISPATCH_PERF_MON_worker_useless
, type
);
2626 bucket
= MIN(DISPATCH_PERF_MON_BUCKETS
- 1,
2627 (int)sizeof(count
) * CHAR_BIT
- __builtin_clzl(count
));
2628 os_atomic_add(&_dispatch_stats
[bucket
].count_total
, count
, relaxed
);
2630 os_atomic_add(&_dispatch_stats
[bucket
].time_total
, delta
, relaxed
);
2631 os_atomic_inc(&_dispatch_stats
[bucket
].thread_total
, relaxed
);
2633 _dispatch_ktrace3(DISPATCH_PERF_MON_worker_thread_end
, count
, delta
, type
);
2640 #pragma mark _dispatch_set_priority_and_mach_voucher
2641 #if HAVE_PTHREAD_WORKQUEUE_QOS
2645 _dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp
,
2648 _pthread_set_flags_t pflags
= 0;
2649 if (pp
&& _dispatch_set_qos_class_enabled
) {
2650 pthread_priority_t old_pri
= _dispatch_get_priority();
2651 if (pp
!= old_pri
) {
2652 if (old_pri
& _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG
) {
2653 pflags
|= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND
;
2654 // when we unbind, overcomitness can flip, so we need to learn
2655 // it from the defaultpri, see _dispatch_priority_compute_update
2656 pp
|= (_dispatch_get_basepri() &
2657 DISPATCH_PRIORITY_FLAG_OVERCOMMIT
);
2659 // else we need to keep the one that is set in the current pri
2660 pp
|= (old_pri
& _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
);
2662 if (likely(old_pri
& ~_PTHREAD_PRIORITY_FLAGS_MASK
)) {
2663 pflags
|= _PTHREAD_SET_SELF_QOS_FLAG
;
2665 uint64_t mgr_dq_state
=
2666 os_atomic_load2o(&_dispatch_mgr_q
, dq_state
, relaxed
);
2667 if (unlikely(_dq_state_drain_locked_by_self(mgr_dq_state
))) {
2668 DISPATCH_INTERNAL_CRASH(pp
,
2669 "Changing the QoS while on the manager queue");
2671 if (unlikely(pp
& _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
)) {
2672 DISPATCH_INTERNAL_CRASH(pp
, "Cannot raise oneself to manager");
2674 if (old_pri
& _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
) {
2675 DISPATCH_INTERNAL_CRASH(old_pri
,
2676 "Cannot turn a manager thread into a normal one");
2680 if (kv
!= VOUCHER_NO_MACH_VOUCHER
) {
2681 #if VOUCHER_USE_MACH_VOUCHER
2682 pflags
|= _PTHREAD_SET_SELF_VOUCHER_FLAG
;
2685 if (!pflags
) return;
2686 int r
= _pthread_set_properties_self(pflags
, pp
, kv
);
2688 DISPATCH_INTERNAL_CRASH(pp
, "_pthread_set_properties_self failed");
2690 (void)dispatch_assume_zero(r
);
2695 _dispatch_set_priority_and_voucher_slow(pthread_priority_t priority
,
2696 voucher_t v
, dispatch_thread_set_self_t flags
)
2698 voucher_t ov
= DISPATCH_NO_VOUCHER
;
2699 mach_voucher_t kv
= VOUCHER_NO_MACH_VOUCHER
;
2700 if (v
!= DISPATCH_NO_VOUCHER
) {
2701 bool retained
= flags
& DISPATCH_VOUCHER_CONSUME
;
2702 ov
= _voucher_get();
2703 if (ov
== v
&& (flags
& DISPATCH_VOUCHER_REPLACE
)) {
2704 if (retained
&& v
) _voucher_release_no_dispose(v
);
2705 ov
= DISPATCH_NO_VOUCHER
;
2707 if (!retained
&& v
) _voucher_retain(v
);
2708 kv
= _voucher_swap_and_get_mach_voucher(ov
, v
);
2711 if (!(flags
& DISPATCH_THREAD_PARK
)) {
2712 _dispatch_set_priority_and_mach_voucher_slow(priority
, kv
);
2714 if (ov
!= DISPATCH_NO_VOUCHER
&& (flags
& DISPATCH_VOUCHER_REPLACE
)) {
2715 if (ov
) _voucher_release(ov
);
2716 ov
= DISPATCH_NO_VOUCHER
;
2722 #pragma mark dispatch_continuation_t
2724 const struct dispatch_continuation_vtable_s _dispatch_continuation_vtables
[] = {
2725 DC_VTABLE_ENTRY(ASYNC_REDIRECT
,
2726 .do_kind
= "dc-redirect",
2727 .do_invoke
= _dispatch_async_redirect_invoke
),
2729 DC_VTABLE_ENTRY(MACH_SEND_BARRRIER_DRAIN
,
2730 .do_kind
= "dc-mach-send-drain",
2731 .do_invoke
= _dispatch_mach_send_barrier_drain_invoke
),
2732 DC_VTABLE_ENTRY(MACH_SEND_BARRIER
,
2733 .do_kind
= "dc-mach-send-barrier",
2734 .do_invoke
= _dispatch_mach_barrier_invoke
),
2735 DC_VTABLE_ENTRY(MACH_RECV_BARRIER
,
2736 .do_kind
= "dc-mach-recv-barrier",
2737 .do_invoke
= _dispatch_mach_barrier_invoke
),
2738 DC_VTABLE_ENTRY(MACH_ASYNC_REPLY
,
2739 .do_kind
= "dc-mach-async-reply",
2740 .do_invoke
= _dispatch_mach_msg_async_reply_invoke
),
2742 #if HAVE_PTHREAD_WORKQUEUE_QOS
2743 DC_VTABLE_ENTRY(OVERRIDE_STEALING
,
2744 .do_kind
= "dc-override-stealing",
2745 .do_invoke
= _dispatch_queue_override_invoke
),
2746 DC_VTABLE_ENTRY(OVERRIDE_OWNING
,
2747 .do_kind
= "dc-override-owning",
2748 .do_invoke
= _dispatch_queue_override_invoke
),
2753 _dispatch_force_cache_cleanup(void)
2755 dispatch_continuation_t dc
;
2756 dc
= _dispatch_thread_getspecific(dispatch_cache_key
);
2758 _dispatch_thread_setspecific(dispatch_cache_key
, NULL
);
2759 _dispatch_cache_cleanup(dc
);
2765 _dispatch_cache_cleanup(void *value
)
2767 dispatch_continuation_t dc
, next_dc
= value
;
2769 while ((dc
= next_dc
)) {
2770 next_dc
= dc
->do_next
;
2771 _dispatch_continuation_free_to_heap(dc
);
2775 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
2778 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc
)
2780 _dispatch_continuation_free_to_heap(dc
);
2781 dispatch_continuation_t next_dc
;
2782 dc
= _dispatch_thread_getspecific(dispatch_cache_key
);
2784 if (!dc
|| (cnt
= dc
->dc_cache_cnt
-
2785 _dispatch_continuation_cache_limit
) <= 0) {
2789 next_dc
= dc
->do_next
;
2790 _dispatch_continuation_free_to_heap(dc
);
2791 } while (--cnt
&& (dc
= next_dc
));
2792 _dispatch_thread_setspecific(dispatch_cache_key
, next_dc
);
2798 _dispatch_continuation_push(dispatch_queue_t dq
, dispatch_continuation_t dc
)
2800 dx_push(dq
, dc
, _dispatch_continuation_override_qos(dq
, dc
));
2803 DISPATCH_ALWAYS_INLINE
2805 _dispatch_continuation_async2(dispatch_queue_t dq
, dispatch_continuation_t dc
,
2808 if (fastpath(barrier
|| !DISPATCH_QUEUE_USES_REDIRECTION(dq
->dq_width
))) {
2809 return _dispatch_continuation_push(dq
, dc
);
2811 return _dispatch_async_f2(dq
, dc
);
2816 _dispatch_continuation_async(dispatch_queue_t dq
, dispatch_continuation_t dc
)
2818 _dispatch_continuation_async2(dq
, dc
,
2819 dc
->dc_flags
& DISPATCH_OBJ_BARRIER_BIT
);
2823 #pragma mark dispatch_block_create
2827 DISPATCH_ALWAYS_INLINE
2829 _dispatch_block_flags_valid(dispatch_block_flags_t flags
)
2831 return ((flags
& ~DISPATCH_BLOCK_API_MASK
) == 0);
2834 DISPATCH_ALWAYS_INLINE
2835 static inline dispatch_block_flags_t
2836 _dispatch_block_normalize_flags(dispatch_block_flags_t flags
)
2838 if (flags
& (DISPATCH_BLOCK_NO_VOUCHER
|DISPATCH_BLOCK_DETACHED
)) {
2839 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
2841 if (flags
& (DISPATCH_BLOCK_NO_QOS_CLASS
|DISPATCH_BLOCK_DETACHED
)) {
2842 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
2847 static inline dispatch_block_t
2848 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags
,
2849 voucher_t voucher
, pthread_priority_t pri
, dispatch_block_t block
)
2851 flags
= _dispatch_block_normalize_flags(flags
);
2852 bool assign
= (flags
& DISPATCH_BLOCK_ASSIGN_CURRENT
);
2854 if (assign
&& !(flags
& DISPATCH_BLOCK_HAS_VOUCHER
)) {
2855 #if OS_VOUCHER_ACTIVITY_SPI
2856 voucher
= VOUCHER_CURRENT
;
2858 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
2860 #if OS_VOUCHER_ACTIVITY_SPI
2861 if (voucher
== VOUCHER_CURRENT
) {
2862 voucher
= _voucher_get();
2865 if (assign
&& !(flags
& DISPATCH_BLOCK_HAS_PRIORITY
)) {
2866 pri
= _dispatch_priority_propagate();
2867 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
2869 dispatch_block_t db
= _dispatch_block_create(flags
, voucher
, pri
, block
);
2871 dispatch_assert(_dispatch_block_get_data(db
));
2877 dispatch_block_create(dispatch_block_flags_t flags
, dispatch_block_t block
)
2879 if (!_dispatch_block_flags_valid(flags
)) return DISPATCH_BAD_INPUT
;
2880 return _dispatch_block_create_with_voucher_and_priority(flags
, NULL
, 0,
2885 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags
,
2886 dispatch_qos_class_t qos_class
, int relative_priority
,
2887 dispatch_block_t block
)
2889 if (!_dispatch_block_flags_valid(flags
) ||
2890 !_dispatch_qos_class_valid(qos_class
, relative_priority
)) {
2891 return DISPATCH_BAD_INPUT
;
2893 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
2894 pthread_priority_t pri
= 0;
2895 #if HAVE_PTHREAD_WORKQUEUE_QOS
2896 pri
= _pthread_qos_class_encode(qos_class
, relative_priority
, 0);
2898 return _dispatch_block_create_with_voucher_and_priority(flags
, NULL
,
2903 dispatch_block_create_with_voucher(dispatch_block_flags_t flags
,
2904 voucher_t voucher
, dispatch_block_t block
)
2906 if (!_dispatch_block_flags_valid(flags
)) return DISPATCH_BAD_INPUT
;
2907 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
2908 return _dispatch_block_create_with_voucher_and_priority(flags
, voucher
, 0,
2913 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags
,
2914 voucher_t voucher
, dispatch_qos_class_t qos_class
,
2915 int relative_priority
, dispatch_block_t block
)
2917 if (!_dispatch_block_flags_valid(flags
) ||
2918 !_dispatch_qos_class_valid(qos_class
, relative_priority
)) {
2919 return DISPATCH_BAD_INPUT
;
2921 flags
|= (DISPATCH_BLOCK_HAS_VOUCHER
|DISPATCH_BLOCK_HAS_PRIORITY
);
2922 pthread_priority_t pri
= 0;
2923 #if HAVE_PTHREAD_WORKQUEUE_QOS
2924 pri
= _pthread_qos_class_encode(qos_class
, relative_priority
, 0);
2926 return _dispatch_block_create_with_voucher_and_priority(flags
, voucher
,
2931 dispatch_block_perform(dispatch_block_flags_t flags
, dispatch_block_t block
)
2933 if (!_dispatch_block_flags_valid(flags
)) {
2934 DISPATCH_CLIENT_CRASH(flags
, "Invalid flags passed to "
2935 "dispatch_block_perform()");
2937 flags
= _dispatch_block_normalize_flags(flags
);
2938 struct dispatch_block_private_data_s dbpds
=
2939 DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags
, block
);
2940 return _dispatch_block_invoke_direct(&dbpds
);
2943 #define _dbpd_group(dbpd) ((dbpd)->dbpd_group)
2946 _dispatch_block_invoke_direct(const struct dispatch_block_private_data_s
*dbcpd
)
2948 dispatch_block_private_data_t dbpd
= (dispatch_block_private_data_t
)dbcpd
;
2949 dispatch_block_flags_t flags
= dbpd
->dbpd_flags
;
2950 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2951 if (slowpath(atomic_flags
& DBF_WAITED
)) {
2952 DISPATCH_CLIENT_CRASH(atomic_flags
, "A block object may not be both "
2953 "run more than once and waited for");
2955 if (atomic_flags
& DBF_CANCELED
) goto out
;
2957 pthread_priority_t op
= 0, p
= 0;
2958 op
= _dispatch_block_invoke_should_set_priority(flags
, dbpd
->dbpd_priority
);
2960 p
= dbpd
->dbpd_priority
;
2962 voucher_t ov
, v
= DISPATCH_NO_VOUCHER
;
2963 if (flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
2964 v
= dbpd
->dbpd_voucher
;
2966 ov
= _dispatch_set_priority_and_voucher(p
, v
, 0);
2967 dbpd
->dbpd_thread
= _dispatch_tid_self();
2968 _dispatch_client_callout(dbpd
->dbpd_block
,
2969 _dispatch_Block_invoke(dbpd
->dbpd_block
));
2970 _dispatch_reset_priority_and_voucher(op
, ov
);
2972 if ((atomic_flags
& DBF_PERFORM
) == 0) {
2973 if (os_atomic_inc2o(dbpd
, dbpd_performed
, relaxed
) == 1) {
2974 dispatch_group_leave(_dbpd_group(dbpd
));
2980 _dispatch_block_sync_invoke(void *block
)
2982 dispatch_block_t b
= block
;
2983 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(b
);
2984 dispatch_block_flags_t flags
= dbpd
->dbpd_flags
;
2985 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2986 if (unlikely(atomic_flags
& DBF_WAITED
)) {
2987 DISPATCH_CLIENT_CRASH(atomic_flags
, "A block object may not be both "
2988 "run more than once and waited for");
2990 if (atomic_flags
& DBF_CANCELED
) goto out
;
2992 voucher_t ov
= DISPATCH_NO_VOUCHER
;
2993 if (flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
2994 ov
= _dispatch_adopt_priority_and_set_voucher(0, dbpd
->dbpd_voucher
, 0);
2997 _dispatch_reset_voucher(ov
, 0);
2999 if ((atomic_flags
& DBF_PERFORM
) == 0) {
3000 if (os_atomic_inc2o(dbpd
, dbpd_performed
, relaxed
) == 1) {
3001 dispatch_group_leave(_dbpd_group(dbpd
));
3006 oq
= os_atomic_xchg2o(dbpd
, dbpd_queue
, NULL
, relaxed
);
3008 // balances dispatch_{,barrier_,}sync
3009 _os_object_release_internal_n(oq
->_as_os_obj
, 2);
3014 _dispatch_block_async_invoke_reset_max_qos(dispatch_queue_t dq
,
3017 uint64_t old_state
, new_state
, qos_bits
= _dq_state_from_qos(qos
);
3019 // Only dispatch queues can reach this point (as opposed to sources or more
3020 // complex objects) which allows us to handle the DIRTY bit protocol by only
3021 // looking at the tail
3022 dispatch_assert(dx_metatype(dq
) == _DISPATCH_QUEUE_TYPE
);
3025 os_atomic_rmw_loop2o(dq
, dq_state
, old_state
, new_state
, relaxed
, {
3026 dispatch_assert(_dq_state_is_base_wlh(old_state
));
3027 if ((old_state
& DISPATCH_QUEUE_MAX_QOS_MASK
) <= qos_bits
) {
3028 // Nothing to do if the QoS isn't going down
3029 os_atomic_rmw_loop_give_up(return);
3031 if (_dq_state_is_dirty(old_state
)) {
3032 os_atomic_rmw_loop_give_up({
3033 // just renew the drain lock with an acquire barrier, to see
3034 // what the enqueuer that set DIRTY has done.
3035 // the xor generates better assembly as DISPATCH_QUEUE_DIRTY
3036 // is already in a register
3037 os_atomic_xor2o(dq
, dq_state
, DISPATCH_QUEUE_DIRTY
, acquire
);
3038 if (!dq
->dq_items_tail
) {
3045 new_state
= old_state
;
3046 new_state
&= ~DISPATCH_QUEUE_MAX_QOS_MASK
;
3047 new_state
|= qos_bits
;
3050 _dispatch_deferred_items_get()->ddi_wlh_needs_update
= true;
3051 _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE
);
3054 #define DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE 0x1
3055 #define DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET 0x2
3059 _dispatch_block_async_invoke2(dispatch_block_t b
, unsigned long invoke_flags
)
3061 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(b
);
3062 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
3063 if (slowpath(atomic_flags
& DBF_WAITED
)) {
3064 DISPATCH_CLIENT_CRASH(atomic_flags
, "A block object may not be both "
3065 "run more than once and waited for");
3068 if (unlikely((dbpd
->dbpd_flags
&
3069 DISPATCH_BLOCK_IF_LAST_RESET_QUEUE_QOS_OVERRIDE
) &&
3070 !(invoke_flags
& DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET
))) {
3071 dispatch_queue_t dq
= _dispatch_get_current_queue();
3072 dispatch_qos_t qos
= _dispatch_qos_from_pp(_dispatch_get_priority());
3073 if ((dispatch_wlh_t
)dq
== _dispatch_get_wlh() && !dq
->dq_items_tail
) {
3074 _dispatch_block_async_invoke_reset_max_qos(dq
, qos
);
3078 if (!slowpath(atomic_flags
& DBF_CANCELED
)) {
3081 if ((atomic_flags
& DBF_PERFORM
) == 0) {
3082 if (os_atomic_inc2o(dbpd
, dbpd_performed
, relaxed
) == 1) {
3083 dispatch_group_leave(_dbpd_group(dbpd
));
3087 os_mpsc_queue_t oq
= os_atomic_xchg2o(dbpd
, dbpd_queue
, NULL
, relaxed
);
3089 // balances dispatch_{,barrier_,group_}async
3090 _os_object_release_internal_n_inline(oq
->_as_os_obj
, 2);
3093 if (invoke_flags
& DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE
) {
3099 _dispatch_block_async_invoke(void *block
)
3101 _dispatch_block_async_invoke2(block
, 0);
3105 _dispatch_block_async_invoke_and_release(void *block
)
3107 _dispatch_block_async_invoke2(block
, DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE
);
3111 _dispatch_block_async_invoke_and_release_mach_barrier(void *block
)
3113 _dispatch_block_async_invoke2(block
, DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE
|
3114 DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET
);
3117 DISPATCH_ALWAYS_INLINE
3119 _dispatch_block_supports_wait_and_cancel(dispatch_block_private_data_t dbpd
)
3121 return dbpd
&& !(dbpd
->dbpd_flags
&
3122 DISPATCH_BLOCK_IF_LAST_RESET_QUEUE_QOS_OVERRIDE
);
3126 dispatch_block_cancel(dispatch_block_t db
)
3128 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
3129 if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd
))) {
3130 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
3131 "dispatch_block_cancel()");
3133 (void)os_atomic_or2o(dbpd
, dbpd_atomic_flags
, DBF_CANCELED
, relaxed
);
3137 dispatch_block_testcancel(dispatch_block_t db
)
3139 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
3140 if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd
))) {
3141 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
3142 "dispatch_block_testcancel()");
3144 return (bool)(dbpd
->dbpd_atomic_flags
& DBF_CANCELED
);
3148 dispatch_block_wait(dispatch_block_t db
, dispatch_time_t timeout
)
3150 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
3151 if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd
))) {
3152 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
3153 "dispatch_block_wait()");
3156 unsigned int flags
= os_atomic_or_orig2o(dbpd
, dbpd_atomic_flags
,
3157 DBF_WAITING
, relaxed
);
3158 if (slowpath(flags
& (DBF_WAITED
| DBF_WAITING
))) {
3159 DISPATCH_CLIENT_CRASH(flags
, "A block object may not be waited for "
3163 // <rdar://problem/17703192> If we know the queue where this block is
3164 // enqueued, or the thread that's executing it, then we should boost
3167 pthread_priority_t pp
= _dispatch_get_priority();
3169 os_mpsc_queue_t boost_oq
;
3170 boost_oq
= os_atomic_xchg2o(dbpd
, dbpd_queue
, NULL
, relaxed
);
3172 // release balances dispatch_{,barrier_,group_}async.
3173 // Can't put the queue back in the timeout case: the block might
3174 // finish after we fell out of group_wait and see our NULL, so
3175 // neither of us would ever release. Side effect: After a _wait
3176 // that times out, subsequent waits will not boost the qos of the
3177 // still-running block.
3178 dx_wakeup(boost_oq
, _dispatch_qos_from_pp(pp
),
3179 DISPATCH_WAKEUP_BLOCK_WAIT
| DISPATCH_WAKEUP_CONSUME_2
);
3182 mach_port_t boost_th
= dbpd
->dbpd_thread
;
3184 _dispatch_thread_override_start(boost_th
, pp
, dbpd
);
3187 int performed
= os_atomic_load2o(dbpd
, dbpd_performed
, relaxed
);
3188 if (slowpath(performed
> 1 || (boost_th
&& boost_oq
))) {
3189 DISPATCH_CLIENT_CRASH(performed
, "A block object may not be both "
3190 "run more than once and waited for");
3193 long ret
= dispatch_group_wait(_dbpd_group(dbpd
), timeout
);
3196 _dispatch_thread_override_end(boost_th
, dbpd
);
3200 // timed out: reverse our changes
3201 (void)os_atomic_and2o(dbpd
, dbpd_atomic_flags
,
3202 ~DBF_WAITING
, relaxed
);
3204 (void)os_atomic_or2o(dbpd
, dbpd_atomic_flags
,
3205 DBF_WAITED
, relaxed
);
3206 // don't need to re-test here: the second call would see
3207 // the first call's WAITING
3214 dispatch_block_notify(dispatch_block_t db
, dispatch_queue_t queue
,
3215 dispatch_block_t notification_block
)
3217 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
3219 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
3220 "dispatch_block_notify()");
3222 int performed
= os_atomic_load2o(dbpd
, dbpd_performed
, relaxed
);
3223 if (slowpath(performed
> 1)) {
3224 DISPATCH_CLIENT_CRASH(performed
, "A block object may not be both "
3225 "run more than once and observed");
3228 return dispatch_group_notify(_dbpd_group(dbpd
), queue
, notification_block
);
3233 _dispatch_continuation_init_slow(dispatch_continuation_t dc
,
3234 dispatch_queue_class_t dqu
, dispatch_block_flags_t flags
)
3236 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(dc
->dc_ctxt
);
3237 dispatch_block_flags_t block_flags
= dbpd
->dbpd_flags
;
3238 uintptr_t dc_flags
= dc
->dc_flags
;
3239 os_mpsc_queue_t oq
= dqu
._oq
;
3241 // balanced in d_block_async_invoke_and_release or d_block_wait
3242 if (os_atomic_cmpxchg2o(dbpd
, dbpd_queue
, NULL
, oq
, relaxed
)) {
3243 _os_object_retain_internal_n_inline(oq
->_as_os_obj
, 2);
3246 if (dc_flags
& DISPATCH_OBJ_MACH_BARRIER
) {
3247 dispatch_assert(dc_flags
& DISPATCH_OBJ_CONSUME_BIT
);
3248 dc
->dc_func
= _dispatch_block_async_invoke_and_release_mach_barrier
;
3249 } else if (dc_flags
& DISPATCH_OBJ_CONSUME_BIT
) {
3250 dc
->dc_func
= _dispatch_block_async_invoke_and_release
;
3252 dc
->dc_func
= _dispatch_block_async_invoke
;
3255 flags
|= block_flags
;
3256 if (block_flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
3257 _dispatch_continuation_priority_set(dc
, dbpd
->dbpd_priority
, flags
);
3259 _dispatch_continuation_priority_set(dc
, dc
->dc_priority
, flags
);
3261 if (block_flags
& DISPATCH_BLOCK_BARRIER
) {
3262 dc_flags
|= DISPATCH_OBJ_BARRIER_BIT
;
3264 if (block_flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
3265 voucher_t v
= dbpd
->dbpd_voucher
;
3266 dc
->dc_voucher
= v
? _voucher_retain(v
) : NULL
;
3267 dc_flags
|= DISPATCH_OBJ_ENFORCE_VOUCHER
;
3268 _dispatch_voucher_debug("continuation[%p] set", dc
->dc_voucher
, dc
);
3269 _dispatch_voucher_ktrace_dc_push(dc
);
3271 _dispatch_continuation_voucher_set(dc
, oq
, flags
);
3273 dc_flags
|= DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT
;
3274 dc
->dc_flags
= dc_flags
;
3277 #endif // __BLOCKS__
3279 #pragma mark dispatch_barrier_async
3283 _dispatch_async_f_slow(dispatch_queue_t dq
, void *ctxt
,
3284 dispatch_function_t func
, pthread_priority_t pp
,
3285 dispatch_block_flags_t flags
, uintptr_t dc_flags
)
3287 dispatch_continuation_t dc
= _dispatch_continuation_alloc_from_heap();
3288 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3289 _dispatch_continuation_async(dq
, dc
);
3292 DISPATCH_ALWAYS_INLINE
3294 _dispatch_barrier_async_f2(dispatch_queue_t dq
, void *ctxt
,
3295 dispatch_function_t func
, pthread_priority_t pp
,
3296 dispatch_block_flags_t flags
)
3298 dispatch_continuation_t dc
= _dispatch_continuation_alloc_cacheonly();
3299 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_BARRIER_BIT
;
3301 if (!fastpath(dc
)) {
3302 return _dispatch_async_f_slow(dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3305 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3306 _dispatch_continuation_push(dq
, dc
);
3311 dispatch_barrier_async_f(dispatch_queue_t dq
, void *ctxt
,
3312 dispatch_function_t func
)
3314 _dispatch_barrier_async_f2(dq
, ctxt
, func
, 0, 0);
3319 _dispatch_barrier_async_detached_f(dispatch_queue_t dq
, void *ctxt
,
3320 dispatch_function_t func
)
3322 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3323 dc
->dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_BARRIER_BIT
;
3326 dc
->dc_voucher
= DISPATCH_NO_VOUCHER
;
3327 dc
->dc_priority
= DISPATCH_NO_PRIORITY
;
3333 dispatch_barrier_async(dispatch_queue_t dq
, dispatch_block_t work
)
3335 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3336 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_BARRIER_BIT
;
3338 _dispatch_continuation_init(dc
, dq
, work
, 0, 0, dc_flags
);
3339 _dispatch_continuation_push(dq
, dc
);
3344 #pragma mark dispatch_async
3347 _dispatch_async_redirect_invoke(dispatch_continuation_t dc
,
3348 dispatch_invoke_context_t dic
, dispatch_invoke_flags_t flags
)
3350 dispatch_thread_frame_s dtf
;
3351 struct dispatch_continuation_s
*other_dc
= dc
->dc_other
;
3352 dispatch_invoke_flags_t ctxt_flags
= (dispatch_invoke_flags_t
)dc
->dc_ctxt
;
3353 // if we went through _dispatch_root_queue_push_override,
3354 // the "right" root queue was stuffed into dc_func
3355 dispatch_queue_t assumed_rq
= (dispatch_queue_t
)dc
->dc_func
;
3356 dispatch_queue_t dq
= dc
->dc_data
, rq
, old_dq
;
3357 dispatch_priority_t old_dbp
;
3360 flags
&= ~_DISPATCH_INVOKE_AUTORELEASE_MASK
;
3361 flags
|= ctxt_flags
;
3363 old_dq
= _dispatch_get_current_queue();
3365 old_dbp
= _dispatch_root_queue_identity_assume(assumed_rq
);
3366 _dispatch_set_basepri(dq
->dq_priority
);
3368 old_dbp
= _dispatch_set_basepri(dq
->dq_priority
);
3371 _dispatch_thread_frame_push(&dtf
, dq
);
3372 _dispatch_continuation_pop_forwarded(dc
, DISPATCH_NO_VOUCHER
,
3373 DISPATCH_OBJ_CONSUME_BIT
, {
3374 _dispatch_continuation_pop(other_dc
, dic
, flags
, dq
);
3376 _dispatch_thread_frame_pop(&dtf
);
3377 if (assumed_rq
) _dispatch_queue_set_current(old_dq
);
3378 _dispatch_reset_basepri(old_dbp
);
3380 rq
= dq
->do_targetq
;
3381 while (slowpath(rq
->do_targetq
) && rq
!= old_dq
) {
3382 _dispatch_queue_non_barrier_complete(rq
);
3383 rq
= rq
->do_targetq
;
3386 _dispatch_queue_non_barrier_complete(dq
);
3387 _dispatch_release_tailcall(dq
); // pairs with _dispatch_async_redirect_wrap
3390 DISPATCH_ALWAYS_INLINE
3391 static inline dispatch_continuation_t
3392 _dispatch_async_redirect_wrap(dispatch_queue_t dq
, dispatch_object_t dou
)
3394 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3396 dou
._do
->do_next
= NULL
;
3397 dc
->do_vtable
= DC_VTABLE(ASYNC_REDIRECT
);
3399 dc
->dc_ctxt
= (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq
);
3401 dc
->dc_other
= dou
._do
;
3402 dc
->dc_voucher
= DISPATCH_NO_VOUCHER
;
3403 dc
->dc_priority
= DISPATCH_NO_PRIORITY
;
3404 _dispatch_retain(dq
); // released in _dispatch_async_redirect_invoke
3410 _dispatch_async_f_redirect(dispatch_queue_t dq
,
3411 dispatch_object_t dou
, dispatch_qos_t qos
)
3413 if (!slowpath(_dispatch_object_is_redirection(dou
))) {
3414 dou
._dc
= _dispatch_async_redirect_wrap(dq
, dou
);
3416 dq
= dq
->do_targetq
;
3418 // Find the queue to redirect to
3419 while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq
->dq_width
))) {
3420 if (!fastpath(_dispatch_queue_try_acquire_async(dq
))) {
3423 if (!dou
._dc
->dc_ctxt
) {
3424 // find first queue in descending target queue order that has
3425 // an autorelease frequency set, and use that as the frequency for
3426 // this continuation.
3427 dou
._dc
->dc_ctxt
= (void *)
3428 (uintptr_t)_dispatch_queue_autorelease_frequency(dq
);
3430 dq
= dq
->do_targetq
;
3433 dx_push(dq
, dou
, qos
);
3436 DISPATCH_ALWAYS_INLINE
3438 _dispatch_continuation_redirect(dispatch_queue_t dq
,
3439 struct dispatch_object_s
*dc
)
3441 _dispatch_trace_continuation_pop(dq
, dc
);
3442 // This is a re-redirect, overrides have already been applied
3443 // by _dispatch_async_f2.
3444 // However we want to end up on the root queue matching `dc` qos, so pick up
3445 // the current override of `dq` which includes dc's overrde (and maybe more)
3446 uint64_t dq_state
= os_atomic_load2o(dq
, dq_state
, relaxed
);
3447 _dispatch_async_f_redirect(dq
, dc
, _dq_state_max_qos(dq_state
));
3448 _dispatch_introspection_queue_item_complete(dc
);
3453 _dispatch_async_f2(dispatch_queue_t dq
, dispatch_continuation_t dc
)
3455 // <rdar://problem/24738102&24743140> reserving non barrier width
3456 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
3457 // equivalent), so we have to check that this thread hasn't enqueued
3458 // anything ahead of this call or we can break ordering
3459 if (slowpath(dq
->dq_items_tail
)) {
3460 return _dispatch_continuation_push(dq
, dc
);
3463 if (slowpath(!_dispatch_queue_try_acquire_async(dq
))) {
3464 return _dispatch_continuation_push(dq
, dc
);
3467 return _dispatch_async_f_redirect(dq
, dc
,
3468 _dispatch_continuation_override_qos(dq
, dc
));
3471 DISPATCH_ALWAYS_INLINE
3473 _dispatch_async_f(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
3474 pthread_priority_t pp
, dispatch_block_flags_t flags
)
3476 dispatch_continuation_t dc
= _dispatch_continuation_alloc_cacheonly();
3477 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
3479 if (!fastpath(dc
)) {
3480 return _dispatch_async_f_slow(dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3483 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3484 _dispatch_continuation_async2(dq
, dc
, false);
3489 dispatch_async_f(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
)
3491 _dispatch_async_f(dq
, ctxt
, func
, 0, 0);
3496 dispatch_async_enforce_qos_class_f(dispatch_queue_t dq
, void *ctxt
,
3497 dispatch_function_t func
)
3499 _dispatch_async_f(dq
, ctxt
, func
, 0, DISPATCH_BLOCK_ENFORCE_QOS_CLASS
);
3504 dispatch_async(dispatch_queue_t dq
, dispatch_block_t work
)
3506 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3507 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
3509 _dispatch_continuation_init(dc
, dq
, work
, 0, 0, dc_flags
);
3510 _dispatch_continuation_async(dq
, dc
);
3515 #pragma mark dispatch_group_async
3517 DISPATCH_ALWAYS_INLINE
3519 _dispatch_continuation_group_async(dispatch_group_t dg
, dispatch_queue_t dq
,
3520 dispatch_continuation_t dc
)
3522 dispatch_group_enter(dg
);
3524 _dispatch_continuation_async(dq
, dc
);
3529 dispatch_group_async_f(dispatch_group_t dg
, dispatch_queue_t dq
, void *ctxt
,
3530 dispatch_function_t func
)
3532 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3533 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_GROUP_BIT
;
3535 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, 0, 0, dc_flags
);
3536 _dispatch_continuation_group_async(dg
, dq
, dc
);
3541 dispatch_group_async(dispatch_group_t dg
, dispatch_queue_t dq
,
3542 dispatch_block_t db
)
3544 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3545 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_GROUP_BIT
;
3547 _dispatch_continuation_init(dc
, dq
, db
, 0, 0, dc_flags
);
3548 _dispatch_continuation_group_async(dg
, dq
, dc
);
}

#pragma mark -
#pragma mark _dispatch_sync_invoke / _dispatch_sync_complete

DISPATCH_NOINLINE
static void
_dispatch_queue_non_barrier_complete(dispatch_queue_t dq)
{
	uint64_t old_state, new_state, owner_self = _dispatch_lock_value_for_self();

	// see _dispatch_queue_resume()
	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
		new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
		if (unlikely(_dq_state_drain_locked(old_state))) {
			// make drain_try_unlock() fail and reconsider whether there's
			// enough width now for a new item
			new_state |= DISPATCH_QUEUE_DIRTY;
		} else if (likely(_dq_state_is_runnable(new_state))) {
			uint64_t full_width = new_state;
			if (_dq_state_has_pending_barrier(old_state)) {
				full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
				full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
				full_width += DISPATCH_QUEUE_IN_BARRIER;
			} else {
				full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
				full_width += DISPATCH_QUEUE_IN_BARRIER;
			}
			if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
					DISPATCH_QUEUE_WIDTH_FULL_BIT) {
				new_state = full_width;
				new_state &= ~DISPATCH_QUEUE_DIRTY;
				new_state |= owner_self;
			} else if (_dq_state_is_dirty(old_state)) {
				new_state |= DISPATCH_QUEUE_ENQUEUED;
			}
		}
	});

	if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
		if (_dq_state_is_dirty(old_state)) {
			// <rdar://problem/14637483>
			// dependency ordering for dq state changes that were flushed
			// and not acted upon
			os_atomic_thread_fence(dependency);
			dq = os_atomic_force_dependency_on(dq, old_state);
		}
		return _dispatch_queue_barrier_complete(dq, 0, 0);
	}

	if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
		_dispatch_retain_2(dq);
		dispatch_assert(!_dq_state_is_base_wlh(new_state));
		return dx_push(dq->do_targetq, dq, _dq_state_max_qos(new_state));
	}
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_frame_pop(&dtf);
}

static void
_dispatch_sync_function_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
}
static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
		uintptr_t dc_flags)
{
	bool barrier = (dc_flags & DISPATCH_OBJ_BARRIER_BIT);
	do {
		if (dq == stop_dq) return;
		if (barrier) {
			_dispatch_queue_barrier_complete(dq, 0, 0);
		} else {
			_dispatch_queue_non_barrier_complete(dq);
		}
		dq = dq->do_targetq;
		barrier = (dq->dq_width == 1);
	} while (unlikely(dq->do_targetq));
}
static void
_dispatch_sync_invoke_and_complete_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
	_dispatch_sync_complete_recurse(dq, NULL, dc_flags);
}

static void
_dispatch_sync_invoke_and_complete(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
	_dispatch_queue_non_barrier_complete(dq);
}
/*
 * For queues we can cheat and inline the unlock code, which is invalid
 * for objects with a more complex state machine (sources or mach channels)
 */
static void
_dispatch_queue_barrier_sync_invoke_and_complete(dispatch_queue_t dq,
		void *ctxt, dispatch_function_t func)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
	if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
		return _dispatch_queue_barrier_complete(dq, 0, 0);
	}

	// Presence of any of these bits requires more work that only
	// _dispatch_queue_barrier_complete() handles properly
	//
	// Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
	// checking the role is sloppy, but is a super fast check, and neither of
	// these bits should be set if the lock was never contended/discovered.
	const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
			DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
			DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
			DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
	uint64_t old_state, new_state;

	// similar to _dispatch_queue_drain_try_unlock
	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
		new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
		new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
		new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
		if (unlikely(old_state & fail_unlock_mask)) {
			os_atomic_rmw_loop_give_up({
				return _dispatch_queue_barrier_complete(dq, 0, 0);
			});
		}
	});
	if (_dq_state_is_base_wlh(old_state)) {
		_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
	}
}
#pragma mark _dispatch_sync_wait / _dispatch_sync_waiter_wake

#define DISPATCH_SYNC_WAITER_NO_UNLOCK (~0ull)

static void
_dispatch_sync_waiter_wake(dispatch_sync_context_t dsc,
		dispatch_wlh_t wlh, uint64_t old_state, uint64_t new_state)
{
	dispatch_wlh_t waiter_wlh = dsc->dc_data;

	if (_dq_state_in_sync_transfer(old_state) ||
			_dq_state_in_sync_transfer(new_state) ||
			(waiter_wlh != DISPATCH_WLH_ANON)) {
		_dispatch_event_loop_wake_owner(dsc, wlh, old_state, new_state);
	}
	if (waiter_wlh == DISPATCH_WLH_ANON) {
		if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
			_dispatch_wqthread_override_start(dsc->dsc_waiter,
					dsc->dsc_override_qos);
		}
		_dispatch_thread_event_signal(&dsc->dsc_event);
	}
	_dispatch_introspection_queue_item_complete(dsc->_as_dc);
}
static void
_dispatch_sync_waiter_redirect_or_wake(dispatch_queue_t dq, uint64_t owned,
		dispatch_object_t dou)
{
	dispatch_sync_context_t dsc = (dispatch_sync_context_t)dou._dc;
	uint64_t next_owner = 0, old_state, new_state;
	dispatch_wlh_t wlh = NULL;

	_dispatch_trace_continuation_pop(dq, dsc->_as_dc);

	if (owned == DISPATCH_SYNC_WAITER_NO_UNLOCK) {
		dispatch_assert(!(dsc->dc_flags & DISPATCH_OBJ_BARRIER_BIT));
		new_state = old_state = os_atomic_load2o(dq, dq_state, relaxed);
	} else {
		if (dsc->dc_flags & DISPATCH_OBJ_BARRIER_BIT) {
			next_owner = _dispatch_lock_value_from_tid(dsc->dsc_waiter);
		}
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
			new_state = old_state - owned;
			new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
			new_state &= ~DISPATCH_QUEUE_DIRTY;
			new_state |= next_owner;
			if (_dq_state_is_base_wlh(old_state)) {
				new_state |= DISPATCH_QUEUE_SYNC_TRANSFER;
			}
		});
	}

	if (_dq_state_is_base_wlh(old_state)) {
		wlh = (dispatch_wlh_t)dq;
	} else if (_dq_state_received_override(old_state)) {
		// Ensure that the root queue sees that this thread was overridden.
		_dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
	}

	if (dsc->dc_data == DISPATCH_WLH_ANON) {
		if (dsc->dsc_override_qos < _dq_state_max_qos(old_state)) {
			dsc->dsc_override_qos = _dq_state_max_qos(old_state);
		}
	}

	if (unlikely(_dq_state_is_inner_queue(old_state))) {
		dispatch_queue_t tq = dq->do_targetq;
		if (likely(tq->dq_width == 1)) {
			dsc->dc_flags = DISPATCH_OBJ_BARRIER_BIT |
					DISPATCH_OBJ_SYNC_WAITER_BIT;
		} else {
			dsc->dc_flags = DISPATCH_OBJ_SYNC_WAITER_BIT;
		}
		_dispatch_introspection_queue_item_complete(dsc->_as_dc);
		return _dispatch_queue_push_sync_waiter(tq, dsc, 0);
	}

	return _dispatch_sync_waiter_wake(dsc, wlh, old_state, new_state);
}
static void
_dispatch_queue_class_barrier_complete(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
		uint64_t owned)
{
	uint64_t old_state, new_state, enqueue;
	dispatch_queue_t tq;

	if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
		tq = &_dispatch_mgr_q;
		enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
	} else if (target) {
		tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
		enqueue = DISPATCH_QUEUE_ENQUEUED;
	} else {
		tq = NULL;
		enqueue = 0;
	}

	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
		new_state = _dq_state_merge_qos(old_state - owned, qos);
		new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
		if (unlikely(_dq_state_is_suspended(old_state))) {
			if (likely(_dq_state_is_base_wlh(old_state))) {
				new_state &= ~DISPATCH_QUEUE_ENQUEUED;
			}
		} else if (enqueue) {
			if (!_dq_state_is_enqueued(old_state)) {
				new_state |= enqueue;
			}
		} else if (unlikely(_dq_state_is_dirty(old_state))) {
			os_atomic_rmw_loop_give_up({
				// just renew the drain lock with an acquire barrier, to see
				// what the enqueuer that set DIRTY has done.
				// the xor generates better assembly as DISPATCH_QUEUE_DIRTY
				// is already in a register
				os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
				flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
				return dx_wakeup(dq, qos, flags);
			});
		} else if (likely(_dq_state_is_base_wlh(old_state))) {
			new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
			new_state &= ~DISPATCH_QUEUE_ENQUEUED;
		} else {
			new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
		}
	});

	dispatch_assert(_dq_state_drain_locked_by_self(old_state));
	dispatch_assert(!_dq_state_is_enqueued_on_manager(old_state));

	if (_dq_state_received_override(old_state)) {
		// Ensure that the root queue sees that this thread was overridden.
		_dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
	}

	if (likely((old_state ^ new_state) & enqueue)) {
		dispatch_assert(_dq_state_is_enqueued(new_state));
		dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
		return _dispatch_queue_push_queue(tq, dq, new_state);
	}
#if HAVE_PTHREAD_WORKQUEUE_QOS
	// <rdar://problem/27694093> when doing sync to async handoff
	// if the queue received an override we have to forcefully redrive
	// the same override so that a new stealer is enqueued because
	// the previous one may be gone already
	if (_dq_state_should_override(new_state)) {
		return _dispatch_queue_class_wakeup_with_override(dq, new_state,
				flags);
	}
#endif

	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
}
static void
_dispatch_queue_barrier_complete(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_continuation_t dc_tmp, dc_start = NULL, dc_end = NULL;
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
	struct dispatch_object_s *dc = NULL;
	uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
			dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;

	dispatch_assert(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE);

	if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
		dc = _dispatch_queue_head(dq);
		if (!_dispatch_object_is_sync_waiter(dc)) {
			// not a slow item, needs to wake up
		} else if (likely(dq->dq_width == 1) ||
				_dispatch_object_is_barrier(dc)) {
			// rdar://problem/8290662 "barrier/writer lock transfer"
			dc_start = dc_end = (dispatch_continuation_t)dc;
			dc = _dispatch_queue_next(dq, dc);
		} else {
			// <rdar://problem/10164594> "reader lock transfer"
			// we must not wake waiters immediately because our right
			// for dequeuing is granted through holding the full "barrier" width
			// which a signaled work item could relinquish out from our feet
			dc_start = (dispatch_continuation_t)dc;
			do {
				// no check on width here because concurrent queues
				// do not respect width for blocked readers, the thread
				// is already spent anyway
				dc_end = (dispatch_continuation_t)dc;
				owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
				dc = _dispatch_queue_next(dq, dc);
			} while (dc && _dispatch_object_is_sync_waiter_non_barrier(dc));
		}

		if (dc_start) {
			do {
				dc_tmp = dc_start;
				dc_start = dc_start->do_next;
				_dispatch_sync_waiter_redirect_or_wake(dq, owned, dc_tmp);
				owned = DISPATCH_SYNC_WAITER_NO_UNLOCK;
			} while (dc_tmp != dc_end);
			if (flags & DISPATCH_WAKEUP_CONSUME_2) {
				return _dispatch_release_2_tailcall(dq);
			}
			return;
		}
		if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
			_dispatch_retain_2(dq);
			flags |= DISPATCH_WAKEUP_CONSUME_2;
		}
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	return _dispatch_queue_class_barrier_complete(dq, qos, flags, target, owned);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_sync_thread_bound_invoke(void *ctxt)
{
	dispatch_sync_context_t dsc = ctxt;
	dispatch_queue_t cq = _dispatch_queue_get_current();
	dispatch_queue_t orig_dq = dsc->dc_other;
	dispatch_thread_frame_s dtf;
	dispatch_assert(_dispatch_queue_is_thread_bound(cq));

	// the block runs on the thread the queue is bound to and not
	// on the calling thread, but we mean to see the calling thread
	// dispatch thread frames, so we fake the link, and then undo it
	_dispatch_thread_frame_push_and_rebase(&dtf, orig_dq, &dsc->dsc_dtf);
	_dispatch_client_callout(dsc->dsc_ctxt, dsc->dsc_func);
	_dispatch_thread_frame_pop(&dtf);

	// communicate back to _dispatch_sync_wait who the thread bound queue
	// was so that we skip it during _dispatch_sync_complete_recurse
	dsc->dc_other = cq;
	dsc->dsc_func = NULL;
	_dispatch_thread_event_signal(&dsc->dsc_event); // release
}
#endif
DISPATCH_ALWAYS_INLINE
static inline uint64_t
_dispatch_sync_wait_prepare(dispatch_queue_t dq)
{
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
		if (_dq_state_is_suspended(old_state) ||
				!_dq_state_is_base_wlh(old_state)) {
			os_atomic_rmw_loop_give_up(return old_state);
		}
		if (!_dq_state_drain_locked(old_state) ||
				_dq_state_in_sync_transfer(old_state)) {
			os_atomic_rmw_loop_give_up(return old_state);
		}
		new_state = old_state | DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
	});
	return new_state;
}
static void
_dispatch_sync_waiter_compute_wlh(dispatch_queue_t dq,
		dispatch_sync_context_t dsc)
{
	bool needs_locking = _dispatch_queue_is_legacy(dq);

	if (needs_locking) {
		dsc->dsc_release_storage = true;
		_dispatch_queue_sidelock_lock(dq);
	}

	dispatch_queue_t tq = dq->do_targetq;
	uint64_t dq_state = _dispatch_sync_wait_prepare(tq);

	if (_dq_state_is_suspended(dq_state) ||
			_dq_state_is_base_anon(dq_state)) {
		dsc->dsc_release_storage = false;
		dsc->dc_data = DISPATCH_WLH_ANON;
	} else if (_dq_state_is_base_wlh(dq_state)) {
		if (dsc->dsc_release_storage) {
			_dispatch_queue_retain_storage(tq);
		}
		dsc->dc_data = (dispatch_wlh_t)tq;
	} else {
		_dispatch_sync_waiter_compute_wlh(tq, dsc);
	}
	if (needs_locking) _dispatch_queue_sidelock_unlock(dq);
}
static void
_dispatch_sync_wait(dispatch_queue_t top_dq, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_t dq, uintptr_t dc_flags)
{
	pthread_priority_t pp = _dispatch_get_priority();
	dispatch_tid tid = _dispatch_tid_self();
	dispatch_qos_t qos;
	uint64_t dq_state;

	dq_state = _dispatch_sync_wait_prepare(dq);
	if (unlikely(_dq_state_drain_locked_by(dq_state, tid))) {
		DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
				"dispatch_sync called on queue "
				"already owned by current thread");
	}

	struct dispatch_sync_context_s dsc = {
		.dc_flags = dc_flags | DISPATCH_OBJ_SYNC_WAITER_BIT,
		.dc_other = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher = DISPATCH_NO_VOUCHER,
		.dsc_func = func,
		.dsc_ctxt = ctxt,
		.dsc_waiter = tid,
	};

	if (_dq_state_is_suspended(dq_state) ||
			_dq_state_is_base_anon(dq_state)) {
		dsc.dc_data = DISPATCH_WLH_ANON;
	} else if (_dq_state_is_base_wlh(dq_state)) {
		dsc.dc_data = (dispatch_wlh_t)dq;
	} else {
		_dispatch_sync_waiter_compute_wlh(dq, &dsc);
	}
#if DISPATCH_COCOA_COMPAT
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, etc. However, blocks submitted
	// to the main thread MUST be run on the main thread
	//
	// Since we don't know whether that will happen, save the frame linkage
	// for the sake of _dispatch_sync_thread_bound_invoke
	_dispatch_thread_frame_save_state(&dsc.dsc_dtf);

	// Since the continuation doesn't have the CONSUME bit, the voucher will be
	// retained on adoption on the thread bound queue if it happens so we can
	// borrow this thread's reference
	dsc.dc_voucher = _voucher_get();
	dsc.dc_func = _dispatch_sync_thread_bound_invoke;
	dsc.dc_ctxt = &dsc;
#endif

	if (dsc.dc_data == DISPATCH_WLH_ANON) {
		dsc.dsc_override_qos_floor = dsc.dsc_override_qos =
				_dispatch_get_basepri_override_qos_floor();
		qos = _dispatch_qos_from_pp(pp);
		_dispatch_thread_event_init(&dsc.dsc_event);
	} else {
		qos = 0;
	}

	_dispatch_queue_push_sync_waiter(dq, &dsc, qos);

	if (dsc.dc_data == DISPATCH_WLH_ANON) {
		_dispatch_thread_event_wait(&dsc.dsc_event); // acquire
		_dispatch_thread_event_destroy(&dsc.dsc_event);
		// If _dispatch_sync_waiter_wake() gave this thread an override,
		// ensure that the root queue sees it.
		if (dsc.dsc_override_qos > dsc.dsc_override_qos_floor) {
			_dispatch_set_basepri_override_qos(dsc.dsc_override_qos);
		}
	} else {
		_dispatch_event_loop_wait_for_ownership(&dsc);
	}
	_dispatch_introspection_sync_begin(top_dq);
#if DISPATCH_COCOA_COMPAT
	if (unlikely(dsc.dsc_func == NULL)) {
		// Queue bound to a non-dispatch thread, the continuation already ran
		// so just unlock all the things, except for the thread bound queue
		dispatch_queue_t bound_dq = dsc.dc_other;
		return _dispatch_sync_complete_recurse(top_dq, bound_dq, top_dc_flags);
	}
#endif
	_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags);
}
static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	if (unlikely(!dq->do_targetq)) {
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}
	_dispatch_sync_wait(dq, ctxt, func, dc_flags, dq, dc_flags);
}
#pragma mark dispatch_sync / dispatch_barrier_sync

static void
_dispatch_sync_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	dispatch_tid tid = _dispatch_tid_self();
	dispatch_queue_t tq = dq->do_targetq;

	do {
		if (likely(tq->dq_width == 1)) {
			if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
				return _dispatch_sync_wait(dq, ctxt, func, dc_flags, tq,
						DISPATCH_OBJ_BARRIER_BIT);
			}
		} else {
			if (unlikely(!_dispatch_queue_try_reserve_sync_width(tq))) {
				return _dispatch_sync_wait(dq, ctxt, func, dc_flags, tq, 0);
			}
		}
		tq = tq->do_targetq;
	} while (unlikely(tq->do_targetq));

	return _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, func, dc_flags);
}
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_tid tid = _dispatch_tid_self();

	// The more correct thing to do would be to merge the qos of the thread
	// that just acquired the barrier lock into the queue state.
	//
	// However this is too expensive for the fastpath, so skip doing it.
	// The chosen tradeoff is that if an enqueue on a lower priority thread
	// contends with this fastpath, this thread may receive a useless override.
	//
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
		return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
	}

	_dispatch_introspection_sync_begin(dq);
	if (unlikely(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
	}
	_dispatch_queue_barrier_sync_invoke_and_complete(dq, ctxt, func);
}

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (likely(dq->dq_width == 1)) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}

	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
		return _dispatch_sync_f_slow(dq, ctxt, func, 0);
	}

	_dispatch_introspection_sync_begin(dq);
	if (unlikely(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dq, ctxt, func, 0);
	}
	_dispatch_sync_invoke_and_complete(dq, ctxt, func);
}
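/*
 * Illustrative usage sketch (not built): dispatch_sync_f() on a serial queue
 * degenerates into dispatch_barrier_sync_f(), while on a concurrent queue it
 * only reserves one unit of width. Identifiers below are hypothetical.
 */
#if 0 // usage sketch
#include <dispatch/dispatch.h>
#include <stdio.h>

struct counter { long value; };

static void
counter_increment(void *ctxt)
{
	((struct counter *)ctxt)->value++;
}

static void
counter_read(void *ctxt)
{
	printf("value = %ld\n", ((struct counter *)ctxt)->value);
}

static void
example_sync_usage(struct counter *c)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.state",
			DISPATCH_QUEUE_CONCURRENT);

	// Writers take the whole width of the concurrent queue.
	dispatch_barrier_sync_f(q, c, counter_increment);

	// Readers only reserve one unit of width and may run concurrently.
	dispatch_sync_f(q, c, counter_read);

	dispatch_release(q);
}
#endif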
#ifdef __BLOCKS__
static void
_dispatch_sync_block_with_private_data(dispatch_queue_t dq,
		dispatch_block_t work, dispatch_block_flags_t flags)
{
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(work);
	pthread_priority_t op = 0, p = 0;

	flags |= dbpd->dbpd_flags;
	op = _dispatch_block_invoke_should_set_priority(flags, dbpd->dbpd_priority);
	if (op) {
		p = dbpd->dbpd_priority;
	}
	voucher_t ov, v = DISPATCH_NO_VOUCHER;
	if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
		v = dbpd->dbpd_voucher;
	}
	ov = _dispatch_set_priority_and_voucher(p, v, 0);

	// balanced in d_block_sync_invoke or d_block_wait
	if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, dq->_as_oq, relaxed)) {
		_dispatch_retain_2(dq);
	}
	if (flags & DISPATCH_BLOCK_BARRIER) {
		dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke);
	} else {
		dispatch_sync_f(dq, work, _dispatch_block_sync_invoke);
	}
	_dispatch_reset_priority_and_voucher(op, ov);
}

void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	if (unlikely(_dispatch_block_has_private_data(work))) {
		dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
		return _dispatch_sync_block_with_private_data(dq, work, flags);
	}
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_private_data(dq, work, 0);
	}
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif // __BLOCKS__
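/*
 * Illustrative usage sketch (not built): blocks created with
 * dispatch_block_create*() carry private data (flags, QoS, voucher), which is
 * why dispatch_sync()/dispatch_barrier_sync() above detour through
 * _dispatch_sync_block_with_private_data() for them. The queue parameter and
 * callback text are hypothetical.
 */
#if 0 // usage sketch
#include <Block.h>
#include <dispatch/dispatch.h>
#include <stdio.h>

static void
example_block_flags_usage(dispatch_queue_t q)
{
	// The created block remembers that it must run as a barrier and at
	// user-initiated QoS, independently of how it is later submitted.
	dispatch_block_t work = dispatch_block_create_with_qos_class(
			DISPATCH_BLOCK_BARRIER, QOS_CLASS_USER_INITIATED, 0, ^{
		printf("barrier work at elevated QoS\n");
	});

	// Honored as a barrier because the flag travels with the block.
	dispatch_sync(q, work);
	Block_release(work);
}
#endif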
#pragma mark dispatch_trysync

static void
_dispatch_barrier_trysync_or_async_f_complete(dispatch_queue_t dq,
		void *ctxt, dispatch_function_t func, uint32_t flags)
{
	dispatch_wakeup_flags_t wflags = DISPATCH_WAKEUP_BARRIER_COMPLETE;

	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
	if (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) {
		uint64_t dq_state = os_atomic_sub2o(dq, dq_state,
				DISPATCH_QUEUE_SUSPEND_INTERVAL, relaxed);
		if (!_dq_state_is_suspended(dq_state)) {
			wflags |= DISPATCH_WAKEUP_CONSUME_2;
		}
	}
	dx_wakeup(dq, 0, wflags);
}

// Use for mutation of queue-/source-internal state only
// ignores target queue hierarchy!
void
_dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uint32_t flags)
{
	dispatch_tid tid = _dispatch_tid_self();
	uint64_t suspend_count = (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) ? 1 : 0;
	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync_and_suspend(dq, tid,
			suspend_count))) {
		return _dispatch_barrier_async_detached_f(dq, ctxt, func);
	}
	if (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) {
		_dispatch_retain_2(dq); // see _dispatch_queue_suspend
	}
	_dispatch_barrier_trysync_or_async_f_complete(dq, ctxt, func, flags);
}
static bool
_dispatch_trysync_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t f, uintptr_t dc_flags)
{
	dispatch_tid tid = _dispatch_tid_self();
	dispatch_queue_t q, tq = dq->do_targetq;

	for (;;) {
		if (likely(tq->do_targetq == NULL)) {
			_dispatch_sync_invoke_and_complete_recurse(dq, ctxt, f, dc_flags);
			return true;
		}
		if (unlikely(_dispatch_queue_cannot_trysync(tq))) {
			for (q = dq; q != tq; q = q->do_targetq) {
				_dispatch_queue_atomic_flags_set(q, DQF_CANNOT_TRYSYNC);
			}
			break;
		}
		if (likely(tq->dq_width == 1)) {
			if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
				break;
			}
		} else {
			if (unlikely(!_dispatch_queue_try_reserve_sync_width(tq))) {
				break;
			}
		}
		tq = tq->do_targetq;
	}

	_dispatch_sync_complete_recurse(dq, tq, dc_flags);
	return false;
}

bool
_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t f)
{
	dispatch_tid tid = _dispatch_tid_self();
	if (unlikely(!dq->do_targetq)) {
		DISPATCH_CLIENT_CRASH(dq, "_dispatch_trysync called on a root queue");
	}
	if (unlikely(_dispatch_queue_cannot_trysync(dq))) {
		return false;
	}
	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
		return false;
	}
	return _dispatch_trysync_recurse(dq, ctxt, f, DISPATCH_OBJ_BARRIER_BIT);
}

bool
_dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
{
	if (likely(dq->dq_width == 1)) {
		return _dispatch_barrier_trysync_f(dq, ctxt, f);
	}
	if (unlikely(!dq->do_targetq)) {
		DISPATCH_CLIENT_CRASH(dq, "_dispatch_trysync called on a root queue");
	}
	if (unlikely(_dispatch_queue_cannot_trysync(dq))) {
		return false;
	}
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
		return false;
	}
	return _dispatch_trysync_recurse(dq, ctxt, f, 0);
}
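/*
 * Conceptual sketch (not built, and not this file's implementation): the
 * trysync path above is the queue-state analogue of "try to take the lock and
 * run inline, otherwise hand the work off asynchronously". In plain pthreads
 * terms that pattern looks like the hypothetical helper below; the
 * async_fallback callback stands in for the detached-async path.
 */
#if 0 // analogy sketch, assumes a caller-provided async fallback
#include <pthread.h>
#include <stdbool.h>

static bool
trysync_or_async(pthread_mutex_t *lock, void (*func)(void *), void *ctxt,
		void (*async_fallback)(void (*)(void *), void *))
{
	if (pthread_mutex_trylock(lock) == 0) {
		func(ctxt);                 // uncontended: run synchronously, like trysync
		pthread_mutex_unlock(lock);
		return true;
	}
	async_fallback(func, ctxt);     // contended: defer, like the async path
	return false;
}
#endif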
#pragma mark dispatch_queue_wakeup

void
_dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

	if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
		return _dispatch_queue_barrier_complete(dq, qos, flags);
	}
	if (_dispatch_queue_class_probe(dq)) {
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}
	return _dispatch_queue_class_wakeup(dq, qos, flags, target);
}
#if DISPATCH_COCOA_COMPAT
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle)
{
#if TARGET_OS_MAC
	return MACH_PORT_VALID(handle);
#elif defined(__linux__)
	return handle >= 0;
#else
#error "runloop support not implemented on this platform"
#endif
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_runloop_handle_t
_dispatch_runloop_queue_get_handle(dispatch_queue_t dq)
{
#if TARGET_OS_MAC
	return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt);
#elif defined(__linux__)
	// decode: 0 is a valid fd, so offset by 1 to distinguish from NULL
	return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt) - 1;
#else
#error "runloop support not implemented on this platform"
#endif
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_runloop_queue_set_handle(dispatch_queue_t dq,
		dispatch_runloop_handle_t handle)
{
#if TARGET_OS_MAC
	dq->do_ctxt = (void *)(uintptr_t)handle;
#elif defined(__linux__)
	// encode: 0 is a valid fd, so offset by 1 to distinguish from NULL
	dq->do_ctxt = (void *)(uintptr_t)(handle + 1);
#else
#error "runloop support not implemented on this platform"
#endif
}
#endif // DISPATCH_COCOA_COMPAT
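/*
 * Illustrative sketch (not built) of the encode/decode convention used above
 * on Linux: file descriptor 0 is a valid handle, so the value stored in
 * do_ctxt is biased by one to keep NULL meaning "no handle". The helper names
 * below are hypothetical.
 */
#if 0 // encoding sketch
#include <assert.h>
#include <stdint.h>

static void *
encode_handle(int fd)
{
	return (void *)(uintptr_t)(fd + 1); // fd 0 becomes 1, never NULL
}

static int
decode_handle(void *ctxt)
{
	return (int)(uintptr_t)ctxt - 1;    // undo the +1 bias
}

static void
encode_decode_roundtrip(void)
{
	assert(encode_handle(0) != NULL);
	assert(decode_handle(encode_handle(0)) == 0);
	assert(decode_handle(encode_handle(42)) == 42);
}
#endif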
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_runloop_queue_reset_max_qos(dispatch_queue_class_t dqu)
{
	uint64_t old_state, clear_bits = DISPATCH_QUEUE_MAX_QOS_MASK |
			DISPATCH_QUEUE_RECEIVED_OVERRIDE;
	old_state = os_atomic_and_orig2o(dqu._dq, dq_state, ~clear_bits, relaxed);
	return _dq_state_max_qos(old_state);
}
void
_dispatch_runloop_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
		// <rdar://problem/14026816>
		return _dispatch_queue_wakeup(dq, qos, flags);
	}

	if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
		os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
	}
	if (_dispatch_queue_class_probe(dq)) {
		return _dispatch_runloop_queue_poke(dq, qos, flags);
	}

	qos = _dispatch_runloop_queue_reset_max_qos(dq);
	if (qos) {
		mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
		if (_dispatch_queue_class_probe(dq)) {
			_dispatch_runloop_queue_poke(dq, qos, flags);
		}
		_dispatch_thread_override_end(owner, dq);
		return;
	}
	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
#else
	return _dispatch_queue_wakeup(dq, qos, flags);
#endif
}
void
_dispatch_main_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
	if (_dispatch_queue_is_thread_bound(dq)) {
		return _dispatch_runloop_queue_wakeup(dq, qos, flags);
	}
#endif
	return _dispatch_queue_wakeup(dq, qos, flags);
}
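/*
 * Illustrative usage sketch (not built): the main queue wakeup above only
 * takes the runloop path while the queue is still bound to the main thread,
 * i.e. before dispatch_main() (or the main CFRunLoop) takes over draining.
 */
#if 0 // usage sketch
#include <dispatch/dispatch.h>
#include <stdio.h>

int
main(void)
{
	dispatch_async(dispatch_get_main_queue(), ^{
		printf("runs on the main thread\n");
	});

	// Never returns: parks the main thread and lets libdispatch drain the
	// main queue from here on.
	dispatch_main();
}
#endif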
#pragma mark dispatch root queues poke

#if DISPATCH_COCOA_COMPAT
static inline void
_dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
{
	dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
	if (!_dispatch_runloop_handle_is_valid(handle)) {
		return;
	}

#if HAVE_MACH
	mach_port_t mp = handle;
	kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}
#elif defined(__linux__)
	int result;
	do {
		result = eventfd_write(handle, 1);
	} while (result == -1 && errno == EINTR);
	(void)dispatch_assume_zero(result);
#else
#error "runloop support not implemented on this platform"
#endif
}
static void
_dispatch_runloop_queue_poke(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	// it's not useful to handle WAKEUP_MAKE_DIRTY because mach_msg() will have
	// a release barrier and that when runloop queues stop being thread-bound
	// they have a non optional wake-up to start being a "normal" queue
	// either in _dispatch_runloop_queue_xref_dispose,
	// or in _dispatch_queue_cleanup2() for the main thread.
	uint64_t old_state, new_state;

	if (dq == &_dispatch_main_q) {
		dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
				_dispatch_runloop_queue_handle_init);
	}

	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
		new_state = _dq_state_merge_qos(old_state, qos);
		if (old_state == new_state) {
			os_atomic_rmw_loop_give_up(goto no_change);
		}
	});

	dispatch_qos_t dq_qos = _dispatch_priority_qos(dq->dq_priority);
	if (qos > dq_qos) {
		mach_port_t owner = _dq_state_drain_owner(new_state);
		pthread_priority_t pp = _dispatch_qos_to_pp(qos);
		_dispatch_thread_override_start(owner, pp, dq);
		if (_dq_state_max_qos(old_state) > dq_qos) {
			_dispatch_thread_override_end(owner, dq);
		}
	}
no_change:
	_dispatch_runloop_queue_class_poke(dq);
	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
}
#endif // DISPATCH_COCOA_COMPAT
static void
_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	int remaining = n;
	int r = ENOSYS;

	_dispatch_root_queues_init();
	_dispatch_debug_root_queue(dq, __func__);
#if DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (qc->dgq_kworkqueue) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			do {
				r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
						_dispatch_worker_thread4, dq, &wh, &gen_cnt);
				(void)dispatch_assume_zero(r);
			} while (--remaining);
			return;
		}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_QOS
		r = _pthread_workqueue_addthreads(remaining,
				_dispatch_priority_to_pp(dq->dq_priority));
#elif DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
		r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
				qc->dgq_wq_options, remaining);
#endif
		(void)dispatch_assume_zero(r);
		return;
	}
#endif // DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
		while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
			_dispatch_root_queue_debug("signaled sleeping worker for "
					"global queue: %p", dq);
			if (!--remaining) {
				return;
			}
		}
	}

	bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	if (overcommit) {
		os_atomic_add2o(qc, dgq_pending, remaining, relaxed);
	} else {
		if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, remaining, relaxed)) {
			_dispatch_root_queue_debug("worker thread request still pending for "
					"global queue: %p", dq);
			return;
		}
	}

	int32_t can_request, t_count;
	// seq_cst with atomic store to tail <rdar://problem/16932833>
	t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
	do {
		can_request = t_count < floor ? 0 : t_count - floor;
		if (remaining > can_request) {
			_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
					remaining, can_request);
			os_atomic_sub2o(qc, dgq_pending, remaining - can_request, relaxed);
			remaining = can_request;
		}
		if (remaining == 0) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
	} while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
			t_count - remaining, &t_count, acquire));

	pthread_attr_t *attr = &pqc->dpq_thread_attr;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	if (slowpath(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq); // released in _dispatch_worker_thread
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
	} while (--remaining);
#endif // DISPATCH_USE_PTHREAD_POOL
}
static inline void
_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
{
	if (!_dispatch_queue_class_probe(dq)) {
		return;
	}
#if DISPATCH_USE_WORKQUEUES
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	if (
#if DISPATCH_USE_PTHREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
		_dispatch_root_queue_debug("worker thread request still pending for "
				"global queue: %p", dq);
		return;
	}
#endif // DISPATCH_USE_WORKQUEUES
	return _dispatch_global_queue_poke_slow(dq, n, floor);
}
#pragma mark dispatch_queue_drain

void
_dispatch_continuation_pop(dispatch_object_t dou, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, dispatch_queue_t dq)
{
	_dispatch_continuation_pop_inline(dou, dic, flags, dq);
}

void
_dispatch_continuation_invoke(dispatch_object_t dou, voucher_t ov,
		dispatch_invoke_flags_t flags)
{
	_dispatch_continuation_invoke_inline(dou, ov, flags);
}

static void
_dispatch_return_to_kernel(void)
{
	if (unlikely(_dispatch_get_wlh() == DISPATCH_WLH_ANON)) {
		_dispatch_clear_return_to_kernel();
	} else {
		_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
	}
}

void
_dispatch_poll_for_events_4launchd(void)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_get_wlh()) {
		dispatch_assert(_dispatch_deferred_items_get()->ddi_wlh_servicing);
		_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
	}
#endif
}
#if HAVE_PTHREAD_WORKQUEUE_NARROWING
static os_atomic(uint64_t) _dispatch_narrowing_deadlines[DISPATCH_QOS_MAX];
#if !DISPATCH_TIME_UNIT_USES_NANOSECONDS
static uint64_t _dispatch_narrow_check_interval_cache;
#endif

DISPATCH_ALWAYS_INLINE
static inline uint64_t
_dispatch_narrow_check_interval(void)
{
#if DISPATCH_TIME_UNIT_USES_NANOSECONDS
	return 50 * NSEC_PER_MSEC;
#else
	if (_dispatch_narrow_check_interval_cache == 0) {
		_dispatch_narrow_check_interval_cache =
				_dispatch_time_nano2mach(50 * NSEC_PER_MSEC);
	}
	return _dispatch_narrow_check_interval_cache;
#endif
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_queue_drain_init_narrowing_check_deadline(dispatch_invoke_context_t dic,
		dispatch_priority_t pri)
{
	if (_dispatch_priority_qos(pri) &&
			!(pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT)) {
		dic->dic_next_narrow_check = _dispatch_approximate_time() +
				_dispatch_narrow_check_interval();
	}
}

static bool
_dispatch_queue_drain_should_narrow_slow(uint64_t now,
		dispatch_invoke_context_t dic)
{
	if (dic->dic_next_narrow_check != DISPATCH_THREAD_IS_NARROWING) {
		pthread_priority_t pp = _dispatch_get_priority();
		dispatch_qos_t qos = _dispatch_qos_from_pp(pp);
		if (unlikely(!qos || qos > countof(_dispatch_narrowing_deadlines))) {
			DISPATCH_CLIENT_CRASH(pp, "Thread QoS corruption");
		}
		size_t idx = qos - 1; // no entry needed for DISPATCH_QOS_UNSPECIFIED
		os_atomic(uint64_t) *deadline = &_dispatch_narrowing_deadlines[idx];
		uint64_t oldval, newval = now + _dispatch_narrow_check_interval();

		dic->dic_next_narrow_check = newval;
		os_atomic_rmw_loop(deadline, oldval, newval, relaxed, {
			if (now < oldval) {
				os_atomic_rmw_loop_give_up(return false);
			}
		});

		if (!_pthread_workqueue_should_narrow(pp)) {
			return false;
		}
		dic->dic_next_narrow_check = DISPATCH_THREAD_IS_NARROWING;
	}
	return true;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_drain_should_narrow(dispatch_invoke_context_t dic)
{
	uint64_t next_check = dic->dic_next_narrow_check;
	if (unlikely(next_check)) {
		uint64_t now = _dispatch_approximate_time();
		if (unlikely(next_check < now)) {
			return _dispatch_queue_drain_should_narrow_slow(now, dic);
		}
	}
	return false;
}
#else
#define _dispatch_queue_drain_init_narrowing_check_deadline(rq, dic) ((void)0)
#define _dispatch_queue_drain_should_narrow(dic) false
#endif
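/*
 * Illustrative sketch (not built, simplified from the helpers above): the
 * narrowing check is a cheap per-drain deadline, and only when the cached
 * deadline has passed does the slow path consult the kernel through
 * _pthread_workqueue_should_narrow(). The 50ms constant mirrors the interval
 * used above; every other name here is hypothetical.
 */
#if 0 // simplified sketch
#include <stdbool.h>
#include <stdint.h>

#define NARROW_CHECK_INTERVAL_NS (50ull * 1000 * 1000)

struct drain_ctx {
	uint64_t next_narrow_check; // 0 means "never check"
};

static bool
should_check_for_narrowing(struct drain_ctx *ctx, uint64_t now_ns)
{
	if (ctx->next_narrow_check == 0 || now_ns < ctx->next_narrow_check) {
		return false;            // fast path: deadline not reached yet
	}
	ctx->next_narrow_check = now_ns + NARROW_CHECK_INTERVAL_NS;
	return true;                 // caller would now take the slow path
}
#endif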
/*
 * Drain comes in 2 flavours (serial/concurrent) and 2 modes
 * (redirecting or not).
 *
 * Serial drain is about serial queues (width == 1). It doesn't support
 * the redirecting mode, which doesn't make sense, and treats all continuations
 * as barriers. Bookkeeping is minimal in serial flavour, most of the loop
 * is optimized away.
 *
 * Serial drain stops if the width of the queue grows to larger than 1.
 * Going through a serial drain prevents any recursive drain from being
 * redirecting.
 *
 * When in non-redirecting mode (meaning one of the target queues is serial),
 * non-barriers and barriers alike run in the context of the drain thread.
 * Slow non-barrier items are still all signaled so that they can make progress
 * toward the dispatch_sync() that will serialize them all.
 *
 * In redirecting mode, non-barrier work items are redirected downward.
 *
 * Concurrent drain stops if the width of the queue becomes 1, so that the
 * queue drain moves to the more efficient serial mode.
 */
DISPATCH_ALWAYS_INLINE
static dispatch_queue_wakeup_target_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
	dispatch_queue_t orig_tq = dq->do_targetq;
	dispatch_thread_frame_s dtf;
	struct dispatch_object_s *dc = NULL, *next_dc;
	uint64_t dq_state, owned = *owned_ptr;

	if (unlikely(!dq->dq_items_tail)) return NULL;

	_dispatch_thread_frame_push(&dtf, dq);
	if (serial_drain || _dq_state_is_in_barrier(owned)) {
		// we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
		// but width can change while draining barrier work items, so we only
		// convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
		owned = DISPATCH_QUEUE_IN_BARRIER;
	} else {
		owned &= DISPATCH_QUEUE_WIDTH_MASK;
	}

	dc = _dispatch_queue_head(dq);
	goto first_iteration;

	for (;;) {
		dc = next_dc;
		if (unlikely(dic->dic_deferred)) {
			goto out_with_deferred_compute_owned;
		}
		if (unlikely(_dispatch_needs_to_return_to_kernel())) {
			_dispatch_return_to_kernel();
		}
		if (unlikely(!dc)) {
			if (!dq->dq_items_tail) {
				break;
			}
			dc = _dispatch_queue_head(dq);
		}
		if (unlikely(serial_drain != (dq->dq_width == 1))) {
			break;
		}
		if (unlikely(_dispatch_queue_drain_should_narrow(dic))) {
			break;
		}

first_iteration:
		dq_state = os_atomic_load(&dq->dq_state, relaxed);
		if (unlikely(_dq_state_is_suspended(dq_state))) {
			break;
		}
		if (unlikely(orig_tq != dq->do_targetq)) {
			break;
		}

		if (serial_drain || _dispatch_object_is_barrier(dc)) {
			if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
				if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
					goto out_with_no_width;
				}
				owned = DISPATCH_QUEUE_IN_BARRIER;
			}
			next_dc = _dispatch_queue_next(dq, dc);
			if (_dispatch_object_is_sync_waiter(dc)) {
				dic->dic_deferred = dc;
				goto out_with_deferred;
			}
		} else {
			if (owned == DISPATCH_QUEUE_IN_BARRIER) {
				// we just ran barrier work items, we have to make their
				// effect visible to other sync work items on other threads
				// that may start coming in after this point, hence the
				// release barrier
				os_atomic_xor2o(dq, dq_state, owned, release);
				owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
			} else if (unlikely(owned == 0)) {
				if (_dispatch_object_is_sync_waiter(dc)) {
					// sync "readers" don't observe the limit
					_dispatch_queue_reserve_sync_width(dq);
				} else if (!_dispatch_queue_try_acquire_async(dq)) {
					goto out_with_no_width;
				}
				owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
			}

			next_dc = _dispatch_queue_next(dq, dc);
			if (_dispatch_object_is_sync_waiter(dc)) {
				owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
				_dispatch_sync_waiter_redirect_or_wake(dq,
						DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
				continue;
			}

			if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
				owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
				_dispatch_continuation_redirect(dq, dc);
				continue;
			}
		}

		_dispatch_continuation_pop_inline(dc, dic, flags, dq);
	}

	if (owned == DISPATCH_QUEUE_IN_BARRIER) {
		// if we're IN_BARRIER we really own the full width too
		owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
	}
	if (dc) {
		owned = _dispatch_queue_adjust_owned(dq, owned, dc);
	}
	*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
	*owned_ptr |= owned;
	_dispatch_thread_frame_pop(&dtf);
	return dc ? dq->do_targetq : NULL;

out_with_no_width:
	*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
	_dispatch_thread_frame_pop(&dtf);
	return DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;

out_with_deferred_compute_owned:
	if (serial_drain) {
		owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
	} else {
		if (owned == DISPATCH_QUEUE_IN_BARRIER) {
			// if we're IN_BARRIER we really own the full width too
			owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
		}
		if (dc) {
			owned = _dispatch_queue_adjust_owned(dq, owned, dc);
		}
	}
out_with_deferred:
	*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
	*owned_ptr |= owned;
	if (unlikely(flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS)) {
		DISPATCH_INTERNAL_CRASH(dc,
				"Deferred continuation on source, mach channel or mgr");
	}
	_dispatch_thread_frame_pop(&dtf);
	return dq->do_targetq;
}
static dispatch_queue_wakeup_target_t
_dispatch_queue_concurrent_drain(dispatch_queue_t dq,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
		uint64_t *owned)
{
	return _dispatch_queue_drain(dq, dic, flags, owned, false);
}

dispatch_queue_wakeup_target_t
_dispatch_queue_serial_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned)
{
	flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
	return _dispatch_queue_drain(dq, dic, flags, owned, true);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_update_priority_from_thread(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	mach_port_t owner = _dq_state_drain_owner(dq_state);

	dispatch_priority_t main_pri =
			_dispatch_priority_from_pp_strip_flags(_dispatch_get_priority());
	dispatch_qos_t main_qos = _dispatch_priority_qos(main_pri);
	dispatch_qos_t max_qos = _dq_state_max_qos(dq_state);
	dispatch_qos_t old_qos = _dispatch_priority_qos(dq->dq_priority);

	// the main thread QoS was adjusted by someone else, learn the new QoS
	// and reinitialize _dispatch_main_q.dq_priority
	dq->dq_priority = _dispatch_priority_with_override_qos(main_pri, main_qos);

	if (old_qos < max_qos && main_qos == DISPATCH_QOS_UNSPECIFIED) {
		// main thread is opted out of QoS and we had an override
		return _dispatch_thread_override_end(owner, dq);
	}

	if (old_qos < max_qos && max_qos <= main_qos) {
		// main QoS was raised, and we had an override which is now useless
		return _dispatch_thread_override_end(owner, dq);
	}

	if (main_qos < max_qos && max_qos <= old_qos) {
		// main thread QoS was lowered, and we actually need an override
		pthread_priority_t pp = _dispatch_qos_to_pp(max_qos);
		return _dispatch_thread_override_start(owner, pp, dq);
	}
}
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_thread_frame_s dtf;

	if (!dq->dq_items_tail) {
		return;
	}

	_dispatch_perfmon_start_notrace();
	if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
		DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
				" after dispatch_main()");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(!_dq_state_drain_locked_by_self(dq_state))) {
		DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
				"_dispatch_main_queue_callback_4CF called"
				" from the wrong thread");
	}

	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);

	// <rdar://problem/23256682> hide the frame chaining when CFRunLoop
	// drains the main runloop, as this should not be observable that way
	_dispatch_adopt_wlh_anon();
	_dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);

	pthread_priority_t pp = _dispatch_get_priority();
	dispatch_priority_t pri = _dispatch_priority_from_pp(pp);
	dispatch_qos_t qos = _dispatch_priority_qos(pri);
	voucher_t voucher = _voucher_copy();

	if (unlikely(qos != _dispatch_priority_qos(dq->dq_priority))) {
		_dispatch_main_queue_update_priority_from_thread();
	}
	dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
	_dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED);

	dispatch_invoke_context_s dic = { };
	struct dispatch_object_s *dc, *next_dc, *tail;
	dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
	do {
		next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
		_dispatch_continuation_pop_inline(dc, &dic, DISPATCH_INVOKE_NONE, dq);
	} while ((dc = next_dc));

	dx_wakeup(dq, 0, 0);
	_dispatch_voucher_debug("main queue restore", voucher);
	_dispatch_reset_basepri(old_dbp);
	_dispatch_reset_basepri_override();
	_dispatch_reset_priority_and_voucher(pp, voucher);
	_dispatch_thread_frame_pop(&dtf);
	_dispatch_reset_wlh();
	_dispatch_force_cache_cleanup();
	_dispatch_perfmon_end_notrace();
}
)
5067 if (!dq
->dq_items_tail
) {
5070 _dispatch_perfmon_start_notrace();
5071 dispatch_thread_frame_s dtf
;
5072 bool should_reset_wlh
= _dispatch_adopt_wlh_anon_recurse();
5073 _dispatch_thread_frame_push(&dtf
, dq
);
5074 pthread_priority_t pp
= _dispatch_get_priority();
5075 dispatch_priority_t pri
= _dispatch_priority_from_pp(pp
);
5076 voucher_t voucher
= _voucher_copy();
5077 dispatch_priority_t old_dbp
= _dispatch_set_basepri(pri
);
5078 _dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED
);
5080 dispatch_invoke_context_s dic
= { };
5081 struct dispatch_object_s
*dc
, *next_dc
;
5082 dc
= _dispatch_queue_head(dq
);
5083 next_dc
= _dispatch_queue_next(dq
, dc
);
5084 _dispatch_continuation_pop_inline(dc
, &dic
, DISPATCH_INVOKE_NONE
, dq
);
5087 dx_wakeup(dq
, 0, 0);
5090 _dispatch_voucher_debug("runloop queue restore", voucher
);
5091 _dispatch_reset_basepri(old_dbp
);
5092 _dispatch_reset_basepri_override();
5093 _dispatch_reset_priority_and_voucher(pp
, voucher
);
5094 _dispatch_thread_frame_pop(&dtf
);
5095 if (should_reset_wlh
) _dispatch_reset_wlh();
5096 _dispatch_force_cache_cleanup();
5097 _dispatch_perfmon_end_notrace();
static void
_dispatch_mgr_queue_drain(void)
{
	const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
	dispatch_invoke_context_s dic = { };
	dispatch_queue_t dq = &_dispatch_mgr_q;
	uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;

	if (dq->dq_items_tail) {
		_dispatch_perfmon_start();
		_dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED);
		if (slowpath(_dispatch_queue_serial_drain(dq, &dic, flags, &owned))) {
			DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
		}
		_dispatch_voucher_debug("mgr queue clear", NULL);
		_dispatch_reset_basepri_override();
		_dispatch_perfmon_end(perfmon_thread_manager);
	}

#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (!_dispatch_kevent_workqueue_enabled)
#endif
	{
		_dispatch_force_cache_cleanup();
	}
}
#pragma mark dispatch_queue_invoke

void
_dispatch_queue_drain_sync_waiter(dispatch_queue_t dq,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
		uint64_t owned)
{
	struct dispatch_object_s *dc = dic->dic_deferred;
	dispatch_assert(_dispatch_object_is_sync_waiter(dc));
	dic->dic_deferred = NULL;
	if (flags & DISPATCH_INVOKE_WLH) {
		// Leave the enqueued bit in place, completion of the last sync waiter
		// in the handoff chain is responsible for dequeuing
		//
		// We currently have a +2 to consume, but we need to keep a +1
		// for the thread request
		dispatch_assert(_dq_state_is_enqueued_on_target(owned));
		dispatch_assert(!_dq_state_is_enqueued_on_manager(owned));
		owned &= ~DISPATCH_QUEUE_ENQUEUED;
		_dispatch_release_no_dispose(dq);
	} else {
		// The sync waiter must own a reference
		_dispatch_release_2_no_dispose(dq);
	}
	return _dispatch_sync_waiter_redirect_or_wake(dq, owned, dc);
}
void
_dispatch_queue_finalize_activation(dispatch_queue_t dq,
		DISPATCH_UNUSED bool *allow_resume)
{
	dispatch_queue_t tq = dq->do_targetq;
	_dispatch_queue_priority_inherit_from_target(dq, tq);
	_dispatch_queue_inherit_wlh_from_target(dq, tq);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned)
{
	dispatch_queue_t otq = dq->do_targetq;
	dispatch_queue_t cq = _dispatch_queue_get_current();

	if (slowpath(cq != otq)) {
		return otq;
	}
	if (dq->dq_width == 1) {
		return _dispatch_queue_serial_drain(dq, dic, flags, owned);
	}
	return _dispatch_queue_concurrent_drain(dq, dic, flags, owned);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(dq, dic, flags, 0, dispatch_queue_invoke2);
}
#pragma mark dispatch_queue_class_wakeup

#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_queue_override_invoke(dispatch_continuation_t dc,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
	dispatch_queue_t old_rq = _dispatch_queue_get_current();
	dispatch_queue_t assumed_rq = dc->dc_other;
	dispatch_priority_t old_dp;
	voucher_t ov = DISPATCH_NO_VOUCHER;
	dispatch_object_t dou;

	dou._do = dc->dc_data;
	old_dp = _dispatch_root_queue_identity_assume(assumed_rq);
	if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
		flags |= DISPATCH_INVOKE_STEALING;
	} else {
		// balance the fake continuation push in
		// _dispatch_root_queue_push_override
		_dispatch_trace_continuation_pop(assumed_rq, dou._do);
	}
	_dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
		if (_dispatch_object_has_vtable(dou._do)) {
			dx_invoke(dou._do, dic, flags);
		} else {
			_dispatch_continuation_invoke_inline(dou, ov, flags);
		}
	});
	_dispatch_reset_basepri(old_dp);
	_dispatch_queue_set_current(old_rq);
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_root_queue_push_needs_override(dispatch_queue_t rq,
		dispatch_qos_t qos)
{
	dispatch_qos_t rqos = _dispatch_priority_qos(rq->dq_priority);
	bool defaultqueue = rq->dq_priority & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE;

	if (unlikely(!rqos)) return false;

	return defaultqueue ? qos && qos != rqos : qos > rqos;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_root_queue_push_queue_override_needed(dispatch_queue_t rq,
		dispatch_qos_t qos)
{
	// for root queues, the override is the guaranteed minimum override level
	return qos > _dispatch_priority_override_qos(rq->dq_priority);
}
static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
		dispatch_object_t dou, dispatch_qos_t qos)
{
	bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
	dispatch_continuation_t dc = dou._dc;

	if (_dispatch_object_is_redirection(dc)) {
		// no double-wrap is needed, _dispatch_async_redirect_invoke will do
		// the right thing
		dc->dc_func = (void *)orig_rq;
	} else {
		dc = _dispatch_continuation_alloc();
		dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
		// fake that we queued `dou` on `orig_rq` for introspection purposes
		_dispatch_trace_continuation_push(orig_rq, dou);
		dc->dc_other = orig_rq;
		dc->dc_data = dou._do;
		dc->dc_priority = DISPATCH_NO_PRIORITY;
		dc->dc_voucher = DISPATCH_NO_VOUCHER;
	}
	_dispatch_root_queue_push_inline(rq, dc, dc, 1);
}
static void
_dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
		dispatch_queue_t dq, dispatch_qos_t qos)
{
	bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
	_dispatch_retain_2(dq);

	dc->dc_other = orig_rq;
	dc->dc_data = dq;
	dc->dc_priority = DISPATCH_NO_PRIORITY;
	dc->dc_voucher = DISPATCH_NO_VOUCHER;
	_dispatch_root_queue_push_inline(rq, dc, dc, 1);
}
static void
_dispatch_queue_class_wakeup_with_override_slow(dispatch_queue_t dq,
		uint64_t dq_state, dispatch_wakeup_flags_t flags)
{
	dispatch_qos_t oqos, qos = _dq_state_max_qos(dq_state);
	dispatch_queue_t tq;
	bool locked = false;

	if (_dq_state_is_base_anon(dq_state)) {
		mach_port_t owner = _dq_state_drain_owner(dq_state);
		if (owner) {
			(void)_dispatch_wqthread_override_start_check_owner(owner, qos,
					&dq->dq_state_lock);
			goto out;
		}
	}

	tq = dq->do_targetq;

	if (likely(!_dispatch_queue_is_legacy(dq))) {
		// non-legacy queues have an immutable target queue
	} else if (_dispatch_is_in_root_queues_array(tq)) {
		// avoid locking when we recognize the target queue as a global root
		// queue it is gross, but is a very common case. The locking isn't
		// needed because these target queues cannot go away.
	} else if (_dispatch_queue_sidelock_trylock(dq, qos)) {
		// <rdar://problem/17735825> to traverse the tq chain safely we must
		// lock it to ensure it cannot change
		locked = true;
		tq = dq->do_targetq;
		_dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
	} else {
		// Leading to being there, the current thread has:
		// 1. enqueued an object on `dq`
		// 2. raised the max_qos value, set RECEIVED_OVERRIDE on `dq`
		//    and didn't see an owner
		// 3. tried and failed to acquire the side lock
		//
		// The side lock owner can only be one of three things:
		//
		// - The suspend/resume side count code. Besides being unlikely,
		//   it means that at this moment the queue is actually suspended,
		//   which transfers the responsibility of applying the override to
		//   the eventual dispatch_resume().
		//
		// - A dispatch_set_target_queue() call. The fact that we saw no `owner`
		//   means that the trysync it does wasn't being drained when (2)
		//   happened which can only be explained by one of these interleavings:
		//
		//   o `dq` became idle between when the object queued in (1) ran and
		//     the set_target_queue call and we were unlucky enough that our
		//     step (2) happened while this queue was idle. There is no reason
		//     to override anything anymore, the queue drained to completion
		//     while we were preempted, our job is done.
		//
		//   o `dq` is queued but not draining during (1-2), then when we try
		//     to lock at (3) the queue is now draining a set_target_queue.
		//     This drainer must have seen the effects of (2) and that guy has
		//     applied our override. Our job is done.
		//
		// - Another instance of _dispatch_queue_class_wakeup_with_override(),
		//   which is fine because trylock leaves a hint that we failed our
		//   trylock, causing the tryunlock below to fail and reassess whether
		//   a better override needs to be applied.
		//
		_dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
		goto out;
	}

apply_again:
	if (dx_hastypeflag(tq, QUEUE_ROOT)) {
		if (_dispatch_root_queue_push_queue_override_needed(tq, qos)) {
			_dispatch_root_queue_push_override_stealer(tq, dq, qos);
		}
	} else if (_dispatch_queue_need_override(tq, qos)) {
		dx_wakeup(tq, qos, 0);
	}
	while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
		// rdar://problem/24081326
		//
		// Another instance of _dispatch_queue_class_wakeup_with_override()
		// tried to acquire the side lock while we were running, and could have
		// had a better override than ours to apply.
		//
		oqos = _dq_state_max_qos(os_atomic_load2o(dq, dq_state, relaxed));
		if (oqos > qos) {
			// The other instance had a better priority than ours, override
			// our thread, and apply the override that wasn't applied to `dq`
			// because of us.
			qos = oqos;
			goto apply_again;
		}
	}

out:
	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
		uint64_t dq_state, dispatch_wakeup_flags_t flags)
{
	dispatch_assert(_dq_state_should_override(dq_state));

	return _dispatch_queue_class_wakeup_with_override_slow(dq, dq_state, flags);
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
void
_dispatch_root_queue_push(dispatch_queue_t rq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
	if (unlikely(ddi && ddi->ddi_can_stash)) {
		dispatch_object_t old_dou = ddi->ddi_stashed_dou;
		dispatch_priority_t rq_overcommit;
		rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;

		if (likely(!old_dou._do || rq_overcommit)) {
			dispatch_queue_t old_rq = ddi->ddi_stashed_rq;
			dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
			ddi->ddi_stashed_rq = rq;
			ddi->ddi_stashed_dou = dou;
			ddi->ddi_stashed_qos = qos;
			_dispatch_debug("deferring item %p, rq %p, qos %d",
					dou._do, rq, qos);
			if (rq_overcommit) {
				ddi->ddi_can_stash = false;
			}
			if (likely(!old_dou._do)) {
				return;
			}
			// push the previously stashed item
			qos = old_qos;
			rq = old_rq;
			dou = old_dou;
		}
	}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_root_queue_push_needs_override(rq, qos)) {
		return _dispatch_root_queue_push_override(rq, dou, qos);
	}
#endif
	_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
void
_dispatch_root_queue_wakeup(dispatch_queue_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
	if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
		DISPATCH_INTERNAL_CRASH(dq->dq_priority,
				"Don't try to wake up or override a root queue");
	}
	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
}

void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
	_dispatch_queue_push_inline(dq, dou, qos);
}
void
_dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
	dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);

	if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
		_dispatch_retain_2(dq);
		flags |= DISPATCH_WAKEUP_CONSUME_2;
	}

	if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
		//
		// _dispatch_queue_class_barrier_complete() is about what both regular
		// queues and sources need to evaluate, but the former can have sync
		// handoffs to perform which _dispatch_queue_class_barrier_complete()
		// doesn't handle, only _dispatch_queue_barrier_complete() does.
		//
		// _dispatch_queue_wakeup() is the one for plain queues that calls
		// _dispatch_queue_barrier_complete(), and this is only taken for
		// non-queue types.
		//
		dispatch_assert(dx_metatype(dq) != _DISPATCH_QUEUE_TYPE);
		qos = _dispatch_queue_override_qos(dq, qos);
		return _dispatch_queue_class_barrier_complete(dq, qos, flags, target,
				DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
	}

	if (target) {
		uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
		if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
			enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
		}
		qos = _dispatch_queue_override_qos(dq, qos);
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
			new_state = _dq_state_merge_qos(old_state, qos);
			if (likely(!_dq_state_is_suspended(old_state) &&
					!_dq_state_is_enqueued(old_state) &&
					(!_dq_state_drain_locked(old_state) ||
					(enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR &&
					_dq_state_is_base_wlh(old_state))))) {
				new_state |= enqueue;
			}
			if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
				new_state |= DISPATCH_QUEUE_DIRTY;
			} else if (new_state == old_state) {
				os_atomic_rmw_loop_give_up(goto done);
			}
		});

		if (likely((old_state ^ new_state) & enqueue)) {
			dispatch_queue_t tq;
			if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
				// the rmw_loop above has no acquire barrier, as the last block
				// of a queue asyncing to that queue is not an uncommon pattern
				// and in that case the acquire would be completely useless
				//
				// so instead use dependency ordering to read
				// the targetq pointer.
				os_atomic_thread_fence(dependency);
				tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
						(long)new_state);
			} else {
				tq = target;
			}
			dispatch_assert(_dq_state_is_enqueued(new_state));
			return _dispatch_queue_push_queue(tq, dq, new_state);
		}
#if HAVE_PTHREAD_WORKQUEUE_QOS
		if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
			if (_dq_state_should_override(new_state)) {
				return _dispatch_queue_class_wakeup_with_override(dq,
						new_state, flags);
			}
		}
	} else if (qos) {
		//
		// Someone is trying to override the last work item of the queue.
		//
		uint64_t old_state, new_state;
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
			if (!_dq_state_drain_locked(old_state) ||
					!_dq_state_is_enqueued(old_state)) {
				os_atomic_rmw_loop_give_up(goto done);
			}
			new_state = _dq_state_merge_qos(old_state, qos);
			if (new_state == old_state) {
				os_atomic_rmw_loop_give_up(goto done);
			}
		});
		if (_dq_state_should_override(new_state)) {
			return _dispatch_queue_class_wakeup_with_override(dq, new_state,
					flags);
		}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
	}
done:
	if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
		return _dispatch_release_2_tailcall(dq);
	}
}
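//
// Illustrative sketch (not libdispatch code, hypothetical names) of the
// dependency-ordering idea used by _dispatch_queue_class_wakeup() above:
// instead of paying for an acquire barrier on every wakeup, the state value
// produced by the RMW loop is threaded into the address computation of the
// subsequent load, so that load cannot be speculated ahead of the RMW on
// architectures that honor data dependencies:
//
//   uint64_t state = rmw_loop_result;            // value the RMW produced
//   uintptr_t dep  = (uintptr_t)(state & 0);     // always 0, but carries a dependency
//   dispatch_queue_t tq = *(dispatch_queue_t *)
//           ((uintptr_t)&dq->do_targetq + dep);  // dependency-ordered load
//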
static void
_dispatch_queue_push_sync_waiter(dispatch_queue_t dq,
		dispatch_sync_context_t dsc, dispatch_qos_t qos)
{
	uint64_t old_state, new_state;

	if (unlikely(dx_type(dq) == DISPATCH_QUEUE_NETWORK_EVENT_TYPE)) {
		DISPATCH_CLIENT_CRASH(0,
				"dispatch_sync onto a network event queue");
	}

	_dispatch_trace_continuation_push(dq, dsc->_as_dc);

	if (unlikely(_dispatch_queue_push_update_tail(dq, dsc->_as_do))) {
		// for slow waiters, we borrow the reference of the caller
		// so we don't need to protect the wakeup with a temporary retain
		_dispatch_queue_push_update_head(dq, dsc->_as_do);
		if (unlikely(_dispatch_queue_is_thread_bound(dq))) {
			return dx_wakeup(dq, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
		}

		uint64_t pending_barrier_width =
				(dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
		uint64_t set_owner_and_set_full_width_and_in_barrier =
				_dispatch_lock_value_for_self() |
				DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
		// similar to _dispatch_queue_drain_try_unlock()
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
			new_state = _dq_state_merge_qos(old_state, qos);
			new_state |= DISPATCH_QUEUE_DIRTY;
			if (unlikely(_dq_state_drain_locked(old_state) ||
					!_dq_state_is_runnable(old_state))) {
				// not runnable, so we should just handle overrides
			} else if (_dq_state_is_base_wlh(old_state) &&
					_dq_state_is_enqueued(old_state)) {
				// 32123779 let the event thread redrive since it's out already
			} else if (_dq_state_has_pending_barrier(old_state) ||
					new_state + pending_barrier_width <
					DISPATCH_QUEUE_WIDTH_FULL_BIT) {
				// see _dispatch_queue_drain_try_lock
				new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
				new_state |= set_owner_and_set_full_width_and_in_barrier;
			}
		});

		if (_dq_state_is_base_wlh(old_state) &&
				(dsc->dsc_waiter == _dispatch_tid_self())) {
			dsc->dsc_wlh_was_first = true;
		}

		if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
			return _dispatch_queue_barrier_complete(dq, qos, 0);
		}
#if HAVE_PTHREAD_WORKQUEUE_QOS
		if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
			if (_dq_state_should_override(new_state)) {
				return _dispatch_queue_class_wakeup_with_override(dq,
						new_state, 0);
			}
		}
	} else if (unlikely(qos)) {
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
			new_state = _dq_state_merge_qos(old_state, qos);
			if (old_state == new_state) {
				os_atomic_rmw_loop_give_up(return);
			}
		});
		if (_dq_state_should_override(new_state)) {
			return _dispatch_queue_class_wakeup_with_override(dq, new_state, 0);
		}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
	}
}
#pragma mark dispatch_root_queue_drain

DISPATCH_NOINLINE
static bool
_dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	// Spin for a short while in case the contention is temporary -- e.g.
	// when starting up after dispatch_apply, or when executing a few
	// short continuations in a row.
	if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
		goto out;
	}

	// Since we have serious contention, we need to back off.

	// Mark this queue as pending to avoid requests for further threads
	(void)os_atomic_inc2o(qc, dgq_pending, relaxed);
	pending = true;
	do {
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	available = false;
out:
	if (pending) {
		(void)os_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	if (!available) {
		_dispatch_global_queue_poke(dq, 1, 0);
	}
	return available;
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_root_queue_drain_one2(dispatch_queue_t dq)
{
	// Wait for queue head and tail to be both non-empty or both empty
	bool available; // <rdar://problem/15917893>
	_dispatch_wait_until((dq->dq_items_head != NULL) ==
			(available = (dq->dq_items_tail != NULL)));
	return available;
}
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_root_queue_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = os_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		if (slowpath(!os_atomic_cmpxchg2o(dq, dq_items_head, mediator,
				NULL, relaxed))) {
			goto start;
		}
		if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
				_dispatch_root_queue_drain_one2(dq)) {
			goto start;
		}
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_root_queue_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
		// 22708742: set tail to NULL with release, so that NULL write to head
		// above doesn't clobber head from concurrent enqueuer
		if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
			// both head and tail are NULL now
			goto out;
		}
		// There must be a next item now.
		next = os_mpsc_get_next(head, do_next);
	}

	os_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_global_queue_poke(dq, 1, 0);
out:
	return head;
}
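//
// Illustrative sketch (not libdispatch code, hypothetical names) of the
// consumer-side protocol implemented by _dispatch_root_queue_drain_one()
// above. The "mediator" sentinel swapped into the head lets concurrent
// consumers tell "queue is empty" apart from "another consumer is popping":
//
//   item = xchg(&q->head, MEDIATOR);          // claim the head slot
//   if (item == NULL)          { /* empty: restore head, maybe retry */ }
//   else if (item == MEDIATOR) { /* lost the race: back off, then retry */ }
//   else                       { /* won: publish item->do_next as new head */ }
//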
#if DISPATCH_USE_KEVENT_WORKQUEUE
static void
_dispatch_root_queue_drain_deferred_wlh(dispatch_deferred_items_t ddi
		DISPATCH_PERF_MON_ARGS_PROTO)
{
	dispatch_queue_t rq = ddi->ddi_stashed_rq;
	dispatch_queue_t dq = ddi->ddi_stashed_dou._dq;
	_dispatch_queue_set_current(rq);
	dispatch_priority_t old_pri = _dispatch_set_basepri_wlh(rq->dq_priority);
	dispatch_invoke_context_s dic = { };
	dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
			DISPATCH_INVOKE_REDIRECTING_DRAIN | DISPATCH_INVOKE_WLH;
	_dispatch_queue_drain_init_narrowing_check_deadline(&dic, rq->dq_priority);
	uint64_t dq_state;

	ddi->ddi_wlh_servicing = true;
	if (unlikely(_dispatch_needs_to_return_to_kernel())) {
		_dispatch_return_to_kernel();
	}
retry:
	dispatch_assert(ddi->ddi_wlh_needs_delete);
	_dispatch_trace_continuation_pop(rq, dq);

	if (_dispatch_queue_drain_try_lock_wlh(dq, &dq_state)) {
		dx_invoke(dq, &dic, flags);
		if (!ddi->ddi_wlh_needs_delete) {
			goto park;
		}
		dq_state = os_atomic_load2o(dq, dq_state, relaxed);
		if (unlikely(!_dq_state_is_base_wlh(dq_state))) { // rdar://32671286
			goto park;
		}
		if (unlikely(_dq_state_is_enqueued_on_target(dq_state))) {
			_dispatch_retain(dq);
			_dispatch_trace_continuation_push(dq->do_targetq, dq);
			goto retry;
		}
	} else {
		if (_dq_state_is_suspended(dq_state)) {
			dispatch_assert(!_dq_state_is_enqueued(dq_state));
			_dispatch_release_2_no_dispose(dq);
		} else {
			dispatch_assert(_dq_state_is_enqueued(dq_state));
			dispatch_assert(_dq_state_drain_locked(dq_state));
			_dispatch_release_no_dispose(dq);
		}
	}

	_dispatch_event_loop_leave_deferred((dispatch_wlh_t)dq, dq_state);

park:
	// event thread that could steal
	_dispatch_perfmon_end(perfmon_thread_event_steal);
	_dispatch_reset_basepri(old_pri);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);

	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
static void
_dispatch_root_queue_drain_deferred_item(dispatch_deferred_items_t ddi
		DISPATCH_PERF_MON_ARGS_PROTO)
{
	dispatch_queue_t rq = ddi->ddi_stashed_rq;
	_dispatch_queue_set_current(rq);
	dispatch_priority_t old_pri = _dispatch_set_basepri(rq->dq_priority);

	dispatch_invoke_context_s dic = { };
	dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
			DISPATCH_INVOKE_REDIRECTING_DRAIN;
#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
	_dispatch_queue_drain_init_narrowing_check_deadline(&dic, rq->dq_priority);
	_dispatch_continuation_pop_inline(ddi->ddi_stashed_dou, &dic, flags, rq);

	// event thread that could steal
	_dispatch_perfmon_end(perfmon_thread_event_steal);
#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
	_dispatch_reset_basepri(old_pri);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);

	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
static void
_dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
#if DISPATCH_DEBUG
	dispatch_queue_t cq;
	if (slowpath(cq = _dispatch_queue_get_current())) {
		DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
	}
#endif
	_dispatch_queue_set_current(dq);
	dispatch_priority_t pri = dq->dq_priority;
	if (!pri) pri = _dispatch_priority_from_pp(pp);
	dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
	_dispatch_adopt_wlh_anon();

	struct dispatch_object_s *item;
	bool reset = false;
	dispatch_invoke_context_s dic = { };
#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
	dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
			DISPATCH_INVOKE_REDIRECTING_DRAIN;
	_dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
	_dispatch_perfmon_start();
	while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
		if (reset) _dispatch_wqthread_override_reset();
		_dispatch_continuation_pop_inline(item, &dic, flags, dq);
		reset = _dispatch_reset_basepri_override();
		if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
			break;
		}
	}

	// overcommit or not. worker thread
	if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		_dispatch_perfmon_end(perfmon_thread_worker_oc);
	} else {
		_dispatch_perfmon_end(perfmon_thread_worker_non_oc);
	}

#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
	_dispatch_reset_wlh();
	_dispatch_reset_basepri(old_dbp);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);
}

#pragma mark dispatch_worker_thread
#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread4(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();
	int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
	dispatch_assert(pending >= 0);
	_dispatch_root_queue_drain(dq, _dispatch_get_priority());
	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}

#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_worker_thread3(pthread_priority_t pp)
{
	bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	dispatch_queue_t dq;
	pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
	dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS

#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread4(dq);
}
#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
	if (unlikely(pending < 0)) {
		DISPATCH_INTERNAL_CRASH(pending, "Pending thread request underflow");
	}

	if (pqc->dpq_observer_hooks.queue_will_execute) {
		_dispatch_set_pthread_root_queue_observer_hooks(
				&pqc->dpq_observer_hooks);
	}
	if (pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	// work around tweaks the kernel workqueue does for us
	_dispatch_sigmask();
	_dispatch_introspection_thread_add();

#if DISPATCH_USE_INTERNAL_WORKQUEUE
	bool overcommit = (qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
	bool manager = (dq == &_dispatch_mgr_root_queue);
	bool monitored = !(overcommit || manager);
	if (monitored) {
		_dispatch_workq_worker_register(dq, qc->dgq_qos);
	}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE

	const int64_t timeout = 5ull * NSEC_PER_SEC;
	pthread_priority_t old_pri = _dispatch_get_priority();
	do {
		_dispatch_root_queue_drain(dq, old_pri);
		_dispatch_reset_priority_and_voucher(old_pri, NULL);
	} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

#if DISPATCH_USE_INTERNAL_WORKQUEUE
	if (monitored) {
		_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
	}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
	(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
	_dispatch_global_queue_poke(dq, 1, 0);
	_dispatch_release(dq); // retained in _dispatch_global_queue_poke_slow
	return NULL;
}
#endif // DISPATCH_USE_PTHREAD_POOL
#pragma mark dispatch_network_root_queue
#if TARGET_OS_MAC

dispatch_queue_t
_dispatch_network_root_queue_create_4NW(const char *label,
		const pthread_attr_t *attrs, dispatch_block_t configure)
{
	unsigned long flags = dispatch_pthread_root_queue_flags_pool_size(1);
	return dispatch_pthread_root_queue_create(label, flags, attrs, configure);
}
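// Illustrative usage sketch (hypothetical caller, not part of libdispatch):
// the SPI above is a thin wrapper around dispatch_pthread_root_queue_create()
// with a pool size of one, i.e. a single dedicated drainer thread.
//
//   dispatch_queue_t nwq = _dispatch_network_root_queue_create_4NW(
//           "com.example.network-root", NULL /* attrs */,
//           ^{ /* per-thread configuration, runs on the new thread */ });
//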

#endif // TARGET_OS_MAC
#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT

dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return DISPATCH_BAD_INPUT;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1,
			DISPATCH_QUEUE_ROLE_BASE_ANON);
	dq->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	_dispatch_runloop_queue_handle_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);

	dispatch_qos_t qos = _dispatch_runloop_queue_reset_max_qos(dq);
	_dispatch_queue_clear_bound_thread(dq);
	dx_wakeup(dq, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
	if (qos) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
}

void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_handle_dispose(dq);
	_dispatch_queue_destroy(dq, allow_free);
}

bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}

void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	_dispatch_runloop_queue_wakeup(dq, 0, false);
}

dispatch_runloop_handle_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	return _dispatch_runloop_queue_get_handle(dq);
}
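//
// Illustrative usage sketch (hypothetical caller, not part of libdispatch)
// of how a runloop-style client is expected to drive the 4CF SPI above:
// create the queue, register its handle with the runloop, and call perform
// when the handle is signaled. wait_for_handle() is a placeholder for the
// client's own mach-port / file-descriptor wait primitive.
//
//   dispatch_queue_t q = _dispatch_runloop_root_queue_create_4CF("demo", 0);
//   dispatch_runloop_handle_t h = _dispatch_runloop_root_queue_get_port_4CF(q);
//   for (;;) {
//       wait_for_handle(h);
//       (void)_dispatch_runloop_root_queue_perform_4CF(q); // drain one item
//   }
//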
static void
_dispatch_runloop_queue_handle_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	dispatch_runloop_handle_t handle;

	_dispatch_fork_becomes_unsafe();

#if TARGET_OS_MAC
	mach_port_t mp;
	kern_return_t kr;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				MACH_PORT_LIMITS_INFO_COUNT);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	handle = mp;
#elif defined(__linux__)
	int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
	if (fd == -1) {
		int err = errno;
		switch (err) {
		case EMFILE:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"process is out of file descriptors");
			break;
		case ENFILE:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"system is out of file descriptors");
			break;
		case ENOMEM:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"kernel is out of memory");
			break;
		default:
			DISPATCH_INTERNAL_CRASH(err, "eventfd() failure");
			break;
		}
	}
	handle = fd;
#else
#error "runloop support not implemented on this platform"
#endif
	_dispatch_runloop_queue_set_handle(dq, handle);

	_dispatch_program_is_probably_callback_driven = true;
}
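//
// Illustrative sketch (not a copy of libdispatch's poke code) of the usual
// wake/acknowledge pattern an eventfd-based handle like the one created
// above supports on Linux:
//
//   uint64_t one = 1;
//   (void)write(handle, &one, sizeof(one));    // producer: wake the runloop
//   ...
//   uint64_t cnt;
//   (void)read(handle, &cnt, sizeof(cnt));     // consumer: reset the counter
//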
static void
_dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq)
{
	dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
	if (!_dispatch_runloop_handle_is_valid(handle)) {
		return;
	}
#if TARGET_OS_MAC
	mach_port_t mp = handle;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
#elif defined(__linux__)
	int rc = close(handle);
	(void)dispatch_assume_zero(rc);
#else
#error "runloop support not implemented on this platform"
#endif
}
#pragma mark dispatch_main_queue

dispatch_runloop_handle_t
_dispatch_get_main_queue_handle_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);
	return _dispatch_runloop_queue_get_handle(dq);
}

#if TARGET_OS_MAC
dispatch_runloop_handle_t
_dispatch_get_main_queue_port_4CF(void)
{
	return _dispatch_get_main_queue_handle_4CF();
}
#endif
static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(void *ignored DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}
void
dispatch_main(void)
{
	_dispatch_root_queues_init();
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		_dispatch_ktrace0(ARIADNE_ENTER_DISPATCH_MAIN_CODE);
#if defined(__linux__)
		// On Linux, if the main thread calls pthread_exit, the process becomes a zombie.
		// To avoid that, just before calling pthread_exit we register a TSD destructor
		// that will call _dispatch_sig_thread -- thus capturing the main thread in sigsuspend.
		// This relies on an implementation detail (currently true in glibc) that TSD destructors
		// will be called in the order of creation, causing all the TSD cleanup functions to
		// run before the thread becomes trapped in sigsuspend.
		pthread_key_t dispatch_main_key;
		pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
		pthread_setspecific(dispatch_main_key, &dispatch_main_key);
		_dispatch_sigmask();
#endif
		pthread_exit(NULL);
		DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH(0, "dispatch_main() must be called on the main thread");
#endif
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	uint64_t old_state, new_state;

	// Turning the main queue from a runloop queue into an ordinary serial queue
	// is a 3-step operation:
	// 1. finish taking the main queue lock the usual way
	// 2. clear the THREAD_BOUND flag
	// 3. do a handoff
	//
	// If an enqueuer executes concurrently, it may do the wakeup the runloop
	// way, because it still believes the queue to be thread-bound, but the
	// dirty bit will force this codepath to notice the enqueue, and the usual
	// lock transfer will do the proper wakeup.
	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
		new_state = old_state & ~DISPATCH_QUEUE_DIRTY;
		new_state += DISPATCH_QUEUE_WIDTH_INTERVAL;
		new_state += DISPATCH_QUEUE_IN_BARRIER;
	});
	_dispatch_queue_atomic_flags_clear(dq, DQF_THREAD_BOUND|DQF_CANNOT_TRYSYNC);
	_dispatch_queue_barrier_complete(dq, 0, 0);

	// overload the "probably" variable to mean that dispatch_main() or
	// a similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	// See dispatch_main for the call to _dispatch_sig_thread on Linux.
#ifndef __linux__
	if (_dispatch_program_is_probably_callback_driven) {
		_dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
				DISPATCH_QOS_DEFAULT, true), NULL, _dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}
#endif

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);
	_dispatch_runloop_queue_handle_dispose(dq);
#endif
}
static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch queue is running");
}

static void
_dispatch_wlh_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	dispatch_queue_t wlh;
	wlh = (dispatch_queue_t)((uintptr_t)ctxt & ~DISPATCH_WLH_STORAGE_REF);
	_dispatch_queue_release_storage(wlh);
}

static void
_dispatch_deferred_items_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit with unhandled deferred items");
}

static void
_dispatch_frame_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch frame is active");
}

static void
_dispatch_context_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch context is set");
}