/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
		unsigned int n);
static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static inline _dispatch_thread_semaphore_t
		_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread3(void *context);
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif

#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_port_pred;
static dispatch_queue_t _dispatch_main_queue_wakeup(void);
unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
static void _dispatch_runloop_queue_port_init(void *ctxt);
static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
#endif
#pragma mark dispatch_root_queue

#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
};
#endif
#define MAX_PTHREAD_COUNT 255

struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			dispatch_semaphore_t dgq_thread_mediator;
			uint32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
#endif
	}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dq_label = "com.apple.root.low-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 4,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.low-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 5,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dq_label = "com.apple.root.default-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 6,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 7,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dq_label = "com.apple.root.high-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 8,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 9,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dq_label = "com.apple.root.background-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 10,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.background-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 11,
	},
};
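
// Editorial note, not from the original sources: the eight root queues above
// are what dispatch_get_global_queue() hands back to clients; the overcommit
// variants (odd indices) are selected internally when the
// DISPATCH_QUEUE_OVERCOMMIT flag is passed. Conceptually:
//
//	dispatch_queue_t q =
//			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//	// q points at _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY]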
#if HAVE_PTHREAD_WORKQUEUES
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
};
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue \
		_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY]
#endif

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = DISPATCH_VTABLE(queue_mgr),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_is_thread_bound = 1,
	.dq_serialnum = 2,
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	return _dispatch_get_root_queue(priority,
			flags & DISPATCH_QUEUE_OVERCOMMIT);
}
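
// Example call (illustrative only): any flag bit other than
// DISPATCH_QUEUE_OVERCOMMIT is rejected above, so client code normally passes 0:
//
//	dispatch_queue_t bg =
//			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);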
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
{
	while (dq1) {
		if (dq1 == dq2) {
			return true;
		}
		dq1 = dq1->do_targetq;
	}
	return false;
}

#define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
		"Assertion failed: Block was run on an unexpected queue"
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	char *msg;
	asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
			expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
			dq->dq_label : "");
	_dispatch_log("%s", msg);
	_dispatch_set_crash_log_message(msg);
	_dispatch_hardware_crash();
}

void
dispatch_assert_queue(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
		return;
	}
	_dispatch_assert_queue_fail(dq, true);
}

void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
		_dispatch_assert_queue_fail(dq, false);
	}
}

#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif
#pragma mark dispatch_init

static void
_dispatch_hw_config_init(void)
{
	_dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
	_dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
	_dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
}
static inline bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
	int r;
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!disable_wq) {
#if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
#endif
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq
#if DISPATCH_NO_BG_PRIORITY
					&& (qc->dgq_wq_priority != WORKQ_BG_PRIOQUEUE)
#endif
			) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		bool overcommit)
{
	qc->dgq_thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
			_dispatch_hw_config.cc_max_active;
#if USE_MACH_SEM
	// override the default FIFO behavior for the pool semaphores
	kern_return_t kr = semaphore_create(mach_task_self(),
			&qc->dgq_thread_mediator->dsema_port, SYNC_POLICY_LIFO, 0);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	(void)dispatch_assume(qc->dgq_thread_mediator->dsema_port);
#elif USE_POSIX_SEM
	/* XXXRW: POSIX semaphores don't support LIFO? */
	int ret = sem_init(&qc->dgq_thread_mediator->dsema_sem, 0, 0);
	(void)dispatch_assume_zero(ret);
#endif
}
#endif // DISPATCH_USE_PTHREAD_POOL
static void
_dispatch_root_queues_init(void *context DISPATCH_UNUSED)
{
	_dispatch_safe_fork = false;
	if (!_dispatch_root_queues_init_workq()) {
#if DISPATCH_ENABLE_THREAD_POOL
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], overcommit);
		}
#else
		DISPATCH_CRASH("Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}
#define countof(x) (sizeof(x) / sizeof(x[0]))

DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_thread_mediator) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
#if !DISPATCH_USE_OS_SEMAPHORE_CACHE
	_dispatch_thread_key_create(&dispatch_sema4_key,
			(void (*)(void *))_dispatch_thread_semaphore_dispose);
#endif
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_io_key, NULL);
	_dispatch_thread_key_create(&dispatch_apply_key, NULL);
#if DISPATCH_PERF_MON
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif

	_dispatch_hw_config_init();
	_dispatch_vtable_init();
	_dispatch_introspection_init();
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

	if (_dispatch_safe_fork) {
		return;
	}
	_dispatch_child_of_unsafe_fork = true;

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
#pragma mark dispatch_queue_t

// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - mgr_root_q
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 12;
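
// Illustrative sketch, not part of the original file: queue initialization
// assigns dq_serialnum with an atomic fetch-and-add on the counter above, so
// the stored value is always the next number to hand out; conceptually:
//
//	dq->dq_serialnum = dispatch_atomic_inc_orig(
//			&_dispatch_queue_serial_numbers, relaxed);
//	// exact helper name above is an assumption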
dispatch_queue_t
dispatch_queue_create_with_target(const char *label,
		dispatch_queue_attr_t attr, dispatch_queue_t tq)
{
	dispatch_queue_t dq;

	dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	if (attr == DISPATCH_QUEUE_CONCURRENT) {
		dq->dq_width = UINT32_MAX;
		if (!tq) {
			tq = _dispatch_get_root_queue(0, false);
		}
	} else {
		if (!tq) {
			// Default target queue is overcommit!
			tq = _dispatch_get_root_queue(0, true);
		}
		if (slowpath(attr)) {
			dispatch_debug_assert(!attr, "Invalid attribute");
		}
	}
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT);
}
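
// Example usage of the creation API above (illustrative only):
//
//	dispatch_queue_t q = dispatch_queue_create("com.example.worker",
//			DISPATCH_QUEUE_CONCURRENT); // pass NULL for a serial queue
//	dispatch_async(q, ^{ /* work */ });
//	dispatch_release(q); // under manual retain/release builds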
void
_dispatch_queue_destroy(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq;
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}

const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		_dispatch_object_debug(dq, "%s", __func__);
		return;
	}
	if (w > 0) {
		tmp = (unsigned int)w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_physical;
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = _dispatch_hw_config.cc_max_active;
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_logical;
		break;
	}
	// multiply by two since the running count is inc/dec by two
	// (the low bit == barrier)
	dq->dq_width = tmp * 2;
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	_dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}
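
// Editorial note, not from the original sources: because of the factor of two
// above, a queue configured for four concurrent items stores dq_width == 8
// while dq_running moves in steps of 2 per running item, so width comparisons
// keep working and the low bit of dq_running stays reserved for barriers.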
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
	dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

	prev_dq = dq->do_targetq;
	dq->do_targetq = ctxt;
	_dispatch_release(prev_dq);
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
	DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
	if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	unsigned long type = dx_metatype(dou._do);
	if (!dq) {
		bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
				slowpath(dou._dq->dq_width > 1));
		dq = _dispatch_get_root_queue(0, !is_concurrent_q);
	}
	// TODO: put into the vtable
	switch(type) {
	case _DISPATCH_QUEUE_TYPE:
	case _DISPATCH_SOURCE_TYPE:
		_dispatch_retain(dq);
		return _dispatch_barrier_trysync_f(dou._dq, dq,
				_dispatch_set_target_queue2);
	case _DISPATCH_IO_TYPE:
		return _dispatch_io_set_target_queue(dou._dchannel, dq);
	default: {
		dispatch_queue_t prev_dq;
		_dispatch_retain(dq);
		prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
		if (prev_dq) _dispatch_release(prev_dq);
		_dispatch_object_debug(dou._do, "%s", __func__);
		return;
		}
	}
}
#pragma mark dispatch_pthread_root_queue

struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
		_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
		_dispatch_mgr_root_queue_context = {{{
#if HAVE_PTHREAD_WORKQUEUES
	.dgq_kworkqueue = (void*)(~0ul),
#endif
	.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dgq_thread_pool_size = 1,
}}};
static struct dispatch_queue_s _dispatch_mgr_root_queue = {
	.do_vtable = DISPATCH_VTABLE(queue_root),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_ctxt = &_dispatch_mgr_root_queue_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_running = 2,
	.dq_width = UINT32_MAX,
	.dq_serialnum = 3,
};
static struct {
	volatile int prio;
	int policy;
	pthread_t tid;
} _dispatch_mgr_sched;
static dispatch_once_t _dispatch_mgr_sched_pred;
static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_init(attr));
	(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
			&_dispatch_mgr_sched.policy));
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	// high-priority workq threads are at priority 2 above default
	_dispatch_mgr_sched.prio = param.sched_priority + 2;
}

static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
			PTHREAD_CREATE_DETACHED));
#if !DISPATCH_DEBUG
	(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#endif
	param.sched_priority = _dispatch_mgr_sched.prio;
	(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
	return &_dispatch_mgr_sched.tid;
}

static inline void
_dispatch_mgr_priority_apply(void)
{
	struct sched_param param;
	do {
		param.sched_priority = _dispatch_mgr_sched.prio;
		(void)dispatch_assume_zero(pthread_setschedparam(
				_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy, &param));
	} while (_dispatch_mgr_sched.prio > param.sched_priority);
}
static void
_dispatch_mgr_priority_init(void)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
		return _dispatch_mgr_priority_apply();
	}
}

static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	int p = _dispatch_mgr_sched.prio;
	do if (p >= param.sched_priority) {
		return;
	} while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
			p, param.sched_priority, &p, relaxed)));
	if (_dispatch_mgr_sched.tid) {
		return _dispatch_mgr_priority_apply();
	}
}
dispatch_queue_t
dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
		const pthread_attr_t *attr, dispatch_block_t configure)
{
	dispatch_queue_t dq;
	dispatch_root_queue_context_t qc;
	dispatch_pthread_root_queue_context_t pqc;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
			sizeof(struct dispatch_root_queue_context_s) +
			sizeof(struct dispatch_pthread_root_queue_context_s));
	qc = (void*)dq + dqs;
	pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->do_ctxt = qc;
	dq->do_targetq = NULL;
	dq->dq_running = 2;
	dq->dq_width = UINT32_MAX;

	pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
	qc->dgq_thread_mediator = &pqc->dpq_thread_mediator;
	qc->dgq_ctxt = pqc;
#if HAVE_PTHREAD_WORKQUEUES
	qc->dgq_kworkqueue = (void*)(~0ul);
#endif
	_dispatch_root_queue_init_pthread_pool(qc, true); // rdar://11352331

	if (attr) {
		memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
		_dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
	} else {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
	}
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(
			&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
	if (configure) {
		pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
	}

	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
static void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_CRASH("Global root queue disposed");
	}
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	_dispatch_semaphore_dispose(qc->dgq_thread_mediator);
	if (pqc->dpq_thread_configure) {
		Block_release(pqc->dpq_thread_configure);
	}
	dq->do_targetq = _dispatch_get_root_queue(0, false);
#endif
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(queue_specific_queue);
	DISPATCH_QUEUE_HEADER;
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
};

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);

void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_destroy((dispatch_queue_t)dqsq);
}
static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init((dispatch_queue_t)dqsq);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
			true);
	dqsq->dq_width = UINT32_MAX;
	dqsq->dq_label = "queue-specific";
	TAILQ_INIT(&dqsq->dqsq_contexts);
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			(dispatch_queue_t)dqsq, release))) {
		_dispatch_release((dispatch_queue_t)dqsq);
	}
}
static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			return free(dqsn);
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	_dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific);
}
static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}

void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;

	if (fastpath(dq->dq_specific_q)) {
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}

void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		if (slowpath(dq->dq_specific_q)) {
			ctxt = (void *)key;
			dispatch_sync_f(dq->dq_specific_q, &ctxt,
					_dispatch_queue_get_specific);
			if (ctxt) break;
		}
		dq = dq->do_targetq;
	}
	return ctxt;
}
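
// Example usage of the queue-specific API above (illustrative only):
//
//	static int kQueueTag; // any unique address works as a key
//	dispatch_queue_set_specific(q, &kQueueTag, some_ctxt, some_destructor);
//	...
//	if (dispatch_get_specific(&kQueueTag)) {
//		// running on q, or on a queue that targets q (see the target walk above)
//	}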
#pragma mark dispatch_queue_debug

size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	dispatch_queue_t target = dq->do_targetq;
	offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
			"running = 0x%x, barrier = %d ", target && target->dq_label ?
			target->dq_label : "", target, dq->dq_width / 2,
			dq->dq_running / 2, dq->dq_running & 1);
	if (dq->dq_is_thread_bound) {
		offset += dsnprintf(buf, bufsiz, ", thread = %p ",
				_dispatch_queue_get_bound_thread(dq));
	}
	return offset;
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
	offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		_dispatch_object_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#endif
#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t avg, delta = _dispatch_absolute_time() - start;
	unsigned long count, bucket;

	count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	if (count) {
		avg = delta / count;
		bucket = flsll(avg);
	} else {
		bucket = 0;
	}

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
#pragma mark dispatch_continuation_t

static void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup(dc);
	}
}

static void
_dispatch_cache_cleanup(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	}
}

#if DISPATCH_USE_MEMORYSTATUS_SOURCE
int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;

void
_dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
{
	_dispatch_continuation_free_to_heap(dc);
	dispatch_continuation_t next_dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	int cnt;
	if (!dc || (cnt = dc->do_ref_cnt-_dispatch_continuation_cache_limit) <= 0) {
		return;
	}
	do {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	} while (--cnt && (dc = next_dc));
	_dispatch_thread_setspecific(dispatch_cache_key, next_dc);
}
#endif
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;

	(void)dispatch_atomic_add2o(dq, dq_running, 2, acquire);
	if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
			(long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
		_dispatch_trace_continuation_pop(dq, dou);
		_dispatch_thread_semaphore_signal(
				(_dispatch_thread_semaphore_t)dc->dc_other);
		_dispatch_introspection_queue_item_complete(dou);
	} else {
		_dispatch_async_f_redirect(dq, dc);
	}
	_dispatch_perfmon_workitem_inc();
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc, dc1;
	dispatch_group_t dg;

	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
		return dx_invoke(dou._do);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		dc1 = _dispatch_continuation_free_cacheonly(dc);
	} else {
		dc1 = NULL;
	}
	if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_data;
	} else {
		dg = NULL;
	}
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	if (dg) {
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
	_dispatch_introspection_queue_item_complete(dou);
	if (slowpath(dc1)) {
		_dispatch_continuation_free_to_cache_limit(dc1);
	}
}
#pragma mark dispatch_barrier_async

static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_barrier_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
#pragma mark dispatch_async

void
_dispatch_async_redirect_invoke(void *ctxt)
{
	struct dispatch_continuation_s *dc = ctxt;
	struct dispatch_continuation_s *other_dc = dc->dc_other;
	dispatch_queue_t old_dq, dq = dc->dc_data, rq;

	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_continuation_pop(other_dc);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	rq = dq->do_targetq;
	while (slowpath(rq->do_targetq) && rq != old_dq) {
		if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) {
			_dispatch_wakeup(rq);
		}
		rq = rq->do_targetq;
	}

	if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) {
		_dispatch_wakeup(dq);
	}
	_dispatch_release(dq);
}
static inline void
_dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running = 2;

	// Find the queue to redirect to
	do {
		if (slowpath(dq->dq_items_tail) ||
				slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) ||
				slowpath(dq->dq_width == 1)) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running & 1) || slowpath(running > dq->dq_width)) {
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));

	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}

static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t other_dc)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = _dispatch_async_redirect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = dq;
	dc->dc_other = other_dc;

	_dispatch_retain(dq);
	dq = dq->do_targetq;
	if (slowpath(dq->do_targetq)) {
		return _dispatch_async_f_redirect2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running = 2;

	do {
		if (slowpath(dq->dq_items_tail)
				|| slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running > dq->dq_width)) {
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		if (!slowpath(running & 1)) {
			return _dispatch_async_f_redirect(dq, dc);
		}
		running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
		// We might get lucky and find that the barrier has ended by now
	} while (!(running & 1));

	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}

static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width == 1) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
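
// Example of the function-based variant above (illustrative only):
//
//	static void work_fn(void *ctxt) { /* consume ctxt */ }
//	...
//	dispatch_async_f(q, my_ctxt, work_fn); // my_ctxt must stay valid until invoked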
#pragma mark dispatch_group_async

void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	_dispatch_retain(dg);
	dispatch_group_enter(dg);

	dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = dg;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width != 1 && dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
			_dispatch_call_block_and_release);
}
#pragma mark dispatch_function_invoke

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

static void
_dispatch_sync_recurse_invoke(void *ctxt)
{
	dispatch_continuation_t dc = ctxt;
	_dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
		.dc_func = func,
		.dc_ctxt = ctxt,
	};
	dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke);
}
#pragma mark dispatch_barrier_sync

static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func);

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
		bool lock)
{
	_dispatch_thread_semaphore_t sema;
	dispatch_continuation_t dc = dou._dc;

	if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
		return 0;
	}
	_dispatch_trace_continuation_pop(dq, dc);
	_dispatch_perfmon_workitem_inc();

	dc = dc->dc_ctxt;
	dq = dc->dc_data;
	sema = (_dispatch_thread_semaphore_t)dc->dc_other;
	if (lock) {
		(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
				DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
		// rdar://problem/9032024 running lock must be held until sync_f_slow
		// returns
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	}
	_dispatch_introspection_queue_item_complete(dou);
	return sema ? sema : MACH_PORT_DEAD;
}
static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
	dispatch_continuation_t dc = ctxt;
	dispatch_queue_t dq = dc->dc_data;
	_dispatch_thread_semaphore_t sema;
	sema = (_dispatch_thread_semaphore_t)dc->dc_other;

	dispatch_assert(dq == _dispatch_queue_get_current());
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		// The queue is bound to a non-dispatch thread (e.g. main thread)
		_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
		dispatch_atomic_store2o(dc, dc_func, NULL, release);
		_dispatch_thread_semaphore_signal(sema); // release
		return;
	}
#endif
	(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
	// rdar://9032024 running lock must be held until sync_f_slow returns
	(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	_dispatch_thread_semaphore_signal(sema); // release
}
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	if (slowpath(!dq->do_targetq)) {
		// the global concurrent queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
#if DISPATCH_COCOA_COMPAT
		.dc_func = func,
		.dc_ctxt = ctxt,
#endif
		.dc_other = (void*)sema,
	};
	struct dispatch_continuation_s dbss = {
		.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
				DISPATCH_OBJ_SYNC_SLOW_BIT),
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dc,
#if DISPATCH_INTROSPECTION
		.dc_data = (void*)_dispatch_thread_self(),
#endif
	};
	_dispatch_queue_push(dq, &dbss);

	_dispatch_thread_semaphore_wait(sema); // acquire
	_dispatch_put_thread_semaphore(sema);

#if DISPATCH_COCOA_COMPAT
	// Queue bound to a non-dispatch thread
	if (dc.dc_func == NULL) {
		return;
	}
#endif
	if (slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
			dq->dq_running == 2) {
		// rdar://problem/8290662 "lock transfer"
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema); // release
			return;
		}
	}
	(void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_barrier_sync_f2(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
					DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
			// rdar://9032024 running lock must be held until sync_f_slow
			// returns: increment by 2 and decrement by 1
			(void)dispatch_atomic_inc2o(dq, dq_running, relaxed);
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_recurse(dq, ctxt, func);
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
		// global concurrent queues and queues bound to non-dispatch threads
		// always fall into the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_barrier_sync_f(dq, block,
				_dispatch_call_block_and_release);
	}
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		return _dispatch_barrier_sync_slow(dq, work);
	}
#endif
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}

static void
_dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

void
_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// Use for mutation of queue-/source-internal state only, ignores target
	// queue hierarchy!
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
			|| slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1,
					acquire))) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}
	_dispatch_barrier_trysync_f_invoke(dq, ctxt, func);
}
1929 #pragma mark dispatch_sync
1933 _dispatch_sync_f_slow(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
1936 _dispatch_thread_semaphore_t sema
= _dispatch_get_thread_semaphore();
1937 struct dispatch_continuation_s dss
= {
1938 .do_vtable
= (void*)DISPATCH_OBJ_SYNC_SLOW_BIT
,
1939 #if DISPATCH_INTROSPECTION
1942 .dc_data
= (void*)_dispatch_thread_self(),
1944 .dc_other
= (void*)sema
,
1946 _dispatch_queue_push_wakeup(dq
, &dss
, wakeup
);
1948 _dispatch_thread_semaphore_wait(sema
);
1949 _dispatch_put_thread_semaphore(sema
);
1951 if (slowpath(dq
->do_targetq
->do_targetq
)) {
1952 _dispatch_function_recurse(dq
, ctxt
, func
);
1954 _dispatch_function_invoke(dq
, ctxt
, func
);
1956 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
1957 _dispatch_wakeup(dq
);
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_recurse(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static inline void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_sync_f_slow(dq, ctxt, func, false);
	}
	uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	// re-check suspension after barrier check <rdar://problem/15242126>
	if (slowpath(running & 1) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_slow(dq, ctxt, func, running == 0);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_sync_f_invoke(dq, ctxt, func);
}

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (fastpath(dq->dq_width == 1)) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}
	if (slowpath(!dq->do_targetq)) {
		// the global concurrent queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	_dispatch_sync_f2(dq, ctxt, func);
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
	}
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		return _dispatch_sync_slow(dq, work);
	}
#endif
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

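/*
 * Illustrative usage sketch (not part of libdispatch): the function-based and
 * block-based entry points are equivalent; calling dispatch_sync() against the
 * queue one is currently draining deadlocks, which is why the width-1 case
 * above funnels into dispatch_barrier_sync_f(). The callback and context
 * below are hypothetical.
 *
 *	static void example_work(void *ctxt) { *(int *)ctxt = 42; }
 *
 *	int value = 0;
 *	dispatch_sync_f(dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), &value, example_work);
 */
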
#pragma mark dispatch_after

static void
_dispatch_after_timer_callback(void *ctxt)
{
	dispatch_continuation_t dc = ctxt, dc1;
	dispatch_source_t ds = dc->dc_data;
	dc1 = _dispatch_continuation_free_cacheonly(dc);
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	dispatch_source_cancel(ds);
	dispatch_release(ds);
	if (slowpath(dc1)) {
		_dispatch_continuation_free_to_cache_limit(dc1);
	}
}

void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	uint64_t delta, leeway;
	dispatch_source_t ds;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after_f() called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		return dispatch_async_f(queue, ctxt, func);
	}
	leeway = delta / 10; // <rdar://problem/13447496>
	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
	dispatch_assert(ds);

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = ds;

	dispatch_set_context(ds, dc);
	dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
	dispatch_resume(ds);
}

void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	// test before the copy of the block
	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after() called with 'when' == infinity");
#endif
		return;
	}
	dispatch_after_f(when, queue, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}

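/*
 * Illustrative usage sketch (not part of libdispatch): dispatch_after()
 * schedules a block for an absolute time computed with dispatch_time().
 * The 500ms delay and block body are hypothetical.
 *
 *	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW,
 *			(int64_t)(500 * NSEC_PER_MSEC));
 *	dispatch_after(when, dispatch_get_main_queue(), ^{
 *		// runs on the main queue no sooner than ~500ms from now,
 *		// subject to the leeway computed in dispatch_after_f() above
 *	});
 */
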
#pragma mark dispatch_queue_push

static void
_dispatch_queue_push_list_slow2(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	// The queue must be retained before dq_items_head is written in order
	// to ensure that the reference is still valid when _dispatch_wakeup is
	// called. Otherwise, if preempted between the assignment to
	// dq_items_head and _dispatch_wakeup, the blocks submitted to the
	// queue may release the last reference to the queue when invoked by
	// _dispatch_queue_drain. <rdar://problem/6932776>
	_dispatch_retain(dq);
	dq->dq_items_head = obj;
	_dispatch_wakeup(dq);
	_dispatch_release(dq);
}

void
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj, unsigned int n)
{
	if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
		dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
		return _dispatch_queue_wakeup_global2(dq, n);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}

void
_dispatch_queue_push_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
		dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
		return _dispatch_queue_wakeup_global(dq);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}

#pragma mark dispatch_queue_probe

unsigned long
_dispatch_queue_probe(dispatch_queue_t dq)
{
	return (unsigned long)slowpath(dq->dq_items_tail != NULL);
}

#if DISPATCH_COCOA_COMPAT
unsigned long
_dispatch_runloop_queue_probe(dispatch_queue_t dq)
{
	if (_dispatch_queue_probe(dq)) {
		if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816>
		return _dispatch_runloop_queue_wakeup(dq);
	}
	return false;
}
#endif

unsigned long
_dispatch_mgr_queue_probe(dispatch_queue_t dq)
{
	if (_dispatch_queue_probe(dq)) {
		return _dispatch_mgr_wakeup(dq);
	}
	return false;
}

unsigned long
_dispatch_root_queue_probe(dispatch_queue_t dq)
{
	_dispatch_queue_wakeup_global(dq);
	return false;
}

#pragma mark dispatch_wakeup

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	if (!dx_probe(dou._do)) {
		return NULL;
	}
	if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK, release)) {
#if DISPATCH_COCOA_COMPAT
		if (dou._dq == &_dispatch_main_q) {
			return _dispatch_main_queue_wakeup();
		}
#endif
		return NULL;
	}
	_dispatch_retain(dou._do);
	dispatch_queue_t tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq; // libdispatch does not need this, but the Instrument DTrace
	           // probe does
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}
}

DISPATCH_NOINLINE DISPATCH_WEAK
unsigned long
_dispatch_runloop_queue_wakeup(dispatch_queue_t dq)
{
	_dispatch_runloop_queue_wakeup_thread(dq);
	return false;
}

static dispatch_queue_t
_dispatch_main_queue_wakeup(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_is_thread_bound) {
		return NULL;
	}
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_wakeup_thread(dq);
	return NULL;
}
#endif

static void
_dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
{
	static dispatch_once_t pred;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	uint32_t i = n;
	int r;

	_dispatch_debug_root_queue(dq, __func__);
	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (qc->dgq_kworkqueue) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			do {
				r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
						_dispatch_worker_thread3, dq, &wh, &gen_cnt);
				(void)dispatch_assume_zero(r);
			} while (--i);
			return;
		}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
		r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
				qc->dgq_wq_options, (int)i);
		(void)dispatch_assume_zero(r);
#endif
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (fastpath(qc->dgq_thread_mediator)) {
		while (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
			if (!--i) {
				return;
			}
		}
	}
	uint32_t j, t_count = qc->dgq_thread_pool_size;
	do {
		if (!t_count) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
		j = i > t_count ? t_count : i;
	} while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
			t_count - j, &t_count, relaxed));

	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	pthread_attr_t *attr = pqc ? &pqc->dpq_thread_attr : NULL;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	if (slowpath(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq);
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
		if (!pqc) {
			r = pthread_detach(*pthr);
			(void)dispatch_assume_zero(r);
		}
	} while (--j);
#endif // DISPATCH_USE_PTHREAD_POOL
}

static inline void
_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
{
	if (!dq->dq_items_tail) {
		return;
	}
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	if (
#if DISPATCH_USE_PTHREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
		_dispatch_root_queue_debug("worker thread request still pending for "
				"global queue: %p", dq);
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	return _dispatch_queue_wakeup_global_slow(dq, n);
}

static inline void
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	return _dispatch_queue_wakeup_global2(dq, 1);
}

#pragma mark dispatch_queue_invoke

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
dispatch_queue_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr)
{
	dispatch_queue_t dq = dou._dq;
	dispatch_queue_t otq = dq->do_targetq;
	*sema_ptr = _dispatch_queue_drain(dq);

	if (slowpath(otq != dq->do_targetq)) {
		// An item on the queue changed the target queue
		return dq->do_targetq;
	}
	return NULL;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	_dispatch_queue_class_invoke(dq, dispatch_queue_invoke2);
}

#pragma mark dispatch_queue_drain

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_head(dispatch_queue_t dq)
{
	struct dispatch_object_s *dc;
	while (!(dc = fastpath(dq->dq_items_head))) {
		dispatch_hardware_pause();
	}
	return dc;
}

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
{
	struct dispatch_object_s *next_dc;
	next_dc = fastpath(dc->do_next);
	dq->dq_items_head = next_dc;
	if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
			relaxed)) {
		// Enqueue is TIGHTLY controlled, we won't wait long.
		while (!(next_dc = fastpath(dc->do_next))) {
			dispatch_hardware_pause();
		}
		dq->dq_items_head = next_dc;
	}
	return next_dc;
}

_dispatch_thread_semaphore_t
_dispatch_queue_drain(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	struct dispatch_object_s *dc, *next_dc;
	_dispatch_thread_semaphore_t sema = 0;

	// Continue draining sources after target queue change rdar://8928171
	bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);

	orig_tq = dq->do_targetq;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	//dispatch_debug_queue(dq, __func__);

	while (dq->dq_items_tail) {
		dc = _dispatch_queue_head(dq);
		do {
			if (DISPATCH_OBJECT_SUSPENDED(dq)) {
				goto out;
			}
			if (dq->dq_running > dq->dq_width) {
				goto out;
			}
			if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
				goto out;
			}
			bool redirect = false;
			if (!fastpath(dq->dq_width == 1)) {
				if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
						(long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
					if (dq->dq_running > 1) {
						goto out;
					}
				} else {
					redirect = true;
				}
			}
			next_dc = _dispatch_queue_next(dq, dc);
			if (redirect) {
				_dispatch_continuation_redirect(dq, dc);
				continue;
			}
			if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
				goto out;
			}
			_dispatch_continuation_pop(dc);
			_dispatch_perfmon_workitem_inc();
		} while ((dc = next_dc));
	}

out:
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	return sema;
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_items_tail) {
		return;
	}
	struct dispatch_continuation_s marker = {
		.do_vtable = NULL,
	};
	struct dispatch_object_s *dmarker = (void*)&marker;
	_dispatch_queue_push_notrace(dq, dmarker);

	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	do {
		next_dc = _dispatch_queue_next(dq, dc);
		if (dc == dmarker) {
			goto out;
		}
		_dispatch_continuation_pop(dc);
		_dispatch_perfmon_workitem_inc();
	} while ((dc = next_dc));
	DISPATCH_CRASH("Main queue corruption");

out:
	if (next_dc) {
		_dispatch_main_queue_wakeup();
	}
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

static bool
_dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
{
	if (!dq->dq_items_tail) {
		return false;
	}
	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	next_dc = _dispatch_queue_next(dq, dc);
	_dispatch_continuation_pop(dc);
	_dispatch_perfmon_workitem_inc();

	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
	return next_dc != NULL;
}
#endif

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
{
	// rdar://problem/8290662 "lock transfer"
	struct dispatch_object_s *dc;
	_dispatch_thread_semaphore_t sema;

	// queue is locked, or suspended and not being drained
	dc = dq->dq_items_head;
	if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
		return 0;
	}
	// dequeue dc, it is a barrier sync
	(void)_dispatch_queue_next(dq, dc);
	return sema;
}

void
_dispatch_mgr_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_mgr_q;
	if (!dq->dq_items_tail) {
		return _dispatch_force_cache_cleanup();
	}
	_dispatch_perfmon_start();
	if (slowpath(_dispatch_queue_drain(dq))) {
		DISPATCH_CRASH("Sync onto manager queue");
	}
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

#pragma mark dispatch_root_queue_drain

#ifndef DISPATCH_CONTENTION_USE_RAND
#define DISPATCH_CONTENTION_USE_RAND (!TARGET_OS_EMBEDDED)
#endif
#ifndef DISPATCH_CONTENTION_SPINS_MAX
#define DISPATCH_CONTENTION_SPINS_MAX (128 - 1)
#endif
#ifndef DISPATCH_CONTENTION_SPINS_MIN
#define DISPATCH_CONTENTION_SPINS_MIN (32 - 1)
#endif
#ifndef DISPATCH_CONTENTION_USLEEP_START
#define DISPATCH_CONTENTION_USLEEP_START 500
#endif
#ifndef DISPATCH_CONTENTION_USLEEP_MAX
#define DISPATCH_CONTENTION_USLEEP_MAX 100000
#endif

static bool
_dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int spins, sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	do {
		// Spin for a short while in case the contention is temporary -- e.g.
		// when starting up after dispatch_apply, or when executing a few
		// short continuations in a row.
#if DISPATCH_CONTENTION_USE_RAND
		// Use randomness to prevent threads from resonating at the same
		// frequency and permanently contending. All threads sharing the same
		// seed value is safe with the FreeBSD rand_r implementation.
		static unsigned int seed;
		spins = (rand_r(&seed) & DISPATCH_CONTENTION_SPINS_MAX) |
				DISPATCH_CONTENTION_SPINS_MIN;
#else
		spins = DISPATCH_CONTENTION_SPINS_MIN +
				(DISPATCH_CONTENTION_SPINS_MAX-DISPATCH_CONTENTION_SPINS_MIN)/2;
#endif
		while (spins--) {
			dispatch_hardware_pause();
			if (fastpath(dq->dq_items_head != mediator)) goto out;
		}
		// Since we have serious contention, we need to back off.
		if (!pending) {
			// Mark this queue as pending to avoid requests for further threads
			(void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
			pending = true;
		}
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	_dispatch_queue_wakeup_global(dq);
	available = false;
out:
	if (pending) {
		(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	return available;
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		(void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL,
				relaxed);
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);

		if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
			// both head and tail are NULL now
			goto out;
		}

		// There must be a next item now. This thread won't wait long.
		while (!(next = head->do_next)) {
			dispatch_hardware_pause();
		}
	}

	dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}

static void
_dispatch_root_queue_drain(dispatch_queue_t dq)
{
#if DISPATCH_DEBUG
	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}
#endif
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

#if DISPATCH_COCOA_COMPAT
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	struct dispatch_object_s *item;
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		_dispatch_continuation_pop(item);
	}
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_autorelease_pool_pop(pool);
	if (dispatch_end_thread_4GC) {
		dispatch_end_thread_4GC();
	}
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);
}

#pragma mark dispatch_worker_thread

#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread3(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();

	(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	_dispatch_root_queue_drain(dq);
	__asm__(""); // prevent tailcall (for Instrument DTrace probe)
}

#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread3(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES

#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	if (pqc && pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	sigset_t mask;
	int r;
	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);
	_dispatch_introspection_thread_add();

	// Non-pthread-root-queue pthreads use a 65 second timeout in case there
	// are any timers that run once a minute <rdar://problem/11744973>
	const int64_t timeout = (pqc ? 5ull : 65ull) * NSEC_PER_SEC;

	do {
		_dispatch_root_queue_drain(dq);
	} while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

	(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, relaxed);
	_dispatch_queue_wakeup_global(dq);
	_dispatch_release(dq);

	return NULL;
}

static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif // DISPATCH_USE_PTHREAD_POOL

#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT

dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq);
	dq->do_targetq = _dispatch_get_root_queue(0, true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->dq_running = 1;
	dq->dq_is_thread_bound = 1;
	_dispatch_runloop_queue_port_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	_dispatch_queue_clear_bound_thread(dq);
	if (suspend_cnt == 0) {
		_dispatch_wakeup(dq);
	}
}

void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_port_dispose(dq);
	_dispatch_queue_destroy(dq);
}

bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}

void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	_dispatch_runloop_queue_probe(dq);
}

mach_port_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	return (mach_port_t)dq->do_ctxt;
}

static void
_dispatch_runloop_queue_port_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	mach_port_t mp;
	kern_return_t kr;

	_dispatch_safe_fork = false;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				sizeof(limits));
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	dq->do_ctxt = (void*)(uintptr_t)mp;

	_dispatch_program_is_probably_callback_driven = true;
}

static void
_dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	dq->do_ctxt = NULL;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}

#pragma mark dispatch_main_queue

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	return (mach_port_t)dq->do_ctxt;
}

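/*
 * Illustrative sketch (not part of libdispatch): a host runloop such as
 * CoreFoundation's is expected to fetch this port, register it as a runloop
 * source, and invoke _dispatch_main_queue_callback_4CF() when it fires.
 * The glue below is hypothetical and omits error handling.
 *
 *	static void example_cb(CFMachPortRef p, void *msg, CFIndex sz, void *info)
 *	{
 *		_dispatch_main_queue_callback_4CF((mach_msg_header_t *)msg);
 *	}
 *	// ...
 *	mach_port_t mp = _dispatch_get_main_queue_port_4CF();
 *	CFMachPortRef port = CFMachPortCreateWithPort(NULL, mp, example_cb,
 *			NULL, NULL);
 *	CFRunLoopSourceRef src = CFMachPortCreateRunLoopSource(NULL, port, 0);
 *	CFRunLoopAddSource(CFRunLoopGetMain(), src, kCFRunLoopCommonModes);
 */
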
static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}

#endif

void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
#endif
}

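/*
 * Illustrative usage sketch (not part of libdispatch): dispatch_main() never
 * returns; it parks the main thread and lets submitted work drive the
 * process. Typical shape of a purely dispatch-driven program (hypothetical):
 *
 *	int main(void)
 *	{
 *		dispatch_async(dispatch_get_global_queue(
 *				DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
 *			// do work, eventually call exit(0)
 *		});
 *		dispatch_main(); // does not return
 *	}
 */
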
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}

static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	dq->dq_is_thread_bound = 0;
	if (suspend_cnt == 0) {
		_dispatch_wakeup(dq);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		dispatch_async_f(_dispatch_get_root_queue(0, true), NULL,
				_dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_port_dispose(dq);
#endif
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}