/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
		&& !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
		!HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
#undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_sig_thread(void *ctxt);
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp);
static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static void _dispatch_deferred_items_cleanup(void *ctxt);
static void _dispatch_frame_cleanup(void *ctxt);
static void _dispatch_context_cleanup(void *ctxt);
static void _dispatch_non_barrier_complete(dispatch_queue_t dq);
static inline void _dispatch_global_queue_poke(dispatch_queue_t dq);
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread4(void *context);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_worker_thread3(pthread_priority_t priority);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif
#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_handle_pred;
static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags);
static void _dispatch_runloop_queue_handle_init(void *ctxt);
static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
#endif

static void _dispatch_root_queues_init_once(void *context);
static dispatch_once_t _dispatch_root_queues_pred;
#pragma mark dispatch_root_queue

struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
	dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;
#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_pthread_root_queue_context_s
		_dispatch_pthread_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
};
#endif
#define MAX_PTHREAD_COUNT 255

struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			qos_class_t dgq_qos;
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			uint32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
#define WORKQ_PRIO_INVALID (-1)
#ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
#ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
#endif
	}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, ...) \
	[DISPATCH_ROOT_QUEUE_IDX_##n] = { \
		DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
		.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
		.do_ctxt = &_dispatch_root_queue_contexts[ \
				DISPATCH_ROOT_QUEUE_IDX_##n], \
		.dq_width = DISPATCH_QUEUE_WIDTH_POOL, \
		.dq_override_voucher = DISPATCH_NO_VOUCHER, \
		.dq_override = DISPATCH_SATURATED_OVERRIDE, \
		__VA_ARGS__ \
	}
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS,
		.dq_label = "com.apple.root.maintenance-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.maintenance-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS,
		.dq_label = "com.apple.root.background-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.background-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS,
		.dq_label = "com.apple.root.utility-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.utility-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS,
		.dq_label = "com.apple.root.default-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.default-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS,
		.dq_label = "com.apple.root.user-initiated-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.user-initiated-qos.overcommit",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS,
		.dq_label = "com.apple.root.user-interactive-qos",
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.user-interactive-qos.overcommit",
	),
};
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
};
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define DISPATCH_PRIORITY_COUNT 5

enum {
	// No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
	// maintenance priority
	DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
	DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
	DISPATCH_PRIORITY_IDX_LOW,
	DISPATCH_PRIORITY_IDX_DEFAULT,
	DISPATCH_PRIORITY_IDX_HIGH,
};

static qos_class_t _dispatch_priority2qos[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
	[DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
};

#if HAVE_PTHREAD_WORKQUEUE_QOS
static const int _dispatch_priority2wq[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
};
#endif
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue _dispatch_root_queues[\
		DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
#endif

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1),
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_override_voucher = DISPATCH_NO_VOUCHER,
	.dq_override = DISPATCH_SATURATED_OVERRIDE,
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
	qos_class_t qos;
	switch (priority) {
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
				.dq_priority) {
			// map maintenance to background on old kernel
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		} else {
			qos = (qos_class_t)priority;
		}
		break;
#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		break;
	case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
		break;
	case DISPATCH_QUEUE_PRIORITY_LOW:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
		break;
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
		break;
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
		break;
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
				.dq_priority) {
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
			break;
		}
#endif
		// fall through
	default:
		qos = (qos_class_t)priority;
		break;
	}
	return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
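/*
 * Usage sketch (illustrative, not part of libdispatch itself): callers pass
 * either a legacy DISPATCH_QUEUE_PRIORITY_* constant or a QOS_CLASS_* value
 * as the "priority" argument, and the switch above maps both onto one of the
 * root queues. `process_record()` is a hypothetical client function.
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *	dispatch_async(q, ^{ process_record(); });
 */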
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?:
			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	_dispatch_client_assert_fail(
			"Block was %sexpected to execute on queue [%s]",
			expected ? "" : "not ", dq->dq_label ?: "");
}

DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
{
	_dispatch_client_assert_fail(
			"Block was expected to act as a barrier on queue [%s]",
			dq->dq_label ?: "");
}

void
dispatch_assert_queue(dispatch_queue_t dq)
{
	unsigned long metatype = dx_metatype(dq);
	if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
		DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
				"dispatch_assert_queue()");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_drain_pended(dq_state))) {
		goto fail;
	}
	if (likely(_dq_state_drain_owner(dq_state) == _dispatch_tid_self())) {
		return;
	}
	if (likely(dq->dq_width > 1)) {
		// we can look at the width: if it is changing while we read it,
		// it means that a barrier is running on `dq` concurrently, which
		// proves that we're not on `dq`. Hence reading a stale '1' is ok.
		if (fastpath(_dispatch_thread_frame_find_queue(dq))) {
			return;
		}
	}
fail:
	_dispatch_assert_queue_fail(dq, true);
}

void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	unsigned long metatype = dx_metatype(dq);
	if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
		DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (_dq_state_drain_pended(dq_state)) {
		return;
	}
	if (likely(_dq_state_drain_owner(dq_state) != _dispatch_tid_self())) {
		if (likely(dq->dq_width == 1)) {
			// we can look at the width: if it is changing while we read it,
			// it means that a barrier is running on `dq` concurrently, which
			// proves that we're not on `dq`. Hence reading a stale '1' is ok.
			return;
		}
		if (likely(!_dispatch_thread_frame_find_queue(dq))) {
			return;
		}
	}
	_dispatch_assert_queue_fail(dq, false);
}

void
dispatch_assert_queue_barrier(dispatch_queue_t dq)
{
	dispatch_assert_queue(dq);

	if (likely(dq->dq_width == 1)) {
		return;
	}
	if (likely(dq->do_targetq)) {
		uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
		if (likely(_dq_state_is_in_barrier(dq_state))) {
			return;
		}
	}
	_dispatch_assert_queue_barrier_fail(dq);
}
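/*
 * Usage sketch (illustrative, not part of libdispatch itself): these
 * assertions are meant to be called from client code with a strict queue
 * affinity contract; they crash with the messages above when violated.
 * `state` and `state_q` are hypothetical client variables.
 *
 *	static int state;
 *	static dispatch_queue_t state_q;
 *
 *	static void set_state(int v) {
 *		dispatch_assert_queue(state_q);        // must be draining state_q
 *		dispatch_assert_queue_not(dispatch_get_main_queue());
 *		state = v;
 *	}
 */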
#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif
#pragma mark dispatch_init

#if HAVE_PTHREAD_WORKQUEUE_QOS
pthread_priority_t _dispatch_background_priority;
pthread_priority_t _dispatch_user_initiated_priority;

static void
_dispatch_root_queues_init_qos(int supported)
{
	pthread_priority_t p;
	qos_class_t qos;
	int i;

	for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
		p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
		qos = _pthread_qos_class_decode(p, NULL, NULL);
		dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
		_dispatch_priority2qos[i] = qos;
	}
	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		qos = _dispatch_root_queue_contexts[i].dgq_qos;
		if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
				!(supported & WORKQ_FEATURE_MAINTENANCE)) {
			continue;
		}
		unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
		flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
		if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
				i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
			flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
		}
		p = _pthread_qos_class_encode(qos, 0, flags);
		_dispatch_root_queues[i].dq_priority = (dispatch_priority_t)p;
	}
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
	int r;
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
	bool disable_qos = false;
#if DISPATCH_DEBUG
	disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE
	bool disable_kevent_wq = false;
#if DISPATCH_DEBUG
	disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
#endif
#endif
	if (!disable_wq && !disable_qos) {
		*wq_supported = _pthread_workqueue_supported();
#if DISPATCH_USE_KEVENT_WORKQUEUE
		if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
			r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
					(pthread_workqueue_function_kevent_t)
					_dispatch_kevent_worker_thread,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#if DISPATCH_USE_MGR_THREAD
			_dispatch_kevent_workqueue_enabled = !r;
#endif
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
			_dispatch_evfilt_machport_direct_enabled = !r;
#endif
			result = !r;
		} else
#endif
		if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
#if DISPATCH_USE_MGR_THREAD
			r = _pthread_workqueue_init(_dispatch_worker_thread3,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
			result = !r;
#endif
		}
		if (result) _dispatch_root_queues_init_qos(*wq_supported);
	}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!result && !disable_wq) {
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		uint8_t pool_size, bool overcommit)
{
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
			dispatch_hw_config(active_cpus);
	if (slowpath(pool_size) && pool_size < thread_pool_size) {
		thread_pool_size = pool_size;
	}
	qc->dgq_thread_pool_size = thread_pool_size;
#if HAVE_PTHREAD_WORKQUEUES
	if (qc->dgq_qos) {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
		(void)dispatch_assume_zero(pthread_attr_setdetachstate(
				&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
#if HAVE_PTHREAD_WORKQUEUE_QOS
		(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
				&pqc->dpq_thread_attr, qc->dgq_qos, 0));
#endif
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	_os_semaphore_t *sema = &pqc->dpq_thread_mediator.dsema_sema;
	_os_semaphore_init(sema, _OS_SEM_POLICY_LIFO);
	_os_semaphore_create(sema, _OS_SEM_POLICY_LIFO);
}
#endif // DISPATCH_USE_PTHREAD_POOL
static dispatch_once_t _dispatch_root_queues_pred;

void
_dispatch_root_queues_init(void)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
}

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
	int wq_supported;
	_dispatch_fork_becomes_unsafe();
	if (!_dispatch_root_queues_init_workq(&wq_supported)) {
#if DISPATCH_ENABLE_THREAD_POOL
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], 0, overcommit);
		}
#else
		DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
				"Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_priority2qos) ==
			DISPATCH_PRIORITY_COUNT);
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dispatch_assert(countof(_dispatch_priority2wq) ==
			DISPATCH_PRIORITY_COUNT);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			WORKQ_NUM_PRIOQUEUE * 2);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
			offsetof(struct dispatch_object_s, do_next));
	dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
			offsetof(struct dispatch_object_s, do_vtable));
	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) %
			_Alignof(uint64_t) == 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

#if HAVE_PTHREAD_WORKQUEUE_QOS
	// 26497968 _dispatch_user_initiated_priority should be set for qos
	// propagation to work properly
	pthread_priority_t p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
	_dispatch_main_q.dq_priority = (dispatch_priority_t)p;
	_dispatch_main_q.dq_override = p & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_USER_INITIATED, 0, 0);
	_dispatch_user_initiated_priority = p;
	p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_BACKGROUND, 0, 0);
	_dispatch_background_priority = p;
#if DISPATCH_DEBUG
	if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
		_dispatch_set_qos_class_enabled = 1;
	}
#endif
#endif

#if DISPATCH_USE_THREAD_LOCAL_STORAGE
	_dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
#else
	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_create(&dispatch_deferred_items_key,
			_dispatch_deferred_items_cleanup);
	_dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
	_dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
	_dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
	_dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
			NULL);
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
#if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
	if (DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK) {
		_dispatch_thread_key_create(&dispatch_sema4_key,
				_dispatch_thread_semaphore_dispose);
	}
#endif
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif

	_dispatch_queue_set_current(&_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);
#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif
	_dispatch_hw_config_init();
	_dispatch_vtable_init();
	_dispatch_introspection_init();
}
#if HAVE_MACH
static dispatch_once_t _dispatch_mach_host_port_pred;
static mach_port_t _dispatch_mach_host_port;

static void
_dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
{
	kern_return_t kr;
	mach_port_t mp, mhp = mach_host_self();
	kr = host_get_host_port(mhp, &mp);
	DISPATCH_VERIFY_MIG(kr);
	if (fastpath(!kr)) {
		// mach_host_self returned the HOST_PRIV port
		kr = mach_port_deallocate(mach_task_self(), mhp);
		DISPATCH_VERIFY_MIG(kr);
		mhp = mp;
	} else if (kr != KERN_INVALID_ARGUMENT) {
		(void)dispatch_assume_zero(kr);
	}
	if (!fastpath(mhp)) {
		DISPATCH_CLIENT_CRASH(kr, "Could not get unprivileged host port");
	}
	_dispatch_mach_host_port = mhp;
}

mach_port_t
_dispatch_get_mach_host_port(void)
{
	dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
			_dispatch_mach_host_port_init);
	return _dispatch_mach_host_port;
}
#endif
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
#include <sys/syscall.h>

#ifdef SYS_gettid
DISPATCH_ALWAYS_INLINE
static inline pid_t
gettid(void)
{
	return (pid_t) syscall(SYS_gettid);
}
#else
#error "SYS_gettid unavailable on this system"
#endif
#define _tsd_call_cleanup(k, f) do { \
		if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
	} while (0)

void
_libdispatch_tsd_cleanup(void *ctx)
{
	struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;

	_tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
	_tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
	_tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
	_tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
	_tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
			NULL);
	_tsd_call_cleanup(dispatch_defaultpriority_key, NULL);
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
	_tsd_call_cleanup(dispatch_bcounter_key, NULL);
#endif
#if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
	_tsd_call_cleanup(dispatch_sema4_key, _dispatch_thread_semaphore_dispose);
#endif
	_tsd_call_cleanup(dispatch_priority_key, NULL);
	_tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
	_tsd_call_cleanup(dispatch_deferred_items_key,
			_dispatch_deferred_items_cleanup);
}
void
libdispatch_tsd_init(void)
{
	pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
	__dispatch_tsd.tid = gettid();
}
#endif
void
_dispatch_queue_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

#if HAVE_MACH
	_dispatch_mach_host_port_pred = 0;
	_dispatch_mach_host_port = MACH_PORT_NULL;
#endif
	if (!_dispatch_is_multithreaded_inline()) return;

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
#pragma mark dispatch_queue_attr_t

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
{
	qos_class_t qos = (qos_class_t)qos_class;
	switch (qos) {
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
	case _DISPATCH_QOS_CLASS_BACKGROUND:
	case _DISPATCH_QOS_CLASS_UTILITY:
	case _DISPATCH_QOS_CLASS_DEFAULT:
	case _DISPATCH_QOS_CLASS_USER_INITIATED:
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
	case _DISPATCH_QOS_CLASS_UNSPECIFIED:
		break;
	default:
		return false;
	}
	if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
		return false;
	}
	return true;
}

#define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
		[_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos

static const _dispatch_queue_attr_index_qos_class_t
		_dispatch_queue_attr_qos2idx[] = {
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
};

#define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
		((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
		DQA_INDEX_NON_OVERCOMMIT : \
		((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
		DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))

#define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
		((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)

#define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
		((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)

#define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
		(frequency)

#define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))

#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
static inline dispatch_queue_attr_t
_dispatch_get_queue_attr(qos_class_t qos, int prio,
		_dispatch_queue_attr_overcommit_t overcommit,
		dispatch_autorelease_frequency_t frequency,
		bool concurrent, bool inactive)
{
	return (dispatch_queue_attr_t)&_dispatch_queue_attrs
			[DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
			[DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
			[DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
			[DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
			[DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
			[DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
}
dispatch_queue_attr_t
_dispatch_get_default_queue_attr(void)
{
	return _dispatch_get_queue_attr(_DISPATCH_QOS_CLASS_UNSPECIFIED, 0,
			_dispatch_queue_attr_overcommit_unspecified,
			DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
		dispatch_qos_class_t qos_class, int relative_priority)
{
	if (!_dispatch_qos_class_valid(qos_class, relative_priority)) {
		return DISPATCH_BAD_INPUT;
	}
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(qos_class, relative_priority,
			dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
			dqa->dqa_concurrent, dqa->dqa_inactive);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, dqa->dqa_overcommit,
			dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
		bool overcommit)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, overcommit ?
			_dispatch_queue_attr_overcommit_enabled :
			_dispatch_queue_attr_overcommit_disabled,
			dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
			dqa->dqa_inactive);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
		dispatch_autorelease_frequency_t frequency)
{
	switch (frequency) {
	case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
	case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
	case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
		break;
	default:
		return DISPATCH_BAD_INPUT;
	}
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, dqa->dqa_overcommit,
			frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
}
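/*
 * Usage sketch (illustrative, not part of libdispatch itself): the attr
 * makers above compose; each one returns a pointer into the preallocated
 * _dispatch_queue_attrs table, so results can be chained freely. The label
 * below is a placeholder.
 *
 *	dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -1);
 *	attr = dispatch_queue_attr_make_initially_inactive(attr);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.worker", attr);
 *	// ... configure q while it is still inactive ...
 *	dispatch_activate(q);
 */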
#pragma mark dispatch_queue_t

// 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 16;
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq, bool legacy)
{
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	// Be sure the root queue priorities are set
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
#endif
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}

	//
	// Step 1: Normalize arguments (qos, overcommit, tq)
	//

	qos_class_t qos = dqa->dqa_qos_class;
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
			!_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
		qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
	}
#endif
	bool maintenance_fallback = false;
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	maintenance_fallback = true;
#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	if (maintenance_fallback) {
		if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
				!_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
			qos = _DISPATCH_QOS_CLASS_BACKGROUND;
		}
	}

	_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
	if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
		if (tq->do_targetq) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
					"a non-global target queue");
		}
	}

	if (tq && !tq->do_targetq &&
			tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
		// Handle discrepancies between attr and target queue, attributes win
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
				overcommit = _dispatch_queue_attr_overcommit_enabled;
			} else {
				overcommit = _dispatch_queue_attr_overcommit_disabled;
			}
		}
		if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
			tq = _dispatch_get_root_queue_with_overcommit(tq,
					overcommit == _dispatch_queue_attr_overcommit_enabled);
		} else {
			tq = NULL;
		}
	} else if (tq && !tq->do_targetq) {
		// target is a pthread or runloop root queue, setting QoS or overcommit
		// is disallowed
		if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
					"and use this kind of target queue");
		}
		if (qos != _DISPATCH_QOS_CLASS_UNSPECIFIED) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
					"and use this kind of target queue");
		}
	} else {
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			// Serial queues default to overcommit!
			overcommit = dqa->dqa_concurrent ?
					_dispatch_queue_attr_overcommit_disabled :
					_dispatch_queue_attr_overcommit_enabled;
		}
	}
	if (!tq) {
		qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ?
				_DISPATCH_QOS_CLASS_DEFAULT : qos;
		tq = _dispatch_get_root_queue(tq_qos, overcommit ==
				_dispatch_queue_attr_overcommit_enabled);
		if (slowpath(!tq)) {
			DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
		}
	}

	//
	// Step 2: Initialize the queue
	//

	if (legacy) {
		// if any of these attributes is specified, use non legacy classes
		if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
			legacy = false;
		}
	}

	const void *vtable;
	dispatch_queue_flags_t dqf = 0;
	if (legacy) {
		vtable = DISPATCH_VTABLE(queue);
	} else if (dqa->dqa_concurrent) {
		vtable = DISPATCH_VTABLE(queue_concurrent);
	} else {
		vtable = DISPATCH_VTABLE(queue_serial);
	}
	switch (dqa->dqa_autorelease_frequency) {
	case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
		dqf |= DQF_AUTORELEASE_NEVER;
		break;
	case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
		dqf |= DQF_AUTORELEASE_ALWAYS;
		break;
	}
	if (label) {
		const char *tmp = _dispatch_strdup_if_mutable(label);
		if (tmp != label) {
			dqf |= DQF_LABEL_NEEDS_FREE;
			label = tmp;
		}
	}

	dispatch_queue_t dq = _dispatch_alloc(vtable,
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
	_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
			DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);

	dq->dq_label = label;

#if HAVE_PTHREAD_WORKQUEUE_QOS
	dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos,
			dqa->dqa_relative_priority,
			overcommit == _dispatch_queue_attr_overcommit_enabled ?
			_PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
#endif
	_dispatch_retain(tq);
	if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
		// legacy way of inheriting the QoS from the target
		_dispatch_queue_priority_inherit_from_target(dq, tq);
	}
	if (!dqa->dqa_inactive) {
		_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
	}
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
dispatch_queue_t
dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq)
{
	return _dispatch_queue_create_with_target(label, dqa, tq, false);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return _dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
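/*
 * Usage sketch (illustrative, not part of libdispatch itself): passing the
 * target at creation time avoids the legacy retargeting path, so the Step 1
 * normalization above is applied against the explicit target rather than a
 * default root queue. The label is a placeholder.
 *
 *	dispatch_queue_t tq = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *	dispatch_queue_t q = dispatch_queue_create_with_target("com.example.io",
 *			DISPATCH_QUEUE_SERIAL, tq);
 */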
dispatch_queue_t
dispatch_queue_create_with_accounting_override_voucher(const char *label,
		dispatch_queue_attr_t attr, voucher_t voucher)
{
	dispatch_queue_t dq = dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT);
	dq->dq_override_voucher = _voucher_create_accounting_voucher(voucher);
	return dq;
}
void
_dispatch_queue_destroy(dispatch_queue_t dq)
{
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);

	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
		initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
	}
	if (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE) {
		// dispatch_cancel_and_wait may apply overrides in a racy way with
		// the source cancellation finishing. This race is expensive and not
		// really worthwhile to resolve since the source becomes dead anyway.
		dq_state &= ~DISPATCH_QUEUE_HAS_OVERRIDE;
	}
	if (slowpath(dq_state != initial_state)) {
		if (_dq_state_drain_locked(dq_state)) {
			DISPATCH_CLIENT_CRASH(dq, "Release of a locked queue");
		}
		DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
				"Release of a queue with corrupt state");
	}
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CLIENT_CRASH(dq, "Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
				"Release of a queue while items are enqueued");
	}

	// trash the queue so that use after free will crash
	dq->dq_items_head = (void *)0x200;
	dq->dq_items_tail = (void *)0x200;
	// poison the state with something that is suspended and is easy to spot
	dq->dq_state = 0xdead000000000000;

	dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
	if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
		if (dq->dq_override_voucher) _voucher_release(dq->dq_override_voucher);
		dq->dq_override_voucher = DISPATCH_NO_VOUCHER;
	}
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}
static void
_dispatch_queue_suspend_slow(dispatch_queue_t dq)
{
	uint64_t dq_state, value, delta;

	_dispatch_queue_sidelock_lock(dq);

	// what we want to transfer (remove from dq_state)
	delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
	// but this is a suspend so add a suspend count at the same time
	delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
	if (dq->dq_side_suspend_cnt == 0) {
		// we subtract delta from dq_state, and we want to set this bit
		delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
	}

	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		// unsigned underflow of the subtraction can happen because other
		// threads could have touched this value while we were trying to acquire
		// the lock, or because another thread raced us to do the same operation
		// and got to the lock first.
		if (slowpath(os_sub_overflow(dq_state, delta, &value))) {
			os_atomic_rmw_loop_give_up(goto retry);
		}
	});
	if (slowpath(os_add_overflow(dq->dq_side_suspend_cnt,
			DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
		DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
	}
	return _dispatch_queue_sidelock_unlock(dq);

retry:
	_dispatch_queue_sidelock_unlock(dq);
	return dx_vtable(dq)->do_suspend(dq);
}
void
_dispatch_queue_suspend(dispatch_queue_t dq)
{
	dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);

	uint64_t dq_state, value;

	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
		if (slowpath(os_add_overflow(dq_state, value, &value))) {
			os_atomic_rmw_loop_give_up({
				return _dispatch_queue_suspend_slow(dq);
			});
		}
	});

	if (!_dq_state_is_suspended(dq_state)) {
		// rdar://8181908 we need to extend the queue life for the duration
		// of the call to wakeup at _dispatch_queue_resume() time.
		_dispatch_retain(dq);
	}
}
static void
_dispatch_queue_resume_slow(dispatch_queue_t dq)
{
	uint64_t dq_state, value, delta;

	_dispatch_queue_sidelock_lock(dq);

	// what we want to transfer
	delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
	// but this is a resume so consume a suspend count at the same time
	delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
	switch (dq->dq_side_suspend_cnt) {
	case 0:
		goto retry;
	case DISPATCH_QUEUE_SUSPEND_HALF:
		// we will transition the side count to 0, so we want to clear this bit
		delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
		break;
	}
	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		// unsigned overflow of the addition can happen because other
		// threads could have touched this value while we were trying to acquire
		// the lock, or because another thread raced us to do the same operation
		// and got to the lock first.
		if (slowpath(os_add_overflow(dq_state, delta, &value))) {
			os_atomic_rmw_loop_give_up(goto retry);
		}
	});
	dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
	return _dispatch_queue_sidelock_unlock(dq);

retry:
	_dispatch_queue_sidelock_unlock(dq);
	return dx_vtable(dq)->do_resume(dq, false);
}
static void
_dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
{
	// Step 2: run the activation finalizer
	if (dx_vtable(dq)->do_finalize_activation) {
		dx_vtable(dq)->do_finalize_activation(dq);
	}
	// Step 3: consume the suspend count
	return dx_vtable(dq)->do_resume(dq, false);
}
void
_dispatch_queue_resume(dispatch_queue_t dq, bool activate)
{
	// covers all suspend and inactive bits, including side suspend bit
	const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
	// backward compatibility: only dispatch sources can abuse
	// dispatch_resume() to really mean dispatch_activate()
	bool resume_can_activate = (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE);
	uint64_t dq_state, value;

	dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);

	// Activation is a bit tricky as it needs to finalize before the wakeup.
	//
	// If after doing its updates to the suspend count and/or inactive bit,
	// the last suspension related bit that would remain is the
	// NEEDS_ACTIVATION one, then this function:
	//
	// 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
	//    a suspend count)
	// 2. runs the activation finalizer
	// 3. consumes the suspend count set in (1), and finishes the resume flow
	//
	// Concurrently, some property setters such as setting dispatch source
	// handlers or _dispatch_queue_set_target_queue try to do in-place changes
	// before activation. These protect their action by taking a suspend count.
	// Step (1) above cannot happen if such a setter has locked the object.
	if (activate) {
		// relaxed atomic because this doesn't publish anything, this is only
		// about picking the thread that gets to finalize the activation
		os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
			if ((dq_state & suspend_bits) ==
					DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
				// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_INACTIVE
						- DISPATCH_QUEUE_NEEDS_ACTIVATION
						+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
			} else if (_dq_state_is_inactive(dq_state)) {
				// { sc:>0 i:1 na:1 } -> { i:0 na:1 }
				// simple activation because sc is not 0
				// resume will deal with na:1 later
				value = dq_state - DISPATCH_QUEUE_INACTIVE;
			} else {
				// object already active, this is a no-op, just exit
				os_atomic_rmw_loop_give_up(return);
			}
		});
	} else {
		// release barrier needed to publish the effect of
		// - dispatch_set_target_queue()
		// - dispatch_set_*_handler()
		// - do_finalize_activation()
		os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
			if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
					+ DISPATCH_QUEUE_NEEDS_ACTIVATION) {
				// { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
			} else if (resume_can_activate && (dq_state & suspend_bits) ==
					DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
				// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_INACTIVE
						- DISPATCH_QUEUE_NEEDS_ACTIVATION
						+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
			} else {
				value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
				if (slowpath(os_sub_overflow(dq_state, value, &value))) {
					// underflow means over-resume or a suspend count transfer
					// to the side count is needed
					os_atomic_rmw_loop_give_up({
						if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
							goto over_resume;
						}
						return _dispatch_queue_resume_slow(dq);
					});
				}
				if (_dq_state_is_runnable(value) &&
						!_dq_state_drain_locked(value)) {
					uint64_t full_width = value;
					if (_dq_state_has_pending_barrier(value)) {
						full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
						full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
						full_width += DISPATCH_QUEUE_IN_BARRIER;
					} else {
						full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
						full_width += DISPATCH_QUEUE_IN_BARRIER;
					}
					if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
							DISPATCH_QUEUE_WIDTH_FULL_BIT) {
						value = full_width;
						value &= ~DISPATCH_QUEUE_DIRTY;
						value |= _dispatch_tid_self();
					}
				}
			}
		});
	}

	if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
		// we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
		return _dispatch_queue_resume_finalize_activation(dq);
	}

	if (activate) {
		// if we're still in an activate codepath here we should have
		// { sc:>0 na:1 }, if not we've got a corrupt state
		if (!fastpath(_dq_state_is_suspended(value))) {
			DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
		}
		return;
	}

	if (_dq_state_is_suspended(value)) {
		return;
	}

	if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
		_dispatch_try_lock_transfer_or_wakeup(dq);
	} else if (_dq_state_should_wakeup(value)) {
		// <rdar://problem/14637483>
		// seq_cst wrt state changes that were flushed and not acted upon
		os_atomic_thread_fence(acquire);
		pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq,
				_dispatch_queue_is_thread_bound(dq));
		// Balancing the retain() done in suspend() for rdar://8181908
		return dx_wakeup(dq, pp, DISPATCH_WAKEUP_CONSUME);
	}
	// Balancing the retain() done in suspend() for rdar://8181908
	return _dispatch_release_tailcall(dq);

over_resume:
	if (slowpath(_dq_state_is_inactive(dq_state))) {
		DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
	}
	DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
}
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}
qos_class_t
dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
{
	qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
	int relative_priority = 0;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	pthread_priority_t dqp = dq->dq_priority;
	if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
	qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
#else
	(void)dq;
#endif
	if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
	return qos;
}
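/*
 * Usage sketch (illustrative, not part of libdispatch itself): both getters
 * above are simple reads; DISPATCH_CURRENT_QUEUE_LABEL resolves to the queue
 * currently being drained by the calling thread.
 *
 *	int relpri = 0;
 *	qos_class_t qc = dispatch_queue_get_qos_class(q, &relpri);
 *	const char *label = dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL);
 */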
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w >= 0) {
		tmp = (unsigned int)w;
	} else {
		switch (w) {
		case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
			tmp = dispatch_hw_config(physical_cpus);
			break;
		case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
			tmp = dispatch_hw_config(active_cpus);
			break;
		default:
		case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
			tmp = dispatch_hw_config(logical_cpus);
			break;
		}
	}
	if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
		tmp = DISPATCH_QUEUE_WIDTH_MAX;
	}

	dispatch_queue_flags_t old_dqf, new_dqf;
	os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = old_dqf & ~DQF_WIDTH_MASK;
		new_dqf |= (tmp << DQF_WIDTH_SHIFT);
	});
	_dispatch_object_debug(dq, "%s", __func__);
}
void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_hastypeflag(dq, QUEUE_ROOT))) {
		return;
	}

	unsigned long type = dx_type(dq);
	switch (type) {
	case DISPATCH_QUEUE_LEGACY_TYPE:
	case DISPATCH_QUEUE_CONCURRENT_TYPE:
		break;
	case DISPATCH_QUEUE_SERIAL_TYPE:
		DISPATCH_CLIENT_CRASH(type, "Cannot set width of a serial queue");
	default:
		DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
	}

	_dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}
static void
_dispatch_queue_legacy_set_target_queue(void *ctxt)
{
	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_queue_t tq = ctxt;
	dispatch_queue_t otq = dq->do_targetq;

	if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
		_dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
		_dispatch_bug_deprecated("Changing the target of a queue "
				"already targeted by other dispatch objects");
	}

	_dispatch_queue_priority_inherit_from_target(dq, tq);
	_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
#if HAVE_PTHREAD_WORKQUEUE_QOS
	// see _dispatch_queue_class_wakeup()
	_dispatch_queue_sidelock_lock(dq);
#endif
	dq->do_targetq = tq;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	// see _dispatch_queue_class_wakeup()
	_dispatch_queue_sidelock_unlock(dq);
#endif

	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_target_queue_changed(dq);
	_dispatch_release_tailcall(otq);
}

void
_dispatch_queue_set_target_queue(dispatch_queue_t dq, dispatch_queue_t tq)
{
	dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
			dq->do_targetq);

	if (slowpath(!tq)) {
		bool is_concurrent_q = (dq->dq_width > 1);
		tq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
				!is_concurrent_q);
	}

	if (_dispatch_queue_try_inactive_suspend(dq)) {
		_dispatch_object_set_target_queue_inline(dq, tq);
		return dx_vtable(dq)->do_resume(dq, false);
	}

	if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
		DISPATCH_CLIENT_CRASH(dq, "Cannot change the target of a queue or "
				"source with an accounting override voucher "
				"after it has been activated");
	}

	unsigned long type = dx_type(dq);
	switch (type) {
	case DISPATCH_QUEUE_LEGACY_TYPE:
		if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
			_dispatch_bug_deprecated("Changing the target of a queue "
					"already targeted by other dispatch objects");
		}
		break;
	case DISPATCH_SOURCE_KEVENT_TYPE:
	case DISPATCH_MACH_CHANNEL_TYPE:
		_dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget, dq);
		_dispatch_bug_deprecated("Changing the target of a source "
				"after it has been activated");
		break;
	case DISPATCH_QUEUE_SERIAL_TYPE:
	case DISPATCH_QUEUE_CONCURRENT_TYPE:
		DISPATCH_CLIENT_CRASH(type, "Cannot change the target of this queue "
				"after it has been activated");
	default:
		DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
	}

	_dispatch_retain(tq);
	return _dispatch_barrier_trysync_or_async_f(dq, tq,
			_dispatch_queue_legacy_set_target_queue);
}
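
/*
 * Illustrative use of the retargeting path above, via the public
 * dispatch_set_target_queue() entry point (example label only; the queue
 * must not already be targeted by other objects or activated, or the
 * deprecation warnings / crashes above apply):
 *
 *     dispatch_queue_t bg = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0);
 *     dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *     dispatch_set_target_queue(q, bg); // funnels q's work onto bg
 */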
#pragma mark dispatch_mgr_queue

#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
		_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
		_dispatch_mgr_root_queue_context = {{{
#if HAVE_PTHREAD_WORKQUEUES
	.dgq_kworkqueue = (void*)(~0ul),
#endif
	.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dgq_thread_pool_size = 1,
}}};

static struct dispatch_queue_s _dispatch_mgr_root_queue = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
	.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
	.do_ctxt = &_dispatch_mgr_root_queue_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_width = DISPATCH_QUEUE_WIDTH_POOL,
	.dq_override = DISPATCH_SATURATED_OVERRIDE,
	.dq_override_voucher = DISPATCH_NO_VOUCHER,
};
#endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
static struct {
	volatile int prio;
	volatile qos_class_t qos;
	int default_prio;
	int policy;
	pthread_t tid;
} _dispatch_mgr_sched;

static dispatch_once_t _dispatch_mgr_sched_pred;

// TODO: switch to "event-reflector thread" property <rdar://problem/18126138>

#if HAVE_PTHREAD_WORKQUEUE_QOS
// Must be kept in sync with list of qos classes in sys/qos.h
static const int _dispatch_mgr_sched_qos2prio[] = {
	[_DISPATCH_QOS_CLASS_MAINTENANCE] = 4,
	[_DISPATCH_QOS_CLASS_BACKGROUND] = 4,
	[_DISPATCH_QOS_CLASS_UTILITY] = 20,
	[_DISPATCH_QOS_CLASS_DEFAULT] = 31,
	[_DISPATCH_QOS_CLASS_USER_INITIATED] = 37,
	[_DISPATCH_QOS_CLASS_USER_INTERACTIVE] = 47,
};
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
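
/*
 * Captures, once, the scheduling parameters the manager thread should use:
 * the pthread attribute's policy and priority, and (when QoS is available)
 * the main thread's QoS class promoted from DEFAULT to USER_INITIATED.
 */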
static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
	struct sched_param param;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
#else
	pthread_attr_t a, *attr = &a;
#endif
	(void)dispatch_assume_zero(pthread_attr_init(attr));
	(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
			&_dispatch_mgr_sched.policy));
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t qos = qos_class_main();
	if (qos == _DISPATCH_QOS_CLASS_DEFAULT) {
		qos = _DISPATCH_QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
	}
	_dispatch_mgr_sched.qos = qos;
	param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
#endif
	_dispatch_mgr_sched.default_prio = param.sched_priority;
	_dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
			PTHREAD_CREATE_DETACHED));
	(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t qos = _dispatch_mgr_sched.qos;
	if (qos) {
		if (_dispatch_set_qos_class_enabled) {
			(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
					qos, 0));
		}
		_dispatch_mgr_q.dq_priority =
				(dispatch_priority_t)_pthread_qos_class_encode(qos, 0, 0);
	}
#endif
	param.sched_priority = _dispatch_mgr_sched.prio;
	if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
		(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
	}
	return &_dispatch_mgr_sched.tid;
}

static inline void
_dispatch_mgr_priority_apply(void)
{
	struct sched_param param;
	do {
		param.sched_priority = _dispatch_mgr_sched.prio;
		if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
			(void)dispatch_assume_zero(pthread_setschedparam(
					_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
					&param));
		}
	} while (_dispatch_mgr_sched.prio > param.sched_priority);
}

void
_dispatch_mgr_priority_init(void)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t qos = 0;
	(void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
	if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
		(void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
		int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
		if (p > param.sched_priority) {
			param.sched_priority = p;
		}
	}
#endif
	if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
		return _dispatch_mgr_priority_apply();
	}
}
#endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
	qos_class_t q, qos = 0;
	(void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
	if (qos) {
		param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
		os_atomic_rmw_loop2o(&_dispatch_mgr_sched, qos, q, qos, relaxed, {
			if (q >= qos) os_atomic_rmw_loop_give_up(break);
		});
	}
#endif
	int p, prio = param.sched_priority;
	os_atomic_rmw_loop2o(&_dispatch_mgr_sched, prio, p, prio, relaxed, {
		if (p >= prio) os_atomic_rmw_loop_give_up(return);
	});
#if DISPATCH_USE_KEVENT_WORKQUEUE
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
	if (_dispatch_kevent_workqueue_enabled) {
		pthread_priority_t pp = 0;
		if (prio > _dispatch_mgr_sched.default_prio) {
			// The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
			// _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
			// problematic in this case, since the second one is only ever
			// used on dq_priority fields.
			// We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall,
			// it is meaningful to libdispatch only.
			pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
		} else if (qos) {
			pp = _pthread_qos_class_encode(qos, 0, 0);
		}
		if (pp) {
			int r = _pthread_workqueue_set_event_manager_priority(pp);
			(void)dispatch_assume_zero(r);
		}
	}
#endif
#if DISPATCH_USE_MGR_THREAD
	if (_dispatch_mgr_sched.tid) {
		return _dispatch_mgr_priority_apply();
	}
#endif
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES

#if DISPATCH_USE_KEVENT_WORKQUEUE
void
_dispatch_kevent_workqueue_init(void)
{
	// Initialize kevent workqueue support
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
	if (!_dispatch_kevent_workqueue_enabled) return;
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	qos_class_t qos = _dispatch_mgr_sched.qos;
	int prio = _dispatch_mgr_sched.prio;
	pthread_priority_t pp = 0;
	if (qos) {
		pp = _pthread_qos_class_encode(qos, 0, 0);
		_dispatch_mgr_q.dq_priority = (dispatch_priority_t)pp;
	}
	if (prio > _dispatch_mgr_sched.default_prio) {
		pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
	}
	if (pp) {
		int r = _pthread_workqueue_set_event_manager_priority(pp);
		(void)dispatch_assume_zero(r);
	}
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
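
/*
 * Illustrative use of the pthread root queue SPI implemented below (example
 * label and configuration only):
 *
 *     pthread_attr_t attr;
 *     pthread_attr_init(&attr);
 *     dispatch_queue_t rq = dispatch_pthread_root_queue_create(
 *             "com.example.root", 0, &attr, ^{
 *         // runs on each worker thread of this root queue before it
 *         // starts draining work
 *     });
 */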
#pragma mark dispatch_pthread_root_queue

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static dispatch_queue_t
_dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
		const pthread_attr_t *attr, dispatch_block_t configure,
		dispatch_pthread_root_queue_observer_hooks_t observer_hooks)
{
	dispatch_queue_t dq;
	dispatch_root_queue_context_t qc;
	dispatch_pthread_root_queue_context_t pqc;
	dispatch_queue_flags_t dqf = 0;
	size_t dqs;
	uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
			(uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;

	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
			sizeof(struct dispatch_root_queue_context_s) +
			sizeof(struct dispatch_pthread_root_queue_context_s));
	qc = (void*)dq + dqs;
	dispatch_assert((uintptr_t)qc % _Alignof(typeof(*qc)) == 0);
	pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
	dispatch_assert((uintptr_t)pqc % _Alignof(typeof(*pqc)) == 0);
	if (label) {
		const char *tmp = _dispatch_strdup_if_mutable(label);
		if (tmp != label) {
			dqf |= DQF_LABEL_NEEDS_FREE;
			label = tmp;
		}
	}

	_dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, false);
	dq->dq_label = label;
	dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
	dq->dq_override = DISPATCH_SATURATED_OVERRIDE;
	dq->do_ctxt = qc;
	dq->do_targetq = NULL;

	pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
	qc->dgq_ctxt = pqc;
#if HAVE_PTHREAD_WORKQUEUES
	qc->dgq_kworkqueue = (void*)(~0ul);
#endif
	_dispatch_root_queue_init_pthread_pool(qc, pool_size, true);

	if (attr) {
		memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
		_dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
	} else {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
	}
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(
			&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
	if (configure) {
		pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
	}
	if (observer_hooks) {
		pqc->dpq_observer_hooks = *observer_hooks;
	}
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
		const pthread_attr_t *attr, dispatch_block_t configure)
{
	return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
			NULL);
}
#if DISPATCH_IOHID_SPI
dispatch_queue_t
_dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label,
		unsigned long flags, const pthread_attr_t *attr,
		dispatch_pthread_root_queue_observer_hooks_t observer_hooks,
		dispatch_block_t configure)
{
	if (!observer_hooks->queue_will_execute ||
			!observer_hooks->queue_did_execute) {
		DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
	}
	return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
			observer_hooks);
}
#endif

dispatch_queue_t
dispatch_pthread_root_queue_copy_current(void)
{
	dispatch_queue_t dq = _dispatch_queue_get_current();
	if (!dq) return NULL;
	while (slowpath(dq->do_targetq)) {
		dq = dq->do_targetq;
	}
	if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
			dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
		return NULL;
	}
	return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj);
}

#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES

void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
	}
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	pthread_attr_destroy(&pqc->dpq_thread_attr);
	_dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
	if (pqc->dpq_thread_configure) {
		Block_release(pqc->dpq_thread_configure);
	}
	dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
#endif
	if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}

#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_QUEUE_HEADER(queue_specific_queue);
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
} DISPATCH_QUEUE_ALIGN;

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);
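
/*
 * Illustrative use of the queue-specific storage implemented below (example
 * key and value only; `q` is any caller-created queue):
 *
 *     static char key;                               // the address is the key
 *     dispatch_queue_set_specific(q, &key, malloc(16), free);
 *     void *v = dispatch_queue_get_specific(q, &key); // from any thread
 *     void *w = dispatch_get_specific(&key);          // from code running on q,
 *                                                     // also searches targets
 */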
void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					_DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_destroy(dqsq->_as_dq);
}

static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init(dqsq->_as_dq, DQF_NONE,
			DISPATCH_QUEUE_WIDTH_MAX, false);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(
			_DISPATCH_QOS_CLASS_USER_INITIATED, true);
	dqsq->dq_label = "queue-specific";
	TAILQ_INIT(&dqsq->dqsq_contexts);
	if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			dqsq->_as_dq, release))) {
		_dispatch_release(dqsq->_as_dq);
	}
}

static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						_DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			free(dqsn);
			return;
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}

void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	_dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific);
}
static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}

void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;

	if (fastpath(dq->dq_specific_q)) {
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}

void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		if (slowpath(dq->dq_specific_q)) {
			ctxt = (void *)key;
			dispatch_sync_f(dq->dq_specific_q, &ctxt,
					_dispatch_queue_get_specific);
			if (ctxt) break;
		}
		dq = dq->do_targetq;
	}
	return ctxt;
}

#if DISPATCH_IOHID_SPI
bool
_dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
		dispatch_queue_t dq) // rdar://problem/18033810
{
	if (dq->dq_width != 1) {
		DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	return _dq_state_drain_locked_by(dq_state, _dispatch_tid_self());
}
#endif
#pragma mark dispatch_queue_debug

size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	dispatch_queue_t target = dq->do_targetq;
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);

	offset += dsnprintf(&buf[offset], bufsiz - offset,
			"target = %s[%p], width = 0x%x, state = 0x%016llx",
			target && target->dq_label ? target->dq_label : "", target,
			dq->dq_width, (unsigned long long)dq_state);
	if (_dq_state_is_suspended(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", suspended = %d",
				_dq_state_suspend_cnt(dq_state));
	}
	if (_dq_state_is_inactive(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", inactive");
	} else if (_dq_state_needs_activation(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", needs-activation");
	}
	if (_dq_state_is_enqueued(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", enqueued");
	}
	if (_dq_state_is_dirty(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", dirty");
	}
	if (_dq_state_has_override(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", async-override");
	}
	mach_port_t owner = _dq_state_drain_owner(dq_state);
	if (!_dispatch_queue_is_thread_bound(dq) && owner) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", draining on 0x%x",
				owner);
	}
	if (_dq_state_is_in_barrier(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-barrier");
	} else {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-flight = %d",
				_dq_state_used_width(dq_state, dq->dq_width));
	}
	if (_dq_state_has_pending_barrier(dq_state)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", pending-barrier");
	}
	if (_dispatch_queue_is_thread_bound(dq)) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, ", thread = 0x%x ",
				owner);
	}
	return offset;
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
	offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		_dispatch_object_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#endif
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
static OSSpinLock _dispatch_stats_lock;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t delta = _dispatch_absolute_time() - start;
	unsigned long count;

	count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	int bucket = flsl((long)count);

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif

#pragma mark _dispatch_set_priority_and_mach_voucher
#if HAVE_PTHREAD_WORKQUEUE_QOS
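
/*
 * Applies a QoS/priority and/or kernel voucher change to the current thread
 * in one _pthread_set_properties_self() call, computing the matching
 * _pthread_set_flags_t bits and preserving the overcommit flag across a
 * workqueue unbind.
 */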
void
_dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp,
		mach_voucher_t kv)
{
	_pthread_set_flags_t pflags = 0;
	if (pp && _dispatch_set_qos_class_enabled) {
		pthread_priority_t old_pri = _dispatch_get_priority();
		if (pp != old_pri) {
			if (old_pri & _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG) {
				pflags |= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND;
				// when we unbind, overcommitness can flip, so we need to learn
				// it from the defaultpri, see _dispatch_priority_compute_update
				pp |= (_dispatch_get_defaultpriority() &
						_PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
			} else {
				// else we need to keep the one that is set in the current pri
				pp |= (old_pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
			}
			if (likely(old_pri & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
				pflags |= _PTHREAD_SET_SELF_QOS_FLAG;
			}
			if (unlikely(DISPATCH_QUEUE_DRAIN_OWNER(&_dispatch_mgr_q) ==
					_dispatch_tid_self())) {
				DISPATCH_INTERNAL_CRASH(pp,
						"Changing the QoS while on the manager queue");
			}
			if (unlikely(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
				DISPATCH_INTERNAL_CRASH(pp, "Cannot raise oneself to manager");
			}
			if (old_pri & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) {
				DISPATCH_INTERNAL_CRASH(old_pri,
						"Cannot turn a manager thread into a normal one");
			}
		}
	}
	if (kv != VOUCHER_NO_MACH_VOUCHER) {
#if VOUCHER_USE_MACH_VOUCHER
		pflags |= _PTHREAD_SET_SELF_VOUCHER_FLAG;
#endif
	}
	if (!pflags) return;
	int r = _pthread_set_properties_self(pflags, pp, kv);
	if (unlikely(r == EINVAL)) {
		DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed");
	}
	(void)dispatch_assume_zero(r);
}
voucher_t
_dispatch_set_priority_and_voucher_slow(pthread_priority_t priority,
		voucher_t v, _dispatch_thread_set_self_t flags)
{
	voucher_t ov = DISPATCH_NO_VOUCHER;
	mach_voucher_t kv = VOUCHER_NO_MACH_VOUCHER;
	if (v != DISPATCH_NO_VOUCHER) {
		bool retained = flags & DISPATCH_VOUCHER_CONSUME;
		ov = _voucher_get();
		if (ov == v && (flags & DISPATCH_VOUCHER_REPLACE)) {
			if (retained && v) _voucher_release_no_dispose(v);
			ov = DISPATCH_NO_VOUCHER;
		} else {
			if (!retained && v) _voucher_retain(v);
			kv = _voucher_swap_and_get_mach_voucher(ov, v);
		}
	}
#if !PTHREAD_WORKQUEUE_RESETS_VOUCHER_AND_PRIORITY_ON_PARK
	flags &= ~(_dispatch_thread_set_self_t)DISPATCH_THREAD_PARK;
#endif
	if (!(flags & DISPATCH_THREAD_PARK)) {
		_dispatch_set_priority_and_mach_voucher_slow(priority, kv);
	}
	if (ov != DISPATCH_NO_VOUCHER && (flags & DISPATCH_VOUCHER_REPLACE)) {
		if (ov) _voucher_release(ov);
		ov = DISPATCH_NO_VOUCHER;
	}
	return ov;
}
#pragma mark dispatch_continuation_t

static void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup(dc);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_cache_cleanup(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	}
}

#if DISPATCH_USE_MEMORYPRESSURE_SOURCE
DISPATCH_NOINLINE
void
_dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
{
	_dispatch_continuation_free_to_heap(dc);
	dispatch_continuation_t next_dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	int cnt;
	if (!dc || (cnt = dc->dc_cache_cnt -
			_dispatch_continuation_cache_limit) <= 0){
		return;
	}
	do {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	} while (--cnt && (dc = next_dc));
	_dispatch_thread_setspecific(dispatch_cache_key, next_dc);
}
#endif
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_slow_item_signal(dispatch_queue_t dq,
		dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;
	pthread_priority_t pp = dq->dq_override;

	_dispatch_trace_continuation_pop(dq, dc);
	if (pp > (dc->dc_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
		_dispatch_wqthread_override_start((mach_port_t)dc->dc_data, pp);
	}
	_dispatch_thread_event_signal((dispatch_thread_event_t)dc->dc_other);
	_dispatch_introspection_queue_item_complete(dc);
}

DISPATCH_NOINLINE
static void
_dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	_dispatch_queue_push(dq, dc,
			_dispatch_continuation_get_override_priority(dq, dc));
}

DISPATCH_NOINLINE
static void
_dispatch_continuation_push_sync_slow(dispatch_queue_t dq,
		dispatch_continuation_t dc)
{
	_dispatch_queue_push_inline(dq, dc,
			_dispatch_continuation_get_override_priority(dq, dc),
			DISPATCH_WAKEUP_SLOW_WAITER);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
		bool barrier)
{
	if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
		return _dispatch_continuation_push(dq, dc);
	}
	return _dispatch_async_f2(dq, dc);
}

DISPATCH_NOINLINE
void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	_dispatch_continuation_async2(dq, dc,
			dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}
#pragma mark dispatch_block_create

#if __BLOCKS__

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_block_flags_valid(dispatch_block_flags_t flags)
{
	return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_block_flags_t
_dispatch_block_normalize_flags(dispatch_block_flags_t flags)
{
	if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) {
		flags |= DISPATCH_BLOCK_HAS_VOUCHER;
	}
	if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) {
		flags |= DISPATCH_BLOCK_HAS_PRIORITY;
	}
	return flags;
}

static inline dispatch_block_t
_dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags,
		voucher_t voucher, pthread_priority_t pri, dispatch_block_t block)
{
	flags = _dispatch_block_normalize_flags(flags);
	bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT);

	if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) {
#if OS_VOUCHER_ACTIVITY_SPI
		voucher = VOUCHER_CURRENT;
#endif
		flags |= DISPATCH_BLOCK_HAS_VOUCHER;
	}
#if OS_VOUCHER_ACTIVITY_SPI
	if (voucher == VOUCHER_CURRENT) {
		voucher = _voucher_get();
	}
#endif
	if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
		pri = _dispatch_priority_propagate();
		flags |= DISPATCH_BLOCK_HAS_PRIORITY;
	}
	dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block);
#if DISPATCH_DEBUG
	dispatch_assert(_dispatch_block_get_data(db));
#endif
	return db;
}
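
/*
 * Illustrative use of the dispatch_block_t wrappers built above (example
 * flags and queue only; do_work() stands in for the caller's work):
 *
 *     dispatch_block_t b = dispatch_block_create(
 *             DISPATCH_BLOCK_ASSIGN_CURRENT, ^{ do_work(); });
 *     dispatch_async(q, b);
 *     if (dispatch_block_wait(b, DISPATCH_TIME_FOREVER) == 0) {
 *         // block finished
 *     }
 *     Block_release(b);
 */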
dispatch_block_t
dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block)
{
	if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
	return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0,
			block);
}

dispatch_block_t
dispatch_block_create_with_qos_class(dispatch_block_flags_t flags,
		dispatch_qos_class_t qos_class, int relative_priority,
		dispatch_block_t block)
{
	if (!_dispatch_block_flags_valid(flags) ||
			!_dispatch_qos_class_valid(qos_class, relative_priority)) {
		return DISPATCH_BAD_INPUT;
	}
	flags |= DISPATCH_BLOCK_HAS_PRIORITY;
	pthread_priority_t pri = 0;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
#endif
	return _dispatch_block_create_with_voucher_and_priority(flags, NULL,
			pri, block);
}

dispatch_block_t
dispatch_block_create_with_voucher(dispatch_block_flags_t flags,
		voucher_t voucher, dispatch_block_t block)
{
	if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
	flags |= DISPATCH_BLOCK_HAS_VOUCHER;
	return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0,
			block);
}

dispatch_block_t
dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags,
		voucher_t voucher, dispatch_qos_class_t qos_class,
		int relative_priority, dispatch_block_t block)
{
	if (!_dispatch_block_flags_valid(flags) ||
			!_dispatch_qos_class_valid(qos_class, relative_priority)) {
		return DISPATCH_BAD_INPUT;
	}
	flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY);
	pthread_priority_t pri = 0;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
#endif
	return _dispatch_block_create_with_voucher_and_priority(flags, voucher,
			pri, block);
}

void
dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block)
{
	if (!_dispatch_block_flags_valid(flags)) {
		DISPATCH_CLIENT_CRASH(flags, "Invalid flags passed to "
				"dispatch_block_perform()");
	}
	flags = _dispatch_block_normalize_flags(flags);
	struct dispatch_block_private_data_s dbpds =
			DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags, block);
	return _dispatch_block_invoke_direct(&dbpds);
}
#define _dbpd_group(dbpd) ((dbpd)->dbpd_group)

void
_dispatch_block_invoke_direct(const struct dispatch_block_private_data_s *dbcpd)
{
	dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd;
	dispatch_block_flags_t flags = dbpd->dbpd_flags;
	unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
	if (slowpath(atomic_flags & DBF_WAITED)) {
		DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
				"run more than once and waited for");
	}
	if (atomic_flags & DBF_CANCELED) goto out;

	pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
	_dispatch_thread_set_self_t adopt_flags = 0;
	if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
		op = _dispatch_get_priority();
		p = dbpd->dbpd_priority;
		if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
			adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
		}
	}
	voucher_t ov, v = DISPATCH_NO_VOUCHER;
	if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
		v = dbpd->dbpd_voucher;
	}
	ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
	dbpd->dbpd_thread = _dispatch_tid_self();
	_dispatch_client_callout(dbpd->dbpd_block,
			_dispatch_Block_invoke(dbpd->dbpd_block));
	_dispatch_reset_priority_and_voucher(op, ov);
out:
	if ((atomic_flags & DBF_PERFORM) == 0) {
		if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
			dispatch_group_leave(_dbpd_group(dbpd));
		}
	}
}

void
_dispatch_block_sync_invoke(void *block)
{
	dispatch_block_t b = block;
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
	dispatch_block_flags_t flags = dbpd->dbpd_flags;
	unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
	if (slowpath(atomic_flags & DBF_WAITED)) {
		DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
				"run more than once and waited for");
	}
	if (atomic_flags & DBF_CANCELED) goto out;

	pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
	_dispatch_thread_set_self_t adopt_flags = 0;
	if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
		op = _dispatch_get_priority();
		p = dbpd->dbpd_priority;
		if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
			adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
		}
	}
	voucher_t ov, v = DISPATCH_NO_VOUCHER;
	if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
		v = dbpd->dbpd_voucher;
	}
	ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
	dbpd->dbpd_block();
	_dispatch_reset_priority_and_voucher(op, ov);
out:
	if ((atomic_flags & DBF_PERFORM) == 0) {
		if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
			dispatch_group_leave(_dbpd_group(dbpd));
		}
	}

	os_mpsc_queue_t oq;
	oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
	if (oq) {
		// balances dispatch_{,barrier_,}sync
		_os_object_release_internal(oq->_as_os_obj);
	}
}
DISPATCH_ALWAYS_INLINE
static void
_dispatch_block_async_invoke2(dispatch_block_t b, bool release)
{
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
	unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
	if (slowpath(atomic_flags & DBF_WAITED)) {
		DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
				"run more than once and waited for");
	}
	if (!slowpath(atomic_flags & DBF_CANCELED)) {
		dbpd->dbpd_block();
	}
	if ((atomic_flags & DBF_PERFORM) == 0) {
		if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
			dispatch_group_leave(_dbpd_group(dbpd));
		}
	}
	os_mpsc_queue_t oq;
	oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
	if (oq) {
		// balances dispatch_{,barrier_,group_}async
		_os_object_release_internal_inline(oq->_as_os_obj);
	}
	if (release) {
		Block_release(b);
	}
}

static void
_dispatch_block_async_invoke(void *block)
{
	_dispatch_block_async_invoke2(block, false);
}

static void
_dispatch_block_async_invoke_and_release(void *block)
{
	_dispatch_block_async_invoke2(block, true);
}

void
dispatch_block_cancel(dispatch_block_t db)
{
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
	if (!dbpd) {
		DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
				"dispatch_block_cancel()");
	}
	(void)os_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed);
}

long
dispatch_block_testcancel(dispatch_block_t db)
{
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
	if (!dbpd) {
		DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
				"dispatch_block_testcancel()");
	}
	return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED);
}
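
/*
 * dispatch_block_wait() below may be used at most once per block object: it
 * boosts whatever is going to run the block (the queue it is enqueued on, or
 * the thread already executing it), waits on the block's group, and only
 * records DBF_WAITED when the wait did not time out, so a timed-out waiter
 * can try again.
 */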
long
dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
{
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
	if (!dbpd) {
		DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
				"dispatch_block_wait()");
	}

	unsigned int flags = os_atomic_or_orig2o(dbpd, dbpd_atomic_flags,
			DBF_WAITING, relaxed);
	if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) {
		DISPATCH_CLIENT_CRASH(flags, "A block object may not be waited for "
				"more than once");
	}

	// <rdar://problem/17703192> If we know the queue where this block is
	// enqueued, or the thread that's executing it, then we should boost
	// it here.

	pthread_priority_t pp = _dispatch_get_priority();

	os_mpsc_queue_t boost_oq;
	boost_oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
	if (boost_oq) {
		// release balances dispatch_{,barrier_,group_}async.
		// Can't put the queue back in the timeout case: the block might
		// finish after we fell out of group_wait and see our NULL, so
		// neither of us would ever release. Side effect: After a _wait
		// that times out, subsequent waits will not boost the qos of the
		// still-running block.
		dx_wakeup(boost_oq, pp, DISPATCH_WAKEUP_OVERRIDING |
				DISPATCH_WAKEUP_CONSUME);
	}

	mach_port_t boost_th = dbpd->dbpd_thread;
	if (boost_th) {
		_dispatch_thread_override_start(boost_th, pp, dbpd);
	}

	int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
	if (slowpath(performed > 1 || (boost_th && boost_oq))) {
		DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
				"run more than once and waited for");
	}

	long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout);

	if (boost_th) {
		_dispatch_thread_override_end(boost_th, dbpd);
	}

	if (ret) {
		// timed out: reverse our changes
		(void)os_atomic_and2o(dbpd, dbpd_atomic_flags,
				~DBF_WAITING, relaxed);
	} else {
		(void)os_atomic_or2o(dbpd, dbpd_atomic_flags,
				DBF_WAITED, relaxed);
		// don't need to re-test here: the second call would see
		// the first call's WAITING
	}

	return ret;
}

void
dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue,
		dispatch_block_t notification_block)
{
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
	if (!dbpd) {
		DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
				"dispatch_block_notify()");
	}
	int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
	if (slowpath(performed > 1)) {
		DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
				"run more than once and observed");
	}

	return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block);
}
void
_dispatch_continuation_init_slow(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, dispatch_block_flags_t flags)
{
	dispatch_block_private_data_t dbpd = _dispatch_block_get_data(dc->dc_ctxt);
	dispatch_block_flags_t block_flags = dbpd->dbpd_flags;
	uintptr_t dc_flags = dc->dc_flags;
	os_mpsc_queue_t oq = dqu._oq;

	// balanced in d_block_async_invoke_and_release or d_block_wait
	if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, oq, relaxed)) {
		_os_object_retain_internal_inline(oq->_as_os_obj);
	}

	if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
		dc->dc_func = _dispatch_block_async_invoke_and_release;
	} else {
		dc->dc_func = _dispatch_block_async_invoke;
	}

	flags |= block_flags;
	if (block_flags & DISPATCH_BLOCK_HAS_PRIORITY) {
		_dispatch_continuation_priority_set(dc, dbpd->dbpd_priority, flags);
	} else {
		_dispatch_continuation_priority_set(dc, dc->dc_priority, flags);
	}
	if (block_flags & DISPATCH_BLOCK_BARRIER) {
		dc_flags |= DISPATCH_OBJ_BARRIER_BIT;
	}
	if (block_flags & DISPATCH_BLOCK_HAS_VOUCHER) {
		voucher_t v = dbpd->dbpd_voucher;
		dc->dc_voucher = v ? _voucher_retain(v) : NULL;
		dc_flags |= DISPATCH_OBJ_ENFORCE_VOUCHER;
		_dispatch_voucher_debug("continuation[%p] set", dc->dc_voucher, dc);
		_dispatch_voucher_ktrace_dc_push(dc);
	} else {
		_dispatch_continuation_voucher_set(dc, oq, flags);
	}
	dc_flags |= DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT;
	dc->dc_flags = dc_flags;
}

void
_dispatch_continuation_update_bits(dispatch_continuation_t dc,
		uintptr_t dc_flags)
{
	dc->dc_flags = dc_flags;
	if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
		if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
			dc->dc_func = _dispatch_block_async_invoke_and_release;
		} else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
			dc->dc_func = _dispatch_call_block_and_release;
		}
	} else {
		if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
			dc->dc_func = _dispatch_block_async_invoke;
		} else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
			dc->dc_func = _dispatch_Block_invoke(dc->dc_ctxt);
		}
	}
}

#endif // __BLOCKS__
#pragma mark dispatch_barrier_async

DISPATCH_NOINLINE
static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
	_dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
	_dispatch_continuation_async(dq, dc);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp,
		dispatch_block_flags_t flags)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;

	if (!fastpath(dc)) {
		return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
	}

	_dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
	_dispatch_continuation_push(dq, dc);
}

DISPATCH_NOINLINE
void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_barrier_async_f2(dq, ctxt, func, 0, 0);
}

DISPATCH_NOINLINE
void
_dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_voucher = DISPATCH_NO_VOUCHER;
	dc->dc_priority = DISPATCH_NO_PRIORITY;
	_dispatch_queue_push(dq, dc, 0);
}

#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;

	_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
	_dispatch_continuation_push(dq, dc);
}
#endif

#pragma mark dispatch_async
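
/*
 * dispatch_async() onto a concurrent (width > 1) queue does not take the
 * barrier path: the work is wrapped in an ASYNC_REDIRECT continuation and
 * pushed down the target-queue chain until a root queue (or a busy queue)
 * is found; the non-barrier width reserved along the way is released again
 * in _dispatch_async_redirect_invoke().
 */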
void
_dispatch_async_redirect_invoke(dispatch_continuation_t dc,
		dispatch_invoke_flags_t flags)
{
	dispatch_thread_frame_s dtf;
	struct dispatch_continuation_s *other_dc = dc->dc_other;
	dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;
	// if we went through _dispatch_root_queue_push_override,
	// the "right" root queue was stuffed into dc_func
	dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
	dispatch_queue_t dq = dc->dc_data, rq, old_dq;
	struct _dispatch_identity_s di;

	pthread_priority_t op, dp, old_dp;

	if (ctxt_flags) {
		flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
		flags |= ctxt_flags;
	}
	old_dq = _dispatch_get_current_queue();
	if (assumed_rq) {
		_dispatch_queue_set_current(assumed_rq);
		_dispatch_root_queue_identity_assume(&di, 0);
	}

	old_dp = _dispatch_set_defaultpriority(dq->dq_priority, &dp);
	op = dq->dq_override;
	if (op > (dp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
		_dispatch_wqthread_override_start(_dispatch_tid_self(), op);
		// Ensure that the root queue sees that this thread was overridden.
		_dispatch_set_defaultpriority_override();
	}

	_dispatch_thread_frame_push(&dtf, dq);
	_dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
			DISPATCH_OBJ_CONSUME_BIT, {
		_dispatch_continuation_pop(other_dc, dq, flags);
	});
	_dispatch_thread_frame_pop(&dtf);
	if (assumed_rq) {
		_dispatch_root_queue_identity_restore(&di);
		_dispatch_queue_set_current(old_dq);
	}
	_dispatch_reset_defaultpriority(old_dp);

	rq = dq->do_targetq;
	while (slowpath(rq->do_targetq) && rq != old_dq) {
		_dispatch_non_barrier_complete(rq);
		rq = rq->do_targetq;
	}

	_dispatch_non_barrier_complete(dq);

	if (dtf.dtf_deferred) {
		struct dispatch_object_s *dou = dtf.dtf_deferred;
		return _dispatch_queue_drain_deferred_invoke(dq, flags, 0, dou);
	}

	_dispatch_release_tailcall(dq);
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_async_redirect_wrap(dispatch_queue_t dq, dispatch_object_t dou)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dou._do->do_next = NULL;
	dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT);
	dc->dc_func = NULL;
	dc->dc_ctxt = (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
	dc->dc_data = dq;
	dc->dc_other = dou._do;
	dc->dc_voucher = DISPATCH_NO_VOUCHER;
	dc->dc_priority = DISPATCH_NO_PRIORITY;
	_dispatch_retain(dq);
	return dc;
}

DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_object_t dou, pthread_priority_t pp)
{
	if (!slowpath(_dispatch_object_is_redirection(dou))) {
		dou._dc = _dispatch_async_redirect_wrap(dq, dou);
	}
	dq = dq->do_targetq;

	// Find the queue to redirect to
	while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
		if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
			break;
		}
		if (!dou._dc->dc_ctxt) {
			// find first queue in descending target queue order that has
			// an autorelease frequency set, and use that as the frequency for
			// this continuation.
			dou._dc->dc_ctxt = (void *)
					(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
		}
		dq = dq->do_targetq;
	}

	_dispatch_queue_push(dq, dou, pp);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_redirect(dispatch_queue_t dq,
		struct dispatch_object_s *dc)
{
	_dispatch_trace_continuation_pop(dq, dc);
	// This is a re-redirect, overrides have already been applied
	// by _dispatch_async_f2.
	// However we want to end up on the root queue matching `dc` qos, so pick up
	// the current override of `dq` which includes dc's override (and maybe more)
	_dispatch_async_f_redirect(dq, dc, dq->dq_override);
	_dispatch_introspection_queue_item_complete(dc);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	// <rdar://problem/24738102&24743140> reserving non barrier width
	// doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
	// equivalent), so we have to check that this thread hasn't enqueued
	// anything ahead of this call or we can break ordering
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_continuation_push(dq, dc);
	}

	if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
		return _dispatch_continuation_push(dq, dc);
	}

	return _dispatch_async_f_redirect(dq, dc,
			_dispatch_continuation_get_override_priority(dq, dc));
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		pthread_priority_t pp, dispatch_block_flags_t flags)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

	if (!fastpath(dc)) {
		return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
	}

	_dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
	_dispatch_continuation_async2(dq, dc, false);
}

DISPATCH_NOINLINE
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	_dispatch_async_f(dq, ctxt, func, 0, 0);
}

DISPATCH_NOINLINE
void
dispatch_async_enforce_qos_class_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_async_f(dq, ctxt, func, 0, DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
}

#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

	_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
	_dispatch_continuation_async(dq, dc);
}
#endif

#pragma mark dispatch_group_async
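
/*
 * Illustrative use of the group entry points below (example queue and work
 * only; stage_one()/stage_two()/all_stages_done() stand in for caller code):
 *
 *     dispatch_group_t g = dispatch_group_create();
 *     dispatch_group_async(g, q, ^{ stage_one(); });
 *     dispatch_group_async(g, q, ^{ stage_two(); });
 *     dispatch_group_notify(g, q, ^{ all_stages_done(); });
 */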
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc)
{
	dispatch_group_enter(dg);
	dc->dc_data = dg;
	_dispatch_continuation_async(dq, dc);
}

DISPATCH_NOINLINE
void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;

	_dispatch_continuation_init_f(dc, dq, ctxt, func, 0, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;

	_dispatch_continuation_init(dc, dq, db, 0, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc);
}
#endif
#pragma mark dispatch_sync / dispatch_barrier_sync recurse and invoke

DISPATCH_NOINLINE
static void
_dispatch_sync_function_invoke_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	voucher_t ov;
	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	ov = _dispatch_set_priority_and_voucher(0, dq->dq_override_voucher, 0);
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_reset_voucher(ov, 0);
	_dispatch_thread_frame_pop(&dtf);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	if (slowpath(dq->dq_override_voucher != DISPATCH_NO_VOUCHER)) {
		return _dispatch_sync_function_invoke_slow(dq, ctxt, func);
	}
	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_frame_pop(&dtf);
}

DISPATCH_NOINLINE
static void
_dispatch_sync_function_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
}

static void
_dispatch_sync_recurse_invoke(void *ctxt)
{
	dispatch_continuation_t dc = ctxt;
	_dispatch_sync_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
		.dc_func = func,
		.dc_ctxt = ctxt,
	};
	_dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp);
}

DISPATCH_NOINLINE
static void
_dispatch_non_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
	_dispatch_non_barrier_complete(dq);
}

DISPATCH_NOINLINE
static void
_dispatch_non_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	_dispatch_sync_function_recurse(dq, ctxt, func, pp);
	_dispatch_non_barrier_complete(dq);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_non_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	_dispatch_introspection_non_barrier_sync_begin(dq, func);
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_non_barrier_sync_f_recurse(dq, ctxt, func, pp);
	}
	_dispatch_non_barrier_sync_f_invoke(dq, ctxt, func);
}

#pragma mark dispatch_barrier_sync
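
/*
 * dispatch_barrier_sync() fast path: if the caller can acquire the barrier
 * lock uncontended (_dispatch_queue_try_acquire_barrier_sync), the function
 * is invoked directly on the calling thread and the barrier is released in
 * _dispatch_barrier_complete(); otherwise the slow path below enqueues a
 * SYNC_SLOW continuation and parks on a thread event until it is signaled.
 */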
DISPATCH_NOINLINE
static void
_dispatch_barrier_complete(dispatch_queue_t dq)
{
	uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
			dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;

	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_try_lock_transfer_or_wakeup(dq);
	}

	if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
		// someone enqueued a slow item at the head
		// looping may be its last chance
		return _dispatch_try_lock_transfer_or_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	_dispatch_sync_function_recurse(dq, ctxt, func, pp);
	_dispatch_barrier_complete(dq);
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
	_dispatch_barrier_complete(dq);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	_dispatch_introspection_barrier_sync_begin(dq, func);
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp);
	}
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

typedef struct dispatch_barrier_sync_context_s {
	struct dispatch_continuation_s dbsc_dc;
	dispatch_thread_frame_s dbsc_dtf;
} *dispatch_barrier_sync_context_t;
static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
	dispatch_barrier_sync_context_t dbsc = ctxt;
	dispatch_continuation_t dc = &dbsc->dbsc_dc;
	dispatch_queue_t dq = dc->dc_data;
	dispatch_thread_event_t event = (dispatch_thread_event_t)dc->dc_other;

	dispatch_assert(dq == _dispatch_queue_get_current());
#if DISPATCH_COCOA_COMPAT
	if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
		dispatch_assert(_dispatch_thread_frame_get_current() == NULL);

		// the block runs on the thread the queue is bound to and not
		// on the calling thread, but we mean to see the calling thread
		// dispatch thread frames, so we fake the link, and then undo it
		_dispatch_thread_frame_set_current(&dbsc->dbsc_dtf);
		// The queue is bound to a non-dispatch thread (e.g. main thread)
		_dispatch_continuation_voucher_adopt(dc, DISPATCH_NO_VOUCHER,
				DISPATCH_OBJ_CONSUME_BIT);
		_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
		os_atomic_store2o(dc, dc_func, NULL, release);
		_dispatch_thread_frame_set_current(NULL);
	}
#endif
	_dispatch_thread_event_signal(event); // release
}
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	if (slowpath(!dq->do_targetq)) {
		// see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	if (!pp) {
		pp = _dispatch_get_priority();
		pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
	}
	dispatch_thread_event_s event;
	_dispatch_thread_event_init(&event);
	struct dispatch_barrier_sync_context_s dbsc = {
		.dbsc_dc = {
			.dc_data = dq,
#if DISPATCH_COCOA_COMPAT
			.dc_func = func,
			.dc_ctxt = ctxt,
#endif
			.dc_other = &event,
		}
	};
#if DISPATCH_COCOA_COMPAT
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, etc. However, blocks submitted
	// to the main thread MUST be run on the main thread
	if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
		// consumed by _dispatch_barrier_sync_f_slow_invoke
		// or in the DISPATCH_COCOA_COMPAT hunk below
		_dispatch_continuation_voucher_set(&dbsc.dbsc_dc, dq, 0);
		// save frame linkage for _dispatch_barrier_sync_f_slow_invoke
		_dispatch_thread_frame_save_state(&dbsc.dbsc_dtf);
		// thread bound queues cannot mutate their target queue hierarchy
		// so it's fine to look now
		_dispatch_introspection_barrier_sync_begin(dq, func);
	}
#endif
	uint32_t th_self = _dispatch_tid_self();
	struct dispatch_continuation_s dbss = {
		.dc_flags = DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT,
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dbsc,
		.dc_data = (void*)(uintptr_t)th_self,
		.dc_priority = pp,
		.dc_other = &event,
		.dc_voucher = DISPATCH_NO_VOUCHER,
	};

	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
		DISPATCH_CLIENT_CRASH(dq, "dispatch_barrier_sync called on queue "
				"already owned by current thread");
	}

	_dispatch_continuation_push_sync_slow(dq, &dbss);
	_dispatch_thread_event_wait(&event); // acquire
	_dispatch_thread_event_destroy(&event);
	if (_dispatch_queue_received_override(dq, pp)) {
		// Ensure that the root queue sees that this thread was overridden.
		// pairs with the _dispatch_wqthread_override_start in
		// _dispatch_continuation_slow_item_signal
		_dispatch_set_defaultpriority_override();
	}

#if DISPATCH_COCOA_COMPAT
	// Queue bound to a non-dispatch thread
	if (dbsc.dbsc_dc.dc_func == NULL) {
		return;
	} else if (dbsc.dbsc_dc.dc_voucher) {
		// this almost never happens, unless a dispatch_sync() onto a thread
		// bound queue went to the slow path at the same time dispatch_main()
		// is called, or the queue is detached from the runloop.
		_voucher_release(dbsc.dbsc_dc.dc_voucher);
	}
#endif

	_dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f2(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
		// global concurrent queues and queues bound to non-dispatch threads
		// always fall into the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
	}
	//
	// TODO: the more correct thing to do would be to set dq_override to the qos
	// of the thread that just acquired the barrier lock here. Unwinding that
	// would slow down the uncontended fastpath however.
	//
	// The chosen tradeoff is that if an enqueue on a lower priority thread
	// contends with this fastpath, this thread may receive a useless override.
	// Improving this requires the override level to be part of the atomic
	// dq_state.
	//
	_dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}

static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	_dispatch_barrier_sync_f2(dq, ctxt, func, pp);
}

void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_barrier_sync_f2(dq, ctxt, func, 0);
}

static void
_dispatch_sync_block_with_private_data(dispatch_queue_t dq,
		void (^work)(void), dispatch_block_flags_t flags)
{
	pthread_priority_t pp = _dispatch_block_get_priority(work);

	flags |= _dispatch_block_get_flags(work);
	if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
		pthread_priority_t tp = _dispatch_get_priority();
		tp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (pp < tp) {
			pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
		} else if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
			pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
		}
	}
	// balanced in d_block_sync_invoke or d_block_wait
	if (os_atomic_cmpxchg2o(_dispatch_block_get_data(work),
			dbpd_queue, NULL, dq->_as_oq, relaxed)) {
		_dispatch_retain(dq);
	}
	if (flags & DISPATCH_BLOCK_BARRIER) {
		_dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
	} else {
		_dispatch_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
	}
}

void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
	if (slowpath(_dispatch_block_has_private_data(work))) {
		dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
		return _dispatch_sync_block_with_private_data(dq, work, flags);
	}
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}

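/*
 * Minimal usage sketch for the public dispatch_barrier_sync() entry point
 * above: on a concurrent queue, the barrier block waits for previously
 * submitted work and excludes concurrent work while it runs. This is an
 * illustration only, not part of the library build; the queue label and
 * `shared_counter` are made up for the example.
 */
#if 0
#include <dispatch/dispatch.h>

static int shared_counter; // only mutated under a barrier on `q`

static void
example_barrier_sync(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
			DISPATCH_QUEUE_CONCURRENT);
	// readers may run concurrently with each other
	dispatch_async(q, ^{ (void)shared_counter; });
	// the barrier runs alone on the queue and returns once it has executed
	dispatch_barrier_sync(q, ^{ shared_counter++; });
	dispatch_release(q);
}
#endif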
void
_dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// Use for mutation of queue-/source-internal state only, ignores target
	// queue hierarchy!
	if (!fastpath(_dispatch_queue_try_acquire_barrier_sync(dq))) {
		return _dispatch_barrier_async_detached_f(dq, ctxt, func);
	}
	// skip the recursion because it's about the queue state only
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

#pragma mark dispatch_sync

static void
_dispatch_non_barrier_complete(dispatch_queue_t dq)
{
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
		new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
		if (_dq_state_is_runnable(new_state)) {
			if (!_dq_state_is_runnable(old_state)) {
				// we're making a FULL -> non FULL transition
				new_state |= DISPATCH_QUEUE_DIRTY;
			}
			if (!_dq_state_drain_locked(new_state)) {
				uint64_t full_width = new_state;
				if (_dq_state_has_pending_barrier(new_state)) {
					full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
					full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
					full_width += DISPATCH_QUEUE_IN_BARRIER;
				} else {
					full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
					full_width += DISPATCH_QUEUE_IN_BARRIER;
				}
				if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
						DISPATCH_QUEUE_WIDTH_FULL_BIT) {
					new_state = full_width;
					new_state &= ~DISPATCH_QUEUE_DIRTY;
					new_state |= _dispatch_tid_self();
				}
			}
		}
	});

	if (_dq_state_is_in_barrier(new_state)) {
		return _dispatch_try_lock_transfer_or_wakeup(dq);
	}
	if (!_dq_state_is_runnable(old_state)) {
		_dispatch_queue_try_wakeup(dq, new_state,
				DISPATCH_WAKEUP_WAITER_HANDOFF);
	}
}

static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		pthread_priority_t pp)
{
	dispatch_assert(dq->do_targetq);

	pp = _dispatch_get_priority();
	pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
	pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;

	dispatch_thread_event_s event;
	_dispatch_thread_event_init(&event);
	uint32_t th_self = _dispatch_tid_self();
	struct dispatch_continuation_s dc = {
		.dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
#if DISPATCH_INTROSPECTION
		.dc_func = func,
		.dc_ctxt = ctxt,
#endif
		.dc_data = (void*)(uintptr_t)th_self,
		.dc_other = &event,
		.dc_voucher = DISPATCH_NO_VOUCHER,
	};

	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
		DISPATCH_CLIENT_CRASH(dq, "dispatch_sync called on queue "
				"already owned by current thread");
	}

	_dispatch_continuation_push_sync_slow(dq, &dc);
	_dispatch_thread_event_wait(&event); // acquire
	_dispatch_thread_event_destroy(&event);
	if (_dispatch_queue_received_override(dq, pp)) {
		// Ensure that the root queue sees that this thread was overridden.
		// pairs with the _dispatch_wqthread_override_start in
		// _dispatch_continuation_slow_item_signal
		_dispatch_set_defaultpriority_override();
	}
	_dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		pthread_priority_t pp)
{
	// <rdar://problem/24738102&24743140> reserving non barrier width
	// doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
	// equivalent), so we have to check that this thread hasn't enqueued
	// anything ahead of this call or we can break ordering
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_sync_f_slow(dq, ctxt, func, pp);
	}
	// concurrent queues do not respect width on sync
	if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
		return _dispatch_sync_f_slow(dq, ctxt, func, pp);
	}
	_dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		pthread_priority_t pp)
{
	if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
		return _dispatch_sync_f2(dq, ctxt, func, pp);
	}
	return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
}

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
		return _dispatch_sync_f2(dq, ctxt, func, 0);
	}
	return dispatch_barrier_sync_f(dq, ctxt, func);
}

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
	if (slowpath(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_private_data(dq, work, 0);
	}
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

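/*
 * Sketch of the self-deadlock that the DISPATCH_CLIENT_CRASH in
 * _dispatch_sync_f_slow() above detects: calling dispatch_sync() onto the
 * serial queue that the calling block is already draining. The nested call
 * takes the slow path, sees that the drain lock is owned by the current
 * thread, and crashes instead of hanging. Illustrative only, not part of
 * the library build.
 */
#if 0
#include <dispatch/dispatch.h>

static void
example_sync_self_deadlock(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.serial", NULL);
	dispatch_sync(q, ^{
		// this thread already owns the drain lock of `q`, so the inner
		// dispatch_sync() can never make progress
		dispatch_sync(q, ^{ });
	});
	dispatch_release(q);
}
#endif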
#pragma mark dispatch_trysync

struct trysync_context {
	dispatch_queue_t tc_dq;
	void *tc_ctxt;
	dispatch_function_t tc_func;
};

static int
_dispatch_trysync_recurse(dispatch_queue_t dq,
		struct trysync_context *tc, bool barrier)
{
	dispatch_queue_t tq = dq->do_targetq;

	if (barrier) {
		if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
			return EWOULDBLOCK;
		}
	} else {
		// <rdar://problem/24743140> check nothing was queued by the current
		// thread ahead of this call. _dispatch_queue_try_reserve_sync_width
		// ignores the ENQUEUED bit which could cause it to miss a barrier_async
		// made by the same thread just before.
		if (slowpath(dq->dq_items_tail)) {
			return EWOULDBLOCK;
		}
		// concurrent queues do not respect width on sync
		if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
			return EWOULDBLOCK;
		}
	}

	int rc = 0;
	if (_dispatch_queue_cannot_trysync(tq)) {
		_dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
		rc = ENOTSUP;
	} else if (tq->do_targetq) {
		rc = _dispatch_trysync_recurse(tq, tc, tq->dq_width == 1);
		if (rc == ENOTSUP) {
			_dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
		}
	} else {
		dispatch_thread_frame_s dtf;
		_dispatch_thread_frame_push(&dtf, tq);
		_dispatch_sync_function_invoke(tc->tc_dq, tc->tc_ctxt, tc->tc_func);
		_dispatch_thread_frame_pop(&dtf);
	}
	if (barrier) {
		_dispatch_barrier_complete(dq);
	} else {
		_dispatch_non_barrier_complete(dq);
	}
	return rc;
}

bool
_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t f)
{
	if (slowpath(!dq->do_targetq)) {
		_dispatch_sync_function_invoke(dq, ctxt, f);
		return true;
	}
	if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
		return false;
	}
	struct trysync_context tc = {
		.tc_dq = dq,
		.tc_func = f,
		.tc_ctxt = ctxt,
	};
	return _dispatch_trysync_recurse(dq, &tc, true) == 0;
}

bool
_dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
{
	if (slowpath(!dq->do_targetq)) {
		_dispatch_sync_function_invoke(dq, ctxt, f);
		return true;
	}
	if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
		return false;
	}
	struct trysync_context tc = {
		.tc_dq = dq,
		.tc_func = f,
		.tc_ctxt = ctxt,
	};
	return _dispatch_trysync_recurse(dq, &tc, dq->dq_width == 1) == 0;
}

#pragma mark dispatch_after

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		void *ctxt, void *handler, bool block)
{
	dispatch_source_t ds;
	uint64_t leeway, delta;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		if (block) {
			return dispatch_async(queue, handler);
		}
		return dispatch_async_f(queue, ctxt, handler);
	}
	leeway = delta / 10; // <rdar://problem/13447496>

	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
	dispatch_assert(ds);

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (block) {
		_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
	} else {
		_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
	}
	// reference `ds` so that it doesn't show up as a leak
	dc->dc_data = ds;
	_dispatch_source_set_event_handler_continuation(ds, dc);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
	dispatch_activate(ds);
}

void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_after(when, queue, ctxt, func, false);
}

void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	_dispatch_after(when, queue, NULL, work, true);
}

#pragma mark dispatch_queue_wakeup

void
_dispatch_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

	if (_dispatch_queue_class_probe(dq)) {
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}
	if (target) {
		return _dispatch_queue_class_wakeup(dq, pp, flags, target);
	} else if (pp) {
		return _dispatch_queue_class_override_drainer(dq, pp, flags);
	} else if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}

#if DISPATCH_COCOA_COMPAT
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle)
{
#if TARGET_OS_MAC
	return MACH_PORT_VALID(handle);
#elif defined(__linux__)
	return handle >= 0;
#else
#error "runloop support not implemented on this platform"
#endif
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_runloop_handle_t
_dispatch_runloop_queue_get_handle(dispatch_queue_t dq)
{
#if TARGET_OS_MAC
	return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt);
#elif defined(__linux__)
	// decode: 0 is a valid fd, so offset by 1 to distinguish from NULL
	return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt) - 1;
#else
#error "runloop support not implemented on this platform"
#endif
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_runloop_queue_set_handle(dispatch_queue_t dq, dispatch_runloop_handle_t handle)
{
#if TARGET_OS_MAC
	dq->do_ctxt = (void *)(uintptr_t)handle;
#elif defined(__linux__)
	// encode: 0 is a valid fd, so offset by 1 to distinguish from NULL
	dq->do_ctxt = (void *)(uintptr_t)(handle + 1);
#else
#error "runloop support not implemented on this platform"
#endif
}
#endif // DISPATCH_COCOA_COMPAT

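/*
 * Small sketch of the Linux handle encoding used by the two helpers above:
 * an eventfd of 0 is a valid file descriptor, so the value stored in
 * do_ctxt is offset by 1 so that NULL still means "no handle". Values and
 * names are illustrative; this block is not compiled into the library.
 */
#if 0
#include <assert.h>
#include <stdint.h>

static void
example_handle_roundtrip(void)
{
	int handle = 0;                                // a valid fd on Linux
	void *ctxt = (void *)(uintptr_t)(handle + 1);  // encode: never NULL
	int decoded = (int)(uintptr_t)ctxt - 1;        // decode
	assert(decoded == handle && ctxt != NULL);
}
#endif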
void
_dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
		// <rdar://problem/14026816>
		return _dispatch_queue_wakeup(dq, pp, flags);
	}

	if (_dispatch_queue_class_probe(dq)) {
		return _dispatch_runloop_queue_poke(dq, pp, flags);
	}

	pp = _dispatch_queue_reset_override_priority(dq, true);
	if (pp) {
		mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
		if (_dispatch_queue_class_probe(dq)) {
			_dispatch_runloop_queue_poke(dq, pp, flags);
		}
		_dispatch_thread_override_end(owner, dq);
		return;
	}
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
#else
	return _dispatch_queue_wakeup(dq, pp, flags);
#endif
}

void
_dispatch_main_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
	if (_dispatch_queue_is_thread_bound(dq)) {
		return _dispatch_runloop_queue_wakeup(dq, pp, flags);
	}
#endif
	return _dispatch_queue_wakeup(dq, pp, flags);
}

void
_dispatch_root_queue_wakeup(dispatch_queue_t dq,
		pthread_priority_t pp DISPATCH_UNUSED,
		dispatch_wakeup_flags_t flags)
{
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		// see _dispatch_queue_push_set_head
		dispatch_assert(flags & DISPATCH_WAKEUP_FLUSH);
	}
	_dispatch_global_queue_poke(dq);
}

#pragma mark dispatch root queues poke

#if DISPATCH_COCOA_COMPAT
static inline void
_dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
{
	dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
	if (!_dispatch_runloop_handle_is_valid(handle)) {
		return;
	}

#if TARGET_OS_MAC
	mach_port_t mp = handle;
	kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}
#elif defined(__linux__)
	int result;
	do {
		result = eventfd_write(handle, 1);
	} while (result == -1 && errno == EINTR);
	(void)dispatch_assume_zero(result);
#else
#error "runloop support not implemented on this platform"
#endif
}

static void
_dispatch_runloop_queue_poke(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags)
{
	// it's not useful to handle WAKEUP_FLUSH because mach_msg() will have
	// a release barrier and that when runloop queues stop being thread bound
	// they have a non optional wake-up to start being a "normal" queue
	// either in _dispatch_runloop_queue_xref_dispose,
	// or in _dispatch_queue_cleanup2() for the main thread.

	if (dq == &_dispatch_main_q) {
		dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
				_dispatch_runloop_queue_handle_init);
	}
	_dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
	if (flags & DISPATCH_WAKEUP_OVERRIDING) {
		mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
		_dispatch_thread_override_start(owner, pp, dq);
		if (flags & DISPATCH_WAKEUP_WAS_OVERRIDDEN) {
			_dispatch_thread_override_end(owner, dq);
		}
	}
	_dispatch_runloop_queue_class_poke(dq);
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}

static void
_dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	uint32_t i = n;
	int r;

	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);

	_dispatch_debug_root_queue(dq, __func__);
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (qc->dgq_kworkqueue) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			do {
				r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
						_dispatch_worker_thread4, dq, &wh, &gen_cnt);
				(void)dispatch_assume_zero(r);
			} while (--i);
			return;
		}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
		if (!dq->dq_priority) {
			r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
					qc->dgq_wq_options, (int)i);
			(void)dispatch_assume_zero(r);
			return;
		}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
		r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
		(void)dispatch_assume_zero(r);
#endif
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
		while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
			if (!--i) return;
		}
	}
	uint32_t j, t_count;
	// seq_cst with atomic store to tail <rdar://problem/16932833>
	t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
	do {
		if (!t_count) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
		j = i > t_count ? t_count : i;
	} while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
			t_count - j, &t_count, acquire));

	pthread_attr_t *attr = &pqc->dpq_thread_attr;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	if (slowpath(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq);
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			(void)dispatch_assume_zero(r);
			_dispatch_temporary_resource_shortage();
		}
	} while (--j);
#endif // DISPATCH_USE_PTHREAD_POOL
}

static inline void
_dispatch_global_queue_poke_n(dispatch_queue_t dq, unsigned int n)
{
	if (!_dispatch_queue_class_probe(dq)) {
		return;
	}
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	if (
#if DISPATCH_USE_PTHREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
		_dispatch_root_queue_debug("worker thread request still pending for "
				"global queue: %p", dq);
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	return _dispatch_global_queue_poke_slow(dq, n);
}

static inline void
_dispatch_global_queue_poke(dispatch_queue_t dq)
{
	return _dispatch_global_queue_poke_n(dq, 1);
}

void
_dispatch_queue_push_list_slow(dispatch_queue_t dq, unsigned int n)
{
	return _dispatch_global_queue_poke_n(dq, n);
}

#pragma mark dispatch_queue_drain

void
_dispatch_continuation_pop(dispatch_object_t dou, dispatch_queue_t dq,
		dispatch_invoke_flags_t flags)
{
	_dispatch_continuation_pop_inline(dou, dq, flags);
}

void
_dispatch_continuation_invoke(dispatch_object_t dou, voucher_t override_voucher,
		dispatch_invoke_flags_t flags)
{
	_dispatch_continuation_invoke_inline(dou, override_voucher, flags);
}

/*
 * Drain comes in 2 flavours (serial/concurrent) and 2 modes
 * (redirecting or not).
 *
 * Serial drain is about serial queues (width == 1). It doesn't support
 * the redirecting mode, which doesn't make sense, and treats all continuations
 * as barriers. Bookkeeping is minimal in serial flavour, most of the loop
 * is optimized away.
 *
 * Serial drain stops if the width of the queue grows to larger than 1.
 * Going through a serial drain prevents any recursive drain from being
 * redirecting.
 *
 * When in non-redirecting mode (meaning one of the target queues is serial),
 * non-barriers and barriers alike run in the context of the drain thread.
 * Slow non-barrier items are still all signaled so that they can make progress
 * toward the dispatch_sync() that will serialize them all.
 *
 * In redirecting mode, non-barrier work items are redirected downward.
 *
 * Concurrent drain stops if the width of the queue becomes 1, so that the
 * queue drain moves to the more efficient serial mode.
 */
DISPATCH_ALWAYS_INLINE
static dispatch_queue_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
		uint64_t *owned_ptr, struct dispatch_object_s **dc_out,
		bool serial_drain)
{
	dispatch_queue_t orig_tq = dq->do_targetq;
	dispatch_thread_frame_s dtf;
	struct dispatch_object_s *dc = NULL, *next_dc;
	uint64_t owned = *owned_ptr;

	_dispatch_thread_frame_push(&dtf, dq);
	if (_dq_state_is_in_barrier(owned)) {
		// we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
		// but width can change while draining barrier work items, so we only
		// convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
		owned = DISPATCH_QUEUE_IN_BARRIER;
	}

	while (dq->dq_items_tail) {
		dc = _dispatch_queue_head(dq);
		do {
			if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(dq))) {
				goto out;
			}
			if (unlikely(orig_tq != dq->do_targetq)) {
				goto out;
			}
			if (unlikely(serial_drain != (dq->dq_width == 1))) {
				goto out;
			}
			if (serial_drain || _dispatch_object_is_barrier(dc)) {
				if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
					goto out;
				}
				next_dc = _dispatch_queue_next(dq, dc);
				if (_dispatch_object_is_slow_item(dc)) {
					owned = 0;
					goto out_with_deferred;
				}
			} else {
				if (owned == DISPATCH_QUEUE_IN_BARRIER) {
					// we just ran barrier work items, we have to make their
					// effect visible to other sync work items on other threads
					// that may start coming in after this point, hence the
					// release barrier
					os_atomic_and2o(dq, dq_state, ~owned, release);
					owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
				} else if (unlikely(owned == 0)) {
					if (_dispatch_object_is_slow_item(dc)) {
						// sync "readers" don't observe the limit
						_dispatch_queue_reserve_sync_width(dq);
					} else if (!_dispatch_queue_try_acquire_async(dq)) {
						goto out_with_no_width;
					}
					owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
				}

				next_dc = _dispatch_queue_next(dq, dc);
				if (_dispatch_object_is_slow_item(dc)) {
					owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
					_dispatch_continuation_slow_item_signal(dq, dc);
					continue;
				}

				if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
					owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
					_dispatch_continuation_redirect(dq, dc);
					continue;
				}
			}

			_dispatch_continuation_pop_inline(dc, dq, flags);
			_dispatch_perfmon_workitem_inc();
			if (unlikely(dtf.dtf_deferred)) {
				goto out_with_deferred_compute_owned;
			}
		} while ((dc = next_dc));
	}

out:
	if (owned == DISPATCH_QUEUE_IN_BARRIER) {
		// if we're IN_BARRIER we really own the full width too
		owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
	}
	if (dc) {
		owned = _dispatch_queue_adjust_owned(dq, owned, dc);
	}
	*owned_ptr = owned;
	_dispatch_thread_frame_pop(&dtf);
	return dc ? dq->do_targetq : NULL;

out_with_no_width:
	*owned_ptr = 0;
	_dispatch_thread_frame_pop(&dtf);
	return NULL;

out_with_deferred_compute_owned:
	if (serial_drain) {
		owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
	} else {
		if (owned == DISPATCH_QUEUE_IN_BARRIER) {
			// if we're IN_BARRIER we really own the full width too
			owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
		}
		if (next_dc) {
			owned = _dispatch_queue_adjust_owned(dq, owned, next_dc);
		}
	}
out_with_deferred:
	*owned_ptr = owned;
	if (unlikely(!dc_out)) {
		DISPATCH_INTERNAL_CRASH(dc,
				"Deferred continuation on source, mach channel or mgr");
	}
	*dc_out = dc;
	_dispatch_thread_frame_pop(&dtf);
	return dq->do_targetq;
}

static dispatch_queue_t
_dispatch_queue_concurrent_drain(dispatch_queue_t dq,
		dispatch_invoke_flags_t flags, uint64_t *owned,
		struct dispatch_object_s **dc_ptr)
{
	return _dispatch_queue_drain(dq, flags, owned, dc_ptr, false);
}

dispatch_queue_t
_dispatch_queue_serial_drain(dispatch_queue_t dq,
		dispatch_invoke_flags_t flags, uint64_t *owned,
		struct dispatch_object_s **dc_ptr)
{
	flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
	return _dispatch_queue_drain(dq, flags, owned, dc_ptr, true);
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_thread_frame_s dtf;

	if (!dq->dq_items_tail) {
		return;
	}

	if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
		DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
				" after dispatch_main()");
	}
	mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
	if (slowpath(owner != _dispatch_tid_self())) {
		DISPATCH_CLIENT_CRASH(owner, "_dispatch_main_queue_callback_4CF called"
				" from the wrong thread");
	}

	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);

	_dispatch_perfmon_start();
	// <rdar://problem/23256682> hide the frame chaining when CFRunLoop
	// drains the main runloop, as this should not be observable that way
	_dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);

	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
	voucher_t voucher = _voucher_copy();

	struct dispatch_object_s *dc, *next_dc, *tail;
	dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
	do {
		next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
		_dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
		_dispatch_perfmon_workitem_inc();
	} while ((dc = next_dc));

	// runloop based queues use their port for the queue PUBLISH pattern
	// so this raw call to dx_wakeup(0) is valid
	dx_wakeup(dq, 0, 0);
	_dispatch_voucher_debug("main queue restore", voucher);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_reset_priority_and_voucher(old_pri, voucher);
	_dispatch_thread_frame_pop(&dtf);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

static bool
_dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
{
	if (!dq->dq_items_tail) {
		return false;
	}
	dispatch_thread_frame_s dtf;
	_dispatch_perfmon_start();
	_dispatch_thread_frame_push(&dtf, dq);
	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
	voucher_t voucher = _voucher_copy();

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	next_dc = _dispatch_queue_next(dq, dc);
	_dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
	_dispatch_perfmon_workitem_inc();

	if (!next_dc) {
		// runloop based queues use their port for the queue PUBLISH pattern
		// so this raw call to dx_wakeup(0) is valid
		dx_wakeup(dq, 0, 0);
	}

	_dispatch_voucher_debug("runloop queue restore", voucher);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_reset_priority_and_voucher(old_pri, voucher);
	_dispatch_thread_frame_pop(&dtf);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
	return next_dc != NULL;
}
#endif // DISPATCH_COCOA_COMPAT

static void
_dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq)
{
	dispatch_continuation_t dc_tmp, dc_start, dc_end;
	struct dispatch_object_s *dc = NULL;
	uint64_t dq_state, owned;
	size_t count = 0;

	owned = DISPATCH_QUEUE_IN_BARRIER;
	owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
attempt_running_slow_head:
	if (slowpath(dq->dq_items_tail) && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
		dc = _dispatch_queue_head(dq);
		if (!_dispatch_object_is_slow_item(dc)) {
			// not a slow item, needs to wake up
		} else if (fastpath(dq->dq_width == 1) ||
				_dispatch_object_is_barrier(dc)) {
			// rdar://problem/8290662 "barrier/writer lock transfer"
			dc_start = dc_end = (dispatch_continuation_t)dc;
			owned = 0;
			count = 1;
			dc = _dispatch_queue_next(dq, dc);
		} else {
			// <rdar://problem/10164594> "reader lock transfer"
			// we must not signal semaphores immediately because our right
			// for dequeuing is granted through holding the full "barrier" width
			// which a signaled work item could relinquish out from our feet
			dc_start = (dispatch_continuation_t)dc;
			do {
				// no check on width here because concurrent queues
				// do not respect width for blocked readers, the thread
				// is already spent anyway
				dc_end = (dispatch_continuation_t)dc;
				owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
				count++;
				dc = _dispatch_queue_next(dq, dc);
			} while (dc && _dispatch_object_is_slow_non_barrier(dc));
		}

		if (count) {
			_dispatch_queue_drain_transfer_lock(dq, owned, dc_start);
			do {
				// signaled job will release the continuation
				dc_tmp = dc_start;
				dc_start = dc_start->do_next;
				_dispatch_continuation_slow_item_signal(dq, dc_tmp);
			} while (dc_tmp != dc_end);
			return;
		}
	}

	if (dc || dx_metatype(dq) != _DISPATCH_QUEUE_TYPE) {
		// <rdar://problem/23336992> the following wakeup is needed for sources
		// or mach channels: when ds_pending_data is set at the same time
		// as a trysync_f happens, lock transfer code above doesn't know about
		// ds_pending_data or the wakeup logic, but lock transfer is useless
		// for sources and mach channels in the first place.
		owned = _dispatch_queue_adjust_owned(dq, owned, dc);
		dq_state = _dispatch_queue_drain_unlock(dq, owned, NULL);
		return _dispatch_queue_try_wakeup(dq, dq_state,
				DISPATCH_WAKEUP_WAITER_HANDOFF);
	} else if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
		// someone enqueued a slow item at the head
		// looping may be its last chance
		goto attempt_running_slow_head;
	}
}

void
_dispatch_mgr_queue_drain(void)
{
	const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
	dispatch_queue_t dq = &_dispatch_mgr_q;
	uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;

	if (dq->dq_items_tail) {
		_dispatch_perfmon_start();
		if (slowpath(_dispatch_queue_serial_drain(dq, flags, &owned, NULL))) {
			DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
		}
		_dispatch_voucher_debug("mgr queue clear", NULL);
		_dispatch_reset_defaultpriority_override();
		_dispatch_perfmon_end();
	}

#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (!_dispatch_kevent_workqueue_enabled)
#endif
	{
		_dispatch_force_cache_cleanup();
	}
}

#pragma mark dispatch_queue_invoke

static void
_dispatch_queue_drain_deferred_invoke(dispatch_queue_t dq,
		dispatch_invoke_flags_t flags, uint64_t to_unlock,
		struct dispatch_object_s *dc)
{
	if (_dispatch_object_is_slow_item(dc)) {
		dispatch_assert(to_unlock == 0);
		_dispatch_queue_drain_transfer_lock(dq, to_unlock, dc);
		_dispatch_continuation_slow_item_signal(dq, dc);
		return _dispatch_release_tailcall(dq);
	}

	bool should_defer_again = false, should_pend_queue = true;
	uint64_t old_state, new_state;

	if (_dispatch_get_current_queue()->do_targetq) {
		_dispatch_thread_frame_get_current()->dtf_deferred = dc;
		should_defer_again = true;
		should_pend_queue = false;
	}

	if (dq->dq_width > 1) {
		should_pend_queue = false;
	} else if (should_pend_queue) {
		dispatch_assert(to_unlock ==
				DISPATCH_QUEUE_WIDTH_INTERVAL + DISPATCH_QUEUE_IN_BARRIER);
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
			new_state = old_state;
			if (_dq_state_has_waiters(old_state) ||
					_dq_state_is_enqueued(old_state)) {
				os_atomic_rmw_loop_give_up(break);
			}
			new_state += DISPATCH_QUEUE_DRAIN_PENDED;
			new_state -= DISPATCH_QUEUE_IN_BARRIER;
			new_state -= DISPATCH_QUEUE_WIDTH_INTERVAL;
		});
		should_pend_queue = (new_state & DISPATCH_QUEUE_DRAIN_PENDED);
	}

	if (!should_pend_queue) {
		if (to_unlock & DISPATCH_QUEUE_IN_BARRIER) {
			_dispatch_try_lock_transfer_or_wakeup(dq);
			_dispatch_release(dq);
		} else if (to_unlock) {
			uint64_t dq_state = _dispatch_queue_drain_unlock(dq, to_unlock, NULL);
			_dispatch_queue_try_wakeup(dq, dq_state, DISPATCH_WAKEUP_CONSUME);
		} else {
			_dispatch_release(dq);
		}
		dq = NULL;
	}

	if (!should_defer_again) {
		dx_invoke(dc, flags & _DISPATCH_INVOKE_PROPAGATE_MASK);
	}

	if (dq) {
		uint32_t self = _dispatch_tid_self();
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
			new_state = old_state;
			if (!_dq_state_drain_pended(old_state) ||
					_dq_state_drain_owner(old_state) != self) {
				os_atomic_rmw_loop_give_up({
					// We may have been overridden, so inform the root queue
					_dispatch_set_defaultpriority_override();
					return _dispatch_release_tailcall(dq);
				});
			}
			new_state = DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT(new_state);
		});
		if (_dq_state_has_override(old_state)) {
			// Ensure that the root queue sees that this thread was overridden.
			_dispatch_set_defaultpriority_override();
		}
		return dx_invoke(dq, flags | DISPATCH_INVOKE_STEALING);
	}
}

void
_dispatch_queue_finalize_activation(dispatch_queue_t dq)
{
	dispatch_queue_t tq = dq->do_targetq;
	_dispatch_queue_priority_inherit_from_target(dq, tq);
	_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
	if (dq->dq_override_voucher == DISPATCH_NO_VOUCHER) {
		voucher_t v = tq->dq_override_voucher;
		if (v != DISPATCH_NO_VOUCHER) {
			if (v) _voucher_retain(v);
			dq->dq_override_voucher = v;
		}
	}
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
		uint64_t *owned, struct dispatch_object_s **dc_ptr)
{
	dispatch_queue_t otq = dq->do_targetq;
	dispatch_queue_t cq = _dispatch_queue_get_current();

	if (slowpath(cq != otq)) {
		return otq;
	}
	if (dq->dq_width == 1) {
		return _dispatch_queue_serial_drain(dq, flags, owned, dc_ptr);
	}
	return _dispatch_queue_concurrent_drain(dq, flags, owned, dc_ptr);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(dq, flags, dispatch_queue_invoke2);
}

#pragma mark dispatch_queue_class_wakeup

#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_queue_override_invoke(dispatch_continuation_t dc,
		dispatch_invoke_flags_t flags)
{
	dispatch_queue_t old_rq = _dispatch_queue_get_current();
	dispatch_queue_t assumed_rq = dc->dc_other;
	voucher_t ov = DISPATCH_NO_VOUCHER;
	dispatch_object_t dou;

	dou._do = dc->dc_data;
	_dispatch_queue_set_current(assumed_rq);
	flags |= DISPATCH_INVOKE_OVERRIDING;
	if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
		flags |= DISPATCH_INVOKE_STEALING;
	} else {
		// balance the fake continuation push in
		// _dispatch_root_queue_push_override
		_dispatch_trace_continuation_pop(assumed_rq, dou._do);
	}
	_dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
		if (_dispatch_object_has_vtable(dou._do)) {
			dx_invoke(dou._do, flags);
		} else {
			_dispatch_continuation_invoke_inline(dou, ov, flags);
		}
	});
	_dispatch_queue_set_current(old_rq);
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_need_global_root_queue_override(dispatch_queue_t rq,
		pthread_priority_t pp)
{
	pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;

	if (unlikely(!rqp)) return false;

	pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	return defaultqueue ? pp && pp != rqp : pp > rqp;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_need_global_root_queue_override_stealer(dispatch_queue_t rq,
		pthread_priority_t pp, dispatch_wakeup_flags_t wflags)
{
	pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;

	if (unlikely(!rqp)) return false;

	if (wflags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
		if (!(wflags & _DISPATCH_WAKEUP_OVERRIDE_BITS)) {
			return false;
		}
	}

	pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	return defaultqueue || pp > rqp;
}

static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
		dispatch_object_t dou, pthread_priority_t pp)
{
	bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
	dispatch_continuation_t dc = dou._dc;

	if (_dispatch_object_is_redirection(dc)) {
		// no double-wrap is needed, _dispatch_async_redirect_invoke will do
		// the right thing
		dc->dc_func = (void *)orig_rq;
	} else {
		dc = _dispatch_continuation_alloc();
		dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
		// fake that we queued `dou` on `orig_rq` for introspection purposes
		_dispatch_trace_continuation_push(orig_rq, dou);
		dc->dc_other = orig_rq;
		dc->dc_data = dou._do;
		dc->dc_priority = DISPATCH_NO_PRIORITY;
		dc->dc_voucher = DISPATCH_NO_VOUCHER;
	}

	DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
	_dispatch_queue_push_inline(rq, dc, 0, 0);
}

static void
_dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
		dispatch_queue_t dq, pthread_priority_t pp)
{
	bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
	_dispatch_retain(dq);

	dc->dc_other = orig_rq;
	dc->dc_data = dq;
	dc->dc_priority = DISPATCH_NO_PRIORITY;
	dc->dc_voucher = DISPATCH_NO_VOUCHER;

	DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
	_dispatch_queue_push_inline(rq, dc, 0, 0);
}

static void
_dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags, uint64_t dq_state)
{
	mach_port_t owner = _dq_state_drain_owner(dq_state);
	pthread_priority_t pp2;
	dispatch_queue_t tq;
	bool locked = false;

	if (owner) {
		int rc = _dispatch_wqthread_override_start_check_owner(owner, pp,
				&dq->dq_state_lock);
		// EPERM means the target of the override is not a work queue thread
		// and could be a thread bound queue such as the main queue.
		// When that happens we must get to that queue and wake it up if we
		// want the override to be applied and take effect.
		if (rc != EPERM) {
			goto out;
		}
	}

	if (_dq_state_is_suspended(dq_state)) {
		goto out;
	}

	tq = dq->do_targetq;

	if (_dispatch_queue_has_immutable_target(dq)) {
		// the target queue cannot change, no locking needed
	} else if (_dispatch_is_in_root_queues_array(tq)) {
		// avoid locking when we recognize the target queue as a global root
		// queue it is gross, but is a very common case. The locking isn't
		// needed because these target queues cannot go away.
	} else if (_dispatch_queue_sidelock_trylock(dq, pp)) {
		// <rdar://problem/17735825> to traverse the tq chain safely we must
		// lock it to ensure it cannot change
		locked = true;
		tq = dq->do_targetq;
		_dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
	} else {
		//
		// Leading to being there, the current thread has:
		// 1. enqueued an object on `dq`
		// 2. raised the dq_override value of `dq`
		// 3. set the HAS_OVERRIDE bit and not seen an owner
		// 4. tried and failed to acquire the side lock
		//
		// The side lock owner can only be one of three things:
		//
		// - The suspend/resume side count code. Besides being unlikely,
		//   it means that at this moment the queue is actually suspended,
		//   which transfers the responsibility of applying the override to
		//   the eventual dispatch_resume().
		//
		// - A dispatch_set_target_queue() call. The fact that we saw no `owner`
		//   means that the trysync it does wasn't being drained when (3)
		//   happened which can only be explained by one of these interleavings:
		//
		//   o `dq` became idle between when the object queued in (1) ran and
		//     the set_target_queue call and we were unlucky enough that our
		//     step (3) happened while this queue was idle. There is no reason
		//     to override anything anymore, the queue drained to completion
		//     while we were preempted, our job is done.
		//
		//   o `dq` is queued but not draining during (1-3), then when we try
		//     to lock at (4) the queue is now draining a set_target_queue.
		//     Since we set HAS_OVERRIDE with a release barrier, the effect of
		//     (2) was visible to the drainer when he acquired the drain lock,
		//     and that guy has applied our override. Our job is done.
		//
		// - Another instance of _dispatch_queue_class_wakeup_with_override(),
		//   which is fine because trylock leaves a hint that we failed our
		//   trylock, causing the tryunlock below to fail and reassess whether
		//   a better override needs to be applied.
		//
		_dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
		goto out;
	}

apply_again:
	if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
		if (_dispatch_need_global_root_queue_override_stealer(tq, pp, flags)) {
			_dispatch_root_queue_push_override_stealer(tq, dq, pp);
		}
	} else if (flags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
		dx_wakeup(tq, pp, flags);
	} else if (_dispatch_queue_need_override(tq, pp)) {
		dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING);
	}
	while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
		// rdar://problem/24081326
		//
		// Another instance of _dispatch_queue_class_wakeup_with_override()
		// tried to acquire the side lock while we were running, and could have
		// had a better override than ours to apply.
		//
		pp2 = dq->dq_override;
		if (pp2 > pp) {
			pp = pp2;
			// The other instance had a better priority than ours, override
			// our thread, and apply the override that wasn't applied to `dq`
			// because of us.
			goto apply_again;
		}
	}

out:
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS

void
_dispatch_queue_class_override_drainer(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags)
{
#if HAVE_PTHREAD_WORKQUEUE_QOS
	uint64_t dq_state, value;

	//
	// Someone is trying to override the last work item of the queue.
	// Do not remember this override on the queue because we know the precise
	// duration the override is required for: until the current drain unlocks.
	//
	// That is why this function only tries to set HAS_OVERRIDE if we can
	// still observe a drainer, and doesn't need to set the DIRTY bit
	// because oq_override wasn't touched and there is no race to resolve
	//
	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		if (!_dq_state_drain_locked(dq_state)) {
			os_atomic_rmw_loop_give_up(break);
		}
		value = dq_state | DISPATCH_QUEUE_HAS_OVERRIDE;
	});
	if (_dq_state_drain_locked(dq_state)) {
		return _dispatch_queue_class_wakeup_with_override(dq, pp,
				flags, dq_state);
	}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}

#if DISPATCH_USE_KEVENT_WORKQUEUE
static void
_dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou,
		pthread_priority_t pp, dispatch_deferred_items_t ddi)
{
	dispatch_priority_t old_pp = ddi->ddi_stashed_pp;
	dispatch_queue_t old_dq = ddi->ddi_stashed_dq;
	struct dispatch_object_s *old_dou = ddi->ddi_stashed_dou;
	dispatch_priority_t rq_overcommit;

	rq_overcommit = dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	if (likely(!old_pp || rq_overcommit)) {
		ddi->ddi_stashed_dq = dq;
		ddi->ddi_stashed_dou = dou._do;
		ddi->ddi_stashed_pp = (dispatch_priority_t)pp | rq_overcommit |
				_PTHREAD_PRIORITY_PRIORITY_MASK;
		if (likely(!old_pp)) {
			return;
		}
		// push the previously stashed item
		pp = old_pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
		dq = old_dq;
		dou._do = old_dou;
	}
	if (_dispatch_need_global_root_queue_override(dq, pp)) {
		return _dispatch_root_queue_push_override(dq, dou, pp);
	}
	// bit of cheating: we should really pass `pp` but we know that we are
	// pushing onto a global queue at this point, and we just checked that
	// `pp` doesn't matter.
	DISPATCH_COMPILER_CAN_ASSUME(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
	_dispatch_queue_push_inline(dq, dou, 0, 0);
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE

static void
_dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou,
		pthread_priority_t pp)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
	_dispatch_queue_push(dq, dou, pp);
}

void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		pthread_priority_t pp)
{
	_dispatch_assert_is_valid_qos_override(pp);
	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
#if DISPATCH_USE_KEVENT_WORKQUEUE
		dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
		if (unlikely(ddi && !(ddi->ddi_stashed_pp &
				(dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) {
			dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE);
			return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi);
		}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
		// can't use dispatch_once_f() as it would create a frame
		if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) {
			return _dispatch_queue_push_slow(dq, dou, pp);
		}
		if (_dispatch_need_global_root_queue_override(dq, pp)) {
			return _dispatch_root_queue_push_override(dq, dou, pp);
		}
#endif
	}
	_dispatch_queue_push_inline(dq, dou, pp, 0);
}

static void
_dispatch_queue_class_wakeup_enqueue(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
	dispatch_queue_t tq;

	if (flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAS_OVERRIDDEN)) {
		// _dispatch_queue_drain_try_unlock may have reset the override while
		// we were becoming the enqueuer
		_dispatch_queue_reinstate_override_priority(dq, (dispatch_priority_t)pp);
	}
	if (!(flags & DISPATCH_WAKEUP_CONSUME)) {
		_dispatch_retain(dq);
	}
	if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
		// try_become_enqueuer has no acquire barrier, as the last block
		// of a queue asyncing to that queue is not an uncommon pattern
		// and in that case the acquire is completely useless
		//
		// so instead use a thread fence here when we will read the targetq
		// pointer because that is the only thing that really requires
		// the barrier
		os_atomic_thread_fence(acquire);
		tq = dq->do_targetq;
	} else {
		dispatch_assert(target == DISPATCH_QUEUE_WAKEUP_MGR);
		tq = &_dispatch_mgr_q;
	}
	return _dispatch_queue_push(tq, dq, pp);
}

void
_dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
	uint64_t old_state, new_state, bits = 0;

#if HAVE_PTHREAD_WORKQUEUE_QOS
	_dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
#endif

	if (flags & DISPATCH_WAKEUP_FLUSH) {
		bits = DISPATCH_QUEUE_DIRTY;
	}
	if (flags & DISPATCH_WAKEUP_OVERRIDING) {
		//
		// Setting the dirty bit here is about forcing callers of
		// _dispatch_queue_drain_try_unlock() to loop again when an override
		// has just been set to close the following race:
		//
		// Drainer (in drain_try_unlock()):
		//    override_reset();
		//    preempted....
		//
		// Enqueuer:
		//    atomic_or(oq_override, override, relaxed);
		//    atomic_or(dq_state, HAS_OVERRIDE, release);
		//
		// Drainer:
		//    ... resumes
		//    successful drain_unlock() and leaks `oq_override`
		//
		bits = DISPATCH_QUEUE_DIRTY | DISPATCH_QUEUE_HAS_OVERRIDE;
	}

	if (flags & DISPATCH_WAKEUP_SLOW_WAITER) {
		uint64_t pending_barrier_width =
				(dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
		uint64_t xor_owner_and_set_full_width_and_in_barrier =
				_dispatch_tid_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
				DISPATCH_QUEUE_IN_BARRIER;

#ifdef DLOCK_NOWAITERS_BIT
		bits |= DLOCK_NOWAITERS_BIT;
#else
		bits |= DLOCK_WAITERS_BIT;
#endif
		flags ^= DISPATCH_WAKEUP_SLOW_WAITER;
		dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME));

		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
			new_state = old_state | bits;
			if (_dq_state_drain_pended(old_state)) {
				// same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT
				// but we want to be more efficient wrt the WAITERS_BIT
				new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK;
				new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED;
			}
			if (unlikely(_dq_state_drain_locked(new_state))) {
#ifdef DLOCK_NOWAITERS_BIT
				new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT;
#endif
			} else if (unlikely(!_dq_state_is_runnable(new_state) ||
					!(flags & DISPATCH_WAKEUP_FLUSH))) {
				// either not runnable, or was not for the first item (26700358)
				// so we should not try to lock and handle overrides instead
			} else if (_dq_state_has_pending_barrier(old_state) ||
					new_state + pending_barrier_width <
					DISPATCH_QUEUE_WIDTH_FULL_BIT) {
				// see _dispatch_queue_drain_try_lock
				new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
				new_state ^= xor_owner_and_set_full_width_and_in_barrier;
			} else {
				new_state |= DISPATCH_QUEUE_ENQUEUED;
			}
		});
		if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
			return _dispatch_try_lock_transfer_or_wakeup(dq);
		}
	} else if (bits) {
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
			new_state = old_state | bits;
			if (likely(_dq_state_should_wakeup(old_state))) {
				new_state |= DISPATCH_QUEUE_ENQUEUED;
			}
		});
	} else {
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed,{
			new_state = old_state;
			if (likely(_dq_state_should_wakeup(old_state))) {
				new_state |= DISPATCH_QUEUE_ENQUEUED;
			} else {
				os_atomic_rmw_loop_give_up(break);
			}
		});
	}

	if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
		return _dispatch_queue_class_wakeup_enqueue(dq, pp, flags, target);
	}

#if HAVE_PTHREAD_WORKQUEUE_QOS
	if ((flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAITER_HANDOFF))
			&& target == DISPATCH_QUEUE_WAKEUP_TARGET) {
		return _dispatch_queue_class_wakeup_with_override(dq, pp,
				flags, new_state);
	}
#endif

	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}

#pragma mark dispatch_root_queue_drain

static bool
_dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	do {
		// Spin for a short while in case the contention is temporary -- e.g.
		// when starting up after dispatch_apply, or when executing a few
		// short continuations in a row.
		if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
			goto out;
		}
		// Since we have serious contention, we need to back off.
		if (!pending) {
			// Mark this queue as pending to avoid requests for further threads
			(void)os_atomic_inc2o(qc, dgq_pending, relaxed);
			pending = true;
		}
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	available = false;
out:
	if (pending) {
		(void)os_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	if (!available) {
		_dispatch_global_queue_poke(dq);
	}
	return available;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_root_queue_drain_one2(dispatch_queue_t dq)
{
	// Wait for queue head and tail to be both non-empty or both empty
	bool available; // <rdar://problem/15917893>
	_dispatch_wait_until((dq->dq_items_head != NULL) ==
			(available = (dq->dq_items_tail != NULL)));
	return available;
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_root_queue_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = os_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		if (slowpath(!os_atomic_cmpxchg2o(dq, dq_items_head, mediator,
				NULL, relaxed))) {
			goto start;
		}
		if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
				_dispatch_root_queue_drain_one2(dq)) {
			goto start;
		}
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_root_queue_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
		// 22708742: set tail to NULL with release, so that NULL write to head
		// above doesn't clobber head from concurrent enqueuer
		if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
			// both head and tail are NULL now
			goto out;
		}
		// There must be a next item now.
		_dispatch_wait_until(next = head->do_next);
	}

	os_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_global_queue_poke(dq);
out:
	return head;
}

void
_dispatch_root_queue_drain_deferred_item(dispatch_queue_t dq,
		struct dispatch_object_s *dou, pthread_priority_t pp)
{
	struct _dispatch_identity_s di;

	// fake that we queued `dou` on `dq` for introspection purposes
	_dispatch_trace_continuation_push(dq, dou);

	pp = _dispatch_priority_inherit_from_root_queue(pp, dq);
	_dispatch_queue_set_current(dq);
	_dispatch_root_queue_identity_assume(&di, pp);
#if DISPATCH_COCOA_COMPAT
	void *pool = _dispatch_last_resort_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	_dispatch_continuation_pop_inline(dou, dq,
			DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
	_dispatch_perfmon_workitem_inc();
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_pop(pool);
#endif // DISPATCH_COCOA_COMPAT
	_dispatch_reset_defaultpriority(di.old_pp);
	_dispatch_queue_set_current(NULL);

	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}

DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
static void
_dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pri)
{
#if DISPATCH_DEBUG
	dispatch_queue_t cq;
	if (slowpath(cq = _dispatch_queue_get_current())) {
		DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
	}
#endif
	_dispatch_queue_set_current(dq);
	if (dq->dq_priority) pri = dq->dq_priority;
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri, NULL);
#if DISPATCH_COCOA_COMPAT
	void *pool = _dispatch_last_resort_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	struct dispatch_object_s *item;
	bool reset = false;
	while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
		if (reset) _dispatch_wqthread_override_reset();
		_dispatch_continuation_pop_inline(item, dq,
				DISPATCH_INVOKE_WORKER_DRAIN|DISPATCH_INVOKE_REDIRECTING_DRAIN);
		_dispatch_perfmon_workitem_inc();
		reset = _dispatch_reset_defaultpriority_override();
	}
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_pop(pool);
#endif // DISPATCH_COCOA_COMPAT
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_queue_set_current(NULL);
}

#pragma mark dispatch_worker_thread

#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread4(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();
	int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
	dispatch_assert(pending >= 0);
	_dispatch_root_queue_drain(dq, _dispatch_get_priority());
	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_worker_thread3(pthread_priority_t pp)
{
	bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	dispatch_queue_t dq;
	pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
	dq = _dispatch_get_root_queue_for_priority(pp, overcommit);
	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
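
/*
 * _dispatch_worker_thread3() keeps only the QOS/priority payload plus the
 * overcommit flag from the pthread_priority_t it receives, stripping every
 * other flag bit before stashing the value in TSD. The sketch below shows the
 * same mask arithmetic with hypothetical stand-in constants (the real
 * _PTHREAD_PRIORITY_* values live in the pthread SPI headers). It is compiled
 * out and is not part of libdispatch.
 */
#if 0 // illustrative sketch only
#include <assert.h>
#include <stdint.h>

#define EXAMPLE_FLAGS_MASK        0xff000000u // hypothetical: top byte holds flags
#define EXAMPLE_OVERCOMMIT_FLAG   0x80000000u // hypothetical: the one flag we keep
#define EXAMPLE_DEFAULTQUEUE_FLAG 0x04000000u // hypothetical: a flag we strip

static void
example_strip_flags(void)
{
	uint32_t pp = 0x00150000u // hypothetical QOS/priority payload
			| EXAMPLE_OVERCOMMIT_FLAG | EXAMPLE_DEFAULTQUEUE_FLAG;

	// keep everything outside the flags byte, plus the overcommit flag
	pp &= EXAMPLE_OVERCOMMIT_FLAG | ~EXAMPLE_FLAGS_MASK;

	assert(pp == (0x00150000u | EXAMPLE_OVERCOMMIT_FLAG));
}
#endif // illustrative sketch only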
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	if (pqc->dpq_observer_hooks.queue_will_execute) {
		_dispatch_set_pthread_root_queue_observer_hooks(
				&pqc->dpq_observer_hooks);
	}
	if (pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	sigset_t mask;
	int r;
	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);
	_dispatch_introspection_thread_add();

	const int64_t timeout = 5ull * NSEC_PER_SEC;
	pthread_priority_t old_pri = _dispatch_get_priority();
	do {
		_dispatch_root_queue_drain(dq, old_pri);
		_dispatch_reset_priority_and_voucher(old_pri, NULL);
	} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

	(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
	_dispatch_global_queue_poke(dq);
	_dispatch_release(dq);

	return NULL;
}
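
/*
 * The pthread-pool worker above drains the root queue, then blocks on the
 * pool's mediator semaphore with a timeout; only when no work arrives for the
 * whole timeout does it return its pool slot and let the thread exit. The
 * sketch below models that park/expire pattern with the public dispatch
 * semaphore API and hypothetical example_* names. It is compiled out and is
 * not part of libdispatch.
 */
#if 0 // illustrative sketch only
#include <dispatch/dispatch.h>
#include <stdatomic.h>

struct example_pool {
	dispatch_semaphore_t work_available; // signaled once per queued item
	atomic_int idle_slots;               // how many more workers may be spawned
};

// hypothetical: drain whatever is currently queued, return when empty
extern void example_drain(struct example_pool *pool);

static void *
example_worker(void *ctxt)
{
	struct example_pool *pool = ctxt;
	const int64_t timeout = 5LL * NSEC_PER_SEC;

	do {
		// drain everything currently visible, then go back to waiting
		example_drain(pool);
	} while (dispatch_semaphore_wait(pool->work_available,
			dispatch_time(DISPATCH_TIME_NOW, timeout)) == 0);

	// timed out with no new work: give the slot back so a future enqueue can
	// spawn a replacement worker, then let this thread die
	atomic_fetch_add(&pool->idle_slots, 1);
	return NULL;
}
#endif // illustrative sketch only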
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif // DISPATCH_USE_PTHREAD_POOL
#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT

dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return DISPATCH_BAD_INPUT;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1, false);
	dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	_dispatch_runloop_queue_handle_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);

	pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq, true);
	_dispatch_queue_clear_bound_thread(dq);
	dx_wakeup(dq, pp, DISPATCH_WAKEUP_FLUSH);
	if (pp) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
}

void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_handle_dispose(dq);
	_dispatch_queue_destroy(dq);
}
bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}

void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	_dispatch_runloop_queue_wakeup(dq, 0, false);
}
dispatch_runloop_handle_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	return _dispatch_runloop_queue_get_handle(dq);
}
static void
_dispatch_runloop_queue_handle_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	dispatch_runloop_handle_t handle;

	_dispatch_fork_becomes_unsafe();

#if TARGET_OS_MAC
	mach_port_t mp;
	kern_return_t kr;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				MACH_PORT_LIMITS_INFO_COUNT);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	handle = mp;
#elif defined(__linux__)
	int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
	if (fd == -1) {
		int err = errno;
		switch (err) {
		case EMFILE:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"process is out of file descriptors");
		case ENFILE:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"system is out of file descriptors");
		case ENOMEM:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"kernel is out of memory");
		default:
			DISPATCH_INTERNAL_CRASH(err, "eventfd() failure");
		}
	}
	handle = fd;
#else
#error "runloop support not implemented on this platform"
#endif
	_dispatch_runloop_queue_set_handle(dq, handle);

	_dispatch_program_is_probably_callback_driven = true;
}
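
/*
 * On Linux the runloop handle created above is an eventfd. A waiter parks in
 * poll()/epoll on the descriptor; a wakeup is a single 8-byte counter write,
 * and the waiter clears the state by reading the counter back. The sketch
 * below shows that signal/drain cycle with plain glibc calls and hypothetical
 * example_* names; the real wakeup path is _dispatch_runloop_queue_poke()
 * elsewhere in this file. It is compiled out and is not part of libdispatch.
 */
#if 0 // illustrative sketch only
#include <sys/eventfd.h>
#include <poll.h>
#include <stdbool.h>

static void
example_signal_handle(int fd)
{
	// adds 1 to the eventfd counter, waking any poller
	(void)eventfd_write(fd, 1);
}

static bool
example_wait_and_drain_handle(int fd)
{
	struct pollfd pfd = { .fd = fd, .events = POLLIN };
	if (poll(&pfd, 1, -1) != 1) {
		return false;
	}
	// consume (and thereby reset) the counter; the fd was created with
	// EFD_NONBLOCK, so this never hangs if another thread drained it first
	eventfd_t value;
	(void)eventfd_read(fd, &value);
	return true;
}
#endif // illustrative sketch only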
static void
_dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq)
{
	dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
	if (!_dispatch_runloop_handle_is_valid(handle)) {
		return;
	}
#if TARGET_OS_MAC
	mach_port_t mp = handle;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
#elif defined(__linux__)
	int rc = close(handle);
	(void)dispatch_assume_zero(rc);
#else
#error "runloop support not implemented on this platform"
#endif
}
#pragma mark dispatch_main_queue

dispatch_runloop_handle_t
_dispatch_get_main_queue_handle_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);
	return _dispatch_runloop_queue_get_handle(dq);
}

dispatch_runloop_handle_t
_dispatch_get_main_queue_port_4CF(void)
{
	return _dispatch_get_main_queue_handle_4CF();
}
static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(
		void *ignored DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}

#endif // DISPATCH_COCOA_COMPAT
void
dispatch_main(void)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		_dispatch_ktrace0(ARIADNE_ENTER_DISPATCH_MAIN_CODE);
#ifdef __linux__
		// On Linux, if the main thread calls pthread_exit, the process becomes
		// a zombie. To avoid that, just before calling pthread_exit we register
		// a TSD destructor that will call _dispatch_sig_thread -- thus
		// capturing the main thread in sigsuspend. This relies on an
		// implementation detail (currently true in glibc) that TSD destructors
		// run in order of creation, so all the other TSD cleanup functions run
		// before the thread becomes trapped in sigsuspend.
		pthread_key_t dispatch_main_key;
		pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
		pthread_setspecific(dispatch_main_key, &dispatch_main_key);
#endif
		pthread_exit(NULL);
		DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH(0, "dispatch_main() must be called on the main thread");
#endif
}
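
/*
 * The Linux branch of dispatch_main() above leans on pthread TSD destructors
 * to regain control after the main thread calls pthread_exit(). The sketch
 * below is a minimal standalone model of that trick: a key is created with a
 * destructor, a value is set on the main thread, and pthread_exit() then runs
 * the destructor, which parks the thread instead of letting pthread_exit()
 * finish (pause() stands in for _dispatch_sigsuspend()). Hypothetical
 * example_* names; compiled out and not part of libdispatch.
 */
#if 0 // illustrative sketch only
#include <pthread.h>
#include <unistd.h>

static void
example_park_thread(void *ctxt)
{
	(void)ctxt;
	// runs as a TSD destructor during pthread_exit(); never returns, so the
	// calling thread stays parked here while other threads keep running
	for (;;) {
		pause();
	}
}

static void
example_main(void)
{
	pthread_key_t key;
	pthread_key_create(&key, example_park_thread);
	// any non-NULL value arms the destructor for this thread
	pthread_setspecific(key, &key);
	pthread_exit(NULL); // destructor runs here; this call never returns
}
#endif // illustrative sketch only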
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	_dispatch_queue_clear_bound_thread(dq);

	// <rdar://problem/22623242>
	// Here is what happens when both this cleanup happens because of
	// dispatch_main() being called, and a concurrent enqueuer makes the queue
	// non empty.
	//
	// _dispatch_queue_cleanup2:
	//     atomic_and(dq_is_thread_bound, ~DQF_THREAD_BOUND, relaxed);
	//     maximal_barrier();
	//     if (load(dq_items_tail, seq_cst)) {
	//         // do the wake up the normal serial queue way
	//     } else {
	//         // do no wake up <----
	//     }
	//
	// enqueuer:
	//     store(dq_items_tail, new_tail, release);
	//     if (load(dq_is_thread_bound, relaxed)) {
	//         // do the wake up the runloop way <----
	//     } else {
	//         // do the wake up the normal serial way
	//     }
	//
	// What would be bad is to take both paths marked <---- because the queue
	// wouldn't be woken up until the next time it's used (which may never
	// happen).
	//
	// An enqueuer that speculates the load of the old value of thread_bound
	// and then does the store may wake up the main queue the runloop way.
	// But then, the cleanup thread will see that store because the load
	// of dq_items_tail is sequentially consistent, and we have just thrown
	// away our pipeline.
	//
	// By the time cleanup2() is out of the maximally synchronizing barrier,
	// no other thread can speculate the wrong load anymore, and both cleanup2()
	// and a concurrent enqueuer would treat the queue in the standard non
	// thread bound way.

	_dispatch_queue_atomic_flags_clear(dq,
			DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC);
	os_atomic_maximally_synchronizing_barrier();
	// no need to drop the override, the thread will die anyway
	// the barrier above includes an acquire, so it's ok to do this raw
	// call to dx_wakeup(0)
	dx_wakeup(dq, 0, 0);

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	// See dispatch_main for call to _dispatch_sig_thread on linux.
#ifndef __linux__
	if (_dispatch_program_is_probably_callback_driven) {
		_dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}
#endif

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);
	_dispatch_runloop_queue_handle_dispose(dq);
#endif
}
static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch queue is running");
}

static void
_dispatch_deferred_items_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit with unhandled deferred items");
}

static void
_dispatch_frame_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch frame is active");
}

static void
_dispatch_context_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch context is set");
}