/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
		&& !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
		!HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
#undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_sig_thread(void *ctxt);
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp);
static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static void _dispatch_deferred_items_cleanup(void *ctxt);
static void _dispatch_frame_cleanup(void *ctxt);
static void _dispatch_context_cleanup(void *ctxt);
static void _dispatch_non_barrier_complete(dispatch_queue_t dq);
static inline void _dispatch_global_queue_poke(dispatch_queue_t dq);
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread4(void *context);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_worker_thread3(pthread_priority_t priority);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif
#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_handle_pred;
static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags);
static void _dispatch_runloop_queue_handle_init(void *ctxt);
static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
#endif

static void _dispatch_root_queues_init_once(void *context);
static dispatch_once_t _dispatch_root_queues_pred;
#pragma mark dispatch_root_queue

struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
	dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;
#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_pthread_root_queue_context_s
		_dispatch_pthread_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
	}},
};
#endif
#define MAX_PTHREAD_COUNT 255

struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			qos_class_t dgq_qos;
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			uint32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
#define WORKQ_PRIO_INVALID (-1)
#ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
#ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
#endif
	}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, ...) \
	[DISPATCH_ROOT_QUEUE_IDX_##n] = { \
		DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
		.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
		.do_ctxt = &_dispatch_root_queue_contexts[ \
				DISPATCH_ROOT_QUEUE_IDX_##n], \
		.dq_width = DISPATCH_QUEUE_WIDTH_POOL, \
		.dq_override_voucher = DISPATCH_NO_VOUCHER, \
		.dq_override = DISPATCH_SATURATED_OVERRIDE, \
		__VA_ARGS__ \
	}
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS,
		.dq_label = "com.apple.root.maintenance-qos",
		.dq_serialnum = 4,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.maintenance-qos.overcommit",
		.dq_serialnum = 5,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS,
		.dq_label = "com.apple.root.background-qos",
		.dq_serialnum = 6,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.background-qos.overcommit",
		.dq_serialnum = 7,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS,
		.dq_label = "com.apple.root.utility-qos",
		.dq_serialnum = 8,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.utility-qos.overcommit",
		.dq_serialnum = 9,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS,
		.dq_label = "com.apple.root.default-qos",
		.dq_serialnum = 10,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.default-qos.overcommit",
		.dq_serialnum = 11,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS,
		.dq_label = "com.apple.root.user-initiated-qos",
		.dq_serialnum = 12,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.user-initiated-qos.overcommit",
		.dq_serialnum = 13,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS,
		.dq_label = "com.apple.root.user-interactive-qos",
		.dq_serialnum = 14,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS_OVERCOMMIT,
		.dq_label = "com.apple.root.user-interactive-qos.overcommit",
		.dq_serialnum = 15,
	),
};
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
};
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define DISPATCH_PRIORITY_COUNT 5

enum {
	// No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
	// maintenance priority
	DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
	DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
	DISPATCH_PRIORITY_IDX_LOW,
	DISPATCH_PRIORITY_IDX_DEFAULT,
	DISPATCH_PRIORITY_IDX_HIGH,
};
static qos_class_t _dispatch_priority2qos[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
	[DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
};
#if HAVE_PTHREAD_WORKQUEUE_QOS
static const int _dispatch_priority2wq[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
};
#endif
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue _dispatch_root_queues[\
		DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
#endif
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1),
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_override_voucher = DISPATCH_NO_VOUCHER,
	.dq_override = DISPATCH_SATURATED_OVERRIDE,
	.dq_serialnum = 2,
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
	qos_class_t qos;
	switch (priority) {
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
				.dq_priority) {
			// map maintenance to background on old kernel
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		} else {
			qos = (qos_class_t)priority;
		}
		break;
#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		break;
	case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
		break;
	case DISPATCH_QUEUE_PRIORITY_LOW:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
		break;
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
		break;
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
		break;
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
				.dq_priority) {
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
			break;
		}
#endif
		// fallthrough
	default:
		qos = (qos_class_t)priority;
		break;
	}
	return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
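/*
 * Usage sketch (illustrative, not compiled here): a client obtains one of the
 * root queues above through the public API; the QoS constant and the work
 * submitted are arbitrary example values.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	void example_submit_utility_work(void)
 *	{
 *		dispatch_queue_t q =
 *				dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *		dispatch_async(q, ^{
 *			// work runs on one of the global root queues
 *		});
 *	}
 */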
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?:
			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	_dispatch_client_assert_fail(
			"Block was %sexpected to execute on queue [%s]",
			expected ? "" : "not ", dq->dq_label ?: "");
}

DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
{
	_dispatch_client_assert_fail(
			"Block was expected to act as a barrier on queue [%s]",
			dq->dq_label ?: "");
}
void
dispatch_assert_queue(dispatch_queue_t dq)
{
	unsigned long metatype = dx_metatype(dq);
	if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
		DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
				"dispatch_assert_queue()");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_drain_pended(dq_state))) {
		goto fail;
	}
	if (likely(_dq_state_drain_owner(dq_state) == _dispatch_tid_self())) {
		return;
	}
	if (likely(dq->dq_width > 1)) {
		// we can look at the width: if it is changing while we read it,
		// it means that a barrier is running on `dq` concurrently, which
		// proves that we're not on `dq`. Hence reading a stale '1' is ok.
		if (fastpath(_dispatch_thread_frame_find_queue(dq))) {
			return;
		}
	}
fail:
	_dispatch_assert_queue_fail(dq, true);
}
void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	unsigned long metatype = dx_metatype(dq);
	if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
		DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (_dq_state_drain_pended(dq_state)) {
		return;
	}
	if (likely(_dq_state_drain_owner(dq_state) != _dispatch_tid_self())) {
		if (likely(dq->dq_width == 1)) {
			// we can look at the width: if it is changing while we read it,
			// it means that a barrier is running on `dq` concurrently, which
			// proves that we're not on `dq`. Hence reading a stale '1' is ok.
			return;
		}
		if (likely(!_dispatch_thread_frame_find_queue(dq))) {
			return;
		}
	}
	_dispatch_assert_queue_fail(dq, false);
}
void
dispatch_assert_queue_barrier(dispatch_queue_t dq)
{
	dispatch_assert_queue(dq);

	if (likely(dq->dq_width == 1)) {
		return;
	}

	if (likely(dq->do_targetq)) {
		uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
		if (likely(_dq_state_is_in_barrier(dq_state))) {
			return;
		}
	}

	_dispatch_assert_queue_barrier_fail(dq);
}
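/*
 * Usage sketch (illustrative, not part of this file's build): the asserts
 * above let client code verify its execution context. The queue label and
 * handler are example values.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static dispatch_queue_t example_q;
 *
 *	static void example_handler(void *ctxt)
 *	{
 *		(void)ctxt;
 *		dispatch_assert_queue(example_q);     // must run on example_q
 *		dispatch_assert_queue_not(dispatch_get_main_queue());
 *	}
 *
 *	void example(void)
 *	{
 *		example_q = dispatch_queue_create("com.example.q", NULL);
 *		dispatch_async_f(example_q, NULL, example_handler);
 *	}
 */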
#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif

#pragma mark dispatch_init
#if HAVE_PTHREAD_WORKQUEUE_QOS
pthread_priority_t _dispatch_background_priority;
pthread_priority_t _dispatch_user_initiated_priority;

static void
_dispatch_root_queues_init_qos(int supported)
{
	pthread_priority_t p;
	qos_class_t qos;
	int i;
	for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
		p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
		qos = _pthread_qos_class_decode(p, NULL, NULL);
		dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
		_dispatch_priority2qos[i] = qos;
	}
	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		qos = _dispatch_root_queue_contexts[i].dgq_qos;
		if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
				!(supported & WORKQ_FEATURE_MAINTENANCE)) {
			continue;
		}
		unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
		flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
		if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
				i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
			flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
		}
		p = _pthread_qos_class_encode(qos, 0, flags);
		_dispatch_root_queues[i].dq_priority = (dispatch_priority_t)p;
	}
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
	int r;
	bool result = false;
	*wq_supported = 0;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
	bool disable_qos = false;
#if DISPATCH_DEBUG
	disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE
	bool disable_kevent_wq = false;
#if DISPATCH_DEBUG
	disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
#endif
#endif
	if (!disable_wq && !disable_qos) {
		*wq_supported = _pthread_workqueue_supported();
#if DISPATCH_USE_KEVENT_WORKQUEUE
		if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
			r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
					(pthread_workqueue_function_kevent_t)
					_dispatch_kevent_worker_thread,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#if DISPATCH_USE_MGR_THREAD
			_dispatch_kevent_workqueue_enabled = !r;
#endif
#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
			_dispatch_evfilt_machport_direct_enabled = !r;
#endif
			result = !r;
		} else
#endif
		if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
#if DISPATCH_USE_MGR_THREAD
			r = _pthread_workqueue_init(_dispatch_worker_thread3,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
			result = !r;
#endif
		}
		if (result) _dispatch_root_queues_init_qos(*wq_supported);
	}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!result && !disable_wq) {
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		uint8_t pool_size, bool overcommit)
{
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
			dispatch_hw_config(active_cpus);
	if (slowpath(pool_size) && pool_size < thread_pool_size) {
		thread_pool_size = pool_size;
	}
	qc->dgq_thread_pool_size = thread_pool_size;
#if HAVE_PTHREAD_WORKQUEUES
	if (qc->dgq_qos) {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
		(void)dispatch_assume_zero(pthread_attr_setdetachstate(
				&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
#if HAVE_PTHREAD_WORKQUEUE_QOS
		(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
				&pqc->dpq_thread_attr, qc->dgq_qos, 0));
#endif
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if USE_MACH_SEM
	// override the default FIFO behavior for the pool semaphores
	kern_return_t kr = semaphore_create(mach_task_self(),
			&pqc->dpq_thread_mediator.dsema_port, SYNC_POLICY_LIFO, 0);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	(void)dispatch_assume(pqc->dpq_thread_mediator.dsema_port);
#elif USE_POSIX_SEM
	/* XXXRW: POSIX semaphores don't support LIFO? */
	int ret = sem_init(&(pqc->dpq_thread_mediator.dsema_sem), 0, 0);
	(void)dispatch_assume_zero(ret);
#endif
}
#endif // DISPATCH_USE_PTHREAD_POOL
static dispatch_once_t _dispatch_root_queues_pred;

void
_dispatch_root_queues_init(void)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
}
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
	int wq_supported;
	_dispatch_fork_becomes_unsafe();
	if (!_dispatch_root_queues_init_workq(&wq_supported)) {
#if DISPATCH_ENABLE_THREAD_POOL
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], 0, overcommit);
		}
#else
		DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
				"Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_priority2qos) ==
			DISPATCH_PRIORITY_COUNT);
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dispatch_assert(countof(_dispatch_priority2wq) ==
			DISPATCH_PRIORITY_COUNT);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			WORKQ_NUM_PRIOQUEUE * 2);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
			offsetof(struct dispatch_object_s, do_next));
	dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
			offsetof(struct dispatch_object_s, do_vtable));
	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) %
			_Alignof(uint64_t) == 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

#if HAVE_PTHREAD_WORKQUEUE_QOS
	// 26497968 _dispatch_user_initiated_priority should be set for qos
	// propagation to work properly
	pthread_priority_t p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
	_dispatch_main_q.dq_priority = (dispatch_priority_t)p;
	_dispatch_main_q.dq_override = p & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_USER_INITIATED, 0, 0);
	_dispatch_user_initiated_priority = p;
	p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_BACKGROUND, 0, 0);
	_dispatch_background_priority = p;
#if DISPATCH_DEBUG
	if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
		_dispatch_set_qos_class_enabled = 1;
	}
#endif
#endif

#if DISPATCH_USE_THREAD_LOCAL_STORAGE
	_dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
#else
	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_create(&dispatch_deferred_items_key,
			_dispatch_deferred_items_cleanup);
	_dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
	_dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
	_dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
	_dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
			NULL);
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
#if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
	if (DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK) {
		_dispatch_thread_key_create(&dispatch_sema4_key,
				_dispatch_thread_semaphore_dispose);
	}
#endif
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif

	_dispatch_queue_set_current(&_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif
	_dispatch_hw_config_init();
	_dispatch_vtable_init();
	_dispatch_introspection_init();
}
static dispatch_once_t _dispatch_mach_host_port_pred;
static mach_port_t _dispatch_mach_host_port;

static void
_dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
{
	kern_return_t kr;
	mach_port_t mp, mhp = mach_host_self();
	kr = host_get_host_port(mhp, &mp);
	DISPATCH_VERIFY_MIG(kr);
	if (fastpath(!kr)) {
		// mach_host_self returned the HOST_PRIV port
		kr = mach_port_deallocate(mach_task_self(), mhp);
		DISPATCH_VERIFY_MIG(kr);
		mhp = mp;
	} else if (kr != KERN_INVALID_ARGUMENT) {
		(void)dispatch_assume_zero(kr);
	}
	if (!fastpath(mhp)) {
		DISPATCH_CLIENT_CRASH(kr, "Could not get unprivileged host port");
	}
	_dispatch_mach_host_port = mhp;
}

mach_port_t
_dispatch_get_mach_host_port(void)
{
	dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
			_dispatch_mach_host_port_init);
	return _dispatch_mach_host_port;
}
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
#include <unistd.h>
#include <sys/syscall.h>

#ifdef SYS_gettid
DISPATCH_ALWAYS_INLINE
static inline pid_t
gettid(void)
{
	return (pid_t)syscall(SYS_gettid);
}
#else
#error "SYS_gettid unavailable on this system"
#endif

#define _tsd_call_cleanup(k, f) do { \
		if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
	} while (0)
static void
_libdispatch_tsd_cleanup(void *ctx)
{
	struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;

	_tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
	_tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
	_tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
	_tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
	_tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
			NULL);
	_tsd_call_cleanup(dispatch_defaultpriority_key, NULL);
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
	_tsd_call_cleanup(dispatch_bcounter_key, NULL);
#endif
#if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
	_tsd_call_cleanup(dispatch_sema4_key, _dispatch_thread_semaphore_dispose);
#endif
	_tsd_call_cleanup(dispatch_priority_key, NULL);
	_tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
	_tsd_call_cleanup(dispatch_deferred_items_key,
			_dispatch_deferred_items_cleanup);
}
void
libdispatch_tsd_init(void)
{
	pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
	__dispatch_tsd.tid = gettid();
}
#endif
DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

	_dispatch_mach_host_port_pred = 0;
	_dispatch_mach_host_port = MACH_VOUCHER_NULL;
	_voucher_atfork_child();
	if (!_dispatch_is_multithreaded_inline()) {
		// clear the _PROHIBIT bit if set
		_dispatch_unsafe_fork = 0;
		return;
	}
	_dispatch_unsafe_fork = 0;
	_dispatch_child_of_unsafe_fork = true;

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
#pragma mark dispatch_queue_attr_t

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
{
	qos_class_t qos = (qos_class_t)qos_class;
	switch (qos) {
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
	case _DISPATCH_QOS_CLASS_BACKGROUND:
	case _DISPATCH_QOS_CLASS_UTILITY:
	case _DISPATCH_QOS_CLASS_DEFAULT:
	case _DISPATCH_QOS_CLASS_USER_INITIATED:
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
	case _DISPATCH_QOS_CLASS_UNSPECIFIED:
		break;
	default:
		return false;
	}
	if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
		return false;
	}
	return true;
}
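/*
 * Example (illustrative): a known QoS class plus a non-positive relative
 * priority no lower than QOS_MIN_RELATIVE_PRIORITY passes the check above;
 * anything else makes the attr makers below return DISPATCH_BAD_INPUT.
 *
 *	_dispatch_qos_class_valid(QOS_CLASS_UTILITY, -5);          // true
 *	_dispatch_qos_class_valid(QOS_CLASS_UTILITY, +1);          // false
 *	_dispatch_qos_class_valid((dispatch_qos_class_t)0x42, 0);  // false
 */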
#define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
		[_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos

static const
_dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
};
#define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
		((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
		DQA_INDEX_NON_OVERCOMMIT : \
		((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
		DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))

#define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
		((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)

#define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
		((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)

#define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
		(frequency)

#define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))

#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
static inline dispatch_queue_attr_t
_dispatch_get_queue_attr(qos_class_t qos, int prio,
		_dispatch_queue_attr_overcommit_t overcommit,
		dispatch_autorelease_frequency_t frequency,
		bool concurrent, bool inactive)
{
	return (dispatch_queue_attr_t)&_dispatch_queue_attrs
			[DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
			[DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
			[DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
			[DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
			[DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
			[DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
}
dispatch_queue_attr_t
_dispatch_get_default_queue_attr(void)
{
	return _dispatch_get_queue_attr(_DISPATCH_QOS_CLASS_UNSPECIFIED, 0,
			_dispatch_queue_attr_overcommit_unspecified,
			DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
		dispatch_qos_class_t qos_class, int relative_priority)
{
	if (!_dispatch_qos_class_valid(qos_class, relative_priority)) {
		return DISPATCH_BAD_INPUT;
	}
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(qos_class, relative_priority,
			dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
			dqa->dqa_concurrent, dqa->dqa_inactive);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, dqa->dqa_overcommit,
			dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
		bool overcommit)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, overcommit ?
			_dispatch_queue_attr_overcommit_enabled :
			_dispatch_queue_attr_overcommit_disabled,
			dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
			dqa->dqa_inactive);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
		dispatch_autorelease_frequency_t frequency)
{
	switch (frequency) {
	case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
	case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
	case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
		break;
	default:
		return DISPATCH_BAD_INPUT;
	}
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, dqa->dqa_overcommit,
			frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
}
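/*
 * Usage sketch (illustrative): the attribute makers above compose, each
 * returning an interned attribute from _dispatch_queue_attrs. The label and
 * QoS values are example inputs.
 *
 *	dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL;
 *	attr = dispatch_queue_attr_make_with_qos_class(attr,
 *			QOS_CLASS_UTILITY, -1);
 *	attr = dispatch_queue_attr_make_initially_inactive(attr);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.attr-q", attr);
 *	// ... configure q while it is still inactive, then:
 *	dispatch_activate(q);
 */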
#pragma mark dispatch_queue_t

// 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 16;
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq, bool legacy)
{
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	// Be sure the root queue priorities are set
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
#endif
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_default_queue_attr();
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	}

	//
	// Step 1: Normalize arguments (qos, overcommit, tq)
	//

	qos_class_t qos = dqa->dqa_qos_class;
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
			!_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
		qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
	}
#endif
	bool maintenance_fallback = false;
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	maintenance_fallback = true;
#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	if (maintenance_fallback) {
		if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
				!_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
			qos = _DISPATCH_QOS_CLASS_BACKGROUND;
		}
	}

	_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
	if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
		if (tq->do_targetq) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
					"a non-global target queue");
		}
	}

	if (tq && !tq->do_targetq &&
			tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
		// Handle discrepancies between attr and target queue, attributes win
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
				overcommit = _dispatch_queue_attr_overcommit_enabled;
			} else {
				overcommit = _dispatch_queue_attr_overcommit_disabled;
			}
		}
		if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
			tq = _dispatch_get_root_queue_with_overcommit(tq,
					overcommit == _dispatch_queue_attr_overcommit_enabled);
		} else {
			tq = NULL;
		}
	} else if (tq && !tq->do_targetq) {
		// target is a pthread or runloop root queue, setting QoS or overcommit
		// is disallowed
		if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
					"and use this kind of target queue");
		}
		if (qos != _DISPATCH_QOS_CLASS_UNSPECIFIED) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
					"and use this kind of target queue");
		}
	} else {
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			// Serial queues default to overcommit!
			overcommit = dqa->dqa_concurrent ?
					_dispatch_queue_attr_overcommit_disabled :
					_dispatch_queue_attr_overcommit_enabled;
		}
	}
	if (!tq) {
		qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ?
				_DISPATCH_QOS_CLASS_DEFAULT : qos;
		tq = _dispatch_get_root_queue(tq_qos, overcommit ==
				_dispatch_queue_attr_overcommit_enabled);
		if (slowpath(!tq)) {
			DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
		}
	}

	//
	// Step 2: Initialize the queue
	//

	if (legacy) {
		// if any of these attributes is specified, use non legacy classes
		if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
			legacy = false;
		}
	}

	const void *vtable;
	dispatch_queue_flags_t dqf = 0;
	if (legacy) {
		vtable = DISPATCH_VTABLE(queue);
	} else if (dqa->dqa_concurrent) {
		vtable = DISPATCH_VTABLE(queue_concurrent);
	} else {
		vtable = DISPATCH_VTABLE(queue_serial);
	}
	switch (dqa->dqa_autorelease_frequency) {
	case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
		dqf |= DQF_AUTORELEASE_NEVER;
		break;
	case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
		dqf |= DQF_AUTORELEASE_ALWAYS;
		break;
	}
	if (label) {
		const char *tmp = _dispatch_strdup_if_mutable(label);
		if (tmp != label) {
			dqf |= DQF_LABEL_NEEDS_FREE;
			label = tmp;
		}
	}

	dispatch_queue_t dq = _dispatch_alloc(vtable,
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
	_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
			DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);

	dq->dq_label = label;

#if HAVE_PTHREAD_WORKQUEUE_QOS
	dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos,
			dqa->dqa_relative_priority,
			overcommit == _dispatch_queue_attr_overcommit_enabled ?
			_PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
#endif
	_dispatch_retain(tq);
	if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
		// legacy way of inheriting the QoS from the target
		_dispatch_queue_priority_inherit_from_target(dq, tq);
	}
	if (!dqa->dqa_inactive) {
		_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
	}
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
dispatch_queue_t
dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq)
{
	return _dispatch_queue_create_with_target(label, dqa, tq, false);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return _dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
dispatch_queue_t
dispatch_queue_create_with_accounting_override_voucher(const char *label,
		dispatch_queue_attr_t attr, voucher_t voucher)
{
	dispatch_queue_t dq = dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT);
	dq->dq_override_voucher = _voucher_create_accounting_voucher(voucher);
	return dq;
}
void
_dispatch_queue_destroy(dispatch_queue_t dq)
{
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);

	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
		initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
	}
	if (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE) {
		// dispatch_cancel_and_wait may apply overrides in a racy way with
		// the source cancellation finishing. This race is expensive and not
		// really worthwhile to resolve since the source becomes dead anyway.
		dq_state &= ~DISPATCH_QUEUE_HAS_OVERRIDE;
	}
	if (slowpath(dq_state != initial_state)) {
		if (_dq_state_drain_locked(dq_state)) {
			DISPATCH_CLIENT_CRASH(dq, "Release of a locked queue");
		}
		DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
				"Release of a queue with corrupt state");
	}
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CLIENT_CRASH(dq, "Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
				"Release of a queue while items are enqueued");
	}

	// trash the queue so that use after free will crash
	dq->dq_items_head = (void *)0x200;
	dq->dq_items_tail = (void *)0x200;
	// poison the state with something that is suspended and is easy to spot
	dq->dq_state = 0xdead000000000000;

	dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
	if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
		if (dq->dq_override_voucher) _voucher_release(dq->dq_override_voucher);
		dq->dq_override_voucher = DISPATCH_NO_VOUCHER;
	}
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}
static void
_dispatch_queue_suspend_slow(dispatch_queue_t dq)
{
	uint64_t dq_state, value, delta;

	_dispatch_queue_sidelock_lock(dq);

	// what we want to transfer (remove from dq_state)
	delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
	// but this is a suspend so add a suspend count at the same time
	delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
	if (dq->dq_side_suspend_cnt == 0) {
		// we subtract delta from dq_state, and we want to set this bit
		delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
	}

	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		// unsigned underflow of the subtraction can happen because other
		// threads could have touched this value while we were trying to acquire
		// the lock, or because another thread raced us to do the same operation
		// and got to the lock first.
		if (slowpath(os_sub_overflow(dq_state, delta, &value))) {
			os_atomic_rmw_loop_give_up(goto retry);
		}
	});
	if (slowpath(os_add_overflow(dq->dq_side_suspend_cnt,
			DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
		DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
	}
	return _dispatch_queue_sidelock_unlock(dq);

retry:
	_dispatch_queue_sidelock_unlock(dq);
	return dx_vtable(dq)->do_suspend(dq);
}
void
_dispatch_queue_suspend(dispatch_queue_t dq)
{
	dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);

	uint64_t dq_state, value;

	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
		if (slowpath(os_add_overflow(dq_state, value, &value))) {
			os_atomic_rmw_loop_give_up({
				return _dispatch_queue_suspend_slow(dq);
			});
		}
	});

	if (!_dq_state_is_suspended(dq_state)) {
		// rdar://8181908 we need to extend the queue life for the duration
		// of the call to wakeup at _dispatch_queue_resume() time.
		_dispatch_retain(dq);
	}
}
static void
_dispatch_queue_resume_slow(dispatch_queue_t dq)
{
	uint64_t dq_state, value, delta;

	_dispatch_queue_sidelock_lock(dq);

	// what we want to transfer
	delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
	// but this is a resume so consume a suspend count at the same time
	delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
	switch (dq->dq_side_suspend_cnt) {
	case 0:
		goto retry;
	case DISPATCH_QUEUE_SUSPEND_HALF:
		// we will transition the side count to 0, so we want to clear this bit
		delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
		break;
	}
	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		// unsigned overflow of the addition can happen because other
		// threads could have touched this value while we were trying to acquire
		// the lock, or because another thread raced us to do the same operation
		// and got to the lock first.
		if (slowpath(os_add_overflow(dq_state, delta, &value))) {
			os_atomic_rmw_loop_give_up(goto retry);
		}
	});
	dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
	return _dispatch_queue_sidelock_unlock(dq);

retry:
	_dispatch_queue_sidelock_unlock(dq);
	return dx_vtable(dq)->do_resume(dq, false);
}
static void
_dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
{
	// Step 2: run the activation finalizer
	if (dx_vtable(dq)->do_finalize_activation) {
		dx_vtable(dq)->do_finalize_activation(dq);
	}
	// Step 3: consume the suspend count
	return dx_vtable(dq)->do_resume(dq, false);
}
void
_dispatch_queue_resume(dispatch_queue_t dq, bool activate)
{
	// covers all suspend and inactive bits, including side suspend bit
	const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
	// backward compatibility: only dispatch sources can abuse
	// dispatch_resume() to really mean dispatch_activate()
	bool resume_can_activate = (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE);
	uint64_t dq_state, value;

	dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);

	// Activation is a bit tricky as it needs to finalize before the wakeup.
	//
	// If after doing its updates to the suspend count and/or inactive bit,
	// the last suspension related bit that would remain is the
	// NEEDS_ACTIVATION one, then this function:
	//
	// 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
	//    a suspend count)
	// 2. runs the activation finalizer
	// 3. consumes the suspend count set in (1), and finishes the resume flow
	//
	// Concurrently, some property setters such as setting dispatch source
	// handlers or _dispatch_queue_set_target_queue try to do in-place changes
	// before activation. These protect their action by taking a suspend count.
	// Step (1) above cannot happen if such a setter has locked the object.

	if (activate) {
		// relaxed atomic because this doesn't publish anything, this is only
		// about picking the thread that gets to finalize the activation
		os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
			if ((dq_state & suspend_bits) ==
					DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
				// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_INACTIVE
						- DISPATCH_QUEUE_NEEDS_ACTIVATION
						+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
			} else if (_dq_state_is_inactive(dq_state)) {
				// { sc:>0 i:1 na:1 } -> { i:0 na:1 }
				// simple activation because sc is not 0
				// resume will deal with na:1 later
				value = dq_state - DISPATCH_QUEUE_INACTIVE;
			} else {
				// object already active, this is a no-op, just exit
				os_atomic_rmw_loop_give_up(return);
			}
		});
	} else {
		// release barrier needed to publish the effect of
		// - dispatch_set_target_queue()
		// - dispatch_set_*_handler()
		// - do_finalize_activation()
		os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
			if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
					+ DISPATCH_QUEUE_NEEDS_ACTIVATION) {
				// { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
			} else if (resume_can_activate && (dq_state & suspend_bits) ==
					DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
				// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
				value = dq_state - DISPATCH_QUEUE_INACTIVE
						- DISPATCH_QUEUE_NEEDS_ACTIVATION
						+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
			} else {
				value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
				if (slowpath(os_sub_overflow(dq_state, value, &value))) {
					// underflow means over-resume or a suspend count transfer
					// to the side count is needed
					os_atomic_rmw_loop_give_up({
						if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
							goto over_resume;
						}
						return _dispatch_queue_resume_slow(dq);
					});
				}
				if (_dq_state_is_runnable(value) &&
						!_dq_state_drain_locked(value)) {
					uint64_t full_width = value;
					if (_dq_state_has_pending_barrier(value)) {
						full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
						full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
						full_width += DISPATCH_QUEUE_IN_BARRIER;
					} else {
						full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
						full_width += DISPATCH_QUEUE_IN_BARRIER;
					}
					if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
							DISPATCH_QUEUE_WIDTH_FULL_BIT) {
						value = full_width;
						value &= ~DISPATCH_QUEUE_DIRTY;
						value |= _dispatch_tid_self();
					}
				}
			}
		});
	}

	if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
		// we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
		return _dispatch_queue_resume_finalize_activation(dq);
	}

	if (activate) {
		// if we're still in an activate codepath here we should have
		// { sc:>0 na:1 }, if not we've got a corrupt state
		if (!fastpath(_dq_state_is_suspended(value))) {
			DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
		}
		return;
	}

	if (_dq_state_is_suspended(value)) {
		return;
	}

	if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
		_dispatch_release(dq);
		return _dispatch_try_lock_transfer_or_wakeup(dq);
	}

	if (_dq_state_should_wakeup(value)) {
		// <rdar://problem/14637483>
		// seq_cst wrt state changes that were flushed and not acted upon
		os_atomic_thread_fence(acquire);
		pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq,
				_dispatch_queue_is_thread_bound(dq));
		return dx_wakeup(dq, pp, DISPATCH_WAKEUP_CONSUME);
	}
	return _dispatch_release_tailcall(dq);

over_resume:
	if (slowpath(_dq_state_is_inactive(dq_state))) {
		DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
	}
	DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
}
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}
dispatch_qos_class_t
dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
{
	qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
	int relative_priority = 0;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	pthread_priority_t dqp = dq->dq_priority;
	if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
	qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
#else
	(void)dq;
#endif
	if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
	return qos;
}
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w > 0) {
		tmp = (unsigned int)w;
	} else switch (w) {
	case 0:
		tmp = 1;
		break;
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = dispatch_hw_config(physical_cpus);
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = dispatch_hw_config(active_cpus);
		break;
	default:
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = dispatch_hw_config(logical_cpus);
		break;
	}
	if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
		tmp = DISPATCH_QUEUE_WIDTH_MAX;
	}

	dispatch_queue_flags_t old_dqf, new_dqf;
	os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = old_dqf & ~DQF_WIDTH_MASK;
		new_dqf |= (tmp << DQF_WIDTH_SHIFT);
	});
	_dispatch_object_debug(dq, "%s", __func__);
}
1773 dispatch_queue_set_width(dispatch_queue_t dq
, long width
)
1775 if (slowpath(dq
->do_ref_cnt
== DISPATCH_OBJECT_GLOBAL_REFCNT
) ||
1776 slowpath(dx_hastypeflag(dq
, QUEUE_ROOT
))) {
1780 unsigned long type
= dx_type(dq
);
1782 case DISPATCH_QUEUE_LEGACY_TYPE
:
1783 case DISPATCH_QUEUE_CONCURRENT_TYPE
:
1785 case DISPATCH_QUEUE_SERIAL_TYPE
:
1786 DISPATCH_CLIENT_CRASH(type
, "Cannot set width of a serial queue");
1788 DISPATCH_CLIENT_CRASH(type
, "Unexpected dispatch object type");
1791 _dispatch_barrier_trysync_or_async_f(dq
, (void*)(intptr_t)width
,
1792 _dispatch_queue_set_width2
);
1796 _dispatch_queue_legacy_set_target_queue(void *ctxt
)
1798 dispatch_queue_t dq
= _dispatch_queue_get_current();
1799 dispatch_queue_t tq
= ctxt
;
1800 dispatch_queue_t otq
= dq
->do_targetq
;
1802 if (_dispatch_queue_atomic_flags(dq
) & DQF_TARGETED
) {
1803 _dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget
, dq
, otq
, tq
);
1804 _dispatch_bug_deprecated("Changing the target of a queue "
1805 "already targeted by other dispatch objects");
1808 _dispatch_queue_priority_inherit_from_target(dq
, tq
);
1809 _dispatch_queue_atomic_flags_set(tq
, DQF_TARGETED
);
1810 #if HAVE_PTHREAD_WORKQUEUE_QOS
1811 // see _dispatch_queue_class_wakeup()
1812 _dispatch_queue_sidelock_lock(dq
);
1814 dq
->do_targetq
= tq
;
1815 #if HAVE_PTHREAD_WORKQUEUE_QOS
1816 // see _dispatch_queue_class_wakeup()
1817 _dispatch_queue_sidelock_unlock(dq
);
1820 _dispatch_object_debug(dq
, "%s", __func__
);
1821 _dispatch_introspection_target_queue_changed(dq
);
1822 _dispatch_release_tailcall(otq
);
1826 _dispatch_queue_set_target_queue(dispatch_queue_t dq
, dispatch_queue_t tq
)
1828 dispatch_assert(dq
->do_ref_cnt
!= DISPATCH_OBJECT_GLOBAL_REFCNT
&&
1831 if (slowpath(!tq
)) {
1832 bool is_concurrent_q
= (dq
->dq_width
> 1);
1833 tq
= _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
,
1837 if (_dispatch_queue_try_inactive_suspend(dq
)) {
1838 _dispatch_object_set_target_queue_inline(dq
, tq
);
1839 return dx_vtable(dq
)->do_resume(dq
, false);
1842 if (dq
->dq_override_voucher
!= DISPATCH_NO_VOUCHER
) {
1843 DISPATCH_CLIENT_CRASH(dq
, "Cannot change the target of a queue or "
1844 "source with an accounting override voucher "
1845 "after it has been activated");
1848 unsigned long type
= dx_type(dq
);
1850 case DISPATCH_QUEUE_LEGACY_TYPE
:
1851 if (_dispatch_queue_atomic_flags(dq
) & DQF_TARGETED
) {
1852 _dispatch_bug_deprecated("Changing the target of a queue "
1853 "already targeted by other dispatch objects");
1856 case DISPATCH_SOURCE_KEVENT_TYPE
:
1857 case DISPATCH_MACH_CHANNEL_TYPE
:
1858 _dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget
, dq
);
1859 _dispatch_bug_deprecated("Changing the target of a source "
1860 "after it has been activated");
1863 case DISPATCH_QUEUE_SERIAL_TYPE
:
1864 case DISPATCH_QUEUE_CONCURRENT_TYPE
:
1865 DISPATCH_CLIENT_CRASH(type
, "Cannot change the target of this queue "
1866 "after it has been activated");
1868 DISPATCH_CLIENT_CRASH(type
, "Unexpected dispatch object type");
1871 _dispatch_retain(tq
);
1872 return _dispatch_barrier_trysync_or_async_f(dq
, tq
,
1873 _dispatch_queue_legacy_set_target_queue
);
1877 #pragma mark dispatch_mgr_queue
1879 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1880 static struct dispatch_pthread_root_queue_context_s
1881 _dispatch_mgr_root_queue_pthread_context
;
1882 static struct dispatch_root_queue_context_s
1883 _dispatch_mgr_root_queue_context
= {{{
1884 #if HAVE_PTHREAD_WORKQUEUES
1885 .dgq_kworkqueue
= (void*)(~0ul),
1887 .dgq_ctxt
= &_dispatch_mgr_root_queue_pthread_context
,
1888 .dgq_thread_pool_size
= 1,
1891 static struct dispatch_queue_s _dispatch_mgr_root_queue
= {
1892 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root
),
1893 .dq_state
= DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
,
1894 .do_ctxt
= &_dispatch_mgr_root_queue_context
,
1895 .dq_label
= "com.apple.root.libdispatch-manager",
1896 .dq_width
= DISPATCH_QUEUE_WIDTH_POOL
,
1897 .dq_override
= DISPATCH_SATURATED_OVERRIDE
,
1898 .dq_override_voucher
= DISPATCH_NO_VOUCHER
,
1901 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1903 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1906 volatile qos_class_t qos
;
1910 } _dispatch_mgr_sched
;
1912 static dispatch_once_t _dispatch_mgr_sched_pred
;
1914 // TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
1916 #if HAVE_PTHREAD_WORKQUEUE_QOS
1917 // Must be kept in sync with list of qos classes in sys/qos.h
1918 static const int _dispatch_mgr_sched_qos2prio
[] = {
1919 [_DISPATCH_QOS_CLASS_MAINTENANCE
] = 4,
1920 [_DISPATCH_QOS_CLASS_BACKGROUND
] = 4,
1921 [_DISPATCH_QOS_CLASS_UTILITY
] = 20,
1922 [_DISPATCH_QOS_CLASS_DEFAULT
] = 31,
1923 [_DISPATCH_QOS_CLASS_USER_INITIATED
] = 37,
1924 [_DISPATCH_QOS_CLASS_USER_INTERACTIVE
] = 47,
1926 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
1929 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED
)
1931 struct sched_param param
;
1932 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1933 pthread_attr_t
*attr
;
1934 attr
= &_dispatch_mgr_root_queue_pthread_context
.dpq_thread_attr
;
1936 pthread_attr_t a
, *attr
= &a
;
1938 (void)dispatch_assume_zero(pthread_attr_init(attr
));
1939 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr
,
1940 &_dispatch_mgr_sched
.policy
));
1941 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr
, ¶m
));
1942 #if HAVE_PTHREAD_WORKQUEUE_QOS
1943 qos_class_t qos
= qos_class_main();
1944 if (qos
== _DISPATCH_QOS_CLASS_DEFAULT
) {
1945 qos
= _DISPATCH_QOS_CLASS_USER_INITIATED
; // rdar://problem/17279292
1948 _dispatch_mgr_sched
.qos
= qos
;
1949 param
.sched_priority
= _dispatch_mgr_sched_qos2prio
[qos
];
1952 _dispatch_mgr_sched
.default_prio
= param
.sched_priority
;
1953 _dispatch_mgr_sched
.prio
= _dispatch_mgr_sched
.default_prio
;
1955 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1957 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1960 _dispatch_mgr_root_queue_init(void)
1962 dispatch_once_f(&_dispatch_mgr_sched_pred
, NULL
, _dispatch_mgr_sched_init
);
1963 struct sched_param param
;
1964 pthread_attr_t
*attr
;
1965 attr
= &_dispatch_mgr_root_queue_pthread_context
.dpq_thread_attr
;
1966 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr
,
1967 PTHREAD_CREATE_DETACHED
));
1969 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr
, 64 * 1024));
1971 #if HAVE_PTHREAD_WORKQUEUE_QOS
1972 qos_class_t qos
= _dispatch_mgr_sched
.qos
;
1974 if (_dispatch_set_qos_class_enabled
) {
1975 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr
,
1978 _dispatch_mgr_q
.dq_priority
=
1979 (dispatch_priority_t
)_pthread_qos_class_encode(qos
, 0, 0);
1982 param
.sched_priority
= _dispatch_mgr_sched
.prio
;
1983 if (param
.sched_priority
> _dispatch_mgr_sched
.default_prio
) {
1984 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr
, ¶m
));
1986 return &_dispatch_mgr_sched
.tid
;
1990 _dispatch_mgr_priority_apply(void)
1992 struct sched_param param
;
1994 param
.sched_priority
= _dispatch_mgr_sched
.prio
;
1995 if (param
.sched_priority
> _dispatch_mgr_sched
.default_prio
) {
1996 (void)dispatch_assume_zero(pthread_setschedparam(
1997 _dispatch_mgr_sched
.tid
, _dispatch_mgr_sched
.policy
,
2000 } while (_dispatch_mgr_sched
.prio
> param
.sched_priority
);
2005 _dispatch_mgr_priority_init(void)
2007 struct sched_param param
;
2008 pthread_attr_t
*attr
;
2009 attr
= &_dispatch_mgr_root_queue_pthread_context
.dpq_thread_attr
;
2010 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr
, ¶m
));
2011 #if HAVE_PTHREAD_WORKQUEUE_QOS
2012 qos_class_t qos
= 0;
2013 (void)pthread_attr_get_qos_class_np(attr
, &qos
, NULL
);
2014 if (_dispatch_mgr_sched
.qos
> qos
&& _dispatch_set_qos_class_enabled
) {
2015 (void)pthread_set_qos_class_self_np(_dispatch_mgr_sched
.qos
, 0);
2016 int p
= _dispatch_mgr_sched_qos2prio
[_dispatch_mgr_sched
.qos
];
2017 if (p
> param
.sched_priority
) {
2018 param
.sched_priority
= p
;
2022 if (slowpath(_dispatch_mgr_sched
.prio
> param
.sched_priority
)) {
2023 return _dispatch_mgr_priority_apply();
2026 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2028 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2031 _dispatch_mgr_priority_raise(const pthread_attr_t
*attr
)
2033 dispatch_once_f(&_dispatch_mgr_sched_pred
, NULL
, _dispatch_mgr_sched_init
);
2034 struct sched_param param
;
2035 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr
, ¶m
));
2036 #if HAVE_PTHREAD_WORKQUEUE_QOS
2037 qos_class_t q
, qos
= 0;
2038 (void)pthread_attr_get_qos_class_np((pthread_attr_t
*)attr
, &qos
, NULL
);
2040 param
.sched_priority
= _dispatch_mgr_sched_qos2prio
[qos
];
2041 os_atomic_rmw_loop2o(&_dispatch_mgr_sched
, qos
, q
, qos
, relaxed
, {
2042 if (q
>= qos
) os_atomic_rmw_loop_give_up(break);
2046 int p
, prio
= param
.sched_priority
;
2047 os_atomic_rmw_loop2o(&_dispatch_mgr_sched
, prio
, p
, prio
, relaxed
, {
2048 if (p
>= prio
) os_atomic_rmw_loop_give_up(return);
2050 #if DISPATCH_USE_KEVENT_WORKQUEUE
2051 dispatch_once_f(&_dispatch_root_queues_pred
, NULL
,
2052 _dispatch_root_queues_init_once
);
2053 if (_dispatch_kevent_workqueue_enabled
) {
2054 pthread_priority_t pp
= 0;
2055 if (prio
> _dispatch_mgr_sched
.default_prio
) {
2056 // The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
2057 // _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
2058 // problematic in this case, since it the second one is only ever
2059 // used on dq_priority fields.
2060 // We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall,
2061 // it is meaningful to libdispatch only.
2062 pp
= (pthread_priority_t
)prio
| _PTHREAD_PRIORITY_SCHED_PRI_FLAG
;
2064 pp
= _pthread_qos_class_encode(qos
, 0, 0);
2067 int r
= _pthread_workqueue_set_event_manager_priority(pp
);
2068 (void)dispatch_assume_zero(r
);
2073 #if DISPATCH_USE_MGR_THREAD
2074 if (_dispatch_mgr_sched
.tid
) {
2075 return _dispatch_mgr_priority_apply();
2079 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2081 #if DISPATCH_USE_KEVENT_WORKQUEUE
2083 _dispatch_kevent_workqueue_init(void)
2085 // Initialize kevent workqueue support
2086 dispatch_once_f(&_dispatch_root_queues_pred
, NULL
,
2087 _dispatch_root_queues_init_once
);
2088 if (!_dispatch_kevent_workqueue_enabled
) return;
2089 dispatch_once_f(&_dispatch_mgr_sched_pred
, NULL
, _dispatch_mgr_sched_init
);
2090 qos_class_t qos
= _dispatch_mgr_sched
.qos
;
2091 int prio
= _dispatch_mgr_sched
.prio
;
2092 pthread_priority_t pp
= 0;
2094 pp
= _pthread_qos_class_encode(qos
, 0, 0);
2095 _dispatch_mgr_q
.dq_priority
= (dispatch_priority_t
)pp
;
2097 if (prio
> _dispatch_mgr_sched
.default_prio
) {
2098 pp
= (pthread_priority_t
)prio
| _PTHREAD_PRIORITY_SCHED_PRI_FLAG
;
2101 int r
= _pthread_workqueue_set_event_manager_priority(pp
);
2102 (void)dispatch_assume_zero(r
);
2108 #pragma mark dispatch_pthread_root_queue
2110 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2111 static dispatch_queue_t
2112 _dispatch_pthread_root_queue_create(const char *label
, unsigned long flags
,
2113 const pthread_attr_t
*attr
, dispatch_block_t configure
,
2114 dispatch_pthread_root_queue_observer_hooks_t observer_hooks
)
2116 dispatch_queue_t dq
;
2117 dispatch_root_queue_context_t qc
;
2118 dispatch_pthread_root_queue_context_t pqc
;
2119 dispatch_queue_flags_t dqf
= 0;
2121 uint8_t pool_size
= flags
& _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE
?
2122 (uint8_t)(flags
& ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE
) : 0;
2124 dqs
= sizeof(struct dispatch_queue_s
) - DISPATCH_QUEUE_CACHELINE_PAD
;
2125 dqs
= roundup(dqs
, _Alignof(struct dispatch_root_queue_context_s
));
2126 dq
= _dispatch_alloc(DISPATCH_VTABLE(queue_root
), dqs
+
2127 sizeof(struct dispatch_root_queue_context_s
) +
2128 sizeof(struct dispatch_pthread_root_queue_context_s
));
2129 qc
= (void*)dq
+ dqs
;
2130 dispatch_assert((uintptr_t)qc
% _Alignof(typeof(*qc
)) == 0);
2131 pqc
= (void*)qc
+ sizeof(struct dispatch_root_queue_context_s
);
2132 dispatch_assert((uintptr_t)pqc
% _Alignof(typeof(*pqc
)) == 0);
2134 const char *tmp
= _dispatch_strdup_if_mutable(label
);
2136 dqf
|= DQF_LABEL_NEEDS_FREE
;
2141 _dispatch_queue_init(dq
, dqf
, DISPATCH_QUEUE_WIDTH_POOL
, false);
2142 dq
->dq_label
= label
;
2143 dq
->dq_state
= DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
,
2144 dq
->dq_override
= DISPATCH_SATURATED_OVERRIDE
;
2146 dq
->do_targetq
= NULL
;
2148 pqc
->dpq_thread_mediator
.do_vtable
= DISPATCH_VTABLE(semaphore
);
2150 #if HAVE_PTHREAD_WORKQUEUES
2151 qc
->dgq_kworkqueue
= (void*)(~0ul);
2153 _dispatch_root_queue_init_pthread_pool(qc
, pool_size
, true);
2156 memcpy(&pqc
->dpq_thread_attr
, attr
, sizeof(pthread_attr_t
));
2157 _dispatch_mgr_priority_raise(&pqc
->dpq_thread_attr
);
2159 (void)dispatch_assume_zero(pthread_attr_init(&pqc
->dpq_thread_attr
));
2161 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
2162 &pqc
->dpq_thread_attr
, PTHREAD_CREATE_DETACHED
));
2164 pqc
->dpq_thread_configure
= _dispatch_Block_copy(configure
);
2166 if (observer_hooks
) {
2167 pqc
->dpq_observer_hooks
= *observer_hooks
;
2169 _dispatch_object_debug(dq
, "%s", __func__
);
2170 return _dispatch_introspection_queue_create(dq
);
2174 dispatch_pthread_root_queue_create(const char *label
, unsigned long flags
,
2175 const pthread_attr_t
*attr
, dispatch_block_t configure
)
2177 return _dispatch_pthread_root_queue_create(label
, flags
, attr
, configure
,
2181 #if DISPATCH_IOHID_SPI
2183 _dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label
,
2184 unsigned long flags
, const pthread_attr_t
*attr
,
2185 dispatch_pthread_root_queue_observer_hooks_t observer_hooks
,
2186 dispatch_block_t configure
)
2188 if (!observer_hooks
->queue_will_execute
||
2189 !observer_hooks
->queue_did_execute
) {
2190 DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
2192 return _dispatch_pthread_root_queue_create(label
, flags
, attr
, configure
,
2198 dispatch_pthread_root_queue_copy_current(void)
2200 dispatch_queue_t dq
= _dispatch_queue_get_current();
2201 if (!dq
) return NULL
;
2202 while (slowpath(dq
->do_targetq
)) {
2203 dq
= dq
->do_targetq
;
2205 if (dx_type(dq
) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE
||
2206 dq
->do_xref_cnt
== DISPATCH_OBJECT_GLOBAL_REFCNT
) {
2209 return (dispatch_queue_t
)_os_object_retain_with_resurrect(dq
->_as_os_obj
);
2212 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2215 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq
)
2217 if (slowpath(dq
->do_ref_cnt
== DISPATCH_OBJECT_GLOBAL_REFCNT
)) {
2218 DISPATCH_INTERNAL_CRASH(dq
, "Global root queue disposed");
2220 _dispatch_object_debug(dq
, "%s", __func__
);
2221 _dispatch_introspection_queue_dispose(dq
);
2222 #if DISPATCH_USE_PTHREAD_POOL
2223 dispatch_root_queue_context_t qc
= dq
->do_ctxt
;
2224 dispatch_pthread_root_queue_context_t pqc
= qc
->dgq_ctxt
;
2226 pthread_attr_destroy(&pqc
->dpq_thread_attr
);
2227 _dispatch_semaphore_dispose(&pqc
->dpq_thread_mediator
);
2228 if (pqc
->dpq_thread_configure
) {
2229 Block_release(pqc
->dpq_thread_configure
);
2231 dq
->do_targetq
= _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
,
2234 if (dq
->dq_label
&& _dispatch_queue_label_needs_free(dq
)) {
2235 free((void*)dq
->dq_label
);
2237 _dispatch_queue_destroy(dq
);
2241 #pragma mark dispatch_queue_specific
2243 struct dispatch_queue_specific_queue_s
{
2244 DISPATCH_QUEUE_HEADER(queue_specific_queue
);
2245 TAILQ_HEAD(dispatch_queue_specific_head_s
,
2246 dispatch_queue_specific_s
) dqsq_contexts
;
2247 } DISPATCH_QUEUE_ALIGN
;
2249 struct dispatch_queue_specific_s
{
2250 const void *dqs_key
;
2252 dispatch_function_t dqs_destructor
;
2253 TAILQ_ENTRY(dispatch_queue_specific_s
) dqs_list
;
2255 DISPATCH_DECL(dispatch_queue_specific
);
2258 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq
)
2260 dispatch_queue_specific_t dqs
, tmp
;
2262 TAILQ_FOREACH_SAFE(dqs
, &dqsq
->dqsq_contexts
, dqs_list
, tmp
) {
2263 if (dqs
->dqs_destructor
) {
2264 dispatch_async_f(_dispatch_get_root_queue(
2265 _DISPATCH_QOS_CLASS_DEFAULT
, false), dqs
->dqs_ctxt
,
2266 dqs
->dqs_destructor
);
2270 _dispatch_queue_destroy(dqsq
->_as_dq
);
2274 _dispatch_queue_init_specific(dispatch_queue_t dq
)
2276 dispatch_queue_specific_queue_t dqsq
;
2278 dqsq
= _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue
),
2279 sizeof(struct dispatch_queue_specific_queue_s
));
2280 _dispatch_queue_init(dqsq
->_as_dq
, DQF_NONE
,
2281 DISPATCH_QUEUE_WIDTH_MAX
, false);
2282 dqsq
->do_xref_cnt
= -1;
2283 dqsq
->do_targetq
= _dispatch_get_root_queue(
2284 _DISPATCH_QOS_CLASS_USER_INITIATED
, true);
2285 dqsq
->dq_label
= "queue-specific";
2286 TAILQ_INIT(&dqsq
->dqsq_contexts
);
2287 if (slowpath(!os_atomic_cmpxchg2o(dq
, dq_specific_q
, NULL
,
2288 dqsq
->_as_dq
, release
))) {
2289 _dispatch_release(dqsq
->_as_dq
);
2294 _dispatch_queue_set_specific(void *ctxt
)
2296 dispatch_queue_specific_t dqs
, dqsn
= ctxt
;
2297 dispatch_queue_specific_queue_t dqsq
=
2298 (dispatch_queue_specific_queue_t
)_dispatch_queue_get_current();
2300 TAILQ_FOREACH(dqs
, &dqsq
->dqsq_contexts
, dqs_list
) {
2301 if (dqs
->dqs_key
== dqsn
->dqs_key
) {
2302 // Destroy previous context for existing key
2303 if (dqs
->dqs_destructor
) {
2304 dispatch_async_f(_dispatch_get_root_queue(
2305 _DISPATCH_QOS_CLASS_DEFAULT
, false), dqs
->dqs_ctxt
,
2306 dqs
->dqs_destructor
);
2308 if (dqsn
->dqs_ctxt
) {
2309 // Copy new context for existing key
2310 dqs
->dqs_ctxt
= dqsn
->dqs_ctxt
;
2311 dqs
->dqs_destructor
= dqsn
->dqs_destructor
;
2313 // Remove context storage for existing key
2314 TAILQ_REMOVE(&dqsq
->dqsq_contexts
, dqs
, dqs_list
);
2320 // Insert context storage for new key
2321 TAILQ_INSERT_TAIL(&dqsq
->dqsq_contexts
, dqsn
, dqs_list
);
2326 dispatch_queue_set_specific(dispatch_queue_t dq
, const void *key
,
2327 void *ctxt
, dispatch_function_t destructor
)
2329 if (slowpath(!key
)) {
2332 dispatch_queue_specific_t dqs
;
2334 dqs
= _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s
));
2336 dqs
->dqs_ctxt
= ctxt
;
2337 dqs
->dqs_destructor
= destructor
;
2338 if (slowpath(!dq
->dq_specific_q
)) {
2339 _dispatch_queue_init_specific(dq
);
2341 _dispatch_barrier_trysync_or_async_f(dq
->dq_specific_q
, dqs
,
2342 _dispatch_queue_set_specific
);
2346 _dispatch_queue_get_specific(void *ctxt
)
2348 void **ctxtp
= ctxt
;
2350 dispatch_queue_specific_queue_t dqsq
=
2351 (dispatch_queue_specific_queue_t
)_dispatch_queue_get_current();
2352 dispatch_queue_specific_t dqs
;
2354 TAILQ_FOREACH(dqs
, &dqsq
->dqsq_contexts
, dqs_list
) {
2355 if (dqs
->dqs_key
== key
) {
2356 *ctxtp
= dqs
->dqs_ctxt
;
2365 dispatch_queue_get_specific(dispatch_queue_t dq
, const void *key
)
2367 if (slowpath(!key
)) {
2372 if (fastpath(dq
->dq_specific_q
)) {
2374 dispatch_sync_f(dq
->dq_specific_q
, &ctxt
, _dispatch_queue_get_specific
);
2381 dispatch_get_specific(const void *key
)
2383 if (slowpath(!key
)) {
2387 dispatch_queue_t dq
= _dispatch_queue_get_current();
2389 while (slowpath(dq
)) {
2390 if (slowpath(dq
->dq_specific_q
)) {
2392 dispatch_sync_f(dq
->dq_specific_q
, &ctxt
,
2393 _dispatch_queue_get_specific
);
2396 dq
= dq
->do_targetq
;
2401 #if DISPATCH_IOHID_SPI
2403 _dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
2404 dispatch_queue_t dq
) // rdar://problem/18033810
2406 if (dq
->dq_width
!= 1) {
2407 DISPATCH_CLIENT_CRASH(dq
->dq_width
, "Invalid queue type");
2409 uint64_t dq_state
= os_atomic_load2o(dq
, dq_state
, relaxed
);
2410 return _dq_state_drain_locked_by(dq_state
, _dispatch_tid_self());
2415 #pragma mark dispatch_queue_debug
2418 _dispatch_queue_debug_attr(dispatch_queue_t dq
, char* buf
, size_t bufsiz
)
2421 dispatch_queue_t target
= dq
->do_targetq
;
2422 uint64_t dq_state
= os_atomic_load2o(dq
, dq_state
, relaxed
);
2424 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
,
2425 "target = %s[%p], width = 0x%x, state = 0x%016llx",
2426 target
&& target
->dq_label
? target
->dq_label
: "", target
,
2427 dq
->dq_width
, (unsigned long long)dq_state
);
2428 if (_dq_state_is_suspended(dq_state
)) {
2429 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", suspended = %d",
2430 _dq_state_suspend_cnt(dq_state
));
2432 if (_dq_state_is_inactive(dq_state
)) {
2433 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", inactive");
2434 } else if (_dq_state_needs_activation(dq_state
)) {
2435 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", needs-activation");
2437 if (_dq_state_is_enqueued(dq_state
)) {
2438 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", enqueued");
2440 if (_dq_state_is_dirty(dq_state
)) {
2441 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", dirty");
2443 if (_dq_state_has_override(dq_state
)) {
2444 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", async-override");
2446 mach_port_t owner
= _dq_state_drain_owner(dq_state
);
2447 if (!_dispatch_queue_is_thread_bound(dq
) && owner
) {
2448 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", draining on 0x%x",
2451 if (_dq_state_is_in_barrier(dq_state
)) {
2452 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", in-barrier");
2454 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", in-flight = %d",
2455 _dq_state_used_width(dq_state
, dq
->dq_width
));
2457 if (_dq_state_has_pending_barrier(dq_state
)) {
2458 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", pending-barrier");
2460 if (_dispatch_queue_is_thread_bound(dq
)) {
2461 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, ", thread = 0x%x ",
2468 dispatch_queue_debug(dispatch_queue_t dq
, char* buf
, size_t bufsiz
)
2471 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "%s[%p] = { ",
2472 dq
->dq_label
? dq
->dq_label
: dx_kind(dq
), dq
);
2473 offset
+= _dispatch_object_debug_attr(dq
, &buf
[offset
], bufsiz
- offset
);
2474 offset
+= _dispatch_queue_debug_attr(dq
, &buf
[offset
], bufsiz
- offset
);
2475 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "}");
2481 dispatch_debug_queue(dispatch_queue_t dq
, const char* str
) {
2483 _dispatch_object_debug(dq
, "%s", str
);
2485 _dispatch_log("queue[NULL]: %s", str
);
2490 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
2491 static OSSpinLock _dispatch_stats_lock
;
2493 uint64_t time_total
;
2494 uint64_t count_total
;
2495 uint64_t thread_total
;
2496 } _dispatch_stats
[65]; // ffs*/fls*() returns zero when no bits are set
2499 _dispatch_queue_merge_stats(uint64_t start
)
2501 uint64_t delta
= _dispatch_absolute_time() - start
;
2502 unsigned long count
;
2504 count
= (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key
);
2505 _dispatch_thread_setspecific(dispatch_bcounter_key
, NULL
);
2507 int bucket
= flsl((long)count
);
2509 // 64-bit counters on 32-bit require a lock or a queue
2510 OSSpinLockLock(&_dispatch_stats_lock
);
2512 _dispatch_stats
[bucket
].time_total
+= delta
;
2513 _dispatch_stats
[bucket
].count_total
+= count
;
2514 _dispatch_stats
[bucket
].thread_total
++;
2516 OSSpinLockUnlock(&_dispatch_stats_lock
);
2521 #pragma mark _dispatch_set_priority_and_mach_voucher
2522 #if HAVE_PTHREAD_WORKQUEUE_QOS
2526 _dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp
,
2529 _pthread_set_flags_t pflags
= 0;
2530 if (pp
&& _dispatch_set_qos_class_enabled
) {
2531 pthread_priority_t old_pri
= _dispatch_get_priority();
2532 if (pp
!= old_pri
) {
2533 if (old_pri
& _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG
) {
2534 pflags
|= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND
;
2535 // when we unbind, overcomitness can flip, so we need to learn
2536 // it from the defaultpri, see _dispatch_priority_compute_update
2537 pp
|= (_dispatch_get_defaultpriority() &
2538 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
);
2540 // else we need to keep the one that is set in the current pri
2541 pp
|= (old_pri
& _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
);
2543 if (likely(old_pri
& ~_PTHREAD_PRIORITY_FLAGS_MASK
)) {
2544 pflags
|= _PTHREAD_SET_SELF_QOS_FLAG
;
2546 if (unlikely(DISPATCH_QUEUE_DRAIN_OWNER(&_dispatch_mgr_q
) ==
2547 _dispatch_tid_self())) {
2548 DISPATCH_INTERNAL_CRASH(pp
,
2549 "Changing the QoS while on the manager queue");
2551 if (unlikely(pp
& _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
)) {
2552 DISPATCH_INTERNAL_CRASH(pp
, "Cannot raise oneself to manager");
2554 if (old_pri
& _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG
) {
2555 DISPATCH_INTERNAL_CRASH(old_pri
,
2556 "Cannot turn a manager thread into a normal one");
2560 if (kv
!= VOUCHER_NO_MACH_VOUCHER
) {
2561 #if VOUCHER_USE_MACH_VOUCHER
2562 pflags
|= _PTHREAD_SET_SELF_VOUCHER_FLAG
;
2565 if (!pflags
) return;
2566 int r
= _pthread_set_properties_self(pflags
, pp
, kv
);
2568 DISPATCH_INTERNAL_CRASH(pp
, "_pthread_set_properties_self failed");
2570 (void)dispatch_assume_zero(r
);
2575 _dispatch_set_priority_and_voucher_slow(pthread_priority_t priority
,
2576 voucher_t v
, _dispatch_thread_set_self_t flags
)
2578 voucher_t ov
= DISPATCH_NO_VOUCHER
;
2579 mach_voucher_t kv
= VOUCHER_NO_MACH_VOUCHER
;
2580 if (v
!= DISPATCH_NO_VOUCHER
) {
2581 bool retained
= flags
& DISPATCH_VOUCHER_CONSUME
;
2582 ov
= _voucher_get();
2583 if (ov
== v
&& (flags
& DISPATCH_VOUCHER_REPLACE
)) {
2584 if (retained
&& v
) _voucher_release_no_dispose(v
);
2585 ov
= DISPATCH_NO_VOUCHER
;
2587 if (!retained
&& v
) _voucher_retain(v
);
2588 kv
= _voucher_swap_and_get_mach_voucher(ov
, v
);
2591 #if !PTHREAD_WORKQUEUE_RESETS_VOUCHER_AND_PRIORITY_ON_PARK
2592 flags
&= ~(_dispatch_thread_set_self_t
)DISPATCH_THREAD_PARK
;
2594 if (!(flags
& DISPATCH_THREAD_PARK
)) {
2595 _dispatch_set_priority_and_mach_voucher_slow(priority
, kv
);
2597 if (ov
!= DISPATCH_NO_VOUCHER
&& (flags
& DISPATCH_VOUCHER_REPLACE
)) {
2598 if (ov
) _voucher_release(ov
);
2599 ov
= DISPATCH_NO_VOUCHER
;
2605 #pragma mark dispatch_continuation_t
2608 _dispatch_force_cache_cleanup(void)
2610 dispatch_continuation_t dc
;
2611 dc
= _dispatch_thread_getspecific(dispatch_cache_key
);
2613 _dispatch_thread_setspecific(dispatch_cache_key
, NULL
);
2614 _dispatch_cache_cleanup(dc
);
2620 _dispatch_cache_cleanup(void *value
)
2622 dispatch_continuation_t dc
, next_dc
= value
;
2624 while ((dc
= next_dc
)) {
2625 next_dc
= dc
->do_next
;
2626 _dispatch_continuation_free_to_heap(dc
);
2630 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
2633 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc
)
2635 _dispatch_continuation_free_to_heap(dc
);
2636 dispatch_continuation_t next_dc
;
2637 dc
= _dispatch_thread_getspecific(dispatch_cache_key
);
2639 if (!dc
|| (cnt
= dc
->dc_cache_cnt
-
2640 _dispatch_continuation_cache_limit
) <= 0){
2644 next_dc
= dc
->do_next
;
2645 _dispatch_continuation_free_to_heap(dc
);
2646 } while (--cnt
&& (dc
= next_dc
));
2647 _dispatch_thread_setspecific(dispatch_cache_key
, next_dc
);
2651 DISPATCH_ALWAYS_INLINE_NDEBUG
2653 _dispatch_continuation_slow_item_signal(dispatch_queue_t dq
,
2654 dispatch_object_t dou
)
2656 dispatch_continuation_t dc
= dou
._dc
;
2657 pthread_priority_t pp
= dq
->dq_override
;
2659 _dispatch_trace_continuation_pop(dq
, dc
);
2660 if (pp
> (dc
->dc_priority
& _PTHREAD_PRIORITY_QOS_CLASS_MASK
)) {
2661 _dispatch_wqthread_override_start((mach_port_t
)dc
->dc_data
, pp
);
2663 _dispatch_thread_event_signal((dispatch_thread_event_t
)dc
->dc_other
);
2664 _dispatch_introspection_queue_item_complete(dc
);
2669 _dispatch_continuation_push(dispatch_queue_t dq
, dispatch_continuation_t dc
)
2671 _dispatch_queue_push(dq
, dc
,
2672 _dispatch_continuation_get_override_priority(dq
, dc
));
2677 _dispatch_continuation_push_sync_slow(dispatch_queue_t dq
,
2678 dispatch_continuation_t dc
)
2680 _dispatch_queue_push_inline(dq
, dc
,
2681 _dispatch_continuation_get_override_priority(dq
, dc
),
2682 DISPATCH_WAKEUP_SLOW_WAITER
);
2685 DISPATCH_ALWAYS_INLINE
2687 _dispatch_continuation_async2(dispatch_queue_t dq
, dispatch_continuation_t dc
,
2690 if (fastpath(barrier
|| !DISPATCH_QUEUE_USES_REDIRECTION(dq
->dq_width
))) {
2691 return _dispatch_continuation_push(dq
, dc
);
2693 return _dispatch_async_f2(dq
, dc
);
2698 _dispatch_continuation_async(dispatch_queue_t dq
, dispatch_continuation_t dc
)
2700 _dispatch_continuation_async2(dq
, dc
,
2701 dc
->dc_flags
& DISPATCH_OBJ_BARRIER_BIT
);
2705 #pragma mark dispatch_block_create
2709 DISPATCH_ALWAYS_INLINE
2711 _dispatch_block_flags_valid(dispatch_block_flags_t flags
)
2713 return ((flags
& ~DISPATCH_BLOCK_API_MASK
) == 0);
2716 DISPATCH_ALWAYS_INLINE
2717 static inline dispatch_block_flags_t
2718 _dispatch_block_normalize_flags(dispatch_block_flags_t flags
)
2720 if (flags
& (DISPATCH_BLOCK_NO_VOUCHER
|DISPATCH_BLOCK_DETACHED
)) {
2721 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
2723 if (flags
& (DISPATCH_BLOCK_NO_QOS_CLASS
|DISPATCH_BLOCK_DETACHED
)) {
2724 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
2729 static inline dispatch_block_t
2730 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags
,
2731 voucher_t voucher
, pthread_priority_t pri
, dispatch_block_t block
)
2733 flags
= _dispatch_block_normalize_flags(flags
);
2734 bool assign
= (flags
& DISPATCH_BLOCK_ASSIGN_CURRENT
);
2736 if (assign
&& !(flags
& DISPATCH_BLOCK_HAS_VOUCHER
)) {
2737 voucher
= VOUCHER_CURRENT
;
2738 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
2740 if (voucher
== VOUCHER_CURRENT
) {
2741 voucher
= _voucher_get();
2743 if (assign
&& !(flags
& DISPATCH_BLOCK_HAS_PRIORITY
)) {
2744 pri
= _dispatch_priority_propagate();
2745 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
2747 dispatch_block_t db
= _dispatch_block_create(flags
, voucher
, pri
, block
);
2749 dispatch_assert(_dispatch_block_get_data(db
));
2755 dispatch_block_create(dispatch_block_flags_t flags
, dispatch_block_t block
)
2757 if (!_dispatch_block_flags_valid(flags
)) return DISPATCH_BAD_INPUT
;
2758 return _dispatch_block_create_with_voucher_and_priority(flags
, NULL
, 0,
2763 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags
,
2764 dispatch_qos_class_t qos_class
, int relative_priority
,
2765 dispatch_block_t block
)
2767 if (!_dispatch_block_flags_valid(flags
) ||
2768 !_dispatch_qos_class_valid(qos_class
, relative_priority
)) {
2769 return DISPATCH_BAD_INPUT
;
2771 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
2772 pthread_priority_t pri
= 0;
2773 #if HAVE_PTHREAD_WORKQUEUE_QOS
2774 pri
= _pthread_qos_class_encode(qos_class
, relative_priority
, 0);
2776 return _dispatch_block_create_with_voucher_and_priority(flags
, NULL
,
2781 dispatch_block_create_with_voucher(dispatch_block_flags_t flags
,
2782 voucher_t voucher
, dispatch_block_t block
)
2784 if (!_dispatch_block_flags_valid(flags
)) return DISPATCH_BAD_INPUT
;
2785 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
2786 return _dispatch_block_create_with_voucher_and_priority(flags
, voucher
, 0,
2791 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags
,
2792 voucher_t voucher
, dispatch_qos_class_t qos_class
,
2793 int relative_priority
, dispatch_block_t block
)
2795 if (!_dispatch_block_flags_valid(flags
) ||
2796 !_dispatch_qos_class_valid(qos_class
, relative_priority
)) {
2797 return DISPATCH_BAD_INPUT
;
2799 flags
|= (DISPATCH_BLOCK_HAS_VOUCHER
|DISPATCH_BLOCK_HAS_PRIORITY
);
2800 pthread_priority_t pri
= 0;
2801 #if HAVE_PTHREAD_WORKQUEUE_QOS
2802 pri
= _pthread_qos_class_encode(qos_class
, relative_priority
, 0);
2804 return _dispatch_block_create_with_voucher_and_priority(flags
, voucher
,
2809 dispatch_block_perform(dispatch_block_flags_t flags
, dispatch_block_t block
)
2811 if (!_dispatch_block_flags_valid(flags
)) {
2812 DISPATCH_CLIENT_CRASH(flags
, "Invalid flags passed to "
2813 "dispatch_block_perform()");
2815 flags
= _dispatch_block_normalize_flags(flags
);
2816 struct dispatch_block_private_data_s dbpds
=
2817 DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags
, block
);
2818 return _dispatch_block_invoke_direct(&dbpds
);
2821 #define _dbpd_group(dbpd) ((dbpd)->dbpd_group)
2824 _dispatch_block_invoke_direct(const struct dispatch_block_private_data_s
*dbcpd
)
2826 dispatch_block_private_data_t dbpd
= (dispatch_block_private_data_t
)dbcpd
;
2827 dispatch_block_flags_t flags
= dbpd
->dbpd_flags
;
2828 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2829 if (slowpath(atomic_flags
& DBF_WAITED
)) {
2830 DISPATCH_CLIENT_CRASH(atomic_flags
, "A block object may not be both "
2831 "run more than once and waited for");
2833 if (atomic_flags
& DBF_CANCELED
) goto out
;
2835 pthread_priority_t op
= DISPATCH_NO_PRIORITY
, p
= DISPATCH_NO_PRIORITY
;
2836 _dispatch_thread_set_self_t adopt_flags
= 0;
2837 if (flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
2838 op
= _dispatch_get_priority();
2839 p
= dbpd
->dbpd_priority
;
2840 if (_dispatch_block_sync_should_enforce_qos_class(flags
)) {
2841 adopt_flags
|= DISPATCH_PRIORITY_ENFORCE
;
2844 voucher_t ov
, v
= DISPATCH_NO_VOUCHER
;
2845 if (flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
2846 v
= dbpd
->dbpd_voucher
;
2848 ov
= _dispatch_adopt_priority_and_set_voucher(p
, v
, adopt_flags
);
2849 dbpd
->dbpd_thread
= _dispatch_tid_self();
2850 _dispatch_client_callout(dbpd
->dbpd_block
,
2851 _dispatch_Block_invoke(dbpd
->dbpd_block
));
2852 _dispatch_reset_priority_and_voucher(op
, ov
);
2854 if ((atomic_flags
& DBF_PERFORM
) == 0) {
2855 if (os_atomic_inc2o(dbpd
, dbpd_performed
, relaxed
) == 1) {
2856 dispatch_group_leave(_dbpd_group(dbpd
));
2862 _dispatch_block_sync_invoke(void *block
)
2864 dispatch_block_t b
= block
;
2865 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(b
);
2866 dispatch_block_flags_t flags
= dbpd
->dbpd_flags
;
2867 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2868 if (slowpath(atomic_flags
& DBF_WAITED
)) {
2869 DISPATCH_CLIENT_CRASH(atomic_flags
, "A block object may not be both "
2870 "run more than once and waited for");
2872 if (atomic_flags
& DBF_CANCELED
) goto out
;
2874 pthread_priority_t op
= DISPATCH_NO_PRIORITY
, p
= DISPATCH_NO_PRIORITY
;
2875 _dispatch_thread_set_self_t adopt_flags
= 0;
2876 if (flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
2877 op
= _dispatch_get_priority();
2878 p
= dbpd
->dbpd_priority
;
2879 if (_dispatch_block_sync_should_enforce_qos_class(flags
)) {
2880 adopt_flags
|= DISPATCH_PRIORITY_ENFORCE
;
2883 voucher_t ov
, v
= DISPATCH_NO_VOUCHER
;
2884 if (flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
2885 v
= dbpd
->dbpd_voucher
;
2887 ov
= _dispatch_adopt_priority_and_set_voucher(p
, v
, adopt_flags
);
2889 _dispatch_reset_priority_and_voucher(op
, ov
);
2891 if ((atomic_flags
& DBF_PERFORM
) == 0) {
2892 if (os_atomic_inc2o(dbpd
, dbpd_performed
, relaxed
) == 1) {
2893 dispatch_group_leave(_dbpd_group(dbpd
));
2898 oq
= os_atomic_xchg2o(dbpd
, dbpd_queue
, NULL
, relaxed
);
2900 // balances dispatch_{,barrier_,}sync
2901 _os_object_release_internal(oq
->_as_os_obj
);
2905 DISPATCH_ALWAYS_INLINE
2907 _dispatch_block_async_invoke2(dispatch_block_t b
, bool release
)
2909 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(b
);
2910 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2911 if (slowpath(atomic_flags
& DBF_WAITED
)) {
2912 DISPATCH_CLIENT_CRASH(atomic_flags
, "A block object may not be both "
2913 "run more than once and waited for");
2915 if (!slowpath(atomic_flags
& DBF_CANCELED
)) {
2918 if ((atomic_flags
& DBF_PERFORM
) == 0) {
2919 if (os_atomic_inc2o(dbpd
, dbpd_performed
, relaxed
) == 1) {
2920 dispatch_group_leave(_dbpd_group(dbpd
));
2924 oq
= os_atomic_xchg2o(dbpd
, dbpd_queue
, NULL
, relaxed
);
2926 // balances dispatch_{,barrier_,group_}async
2927 _os_object_release_internal_inline(oq
->_as_os_obj
);
2935 _dispatch_block_async_invoke(void *block
)
2937 _dispatch_block_async_invoke2(block
, false);
2941 _dispatch_block_async_invoke_and_release(void *block
)
2943 _dispatch_block_async_invoke2(block
, true);
2947 dispatch_block_cancel(dispatch_block_t db
)
2949 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
2951 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
2952 "dispatch_block_cancel()");
2954 (void)os_atomic_or2o(dbpd
, dbpd_atomic_flags
, DBF_CANCELED
, relaxed
);
2958 dispatch_block_testcancel(dispatch_block_t db
)
2960 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
2962 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
2963 "dispatch_block_testcancel()");
2965 return (bool)(dbpd
->dbpd_atomic_flags
& DBF_CANCELED
);
2969 dispatch_block_wait(dispatch_block_t db
, dispatch_time_t timeout
)
2971 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
2973 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
2974 "dispatch_block_wait()");
2977 unsigned int flags
= os_atomic_or_orig2o(dbpd
, dbpd_atomic_flags
,
2978 DBF_WAITING
, relaxed
);
2979 if (slowpath(flags
& (DBF_WAITED
| DBF_WAITING
))) {
2980 DISPATCH_CLIENT_CRASH(flags
, "A block object may not be waited for "
2984 // <rdar://problem/17703192> If we know the queue where this block is
2985 // enqueued, or the thread that's executing it, then we should boost
2988 pthread_priority_t pp
= _dispatch_get_priority();
2990 os_mpsc_queue_t boost_oq
;
2991 boost_oq
= os_atomic_xchg2o(dbpd
, dbpd_queue
, NULL
, relaxed
);
2993 // release balances dispatch_{,barrier_,group_}async.
2994 // Can't put the queue back in the timeout case: the block might
2995 // finish after we fell out of group_wait and see our NULL, so
2996 // neither of us would ever release. Side effect: After a _wait
2997 // that times out, subsequent waits will not boost the qos of the
2998 // still-running block.
2999 dx_wakeup(boost_oq
, pp
, DISPATCH_WAKEUP_OVERRIDING
|
3000 DISPATCH_WAKEUP_CONSUME
);
3003 mach_port_t boost_th
= dbpd
->dbpd_thread
;
3005 _dispatch_thread_override_start(boost_th
, pp
, dbpd
);
3008 int performed
= os_atomic_load2o(dbpd
, dbpd_performed
, relaxed
);
3009 if (slowpath(performed
> 1 || (boost_th
&& boost_oq
))) {
3010 DISPATCH_CLIENT_CRASH(performed
, "A block object may not be both "
3011 "run more than once and waited for");
3014 long ret
= dispatch_group_wait(_dbpd_group(dbpd
), timeout
);
3017 _dispatch_thread_override_end(boost_th
, dbpd
);
3021 // timed out: reverse our changes
3022 (void)os_atomic_and2o(dbpd
, dbpd_atomic_flags
,
3023 ~DBF_WAITING
, relaxed
);
3025 (void)os_atomic_or2o(dbpd
, dbpd_atomic_flags
,
3026 DBF_WAITED
, relaxed
);
3027 // don't need to re-test here: the second call would see
3028 // the first call's WAITING
3035 dispatch_block_notify(dispatch_block_t db
, dispatch_queue_t queue
,
3036 dispatch_block_t notification_block
)
3038 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
3040 DISPATCH_CLIENT_CRASH(db
, "Invalid block object passed to "
3041 "dispatch_block_notify()");
3043 int performed
= os_atomic_load2o(dbpd
, dbpd_performed
, relaxed
);
3044 if (slowpath(performed
> 1)) {
3045 DISPATCH_CLIENT_CRASH(performed
, "A block object may not be both "
3046 "run more than once and observed");
3049 return dispatch_group_notify(_dbpd_group(dbpd
), queue
, notification_block
);
3054 _dispatch_continuation_init_slow(dispatch_continuation_t dc
,
3055 dispatch_queue_class_t dqu
, dispatch_block_flags_t flags
)
3057 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(dc
->dc_ctxt
);
3058 dispatch_block_flags_t block_flags
= dbpd
->dbpd_flags
;
3059 uintptr_t dc_flags
= dc
->dc_flags
;
3060 os_mpsc_queue_t oq
= dqu
._oq
;
3062 // balanced in d_block_async_invoke_and_release or d_block_wait
3063 if (os_atomic_cmpxchg2o(dbpd
, dbpd_queue
, NULL
, oq
, relaxed
)) {
3064 _os_object_retain_internal_inline(oq
->_as_os_obj
);
3067 if (dc_flags
& DISPATCH_OBJ_CONSUME_BIT
) {
3068 dc
->dc_func
= _dispatch_block_async_invoke_and_release
;
3070 dc
->dc_func
= _dispatch_block_async_invoke
;
3073 flags
|= block_flags
;
3074 if (block_flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
3075 _dispatch_continuation_priority_set(dc
, dbpd
->dbpd_priority
, flags
);
3077 _dispatch_continuation_priority_set(dc
, dc
->dc_priority
, flags
);
3079 if (block_flags
& DISPATCH_BLOCK_BARRIER
) {
3080 dc_flags
|= DISPATCH_OBJ_BARRIER_BIT
;
3082 if (block_flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
3083 voucher_t v
= dbpd
->dbpd_voucher
;
3084 dc
->dc_voucher
= v
? _voucher_retain(v
) : NULL
;
3085 dc_flags
|= DISPATCH_OBJ_ENFORCE_VOUCHER
;
3086 _dispatch_voucher_debug("continuation[%p] set", dc
->dc_voucher
, dc
);
3087 _dispatch_voucher_ktrace_dc_push(dc
);
3089 _dispatch_continuation_voucher_set(dc
, oq
, flags
);
3091 dc_flags
|= DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT
;
3092 dc
->dc_flags
= dc_flags
;
3096 _dispatch_continuation_update_bits(dispatch_continuation_t dc
,
3099 dc
->dc_flags
= dc_flags
;
3100 if (dc_flags
& DISPATCH_OBJ_CONSUME_BIT
) {
3101 if (dc_flags
& DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT
) {
3102 dc
->dc_func
= _dispatch_block_async_invoke_and_release
;
3103 } else if (dc_flags
& DISPATCH_OBJ_BLOCK_BIT
) {
3104 dc
->dc_func
= _dispatch_call_block_and_release
;
3107 if (dc_flags
& DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT
) {
3108 dc
->dc_func
= _dispatch_block_async_invoke
;
3109 } else if (dc_flags
& DISPATCH_OBJ_BLOCK_BIT
) {
3110 dc
->dc_func
= _dispatch_Block_invoke(dc
->dc_ctxt
);
3115 #endif // __BLOCKS__
3118 #pragma mark dispatch_barrier_async
3122 _dispatch_async_f_slow(dispatch_queue_t dq
, void *ctxt
,
3123 dispatch_function_t func
, pthread_priority_t pp
,
3124 dispatch_block_flags_t flags
, uintptr_t dc_flags
)
3126 dispatch_continuation_t dc
= _dispatch_continuation_alloc_from_heap();
3127 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3128 _dispatch_continuation_async(dq
, dc
);
3131 DISPATCH_ALWAYS_INLINE
3133 _dispatch_barrier_async_f2(dispatch_queue_t dq
, void *ctxt
,
3134 dispatch_function_t func
, pthread_priority_t pp
,
3135 dispatch_block_flags_t flags
)
3137 dispatch_continuation_t dc
= _dispatch_continuation_alloc_cacheonly();
3138 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_BARRIER_BIT
;
3140 if (!fastpath(dc
)) {
3141 return _dispatch_async_f_slow(dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3144 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3145 _dispatch_continuation_push(dq
, dc
);
3150 dispatch_barrier_async_f(dispatch_queue_t dq
, void *ctxt
,
3151 dispatch_function_t func
)
3153 _dispatch_barrier_async_f2(dq
, ctxt
, func
, 0, 0);
3158 _dispatch_barrier_async_detached_f(dispatch_queue_t dq
, void *ctxt
,
3159 dispatch_function_t func
)
3161 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3162 dc
->dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_BARRIER_BIT
;
3165 dc
->dc_voucher
= DISPATCH_NO_VOUCHER
;
3166 dc
->dc_priority
= DISPATCH_NO_PRIORITY
;
3167 _dispatch_queue_push(dq
, dc
, 0);
3172 dispatch_barrier_async(dispatch_queue_t dq
, void (^work
)(void))
3174 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3175 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_BARRIER_BIT
;
3177 _dispatch_continuation_init(dc
, dq
, work
, 0, 0, dc_flags
);
3178 _dispatch_continuation_push(dq
, dc
);
3183 #pragma mark dispatch_async
3186 _dispatch_async_redirect_invoke(dispatch_continuation_t dc
,
3187 dispatch_invoke_flags_t flags
)
3189 dispatch_thread_frame_s dtf
;
3190 struct dispatch_continuation_s
*other_dc
= dc
->dc_other
;
3191 dispatch_invoke_flags_t ctxt_flags
= (dispatch_invoke_flags_t
)dc
->dc_ctxt
;
3192 // if we went through _dispatch_root_queue_push_override,
3193 // the "right" root queue was stuffed into dc_func
3194 dispatch_queue_t assumed_rq
= (dispatch_queue_t
)dc
->dc_func
;
3195 dispatch_queue_t dq
= dc
->dc_data
, rq
, old_dq
;
3196 struct _dispatch_identity_s di
;
3198 pthread_priority_t op
, dp
, old_dp
;
3201 flags
&= ~_DISPATCH_INVOKE_AUTORELEASE_MASK
;
3202 flags
|= ctxt_flags
;
3204 old_dq
= _dispatch_get_current_queue();
3206 _dispatch_queue_set_current(assumed_rq
);
3207 _dispatch_root_queue_identity_assume(&di
, 0);
3210 old_dp
= _dispatch_set_defaultpriority(dq
->dq_priority
, &dp
);
3211 op
= dq
->dq_override
;
3212 if (op
> (dp
& _PTHREAD_PRIORITY_QOS_CLASS_MASK
)) {
3213 _dispatch_wqthread_override_start(_dispatch_tid_self(), op
);
3214 // Ensure that the root queue sees that this thread was overridden.
3215 _dispatch_set_defaultpriority_override();
3218 _dispatch_thread_frame_push(&dtf
, dq
);
3219 _dispatch_continuation_pop_forwarded(dc
, DISPATCH_NO_VOUCHER
,
3220 DISPATCH_OBJ_CONSUME_BIT
, {
3221 _dispatch_continuation_pop(other_dc
, dq
, flags
);
3223 _dispatch_thread_frame_pop(&dtf
);
3225 _dispatch_root_queue_identity_restore(&di
);
3226 _dispatch_queue_set_current(old_dq
);
3228 _dispatch_reset_defaultpriority(old_dp
);
3230 rq
= dq
->do_targetq
;
3231 while (slowpath(rq
->do_targetq
) && rq
!= old_dq
) {
3232 _dispatch_non_barrier_complete(rq
);
3233 rq
= rq
->do_targetq
;
3236 _dispatch_non_barrier_complete(dq
);
3238 if (dtf
.dtf_deferred
) {
3239 struct dispatch_object_s
*dou
= dtf
.dtf_deferred
;
3240 return _dispatch_queue_drain_deferred_invoke(dq
, flags
, 0, dou
);
3243 _dispatch_release_tailcall(dq
);
3246 DISPATCH_ALWAYS_INLINE
3247 static inline dispatch_continuation_t
3248 _dispatch_async_redirect_wrap(dispatch_queue_t dq
, dispatch_object_t dou
)
3250 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3252 dou
._do
->do_next
= NULL
;
3253 dc
->do_vtable
= DC_VTABLE(ASYNC_REDIRECT
);
3255 dc
->dc_ctxt
= (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq
);
3257 dc
->dc_other
= dou
._do
;
3258 dc
->dc_voucher
= DISPATCH_NO_VOUCHER
;
3259 dc
->dc_priority
= DISPATCH_NO_PRIORITY
;
3260 _dispatch_retain(dq
);
3266 _dispatch_async_f_redirect(dispatch_queue_t dq
,
3267 dispatch_object_t dou
, pthread_priority_t pp
)
3269 if (!slowpath(_dispatch_object_is_redirection(dou
))) {
3270 dou
._dc
= _dispatch_async_redirect_wrap(dq
, dou
);
3272 dq
= dq
->do_targetq
;
3274 // Find the queue to redirect to
3275 while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq
->dq_width
))) {
3276 if (!fastpath(_dispatch_queue_try_acquire_async(dq
))) {
3279 if (!dou
._dc
->dc_ctxt
) {
3280 // find first queue in descending target queue order that has
3281 // an autorelease frequency set, and use that as the frequency for
3282 // this continuation.
3283 dou
._dc
->dc_ctxt
= (void *)
3284 (uintptr_t)_dispatch_queue_autorelease_frequency(dq
);
3286 dq
= dq
->do_targetq
;
3289 _dispatch_queue_push(dq
, dou
, pp
);
3292 DISPATCH_ALWAYS_INLINE
3294 _dispatch_continuation_redirect(dispatch_queue_t dq
,
3295 struct dispatch_object_s
*dc
)
3297 _dispatch_trace_continuation_pop(dq
, dc
);
3298 // This is a re-redirect, overrides have already been applied
3299 // by _dispatch_async_f2.
3300 // However we want to end up on the root queue matching `dc` qos, so pick up
3301 // the current override of `dq` which includes dc's overrde (and maybe more)
3302 _dispatch_async_f_redirect(dq
, dc
, dq
->dq_override
);
3303 _dispatch_introspection_queue_item_complete(dc
);
3308 _dispatch_async_f2(dispatch_queue_t dq
, dispatch_continuation_t dc
)
3310 // <rdar://problem/24738102&24743140> reserving non barrier width
3311 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
3312 // equivalent), so we have to check that this thread hasn't enqueued
3313 // anything ahead of this call or we can break ordering
3314 if (slowpath(dq
->dq_items_tail
)) {
3315 return _dispatch_continuation_push(dq
, dc
);
3318 if (slowpath(!_dispatch_queue_try_acquire_async(dq
))) {
3319 return _dispatch_continuation_push(dq
, dc
);
3322 return _dispatch_async_f_redirect(dq
, dc
,
3323 _dispatch_continuation_get_override_priority(dq
, dc
));
3326 DISPATCH_ALWAYS_INLINE
3328 _dispatch_async_f(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
3329 pthread_priority_t pp
, dispatch_block_flags_t flags
)
3331 dispatch_continuation_t dc
= _dispatch_continuation_alloc_cacheonly();
3332 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
3334 if (!fastpath(dc
)) {
3335 return _dispatch_async_f_slow(dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3338 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, pp
, flags
, dc_flags
);
3339 _dispatch_continuation_async2(dq
, dc
, false);
3344 dispatch_async_f(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
)
3346 _dispatch_async_f(dq
, ctxt
, func
, 0, 0);
3351 dispatch_async_enforce_qos_class_f(dispatch_queue_t dq
, void *ctxt
,
3352 dispatch_function_t func
)
3354 _dispatch_async_f(dq
, ctxt
, func
, 0, DISPATCH_BLOCK_ENFORCE_QOS_CLASS
);
3359 dispatch_async(dispatch_queue_t dq
, void (^work
)(void))
3361 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3362 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
;
3364 _dispatch_continuation_init(dc
, dq
, work
, 0, 0, dc_flags
);
3365 _dispatch_continuation_async(dq
, dc
);
3370 #pragma mark dispatch_group_async
3372 DISPATCH_ALWAYS_INLINE
3374 _dispatch_continuation_group_async(dispatch_group_t dg
, dispatch_queue_t dq
,
3375 dispatch_continuation_t dc
)
3377 dispatch_group_enter(dg
);
3379 _dispatch_continuation_async(dq
, dc
);
3384 dispatch_group_async_f(dispatch_group_t dg
, dispatch_queue_t dq
, void *ctxt
,
3385 dispatch_function_t func
)
3387 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3388 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_GROUP_BIT
;
3390 _dispatch_continuation_init_f(dc
, dq
, ctxt
, func
, 0, 0, dc_flags
);
3391 _dispatch_continuation_group_async(dg
, dq
, dc
);
3396 dispatch_group_async(dispatch_group_t dg
, dispatch_queue_t dq
,
3397 dispatch_block_t db
)
3399 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
3400 uintptr_t dc_flags
= DISPATCH_OBJ_CONSUME_BIT
| DISPATCH_OBJ_GROUP_BIT
;
3402 _dispatch_continuation_init(dc
, dq
, db
, 0, 0, dc_flags
);
3403 _dispatch_continuation_group_async(dg
, dq
, dc
);
3408 #pragma mark dispatch_sync / dispatch_barrier_sync recurse and invoke
3412 _dispatch_sync_function_invoke_slow(dispatch_queue_t dq
, void *ctxt
,
3413 dispatch_function_t func
)
3416 dispatch_thread_frame_s dtf
;
3417 _dispatch_thread_frame_push(&dtf
, dq
);
3418 ov
= _dispatch_set_priority_and_voucher(0, dq
->dq_override_voucher
, 0);
3419 _dispatch_client_callout(ctxt
, func
);
3420 _dispatch_perfmon_workitem_inc();
3421 _dispatch_reset_voucher(ov
, 0);
3422 _dispatch_thread_frame_pop(&dtf
);
3425 DISPATCH_ALWAYS_INLINE
3427 _dispatch_sync_function_invoke_inline(dispatch_queue_t dq
, void *ctxt
,
3428 dispatch_function_t func
)
3430 if (slowpath(dq
->dq_override_voucher
!= DISPATCH_NO_VOUCHER
)) {
3431 return _dispatch_sync_function_invoke_slow(dq
, ctxt
, func
);
3433 dispatch_thread_frame_s dtf
;
3434 _dispatch_thread_frame_push(&dtf
, dq
);
3435 _dispatch_client_callout(ctxt
, func
);
3436 _dispatch_perfmon_workitem_inc();
3437 _dispatch_thread_frame_pop(&dtf
);
3442 _dispatch_sync_function_invoke(dispatch_queue_t dq
, void *ctxt
,
3443 dispatch_function_t func
)
3445 _dispatch_sync_function_invoke_inline(dq
, ctxt
, func
);
3449 _dispatch_sync_recurse_invoke(void *ctxt
)
3451 dispatch_continuation_t dc
= ctxt
;
3452 _dispatch_sync_function_invoke(dc
->dc_data
, dc
->dc_ctxt
, dc
->dc_func
);
3455 DISPATCH_ALWAYS_INLINE
3457 _dispatch_sync_function_recurse(dispatch_queue_t dq
, void *ctxt
,
3458 dispatch_function_t func
, pthread_priority_t pp
)
3460 struct dispatch_continuation_s dc
= {
3465 _dispatch_sync_f(dq
->do_targetq
, &dc
, _dispatch_sync_recurse_invoke
, pp
);
3470 _dispatch_non_barrier_sync_f_invoke(dispatch_queue_t dq
, void *ctxt
,
3471 dispatch_function_t func
)
3473 _dispatch_sync_function_invoke_inline(dq
, ctxt
, func
);
3474 _dispatch_non_barrier_complete(dq
);
3479 _dispatch_non_barrier_sync_f_recurse(dispatch_queue_t dq
, void *ctxt
,
3480 dispatch_function_t func
, pthread_priority_t pp
)
3482 _dispatch_sync_function_recurse(dq
, ctxt
, func
, pp
);
3483 _dispatch_non_barrier_complete(dq
);
3486 DISPATCH_ALWAYS_INLINE
3488 _dispatch_non_barrier_sync_f_invoke_inline(dispatch_queue_t dq
, void *ctxt
,
3489 dispatch_function_t func
, pthread_priority_t pp
)
3491 _dispatch_introspection_non_barrier_sync_begin(dq
, func
);
3492 if (slowpath(dq
->do_targetq
->do_targetq
)) {
3493 return _dispatch_non_barrier_sync_f_recurse(dq
, ctxt
, func
, pp
);
3495 _dispatch_non_barrier_sync_f_invoke(dq
, ctxt
, func
);
3499 #pragma mark dispatch_barrier_sync
3503 _dispatch_barrier_complete(dispatch_queue_t dq
)
3505 uint64_t owned
= DISPATCH_QUEUE_IN_BARRIER
+
3506 dq
->dq_width
* DISPATCH_QUEUE_WIDTH_INTERVAL
;
3508 if (slowpath(dq
->dq_items_tail
)) {
3509 return _dispatch_try_lock_transfer_or_wakeup(dq
);
3512 if (!fastpath(_dispatch_queue_drain_try_unlock(dq
, owned
))) {
3513 // someone enqueued a slow item at the head
3514 // looping may be its last chance
3515 return _dispatch_try_lock_transfer_or_wakeup(dq
);
3521 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq
, void *ctxt
,
3522 dispatch_function_t func
, pthread_priority_t pp
)
3524 _dispatch_sync_function_recurse(dq
, ctxt
, func
, pp
);
3525 _dispatch_barrier_complete(dq
);
3530 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq
, void *ctxt
,
3531 dispatch_function_t func
)
3533 _dispatch_sync_function_invoke_inline(dq
, ctxt
, func
);
3534 _dispatch_barrier_complete(dq
);
3537 DISPATCH_ALWAYS_INLINE
3539 _dispatch_barrier_sync_f_invoke_inline(dispatch_queue_t dq
, void *ctxt
,
3540 dispatch_function_t func
, pthread_priority_t pp
)
3542 _dispatch_introspection_barrier_sync_begin(dq
, func
);
3543 if (slowpath(dq
->do_targetq
->do_targetq
)) {
3544 return _dispatch_barrier_sync_f_recurse(dq
, ctxt
, func
, pp
);
3546 _dispatch_barrier_sync_f_invoke(dq
, ctxt
, func
);
3549 typedef struct dispatch_barrier_sync_context_s
{
3550 struct dispatch_continuation_s dbsc_dc
;
3551 dispatch_thread_frame_s dbsc_dtf
;
3552 } *dispatch_barrier_sync_context_t
;
3555 _dispatch_barrier_sync_f_slow_invoke(void *ctxt
)
3557 dispatch_barrier_sync_context_t dbsc
= ctxt
;
3558 dispatch_continuation_t dc
= &dbsc
->dbsc_dc
;
3559 dispatch_queue_t dq
= dc
->dc_data
;
3560 dispatch_thread_event_t event
= (dispatch_thread_event_t
)dc
->dc_other
;
3562 dispatch_assert(dq
== _dispatch_queue_get_current());
3563 #if DISPATCH_COCOA_COMPAT
3564 if (slowpath(_dispatch_queue_is_thread_bound(dq
))) {
3565 dispatch_assert(_dispatch_thread_frame_get_current() == NULL
);
3567 // the block runs on the thread the queue is bound to and not
3568 // on the calling thread, but we mean to see the calling thread
3569 // dispatch thread frames, so we fake the link, and then undo it
3570 _dispatch_thread_frame_set_current(&dbsc
->dbsc_dtf
);
3571 // The queue is bound to a non-dispatch thread (e.g. main thread)
3572 _dispatch_continuation_voucher_adopt(dc
, DISPATCH_NO_VOUCHER
,
3573 DISPATCH_OBJ_CONSUME_BIT
);
3574 _dispatch_client_callout(dc
->dc_ctxt
, dc
->dc_func
);
3575 os_atomic_store2o(dc
, dc_func
, NULL
, release
);
3576 _dispatch_thread_frame_set_current(NULL
);
3579 _dispatch_thread_event_signal(event
); // release
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	if (slowpath(!dq->do_targetq)) {
		// see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	if (!pp) {
		pp = _dispatch_get_priority();
		pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
	}
	dispatch_thread_event_s event;
	_dispatch_thread_event_init(&event);
	struct dispatch_barrier_sync_context_s dbsc = {
		.dbsc_dc = {
			.dc_data = dq,
#if DISPATCH_COCOA_COMPAT
			.dc_func = func,
			.dc_ctxt = ctxt,
#endif
			.dc_other = &event,
		}
	};
#if DISPATCH_COCOA_COMPAT
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, etc. However, blocks submitted
	// to the main thread MUST be run on the main thread
	if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
		// consumed by _dispatch_barrier_sync_f_slow_invoke
		// or in the DISPATCH_COCOA_COMPAT hunk below
		_dispatch_continuation_voucher_set(&dbsc.dbsc_dc, dq, 0);
		// save frame linkage for _dispatch_barrier_sync_f_slow_invoke
		_dispatch_thread_frame_save_state(&dbsc.dbsc_dtf);
		// thread bound queues cannot mutate their target queue hierarchy
		// so it's fine to look now
		_dispatch_introspection_barrier_sync_begin(dq, func);
	}
#endif
	uint32_t th_self = _dispatch_tid_self();
	struct dispatch_continuation_s dbss = {
		.dc_flags = DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT,
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dbsc,
		.dc_data = (void*)(uintptr_t)th_self,
		.dc_other = &event,
		.dc_priority = pp,
		.dc_voucher = DISPATCH_NO_VOUCHER,
	};

	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
		DISPATCH_CLIENT_CRASH(dq, "dispatch_barrier_sync called on queue "
				"already owned by current thread");
	}

	_dispatch_continuation_push_sync_slow(dq, &dbss);
	_dispatch_thread_event_wait(&event); // acquire
	_dispatch_thread_event_destroy(&event);
	if (_dispatch_queue_received_override(dq, pp)) {
		// Ensure that the root queue sees that this thread was overridden.
		// pairs with the _dispatch_wqthread_override_start in
		// _dispatch_continuation_slow_item_signal
		_dispatch_set_defaultpriority_override();
	}

#if DISPATCH_COCOA_COMPAT
	// Queue bound to a non-dispatch thread
	if (dbsc.dbsc_dc.dc_func == NULL) {
		return;
	} else if (dbsc.dbsc_dc.dc_voucher) {
		// this almost never happens, unless a dispatch_sync() onto a thread
		// bound queue went to the slow path at the same time dispatch_main()
		// is called, or the queue is detached from the runloop.
		_voucher_release(dbsc.dbsc_dc.dc_voucher);
	}
#endif

	_dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f2(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
		// global concurrent queues and queues bound to non-dispatch threads
		// always fall into the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
	}
	//
	// TODO: the more correct thing to do would be to set dq_override to the qos
	// of the thread that just acquired the barrier lock here. Unwinding that
	// would slow down the uncontended fastpath however.
	//
	// The chosen tradeoff is that if an enqueue on a lower priority thread
	// contends with this fastpath, this thread may receive a useless override.
	// Improving this requires the override level to be part of the atomic
	// dq_state.
	//
	_dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, pthread_priority_t pp)
{
	_dispatch_barrier_sync_f2(dq, ctxt, func, pp);
}

DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_barrier_sync_f2(dq, ctxt, func, 0);
}
DISPATCH_NOINLINE
static void
_dispatch_sync_block_with_private_data(dispatch_queue_t dq,
		void (^work)(void), dispatch_block_flags_t flags)
{
	pthread_priority_t pp = _dispatch_block_get_priority(work);

	flags |= _dispatch_block_get_flags(work);
	if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
		pthread_priority_t tp = _dispatch_get_priority();
		tp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (pp < tp) {
			pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
		} else if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
			pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
		}
	}
	// balanced in d_block_sync_invoke or d_block_wait
	if (os_atomic_cmpxchg2o(_dispatch_block_get_data(work),
			dbpd_queue, NULL, dq, relaxed)) {
		_dispatch_retain(dq);
	}
	if (flags & DISPATCH_BLOCK_BARRIER) {
		_dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
	} else {
		_dispatch_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
	}
}
void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
	if (slowpath(_dispatch_block_has_private_data(work))) {
		dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
		return _dispatch_sync_block_with_private_data(dq, work, flags);
	}
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}
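
/*
 * Illustrative usage sketch (not part of this file): dispatch_barrier_sync()
 * on a concurrent queue waits for previously submitted work, runs the block
 * exclusively, and only then lets later work proceed. Uses only the public
 * <dispatch/dispatch.h> API; the queue label and function name are made up.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static void
example_barrier_sync(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.concurrent",
			DISPATCH_QUEUE_CONCURRENT);
	__block int shared = 0;

	dispatch_async(q, ^{ shared += 1; });          // regular concurrent work
	dispatch_barrier_sync(q, ^{ shared *= 10; });  // exclusive; waits for above
	printf("shared = %d\n", shared);               // prints 10
}
#endif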
DISPATCH_NOINLINE
void
_dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// Use for mutation of queue-/source-internal state only, ignores target
	// queue hierarchy!
	if (!fastpath(_dispatch_queue_try_acquire_barrier_sync(dq))) {
		return _dispatch_barrier_async_detached_f(dq, ctxt, func);
	}
	// skip the recursion because it's about the queue state only
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}
#pragma mark dispatch_sync

DISPATCH_NOINLINE
static void
_dispatch_non_barrier_complete(dispatch_queue_t dq)
{
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
		new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
		if (_dq_state_is_runnable(new_state)) {
			if (!_dq_state_is_runnable(old_state)) {
				// we're making a FULL -> non FULL transition
				new_state |= DISPATCH_QUEUE_DIRTY;
			}
			if (!_dq_state_drain_locked(new_state)) {
				uint64_t full_width = new_state;
				if (_dq_state_has_pending_barrier(new_state)) {
					full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
					full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
					full_width += DISPATCH_QUEUE_IN_BARRIER;
				} else {
					full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
					full_width += DISPATCH_QUEUE_IN_BARRIER;
				}
				if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
						DISPATCH_QUEUE_WIDTH_FULL_BIT) {
					new_state = full_width;
					new_state &= ~DISPATCH_QUEUE_DIRTY;
					new_state |= _dispatch_tid_self();
				}
			}
		}
	});

	if (_dq_state_is_in_barrier(new_state)) {
		return _dispatch_try_lock_transfer_or_wakeup(dq);
	}
	if (!_dq_state_is_runnable(old_state)) {
		_dispatch_queue_try_wakeup(dq, new_state, 0);
	}
}
DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		pthread_priority_t pp)
{
	dispatch_assert(dq->do_targetq);
	if (!pp) {
		pp = _dispatch_get_priority();
		pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
	}

	dispatch_thread_event_s event;
	_dispatch_thread_event_init(&event);
	uint32_t th_self = _dispatch_tid_self();
	struct dispatch_continuation_s dc = {
		.dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
#if DISPATCH_INTROSPECTION
		.dc_func = func,
		.dc_ctxt = ctxt,
#endif
		.dc_data = (void*)(uintptr_t)th_self,
		.dc_other = &event,
		.dc_priority = pp,
		.dc_voucher = DISPATCH_NO_VOUCHER,
	};

	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
		DISPATCH_CLIENT_CRASH(dq, "dispatch_sync called on queue "
				"already owned by current thread");
	}

	_dispatch_continuation_push_sync_slow(dq, &dc);
	_dispatch_thread_event_wait(&event); // acquire
	_dispatch_thread_event_destroy(&event);
	if (_dispatch_queue_received_override(dq, pp)) {
		// Ensure that the root queue sees that this thread was overridden.
		// pairs with the _dispatch_wqthread_override_start in
		// _dispatch_continuation_slow_item_signal
		_dispatch_set_defaultpriority_override();
	}
	_dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		pthread_priority_t pp)
{
	// <rdar://problem/24738102&24743140> reserving non barrier width
	// doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
	// equivalent), so we have to check that this thread hasn't enqueued
	// anything ahead of this call or we can break ordering
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_sync_f_slow(dq, ctxt, func, pp);
	}
	// concurrent queues do not respect width on sync
	if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
		return _dispatch_sync_f_slow(dq, ctxt, func, pp);
	}
	_dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
}
DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		pthread_priority_t pp)
{
	if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
		return _dispatch_sync_f2(dq, ctxt, func, pp);
	}
	return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
}

DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
		return _dispatch_sync_f2(dq, ctxt, func, 0);
	}
	return dispatch_barrier_sync_f(dq, ctxt, func);
}

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
	if (slowpath(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_private_data(dq, work, 0);
	}
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
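
/*
 * Illustrative usage sketch (not part of this file): the block and function
 * variants of dispatch_sync() both funnel into _dispatch_sync_f2() or the
 * barrier path above. Uses only the public API; names are made up.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static void increment(void *ctxt) { (*(int *)ctxt)++; }

static void
example_sync(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.serial",
			DISPATCH_QUEUE_SERIAL);
	int counter = 0;

	dispatch_sync_f(q, &counter, increment);   // function-pointer variant
	dispatch_sync(q, ^{                        // block variant
		printf("counter = %d\n", counter);     // prints 1
	});
}
#endif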
#pragma mark dispatch_trysync

struct trysync_context {
	dispatch_queue_t tc_dq;
	void *tc_ctxt;
	dispatch_function_t tc_func;
};
DISPATCH_NOINLINE
static int
_dispatch_trysync_recurse(dispatch_queue_t dq,
		struct trysync_context *tc, bool barrier)
{
	dispatch_queue_t tq = dq->do_targetq;

	if (barrier) {
		if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
			return EWOULDBLOCK;
		}
	} else {
		// <rdar://problem/24743140> check nothing was queued by the current
		// thread ahead of this call. _dispatch_queue_try_reserve_sync_width
		// ignores the ENQUEUED bit which could cause it to miss a barrier_async
		// made by the same thread just before.
		if (slowpath(dq->dq_items_tail)) {
			return EWOULDBLOCK;
		}
		// concurrent queues do not respect width on sync
		if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
			return EWOULDBLOCK;
		}
	}

	int rc = 0;
	if (_dispatch_queue_cannot_trysync(tq)) {
		_dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
		rc = ENOTSUP;
	} else if (tq->do_targetq) {
		rc = _dispatch_trysync_recurse(tq, tc, tq->dq_width == 1);
		if (rc == ENOTSUP) {
			_dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
		}
	} else {
		dispatch_thread_frame_s dtf;
		_dispatch_thread_frame_push(&dtf, tq);
		_dispatch_sync_function_invoke(tc->tc_dq, tc->tc_ctxt, tc->tc_func);
		_dispatch_thread_frame_pop(&dtf);
	}
	if (barrier) {
		_dispatch_barrier_complete(dq);
	} else {
		_dispatch_non_barrier_complete(dq);
	}
	return rc;
}
DISPATCH_NOINLINE
bool
_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t f)
{
	if (slowpath(!dq->do_targetq)) {
		_dispatch_sync_function_invoke(dq, ctxt, f);
		return true;
	}
	if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
		return false;
	}
	struct trysync_context tc = {
		.tc_dq = dq,
		.tc_func = f,
		.tc_ctxt = ctxt,
	};
	return _dispatch_trysync_recurse(dq, &tc, true) == 0;
}

DISPATCH_NOINLINE
bool
_dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
{
	if (slowpath(!dq->do_targetq)) {
		_dispatch_sync_function_invoke(dq, ctxt, f);
		return true;
	}
	if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
		return false;
	}
	struct trysync_context tc = {
		.tc_dq = dq,
		.tc_func = f,
		.tc_ctxt = ctxt,
	};
	return _dispatch_trysync_recurse(dq, &tc, dq->dq_width == 1) == 0;
}
#pragma mark dispatch_after

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		void *ctxt, void *handler, bool block)
{
	dispatch_source_t ds;
	uint64_t leeway, delta;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		if (block) {
			return dispatch_async(queue, handler);
		}
		return dispatch_async_f(queue, ctxt, handler);
	}
	leeway = delta / 10; // <rdar://problem/13447496>

	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
	dispatch_assert(ds);

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (block) {
		_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
	} else {
		_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
	}
	// reference `ds` so that it doesn't show up as a leak
	dc->dc_data = ds;
	_dispatch_source_set_event_handler_continuation(ds, dc);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
	dispatch_activate(ds);
}
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_after(when, queue, ctxt, func, false);
}

void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	_dispatch_after(when, queue, NULL, work, true);
}
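
/*
 * Illustrative usage sketch (not part of this file): dispatch_after() above
 * computes a leeway of delta/10 (clamped to [1ms, 60s]) and arms a one-shot
 * timer source on the target queue. Public API only.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static void
example_after(void)
{
	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC);
	dispatch_after(when, dispatch_get_main_queue(), ^{
		printf("fired ~2s later (plus up to ~200ms of leeway)\n");
	});
	dispatch_main(); // parks the main thread so the block can run; never returns
}
#endif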
#pragma mark dispatch_queue_wakeup

void
_dispatch_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

	if (_dispatch_queue_class_probe(dq)) {
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}
	if (target) {
		return _dispatch_queue_class_wakeup(dq, pp, flags, target);
	} else if (pp) {
		return _dispatch_queue_class_override_drainer(dq, pp, flags);
	} else if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}
#if DISPATCH_COCOA_COMPAT
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle)
{
#if TARGET_OS_MAC
	return MACH_PORT_VALID(handle);
#elif defined(__linux__)
	return handle >= 0;
#else
#error "runloop support not implemented on this platform"
#endif
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_runloop_handle_t
_dispatch_runloop_queue_get_handle(dispatch_queue_t dq)
{
#if TARGET_OS_MAC
	return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt);
#elif defined(__linux__)
	// decode: 0 is a valid fd, so offset by 1 to distinguish from NULL
	return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt) - 1;
#else
#error "runloop support not implemented on this platform"
#endif
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_runloop_queue_set_handle(dispatch_queue_t dq, dispatch_runloop_handle_t handle)
{
#if TARGET_OS_MAC
	dq->do_ctxt = (void *)(uintptr_t)handle;
#elif defined(__linux__)
	// encode: 0 is a valid fd, so offset by 1 to distinguish from NULL
	dq->do_ctxt = (void *)(uintptr_t)(handle + 1);
#else
#error "runloop support not implemented on this platform"
#endif
}
#endif // DISPATCH_COCOA_COMPAT
void
_dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
		// <rdar://problem/14026816>
		return _dispatch_queue_wakeup(dq, pp, flags);
	}

	if (_dispatch_queue_class_probe(dq)) {
		return _dispatch_runloop_queue_poke(dq, pp, flags);
	}

	pp = _dispatch_queue_reset_override_priority(dq, true);
	if (pp) {
		mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
		if (_dispatch_queue_class_probe(dq)) {
			_dispatch_runloop_queue_poke(dq, pp, flags);
		}
		_dispatch_thread_override_end(owner, dq);
		return;
	}
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
#else
	return _dispatch_queue_wakeup(dq, pp, flags);
#endif
}

void
_dispatch_main_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
	if (_dispatch_queue_is_thread_bound(dq)) {
		return _dispatch_runloop_queue_wakeup(dq, pp, flags);
	}
#endif
	return _dispatch_queue_wakeup(dq, pp, flags);
}
void
_dispatch_root_queue_wakeup(dispatch_queue_t dq,
		pthread_priority_t pp DISPATCH_UNUSED,
		dispatch_wakeup_flags_t flags)
{
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		// see _dispatch_queue_push_set_head
		dispatch_assert(flags & DISPATCH_WAKEUP_FLUSH);
	}
	_dispatch_global_queue_poke(dq);
}

#pragma mark dispatch root queues poke
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
{
	dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
	if (!_dispatch_runloop_handle_is_valid(handle)) {
		return;
	}

#if HAVE_MACH
	mach_port_t mp = handle;
	kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}
#elif defined(__linux__)
	int result;
	do {
		result = eventfd_write(handle, 1);
	} while (result == -1 && errno == EINTR);
	(void)dispatch_assume_zero(result);
#else
#error "runloop support not implemented on this platform"
#endif
}
DISPATCH_NOINLINE
static void
_dispatch_runloop_queue_poke(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags)
{
	// it's not useful to handle WAKEUP_FLUSH because mach_msg() will have
	// a release barrier and that when runloop queues stop being thread bound
	// they have a non optional wake-up to start being a "normal" queue
	// either in _dispatch_runloop_queue_xref_dispose,
	// or in _dispatch_queue_cleanup2() for the main thread.

	if (dq == &_dispatch_main_q) {
		dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
				_dispatch_runloop_queue_handle_init);
	}
	_dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
	if (flags & DISPATCH_WAKEUP_OVERRIDING) {
		mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
		_dispatch_thread_override_start(owner, pp, dq);
		if (flags & DISPATCH_WAKEUP_WAS_OVERRIDDEN) {
			_dispatch_thread_override_end(owner, dq);
		}
	}
	_dispatch_runloop_queue_class_poke(dq);
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}
DISPATCH_NOINLINE
static void
_dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	uint32_t i = n;
	int r;

	_dispatch_debug_root_queue(dq, __func__);
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (qc->dgq_kworkqueue) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			do {
				r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
						_dispatch_worker_thread4, dq, &wh, &gen_cnt);
				(void)dispatch_assume_zero(r);
			} while (--i);
			return;
		}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
		if (!dq->dq_priority) {
			r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
					qc->dgq_wq_options, (int)i);
			(void)dispatch_assume_zero(r);
			return;
		}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
		r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
		(void)dispatch_assume_zero(r);
#endif
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
		while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
			if (!--i) {
				return;
			}
		}
	}
	uint32_t j, t_count;
	// seq_cst with atomic store to tail <rdar://problem/16932833>
	t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
	do {
		if (!t_count) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
		j = i > t_count ? t_count : i;
	} while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
			t_count - j, &t_count, acquire));

	pthread_attr_t *attr = &pqc->dpq_thread_attr;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	if (slowpath(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq);
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
	} while (--j);
#endif // DISPATCH_USE_PTHREAD_POOL
}
static inline void
_dispatch_global_queue_poke_n(dispatch_queue_t dq, unsigned int n)
{
	if (!_dispatch_queue_class_probe(dq)) {
		return;
	}
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	if (
#if DISPATCH_USE_PTHREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
		_dispatch_root_queue_debug("worker thread request still pending for "
				"global queue: %p", dq);
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	return _dispatch_global_queue_poke_slow(dq, n);
}

static inline void
_dispatch_global_queue_poke(dispatch_queue_t dq)
{
	return _dispatch_global_queue_poke_n(dq, 1);
}

DISPATCH_NOINLINE
void
_dispatch_queue_push_list_slow(dispatch_queue_t dq, unsigned int n)
{
	return _dispatch_global_queue_poke_n(dq, n);
}
#pragma mark dispatch_queue_drain

void
_dispatch_continuation_pop(dispatch_object_t dou, dispatch_queue_t dq,
		dispatch_invoke_flags_t flags)
{
	_dispatch_continuation_pop_inline(dou, dq, flags);
}

void
_dispatch_continuation_invoke(dispatch_object_t dou, voucher_t override_voucher,
		dispatch_invoke_flags_t flags)
{
	_dispatch_continuation_invoke_inline(dou, override_voucher, flags);
}

/*
 * Drain comes in 2 flavours (serial/concurrent) and 2 modes
 * (redirecting or not).
 *
 * Serial drain is about serial queues (width == 1). It doesn't support
 * the redirecting mode, which doesn't make sense, and treats all continuations
 * as barriers. Bookkeeping is minimal in serial flavour, most of the loop
 * is optimized away.
 *
 * Serial drain stops if the width of the queue grows to larger than 1.
 * Going through a serial drain prevents any recursive drain from being
 * redirecting.
 *
 * When in non-redirecting mode (meaning one of the target queues is serial),
 * non-barriers and barriers alike run in the context of the drain thread.
 * Slow non-barrier items are still all signaled so that they can make progress
 * toward the dispatch_sync() that will serialize them all.
 *
 * In redirecting mode, non-barrier work items are redirected downward.
 *
 * Concurrent drain stops if the width of the queue becomes 1, so that the
 * queue drain moves to the more efficient serial mode.
 */
DISPATCH_ALWAYS_INLINE
static dispatch_queue_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
		uint64_t *owned_ptr, struct dispatch_object_s **dc_out,
		bool serial_drain)
{
	dispatch_queue_t orig_tq = dq->do_targetq;
	dispatch_thread_frame_s dtf;
	struct dispatch_object_s *dc = NULL, *next_dc;
	uint64_t owned = *owned_ptr;

	_dispatch_thread_frame_push(&dtf, dq);
	if (_dq_state_is_in_barrier(owned)) {
		// we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
		// but width can change while draining barrier work items, so we only
		// convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
		owned = DISPATCH_QUEUE_IN_BARRIER;
	}

	while (dq->dq_items_tail) {
		dc = _dispatch_queue_head(dq);
		do {
			if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(dq))) {
				goto out;
			}
			if (unlikely(orig_tq != dq->do_targetq)) {
				goto out;
			}
			if (unlikely(serial_drain != (dq->dq_width == 1))) {
				goto out;
			}
			if (serial_drain || _dispatch_object_is_barrier(dc)) {
				if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
					goto out;
				}
				next_dc = _dispatch_queue_next(dq, dc);
				if (_dispatch_object_is_slow_item(dc)) {
					owned = 0;
					goto out_with_deferred;
				}
			} else {
				if (owned == DISPATCH_QUEUE_IN_BARRIER) {
					// we just ran barrier work items, we have to make their
					// effect visible to other sync work items on other threads
					// that may start coming in after this point, hence the
					// release barrier
					os_atomic_and2o(dq, dq_state, ~owned, release);
					owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
				} else if (unlikely(owned == 0)) {
					if (_dispatch_object_is_slow_item(dc)) {
						// sync "readers" don't observe the limit
						_dispatch_queue_reserve_sync_width(dq);
					} else if (!_dispatch_queue_try_acquire_async(dq)) {
						goto out_with_no_width;
					}
					owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
				}

				next_dc = _dispatch_queue_next(dq, dc);
				if (_dispatch_object_is_slow_item(dc)) {
					owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
					_dispatch_continuation_slow_item_signal(dq, dc);
					continue;
				}

				if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
					owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
					_dispatch_continuation_redirect(dq, dc);
					continue;
				}
			}

			_dispatch_continuation_pop_inline(dc, dq, flags);
			_dispatch_perfmon_workitem_inc();
			if (unlikely(dtf.dtf_deferred)) {
				goto out_with_deferred_compute_owned;
			}
		} while ((dc = next_dc));
	}

out:
	if (owned == DISPATCH_QUEUE_IN_BARRIER) {
		// if we're IN_BARRIER we really own the full width too
		owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
	}
	if (dc) {
		owned = _dispatch_queue_adjust_owned(dq, owned, dc);
	}
	*owned_ptr = owned;
	_dispatch_thread_frame_pop(&dtf);
	return dc ? dq->do_targetq : NULL;

out_with_no_width:
	*owned_ptr = 0;
	_dispatch_thread_frame_pop(&dtf);
	return NULL;

out_with_deferred_compute_owned:
	if (serial_drain) {
		owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
	} else {
		if (owned == DISPATCH_QUEUE_IN_BARRIER) {
			// if we're IN_BARRIER we really own the full width too
			owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
		}
		if (next_dc) {
			owned = _dispatch_queue_adjust_owned(dq, owned, next_dc);
		}
	}
out_with_deferred:
	*owned_ptr = owned;
	if (unlikely(!dc_out)) {
		DISPATCH_INTERNAL_CRASH(dc,
				"Deferred continuation on source, mach channel or mgr");
	}
	*dc_out = dc;
	_dispatch_thread_frame_pop(&dtf);
	return dq->do_targetq;
}
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_concurrent_drain(dispatch_queue_t dq,
		dispatch_invoke_flags_t flags, uint64_t *owned,
		struct dispatch_object_s **dc_ptr)
{
	return _dispatch_queue_drain(dq, flags, owned, dc_ptr, false);
}

DISPATCH_NOINLINE
dispatch_queue_t
_dispatch_queue_serial_drain(dispatch_queue_t dq,
		dispatch_invoke_flags_t flags, uint64_t *owned,
		struct dispatch_object_s **dc_ptr)
{
	flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
	return _dispatch_queue_drain(dq, flags, owned, dc_ptr, true);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_thread_frame_s dtf;

	if (!dq->dq_items_tail) {
		return;
	}

	if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
		DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
				" after dispatch_main()");
	}
	mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
	if (slowpath(owner != _dispatch_tid_self())) {
		DISPATCH_CLIENT_CRASH(owner, "_dispatch_main_queue_callback_4CF called"
				" from the wrong thread");
	}

	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);

	_dispatch_perfmon_start();
	// <rdar://problem/23256682> hide the frame chaining when CFRunLoop
	// drains the main runloop, as this should not be observable that way
	_dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);

	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
	voucher_t voucher = _voucher_copy();

	struct dispatch_object_s *dc, *next_dc, *tail;
	dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
	do {
		next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
		_dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
		_dispatch_perfmon_workitem_inc();
	} while ((dc = next_dc));

	// runloop based queues use their port for the queue PUBLISH pattern
	// so this raw call to dx_wakeup(0) is valid
	dx_wakeup(dq, 0, 0);
	_dispatch_voucher_debug("main queue restore", voucher);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_reset_priority_and_voucher(old_pri, voucher);
	_dispatch_thread_frame_pop(&dtf);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}
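
/*
 * Illustrative usage sketch (not part of this file): work lands on the main
 * queue via dispatch_async() and is drained either by the CFRunLoop callout
 * that invokes _dispatch_main_queue_callback_4CF() (see the crash strings in
 * the drain above) or, without a runloop, by dispatch_main(). Public API only.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

int
main(void)
{
	dispatch_async(dispatch_get_main_queue(), ^{
		printf("drained on the main thread\n");
	});
	dispatch_main(); // hands the main thread over to libdispatch; never returns
	return 0;
}
#endif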
static bool
_dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
{
	if (!dq->dq_items_tail) {
		return false;
	}
	dispatch_thread_frame_s dtf;
	_dispatch_perfmon_start();
	_dispatch_thread_frame_push(&dtf, dq);
	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
	voucher_t voucher = _voucher_copy();

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	next_dc = _dispatch_queue_next(dq, dc);
	_dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
	_dispatch_perfmon_workitem_inc();

	if (!next_dc) {
		// runloop based queues use their port for the queue PUBLISH pattern
		// so this raw call to dx_wakeup(0) is valid
		dx_wakeup(dq, 0, 0);
	}

	_dispatch_voucher_debug("runloop queue restore", voucher);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_reset_priority_and_voucher(old_pri, voucher);
	_dispatch_thread_frame_pop(&dtf);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
	return next_dc;
}
#endif
void
_dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq)
{
	dispatch_continuation_t dc_tmp, dc_start, dc_end;
	struct dispatch_object_s *dc = NULL;
	uint64_t dq_state, owned;
	size_t count = 0;

	owned = DISPATCH_QUEUE_IN_BARRIER;
	owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
attempt_running_slow_head:
	if (slowpath(dq->dq_items_tail) && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
		dc = _dispatch_queue_head(dq);
		if (!_dispatch_object_is_slow_item(dc)) {
			// not a slow item, needs to wake up
		} else if (fastpath(dq->dq_width == 1) ||
				_dispatch_object_is_barrier(dc)) {
			// rdar://problem/8290662 "barrier/writer lock transfer"
			dc_start = dc_end = (dispatch_continuation_t)dc;
			owned = 0;
			count = 1;
			dc = _dispatch_queue_next(dq, dc);
		} else {
			// <rdar://problem/10164594> "reader lock transfer"
			// we must not signal semaphores immediately because our right
			// for dequeuing is granted through holding the full "barrier" width
			// which a signaled work item could relinquish out from our feet
			dc_start = (dispatch_continuation_t)dc;
			do {
				// no check on width here because concurrent queues
				// do not respect width for blocked readers, the thread
				// is already spent anyway
				dc_end = (dispatch_continuation_t)dc;
				owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
				count++;
				dc = _dispatch_queue_next(dq, dc);
			} while (dc && _dispatch_object_is_slow_non_barrier(dc));
		}

		if (count) {
			_dispatch_queue_drain_transfer_lock(dq, owned, dc_start);
			do {
				// signaled job will release the continuation
				dc_tmp = dc_start;
				dc_start = dc_start->do_next;
				_dispatch_continuation_slow_item_signal(dq, dc_tmp);
			} while (dc_tmp != dc_end);
			return;
		}
	}

	if (dc || dx_metatype(dq) != _DISPATCH_QUEUE_TYPE) {
		// <rdar://problem/23336992> the following wakeup is needed for sources
		// or mach channels: when ds_pending_data is set at the same time
		// as a trysync_f happens, lock transfer code above doesn't know about
		// ds_pending_data or the wakeup logic, but lock transfer is useless
		// for sources and mach channels in the first place.
		owned = _dispatch_queue_adjust_owned(dq, owned, dc);
		dq_state = _dispatch_queue_drain_unlock(dq, owned, NULL);
		return _dispatch_queue_try_wakeup(dq, dq_state, 0);
	} else if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
		// someone enqueued a slow item at the head
		// looping may be its last chance
		goto attempt_running_slow_head;
	}
}
void
_dispatch_mgr_queue_drain(void)
{
	const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
	dispatch_queue_t dq = &_dispatch_mgr_q;
	uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;

	if (dq->dq_items_tail) {
		_dispatch_perfmon_start();
		if (slowpath(_dispatch_queue_serial_drain(dq, flags, &owned, NULL))) {
			DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
		}
		_dispatch_voucher_debug("mgr queue clear", NULL);
		_voucher_clear();
		_dispatch_reset_defaultpriority_override();
		_dispatch_perfmon_end();
	}

#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (!_dispatch_kevent_workqueue_enabled)
#endif
	{
		_dispatch_force_cache_cleanup();
	}
}

#pragma mark dispatch_queue_invoke
void
_dispatch_queue_drain_deferred_invoke(dispatch_queue_t dq,
		dispatch_invoke_flags_t flags, uint64_t to_unlock,
		struct dispatch_object_s *dc)
{
	if (_dispatch_object_is_slow_item(dc)) {
		dispatch_assert(to_unlock == 0);
		_dispatch_queue_drain_transfer_lock(dq, to_unlock, dc);
		_dispatch_continuation_slow_item_signal(dq, dc);
		return _dispatch_release_tailcall(dq);
	}

	bool should_defer_again = false, should_pend_queue = true;
	uint64_t old_state, new_state;

	if (_dispatch_get_current_queue()->do_targetq) {
		_dispatch_thread_frame_get_current()->dtf_deferred = dc;
		should_defer_again = true;
		should_pend_queue = false;
	}

	if (dq->dq_width > 1) {
		should_pend_queue = false;
	} else if (should_pend_queue) {
		dispatch_assert(to_unlock ==
				DISPATCH_QUEUE_WIDTH_INTERVAL + DISPATCH_QUEUE_IN_BARRIER);
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
			new_state = old_state;
			if (_dq_state_has_waiters(old_state) ||
					_dq_state_is_enqueued(old_state)) {
				os_atomic_rmw_loop_give_up(break);
			}
			new_state += DISPATCH_QUEUE_DRAIN_PENDED;
			new_state -= DISPATCH_QUEUE_IN_BARRIER;
			new_state -= DISPATCH_QUEUE_WIDTH_INTERVAL;
		});
		should_pend_queue = (new_state & DISPATCH_QUEUE_DRAIN_PENDED);
	}

	if (!should_pend_queue) {
		if (to_unlock & DISPATCH_QUEUE_IN_BARRIER) {
			_dispatch_try_lock_transfer_or_wakeup(dq);
			_dispatch_release(dq);
		} else if (to_unlock) {
			uint64_t dq_state = _dispatch_queue_drain_unlock(dq, to_unlock, NULL);
			_dispatch_queue_try_wakeup(dq, dq_state, DISPATCH_WAKEUP_CONSUME);
		} else {
			_dispatch_release(dq);
		}
		dq = NULL;
	}

	if (!should_defer_again) {
		dx_invoke(dc, flags & _DISPATCH_INVOKE_PROPAGATE_MASK);
	}

	if (dq) {
		uint32_t self = _dispatch_tid_self();
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
			new_state = old_state;
			if (!_dq_state_drain_pended(old_state) ||
					_dq_state_drain_owner(old_state) != self) {
				os_atomic_rmw_loop_give_up({
					// We may have been overridden, so inform the root queue
					_dispatch_set_defaultpriority_override();
					return _dispatch_release_tailcall(dq);
				});
			}
			new_state = DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT(new_state);
		});
		if (_dq_state_has_override(old_state)) {
			// Ensure that the root queue sees that this thread was overridden.
			_dispatch_set_defaultpriority_override();
		}
		return dx_invoke(dq, flags | DISPATCH_INVOKE_STEALING);
	}
}
void
_dispatch_queue_finalize_activation(dispatch_queue_t dq)
{
	dispatch_queue_t tq = dq->do_targetq;
	_dispatch_queue_priority_inherit_from_target(dq, tq);
	_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
	if (dq->dq_override_voucher == DISPATCH_NO_VOUCHER) {
		voucher_t v = tq->dq_override_voucher;
		if (v != DISPATCH_NO_VOUCHER) {
			if (v) _voucher_retain(v);
			dq->dq_override_voucher = v;
		}
	}
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
		uint64_t *owned, struct dispatch_object_s **dc_ptr)
{
	dispatch_queue_t otq = dq->do_targetq;
	dispatch_queue_t cq = _dispatch_queue_get_current();

	if (slowpath(cq != otq)) {
		return otq;
	}
	if (dq->dq_width == 1) {
		return _dispatch_queue_serial_drain(dq, flags, owned, dc_ptr);
	}
	return _dispatch_queue_concurrent_drain(dq, flags, owned, dc_ptr);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
void
_dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(dq, flags, dispatch_queue_invoke2);
}

#pragma mark dispatch_queue_class_wakeup
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_queue_override_invoke(dispatch_continuation_t dc,
		dispatch_invoke_flags_t flags)
{
	dispatch_queue_t old_rq = _dispatch_queue_get_current();
	dispatch_queue_t assumed_rq = dc->dc_other;
	voucher_t ov = DISPATCH_NO_VOUCHER;
	dispatch_object_t dou;

	dou._do = dc->dc_data;
	_dispatch_queue_set_current(assumed_rq);
	flags |= DISPATCH_INVOKE_OVERRIDING;
	if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
		flags |= DISPATCH_INVOKE_STEALING;
	} else {
		// balance the fake continuation push in
		// _dispatch_root_queue_push_override
		_dispatch_trace_continuation_pop(assumed_rq, dou._do);
	}
	_dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
		if (_dispatch_object_has_vtable(dou._do)) {
			dx_invoke(dou._do, flags);
		} else {
			_dispatch_continuation_invoke_inline(dou, ov, flags);
		}
	});
	_dispatch_queue_set_current(old_rq);
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_need_global_root_queue_push_override(dispatch_queue_t rq,
		pthread_priority_t pp)
{
	pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;

	if (unlikely(!rqp)) return false;

	pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	return defaultqueue ? pp && pp != rqp : pp > rqp;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_need_global_root_queue_push_override_stealer(dispatch_queue_t rq,
		pthread_priority_t pp)
{
	pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;

	if (unlikely(!rqp)) return false;

	pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	return defaultqueue || pp > rqp;
}
DISPATCH_NOINLINE
static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
		dispatch_object_t dou, pthread_priority_t pp)
{
	bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
	dispatch_continuation_t dc = dou._dc;

	if (_dispatch_object_is_redirection(dc)) {
		// no double-wrap is needed, _dispatch_async_redirect_invoke will do
		// the right thing
		dc->dc_func = (void *)orig_rq;
	} else {
		dc = _dispatch_continuation_alloc();
		dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
		// fake that we queued `dou` on `orig_rq` for introspection purposes
		_dispatch_trace_continuation_push(orig_rq, dou);
		dc->dc_ctxt = dc;
		dc->dc_other = orig_rq;
		dc->dc_data = dou._do;
		dc->dc_priority = DISPATCH_NO_PRIORITY;
		dc->dc_voucher = DISPATCH_NO_VOUCHER;
	}

	DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
	_dispatch_queue_push_inline(rq, dc, 0, 0);
}

DISPATCH_NOINLINE
static void
_dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
		dispatch_queue_t dq, pthread_priority_t pp)
{
	bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
	_dispatch_retain(dq);
	dc->dc_func = NULL;
	dc->dc_ctxt = dc;
	dc->dc_other = orig_rq;
	dc->dc_data = dq;
	dc->dc_priority = DISPATCH_NO_PRIORITY;
	dc->dc_voucher = DISPATCH_NO_VOUCHER;

	DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
	_dispatch_queue_push_inline(rq, dc, 0, 0);
}
DISPATCH_NOINLINE
static void
_dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags, uint64_t dq_state)
{
	mach_port_t owner = _dq_state_drain_owner(dq_state);
	pthread_priority_t pp2;
	dispatch_queue_t tq;
	bool locked;

	if (owner) {
		int rc = _dispatch_wqthread_override_start_check_owner(owner, pp,
				&dq->dq_state_lock);
		// EPERM means the target of the override is not a work queue thread
		// and could be a thread bound queue such as the main queue.
		// When that happens we must get to that queue and wake it up if we
		// want the override to be appplied and take effect.
		if (rc != EPERM) {
			goto out;
		}
	}

	if (_dq_state_is_suspended(dq_state)) {
		goto out;
	}

	tq = dq->do_targetq;

	if (_dispatch_queue_has_immutable_target(dq)) {
		locked = false;
	} else if (_dispatch_is_in_root_queues_array(tq)) {
		// avoid locking when we recognize the target queue as a global root
		// queue it is gross, but is a very common case. The locking isn't
		// needed because these target queues cannot go away.
		locked = false;
	} else if (_dispatch_queue_sidelock_trylock(dq, pp)) {
		// <rdar://problem/17735825> to traverse the tq chain safely we must
		// lock it to ensure it cannot change
		locked = true;
		tq = dq->do_targetq;
		_dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
	} else {
		//
		// Leading to being there, the current thread has:
		// 1. enqueued an object on `dq`
		// 2. raised the dq_override value of `dq`
		// 3. set the HAS_OVERRIDE bit and not seen an owner
		// 4. tried and failed to acquire the side lock
		//
		// The side lock owner can only be one of three things:
		//
		// - The suspend/resume side count code. Besides being unlikely,
		//   it means that at this moment the queue is actually suspended,
		//   which transfers the responsibility of applying the override to
		//   the eventual dispatch_resume().
		//
		// - A dispatch_set_target_queue() call. The fact that we saw no `owner`
		//   means that the trysync it does wasn't being drained when (3)
		//   happened which can only be explained by one of these interleavings:
		//
		//   o `dq` became idle between when the object queued in (1) ran and
		//     the set_target_queue call and we were unlucky enough that our
		//     step (3) happened while this queue was idle. There is no reason
		//     to override anything anymore, the queue drained to completion
		//     while we were preempted, our job is done.
		//
		//   o `dq` is queued but not draining during (1-3), then when we try
		//     to lock at (4) the queue is now draining a set_target_queue.
		//     Since we set HAS_OVERRIDE with a release barrier, the effect of
		//     (2) was visible to the drainer when he acquired the drain lock,
		//     and that guy has applied our override. Our job is done.
		//
		// - Another instance of _dispatch_queue_class_wakeup_with_override(),
		//   which is fine because trylock leaves a hint that we failed our
		//   trylock, causing the tryunlock below to fail and reassess whether
		//   a better override needs to be applied.
		//
		_dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
		goto out;
	}

apply_again:
	if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
		if (_dispatch_need_global_root_queue_push_override_stealer(tq, pp)) {
			_dispatch_root_queue_push_override_stealer(tq, dq, pp);
		}
	} else if (_dispatch_queue_need_override(tq, pp)) {
		dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING);
	}
	while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
		// rdar://problem/24081326
		//
		// Another instance of _dispatch_queue_class_wakeup_with_override()
		// tried to acquire the side lock while we were running, and could have
		// had a better override than ours to apply.
		//
		pp2 = dq->dq_override;
		if (pp2 > pp) {
			pp = pp2;
			// The other instance had a better priority than ours, override
			// our thread, and apply the override that wasn't applied to `dq`
			// because of us.
			goto apply_again;
		}
	}

out:
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
void
_dispatch_queue_class_override_drainer(dispatch_queue_t dq,
		pthread_priority_t pp, dispatch_wakeup_flags_t flags)
{
#if HAVE_PTHREAD_WORKQUEUE_QOS
	uint64_t dq_state, value;

	//
	// Someone is trying to override the last work item of the queue.
	// Do not remember this override on the queue because we know the precise
	// duration the override is required for: until the current drain unlocks.
	//
	// That is why this function only tries to set HAS_OVERRIDE if we can
	// still observe a drainer, and doesn't need to set the DIRTY bit
	// because oq_override wasn't touched and there is no race to resolve
	//
	os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		if (!_dq_state_drain_locked(dq_state)) {
			os_atomic_rmw_loop_give_up(break);
		}
		value = dq_state | DISPATCH_QUEUE_HAS_OVERRIDE;
	});
	if (_dq_state_drain_locked(dq_state)) {
		return _dispatch_queue_class_wakeup_with_override(dq, pp,
				flags, dq_state);
	}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}
#if DISPATCH_USE_KEVENT_WORKQUEUE
DISPATCH_NOINLINE
static void
_dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou,
		pthread_priority_t pp, dispatch_deferred_items_t ddi)
{
	dispatch_priority_t old_pp = ddi->ddi_stashed_pp;
	dispatch_queue_t old_dq = ddi->ddi_stashed_dq;
	struct dispatch_object_s *old_dou = ddi->ddi_stashed_dou;
	dispatch_priority_t rq_overcommit;

	rq_overcommit = dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	if (likely(!old_pp || rq_overcommit)) {
		ddi->ddi_stashed_dq = dq;
		ddi->ddi_stashed_dou = dou._do;
		ddi->ddi_stashed_pp = (dispatch_priority_t)pp | rq_overcommit |
				_PTHREAD_PRIORITY_PRIORITY_MASK;
		if (likely(!old_pp)) {
			return;
		}
		// push the previously stashed item
		pp = old_pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
		dq = old_dq;
		dou._do = old_dou;
	}
	if (_dispatch_need_global_root_queue_push_override(dq, pp)) {
		return _dispatch_root_queue_push_override(dq, dou, pp);
	}
	// bit of cheating: we should really pass `pp` but we know that we are
	// pushing onto a global queue at this point, and we just checked that
	// `pp` doesn't matter.
	DISPATCH_COMPILER_CAN_ASSUME(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
	_dispatch_queue_push_inline(dq, dou, 0, 0);
}
#endif
DISPATCH_NOINLINE
static void
_dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou,
		pthread_priority_t pp)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
	_dispatch_queue_push(dq, dou, pp);
}

DISPATCH_NOINLINE
void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		pthread_priority_t pp)
{
	_dispatch_assert_is_valid_qos_override(pp);
	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
#if DISPATCH_USE_KEVENT_WORKQUEUE
		dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
		if (unlikely(ddi && !(ddi->ddi_stashed_pp &
				(dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) {
			dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE);
			return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi);
		}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
		// can't use dispatch_once_f() as it would create a frame
		if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) {
			return _dispatch_queue_push_slow(dq, dou, pp);
		}
		if (_dispatch_need_global_root_queue_push_override(dq, pp)) {
			return _dispatch_root_queue_push_override(dq, dou, pp);
		}
#endif
	}
	_dispatch_queue_push_inline(dq, dou, pp, 0);
}
DISPATCH_NOINLINE
static void
_dispatch_queue_class_wakeup_enqueue(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
	dispatch_queue_t tq;

	if (flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAS_OVERRIDDEN)) {
		// _dispatch_queue_drain_try_unlock may have reset the override while
		// we were becoming the enqueuer
		_dispatch_queue_reinstate_override_priority(dq, (dispatch_priority_t)pp);
	}
	if (!(flags & DISPATCH_WAKEUP_CONSUME)) {
		_dispatch_retain(dq);
	}
	if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
		// try_become_enqueuer has no acquire barrier, as the last block
		// of a queue asyncing to that queue is not an uncommon pattern
		// and in that case the acquire is completely useless
		//
		// so instead use a thread fence here when we will read the targetq
		// pointer because that is the only thing that really requires
		// an acquire barrier
		os_atomic_thread_fence(acquire);
		tq = dq->do_targetq;
	} else {
		dispatch_assert(target == DISPATCH_QUEUE_WAKEUP_MGR);
		tq = &_dispatch_mgr_q;
	}
	return _dispatch_queue_push(tq, dq, pp);
}
DISPATCH_NOINLINE
void
_dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
	uint64_t old_state, new_state, bits = 0;

#if HAVE_PTHREAD_WORKQUEUE_QOS
	_dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
#endif

	if (flags & DISPATCH_WAKEUP_FLUSH) {
		bits = DISPATCH_QUEUE_DIRTY;
	}
	if (flags & DISPATCH_WAKEUP_OVERRIDING) {
		//
		// Setting the dirty bit here is about forcing callers of
		// _dispatch_queue_drain_try_unlock() to loop again when an override
		// has just been set to close the following race:
		//
		// Drainer (in drain_try_unlock():
		//    override_reset();
		//    preempted....
		//
		// Enqueuer that speculates the drainer is still draining:
		//    atomic_or(oq_override, override, relaxed);
		//    atomic_or(dq_state, HAS_OVERRIDE, release);
		//
		// Drainer:
		//    successful drain_unlock() and leaks `oq_override`
		//
		bits = DISPATCH_QUEUE_DIRTY | DISPATCH_QUEUE_HAS_OVERRIDE;
	}

	if (flags & DISPATCH_WAKEUP_SLOW_WAITER) {
		uint64_t pending_barrier_width =
				(dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
		uint64_t xor_owner_and_set_full_width_and_in_barrier =
				_dispatch_tid_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
				DISPATCH_QUEUE_IN_BARRIER;

#ifdef DLOCK_NOWAITERS_BIT
		bits |= DLOCK_NOWAITERS_BIT;
#else
		bits |= DLOCK_WAITERS_BIT;
#endif
		flags ^= DISPATCH_WAKEUP_SLOW_WAITER;
		dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME));

		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
			new_state = old_state | bits;
			if (_dq_state_drain_pended(old_state)) {
				// same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT
				// but we want to be more efficient wrt the WAITERS_BIT
				new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK;
				new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED;
			}
			if (unlikely(_dq_state_drain_locked(new_state))) {
#ifdef DLOCK_NOWAITERS_BIT
				new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT;
#endif
			} else if (unlikely(!_dq_state_is_runnable(new_state) ||
					!(flags & DISPATCH_WAKEUP_FLUSH))) {
				// either not runnable, or was not for the first item (26700358)
				// so we should not try to lock and handle overrides instead
			} else if (_dq_state_has_pending_barrier(old_state) ||
					new_state + pending_barrier_width <
					DISPATCH_QUEUE_WIDTH_FULL_BIT) {
				// see _dispatch_queue_drain_try_lock
				new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
				new_state ^= xor_owner_and_set_full_width_and_in_barrier;
			} else {
				new_state |= DISPATCH_QUEUE_ENQUEUED;
			}
		});
		if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
			return _dispatch_try_lock_transfer_or_wakeup(dq);
		}
	} else if (bits) {
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
			new_state = old_state | bits;
			if (likely(_dq_state_should_wakeup(old_state))) {
				new_state |= DISPATCH_QUEUE_ENQUEUED;
			}
		});
	} else {
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed,{
			new_state = old_state;
			if (likely(_dq_state_should_wakeup(old_state))) {
				new_state |= DISPATCH_QUEUE_ENQUEUED;
			} else {
				os_atomic_rmw_loop_give_up(break);
			}
		});
	}

	if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
		return _dispatch_queue_class_wakeup_enqueue(dq, pp, flags, target);
	}

#if HAVE_PTHREAD_WORKQUEUE_QOS
	if ((flags & DISPATCH_WAKEUP_OVERRIDING)
			&& target == DISPATCH_QUEUE_WAKEUP_TARGET) {
		return _dispatch_queue_class_wakeup_with_override(dq, pp,
				flags, new_state);
	}
#endif

	if (flags & DISPATCH_WAKEUP_CONSUME) {
		return _dispatch_release_tailcall(dq);
	}
}
#pragma mark dispatch_root_queue_drain

DISPATCH_NOINLINE
static bool
_dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	do {
		// Spin for a short while in case the contention is temporary -- e.g.
		// when starting up after dispatch_apply, or when executing a few
		// short continuations in a row.
		if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
			goto out;
		}
		// Since we have serious contention, we need to back off.
		if (!pending) {
			// Mark this queue as pending to avoid requests for further threads
			(void)os_atomic_inc2o(qc, dgq_pending, relaxed);
			pending = true;
		}
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	available = false;
out:
	if (pending) {
		(void)os_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	if (!available) {
		_dispatch_global_queue_poke(dq);
	}
	return available;
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_root_queue_drain_one2(dispatch_queue_t dq)
{
	// Wait for queue head and tail to be both non-empty or both empty
	bool available; // <rdar://problem/15917893>
	_dispatch_wait_until((dq->dq_items_head != NULL) ==
			(available = (dq->dq_items_tail != NULL)));
	return available;
}
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_root_queue_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = os_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		if (slowpath(!os_atomic_cmpxchg2o(dq, dq_items_head, mediator,
				NULL, relaxed))) {
			goto start;
		}
		if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
				_dispatch_root_queue_drain_one2(dq)) {
			goto start;
		}
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_root_queue_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
		// 22708742: set tail to NULL with release, so that NULL write to head
		// above doesn't clobber head from concurrent enqueuer
		if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
			// both head and tail are NULL now
			goto out;
		}
		// There must be a next item now.
		_dispatch_wait_until(next = head->do_next);
	}

	os_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_global_queue_poke(dq);
out:
	return head;
}
void
_dispatch_root_queue_drain_deferred_item(dispatch_queue_t dq,
		struct dispatch_object_s *dou, pthread_priority_t pp)
{
	struct _dispatch_identity_s di;

	// fake that we queued `dou` on `dq` for introspection purposes
	_dispatch_trace_continuation_push(dq, dou);

	pp = _dispatch_priority_inherit_from_root_queue(pp, dq);
	_dispatch_queue_set_current(dq);
	_dispatch_root_queue_identity_assume(&di, pp);
#if DISPATCH_COCOA_COMPAT
	void *pool = _dispatch_last_resort_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	_dispatch_continuation_pop_inline(dou, dq,
			DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
	_dispatch_perfmon_workitem_inc();
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_pop(pool);
#endif // DISPATCH_COCOA_COMPAT
	_dispatch_reset_defaultpriority(di.old_pp);
	_dispatch_queue_set_current(NULL);

	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
static void
_dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pri)
{
#if DISPATCH_DEBUG
	dispatch_queue_t cq;
	if (slowpath(cq = _dispatch_queue_get_current())) {
		DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
	}
#endif
	_dispatch_queue_set_current(dq);
	if (dq->dq_priority) pri = dq->dq_priority;
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri, NULL);
#if DISPATCH_COCOA_COMPAT
	void *pool = _dispatch_last_resort_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	struct dispatch_object_s *item;
	bool reset = false;
	while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
		if (reset) _dispatch_wqthread_override_reset();
		_dispatch_continuation_pop_inline(item, dq,
				DISPATCH_INVOKE_WORKER_DRAIN|DISPATCH_INVOKE_REDIRECTING_DRAIN);
		_dispatch_perfmon_workitem_inc();
		reset = _dispatch_reset_defaultpriority_override();
	}
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_last_resort_autorelease_pool_pop(pool);
#endif // DISPATCH_COCOA_COMPAT
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_queue_set_current(NULL);
}
#pragma mark -
#pragma mark dispatch_worker_thread

#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread4(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();
	int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
	dispatch_assert(pending >= 0);
	_dispatch_root_queue_drain(dq, _dispatch_get_priority());
	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_worker_thread3(pthread_priority_t pp)
{
	bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
	dispatch_queue_t dq;
	pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
	dq = _dispatch_get_root_queue_for_priority(pp, overcommit);
	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
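/*
 * Worked example (not from libdispatch): the masking above keeps every
 * non-flag bit of the pthread priority plus the overcommit flag itself, and
 * drops the remaining flag bits before the value is stored in TSD and used to
 * look up a root queue. The bit layout below is purely illustrative; the real
 * values come from the pthread priority SPI headers.
 */
#if 0 // standalone illustration, kept out of the build
#include <stdbool.h>
#include <stdint.h>

#define EXAMPLE_PRIORITY_FLAGS_MASK     0xff000000u // hypothetical flag byte
#define EXAMPLE_PRIORITY_OVERCOMMIT     0x80000000u // hypothetical flag bit

static uint32_t
example_strip_flags_keep_overcommit(uint32_t pp, bool *overcommit)
{
	*overcommit = (pp & EXAMPLE_PRIORITY_OVERCOMMIT) != 0;
	// Same shape as:
	//     pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
	return pp & (EXAMPLE_PRIORITY_OVERCOMMIT | ~EXAMPLE_PRIORITY_FLAGS_MASK);
}
#endif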
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	if (pqc->dpq_observer_hooks.queue_will_execute) {
		_dispatch_set_pthread_root_queue_observer_hooks(
				&pqc->dpq_observer_hooks);
	}
	if (pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	sigset_t mask;
	int r;
	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);
	_dispatch_introspection_thread_add();

	const int64_t timeout = 5ull * NSEC_PER_SEC;
	pthread_priority_t old_pri = _dispatch_get_priority();
	do {
		_dispatch_root_queue_drain(dq, old_pri);
		_dispatch_reset_priority_and_voucher(old_pri, NULL);
	} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

	(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
	_dispatch_global_queue_poke(dq);
	_dispatch_release(dq);

	return NULL;
}
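/*
 * Sketch (not part of libdispatch): the do/while above implements "drain, then
 * park with a timeout, then retire" for pool threads. The standalone version
 * below shows the same shape with the public dispatch_semaphore API; pool_t,
 * drain_work() and the field names are hypothetical stand-ins.
 */
#if 0 // standalone illustration, kept out of the build
#include <dispatch/dispatch.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
	dispatch_semaphore_t wakeup; // signaled when new work is enqueued
	atomic_int idle_slots;       // how many more threads may be spawned later
} pool_t;

static bool drain_work(void);    // hypothetical: false once the queue is empty

static void *
pool_worker(void *ctxt)
{
	pool_t *pool = ctxt;
	const int64_t timeout = 5LL * NSEC_PER_SEC;

	do {
		while (drain_work()) {
			// keep draining until the queue looks empty
		}
		// Park. A zero return means we were woken for more work; a nonzero
		// return means the grace period expired with nothing to do.
	} while (dispatch_semaphore_wait(pool->wakeup,
			dispatch_time(DISPATCH_TIME_NOW, timeout)) == 0);

	// Timed out: give the slot back so a future enqueue can spawn a thread.
	atomic_fetch_add_explicit(&pool->idle_slots, 1, memory_order_release);
	return NULL;
}
#endif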
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif // DISPATCH_USE_PTHREAD_POOL
#pragma mark -
#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT

dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return DISPATCH_BAD_INPUT;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1, false);
	dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	_dispatch_runloop_queue_handle_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);

	pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq, true);
	_dispatch_queue_clear_bound_thread(dq);
	dx_wakeup(dq, pp, DISPATCH_WAKEUP_FLUSH);
	if (pp) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
}

void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_handle_dispose(dq);
	_dispatch_queue_destroy(dq);
}
bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}

void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	_dispatch_runloop_queue_wakeup(dq, 0, false);
}

dispatch_runloop_handle_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
	}
	return _dispatch_runloop_queue_get_handle(dq);
}
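/*
 * Usage sketch (not how CoreFoundation actually integrates): a hand-rolled
 * Linux event loop could drive a runloop root queue through the 4CF hooks
 * above by polling the queue's handle and then asking the queue to perform
 * work. `queue` and event_loop_once() are hypothetical.
 */
#if 0 // standalone illustration, kept out of the build (Linux flavor)
#include <poll.h>

extern dispatch_queue_t queue; // from _dispatch_runloop_root_queue_create_4CF()

static void
event_loop_once(void)
{
	struct pollfd pfd = {
		.fd = _dispatch_runloop_root_queue_get_port_4CF(queue),
		.events = POLLIN,
	};
	if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN)) {
		// Treat a true return as "more work may remain" and keep going.
		while (_dispatch_runloop_root_queue_perform_4CF(queue)) {
		}
	}
}
#endif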
static void
_dispatch_runloop_queue_handle_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	dispatch_runloop_handle_t handle;

	_dispatch_fork_becomes_unsafe();

#if TARGET_OS_MAC
	mach_port_t mp;
	kern_return_t kr;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				sizeof(limits));
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	handle = mp;
#elif defined(__linux__)
	int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
	if (fd == -1) {
		int err = errno;
		switch (err) {
		case EMFILE:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"process is out of file descriptors");
			break;
		case ENFILE:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"system is out of file descriptors");
			break;
		case ENOMEM:
			DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
					"kernel is out of memory");
			break;
		default:
			DISPATCH_INTERNAL_CRASH(err, "eventfd() failure");
			break;
		}
	}
	handle = fd;
#else
#error "runloop support not implemented on this platform"
#endif
	_dispatch_runloop_queue_set_handle(dq, handle);

	_dispatch_program_is_probably_callback_driven = true;
}
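/*
 * Sketch (not part of libdispatch) of the Linux wakeup-handle protocol that
 * the eventfd above supports: an enqueuer "pokes" the handle, and the event
 * loop drains it back to zero before dispatching. Names are illustrative,
 * not libdispatch internals.
 */
#if 0 // standalone illustration, kept out of the build (Linux-only)
#include <errno.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

static void
wakeup_poke(int handle)
{
	uint64_t one = 1;
	// Adds to the eventfd counter; a blocked poller wakes up.
	ssize_t n = write(handle, &one, sizeof(one));
	(void)n; // a saturated counter or EAGAIN is fine for a wakeup token
}

static void
wakeup_drain(int handle)
{
	uint64_t value;
	// Non-blocking read resets the counter; EAGAIN just means "not signaled".
	if (read(handle, &value, sizeof(value)) < 0 && errno != EAGAIN) {
		/* unexpected error: handle according to policy */
	}
}
#endif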
static void
_dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq)
{
	dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
	if (!_dispatch_runloop_handle_is_valid(handle)) {
		return;
	}

#if TARGET_OS_MAC
	mach_port_t mp = handle;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
#elif defined(__linux__)
	int rc = close(handle);
	(void)dispatch_assume_zero(rc);
#else
#error "runloop support not implemented on this platform"
#endif
}
#pragma mark -
#pragma mark dispatch_main_queue

dispatch_runloop_handle_t
_dispatch_get_main_queue_handle_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);
	return _dispatch_runloop_queue_get_handle(dq);
}

dispatch_runloop_handle_t
_dispatch_get_main_queue_port_4CF(void)
{
	return _dispatch_get_main_queue_handle_4CF();
}
static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(
#if TARGET_OS_MAC
		mach_msg_header_t *_Null_unspecified msg
#else
		void *msg
#endif
		DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}

#endif // DISPATCH_COCOA_COMPAT
void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		_dispatch_ktrace0(ARIADNE_ENTER_DISPATCH_MAIN_CODE);
#ifdef __linux__
		// On Linux, if the main thread calls pthread_exit, the process becomes a zombie.
		// To avoid that, just before calling pthread_exit we register a TSD destructor
		// that will call _dispatch_sig_thread -- thus capturing the main thread in sigsuspend.
		// This relies on an implementation detail (currently true in glibc) that TSD destructors
		// will be called in the order of creation to cause all the TSD cleanup functions to
		// run before the thread becomes trapped in sigsuspend.
		pthread_key_t dispatch_main_key;
		pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
		pthread_setspecific(dispatch_main_key, &dispatch_main_key);
#endif
		pthread_exit(NULL);
		DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH(0, "dispatch_main() must be called on the main thread");
#endif
}
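/*
 * Standalone demonstration (not part of libdispatch) of the Linux trick in the
 * comment above: a TSD destructor registered immediately before pthread_exit()
 * runs after the destructors of previously created keys (a glibc
 * implementation detail, as noted above), so it can park the exiting main
 * thread once every other thread-local cleanup has run.
 */
#if 0 // standalone illustration, kept out of the build (Linux-only)
#include <pthread.h>
#include <signal.h>

static void
park_forever(void *unused)
{
	(void)unused;
	sigset_t mask;
	sigemptyset(&mask);
	for (;;) {
		sigsuspend(&mask); // never returns normally
	}
}

int
main(void)
{
	pthread_key_t park_key;
	pthread_key_create(&park_key, park_forever);
	// The value only needs to be non-NULL for the destructor to fire.
	pthread_setspecific(park_key, &park_key);
	pthread_exit(NULL); // runs TSD destructors, then park_forever() traps us
}
#endif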
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}
DISPATCH_NOINLINE
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	_dispatch_queue_clear_bound_thread(dq);

	// <rdar://problem/22623242>
	// Here is what happens when both this cleanup happens because of
	// dispatch_main() being called, and a concurrent enqueuer makes the queue
	// non empty.
	//
	// _dispatch_queue_cleanup2:
	//     atomic_and(dq_is_thread_bound, ~DQF_THREAD_BOUND, relaxed);
	//     maximal_barrier();
	//     if (load(dq_items_tail, seq_cst)) {
	//         // do the wake up the normal serial queue way
	//     } else {
	//         // do no wake up <----
	//     }
	//
	// enqueuer:
	//     store(dq_items_tail, new_tail, release);
	//     if (load(dq_is_thread_bound, relaxed)) {
	//         // do the wake up the runloop way <----
	//     } else {
	//         // do the wake up the normal serial way
	//     }
	//
	// what would be bad is to take both paths marked <---- because the queue
	// wouldn't be woken up until the next time it's used (which may never
	// happen)
	//
	// An enqueuer that speculates the load of the old value of thread_bound
	// and then does the store may wake up the main queue the runloop way.
	// But then, the cleanup thread will see that store because the load
	// of dq_items_tail is sequentially consistent, and we have just thrown away
	// the pipeline.
	//
	// By the time cleanup2() is out of the maximally synchronizing barrier,
	// no other thread can speculate the wrong load anymore, and both cleanup2()
	// and a concurrent enqueuer would treat the queue in the standard non
	// thread bound way.

	_dispatch_queue_atomic_flags_clear(dq,
			DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC);
	os_atomic_maximally_synchronizing_barrier();
	// no need to drop the override, the thread will die anyway
	// the barrier above includes an acquire, so it's ok to do this raw
	// call to dx_wakeup(0)
	dx_wakeup(dq, 0, 0);

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	// See dispatch_main for call to _dispatch_sig_thread on linux.
#ifndef __linux__
	if (_dispatch_program_is_probably_callback_driven) {
		_dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}
#endif

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
			_dispatch_runloop_queue_handle_init);
	_dispatch_runloop_queue_handle_dispose(dq);
#endif
}
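/*
 * Sketch (not part of libdispatch) of the store/load race documented in the
 * comment above, written with plain C11 atomics. libdispatch clears the flag
 * relaxed and then relies on os_atomic_maximally_synchronizing_barrier(); the
 * standalone version below gets an equivalent guarantee the portable way, with
 * seq_cst on both sides of the Dekker-style pattern. All names are
 * hypothetical stand-ins.
 */
#if 0 // standalone illustration, kept out of the build
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

static _Atomic bool thread_bound = true;
static _Atomic(void *) items_tail;

static void wake_up_serial_way(void);   // stand-in for the serial wakeup path
static void wake_up_runloop_way(void);  // stand-in for the runloop wakeup path

static void
cleanup_side(void)
{
	atomic_store_explicit(&thread_bound, false, memory_order_seq_cst);
	if (atomic_load_explicit(&items_tail, memory_order_seq_cst)) {
		// Saw the racing enqueue: wake the queue ourselves, the serial way.
		wake_up_serial_way();
	}
	// else: any enqueue our load did not observe is ordered after our store,
	// so it must read thread_bound == false and wake the queue itself.
}

static void
enqueue_side(void *new_tail)
{
	atomic_store_explicit(&items_tail, new_tail, memory_order_seq_cst);
	if (atomic_load_explicit(&thread_bound, memory_order_seq_cst)) {
		// May be a wasted wakeup if cleanup is racing, but never the only one:
		// cleanup_side() is then guaranteed to see items_tail and wake us too.
		wake_up_runloop_way();
	} else {
		wake_up_serial_way();
	}
}
#endif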
static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch queue is running");
}

static void
_dispatch_deferred_items_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit with unhandled deferred items");
}

static void
_dispatch_frame_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch frame is active");
}

static void
_dispatch_context_cleanup(void *ctxt)
{
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_INTERNAL_CRASH(ctxt,
			"Premature thread exit while a dispatch context is set");
}