/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
		&& !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
		!HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
#undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t dc, pthread_priority_t pp);
static void _dispatch_queue_cleanup(void *ctxt);
static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
		unsigned int n);
static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static inline _dispatch_thread_semaphore_t
		_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
static inline bool _dispatch_queue_prepare_override(dispatch_queue_t dq,
		dispatch_queue_t tq, pthread_priority_t p);
static inline void _dispatch_queue_push_override(dispatch_queue_t dq,
		dispatch_queue_t tq, pthread_priority_t p);
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread4(void *context);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_worker_thread3(pthread_priority_t priority);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif

#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_port_pred;
static dispatch_queue_t _dispatch_main_queue_wakeup(void);
unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
static void _dispatch_runloop_queue_port_init(void *ctxt);
static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
#endif

static void _dispatch_root_queues_init(void *context);
static dispatch_once_t _dispatch_root_queues_pred;
#pragma mark dispatch_root_queue

struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;
#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_pthread_root_queue_context_s
		_dispatch_pthread_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
};
#endif
#define MAX_PTHREAD_COUNT 255
struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			qos_class_t dgq_qos;
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			uint32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
#endif
	}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
		.dq_label = "com.apple.root.maintenance-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 4,
	},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.maintenance-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 5,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
		.dq_label = "com.apple.root.background-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 6,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.background-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 7,
	},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
		.dq_label = "com.apple.root.utility-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 8,
	},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.utility-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 9,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
		.dq_label = "com.apple.root.default-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 10,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.default-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 11,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
		.dq_label = "com.apple.root.user-initiated-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 12,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.user-initiated-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 13,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
		.dq_label = "com.apple.root.user-interactive-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 14,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.user-interactive-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 15,
	},
};
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
};
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define DISPATCH_PRIORITY_COUNT 5

enum {
	// No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
	// maintenance priority
	DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
	DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
	DISPATCH_PRIORITY_IDX_LOW,
	DISPATCH_PRIORITY_IDX_DEFAULT,
	DISPATCH_PRIORITY_IDX_HIGH,
};

static qos_class_t _dispatch_priority2qos[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
	[DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
};

#if HAVE_PTHREAD_WORKQUEUE_QOS
static const int _dispatch_priority2wq[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
};
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue \
		_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY]
#endif

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = DISPATCH_VTABLE(queue_mgr),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_is_thread_bound = 1,
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init);
	qos_class_t qos;
	switch (priority) {
#if !RDAR_17878963 || DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
				.dq_priority) {
			// map maintenance to background on old kernel
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		} else {
			qos = (qos_class_t)priority;
		}
		break;
#endif // RDAR_17878963 || DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		break;
	case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
		break;
	case DISPATCH_QUEUE_PRIORITY_LOW:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
		break;
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
		break;
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
		break;
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
				.dq_priority) {
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
			break;
		}
#endif
		// fallthrough
	default:
		qos = (qos_class_t)priority;
		break;
	}
	return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
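/*
 * Illustrative usage sketch (not part of this file): how a client maps one of
 * the legacy priority constants handled above onto a global root queue. The
 * work block is hypothetical.
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_async(q, ^{
 *		// runs on the com.apple.root.default-qos root queue
 *	});
 */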
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?:
			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
{
	while (dq1) {
		if (dq1 == dq2) {
			return true;
		}
		dq1 = dq1->do_targetq;
	}
	return false;
}
#define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
		"Assertion failed: Block was run on an unexpected queue"

static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	char *msg;
	asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
			expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
			dq->dq_label : "");
	_dispatch_log("%s", msg);
	_dispatch_set_crash_log_message(msg);
	_dispatch_hardware_crash();
}
void
dispatch_assert_queue(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
		return;
	}
	_dispatch_assert_queue_fail(dq, true);
}

void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
		_dispatch_assert_queue_fail(dq, false);
	}
}
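/*
 * Illustrative usage sketch (not part of this file): asserting that a callback
 * runs where the client expects. The queue label is hypothetical.
 *
 *	dispatch_queue_t mq = dispatch_queue_create("com.example.model", NULL);
 *	dispatch_async(mq, ^{
 *		dispatch_assert_queue(mq);      // passes: the block runs on mq
 *		dispatch_assert_queue_not(dispatch_get_main_queue());
 *	});
 */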
#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif
#pragma mark dispatch_init

#if HAVE_PTHREAD_WORKQUEUE_QOS
int _dispatch_set_qos_class_enabled;
pthread_priority_t _dispatch_background_priority;
pthread_priority_t _dispatch_user_initiated_priority;
static void
_dispatch_root_queues_init_qos(int supported)
{
	pthread_priority_t p;
	qos_class_t qos;
	unsigned int i;
	for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
		p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
		qos = _pthread_qos_class_decode(p, NULL, NULL);
		dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
		_dispatch_priority2qos[i] = qos;
	}
	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		qos = _dispatch_root_queue_contexts[i].dgq_qos;
		if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
				!(supported & WORKQ_FEATURE_MAINTENANCE)) {
			continue;
		}
		unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
		flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
		if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
				i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
			flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
		}
		p = _pthread_qos_class_encode(qos, 0, flags);
		_dispatch_root_queues[i].dq_priority = p;
	}
	p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
	_dispatch_main_q.dq_priority = p;
	_dispatch_queue_set_override_priority(&_dispatch_main_q);
	_dispatch_background_priority = _dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS].dq_priority &
			~_PTHREAD_PRIORITY_FLAGS_MASK;
	_dispatch_user_initiated_priority = _dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS].dq_priority &
			~_PTHREAD_PRIORITY_FLAGS_MASK;
	if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
		_dispatch_set_qos_class_enabled = 1;
	}
}
#endif
static inline bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
	int r;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	bool disable_qos = false;
#if DISPATCH_DEBUG
	disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
#endif
	if (!disable_qos && !disable_wq) {
		r = _pthread_workqueue_supported();
		int supported = r;
		if (r & WORKQ_FEATURE_FINEPRIO) {
			r = _pthread_workqueue_init(_dispatch_worker_thread3,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
			result = !r;
			if (result) _dispatch_root_queues_init_qos(supported);
		}
	}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!result && !disable_wq) {
#if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
#endif
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		uint8_t pool_size, bool overcommit)
{
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
			dispatch_hw_config(active_cpus);
	if (slowpath(pool_size) && pool_size < thread_pool_size) {
		thread_pool_size = pool_size;
	}
	qc->dgq_thread_pool_size = thread_pool_size;
	if (qc->dgq_qos) {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
		(void)dispatch_assume_zero(pthread_attr_setdetachstate(
				&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
#if HAVE_PTHREAD_WORKQUEUE_QOS
		(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
				&pqc->dpq_thread_attr, qc->dgq_qos, 0));
#endif
	}
#if USE_MACH_SEM
	// override the default FIFO behavior for the pool semaphores
	kern_return_t kr = semaphore_create(mach_task_self(),
			&pqc->dpq_thread_mediator.dsema_port, SYNC_POLICY_LIFO, 0);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	(void)dispatch_assume(pqc->dpq_thread_mediator.dsema_port);
#elif USE_POSIX_SEM
	/* XXXRW: POSIX semaphores don't support LIFO? */
	int ret = sem_init(&(pqc->dpq_thread_mediator.dsema_sem), 0, 0);
	(void)dispatch_assume_zero(ret);
#endif
}
#endif // DISPATCH_USE_PTHREAD_POOL
static dispatch_once_t _dispatch_root_queues_pred;

static void
_dispatch_root_queues_init(void *context DISPATCH_UNUSED)
{
	_dispatch_safe_fork = false;
	if (!_dispatch_root_queues_init_workq()) {
#if DISPATCH_ENABLE_THREAD_POOL
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], 0, overcommit);
		}
#else
		DISPATCH_CRASH("Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}
#define countof(x) (sizeof(x) / sizeof(x[0]))

DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_priority2qos) ==
			DISPATCH_PRIORITY_COUNT);
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dispatch_assert(countof(_dispatch_priority2wq) ==
			DISPATCH_PRIORITY_COUNT);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			WORKQ_NUM_PRIOQUEUE * 2);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
			offsetof(struct dispatch_object_s, do_next));
	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_io_key, NULL);
	_dispatch_thread_key_create(&dispatch_apply_key, NULL);
	_dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
#if !DISPATCH_USE_OS_SEMAPHORE_CACHE
	_dispatch_thread_key_create(&dispatch_sema4_key,
			(void (*)(void *))_dispatch_thread_semaphore_dispose);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif

	_dispatch_hw_config_init();
	_dispatch_vtable_init();
	_dispatch_introspection_init();
}
#if HAVE_MACH
static dispatch_once_t _dispatch_mach_host_port_pred;
static mach_port_t _dispatch_mach_host_port;

static void
_dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
{
	kern_return_t kr;
	mach_port_t mp, mhp = mach_host_self();
	kr = host_get_host_port(mhp, &mp);
	DISPATCH_VERIFY_MIG(kr);
	if (fastpath(!kr)) {
		// mach_host_self returned the HOST_PRIV port
		kr = mach_port_deallocate(mach_task_self(), mhp);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
		mhp = mp;
	} else if (kr != KERN_INVALID_ARGUMENT) {
		(void)dispatch_assume_zero(kr);
	}
	if (!dispatch_assume(mhp)) {
		DISPATCH_CRASH("Could not get unprivileged host port");
	}
	_dispatch_mach_host_port = mhp;
}

mach_port_t
_dispatch_get_mach_host_port(void)
{
	dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
			_dispatch_mach_host_port_init);
	return _dispatch_mach_host_port;
}
#endif
DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

#if HAVE_MACH
	_dispatch_mach_host_port_pred = 0;
	_dispatch_mach_host_port = MACH_VOUCHER_NULL;
#endif
	_voucher_atfork_child();
	if (_dispatch_safe_fork) {
		return;
	}
	_dispatch_child_of_unsafe_fork = true;

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
#pragma mark dispatch_queue_attr_t

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
{
	qos_class_t qos = (qos_class_t)qos_class;
	switch (qos) {
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
	case _DISPATCH_QOS_CLASS_BACKGROUND:
	case _DISPATCH_QOS_CLASS_UTILITY:
	case _DISPATCH_QOS_CLASS_DEFAULT:
	case _DISPATCH_QOS_CLASS_USER_INITIATED:
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
	case _DISPATCH_QOS_CLASS_UNSPECIFIED:
		break;
	default:
		return false;
	}
	if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
		return false;
	}
	return true;
}
#define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
		[_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos

static const
_dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
};

#define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
		(overcommit ? DQA_INDEX_OVERCOMMIT : DQA_INDEX_NON_OVERCOMMIT)

#define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
		(concurrent ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)

#define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))

#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
static inline dispatch_queue_attr_t
_dispatch_get_queue_attr(qos_class_t qos, int prio, bool overcommit,
		bool concurrent)
{
	return (dispatch_queue_attr_t)&_dispatch_queue_attrs
			[DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
			[DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
			[DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
			[DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)];
}

dispatch_queue_attr_t
dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
		dispatch_qos_class_t qos_class, int relative_priority)
{
	if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_queue_attr(0, 0, false, false);
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH("Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(qos_class, relative_priority,
			dqa->dqa_overcommit, dqa->dqa_concurrent);
}

dispatch_queue_attr_t
dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
		bool overcommit)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_queue_attr(0, 0, false, false);
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH("Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, overcommit, dqa->dqa_concurrent);
}
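/*
 * Illustrative usage sketch (not part of this file): composing a queue
 * attribute with a QoS class and an overcommit hint. The relative priority
 * value is an arbitrary example; the overcommit variant is private SPI.
 *
 *	dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -1);
 *	attr = dispatch_queue_attr_make_with_overcommit(attr, false);
 */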
#pragma mark dispatch_queue_t

// 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 16;
dispatch_queue_t
dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq)
{
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	// Be sure the root queue priorities are set
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init);
#endif
	bool disallow_tq = (slowpath(dqa) && dqa != DISPATCH_QUEUE_CONCURRENT);
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_queue_attr(0, 0, false, false);
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH("Invalid queue attribute");
	}
	dispatch_queue_t dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}
	qos_class_t qos = dqa->dqa_qos_class;
	bool overcommit = dqa->dqa_overcommit;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dq->dq_priority = _pthread_qos_class_encode(qos, dqa->dqa_relative_priority,
			overcommit);
#endif
	if (dqa->dqa_concurrent) {
		dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
	} else {
		// Default serial queue target queue is overcommit!
		overcommit = true;
	}
	if (!tq) {
		if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
			qos = _DISPATCH_QOS_CLASS_DEFAULT;
		}
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
				!_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
			qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
		}
#endif
		bool maintenance_fallback = false;
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		maintenance_fallback = true;
#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (maintenance_fallback) {
			if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
					!_dispatch_root_queues[
					DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
				qos = _DISPATCH_QOS_CLASS_BACKGROUND;
			}
		}

		tq = _dispatch_get_root_queue(qos, overcommit);
		if (slowpath(!tq)) {
			DISPATCH_CLIENT_CRASH("Invalid queue attribute");
		}
	} else {
		_dispatch_retain(tq);
		if (disallow_tq) {
			// TODO: override target queue's qos/overcommit ?
			DISPATCH_CLIENT_CRASH("Invalid combination of target queue & "
					"queue attribute");
		}
		_dispatch_queue_priority_inherit_from_target(dq, tq);
	}
	_dispatch_queue_set_override_priority(dq);
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT);
}
void
_dispatch_queue_destroy(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq;
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}
qos_class_t
dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
{
	qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
	int relative_priority = 0;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	pthread_priority_t dqp = dq->dq_priority;
	if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
	qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
#else
	(void)dq;
#endif
	if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
	return qos;
}
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		_dispatch_object_debug(dq, "%s", __func__);
		return;
	}
	if (w > 0) {
		tmp = (unsigned int)w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = dispatch_hw_config(physical_cpus);
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = dispatch_hw_config(active_cpus);
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = dispatch_hw_config(logical_cpus);
		break;
	}
	if (tmp > DISPATCH_QUEUE_WIDTH_MAX / 2) {
		tmp = DISPATCH_QUEUE_WIDTH_MAX / 2;
	}
	// multiply by two since the running count is inc/dec by two
	// (the low bit == barrier)
	dq->dq_width = (typeof(dq->dq_width))(tmp * 2);
	_dispatch_object_debug(dq, "%s", __func__);
}
void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	_dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
	dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current(), tq = ctxt;
	mach_port_t th;

	while (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread, MACH_PORT_NULL,
			_dispatch_thread_port(), &th, acquire)) {
		_dispatch_thread_switch(th, DISPATCH_YIELD_THREAD_SWITCH_OPTION,
				DISPATCH_CONTENTION_USLEEP_START);
	}
	_dispatch_queue_priority_inherit_from_target(dq, tq);
	prev_dq = dq->do_targetq;
	dq->do_targetq = tq;
	_dispatch_release(prev_dq);
	_dispatch_object_debug(dq, "%s", __func__);
	dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL, release);
}
void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
	DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
	if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	unsigned long type = dx_metatype(dou._do);
	if (slowpath(!dq)) {
		bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
				slowpath(dou._dq->dq_width > 1));
		dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
				!is_concurrent_q);
	}
	// TODO: put into the vtable
	switch (type) {
	case _DISPATCH_QUEUE_TYPE:
	case _DISPATCH_SOURCE_TYPE:
		_dispatch_retain(dq);
		return _dispatch_barrier_trysync_f(dou._dq, dq,
				_dispatch_set_target_queue2);
	case _DISPATCH_IO_TYPE:
		return _dispatch_io_set_target_queue(dou._dchannel, dq);
	default: {
		dispatch_queue_t prev_dq;
		_dispatch_retain(dq);
		prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
		if (prev_dq) _dispatch_release(prev_dq);
		_dispatch_object_debug(dou._do, "%s", __func__);
		break;
		}
	}
}
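/*
 * Illustrative usage sketch (not part of this file): retargeting an object;
 * passing NULL resets the target to a default-QoS root queue, as implemented
 * above. The object names are hypothetical.
 *
 *	dispatch_set_target_queue(worker_q, background_q); // funnel through background_q
 *	dispatch_set_target_queue(worker_q, NULL);         // back to a default root queue
 */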
#pragma mark dispatch_pthread_root_queue

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
		_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
		_dispatch_mgr_root_queue_context = {{{
#if HAVE_PTHREAD_WORKQUEUES
	.dgq_kworkqueue = (void*)(~0ul),
#endif
	.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dgq_thread_pool_size = 1,
}}};
static struct dispatch_queue_s _dispatch_mgr_root_queue = {
	.do_vtable = DISPATCH_VTABLE(queue_root),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_ctxt = &_dispatch_mgr_root_queue_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
};
static struct {
	volatile int prio;
	int default_prio;
	int policy;
	pthread_t tid;
} _dispatch_mgr_sched;
static dispatch_once_t _dispatch_mgr_sched_pred;
static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_init(attr));
	(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
			&_dispatch_mgr_sched.policy));
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	// legacy priority calls allowed when requesting above default priority
	_dispatch_mgr_sched.default_prio = param.sched_priority;
	_dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
}
static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
			PTHREAD_CREATE_DETACHED));
#if !DISPATCH_DEBUG
	(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_set_qos_class_enabled) {
		qos_class_t qos = qos_class_main();
		(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr, qos, 0));
		_dispatch_mgr_q.dq_priority = _pthread_qos_class_encode(qos, 0, 0);
		_dispatch_queue_set_override_priority(&_dispatch_mgr_q);
	}
#endif
	param.sched_priority = _dispatch_mgr_sched.prio;
	if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
		(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
	}
	return &_dispatch_mgr_sched.tid;
}
static inline void
_dispatch_mgr_priority_apply(void)
{
	struct sched_param param;
	do {
		param.sched_priority = _dispatch_mgr_sched.prio;
		if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
			(void)dispatch_assume_zero(pthread_setschedparam(
					_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
					&param));
		}
	} while (_dispatch_mgr_sched.prio > param.sched_priority);
}
void
_dispatch_mgr_priority_init(void)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
		return _dispatch_mgr_priority_apply();
	}
}
static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	int p = _dispatch_mgr_sched.prio;
	do if (p >= param.sched_priority) {
		return;
	} while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
			p, param.sched_priority, &p, relaxed)));
	if (_dispatch_mgr_sched.tid) {
		return _dispatch_mgr_priority_apply();
	}
}
dispatch_queue_t
dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
		const pthread_attr_t *attr, dispatch_block_t configure)
{
	dispatch_queue_t dq;
	dispatch_root_queue_context_t qc;
	dispatch_pthread_root_queue_context_t pqc;
	size_t dqs;
	uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
			(uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;

	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
			sizeof(struct dispatch_root_queue_context_s) +
			sizeof(struct dispatch_pthread_root_queue_context_s));
	qc = (void*)dq + dqs;
	pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->do_ctxt = qc;
	dq->do_targetq = NULL;
	dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;

	pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
	qc->dgq_ctxt = pqc;
#if HAVE_PTHREAD_WORKQUEUES
	qc->dgq_kworkqueue = (void*)(~0ul);
#endif
	_dispatch_root_queue_init_pthread_pool(qc, pool_size, true);

	if (attr) {
		memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
#if HAVE_PTHREAD_WORKQUEUE_QOS
		qos_class_t qos = 0;
		if (!pthread_attr_get_qos_class_np(&pqc->dpq_thread_attr, &qos, NULL)
				&& qos > _DISPATCH_QOS_CLASS_DEFAULT) {
			DISPATCH_CLIENT_CRASH("pthread root queues do not support "
					"explicit QoS attributes");
		}
#endif
		_dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
	} else {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
	}
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(
			&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
	if (configure) {
		pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
	}
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
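/*
 * Illustrative usage sketch (not part of this file): creating a pthread root
 * queue whose worker threads run a configuration block before draining work.
 * The label and the configuration block body are hypothetical.
 *
 *	dispatch_queue_t prq = dispatch_pthread_root_queue_create(
 *			"com.example.worker-pool", 0, NULL, ^{
 *		// per-thread setup, e.g. thread naming
 *	});
 */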
static void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_CRASH("Global root queue disposed");
	}
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	pthread_attr_destroy(&pqc->dpq_thread_attr);
	_dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
	if (pqc->dpq_thread_configure) {
		Block_release(pqc->dpq_thread_configure);
	}
	dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
#endif
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(queue_specific_queue);
	DISPATCH_QUEUE_HEADER;
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
};

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);
void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					_DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_destroy((dispatch_queue_t)dqsq);
}
static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init((dispatch_queue_t)dqsq);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(
			_DISPATCH_QOS_CLASS_USER_INITIATED, true);
	dqsq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
	dqsq->dq_label = "queue-specific";
	TAILQ_INIT(&dqsq->dqsq_contexts);
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			(dispatch_queue_t)dqsq, release))) {
		_dispatch_release((dispatch_queue_t)dqsq);
	}
}
static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						_DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			return free(dqsn);
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	_dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific);
}
static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}
void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;

	if (fastpath(dq->dq_specific_q)) {
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}
void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		if (slowpath(dq->dq_specific_q)) {
			ctxt = (void *)key;
			dispatch_sync_f(dq->dq_specific_q, &ctxt,
					_dispatch_queue_get_specific);
			if (ctxt) break;
		}
		dq = dq->do_targetq;
	}
	return ctxt;
}
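/*
 * Illustrative usage sketch (not part of this file): associating context with
 * a queue and reading it back from a block running on (or targeting) that
 * queue. The key, context, destructor, and queue names are hypothetical.
 *
 *	static char kCtxKey;
 *	dispatch_queue_set_specific(q, &kCtxKey, my_ctxt, my_ctxt_destructor);
 *	dispatch_async(q, ^{
 *		void *ctxt = dispatch_get_specific(&kCtxKey);
 *	});
 */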
#pragma mark dispatch_queue_debug

size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	dispatch_queue_t target = dq->do_targetq;
	offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
			"running = 0x%x, barrier = %d ", target && target->dq_label ?
			target->dq_label : "", target, dq->dq_width / 2,
			dq->dq_running / 2, dq->dq_running & 1);
	if (dq->dq_is_thread_bound) {
		offset += dsnprintf(buf, bufsiz, ", thread = 0x%x ",
				_dispatch_queue_get_bound_thread(dq));
	}
	return offset;
}
size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
	offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		_dispatch_object_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#endif
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
static OSSpinLock _dispatch_stats_lock;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t delta = _dispatch_absolute_time() - start;
	unsigned long count;

	count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	int bucket = flsl((long)count);

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
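/*
 * Bucketing sketch: the table above has 65 slots because per-thread work-item
 * counts are keyed by flsl(), i.e. by the index of the highest set bit (zero
 * when the count is zero), so each slot covers a power-of-two range.
 * Illustrative only; assumes the BSD flsl() from <strings.h>.
 */
#include <strings.h>

static int
example_stats_bucket(unsigned long count)
{
	// 0 -> bucket 0, 1 -> 1, 2..3 -> 2, 4..7 -> 3, ..., up to bucket 64
	return flsl((long)count);
}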
#pragma mark dispatch_continuation_t

static void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup(dc);
	}
}

static void
_dispatch_cache_cleanup(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	}
}

#if DISPATCH_USE_MEMORYSTATUS_SOURCE
int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;

void
_dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
{
	_dispatch_continuation_free_to_heap(dc);
	dispatch_continuation_t next_dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	int cnt;
	if (!dc || (cnt = dc->dc_cache_cnt -
			_dispatch_continuation_cache_limit) <= 0){
		return;
	}
	do {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	} while (--cnt && (dc = next_dc));
	_dispatch_thread_setspecific(dispatch_cache_key, next_dc);
}
#endif
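/*
 * The continuation cache above is a per-thread, singly-linked free list that
 * spills to the heap once it grows past a limit. A minimal sketch of that
 * shape, assuming a compiler with __thread TLS (illustrative only; the
 * example_* names are hypothetical and not part of libdispatch):
 */
#include <stdlib.h>

struct example_node { struct example_node *next; };
static __thread struct example_node *example_cache_head;
static __thread int example_cache_cnt;
#define EXAMPLE_CACHE_LIMIT 64

static void
example_cache_free(struct example_node *node)
{
	if (example_cache_cnt >= EXAMPLE_CACHE_LIMIT) {
		free(node); // cache full: give the node back to the heap
		return;
	}
	node->next = example_cache_head; // otherwise push onto the thread-local list
	example_cache_head = node;
	example_cache_cnt++;
}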
1861 DISPATCH_ALWAYS_INLINE_NDEBUG
1863 _dispatch_continuation_redirect(dispatch_queue_t dq
, dispatch_object_t dou
)
1865 dispatch_continuation_t dc
= dou
._dc
;
1867 (void)dispatch_atomic_add2o(dq
, dq_running
, 2, acquire
);
1868 if (!DISPATCH_OBJ_IS_VTABLE(dc
) &&
1869 (long)dc
->do_vtable
& DISPATCH_OBJ_SYNC_SLOW_BIT
) {
1870 _dispatch_trace_continuation_pop(dq
, dou
);
1871 _dispatch_thread_semaphore_signal(
1872 (_dispatch_thread_semaphore_t
)dc
->dc_other
);
1873 _dispatch_introspection_queue_item_complete(dou
);
1875 _dispatch_async_f_redirect(dq
, dc
,
1876 _dispatch_queue_get_override_priority(dq
));
1878 _dispatch_perfmon_workitem_inc();
1882 #pragma mark dispatch_block_create
1886 DISPATCH_ALWAYS_INLINE
1888 _dispatch_block_flags_valid(dispatch_block_flags_t flags
)
1890 return ((flags
& ~DISPATCH_BLOCK_API_MASK
) == 0);
1893 DISPATCH_ALWAYS_INLINE
1894 static inline dispatch_block_flags_t
1895 _dispatch_block_normalize_flags(dispatch_block_flags_t flags
)
1897 if (flags
& (DISPATCH_BLOCK_NO_VOUCHER
|DISPATCH_BLOCK_DETACHED
)) {
1898 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
1900 if (flags
& (DISPATCH_BLOCK_NO_QOS_CLASS
|DISPATCH_BLOCK_DETACHED
)) {
1901 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
1906 static inline dispatch_block_t
1907 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags
,
1908 voucher_t voucher
, pthread_priority_t pri
, dispatch_block_t block
)
1910 flags
= _dispatch_block_normalize_flags(flags
);
1911 voucher_t cv
= NULL
;
1912 bool assign
= (flags
& DISPATCH_BLOCK_ASSIGN_CURRENT
);
1913 if (assign
&& !(flags
& DISPATCH_BLOCK_HAS_VOUCHER
)) {
1914 voucher
= cv
= voucher_copy();
1915 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
1917 if (assign
&& !(flags
& DISPATCH_BLOCK_HAS_PRIORITY
)) {
1918 pri
= _dispatch_priority_propagate();
1919 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
1921 dispatch_block_t db
= _dispatch_block_create(flags
, voucher
, pri
, block
);
1922 if (cv
) _voucher_release(cv
);
1924 dispatch_assert(_dispatch_block_get_data(db
));
1930 dispatch_block_create(dispatch_block_flags_t flags
, dispatch_block_t block
)
1932 if (!_dispatch_block_flags_valid(flags
)) return NULL
;
1933 return _dispatch_block_create_with_voucher_and_priority(flags
, NULL
, 0,
1938 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags
,
1939 dispatch_qos_class_t qos_class
, int relative_priority
,
1940 dispatch_block_t block
)
1942 if (!_dispatch_block_flags_valid(flags
)) return NULL
;
1943 if (!_dispatch_qos_class_valid(qos_class
, relative_priority
)) return NULL
;
1944 flags
|= DISPATCH_BLOCK_HAS_PRIORITY
;
1945 pthread_priority_t pri
= 0;
1946 #if HAVE_PTHREAD_WORKQUEUE_QOS
1947 pri
= _pthread_qos_class_encode(qos_class
, relative_priority
, 0);
1949 return _dispatch_block_create_with_voucher_and_priority(flags
, NULL
,
1954 dispatch_block_create_with_voucher(dispatch_block_flags_t flags
,
1955 voucher_t voucher
, dispatch_block_t block
)
1957 if (!_dispatch_block_flags_valid(flags
)) return NULL
;
1958 flags
|= DISPATCH_BLOCK_HAS_VOUCHER
;
1959 return _dispatch_block_create_with_voucher_and_priority(flags
, voucher
, 0,
1964 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags
,
1965 voucher_t voucher
, dispatch_qos_class_t qos_class
,
1966 int relative_priority
, dispatch_block_t block
)
1968 if (!_dispatch_block_flags_valid(flags
)) return NULL
;
1969 if (!_dispatch_qos_class_valid(qos_class
, relative_priority
)) return NULL
;
1970 flags
|= (DISPATCH_BLOCK_HAS_VOUCHER
|DISPATCH_BLOCK_HAS_PRIORITY
);
1971 pthread_priority_t pri
= 0;
1972 #if HAVE_PTHREAD_WORKQUEUE_QOS
1973 pri
= _pthread_qos_class_encode(qos_class
, relative_priority
, 0);
1975 return _dispatch_block_create_with_voucher_and_priority(flags
, voucher
,
1980 dispatch_block_perform(dispatch_block_flags_t flags
, dispatch_block_t block
)
1982 if (!_dispatch_block_flags_valid(flags
)) {
1983 DISPATCH_CLIENT_CRASH("Invalid flags passed to "
1984 "dispatch_block_perform()");
1986 flags
= _dispatch_block_normalize_flags(flags
);
1987 struct dispatch_block_private_data_s dbpds
=
1988 DISPATCH_BLOCK_PRIVATE_DATA_INITIALIZER(flags
, NULL
, 0, block
);
1989 dbpds
.dbpd_atomic_flags
|= DBF_PERFORM
; // no group_leave at end of invoke
1990 return _dispatch_block_invoke(&dbpds
);
1993 #define _dbpd_group(dbpd) ((dispatch_group_t)&(dbpd)->dbpd_group)
1996 _dispatch_block_invoke(const struct dispatch_block_private_data_s
*dbcpd
)
1998 dispatch_block_private_data_t dbpd
= (dispatch_block_private_data_t
)dbcpd
;
1999 dispatch_block_flags_t flags
= dbpd
->dbpd_flags
;
2000 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2001 if (slowpath(atomic_flags
& DBF_WAITED
)) {
2002 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2003 "than once and waited for");
2005 if (atomic_flags
& DBF_CANCELED
) goto out
;
2007 pthread_priority_t op
= DISPATCH_NO_PRIORITY
, p
= DISPATCH_NO_PRIORITY
;
2008 unsigned long override
= 0;
2009 if (flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
2010 op
= _dispatch_get_priority();
2011 p
= dbpd
->dbpd_priority
;
2012 override
= (flags
& DISPATCH_BLOCK_ENFORCE_QOS_CLASS
) ||
2013 !(flags
& DISPATCH_BLOCK_INHERIT_QOS_CLASS
) ?
2014 DISPATCH_PRIORITY_ENFORCE
: 0;
2016 voucher_t ov
, v
= DISPATCH_NO_VOUCHER
;
2017 if (flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
2018 v
= dbpd
->dbpd_voucher
;
2019 if (v
) _voucher_retain(v
);
2021 ov
= _dispatch_adopt_priority_and_voucher(p
, v
, override
);
2022 dbpd
->dbpd_thread
= _dispatch_thread_port();
2024 _dispatch_set_priority_and_replace_voucher(op
, ov
);
2026 if ((atomic_flags
& DBF_PERFORM
) == 0) {
2027 if (dispatch_atomic_inc2o(dbpd
, dbpd_performed
, acquire
) == 1) {
2028 dispatch_group_leave(_dbpd_group(dbpd
));
2034 _dispatch_block_sync_invoke(void *block
)
2036 dispatch_block_t b
= block
;
2037 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(b
);
2038 dispatch_block_flags_t flags
= dbpd
->dbpd_flags
;
2039 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2040 if (slowpath(atomic_flags
& DBF_WAITED
)) {
2041 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2042 "than once and waited for");
2044 if (atomic_flags
& DBF_CANCELED
) goto out
;
2046 pthread_priority_t op
= DISPATCH_NO_PRIORITY
, p
= DISPATCH_NO_PRIORITY
;
2047 unsigned long override
= 0;
2048 if (flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
2049 op
= _dispatch_get_priority();
2050 p
= dbpd
->dbpd_priority
;
2051 override
= (flags
& DISPATCH_BLOCK_ENFORCE_QOS_CLASS
) ||
2052 !(flags
& DISPATCH_BLOCK_INHERIT_QOS_CLASS
) ?
2053 DISPATCH_PRIORITY_ENFORCE
: 0;
2055 voucher_t ov
, v
= DISPATCH_NO_VOUCHER
;
2056 if (flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
2057 v
= dbpd
->dbpd_voucher
;
2058 if (v
) _voucher_retain(v
);
2060 ov
= _dispatch_adopt_priority_and_voucher(p
, v
, override
);
2062 _dispatch_set_priority_and_replace_voucher(op
, ov
);
2064 if ((atomic_flags
& DBF_PERFORM
) == 0) {
2065 if (dispatch_atomic_inc2o(dbpd
, dbpd_performed
, acquire
) == 1) {
2066 dispatch_group_leave(_dbpd_group(dbpd
));
2070 dispatch_queue_t dq
= _dispatch_queue_get_current();
2071 if (dispatch_atomic_cmpxchg2o(dbpd
, dbpd_queue
, dq
, NULL
, acquire
)) {
2072 // balances dispatch_{,barrier_,}sync
2073 _dispatch_release(dq
);
2078 _dispatch_block_async_invoke_and_release(void *block
)
2080 dispatch_block_t b
= block
;
2081 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(b
);
2082 dispatch_block_flags_t flags
= dbpd
->dbpd_flags
;
2083 unsigned int atomic_flags
= dbpd
->dbpd_atomic_flags
;
2084 if (slowpath(atomic_flags
& DBF_WAITED
)) {
2085 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2086 "than once and waited for");
2088 if (atomic_flags
& DBF_CANCELED
) goto out
;
2090 pthread_priority_t p
= DISPATCH_NO_PRIORITY
;
2091 unsigned long override
= 0;
2092 if (flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
2093 override
= (flags
& DISPATCH_BLOCK_ENFORCE_QOS_CLASS
) ?
2094 DISPATCH_PRIORITY_ENFORCE
: 0;
2095 p
= dbpd
->dbpd_priority
;
2097 voucher_t v
= DISPATCH_NO_VOUCHER
;
2098 if (flags
& DISPATCH_BLOCK_HAS_VOUCHER
) {
2099 v
= dbpd
->dbpd_voucher
;
2100 if (v
) _voucher_retain(v
);
2102 _dispatch_adopt_priority_and_replace_voucher(p
, v
, override
);
2105 if ((atomic_flags
& DBF_PERFORM
) == 0) {
2106 if (dispatch_atomic_inc2o(dbpd
, dbpd_performed
, acquire
) == 1) {
2107 dispatch_group_leave(_dbpd_group(dbpd
));
2110 dispatch_queue_t dq
= _dispatch_queue_get_current();
2111 if (dispatch_atomic_cmpxchg2o(dbpd
, dbpd_queue
, dq
, NULL
, acquire
)) {
2112 // balances dispatch_{,barrier_,group_}async
2113 _dispatch_release(dq
);
2119 dispatch_block_cancel(dispatch_block_t db
)
2121 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
2123 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2124 "dispatch_block_cancel()");
2126 (void)dispatch_atomic_or2o(dbpd
, dbpd_atomic_flags
, DBF_CANCELED
, relaxed
);
2130 dispatch_block_testcancel(dispatch_block_t db
)
2132 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
2134 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2135 "dispatch_block_testcancel()");
2137 return (bool)(dbpd
->dbpd_atomic_flags
& DBF_CANCELED
);
2141 dispatch_block_wait(dispatch_block_t db
, dispatch_time_t timeout
)
2143 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
2145 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2146 "dispatch_block_wait()");
2149 unsigned int flags
= dispatch_atomic_or_orig2o(dbpd
, dbpd_atomic_flags
,
2150 DBF_WAITING
, relaxed
);
2151 if (slowpath(flags
& (DBF_WAITED
| DBF_WAITING
))) {
2152 DISPATCH_CLIENT_CRASH("A block object may not be waited for "
2156 // <rdar://problem/17703192> If we know the queue where this block is
2157 // enqueued, or the thread that's executing it, then we should boost
2160 pthread_priority_t pp
= _dispatch_get_priority();
2162 dispatch_queue_t boost_dq
;
2163 boost_dq
= dispatch_atomic_xchg2o(dbpd
, dbpd_queue
, NULL
, acquire
);
2165 // release balances dispatch_{,barrier_,group_}async.
2166 // Can't put the queue back in the timeout case: the block might
2167 // finish after we fell out of group_wait and see our NULL, so
2168 // neither of us would ever release. Side effect: After a _wait
2169 // that times out, subsequent waits will not boost the qos of the
2170 // still-running block.
2171 _dispatch_queue_wakeup_with_qos_and_release(boost_dq
, pp
);
2174 mach_port_t boost_th
= dbpd
->dbpd_thread
;
2176 _dispatch_thread_override_start(boost_th
, pp
);
2179 int performed
= dispatch_atomic_load2o(dbpd
, dbpd_performed
, relaxed
);
2180 if (slowpath(performed
> 1 || (boost_th
&& boost_dq
))) {
2181 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2182 "than once and waited for");
2185 long ret
= dispatch_group_wait(_dbpd_group(dbpd
), timeout
);
2188 _dispatch_thread_override_end(boost_th
);
2192 // timed out: reverse our changes
2193 (void)dispatch_atomic_and2o(dbpd
, dbpd_atomic_flags
,
2194 ~DBF_WAITING
, relaxed
);
2196 (void)dispatch_atomic_or2o(dbpd
, dbpd_atomic_flags
,
2197 DBF_WAITED
, relaxed
);
2198 // don't need to re-test here: the second call would see
2199 // the first call's WAITING
2206 dispatch_block_notify(dispatch_block_t db
, dispatch_queue_t queue
,
2207 dispatch_block_t notification_block
)
2209 dispatch_block_private_data_t dbpd
= _dispatch_block_get_data(db
);
2211 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2212 "dispatch_block_notify()");
2214 int performed
= dispatch_atomic_load2o(dbpd
, dbpd_performed
, relaxed
);
2215 if (slowpath(performed
> 1)) {
2216 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2217 "than once and observed");
2220 return dispatch_group_notify(_dbpd_group(dbpd
), queue
, notification_block
);
2223 #endif // __BLOCKS__
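/*
 * Usage sketch for the dispatch_block_* API implemented above (illustrative
 * only; the example_* names are hypothetical). A block object may be waited
 * for or observed only if it runs at most once, which is what the
 * DISPATCH_CLIENT_CRASH checks above enforce. DISPATCH_BLOCK_ASSIGN_CURRENT
 * captures the QoS class and voucher current at block-creation time.
 */
#ifdef __BLOCKS__
#include <dispatch/dispatch.h>
#include <Block.h>

static void
example_block_object(dispatch_queue_t q)
{
	dispatch_block_t b = dispatch_block_create(DISPATCH_BLOCK_ASSIGN_CURRENT, ^{
		// work
	});
	dispatch_async(q, b);

	// Get a callback when it finishes (instead of, or in addition to, waiting):
	dispatch_block_notify(b, q, ^{ /* runs after b has completed */ });

	// Block the caller for up to one second; nonzero return means timeout.
	long timed_out = dispatch_block_wait(b,
			dispatch_time(DISPATCH_TIME_NOW, (int64_t)NSEC_PER_SEC));
	(void)timed_out;

	dispatch_block_cancel(b);           // only affects executions not yet started
	(void)dispatch_block_testcancel(b); // nonzero once cancelled

	Block_release(b); // dispatch_block_create() returns a copied (+1) block
}
#endif // __BLOCKS__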
2226 #pragma mark dispatch_barrier_async
2230 _dispatch_barrier_async_f_slow(dispatch_queue_t dq
, void *ctxt
,
2231 dispatch_function_t func
, pthread_priority_t pp
,
2232 dispatch_block_flags_t flags
)
2234 dispatch_continuation_t dc
= _dispatch_continuation_alloc_from_heap();
2236 dc
->do_vtable
= (void *)(DISPATCH_OBJ_ASYNC_BIT
| DISPATCH_OBJ_BARRIER_BIT
);
2239 _dispatch_continuation_voucher_set(dc
, flags
);
2240 _dispatch_continuation_priority_set(dc
, pp
, flags
);
2242 pp
= _dispatch_continuation_get_override_priority(dq
, dc
);
2244 _dispatch_queue_push(dq
, dc
, pp
);
2247 DISPATCH_ALWAYS_INLINE
2249 _dispatch_barrier_async_f2(dispatch_queue_t dq
, void *ctxt
,
2250 dispatch_function_t func
, pthread_priority_t pp
,
2251 dispatch_block_flags_t flags
)
2253 dispatch_continuation_t dc
;
2255 dc
= fastpath(_dispatch_continuation_alloc_cacheonly());
2257 return _dispatch_barrier_async_f_slow(dq
, ctxt
, func
, pp
, flags
);
2260 dc
->do_vtable
= (void *)(DISPATCH_OBJ_ASYNC_BIT
| DISPATCH_OBJ_BARRIER_BIT
);
2263 _dispatch_continuation_voucher_set(dc
, flags
);
2264 _dispatch_continuation_priority_set(dc
, pp
, flags
);
2266 pp
= _dispatch_continuation_get_override_priority(dq
, dc
);
2268 _dispatch_queue_push(dq
, dc
, pp
);
2273 _dispatch_barrier_async_f(dispatch_queue_t dq
, void *ctxt
,
2274 dispatch_function_t func
, pthread_priority_t pp
,
2275 dispatch_block_flags_t flags
)
2277 return _dispatch_barrier_async_f2(dq
, ctxt
, func
, pp
, flags
);
2282 dispatch_barrier_async_f(dispatch_queue_t dq
, void *ctxt
,
2283 dispatch_function_t func
)
2285 return _dispatch_barrier_async_f2(dq
, ctxt
, func
, 0, 0);
2290 _dispatch_barrier_async_detached_f(dispatch_queue_t dq
, void *ctxt
,
2291 dispatch_function_t func
)
2293 return _dispatch_barrier_async_f2(dq
, ctxt
, func
, 0,
2294 DISPATCH_BLOCK_NO_QOS_CLASS
|DISPATCH_BLOCK_NO_VOUCHER
);
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_function_t func = _dispatch_call_block_and_release;
	pthread_priority_t pp = 0;
	dispatch_block_flags_t flags = 0;
	if (slowpath(_dispatch_block_has_private_data(work))) {
		func = _dispatch_block_async_invoke_and_release;
		pp = _dispatch_block_get_priority(work);
		flags = _dispatch_block_get_flags(work);
		// balanced in d_block_async_invoke_and_release or d_block_wait
		if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
				dbpd_queue, NULL, dq, release)) {
			_dispatch_retain(dq);
		}
	}
	_dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), func, pp, flags);
}
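/*
 * Usage sketch (illustrative only; example_* names are hypothetical): on a
 * private concurrent queue, a barrier block runs exclusively, after earlier
 * submissions have drained and before later ones start, which makes
 * dispatch_barrier_async() the writer half of a readers/writer scheme.
 */
static int example_shared_counter;

static void
example_barrier_async(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.concurrent",
			DISPATCH_QUEUE_CONCURRENT);
	dispatch_async(q, ^{ /* reader: may run alongside other readers */ });
	dispatch_barrier_async(q, ^{
		example_shared_counter++; // writer: runs alone on the queue
	});
	dispatch_async(q, ^{ /* submitted after the barrier: sees the update */ });
	dispatch_release(q);
}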
2319 #pragma mark dispatch_async
2322 _dispatch_async_redirect_invoke(void *ctxt
)
2324 struct dispatch_continuation_s
*dc
= ctxt
;
2325 struct dispatch_continuation_s
*other_dc
= dc
->dc_other
;
2326 dispatch_queue_t old_dq
, dq
= dc
->dc_data
, rq
;
2328 old_dq
= _dispatch_thread_getspecific(dispatch_queue_key
);
2329 _dispatch_thread_setspecific(dispatch_queue_key
, dq
);
2330 pthread_priority_t old_dp
= _dispatch_set_defaultpriority(dq
->dq_priority
);
2331 _dispatch_continuation_pop(other_dc
);
2332 _dispatch_reset_defaultpriority(old_dp
);
2333 _dispatch_thread_setspecific(dispatch_queue_key
, old_dq
);
2335 rq
= dq
->do_targetq
;
2336 while (slowpath(rq
->do_targetq
) && rq
!= old_dq
) {
2337 if (dispatch_atomic_sub2o(rq
, dq_running
, 2, relaxed
) == 0) {
2338 _dispatch_queue_wakeup(rq
);
2340 rq
= rq
->do_targetq
;
2343 if (dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0) {
2344 _dispatch_queue_wakeup(dq
);
2346 _dispatch_release(dq
);
2350 _dispatch_async_f_redirect2(dispatch_queue_t dq
, dispatch_continuation_t dc
,
2351 pthread_priority_t pp
)
2353 uint32_t running
= 2;
2355 // Find the queue to redirect to
2357 if (slowpath(dq
->dq_items_tail
) ||
2358 slowpath(DISPATCH_OBJECT_SUSPENDED(dq
)) ||
2359 slowpath(dq
->dq_width
== 1)) {
2362 running
= dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
2363 if (slowpath(running
& 1) || slowpath(running
> dq
->dq_width
)) {
2364 running
= dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
);
2367 dq
= dq
->do_targetq
;
2368 } while (slowpath(dq
->do_targetq
));
2370 _dispatch_queue_push_wakeup(dq
, dc
, pp
, running
== 0);
2375 _dispatch_async_f_redirect(dispatch_queue_t dq
,
2376 dispatch_continuation_t other_dc
, pthread_priority_t pp
)
2378 dispatch_continuation_t dc
= _dispatch_continuation_alloc();
2380 dc
->do_vtable
= (void *)DISPATCH_OBJ_ASYNC_BIT
;
2381 dc
->dc_func
= _dispatch_async_redirect_invoke
;
2384 dc
->dc_other
= other_dc
;
2385 dc
->dc_priority
= 0;
2386 dc
->dc_voucher
= NULL
;
2388 _dispatch_retain(dq
);
2389 dq
= dq
->do_targetq
;
2390 if (slowpath(dq
->do_targetq
)) {
2391 return _dispatch_async_f_redirect2(dq
, dc
, pp
);
2394 _dispatch_queue_push(dq
, dc
, pp
);
2399 _dispatch_async_f2(dispatch_queue_t dq
, dispatch_continuation_t dc
,
2400 pthread_priority_t pp
)
2402 uint32_t running
= 2;
2405 if (slowpath(dq
->dq_items_tail
)
2406 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq
))) {
2409 running
= dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
2410 if (slowpath(running
> dq
->dq_width
)) {
2411 running
= dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
);
2414 if (!slowpath(running
& 1)) {
2415 return _dispatch_async_f_redirect(dq
, dc
, pp
);
2417 running
= dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
);
2418 // We might get lucky and find that the barrier has ended by now
2419 } while (!(running
& 1));
2421 _dispatch_queue_push_wakeup(dq
, dc
, pp
, running
== 0);
2426 _dispatch_async_f_slow(dispatch_queue_t dq
, void *ctxt
,
2427 dispatch_function_t func
, pthread_priority_t pp
,
2428 dispatch_block_flags_t flags
)
2430 dispatch_continuation_t dc
= _dispatch_continuation_alloc_from_heap();
2432 dc
->do_vtable
= (void *)DISPATCH_OBJ_ASYNC_BIT
;
2435 _dispatch_continuation_voucher_set(dc
, flags
);
2436 _dispatch_continuation_priority_set(dc
, pp
, flags
);
2438 pp
= _dispatch_continuation_get_override_priority(dq
, dc
);
2440 // No fastpath/slowpath hint because we simply don't know
2441 if (dq
->do_targetq
) {
2442 return _dispatch_async_f2(dq
, dc
, pp
);
2445 _dispatch_queue_push(dq
, dc
, pp
);
2448 DISPATCH_ALWAYS_INLINE
2450 _dispatch_async_f(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
2451 pthread_priority_t pp
, dispatch_block_flags_t flags
)
2453 dispatch_continuation_t dc
;
2455 // No fastpath/slowpath hint because we simply don't know
2456 if (dq
->dq_width
== 1 || flags
& DISPATCH_BLOCK_BARRIER
) {
2457 return _dispatch_barrier_async_f(dq
, ctxt
, func
, pp
, flags
);
2460 dc
= fastpath(_dispatch_continuation_alloc_cacheonly());
2462 return _dispatch_async_f_slow(dq
, ctxt
, func
, pp
, flags
);
2465 dc
->do_vtable
= (void *)DISPATCH_OBJ_ASYNC_BIT
;
2468 _dispatch_continuation_voucher_set(dc
, flags
);
2469 _dispatch_continuation_priority_set(dc
, pp
, flags
);
2471 pp
= _dispatch_continuation_get_override_priority(dq
, dc
);
2473 // No fastpath/slowpath hint because we simply don't know
2474 if (dq
->do_targetq
) {
2475 return _dispatch_async_f2(dq
, dc
, pp
);
2478 _dispatch_queue_push(dq
, dc
, pp
);
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	return _dispatch_async_f(dq, ctxt, func, 0, 0);
}

void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_function_t func = _dispatch_call_block_and_release;
	dispatch_block_flags_t flags = 0;
	pthread_priority_t pp = 0;
	if (slowpath(_dispatch_block_has_private_data(work))) {
		func = _dispatch_block_async_invoke_and_release;
		pp = _dispatch_block_get_priority(work);
		flags = _dispatch_block_get_flags(work);
		// balanced in d_block_async_invoke_and_release or d_block_wait
		if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
				dbpd_queue, NULL, dq, release)) {
			_dispatch_retain(dq);
		}
	}
	_dispatch_async_f(dq, _dispatch_Block_copy(work), func, pp, flags);
}
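/*
 * Usage sketch for the function-pointer variant (illustrative only; example_*
 * names are hypothetical). The context pointer must stay valid until the
 * function runs, so it is typically heap-allocated and released by the callee.
 */
#include <stdlib.h>

struct example_ctxt { int value; };

static void
example_work(void *ctxt)
{
	struct example_ctxt *c = ctxt;
	// ... use c->value ...
	free(c);
}

static void
example_async_f(dispatch_queue_t q)
{
	struct example_ctxt *c = malloc(sizeof(*c));
	if (!c) return;
	c->value = 42;
	dispatch_async_f(q, c, example_work);
}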
2510 #pragma mark dispatch_group_async
2512 DISPATCH_ALWAYS_INLINE
2514 _dispatch_group_async_f(dispatch_group_t dg
, dispatch_queue_t dq
, void *ctxt
,
2515 dispatch_function_t func
, pthread_priority_t pp
,
2516 dispatch_block_flags_t flags
)
2518 dispatch_continuation_t dc
;
2520 _dispatch_retain(dg
);
2521 dispatch_group_enter(dg
);
2523 dc
= _dispatch_continuation_alloc();
2525 unsigned long barrier
= (flags
& DISPATCH_BLOCK_BARRIER
) ?
2526 DISPATCH_OBJ_BARRIER_BIT
: 0;
2527 dc
->do_vtable
= (void *)(DISPATCH_OBJ_ASYNC_BIT
| DISPATCH_OBJ_GROUP_BIT
|
2532 _dispatch_continuation_voucher_set(dc
, flags
);
2533 _dispatch_continuation_priority_set(dc
, pp
, flags
);
2535 pp
= _dispatch_continuation_get_override_priority(dq
, dc
);
2537 // No fastpath/slowpath hint because we simply don't know
2538 if (dq
->dq_width
!= 1 && !barrier
&& dq
->do_targetq
) {
2539 return _dispatch_async_f2(dq
, dc
, pp
);
2542 _dispatch_queue_push(dq
, dc
, pp
);
void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	return _dispatch_group_async_f(dg, dq, ctxt, func, 0, 0);
}

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_function_t func = _dispatch_call_block_and_release;
	dispatch_block_flags_t flags = 0;
	pthread_priority_t pp = 0;
	if (slowpath(_dispatch_block_has_private_data(db))) {
		func = _dispatch_block_async_invoke_and_release;
		pp = _dispatch_block_get_priority(db);
		flags = _dispatch_block_get_flags(db);
		// balanced in d_block_async_invoke_and_release or d_block_wait
		if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(db),
				dbpd_queue, NULL, dq, release)) {
			_dispatch_retain(dq);
		}
	}
	_dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db), func, pp, flags);
}
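/*
 * Usage sketch (illustrative only; example_* names are hypothetical):
 * dispatch_group_async() pairs a dispatch_group_enter() at submission with a
 * leave when the work finishes, so the group can be waited on or observed
 * once all tasks complete.
 */
static void
example_group(dispatch_queue_t q)
{
	dispatch_group_t g = dispatch_group_create();
	dispatch_group_async(g, q, ^{ /* task A */ });
	dispatch_group_async(g, q, ^{ /* task B */ });
	// Either block until both have finished...
	dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
	// ...or ask for a callback instead of blocking:
	dispatch_group_notify(g, q, ^{ /* A and B are done */ });
	dispatch_release(g);
}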
2576 #pragma mark dispatch_function_invoke
2578 static void _dispatch_sync_f(dispatch_queue_t dq
, void *ctxt
,
2579 dispatch_function_t func
, pthread_priority_t pp
);
2581 DISPATCH_ALWAYS_INLINE
2583 _dispatch_function_invoke(dispatch_queue_t dq
, void *ctxt
,
2584 dispatch_function_t func
)
2586 dispatch_queue_t old_dq
= _dispatch_thread_getspecific(dispatch_queue_key
);
2587 _dispatch_thread_setspecific(dispatch_queue_key
, dq
);
2588 _dispatch_client_callout(ctxt
, func
);
2589 _dispatch_perfmon_workitem_inc();
2590 _dispatch_thread_setspecific(dispatch_queue_key
, old_dq
);
2594 _dispatch_sync_recurse_invoke(void *ctxt
)
2596 dispatch_continuation_t dc
= ctxt
;
2597 _dispatch_function_invoke(dc
->dc_data
, dc
->dc_ctxt
, dc
->dc_func
);
2600 DISPATCH_ALWAYS_INLINE
2602 _dispatch_function_recurse(dispatch_queue_t dq
, void *ctxt
,
2603 dispatch_function_t func
, pthread_priority_t pp
)
2605 struct dispatch_continuation_s dc
= {
2610 _dispatch_sync_f(dq
->do_targetq
, &dc
, _dispatch_sync_recurse_invoke
, pp
);
2614 #pragma mark dispatch_barrier_sync
2616 static void _dispatch_sync_f_invoke(dispatch_queue_t dq
, void *ctxt
,
2617 dispatch_function_t func
);
2619 DISPATCH_ALWAYS_INLINE_NDEBUG
2620 static inline _dispatch_thread_semaphore_t
2621 _dispatch_barrier_sync_f_pop(dispatch_queue_t dq
, dispatch_object_t dou
,
2624 _dispatch_thread_semaphore_t sema
;
2625 dispatch_continuation_t dc
= dou
._dc
;
2628 if (DISPATCH_OBJ_IS_VTABLE(dc
) || ((long)dc
->do_vtable
&
2629 (DISPATCH_OBJ_BARRIER_BIT
| DISPATCH_OBJ_SYNC_SLOW_BIT
)) !=
2630 (DISPATCH_OBJ_BARRIER_BIT
| DISPATCH_OBJ_SYNC_SLOW_BIT
)) {
2633 _dispatch_trace_continuation_pop(dq
, dc
);
2634 _dispatch_perfmon_workitem_inc();
2636 th
= (mach_port_t
)dc
->dc_data
;
2639 sema
= (_dispatch_thread_semaphore_t
)dc
->dc_other
;
2641 (void)dispatch_atomic_add2o(dq
, do_suspend_cnt
,
2642 DISPATCH_OBJECT_SUSPEND_INTERVAL
, relaxed
);
2643 // rdar://problem/9032024 running lock must be held until sync_f_slow
2645 (void)dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
2647 _dispatch_introspection_queue_item_complete(dou
);
2648 _dispatch_wqthread_override_start(th
,
2649 _dispatch_queue_get_override_priority(dq
));
2650 return sema
? sema
: MACH_PORT_DEAD
;
2654 _dispatch_barrier_sync_f_slow_invoke(void *ctxt
)
2656 dispatch_continuation_t dc
= ctxt
;
2657 dispatch_queue_t dq
= dc
->dc_data
;
2658 _dispatch_thread_semaphore_t sema
;
2659 sema
= (_dispatch_thread_semaphore_t
)dc
->dc_other
;
2661 dispatch_assert(dq
== _dispatch_queue_get_current());
2662 #if DISPATCH_COCOA_COMPAT
2663 if (slowpath(dq
->dq_is_thread_bound
)) {
2664 // The queue is bound to a non-dispatch thread (e.g. main thread)
2665 _dispatch_continuation_voucher_adopt(dc
);
2666 _dispatch_client_callout(dc
->dc_ctxt
, dc
->dc_func
);
2667 dispatch_atomic_store2o(dc
, dc_func
, NULL
, release
);
2668 _dispatch_thread_semaphore_signal(sema
); // release
2672 (void)dispatch_atomic_add2o(dq
, do_suspend_cnt
,
2673 DISPATCH_OBJECT_SUSPEND_INTERVAL
, relaxed
);
2674 // rdar://9032024 running lock must be held until sync_f_slow returns
2675 (void)dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
2676 _dispatch_thread_semaphore_signal(sema
); // release
2681 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq
, void *ctxt
,
2682 dispatch_function_t func
, pthread_priority_t pp
)
2684 if (slowpath(!dq
->do_targetq
)) {
2685 // the global concurrent queues do not need strict ordering
2686 (void)dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
2687 return _dispatch_sync_f_invoke(dq
, ctxt
, func
);
2689 if (!pp
) pp
= (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG
);
2690 _dispatch_thread_semaphore_t sema
= _dispatch_get_thread_semaphore();
2691 struct dispatch_continuation_s dc
= {
2693 #if DISPATCH_COCOA_COMPAT
2697 .dc_other
= (void*)sema
,
2699 #if DISPATCH_COCOA_COMPAT
2700 // It's preferred to execute synchronous blocks on the current thread
2701 // due to thread-local side effects, garbage collection, etc. However,
2702 // blocks submitted to the main thread MUST be run on the main thread
2703 if (slowpath(dq
->dq_is_thread_bound
)) {
2704 _dispatch_continuation_voucher_set(&dc
, 0);
2707 struct dispatch_continuation_s dbss
= {
2708 .do_vtable
= (void *)(DISPATCH_OBJ_BARRIER_BIT
|
2709 DISPATCH_OBJ_SYNC_SLOW_BIT
),
2710 .dc_func
= _dispatch_barrier_sync_f_slow_invoke
,
2712 .dc_data
= (void*)(uintptr_t)_dispatch_thread_port(),
2715 _dispatch_queue_push(dq
, &dbss
,
2716 _dispatch_continuation_get_override_priority(dq
, &dbss
));
2718 _dispatch_thread_semaphore_wait(sema
); // acquire
2719 _dispatch_put_thread_semaphore(sema
);
2721 #if DISPATCH_COCOA_COMPAT
2722 // Queue bound to a non-dispatch thread
2723 if (dc
.dc_func
== NULL
) {
2728 _dispatch_queue_set_thread(dq
);
2729 if (slowpath(dq
->do_targetq
->do_targetq
)) {
2730 _dispatch_function_recurse(dq
, ctxt
, func
, pp
);
2732 _dispatch_function_invoke(dq
, ctxt
, func
);
2734 _dispatch_queue_clear_thread(dq
);
2736 if (fastpath(dq
->do_suspend_cnt
< 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL
) &&
2737 dq
->dq_running
== 2) {
2738 // rdar://problem/8290662 "lock transfer"
2739 sema
= _dispatch_queue_drain_one_barrier_sync(dq
);
2741 _dispatch_thread_semaphore_signal(sema
); // release
2745 (void)dispatch_atomic_sub2o(dq
, do_suspend_cnt
,
2746 DISPATCH_OBJECT_SUSPEND_INTERVAL
, release
);
2747 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
2748 _dispatch_queue_wakeup(dq
);
2754 _dispatch_barrier_sync_f2(dispatch_queue_t dq
)
2756 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq
))) {
2757 // rdar://problem/8290662 "lock transfer"
2758 _dispatch_thread_semaphore_t sema
;
2759 sema
= _dispatch_queue_drain_one_barrier_sync(dq
);
2761 (void)dispatch_atomic_add2o(dq
, do_suspend_cnt
,
2762 DISPATCH_OBJECT_SUSPEND_INTERVAL
, relaxed
);
2763 // rdar://9032024 running lock must be held until sync_f_slow
2764 // returns: increment by 2 and decrement by 1
2765 (void)dispatch_atomic_inc2o(dq
, dq_running
, relaxed
);
2766 _dispatch_thread_semaphore_signal(sema
);
2770 if (slowpath(dispatch_atomic_dec2o(dq
, dq_running
, release
) == 0)) {
2771 _dispatch_queue_wakeup(dq
);
2777 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq
, void *ctxt
,
2778 dispatch_function_t func
)
2780 _dispatch_queue_set_thread(dq
);
2781 _dispatch_function_invoke(dq
, ctxt
, func
);
2782 _dispatch_queue_clear_thread(dq
);
2783 if (slowpath(dq
->dq_items_tail
)) {
2784 return _dispatch_barrier_sync_f2(dq
);
2786 if (slowpath(dispatch_atomic_dec2o(dq
, dq_running
, release
) == 0)) {
2787 _dispatch_queue_wakeup(dq
);
2793 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq
, void *ctxt
,
2794 dispatch_function_t func
, pthread_priority_t pp
)
2796 _dispatch_queue_set_thread(dq
);
2797 _dispatch_function_recurse(dq
, ctxt
, func
, pp
);
2798 _dispatch_queue_clear_thread(dq
);
2799 if (slowpath(dq
->dq_items_tail
)) {
2800 return _dispatch_barrier_sync_f2(dq
);
2802 if (slowpath(dispatch_atomic_dec2o(dq
, dq_running
, release
) == 0)) {
2803 _dispatch_queue_wakeup(dq
);
2809 _dispatch_barrier_sync_f(dispatch_queue_t dq
, void *ctxt
,
2810 dispatch_function_t func
, pthread_priority_t pp
)
2812 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2813 // 2) the queue is not suspended
2814 if (slowpath(dq
->dq_items_tail
) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq
))){
2815 return _dispatch_barrier_sync_f_slow(dq
, ctxt
, func
, pp
);
2817 if (slowpath(!dispatch_atomic_cmpxchg2o(dq
, dq_running
, 0, 1, acquire
))) {
2818 // global concurrent queues and queues bound to non-dispatch threads
2819 // always fall into the slow case
2820 return _dispatch_barrier_sync_f_slow(dq
, ctxt
, func
, pp
);
2822 if (slowpath(dq
->do_targetq
->do_targetq
)) {
2823 return _dispatch_barrier_sync_f_recurse(dq
, ctxt
, func
, pp
);
2825 _dispatch_barrier_sync_f_invoke(dq
, ctxt
, func
);
2830 dispatch_barrier_sync_f(dispatch_queue_t dq
, void *ctxt
,
2831 dispatch_function_t func
)
2833 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2834 // 2) the queue is not suspended
2835 if (slowpath(dq
->dq_items_tail
) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq
))){
2836 return _dispatch_barrier_sync_f_slow(dq
, ctxt
, func
, 0);
2838 if (slowpath(!dispatch_atomic_cmpxchg2o(dq
, dq_running
, 0, 1, acquire
))) {
2839 // global concurrent queues and queues bound to non-dispatch threads
2840 // always fall into the slow case
2841 return _dispatch_barrier_sync_f_slow(dq
, ctxt
, func
, 0);
2843 if (slowpath(dq
->do_targetq
->do_targetq
)) {
2844 return _dispatch_barrier_sync_f_recurse(dq
, ctxt
, func
, 0);
2846 _dispatch_barrier_sync_f_invoke(dq
, ctxt
, func
);
2852 _dispatch_barrier_sync_slow(dispatch_queue_t dq
, void (^work
)(void))
2854 bool has_pd
= _dispatch_block_has_private_data(work
);
2855 dispatch_function_t func
= _dispatch_Block_invoke(work
);
2856 pthread_priority_t pp
= 0;
2858 func
= _dispatch_block_sync_invoke
;
2859 pp
= _dispatch_block_get_priority(work
);
2860 dispatch_block_flags_t flags
= _dispatch_block_get_flags(work
);
2861 if (flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
2862 pthread_priority_t tp
= _dispatch_get_priority();
2864 pp
= tp
| _PTHREAD_PRIORITY_ENFORCE_FLAG
;
2865 } else if (!(flags
& DISPATCH_BLOCK_INHERIT_QOS_CLASS
)) {
2866 pp
|= _PTHREAD_PRIORITY_ENFORCE_FLAG
;
2869 // balanced in d_block_sync_invoke or d_block_wait
2870 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work
),
2871 dbpd_queue
, NULL
, dq
, release
)) {
2872 _dispatch_retain(dq
);
2874 #if DISPATCH_COCOA_COMPAT
2875 } else if (dq
->dq_is_thread_bound
&& dispatch_begin_thread_4GC
) {
2876 // Blocks submitted to the main queue MUST be run on the main thread,
2877 // under GC we must Block_copy in order to notify the thread-local
2878 // garbage collector that the objects are transferring to another thread
2879 // rdar://problem/7176237&7181849&7458685
2880 work
= _dispatch_Block_copy(work
);
2881 func
= _dispatch_call_block_and_release
;
2884 _dispatch_barrier_sync_f(dq
, work
, func
, pp
);
2888 dispatch_barrier_sync(dispatch_queue_t dq
, void (^work
)(void))
2890 if (slowpath(dq
->dq_is_thread_bound
) ||
2891 slowpath(_dispatch_block_has_private_data(work
))) {
2892 return _dispatch_barrier_sync_slow(dq
, work
);
2894 dispatch_barrier_sync_f(dq
, work
, _dispatch_Block_invoke(work
));
2900 _dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq
, void *ctxt
,
2901 dispatch_function_t func
)
2903 _dispatch_queue_set_thread(dq
);
2904 _dispatch_function_invoke(dq
, ctxt
, func
);
2905 _dispatch_queue_clear_thread(dq
);
2906 if (slowpath(dispatch_atomic_dec2o(dq
, dq_running
, release
) == 0)) {
2907 _dispatch_queue_wakeup(dq
);
2913 _dispatch_barrier_trysync_f(dispatch_queue_t dq
, void *ctxt
,
2914 dispatch_function_t func
)
2916 // Use for mutation of queue-/source-internal state only, ignores target
2918 if (slowpath(dq
->dq_items_tail
) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq
))
2919 || slowpath(!dispatch_atomic_cmpxchg2o(dq
, dq_running
, 0, 1,
2921 return _dispatch_barrier_async_detached_f(dq
, ctxt
, func
);
2923 _dispatch_barrier_trysync_f_invoke(dq
, ctxt
, func
);
2927 #pragma mark dispatch_sync
2931 _dispatch_sync_f_slow(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
2932 pthread_priority_t pp
, bool wakeup
)
2934 if (!pp
) pp
= (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG
);
2935 _dispatch_thread_semaphore_t sema
= _dispatch_get_thread_semaphore();
2936 struct dispatch_continuation_s dc
= {
2937 .do_vtable
= (void*)DISPATCH_OBJ_SYNC_SLOW_BIT
,
2938 #if DISPATCH_INTROSPECTION
2941 .dc_data
= (void*)(uintptr_t)_dispatch_thread_port(),
2943 .dc_other
= (void*)sema
,
2946 _dispatch_queue_push_wakeup(dq
, &dc
,
2947 _dispatch_continuation_get_override_priority(dq
, &dc
), wakeup
);
2949 _dispatch_thread_semaphore_wait(sema
);
2950 _dispatch_put_thread_semaphore(sema
);
2952 if (slowpath(dq
->do_targetq
->do_targetq
)) {
2953 _dispatch_function_recurse(dq
, ctxt
, func
, pp
);
2955 _dispatch_function_invoke(dq
, ctxt
, func
);
2958 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
2959 _dispatch_queue_wakeup(dq
);
2965 _dispatch_sync_f_invoke(dispatch_queue_t dq
, void *ctxt
,
2966 dispatch_function_t func
)
2968 _dispatch_function_invoke(dq
, ctxt
, func
);
2969 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
2970 _dispatch_queue_wakeup(dq
);
2976 _dispatch_sync_f_recurse(dispatch_queue_t dq
, void *ctxt
,
2977 dispatch_function_t func
, pthread_priority_t pp
)
2979 _dispatch_function_recurse(dq
, ctxt
, func
, pp
);
2980 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
2981 _dispatch_queue_wakeup(dq
);
2986 _dispatch_sync_f2(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
2987 pthread_priority_t pp
)
2989 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2990 // 2) the queue is not suspended
2991 if (slowpath(dq
->dq_items_tail
) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq
))){
2992 return _dispatch_sync_f_slow(dq
, ctxt
, func
, pp
, false);
2994 uint32_t running
= dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
2995 // re-check suspension after barrier check <rdar://problem/15242126>
2996 if (slowpath(running
& 1) || _dispatch_object_suspended(dq
)) {
2997 running
= dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
);
2998 return _dispatch_sync_f_slow(dq
, ctxt
, func
, pp
, running
== 0);
3000 if (slowpath(dq
->do_targetq
->do_targetq
)) {
3001 return _dispatch_sync_f_recurse(dq
, ctxt
, func
, pp
);
3003 _dispatch_sync_f_invoke(dq
, ctxt
, func
);
3008 _dispatch_sync_f(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
3009 pthread_priority_t pp
)
3011 if (fastpath(dq
->dq_width
== 1)) {
3012 return _dispatch_barrier_sync_f(dq
, ctxt
, func
, pp
);
3014 if (slowpath(!dq
->do_targetq
)) {
3015 // the global concurrent queues do not need strict ordering
3016 (void)dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
3017 return _dispatch_sync_f_invoke(dq
, ctxt
, func
);
3019 _dispatch_sync_f2(dq
, ctxt
, func
, pp
);
3024 dispatch_sync_f(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
)
3026 if (fastpath(dq
->dq_width
== 1)) {
3027 return dispatch_barrier_sync_f(dq
, ctxt
, func
);
3029 if (slowpath(!dq
->do_targetq
)) {
3030 // the global concurrent queues do not need strict ordering
3031 (void)dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
3032 return _dispatch_sync_f_invoke(dq
, ctxt
, func
);
3034 _dispatch_sync_f2(dq
, ctxt
, func
, 0);
3040 _dispatch_sync_slow(dispatch_queue_t dq
, void (^work
)(void))
3042 bool has_pd
= _dispatch_block_has_private_data(work
);
3043 if (has_pd
&& (_dispatch_block_get_flags(work
) & DISPATCH_BLOCK_BARRIER
)) {
3044 return _dispatch_barrier_sync_slow(dq
, work
);
3046 dispatch_function_t func
= _dispatch_Block_invoke(work
);
3047 pthread_priority_t pp
= 0;
3049 func
= _dispatch_block_sync_invoke
;
3050 pp
= _dispatch_block_get_priority(work
);
3051 dispatch_block_flags_t flags
= _dispatch_block_get_flags(work
);
3052 if (flags
& DISPATCH_BLOCK_HAS_PRIORITY
) {
3053 pthread_priority_t tp
= _dispatch_get_priority();
3055 pp
= tp
| _PTHREAD_PRIORITY_ENFORCE_FLAG
;
3056 } else if (!(flags
& DISPATCH_BLOCK_INHERIT_QOS_CLASS
)) {
3057 pp
|= _PTHREAD_PRIORITY_ENFORCE_FLAG
;
3060 // balanced in d_block_sync_invoke or d_block_wait
3061 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work
),
3062 dbpd_queue
, NULL
, dq
, release
)) {
3063 _dispatch_retain(dq
);
3065 #if DISPATCH_COCOA_COMPAT
3066 } else if (dq
->dq_is_thread_bound
&& dispatch_begin_thread_4GC
) {
3067 // Blocks submitted to the main queue MUST be run on the main thread,
3068 // under GC we must Block_copy in order to notify the thread-local
3069 // garbage collector that the objects are transferring to another thread
3070 // rdar://problem/7176237&7181849&7458685
3071 work
= _dispatch_Block_copy(work
);
3072 func
= _dispatch_call_block_and_release
;
3075 if (slowpath(!dq
->do_targetq
)) {
3076 // the global concurrent queues do not need strict ordering
3077 (void)dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
3078 return _dispatch_sync_f_invoke(dq
, work
, func
);
3080 _dispatch_sync_f2(dq
, work
, func
, pp
);
void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
	if (fastpath(dq->dq_width == 1)) {
		return dispatch_barrier_sync(dq, work);
	}
	if (slowpath(dq->dq_is_thread_bound) ||
			slowpath(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_slow(dq, work);
	}
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
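/*
 * Usage sketch (illustrative only; example_* names are hypothetical): the
 * classic readers/writer pattern built from the primitives above, using a
 * private concurrent queue with dispatch_sync() for reads and
 * dispatch_barrier_async() for writes.
 */
static int example_setting;

static int
example_read_setting(dispatch_queue_t concurrent_q)
{
	__block int v;
	dispatch_sync(concurrent_q, ^{ v = example_setting; }); // concurrent read
	return v;
}

static void
example_write_setting(dispatch_queue_t concurrent_q, int v)
{
	dispatch_barrier_async(concurrent_q, ^{ example_setting = v; }); // exclusive write
}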
3098 #pragma mark dispatch_after
3101 _dispatch_after_timer_callback(void *ctxt
)
3103 dispatch_continuation_t dc
= ctxt
, dc1
;
3104 dispatch_source_t ds
= dc
->dc_data
;
3105 dc1
= _dispatch_continuation_free_cacheonly(dc
);
3106 _dispatch_client_callout(dc
->dc_ctxt
, dc
->dc_func
);
3107 dispatch_source_cancel(ds
);
3108 dispatch_release(ds
);
3109 if (slowpath(dc1
)) {
3110 _dispatch_continuation_free_to_cache_limit(dc1
);
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	uint64_t delta, leeway;
	dispatch_source_t ds;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after_f() called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		return dispatch_async_f(queue, ctxt, func);
	}
	leeway = delta / 10; // <rdar://problem/13447496>
	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
	dispatch_assert(ds);

	// TODO: don't use a separate continuation & voucher
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = ds;

	dispatch_set_context(ds, dc);
	dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
	dispatch_resume(ds);
}

void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	// test before the copy of the block
	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after() called with 'when' == infinity");
#endif
		return;
	}
	dispatch_after_f(when, queue, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
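/*
 * Usage sketch (illustrative only): dispatch_after() runs a block once the
 * deadline passes, with up to delta/10 of timer leeway (clamped to the
 * [1ms, 60s] range computed above) so the wakeup can be coalesced.
 * Passing DISPATCH_TIME_FOREVER is a client error.
 */
static void
example_after(dispatch_queue_t q)
{
	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW,
			(int64_t)(500 * NSEC_PER_MSEC));
	dispatch_after(when, q, ^{ /* runs roughly half a second from now */ });
}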
3174 #pragma mark dispatch_queue_push
3176 DISPATCH_ALWAYS_INLINE
3178 _dispatch_queue_push_list_slow2(dispatch_queue_t dq
, pthread_priority_t pp
,
3179 struct dispatch_object_s
*obj
, bool retained
)
3181 // The queue must be retained before dq_items_head is written in order
3182 // to ensure that the reference is still valid when _dispatch_wakeup is
3183 // called. Otherwise, if preempted between the assignment to
3184 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
3185 // queue may release the last reference to the queue when invoked by
3186 // _dispatch_queue_drain. <rdar://problem/6932776>
3187 if (!retained
) _dispatch_retain(dq
);
3188 dq
->dq_items_head
= obj
;
3189 return _dispatch_queue_wakeup_with_qos_and_release(dq
, pp
);
3194 _dispatch_queue_push_list_slow(dispatch_queue_t dq
, pthread_priority_t pp
,
3195 struct dispatch_object_s
*obj
, unsigned int n
, bool retained
)
3197 if (dx_type(dq
) == DISPATCH_QUEUE_ROOT_TYPE
&& !dq
->dq_is_thread_bound
) {
3198 dispatch_assert(!retained
);
3199 dispatch_atomic_store2o(dq
, dq_items_head
, obj
, relaxed
);
3200 return _dispatch_queue_wakeup_global2(dq
, n
);
3202 _dispatch_queue_push_list_slow2(dq
, pp
, obj
, retained
);
3207 _dispatch_queue_push_slow(dispatch_queue_t dq
, pthread_priority_t pp
,
3208 struct dispatch_object_s
*obj
, bool retained
)
3210 if (dx_type(dq
) == DISPATCH_QUEUE_ROOT_TYPE
&& !dq
->dq_is_thread_bound
) {
3211 dispatch_assert(!retained
);
3212 dispatch_atomic_store2o(dq
, dq_items_head
, obj
, relaxed
);
3213 return _dispatch_queue_wakeup_global(dq
);
3215 _dispatch_queue_push_list_slow2(dq
, pp
, obj
, retained
);
3219 #pragma mark dispatch_queue_probe
3222 _dispatch_queue_probe(dispatch_queue_t dq
)
3224 return _dispatch_queue_class_probe(dq
);
3227 #if DISPATCH_COCOA_COMPAT
3229 _dispatch_runloop_queue_probe(dispatch_queue_t dq
)
3231 if (_dispatch_queue_class_probe(dq
)) {
3232 if (dq
->do_xref_cnt
== -1) return true; // <rdar://problem/14026816>
3233 return _dispatch_runloop_queue_wakeup(dq
);
3240 _dispatch_mgr_queue_probe(dispatch_queue_t dq
)
3242 if (_dispatch_queue_class_probe(dq
)) {
3243 return _dispatch_mgr_wakeup(dq
);
3249 _dispatch_root_queue_probe(dispatch_queue_t dq
)
3251 _dispatch_queue_wakeup_global(dq
);
3256 #pragma mark dispatch_wakeup
3258 // 6618342 Contact the team that owns the Instrument DTrace probe before
3259 // renaming this symbol
3261 _dispatch_wakeup(dispatch_object_t dou
)
3263 unsigned long type
= dx_metatype(dou
._do
);
3264 if (type
== _DISPATCH_QUEUE_TYPE
|| type
== _DISPATCH_SOURCE_TYPE
) {
3265 return _dispatch_queue_wakeup(dou
._dq
);
3267 if (_dispatch_object_suspended(dou
)) {
3270 if (!dx_probe(dou
._do
)) {
3273 if (!dispatch_atomic_cmpxchg2o(dou
._do
, do_suspend_cnt
, 0,
3274 DISPATCH_OBJECT_SUSPEND_LOCK
, acquire
)) {
3277 _dispatch_retain(dou
._do
);
3278 dispatch_queue_t tq
= dou
._do
->do_targetq
;
3279 _dispatch_queue_push(tq
, dou
._do
, 0);
3280 return tq
; // libdispatch does not need this, but the Instrument DTrace
3284 #if DISPATCH_COCOA_COMPAT
3286 _dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq
)
3288 mach_port_t mp
= (mach_port_t
)dq
->do_ctxt
;
3292 kern_return_t kr
= _dispatch_send_wakeup_runloop_thread(mp
, 0);
3294 case MACH_SEND_TIMEOUT
:
3295 case MACH_SEND_TIMED_OUT
:
3296 case MACH_SEND_INVALID_DEST
:
3299 (void)dispatch_assume_zero(kr
);
3304 DISPATCH_NOINLINE DISPATCH_WEAK
3306 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq
)
3308 _dispatch_runloop_queue_wakeup_thread(dq
);
3313 static dispatch_queue_t
3314 _dispatch_main_queue_wakeup(void)
3316 dispatch_queue_t dq
= &_dispatch_main_q
;
3317 if (!dq
->dq_is_thread_bound
) {
3320 dispatch_once_f(&_dispatch_main_q_port_pred
, dq
,
3321 _dispatch_runloop_queue_port_init
);
3322 _dispatch_runloop_queue_wakeup_thread(dq
);
3329 _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq
, unsigned int n
)
3331 dispatch_root_queue_context_t qc
= dq
->do_ctxt
;
3335 _dispatch_debug_root_queue(dq
, __func__
);
3336 dispatch_once_f(&_dispatch_root_queues_pred
, NULL
,
3337 _dispatch_root_queues_init
);
3339 #if HAVE_PTHREAD_WORKQUEUES
3340 #if DISPATCH_USE_PTHREAD_POOL
3341 if (qc
->dgq_kworkqueue
!= (void*)(~0ul))
3344 _dispatch_root_queue_debug("requesting new worker thread for global "
3346 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
3347 if (qc
->dgq_kworkqueue
) {
3348 pthread_workitem_handle_t wh
;
3349 unsigned int gen_cnt
;
3351 r
= pthread_workqueue_additem_np(qc
->dgq_kworkqueue
,
3352 _dispatch_worker_thread4
, dq
, &wh
, &gen_cnt
);
3353 (void)dispatch_assume_zero(r
);
3357 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
3358 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
3359 if (!dq
->dq_priority
) {
3360 r
= pthread_workqueue_addthreads_np(qc
->dgq_wq_priority
,
3361 qc
->dgq_wq_options
, (int)i
);
3362 (void)dispatch_assume_zero(r
);
3366 #if HAVE_PTHREAD_WORKQUEUE_QOS
3367 r
= _pthread_workqueue_addthreads((int)i
, dq
->dq_priority
);
3368 (void)dispatch_assume_zero(r
);
3372 #endif // HAVE_PTHREAD_WORKQUEUES
3373 #if DISPATCH_USE_PTHREAD_POOL
3374 dispatch_pthread_root_queue_context_t pqc
= qc
->dgq_ctxt
;
3375 if (fastpath(pqc
->dpq_thread_mediator
.do_vtable
)) {
3376 while (dispatch_semaphore_signal(&pqc
->dpq_thread_mediator
)) {
3382 uint32_t j
, t_count
;
3383 // seq_cst with atomic store to tail <rdar://problem/16932833>
3384 t_count
= dispatch_atomic_load2o(qc
, dgq_thread_pool_size
, seq_cst
);
3387 _dispatch_root_queue_debug("pthread pool is full for root queue: "
3391 j
= i
> t_count
? t_count
: i
;
3392 } while (!dispatch_atomic_cmpxchgvw2o(qc
, dgq_thread_pool_size
, t_count
,
3393 t_count
- j
, &t_count
, acquire
));
3395 pthread_attr_t
*attr
= &pqc
->dpq_thread_attr
;
3396 pthread_t tid
, *pthr
= &tid
;
3397 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
3398 if (slowpath(dq
== &_dispatch_mgr_root_queue
)) {
3399 pthr
= _dispatch_mgr_root_queue_init();
3403 _dispatch_retain(dq
);
3404 while ((r
= pthread_create(pthr
, attr
, _dispatch_worker_thread
, dq
))) {
3406 (void)dispatch_assume_zero(r
);
3408 _dispatch_temporary_resource_shortage();
3411 #endif // DISPATCH_USE_PTHREAD_POOL
3415 _dispatch_queue_wakeup_global2(dispatch_queue_t dq
, unsigned int n
)
3417 if (!_dispatch_queue_class_probe(dq
)) {
3420 #if HAVE_PTHREAD_WORKQUEUES
3421 dispatch_root_queue_context_t qc
= dq
->do_ctxt
;
3423 #if DISPATCH_USE_PTHREAD_POOL
3424 (qc
->dgq_kworkqueue
!= (void*)(~0ul)) &&
3426 !dispatch_atomic_cmpxchg2o(qc
, dgq_pending
, 0, n
, relaxed
)) {
3427 _dispatch_root_queue_debug("worker thread request still pending for "
3428 "global queue: %p", dq
);
3431 #endif // HAVE_PTHREAD_WORKQUEUES
3432 return _dispatch_queue_wakeup_global_slow(dq
, n
);
3436 _dispatch_queue_wakeup_global(dispatch_queue_t dq
)
3438 return _dispatch_queue_wakeup_global2(dq
, 1);
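/*
 * Sketch of the wake-up collapsing done above (illustrative only; example_*
 * names are hypothetical, written with C11 <stdatomic.h> rather than the
 * internal dispatch_atomic_* macros): only the caller that transitions the
 * pending counter away from zero actually requests new worker threads;
 * concurrent callers piggyback on the request already outstanding.
 */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_uint example_pending;

static bool
example_should_request_workers(unsigned int n)
{
	unsigned int expected = 0;
	// Succeeds for exactly one caller while no request is outstanding; the
	// counter is cleared elsewhere as the requested workers come up.
	return atomic_compare_exchange_strong(&example_pending, &expected, n);
}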
3442 #pragma mark dispatch_queue_invoke
3444 DISPATCH_ALWAYS_INLINE
3445 static inline dispatch_queue_t
3446 dispatch_queue_invoke2(dispatch_object_t dou
,
3447 _dispatch_thread_semaphore_t
*sema_ptr
)
3449 dispatch_queue_t dq
= dou
._dq
;
3450 dispatch_queue_t otq
= dq
->do_targetq
;
3451 dispatch_queue_t cq
= _dispatch_queue_get_current();
3453 if (slowpath(cq
!= otq
)) {
3457 *sema_ptr
= _dispatch_queue_drain(dq
);
3459 if (slowpath(otq
!= dq
->do_targetq
)) {
3460 // An item on the queue changed the target queue
3461 return dq
->do_targetq
;
3466 // 6618342 Contact the team that owns the Instrument DTrace probe before
3467 // renaming this symbol
3470 _dispatch_queue_invoke(dispatch_queue_t dq
)
3472 _dispatch_queue_class_invoke(dq
, dispatch_queue_invoke2
);
3476 #pragma mark dispatch_queue_drain
3478 DISPATCH_ALWAYS_INLINE
3479 static inline struct dispatch_object_s
*
3480 _dispatch_queue_head(dispatch_queue_t dq
)
3482 struct dispatch_object_s
*dc
;
3483 _dispatch_wait_until(dc
= fastpath(dq
->dq_items_head
));
3487 DISPATCH_ALWAYS_INLINE
3488 static inline struct dispatch_object_s
*
3489 _dispatch_queue_next(dispatch_queue_t dq
, struct dispatch_object_s
*dc
)
3491 struct dispatch_object_s
*next_dc
;
3492 next_dc
= fastpath(dc
->do_next
);
3493 dq
->dq_items_head
= next_dc
;
3494 if (!next_dc
&& !dispatch_atomic_cmpxchg2o(dq
, dq_items_tail
, dc
, NULL
,
3496 _dispatch_wait_until(next_dc
= fastpath(dc
->do_next
));
3497 dq
->dq_items_head
= next_dc
;
3502 _dispatch_thread_semaphore_t
3503 _dispatch_queue_drain(dispatch_object_t dou
)
3505 dispatch_queue_t dq
= dou
._dq
, orig_tq
, old_dq
;
3506 old_dq
= _dispatch_thread_getspecific(dispatch_queue_key
);
3507 struct dispatch_object_s
*dc
, *next_dc
;
3508 _dispatch_thread_semaphore_t sema
= 0;
3510 // Continue draining sources after target queue change rdar://8928171
3511 bool check_tq
= (dx_type(dq
) != DISPATCH_SOURCE_KEVENT_TYPE
);
3513 orig_tq
= dq
->do_targetq
;
3515 _dispatch_thread_setspecific(dispatch_queue_key
, dq
);
3516 pthread_priority_t old_dp
= _dispatch_set_defaultpriority(dq
->dq_priority
);
3518 pthread_priority_t op
= _dispatch_queue_get_override_priority(dq
);
3519 pthread_priority_t dp
= _dispatch_get_defaultpriority();
3520 dp
&= _PTHREAD_PRIORITY_QOS_CLASS_MASK
;
3522 _dispatch_wqthread_override_start(dq
->dq_thread
, op
);
3525 //dispatch_debug_queue(dq, __func__);
3527 while (dq
->dq_items_tail
) {
3528 dc
= _dispatch_queue_head(dq
);
3530 if (DISPATCH_OBJECT_SUSPENDED(dq
)) {
3533 if (dq
->dq_running
> dq
->dq_width
) {
3536 if (slowpath(orig_tq
!= dq
->do_targetq
) && check_tq
) {
3539 bool redirect
= false;
3540 if (!fastpath(dq
->dq_width
== 1)) {
3541 if (!DISPATCH_OBJ_IS_VTABLE(dc
) &&
3542 (long)dc
->do_vtable
& DISPATCH_OBJ_BARRIER_BIT
) {
3543 if (dq
->dq_running
> 1) {
3550 next_dc
= _dispatch_queue_next(dq
, dc
);
3552 _dispatch_continuation_redirect(dq
, dc
);
3555 if ((sema
= _dispatch_barrier_sync_f_pop(dq
, dc
, true))) {
3558 _dispatch_continuation_pop(dc
);
3559 _dispatch_perfmon_workitem_inc();
3560 } while ((dc
= next_dc
));
3564 _dispatch_reset_defaultpriority(old_dp
);
3565 _dispatch_thread_setspecific(dispatch_queue_key
, old_dq
);
3569 #if DISPATCH_COCOA_COMPAT
3571 _dispatch_main_queue_drain(void)
3573 dispatch_queue_t dq
= &_dispatch_main_q
;
3574 if (!dq
->dq_items_tail
) {
3577 struct dispatch_continuation_s marker
= {
3580 struct dispatch_object_s
*dmarker
= (void*)&marker
;
3581 _dispatch_queue_push_notrace(dq
, dmarker
, 0);
3583 _dispatch_perfmon_start();
3584 dispatch_queue_t old_dq
= _dispatch_thread_getspecific(dispatch_queue_key
);
3585 _dispatch_thread_setspecific(dispatch_queue_key
, dq
);
3586 pthread_priority_t old_pri
= _dispatch_get_priority();
3587 pthread_priority_t old_dp
= _dispatch_set_defaultpriority(old_pri
);
3588 voucher_t voucher
= _voucher_copy();
3590 struct dispatch_object_s
*dc
, *next_dc
;
3591 dc
= _dispatch_queue_head(dq
);
3593 next_dc
= _dispatch_queue_next(dq
, dc
);
3594 if (dc
== dmarker
) {
3597 _dispatch_continuation_pop(dc
);
3598 _dispatch_perfmon_workitem_inc();
3599 } while ((dc
= next_dc
));
3600 DISPATCH_CRASH("Main queue corruption");
3604 _dispatch_main_queue_wakeup();
3606 _dispatch_voucher_debug("main queue restore", voucher
);
3607 _dispatch_set_priority_and_replace_voucher(old_pri
, voucher
);
3608 _dispatch_queue_reset_override_priority(dq
);
3609 _dispatch_reset_defaultpriority(old_dp
);
3610 _dispatch_thread_setspecific(dispatch_queue_key
, old_dq
);
3611 _dispatch_perfmon_end();
3612 _dispatch_force_cache_cleanup();
3616 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq
)
3618 if (!dq
->dq_items_tail
) {
3621 _dispatch_perfmon_start();
3622 dispatch_queue_t old_dq
= _dispatch_thread_getspecific(dispatch_queue_key
);
3623 _dispatch_thread_setspecific(dispatch_queue_key
, dq
);
3624 pthread_priority_t old_pri
= _dispatch_get_priority();
3625 pthread_priority_t old_dp
= _dispatch_set_defaultpriority(old_pri
);
3626 voucher_t voucher
= _voucher_copy();
3628 struct dispatch_object_s
*dc
, *next_dc
;
3629 dc
= _dispatch_queue_head(dq
);
3630 next_dc
= _dispatch_queue_next(dq
, dc
);
3631 _dispatch_continuation_pop(dc
);
3632 _dispatch_perfmon_workitem_inc();
3634 _dispatch_voucher_debug("runloop queue restore", voucher
);
3635 _dispatch_set_priority_and_replace_voucher(old_pri
, voucher
);
3636 _dispatch_reset_defaultpriority(old_dp
);
3637 _dispatch_thread_setspecific(dispatch_queue_key
, old_dq
);
3638 _dispatch_perfmon_end();
3639 _dispatch_force_cache_cleanup();
3644 DISPATCH_ALWAYS_INLINE_NDEBUG
3645 static inline _dispatch_thread_semaphore_t
3646 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq
)
3648 // rdar://problem/8290662 "lock transfer"
3649 struct dispatch_object_s
*dc
;
3650 _dispatch_thread_semaphore_t sema
;
3652 // queue is locked, or suspended and not being drained
3653 dc
= dq
->dq_items_head
;
3654 if (slowpath(!dc
) || !(sema
= _dispatch_barrier_sync_f_pop(dq
, dc
, false))){
3657 // dequeue dc, it is a barrier sync
3658 (void)_dispatch_queue_next(dq
, dc
);
3663 _dispatch_mgr_queue_drain(void)
3665 dispatch_queue_t dq
= &_dispatch_mgr_q
;
3666 if (!dq
->dq_items_tail
) {
3667 return _dispatch_force_cache_cleanup();
3669 _dispatch_perfmon_start();
3670 if (slowpath(_dispatch_queue_drain(dq
))) {
3671 DISPATCH_CRASH("Sync onto manager queue");
3673 _dispatch_voucher_debug("mgr queue clear", NULL
);
3675 _dispatch_queue_reset_override_priority(dq
);
3676 _dispatch_reset_defaultpriority_override();
3677 _dispatch_perfmon_end();
3678 _dispatch_force_cache_cleanup();
#pragma mark _dispatch_queue_wakeup_with_qos

static dispatch_queue_t
_dispatch_queue_wakeup_with_qos_slow(dispatch_queue_t dq, pthread_priority_t pp,
		bool retained)
{
	if (!dx_probe(dq) && (dq->dq_is_thread_bound || !dq->dq_thread)) {
		if (retained) _dispatch_release(dq);
		return NULL;
	}
	pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	bool override = _dispatch_queue_override_priority(dq, pp);
	if (override && dq->dq_running > 1) {
		return NULL;
	}
	if (!dispatch_atomic_cmpxchg2o(dq, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK, acquire)) {
#if DISPATCH_COCOA_COMPAT
		if (dq == &_dispatch_main_q && dq->dq_is_thread_bound) {
			return _dispatch_main_queue_wakeup();
		}
#endif
		if (override) {
			mach_port_t th;
			// <rdar://problem/17735825> to traverse the tq chain safely we must
			// lock it to ensure it cannot change, unless the queue is running
			// and we can just override the thread itself
			if (dq->dq_thread) {
				_dispatch_wqthread_override_start(dq->dq_thread, pp);
			} else if (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread,
					MACH_PORT_NULL, _dispatch_thread_port(), &th, acquire)) {
				// already locked, override the owner, trysync will do a queue
				// wakeup when it returns.
				_dispatch_wqthread_override_start(th, pp);
			} else {
				dispatch_queue_t tq = dq->do_targetq;
				if (_dispatch_queue_prepare_override(dq, tq, pp)) {
					_dispatch_queue_push_override(dq, tq, pp);
				} else {
					_dispatch_queue_wakeup_with_qos(tq, pp);
				}
				dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL,
						release);
			}
		}
		if (retained) _dispatch_release(dq);
		return NULL;
	}
	dispatch_queue_t tq = dq->do_targetq;
	if (!retained) _dispatch_retain(dq);
	if (override) {
		override = _dispatch_queue_prepare_override(dq, tq, pp);
	}
	_dispatch_queue_push(tq, dq, pp);
	if (override) {
		_dispatch_queue_push_override(dq, tq, pp);
	}
	return tq; // libdispatch does not need this, but the Instrument DTrace
			// probe does
}
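/*
 * Illustrative example (not part of libdispatch): the override path above is
 * typically exercised when higher-QoS work targets a queue that is already
 * draining lower-QoS items, e.g. (client code; the helpers are hypothetical):
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_async(q, ^{ background_work(); });
 *	dispatch_async(dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0), ^{
 *		dispatch_sync(q, ^{ urgent_work(); }); // boosts the thread draining q
 *	});
 */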
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_queue_wakeup_with_qos2(dispatch_queue_t dq, pthread_priority_t pp,
		bool retained)
{
	if (_dispatch_object_suspended(dq)) {
		_dispatch_queue_override_priority(dq, pp);
		if (retained) _dispatch_release(dq);
		return NULL;
	}
	return _dispatch_queue_wakeup_with_qos_slow(dq, pp, retained);
}
DISPATCH_NOINLINE
void
_dispatch_queue_wakeup_with_qos_and_release(dispatch_queue_t dq,
		pthread_priority_t pp)
{
	(void)_dispatch_queue_wakeup_with_qos2(dq, pp, true);
}

DISPATCH_NOINLINE
void
_dispatch_queue_wakeup_with_qos(dispatch_queue_t dq, pthread_priority_t pp)
{
	(void)_dispatch_queue_wakeup_with_qos2(dq, pp, false);
}

dispatch_queue_t
_dispatch_queue_wakeup(dispatch_queue_t dq)
{
	return _dispatch_queue_wakeup_with_qos2(dq,
			_dispatch_queue_get_override_priority(dq), false);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_queue_override_invoke(void *ctxt)
{
	dispatch_continuation_t dc = (dispatch_continuation_t)ctxt;
	dispatch_queue_t dq = dc->dc_data;
	pthread_priority_t p = 0;

	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
			fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
		_dispatch_queue_set_thread(dq);

		_dispatch_object_debug(dq, "stolen onto thread 0x%x, 0x%lx",
				dq->dq_thread, _dispatch_get_defaultpriority());

		pthread_priority_t old_dp = _dispatch_get_defaultpriority();
		_dispatch_reset_defaultpriority(dc->dc_priority);

		dispatch_queue_t tq = NULL;
		_dispatch_thread_semaphore_t sema = 0;
		tq = dispatch_queue_invoke2(dq, &sema);

		_dispatch_queue_clear_thread(dq);
		_dispatch_reset_defaultpriority(old_dp);

		uint32_t running = dispatch_atomic_dec2o(dq, dq_running, release);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema);
		} else if (!tq && running == 0) {
			p = _dispatch_queue_reset_override_priority(dq);
			if (p > (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
				_dispatch_wqthread_override_reset();
			}
		}
		_dispatch_introspection_queue_item_complete(dq);
		if (running == 0) {
			return _dispatch_queue_wakeup_with_qos_and_release(dq, p);
		}
	} else {
		mach_port_t th = dq->dq_thread;
		if (th) {
			p = _dispatch_queue_get_override_priority(dq);
			_dispatch_object_debug(dq, "overriding thr 0x%x to priority 0x%lx",
					th, p);
			_dispatch_wqthread_override_start(th, p);
		}
	}
	_dispatch_release(dq); // added when we pushed the override block
}
#endif
static inline bool
_dispatch_queue_prepare_override(dispatch_queue_t dq, dispatch_queue_t tq,
		pthread_priority_t p)
{
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (dx_type(tq) != DISPATCH_QUEUE_ROOT_TYPE || !tq->dq_priority) {
		return false;
	}
	if (p <= (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
		return false;
	}
	if (p <= (tq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
		return false;
	}
	_dispatch_retain(dq);
	return true;
#else
	(void)dq; (void)tq; (void)p;
	return false;
#endif
}
static inline void
_dispatch_queue_push_override(dispatch_queue_t dq, dispatch_queue_t tq,
		pthread_priority_t p)
{
#if HAVE_PTHREAD_WORKQUEUE_QOS
	unsigned int qosbit, idx, overcommit;
	overcommit = (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
	qosbit = (p & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
			_PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
	idx = (unsigned int)__builtin_ffs((int)qosbit);
	if (!idx || idx > DISPATCH_QUEUE_QOS_COUNT) {
		DISPATCH_CRASH("Corrupted override priority");
	}
	dispatch_queue_t rq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = _dispatch_queue_override_invoke;
	dc->dc_ctxt = dc;
	dc->dc_priority = tq->dq_priority;
	dc->dc_voucher = NULL;
	dc->dc_data = dq;
	// dq retained by _dispatch_queue_prepare_override
	_dispatch_queue_push(rq, dc, 0);
#else
	(void)dq; (void)tq; (void)p;
#endif
}
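/*
 * Worked example (values assumed for illustration only): if the override
 * priority p reduces to a single QoS class bit of 0x10 after masking and
 * shifting, then __builtin_ffs(0x10) == 5, so idx == 5 and the continuation
 * is pushed onto _dispatch_root_queues[((5 - 1) << 1) | overcommit], i.e.
 * index 8 or 9 depending on whether the target queue is overcommitting.
 */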
#pragma mark dispatch_root_queue_drain

DISPATCH_NOINLINE
static bool
_dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	do {
		// Spin for a short while in case the contention is temporary -- e.g.
		// when starting up after dispatch_apply, or when executing a few
		// short continuations in a row.
		if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
			goto out;
		}
		// Since we have serious contention, we need to back off.
		if (!pending) {
			// Mark this queue as pending to avoid requests for further threads
			(void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
			pending = true;
		}
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	available = false;
out:
	if (pending) {
		(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	if (!available) {
		_dispatch_queue_wakeup_global(dq);
	}
	return available;
}
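/*
 * The loop above is a bounded exponential backoff. A minimal standalone
 * sketch of the same idea (illustrative names, not libdispatch API):
 *
 *	unsigned int t = USLEEP_START;
 *	do {
 *		if (head_is_visible()) return true; // contention was transient
 *		usleep(t);
 *		t *= 2;                             // back off harder each round
 *	} while (t < USLEEP_MAX);
 *	return false;                           // give up; ask for a fresh thread
 */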
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_concurrent_drain_one2(dispatch_queue_t dq)
{
	// Wait for queue head and tail to be both non-empty or both empty
	bool available; // <rdar://problem/15917893>
	_dispatch_wait_until((dq->dq_items_head != NULL) ==
			(available = (dq->dq_items_tail != NULL)));
	return available;
}
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator,
				NULL, relaxed))) {
			goto start;
		}
		if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
				_dispatch_queue_concurrent_drain_one2(dq)) {
			goto start;
		}
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);

		if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
			// both head and tail are NULL now
			goto out;
		}

		// There must be a next item now.
		_dispatch_wait_until(next = head->do_next);
	}

	dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}
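/*
 * For context, a simplified sketch of the enqueue side this dequeue pairs
 * with (illustrative only, not the actual push path; see _dispatch_queue_push):
 * the tail is exchanged first, then the predecessor (or the head, for the
 * first item) is linked, which is why an empty head together with a non-empty
 * tail means an enqueuer is still between its two stores:
 *
 *	item->do_next = NULL;
 *	prev = atomic_exchange(&dq->dq_items_tail, item);  // publish new tail
 *	if (prev) {
 *		prev->do_next = item;                          // link predecessor
 *	} else {
 *		dq->dq_items_head = item;                      // first item: set head
 *	}
 */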
static void
_dispatch_root_queue_drain(dispatch_queue_t dq)
{
#if DISPATCH_DEBUG
	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}
#endif
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t pri = dq->dq_priority ? dq->dq_priority : old_pri;
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri);

#if DISPATCH_COCOA_COMPAT
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	struct dispatch_object_s *item;
	bool reset = false;
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		if (reset) _dispatch_wqthread_override_reset();
		_dispatch_continuation_pop(item);
		reset = _dispatch_reset_defaultpriority_override();
	}
	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_set_priority_and_replace_voucher(old_pri, NULL);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_autorelease_pool_pop(pool);
	if (dispatch_end_thread_4GC) {
		dispatch_end_thread_4GC();
	}
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);
}
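/*
 * Client-side view (illustrative): the items popped by the loop above are
 * what dispatch_async() onto a global queue ultimately produces:
 *
 *	dispatch_async(dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), ^{
 *		do_work(); // hypothetical helper; drained by _dispatch_root_queue_drain
 *	});
 */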
#pragma mark dispatch_worker_thread

#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread4(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();
	int pending = (int)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	dispatch_assert(pending >= 0);
	_dispatch_root_queue_drain(dq);
	__asm__(""); // prevent tailcall (for Instrument DTrace probe)
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_worker_thread3(pthread_priority_t priority)
{
	// Reset priority TSD to workaround <rdar://problem/17825261>
	_dispatch_thread_setspecific(dispatch_priority_key,
			(void*)(uintptr_t)(priority & ~_PTHREAD_PRIORITY_FLAGS_MASK));
	unsigned int overcommit, qosbit, idx;
	overcommit = (priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
	qosbit = (priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
			_PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
	if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].
			dq_priority) {
		// If kernel doesn't support maintenance, bottom bit is background.
		// Shift to our idea of where background bit is.
		qosbit <<= 1;
	}
	idx = (unsigned int)__builtin_ffs((int)qosbit);
	dispatch_assert(idx > 0 && idx < DISPATCH_QUEUE_QOS_COUNT+1);
	dispatch_queue_t dq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];
	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	if (pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	sigset_t mask;
	int r;
	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);
	_dispatch_introspection_thread_add();

	const int64_t timeout = 5ull * NSEC_PER_SEC;
	do {
		_dispatch_root_queue_drain(dq);
	} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

	(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, release);
	_dispatch_queue_wakeup_global(dq);
	_dispatch_release(dq);

	return NULL;
}
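/*
 * Example (hedged sketch): a pthread root queue whose worker threads run the
 * drain loop above, with a per-thread configure hook; exact SPI availability
 * depends on the platform:
 *
 *	dispatch_queue_t q = dispatch_pthread_root_queue_create(
 *			"com.example.pool", 0, NULL, ^{
 *		configure_thread_state(); // hypothetical helper, runs on each worker
 *	});
 *	dispatch_async(q, ^{ do_work(); });
 */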
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
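/*
 * Net effect (illustrative): each pool thread ends up blocking every signal
 * it legally may, mirroring what the kernel workqueue does for its threads,
 * roughly:
 *
 *	sigset_t mask;
 *	sigfillset(&mask);
 *	// synchronous/fatal signals are removed above before blocking
 *	pthread_sigmask(SIG_BLOCK, &mask, NULL);
 */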
#endif // DISPATCH_USE_PTHREAD_POOL
#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;
#if DISPATCH_COCOA_COMPAT

dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq);
	dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->dq_running = 1;
	dq->dq_is_thread_bound = 1;
	_dispatch_runloop_queue_port_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	_dispatch_queue_clear_bound_thread(dq);
	if (suspend_cnt == 0) {
		_dispatch_queue_wakeup(dq);
	}
}
void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_port_dispose(dq);
	_dispatch_queue_destroy(dq);
}
bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}
void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	_dispatch_runloop_queue_probe(dq);
}
mach_port_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	return (mach_port_t)dq->do_ctxt;
}
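/*
 * Illustrative (hedged) usage of the runloop-queue SPI above by a runloop
 * host such as CoreFoundation: create the queue on the thread that owns the
 * runloop, monitor its wakeup port, and drain one item per wakeup:
 *
 *	dispatch_queue_t q = _dispatch_runloop_root_queue_create_4CF("example", 0);
 *	mach_port_t mp = _dispatch_runloop_root_queue_get_port_4CF(q);
 *	// ... register mp as a runloop source; when it fires: ...
 *	bool more = _dispatch_runloop_root_queue_perform_4CF(q);
 *	// 'more' suggests whether additional items remain to be drained
 */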
static void
_dispatch_runloop_queue_port_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	mach_port_t mp;
	kern_return_t kr;

	_dispatch_safe_fork = false;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				sizeof(limits));
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	dq->do_ctxt = (void*)(uintptr_t)mp;

	_dispatch_program_is_probably_callback_driven = true;
}
static void
_dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	dq->do_ctxt = NULL;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}
#pragma mark dispatch_main_queue

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	return (mach_port_t)dq->do_ctxt;
}
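/*
 * Illustrative (hedged) sketch of how a runloop host consumes the main queue
 * port: obtain it once, add it to the main runloop as a port source, and call
 * the 4CF callback when a message arrives:
 *
 *	mach_port_t mp = _dispatch_get_main_queue_port_4CF();
 *	// ... add mp to the main thread's CFRunLoop ...
 *	// on wakeup:
 *	_dispatch_main_queue_callback_4CF(NULL);
 */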
static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}
void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}

void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
#endif
}
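/*
 * Typical client usage (illustrative): a purely event-driven process sets up
 * its sources and then parks the main thread here:
 *
 *	int main(void) {
 *		install_sources(); // hypothetical: create dispatch sources/timers
 *		dispatch_main();   // never returns; the main thread is given up
 *	}
 */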
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

DISPATCH_NORETURN
static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	dq->dq_is_thread_bound = 0;
	if (suspend_cnt == 0) {
		_dispatch_queue_wakeup(dq);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		_dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_port_dispose(dq);
#endif
}
static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}