/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "internal.h"

#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
		unsigned int n);
static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static inline _dispatch_thread_semaphore_t
		_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread3(void *context);
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif

#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_port_pred;
static dispatch_queue_t _dispatch_main_queue_wakeup(void);
unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
static void _dispatch_runloop_queue_port_init(void *ctxt);
static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
#endif
#pragma mark -
#pragma mark dispatch_root_queue

#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
};
#endif
#define MAX_PTHREAD_COUNT 255

struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			dispatch_semaphore_t dgq_thread_mediator;
			uint32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
#endif
	}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dq_label = "com.apple.root.low-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 4,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.low-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 5,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dq_label = "com.apple.root.default-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 6,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 7,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dq_label = "com.apple.root.high-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 8,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 9,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dq_label = "com.apple.root.background-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 10,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.background-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 11,
	},
};
#if HAVE_PTHREAD_WORKQUEUES
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
};
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue \
		_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY]
#endif

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = DISPATCH_VTABLE(queue_mgr),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_is_thread_bound = 1,
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	return _dispatch_get_root_queue(priority,
			flags & DISPATCH_QUEUE_OVERCOMMIT);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}
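
// Illustrative usage sketch (not part of libdispatch itself): a client asks
// for a global root queue through the public API above. The default-priority
// queue is requested here; passing DISPATCH_QUEUE_OVERCOMMIT in 'flags' would
// select the overcommit variant instead. 'ctxt' and 'work_function' are
// hypothetical caller values.
//
//	dispatch_queue_t q = dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//	dispatch_async_f(q, ctxt, work_function);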
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
{
	while (dq1) {
		if (dq1 == dq2) {
			return true;
		}
		dq1 = dq1->do_targetq;
	}
	return false;
}

#define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
		"Assertion failed: Block was run on an unexpected queue"

DISPATCH_NOINLINE
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	char *msg;
	asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
			expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
			dq->dq_label : "");
	_dispatch_log("%s", msg);
	_dispatch_set_crash_log_message(msg);
	_dispatch_hardware_crash();
}

void
dispatch_assert_queue(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
		return;
	}
	_dispatch_assert_queue_fail(dq, true);
}

void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
		_dispatch_assert_queue_fail(dq, false);
	}
}

#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif
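
// Illustrative usage sketch (not part of libdispatch itself): a callback that
// must only ever run on a particular queue can assert that invariant with the
// functions above. 'my_queue' is a hypothetical queue owned by the caller.
//
//	static void my_callback(void *ctxt) {
//		dispatch_assert_queue(my_queue);     // crashes if run elsewhere
//		dispatch_assert_queue_not(dispatch_get_main_queue());
//		/* ... mutate state owned by my_queue ... */
//	}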
#pragma mark -
#pragma mark dispatch_init

static void
_dispatch_hw_config_init(void)
{
	_dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
	_dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
	_dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
}
static bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
	int r;
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!disable_wq) {
#if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
#endif
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq
#if DISPATCH_NO_BG_PRIORITY
					&& (qc->dgq_wq_priority != WORKQ_BG_PRIOQUEUE)
#endif
			) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		bool overcommit)
{
	qc->dgq_thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
			_dispatch_hw_config.cc_max_active;
#if USE_MACH_SEM
	// override the default FIFO behavior for the pool semaphores
	kern_return_t kr = semaphore_create(mach_task_self(),
			&qc->dgq_thread_mediator->dsema_port, SYNC_POLICY_LIFO, 0);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	(void)dispatch_assume(qc->dgq_thread_mediator->dsema_port);
#elif USE_POSIX_SEM
	/* XXXRW: POSIX semaphores don't support LIFO? */
	int ret = sem_init(&qc->dgq_thread_mediator->dsema_sem, 0, 0);
	(void)dispatch_assume_zero(ret);
#endif
}
#endif // DISPATCH_USE_PTHREAD_POOL
static void
_dispatch_root_queues_init(void *context DISPATCH_UNUSED)
{
	_dispatch_safe_fork = false;
	if (!_dispatch_root_queues_init_workq()) {
#if DISPATCH_ENABLE_THREAD_POOL
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], overcommit);
		}
#else
		DISPATCH_CRASH("Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}
#define countof(x) (sizeof(x) / sizeof(x[0]))

DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_thread_mediator) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
#if !DISPATCH_USE_OS_SEMAPHORE_CACHE
	_dispatch_thread_key_create(&dispatch_sema4_key,
			(void (*)(void *))_dispatch_thread_semaphore_dispose);
#endif
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_io_key, NULL);
	_dispatch_thread_key_create(&dispatch_apply_key, NULL);
#if DISPATCH_PERF_MON
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif

	_dispatch_hw_config_init();
	_dispatch_vtable_init();
	_dispatch_introspection_init();
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

	if (_dispatch_safe_fork) {
		return;
	}
	_dispatch_child_of_unsafe_fork = true;

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
#pragma mark -
#pragma mark dispatch_queue_t

// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - mgr_root_q
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 12;

dispatch_queue_t
dispatch_queue_create_with_target(const char *label,
		dispatch_queue_attr_t attr, dispatch_queue_t tq)
{
	dispatch_queue_t dq;

	dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	if (attr == DISPATCH_QUEUE_CONCURRENT) {
		dq->dq_width = UINT32_MAX;
		if (!tq) {
			tq = _dispatch_get_root_queue(0, false);
		}
	} else {
		if (!tq) {
			// Default target queue is overcommit!
			tq = _dispatch_get_root_queue(0, true);
		}
		if (slowpath(attr)) {
			dispatch_debug_assert(!attr, "Invalid attribute");
		}
	}
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT);
}
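
// Illustrative usage sketch (not part of libdispatch itself): creating a
// serial queue with the default (overcommit) target, and a concurrent queue
// explicitly targeted at a global root queue. The labels are hypothetical.
//
//	dispatch_queue_t serial_q = dispatch_queue_create(
//			"com.example.serial", DISPATCH_QUEUE_SERIAL);
//	dispatch_queue_t concurrent_q = dispatch_queue_create_with_target(
//			"com.example.concurrent", DISPATCH_QUEUE_CONCURRENT,
//			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));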
void
_dispatch_queue_destroy(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq;
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		_dispatch_object_debug(dq, "%s", __func__);
		return;
	}
	if (w > 0) {
		tmp = (unsigned int)w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_physical;
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = _dispatch_hw_config.cc_max_active;
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_logical;
		break;
	}
	// multiply by two since the running count is inc/dec by two
	// (the low bit == barrier)
	dq->dq_width = tmp * 2;
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	_dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}
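
// Illustrative sketch (not part of libdispatch itself): how the width encoding
// above plays out for a hypothetical 4-wide queue. dq_width is stored as
// tmp * 2 = 8; each running work item adds 2 to dq_running, so four items
// saturate the queue, while the low bit of dq_running is reserved to mark a
// barrier in progress.
//
//	dispatch_queue_t q = dispatch_queue_create("com.example.pool",
//			DISPATCH_QUEUE_CONCURRENT);
//	dispatch_queue_set_width(q, 4); // at most 4 concurrent invocations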
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
	dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

	prev_dq = dq->do_targetq;
	dq->do_targetq = ctxt;
	_dispatch_release(prev_dq);
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
	DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
	if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	unsigned long type = dx_metatype(dou._do);
	if (slowpath(!dq)) {
		bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
				slowpath(dou._dq->dq_width > 1));
		dq = _dispatch_get_root_queue(0, !is_concurrent_q);
	}
	// TODO: put into the vtable
	switch (type) {
	case _DISPATCH_QUEUE_TYPE:
	case _DISPATCH_SOURCE_TYPE:
		_dispatch_retain(dq);
		return _dispatch_barrier_trysync_f(dou._dq, dq,
				_dispatch_set_target_queue2);
	case _DISPATCH_IO_TYPE:
		return _dispatch_io_set_target_queue(dou._dchannel, dq);
	default: {
		dispatch_queue_t prev_dq;
		_dispatch_retain(dq);
		prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
		if (prev_dq) _dispatch_release(prev_dq);
		_dispatch_object_debug(dou._do, "%s", __func__);
		return;
	}
	}
}
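
// Illustrative usage sketch (not part of libdispatch itself): retargeting a
// caller-owned serial queue at a global queue so that its blocks ultimately
// execute at that queue's priority. Names are hypothetical.
//
//	dispatch_queue_t bg = dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
//	dispatch_queue_t log_q = dispatch_queue_create("com.example.log", NULL);
//	dispatch_set_target_queue(log_q, bg);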
#pragma mark -
#pragma mark dispatch_pthread_root_queue

struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
		_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
		_dispatch_mgr_root_queue_context = {{{
#if HAVE_PTHREAD_WORKQUEUES
	.dgq_kworkqueue = (void*)(~0ul),
#endif
	.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dgq_thread_pool_size = 1,
}}};
static struct dispatch_queue_s _dispatch_mgr_root_queue = {
	.do_vtable = DISPATCH_VTABLE(queue_root),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_ctxt = &_dispatch_mgr_root_queue_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_running = 2,
	.dq_width = UINT32_MAX,
};
static struct {
	volatile int prio;
	int policy;
	pthread_t tid;
} _dispatch_mgr_sched;
static dispatch_once_t _dispatch_mgr_sched_pred;
static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_init(attr));
	(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
			&_dispatch_mgr_sched.policy));
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	// high-priority workq threads are at priority 2 above default
	_dispatch_mgr_sched.prio = param.sched_priority + 2;
}

DISPATCH_NOINLINE
static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
			PTHREAD_CREATE_DETACHED));
#if !DISPATCH_DEBUG
	(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#endif
	param.sched_priority = _dispatch_mgr_sched.prio;
	(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
	return &_dispatch_mgr_sched.tid;
}

static inline void
_dispatch_mgr_priority_apply(void)
{
	struct sched_param param;
	do {
		param.sched_priority = _dispatch_mgr_sched.prio;
		(void)dispatch_assume_zero(pthread_setschedparam(
				_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy, &param));
	} while (_dispatch_mgr_sched.prio > param.sched_priority);
}

DISPATCH_NOINLINE
void
_dispatch_mgr_priority_init(void)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
		return _dispatch_mgr_priority_apply();
	}
}

DISPATCH_NOINLINE
static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	int p = _dispatch_mgr_sched.prio;
	do if (p >= param.sched_priority) {
		return;
	} while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
			p, param.sched_priority, &p, relaxed)));
	if (_dispatch_mgr_sched.tid) {
		return _dispatch_mgr_priority_apply();
	}
}
dispatch_queue_t
dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
		const pthread_attr_t *attr, dispatch_block_t configure)
{
	dispatch_queue_t dq;
	dispatch_root_queue_context_t qc;
	dispatch_pthread_root_queue_context_t pqc;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
			sizeof(struct dispatch_root_queue_context_s) +
			sizeof(struct dispatch_pthread_root_queue_context_s));
	qc = (void*)dq + dqs;
	pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->do_ctxt = qc;
	dq->do_targetq = NULL;
	dq->dq_running = 2;
	dq->dq_width = UINT32_MAX;

	pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
	qc->dgq_thread_mediator = &pqc->dpq_thread_mediator;
	qc->dgq_ctxt = pqc;
#if HAVE_PTHREAD_WORKQUEUES
	qc->dgq_kworkqueue = (void*)(~0ul);
#endif
	_dispatch_root_queue_init_pthread_pool(qc, true); // rdar://11352331

	if (attr) {
		memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
		_dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
	} else {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
	}
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(
			&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
	if (configure) {
		pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
	}
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
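
// Illustrative usage sketch (not part of libdispatch itself): creating a root
// queue backed by dedicated pthreads instead of the kernel workqueue, with a
// hypothetical per-thread configuration block.
//
//	dispatch_queue_t pq = dispatch_pthread_root_queue_create(
//			"com.example.pthread-root", 0, NULL, ^{
//		/* runs once on each newly created worker thread */
//	});
//	dispatch_async(pq, ^{ /* work */ });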
static void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_CRASH("Global root queue disposed");
	}
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	_dispatch_semaphore_dispose(qc->dgq_thread_mediator);
	if (pqc->dpq_thread_configure) {
		Block_release(pqc->dpq_thread_configure);
	}
	dq->do_targetq = _dispatch_get_root_queue(0, false);
#endif
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#pragma mark -
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(queue_specific_queue);
	DISPATCH_QUEUE_HEADER;
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
};

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);

void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_destroy((dispatch_queue_t)dqsq);
}

static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init((dispatch_queue_t)dqsq);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
			true);
	dqsq->dq_width = UINT32_MAX;
	dqsq->dq_label = "queue-specific";
	TAILQ_INIT(&dqsq->dqsq_contexts);
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			(dispatch_queue_t)dqsq, release))) {
		_dispatch_release((dispatch_queue_t)dqsq);
	}
}
static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			return free(dqsn);
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}
DISPATCH_NOINLINE
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	_dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific);
}
static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}

DISPATCH_NOINLINE
void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;

	if (fastpath(dq->dq_specific_q)) {
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}

DISPATCH_NOINLINE
void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		if (slowpath(dq->dq_specific_q)) {
			ctxt = (void *)key;
			dispatch_sync_f(dq->dq_specific_q, &ctxt,
					_dispatch_queue_get_specific);
			if (ctxt) break;
		}
		dq = dq->do_targetq;
	}
	return ctxt;
}
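
// Illustrative usage sketch (not part of libdispatch itself): associating a
// context pointer with a queue and reading it back from within a block
// running on (or targeting) that queue. 'my_ctx_t' and the key variable are
// hypothetical caller definitions.
//
//	static char example_key;    // address used as the lookup key
//	my_ctx_t *ctx = calloc(1, sizeof(*ctx));
//	dispatch_queue_set_specific(q, &example_key, ctx,
//			(dispatch_function_t)free);
//	dispatch_async(q, ^{
//		my_ctx_t *cur = dispatch_get_specific(&example_key);
//		/* ... */
//	});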
#pragma mark -
#pragma mark dispatch_queue_debug

size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	dispatch_queue_t target = dq->do_targetq;
	offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
			"running = 0x%x, barrier = %d ", target && target->dq_label ?
			target->dq_label : "", target, dq->dq_width / 2,
			dq->dq_running / 2, dq->dq_running & 1);
	if (dq->dq_is_thread_bound) {
		offset += dsnprintf(buf, bufsiz, ", thread = %p ",
				_dispatch_queue_get_bound_thread(dq));
	}
	return offset;
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
	offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		_dispatch_object_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#endif
#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t avg, delta = _dispatch_absolute_time() - start;
	unsigned long count, bucket;

	count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	if (count) {
		avg = delta / count;
		bucket = flsll(avg);
	} else {
		bucket = 0;
	}

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
#pragma mark -
#pragma mark dispatch_continuation_t

static void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup(dc);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_cache_cleanup(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	}
}

#if DISPATCH_USE_MEMORYSTATUS_SOURCE
int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;

DISPATCH_NOINLINE
void
_dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
{
	_dispatch_continuation_free_to_heap(dc);
	dispatch_continuation_t next_dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	int cnt;
	if (!dc || (cnt = dc->do_ref_cnt-_dispatch_continuation_cache_limit) <= 0) {
		return;
	}
	do {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	} while (--cnt && (dc = next_dc));
	_dispatch_thread_setspecific(dispatch_cache_key, next_dc);
}
#endif

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;

	_dispatch_trace_continuation_pop(dq, dou);
	(void)dispatch_atomic_add2o(dq, dq_running, 2, acquire);
	if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
			(long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
		_dispatch_thread_semaphore_signal(
				(_dispatch_thread_semaphore_t)dc->dc_other);
	} else {
		_dispatch_async_f_redirect(dq, dc);
	}
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc, dc1;
	dispatch_group_t dg;

	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
		return dx_invoke(dou._do);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		dc1 = _dispatch_continuation_free_cacheonly(dc);
	} else {
		dc1 = NULL;
	}
	if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_data;
	} else {
		dg = NULL;
	}
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	if (dg) {
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
	if (slowpath(dc1)) {
		_dispatch_continuation_free_to_cache_limit(dc1);
	}
}
#pragma mark -
#pragma mark dispatch_barrier_async

DISPATCH_NOINLINE
static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

DISPATCH_NOINLINE
void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_barrier_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
#endif
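
// Illustrative usage sketch (not part of libdispatch itself): using a barrier
// submission as a writer on a concurrent queue, so it runs exclusively with
// respect to the surrounding reader blocks. 'q' is a hypothetical queue.
//
//	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
//			DISPATCH_QUEUE_CONCURRENT);
//	dispatch_async(q, ^{ /* concurrent reader */ });
//	dispatch_barrier_async(q, ^{ /* exclusive writer */ });
//	dispatch_async(q, ^{ /* runs after the barrier completes */ });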
#pragma mark -
#pragma mark dispatch_async

static void
_dispatch_async_redirect_invoke(void *ctxt)
{
	struct dispatch_continuation_s *dc = ctxt;
	struct dispatch_continuation_s *other_dc = dc->dc_other;
	dispatch_queue_t old_dq, dq = dc->dc_data, rq;

	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_continuation_pop(other_dc);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	rq = dq->do_targetq;
	while (slowpath(rq->do_targetq) && rq != old_dq) {
		if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) {
			_dispatch_wakeup(rq);
		}
		rq = rq->do_targetq;
	}

	if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) {
		_dispatch_wakeup(dq);
	}
	_dispatch_release(dq);
}

static inline void
_dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running = 2;

	// Find the queue to redirect to
	do {
		if (slowpath(dq->dq_items_tail) ||
				slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) ||
				slowpath(dq->dq_width == 1)) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running & 1) || slowpath(running > dq->dq_width)) {
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));

	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t other_dc)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = _dispatch_async_redirect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = dq;
	dc->dc_other = other_dc;

	_dispatch_retain(dq);
	dq = dq->do_targetq;
	if (slowpath(dq->do_targetq)) {
		return _dispatch_async_f_redirect2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running = 2;

	do {
		if (slowpath(dq->dq_items_tail)
				|| slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running > dq->dq_width)) {
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		if (!slowpath(running & 1)) {
			return _dispatch_async_f_redirect(dq, dc);
		}
		running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
		// We might get lucky and find that the barrier has ended by now
	} while (!(running & 1));

	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

DISPATCH_NOINLINE
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width == 1) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
#endif
#pragma mark -
#pragma mark dispatch_group_async

DISPATCH_NOINLINE
void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	_dispatch_retain(dg);
	dispatch_group_enter(dg);

	dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = dg;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width != 1 && dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
			_dispatch_call_block_and_release);
}
#endif
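
// Illustrative usage sketch (not part of libdispatch itself): tracking a set
// of asynchronous blocks with a group and getting notified when all of them
// have completed.
//
//	dispatch_group_t g = dispatch_group_create();
//	dispatch_queue_t q = dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//	dispatch_group_async(g, q, ^{ /* task A */ });
//	dispatch_group_async(g, q, ^{ /* task B */ });
//	dispatch_group_notify(g, dispatch_get_main_queue(), ^{ /* all done */ });
//	dispatch_release(g);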
#pragma mark -
#pragma mark dispatch_function_invoke

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

static void
_dispatch_sync_recurse_invoke(void *ctxt)
{
	dispatch_continuation_t dc = ctxt;
	_dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
		.dc_func = func,
		.dc_ctxt = ctxt,
	};
	dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke);
}
#pragma mark -
#pragma mark dispatch_barrier_sync

static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func);

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
		bool lock)
{
	_dispatch_thread_semaphore_t sema;
	dispatch_continuation_t dc = dou._dc;

	if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
		return 0;
	}
	_dispatch_trace_continuation_pop(dq, dc);
	_dispatch_perfmon_workitem_inc();

	dc = dc->dc_ctxt;
	dq = dc->dc_data;
	sema = (_dispatch_thread_semaphore_t)dc->dc_other;
	if (lock) {
		(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
				DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
		// rdar://problem/9032024 running lock must be held until sync_f_slow
		// returns
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	}
	return sema ? sema : MACH_PORT_DEAD;
}

static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
	dispatch_continuation_t dc = ctxt;
	dispatch_queue_t dq = dc->dc_data;
	_dispatch_thread_semaphore_t sema;
	sema = (_dispatch_thread_semaphore_t)dc->dc_other;

	dispatch_assert(dq == _dispatch_queue_get_current());
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		// The queue is bound to a non-dispatch thread (e.g. main thread)
		dc->dc_func(dc->dc_ctxt);
		dispatch_atomic_store2o(dc, dc_func, NULL, release);
		_dispatch_thread_semaphore_signal(sema); // release
		return;
	}
#endif
	(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
	// rdar://9032024 running lock must be held until sync_f_slow returns
	(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	_dispatch_thread_semaphore_signal(sema); // release
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	if (slowpath(!dq->do_targetq)) {
		// the global concurrent queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
#if DISPATCH_COCOA_COMPAT
		.dc_func = func,
		.dc_ctxt = ctxt,
#endif
		.dc_other = (void*)sema,
	};
	struct dispatch_continuation_s dbss = {
		.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
				DISPATCH_OBJ_SYNC_SLOW_BIT),
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dc,
#if DISPATCH_INTROSPECTION
		.dc_data = (void*)_dispatch_thread_self(),
#endif
	};
	_dispatch_queue_push(dq, &dbss);

	_dispatch_thread_semaphore_wait(sema); // acquire
	_dispatch_put_thread_semaphore(sema);

#if DISPATCH_COCOA_COMPAT
	// Queue bound to a non-dispatch thread
	if (dc.dc_func == NULL) {
		return;
	}
#endif
	if (slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
			dq->dq_running == 2) {
		// rdar://problem/8290662 "lock transfer"
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema); // release
			return;
		}
	}
	(void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f2(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
					DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
			// rdar://9032024 running lock must be held until sync_f_slow
			// returns: increment by 2 and decrement by 1
			(void)dispatch_atomic_inc2o(dq, dq_running, relaxed);
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_recurse(dq, ctxt, func);
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
		// global concurrent queues and queues bound to non-dispatch threads
		// always fall into the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

#ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_barrier_sync_f(dq, block,
				_dispatch_call_block_and_release);
	}
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		return _dispatch_barrier_sync_slow(dq, work);
	}
#endif
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

DISPATCH_NOINLINE
static void
_dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
void
_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// Use for mutation of queue-/source-internal state only, ignores target
	// queue hierarchy!
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
			|| slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1,
					acquire))) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}
	_dispatch_barrier_trysync_f_invoke(dq, ctxt, func);
}
1929 _dispatch_sync_f_slow(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
,
1932 _dispatch_thread_semaphore_t sema
= _dispatch_get_thread_semaphore();
1933 struct dispatch_continuation_s dss
= {
1934 .do_vtable
= (void*)DISPATCH_OBJ_SYNC_SLOW_BIT
,
1935 #if DISPATCH_INTROSPECTION
1938 .dc_data
= (void*)_dispatch_thread_self(),
1940 .dc_other
= (void*)sema
,
1942 _dispatch_queue_push_wakeup(dq
, &dss
, wakeup
);
1944 _dispatch_thread_semaphore_wait(sema
);
1945 _dispatch_put_thread_semaphore(sema
);
1947 if (slowpath(dq
->do_targetq
->do_targetq
)) {
1948 _dispatch_function_recurse(dq
, ctxt
, func
);
1950 _dispatch_function_invoke(dq
, ctxt
, func
);
1952 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
1953 _dispatch_wakeup(dq
);
1959 _dispatch_sync_f_invoke(dispatch_queue_t dq
, void *ctxt
,
1960 dispatch_function_t func
)
1962 _dispatch_function_invoke(dq
, ctxt
, func
);
1963 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
1964 _dispatch_wakeup(dq
);
1970 _dispatch_sync_f_recurse(dispatch_queue_t dq
, void *ctxt
,
1971 dispatch_function_t func
)
1973 _dispatch_function_recurse(dq
, ctxt
, func
);
1974 if (slowpath(dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
) == 0)) {
1975 _dispatch_wakeup(dq
);
1980 _dispatch_sync_f2(dispatch_queue_t dq
, void *ctxt
, dispatch_function_t func
)
1982 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1983 // 2) the queue is not suspended
1984 if (slowpath(dq
->dq_items_tail
) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq
))){
1985 return _dispatch_sync_f_slow(dq
, ctxt
, func
, false);
1987 uint32_t running
= dispatch_atomic_add2o(dq
, dq_running
, 2, relaxed
);
1988 if (slowpath(running
& 1)) {
1989 running
= dispatch_atomic_sub2o(dq
, dq_running
, 2, relaxed
);
1990 return _dispatch_sync_f_slow(dq
, ctxt
, func
, running
== 0);
1992 if (slowpath(dq
->do_targetq
->do_targetq
)) {
1993 return _dispatch_sync_f_recurse(dq
, ctxt
, func
);
1995 _dispatch_sync_f_invoke(dq
, ctxt
, func
);
DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (fastpath(dq->dq_width == 1)) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}
	if (slowpath(!dq->do_targetq)) {
		// the global concurrent queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	_dispatch_sync_f2(dq, ctxt, func);
}
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
	}
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		return _dispatch_sync_slow(dq, work);
	}
#endif
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
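/*
 * Illustrative reader/writer sketch (not part of this file): concurrent
 * readers use dispatch_sync() on a concurrent queue, writers use
 * dispatch_barrier_async(), so reads never observe a partial update. All
 * names below are hypothetical.
 *
 *	static dispatch_queue_t rw_q;	// created with DISPATCH_QUEUE_CONCURRENT
 *	static int shared_counter;
 *
 *	int read_counter(void)
 *	{
 *		__block int value;
 *		dispatch_sync(rw_q, ^{ value = shared_counter; });
 *		return value;
 *	}
 *
 *	void increment_counter(void)
 *	{
 *		dispatch_barrier_async(rw_q, ^{ shared_counter++; });
 *	}
 */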
#pragma mark -
#pragma mark dispatch_after

void
_dispatch_after_timer_callback(void *ctxt)
{
	dispatch_continuation_t dc = ctxt, dc1;
	dispatch_source_t ds = dc->dc_data;
	dc1 = _dispatch_continuation_free_cacheonly(dc);
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	dispatch_source_cancel(ds);
	dispatch_release(ds);
	if (slowpath(dc1)) {
		_dispatch_continuation_free_to_cache_limit(dc1);
	}
}

DISPATCH_NOINLINE
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	uint64_t delta, leeway;
	dispatch_source_t ds;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after_f() called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		return dispatch_async_f(queue, ctxt, func);
	}
	leeway = delta / 10; // <rdar://problem/13447496>
	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
	dispatch_assert(ds);

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = ds;

	dispatch_set_context(ds, dc);
	dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
	dispatch_resume(ds);
}

DISPATCH_NOINLINE
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	// test before the copy of the block
	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after() called with 'when' == infinity");
#endif
		return;
	}
	dispatch_after_f(when, queue, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
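/*
 * Illustrative usage sketch (not part of this file): dispatch_after()
 * schedules a one-shot block. Per dispatch_after_f() above, the timer leeway
 * is roughly 10% of the remaining delay, clamped to [1ms, 60s], so the block
 * may fire slightly later than requested.
 *
 *	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC);
 *	dispatch_after(when, dispatch_get_main_queue(), ^{
 *		printf("fires roughly two seconds from now\n");
 *	});
 */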
#pragma mark -
#pragma mark dispatch_queue_push

DISPATCH_NOINLINE
static void
_dispatch_queue_push_list_slow2(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	// The queue must be retained before dq_items_head is written in order
	// to ensure that the reference is still valid when _dispatch_wakeup is
	// called. Otherwise, if preempted between the assignment to
	// dq_items_head and _dispatch_wakeup, the blocks submitted to the
	// queue may release the last reference to the queue when invoked by
	// _dispatch_queue_drain. <rdar://problem/6932776>
	_dispatch_retain(dq);
	dq->dq_items_head = obj;
	_dispatch_wakeup(dq);
	_dispatch_release(dq);
}

DISPATCH_NOINLINE
void
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj, unsigned int n)
{
	if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
		dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
		return _dispatch_queue_wakeup_global2(dq, n);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}

DISPATCH_NOINLINE
void
_dispatch_queue_push_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
		dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
		return _dispatch_queue_wakeup_global(dq);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}
#pragma mark -
#pragma mark dispatch_queue_probe

unsigned long
_dispatch_queue_probe(dispatch_queue_t dq)
{
	return (unsigned long)slowpath(dq->dq_items_tail != NULL);
}

#if DISPATCH_COCOA_COMPAT
unsigned long
_dispatch_runloop_queue_probe(dispatch_queue_t dq)
{
	if (_dispatch_queue_probe(dq)) {
		if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816>
		return _dispatch_runloop_queue_wakeup(dq);
	}
	return false;
}
#endif

unsigned long
_dispatch_mgr_queue_probe(dispatch_queue_t dq)
{
	if (_dispatch_queue_probe(dq)) {
		return _dispatch_mgr_wakeup(dq);
	}
	return false;
}

unsigned long
_dispatch_root_queue_probe(dispatch_queue_t dq)
{
	_dispatch_queue_wakeup_global(dq);
	return false;
}
#pragma mark -
#pragma mark dispatch_wakeup

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	if (!dx_probe(dou._do)) {
		return NULL;
	}
	if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK, release)) {
#if DISPATCH_COCOA_COMPAT
		if (dou._dq == &_dispatch_main_q) {
			return _dispatch_main_queue_wakeup();
		}
#endif
		return NULL;
	}
	_dispatch_retain(dou._do);
	dispatch_queue_t tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq;	// libdispatch does not need this, but the Instrument DTrace
				// probe does
}
#if DISPATCH_COCOA_COMPAT
static inline void
_dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}
}

DISPATCH_NOINLINE DISPATCH_WEAK
unsigned long
_dispatch_runloop_queue_wakeup(dispatch_queue_t dq)
{
	_dispatch_runloop_queue_wakeup_thread(dq);
	return false;
}

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_main_queue_wakeup(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_is_thread_bound) {
		return NULL;
	}
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_wakeup_thread(dq);
	return NULL;
}
#endif
DISPATCH_NOINLINE
static void
_dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
{
	static dispatch_once_t pred;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	uint32_t i = n;
	int r;

	_dispatch_debug_root_queue(dq, __func__);
	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (qc->dgq_kworkqueue) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			do {
				r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
						_dispatch_worker_thread3, dq, &wh, &gen_cnt);
				(void)dispatch_assume_zero(r);
			} while (--i);
			return;
		}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
		r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
				qc->dgq_wq_options, (int)i);
		(void)dispatch_assume_zero(r);
#endif
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (fastpath(qc->dgq_thread_mediator)) {
		while (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
			if (!--i) {
				return;
			}
		}
	}
	uint32_t j, t_count = qc->dgq_thread_pool_size;
	do {
		if (!t_count) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
		j = i > t_count ? t_count : i;
	} while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
			t_count - j, &t_count, relaxed));

	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	pthread_attr_t *attr = pqc ? &pqc->dpq_thread_attr : NULL;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	if (slowpath(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq);
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
		if (!attr) {
			r = pthread_detach(*pthr);
			(void)dispatch_assume_zero(r);
		}
	} while (--j);
#endif // DISPATCH_USE_PTHREAD_POOL
}
static inline void
_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
{
	if (!dq->dq_items_tail) {
		return;
	}
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	if (
#if DISPATCH_USE_PTHREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
		_dispatch_root_queue_debug("worker thread request still pending for "
				"global queue: %p", dq);
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	return _dispatch_queue_wakeup_global_slow(dq, n);
}

static inline void
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	return _dispatch_queue_wakeup_global2(dq, 1);
}
#pragma mark -
#pragma mark dispatch_queue_invoke

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
dispatch_queue_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr)
{
	dispatch_queue_t dq = dou._dq;
	dispatch_queue_t otq = dq->do_targetq;
	*sema_ptr = _dispatch_queue_drain(dq);

	if (slowpath(otq != dq->do_targetq)) {
		// An item on the queue changed the target queue
		return dq->do_targetq;
	}
	return NULL;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	_dispatch_queue_class_invoke(dq, dispatch_queue_invoke2);
}
#pragma mark -
#pragma mark dispatch_queue_drain

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_head(dispatch_queue_t dq)
{
	struct dispatch_object_s *dc;
	while (!(dc = fastpath(dq->dq_items_head))) {
		dispatch_hardware_pause();
	}
	return dc;
}

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
{
	struct dispatch_object_s *next_dc;
	next_dc = fastpath(dc->do_next);
	dq->dq_items_head = next_dc;
	if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
			relaxed)) {
		// Enqueue is TIGHTLY controlled, we won't wait long.
		while (!(next_dc = fastpath(dc->do_next))) {
			dispatch_hardware_pause();
		}
		dq->dq_items_head = next_dc;
	}
	return next_dc;
}
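/*
 * Minimal standalone sketch (C11 atomics, hypothetical names, not this
 * file's types) of the enqueue/dequeue protocol _dispatch_queue_next()
 * relies on: a producer swings the tail with an atomic exchange first and
 * only then links the predecessor's next pointer, so a consumer that finds a
 * NULL next on a non-tail node merely spins until the link appears. A
 * consumer may also transiently see an empty head while a producer is still
 * publishing the first item.
 *
 *	#include <stdatomic.h>
 *	#include <stddef.h>
 *
 *	struct node { _Atomic(struct node *) next; };
 *	struct mpsc { _Atomic(struct node *) head, tail; };	// multi-producer, single-consumer
 *
 *	static void mpsc_push(struct mpsc *q, struct node *n)
 *	{
 *		atomic_store(&n->next, NULL);
 *		struct node *prev = atomic_exchange(&q->tail, n);
 *		if (prev) {
 *			atomic_store(&prev->next, n);	// link window: next is briefly NULL
 *		} else {
 *			atomic_store(&q->head, n);	// queue was empty
 *		}
 *	}
 *
 *	static struct node *mpsc_pop(struct mpsc *q)
 *	{
 *		struct node *head = atomic_load(&q->head), *expected = head;
 *		if (!head) return NULL;	// empty, or a producer is still publishing
 *		struct node *next = atomic_load(&head->next);
 *		atomic_store(&q->head, next);
 *		if (!next && !atomic_compare_exchange_strong(&q->tail, &expected, NULL)) {
 *			// not actually the last node; the producer will link it shortly
 *			while (!(next = atomic_load(&head->next))) { }
 *			atomic_store(&q->head, next);
 *		}
 *		return head;
 *	}
 */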
_dispatch_thread_semaphore_t
_dispatch_queue_drain(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	struct dispatch_object_s *dc, *next_dc;
	_dispatch_thread_semaphore_t sema = 0;

	// Continue draining sources after target queue change rdar://8928171
	bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);

	orig_tq = dq->do_targetq;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	//dispatch_debug_queue(dq, __func__);

	while (dq->dq_items_tail) {
		dc = _dispatch_queue_head(dq);
		do {
			if (DISPATCH_OBJECT_SUSPENDED(dq)) {
				goto out;
			}
			if (dq->dq_running > dq->dq_width) {
				goto out;
			}
			if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
				goto out;
			}
			bool redirect = false;
			if (!fastpath(dq->dq_width == 1)) {
				if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
						(long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
					if (dq->dq_running > 1) {
						goto out;
					}
				} else {
					redirect = true;
				}
			}
			next_dc = _dispatch_queue_next(dq, dc);
			if (redirect) {
				_dispatch_continuation_redirect(dq, dc);
				continue;
			}
			if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
				goto out;
			}
			_dispatch_continuation_pop(dc);
			_dispatch_perfmon_workitem_inc();
		} while ((dc = next_dc));
	}

out:
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	return sema;
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_items_tail) {
		return;
	}
	struct dispatch_continuation_s marker = {
		.do_vtable = NULL,
	};
	struct dispatch_object_s *dmarker = (void*)&marker;
	_dispatch_queue_push_notrace(dq, dmarker);

	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	do {
		next_dc = _dispatch_queue_next(dq, dc);
		if (dc == dmarker) {
			goto out;
		}
		_dispatch_continuation_pop(dc);
		_dispatch_perfmon_workitem_inc();
	} while ((dc = next_dc));
	DISPATCH_CRASH("Main queue corruption");

out:
	if (next_dc) {
		_dispatch_main_queue_wakeup();
	}
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

static bool
_dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
{
	if (!dq->dq_items_tail) {
		return false;
	}
	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	next_dc = _dispatch_queue_next(dq, dc);
	_dispatch_continuation_pop(dc);
	_dispatch_perfmon_workitem_inc();

	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
	return next_dc;
}
#endif
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
{
	// rdar://problem/8290662 "lock transfer"
	struct dispatch_object_s *dc;
	_dispatch_thread_semaphore_t sema;

	// queue is locked, or suspended and not being drained
	dc = dq->dq_items_head;
	if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
		return 0;
	}
	// dequeue dc, it is a barrier sync
	(void)_dispatch_queue_next(dq, dc);
	return sema;
}

void
_dispatch_mgr_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_mgr_q;
	if (!dq->dq_items_tail) {
		return _dispatch_force_cache_cleanup();
	}
	_dispatch_perfmon_start();
	if (slowpath(_dispatch_queue_drain(dq))) {
		DISPATCH_CRASH("Sync onto manager queue");
	}
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}
#pragma mark -
#pragma mark dispatch_root_queue_drain

#ifndef DISPATCH_CONTENTION_USE_RAND
#define DISPATCH_CONTENTION_USE_RAND (!TARGET_OS_EMBEDDED)
#endif
#ifndef DISPATCH_CONTENTION_SPINS_MAX
#define DISPATCH_CONTENTION_SPINS_MAX (128 - 1)
#endif
#ifndef DISPATCH_CONTENTION_SPINS_MIN
#define DISPATCH_CONTENTION_SPINS_MIN (32 - 1)
#endif
#ifndef DISPATCH_CONTENTION_USLEEP_START
#define DISPATCH_CONTENTION_USLEEP_START 500
#endif
#ifndef DISPATCH_CONTENTION_USLEEP_MAX
#define DISPATCH_CONTENTION_USLEEP_MAX 100000
#endif

DISPATCH_NOINLINE
static bool
_dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int spins, sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	do {
		// Spin for a short while in case the contention is temporary -- e.g.
		// when starting up after dispatch_apply, or when executing a few
		// short continuations in a row.
#if DISPATCH_CONTENTION_USE_RAND
		// Use randomness to prevent threads from resonating at the same
		// frequency and permanently contending. All threads sharing the same
		// seed value is safe with the FreeBSD rand_r implementation.
		static unsigned int seed;
		spins = (rand_r(&seed) & DISPATCH_CONTENTION_SPINS_MAX) |
				DISPATCH_CONTENTION_SPINS_MIN;
#else
		spins = DISPATCH_CONTENTION_SPINS_MIN +
				(DISPATCH_CONTENTION_SPINS_MAX-DISPATCH_CONTENTION_SPINS_MIN)/2;
#endif
		while (spins--) {
			dispatch_hardware_pause();
			if (fastpath(dq->dq_items_head != mediator)) goto out;
		}
		// Since we have serious contention, we need to back off.
		if (!pending) {
			// Mark this queue as pending to avoid requests for further threads
			(void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
			pending = true;
		}
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	_dispatch_queue_wakeup_global(dq);
	available = false;
out:
	if (pending) {
		(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	return available;
}
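/*
 * Minimal standalone sketch (hypothetical helper, not this file's code) of
 * the contention strategy above: spin a randomized number of iterations so
 * contending threads do not resonate at the same frequency, then fall back
 * to exponentially growing sleeps before giving up.
 *
 *	#include <stdbool.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	static bool
 *	wait_with_backoff(bool (*done)(void *), void *ctxt)
 *	{
 *		static unsigned int seed;
 *		useconds_t sleep_us = 500;	// cf. DISPATCH_CONTENTION_USLEEP_START
 *		do {
 *			unsigned int spins = (rand_r(&seed) & 127) | 31;
 *			while (spins--) {
 *				if (done(ctxt)) return true;	// contention was temporary
 *			}
 *			usleep(sleep_us);
 *			if (done(ctxt)) return true;
 *			sleep_us *= 2;
 *		} while (sleep_us < 100000);	// cf. DISPATCH_CONTENTION_USLEEP_MAX
 *		return false;	// persistent contention: caller parks or exits
 *	}
 */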
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		(void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL,
				relaxed);
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);

		if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
			// both head and tail are NULL now
			goto out;
		}

		// There must be a next item now. This thread won't wait long.
		while (!(next = head->do_next)) {
			dispatch_hardware_pause();
		}
	}

	dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}
static void
_dispatch_root_queue_drain(dispatch_queue_t dq)
{
#if DISPATCH_DEBUG
	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}
#endif
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

#if DISPATCH_COCOA_COMPAT
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	struct dispatch_object_s *item;
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		_dispatch_continuation_pop(item);
	}
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_autorelease_pool_pop(pool);
	if (dispatch_end_thread_4GC) {
		dispatch_end_thread_4GC();
	}
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);
}
#pragma mark -
#pragma mark dispatch_worker_thread

#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread3(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();

	(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	_dispatch_root_queue_drain(dq);
	__asm__(""); // prevent tailcall (for Instrument DTrace probe)
}

#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread3(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	if (pqc && pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	sigset_t mask;
	int r;
	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);
	_dispatch_introspection_thread_add();

	// Non-pthread-root-queue pthreads use a 65 second timeout in case there
	// are any timers that run once a minute <rdar://problem/11744973>
	const int64_t timeout = (pqc ? 5ull : 65ull) * NSEC_PER_SEC;

	do {
		_dispatch_root_queue_drain(dq);
	} while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

	(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, relaxed);
	_dispatch_queue_wakeup_global(dq);
	_dispatch_release(dq);

	return NULL;
}
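/*
 * Illustrative sketch of the client-side counterpart (private SPI; the exact
 * declaration lives in queue_private.h, so treat the signature and names
 * here as an assumption): a pthread root queue runs its work on threads
 * created by the pool code above rather than on kernel workqueue threads,
 * and the configure block runs once in each newly created thread.
 *
 *	dispatch_queue_t prq = dispatch_pthread_root_queue_create(
 *			"com.example.pthread-root", 0, NULL, ^{
 *		// per-thread setup, e.g. scheduling policy or thread name
 *	});
 *	dispatch_async(prq, ^{ do_work(); });	// do_work() is hypothetical
 */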
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif // DISPATCH_USE_PTHREAD_POOL
#pragma mark -
#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT

dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq);
	dq->do_targetq = _dispatch_get_root_queue(0, true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->dq_running = 1;
	dq->dq_is_thread_bound = 1;
	_dispatch_runloop_queue_port_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	_dispatch_queue_clear_bound_thread(dq);
	if (suspend_cnt == 0) {
		_dispatch_wakeup(dq);
	}
}

void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_port_dispose(dq);
	_dispatch_queue_destroy(dq);
}
bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}

void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	_dispatch_runloop_queue_probe(dq);
}

mach_port_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	return (mach_port_t)dq->do_ctxt;
}
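/*
 * Sketch of the intended consumer (hypothetical; not CoreFoundation's actual
 * code): the thread that owns a runloop root queue watches the queue's
 * wakeup port and, for each wakeup message, drains items via the perform
 * hook above until it reports that nothing is left.
 *
 *	dispatch_queue_t rlq = _dispatch_runloop_root_queue_create_4CF("example", 0);
 *	mach_port_t port = _dispatch_runloop_root_queue_get_port_4CF(rlq);
 *	// ... add 'port' as a run-loop/port-set source on the owning thread ...
 *	// on each wakeup message received on 'port':
 *	while (_dispatch_runloop_root_queue_perform_4CF(rlq)) {
 *		// keep draining while items remain
 *	}
 */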
static void
_dispatch_runloop_queue_port_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	mach_port_t mp;
	kern_return_t kr;

	_dispatch_safe_fork = false;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				sizeof(limits) / sizeof(natural_t));
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	dq->do_ctxt = (void*)(uintptr_t)mp;

	_dispatch_program_is_probably_callback_driven = true;
}

static void
_dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	dq->do_ctxt = NULL;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}
#pragma mark -
#pragma mark dispatch_main_queue

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	return (mach_port_t)dq->do_ctxt;
}

static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}

#endif
void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
#endif
}
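/*
 * Illustrative usage sketch (not part of this file): a purely callback-driven
 * program installs its sources and then parks the main thread in
 * dispatch_main(); the main queue is serviced by the runtime thereafter.
 *
 *	#include <dispatch/dispatch.h>
 *	#include <signal.h>
 *	#include <stdio.h>
 *
 *	int main(void)
 *	{
 *		signal(SIGHUP, SIG_IGN);	// let the dispatch source own SIGHUP
 *		dispatch_source_t sig = dispatch_source_create(
 *				DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0,
 *				dispatch_get_main_queue());
 *		dispatch_source_set_event_handler(sig, ^{ printf("got SIGHUP\n"); });
 *		dispatch_resume(sig);
 *		dispatch_main();	// never returns
 *	}
 */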
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

DISPATCH_NORETURN
static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}

DISPATCH_NOINLINE
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	dq->dq_is_thread_bound = 0;
	if (suspend_cnt == 0) {
		_dispatch_wakeup(dq);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		dispatch_async_f(_dispatch_get_root_queue(0, true), NULL,
				_dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_port_dispose(dq);
#endif
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}