/*
 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_THREAD_POOL && !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
		unsigned int n);
static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static _dispatch_thread_semaphore_t _dispatch_queue_drain(dispatch_queue_t dq);
static inline _dispatch_thread_semaphore_t
		_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
static void _dispatch_worker_thread3(void *context);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif

#if DISPATCH_COCOA_COMPAT
static unsigned int _dispatch_worker_threads;
static dispatch_once_t _dispatch_main_q_port_pred;
static mach_port_t main_q_port;

static void _dispatch_main_q_port_init(void *ctxt);
static dispatch_queue_t _dispatch_queue_wakeup_main(void);
static void _dispatch_main_queue_drain(void);
#endif
#pragma mark dispatch_root_queue

#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
};
#endif
#define MAX_THREAD_COUNT 255

struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
			dispatch_semaphore_t dgq_thread_mediator;
			uint32_t dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dq_label = "com.apple.root.low-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 4,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.low-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 5,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dq_label = "com.apple.root.default-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 6,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 7,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dq_label = "com.apple.root.high-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 8,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 9,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dq_label = "com.apple.root.background-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 10,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.background-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 11,
	},
};
#if HAVE_PTHREAD_WORKQUEUES
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
};
#endif // HAVE_PTHREAD_WORKQUEUES
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = DISPATCH_VTABLE(queue_mgr),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
	.dq_label = "com.apple.libdispatch-manager",
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	return _dispatch_get_root_queue(priority,
			flags & DISPATCH_QUEUE_OVERCOMMIT);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}
#pragma mark dispatch_init

static void
_dispatch_hw_config_init(void)
{
	_dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
	_dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
	_dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
}
static bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
	int r;
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!disable_wq) {
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			struct dispatch_root_queue_context_s *qc =
					&_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq
#if DISPATCH_NO_BG_PRIORITY
					&& (qc->dgq_wq_priority != WORKQ_BG_PRIOQUEUE)
#endif
			) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
static void
_dispatch_root_queues_init_thread_pool(void)
{
#if DISPATCH_ENABLE_THREAD_POOL
	int i;
	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
#if TARGET_OS_EMBEDDED
		// some software hangs if the non-overcommitting queues do not
		// overcommit when threads block. Someday, this behavior should apply
		// to all platforms
		_dispatch_root_queue_contexts[i].dgq_thread_pool_size =
				_dispatch_hw_config.cc_max_active;
#endif
#if USE_MACH_SEM
		// override the default FIFO behavior for the pool semaphores
		kern_return_t kr = semaphore_create(mach_task_self(),
				&_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
		(void)dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
#elif USE_POSIX_SEM
		/* XXXRW: POSIX semaphores don't support LIFO? */
		int ret = sem_init(&_dispatch_thread_mediator[i].dsema_sem, 0, 0);
		(void)dispatch_assume_zero(ret);
#endif
	}
#else
	DISPATCH_CRASH("Thread pool creation failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
}
static void
_dispatch_root_queues_init(void *context DISPATCH_UNUSED)
{
	_dispatch_safe_fork = false;
	if (!_dispatch_root_queues_init_workq()) {
		_dispatch_root_queues_init_thread_pool();
	}
}
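
// Added note: _dispatch_root_queues_init() prefers the kernel pthread
// workqueue mechanism when it is available; the private thread pool set up
// above is only the fallback taken when workqueue initialization fails or is
// compiled out.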
#define countof(x) (sizeof(x) / sizeof(x[0]))

DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_thread_mediator) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			ROUND_UP_TO_CACHELINE_SIZE(sizeof(
			struct dispatch_continuation_s)));
	dispatch_assert(sizeof(struct dispatch_source_s) ==
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_create(&dispatch_sema4_key,
			(void (*)(void *))_dispatch_thread_semaphore_dispose);
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_io_key, NULL);
	_dispatch_thread_key_create(&dispatch_apply_key, NULL);
#if DISPATCH_PERF_MON
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif

	_dispatch_hw_config_init();
	_dispatch_vtable_init();
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

	if (_dispatch_safe_fork) {
		return;
	}

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
#pragma mark dispatch_queue_t

// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long _dispatch_queue_serial_numbers = 12;

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	dispatch_queue_t dq;
	size_t label_len;

	if (!label) {
		label = "";
	}

	label_len = strlen(label);
	if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
		label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
	}

	// XXX switch to malloc()
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE -
			DISPATCH_QUEUE_CACHELINE_PAD + label_len + 1);

	_dispatch_queue_init(dq);
	strcpy(dq->dq_label, label);

	if (fastpath(!attr)) {
		return dq;
	}
	if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
		dq->dq_width = UINT32_MAX;
		dq->do_targetq = _dispatch_get_root_queue(0, false);
	} else {
		dispatch_debug_assert(!attr, "Invalid attribute");
	}
	return dq;
}
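
/*
 * Illustrative usage of dispatch_queue_create() (added example, not part of
 * the original source):
 *
 *	dispatch_queue_t serial = dispatch_queue_create("com.example.serial", NULL);
 *	dispatch_queue_t conc = dispatch_queue_create("com.example.concurrent",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(serial, ^{ do_serial_work(); });
 *	dispatch_release(serial);
 *	dispatch_release(conc);
 */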
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
}

const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	return dq->dq_label;
}
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		return;
	}
	if (w > 0) {
		tmp = w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_physical;
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = _dispatch_hw_config.cc_max_active;
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_logical;
		break;
	}
	// multiply by two since the running count is inc/dec by two
	// (the low bit == barrier)
	dq->dq_width = tmp * 2;
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return;
	}
	dispatch_barrier_async_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
	dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

	prev_dq = dq->do_targetq;
	dq->do_targetq = ctxt;
	_dispatch_release(prev_dq);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
	dispatch_queue_t prev_dq;
	unsigned long type;

	if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return;
	}
	type = dx_type(dou._do) & _DISPATCH_META_TYPE_MASK;
	if (slowpath(!dq)) {
		bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
				slowpath(dou._dq->dq_width > 1));
		dq = _dispatch_get_root_queue(0, !is_concurrent_q);
	}
	// TODO: put into the vtable
	switch (type) {
	case _DISPATCH_QUEUE_TYPE:
	case _DISPATCH_SOURCE_TYPE:
		_dispatch_retain(dq);
		return dispatch_barrier_async_f(dou._dq, dq,
				_dispatch_set_target_queue2);
	case _DISPATCH_IO_TYPE:
		return _dispatch_io_set_target_queue(dou._dchannel, dq);
	default:
		_dispatch_retain(dq);
		dispatch_atomic_store_barrier();
		prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq);
		if (prev_dq) _dispatch_release(prev_dq);
		break;
	}
}
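
/*
 * Illustrative usage of dispatch_set_target_queue() (added example): funnel a
 * queue's blocks through another queue so they serialize behind it.
 *
 *	dispatch_queue_t target = dispatch_queue_create("com.example.target", NULL);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_set_target_queue(q, target);
 */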
void
dispatch_set_current_target_queue(dispatch_queue_t dq)
{
	dispatch_queue_t queue = _dispatch_queue_get_current();

	if (slowpath(!queue)) {
		DISPATCH_CLIENT_CRASH("SPI not called from a queue");
	}
	if (slowpath(queue->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_CLIENT_CRASH("SPI not supported on this queue");
	}
	if (slowpath(queue->dq_width != 1)) {
		DISPATCH_CLIENT_CRASH("SPI not called from a serial queue");
	}
	if (!dq) {
		dq = _dispatch_get_root_queue(0, true);
	}
	_dispatch_retain(dq);
	_dispatch_set_target_queue2(dq);
}
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(queue_specific_queue);
	DISPATCH_QUEUE_HEADER;
	union {
		char _dqsq_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];
		struct {
			char dq_label[16];
			TAILQ_HEAD(dispatch_queue_specific_head_s,
					dispatch_queue_specific_s) dqsq_contexts;
		};
	};
};
DISPATCH_DECL(dispatch_queue_specific_queue);

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);
void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_dispose((dispatch_queue_t)dqsq);
}
static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init((dispatch_queue_t)dqsq);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
			true);
	dqsq->dq_width = UINT32_MAX;
	strlcpy(dqsq->dq_label, "queue-specific", sizeof(dqsq->dq_label));
	TAILQ_INIT(&dqsq->dqsq_contexts);
	dispatch_atomic_store_barrier();
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			(dispatch_queue_t)dqsq))) {
		_dispatch_release((dispatch_queue_t)dqsq);
	}
}
static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			return free(dqsn);
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	dispatch_barrier_async_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific);
}
static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}

void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;

	if (fastpath(dq->dq_specific_q)) {
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}

void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		if (slowpath(dq->dq_specific_q)) {
			ctxt = (void *)key;
			dispatch_sync_f(dq->dq_specific_q, &ctxt,
					_dispatch_queue_get_specific);
			if (ctxt) break;
		}
		dq = dq->do_targetq;
	}
	return ctxt;
}
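
/*
 * Illustrative usage of the queue-specific API above (added example; the key,
 * context and destructor names are hypothetical):
 *
 *	static char example_key;	// its address is used as the key
 *	dispatch_queue_set_specific(q, &example_key, ctxt, ctxt_destructor);
 *	void *v = dispatch_queue_get_specific(q, &example_key); // one queue only
 *	void *w = dispatch_get_specific(&example_key); // walks the target chain
 */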
#pragma mark dispatch_queue_debug

size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = dq->do_targetq;
	return snprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
			"running = 0x%x, barrier = %d ", target ? target->dq_label : "",
			target, dq->dq_width / 2, dq->dq_running / 2, dq->dq_running & 1);
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dq->dq_label, dq);
	offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += snprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		dispatch_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#endif
#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static size_t _dispatch_bad_ratio;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t avg, delta = _dispatch_absolute_time() - start;
	unsigned long count, bucket;

	count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	if (count) {
		avg = delta / count;
		bucket = flsll(avg);
	} else {
		bucket = 0;
	}

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
#pragma mark dispatch_continuation_t

static malloc_zone_t *_dispatch_ccache_zone;

static void
_dispatch_ccache_init(void *context DISPATCH_UNUSED)
{
	_dispatch_ccache_zone = malloc_create_zone(0, 0);
	dispatch_assert(_dispatch_ccache_zone);
	malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
}

dispatch_continuation_t
_dispatch_continuation_alloc_from_heap(void)
{
	static dispatch_once_t pred;
	dispatch_continuation_t dc;

	dispatch_once_f(&pred, NULL, _dispatch_ccache_init);

	// This is also used for allocating struct dispatch_apply_s. If the
	// ROUND_UP behavior is changed, adjust the assert in libdispatch_init
	while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1,
			ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
		sleep(1);
	}
	return dc;
}

static void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup(dc);
	}
}

// rdar://problem/11500155
void
dispatch_flush_continuation_cache(void)
{
	_dispatch_force_cache_cleanup();
}

static void
_dispatch_cache_cleanup(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		malloc_zone_free(_dispatch_ccache_zone, dc);
	}
}
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;

	_dispatch_trace_continuation_pop(dq, dou);
	(void)dispatch_atomic_add2o(dq, dq_running, 2);
	if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
			(long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
		dispatch_atomic_barrier();
		_dispatch_thread_semaphore_signal(
				(_dispatch_thread_semaphore_t)dc->dc_ctxt);
	} else {
		_dispatch_async_f_redirect(dq, dc);
	}
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;
	dispatch_group_t dg;

	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
		return _dispatch_queue_invoke(dou._dq);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		_dispatch_continuation_free(dc);
	}
	if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_data;
	} else {
		dg = NULL;
	}
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	if (dg) {
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
}
#pragma mark dispatch_barrier_async

static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_barrier_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
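
/*
 * Illustrative reader/writer pattern with the barrier API above (added
 * example): reads run concurrently, the barrier block runs exclusively.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(q, ^{ read_shared_state(); });		// concurrent
 *	dispatch_barrier_async(q, ^{ mutate_shared_state(); });	// exclusive
 */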
#pragma mark dispatch_async

static void
_dispatch_async_f_redirect_invoke(void *_ctxt)
{
	struct dispatch_continuation_s *dc = _ctxt;
	struct dispatch_continuation_s *other_dc = dc->dc_other;
	dispatch_queue_t old_dq, dq = dc->dc_data, rq;

	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_continuation_pop(other_dc);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	rq = dq->do_targetq;
	while (slowpath(rq->do_targetq) && rq != old_dq) {
		if (dispatch_atomic_sub2o(rq, dq_running, 2) == 0) {
			_dispatch_wakeup(rq);
		}
		rq = rq->do_targetq;
	}

	if (dispatch_atomic_sub2o(dq, dq_running, 2) == 0) {
		_dispatch_wakeup(dq);
	}
	_dispatch_release(dq);
}

static void
_dispatch_async_f2_slow(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	_dispatch_wakeup(dq);
	_dispatch_queue_push(dq, dc);
}

static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t other_dc)
{
	dispatch_continuation_t dc;
	dispatch_queue_t rq;

	_dispatch_retain(dq);

	dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = _dispatch_async_f_redirect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = dq;
	dc->dc_other = other_dc;

	// Find the queue to redirect to
	rq = dq->do_targetq;
	while (slowpath(rq->do_targetq)) {
		uint32_t running;

		if (slowpath(rq->dq_items_tail) ||
				slowpath(DISPATCH_OBJECT_SUSPENDED(rq)) ||
				slowpath(rq->dq_width == 1)) {
			break;
		}
		running = dispatch_atomic_add2o(rq, dq_running, 2) - 2;
		if (slowpath(running & 1) || slowpath(running + 2 > rq->dq_width)) {
			if (slowpath(dispatch_atomic_sub2o(rq, dq_running, 2) == 0)) {
				return _dispatch_async_f2_slow(rq, dc);
			}
			break;
		}
		rq = rq->do_targetq;
	}
	_dispatch_queue_push(rq, dc);
}
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running;
	bool locked;

	do {
		if (slowpath(dq->dq_items_tail)
				|| slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2);
		if (slowpath(running > dq->dq_width)) {
			if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
				return _dispatch_async_f2_slow(dq, dc);
			}
			break;
		}
		locked = running & 1;
		if (fastpath(!locked)) {
			return _dispatch_async_f_redirect(dq, dc);
		}
		locked = dispatch_atomic_sub2o(dq, dq_running, 2) & 1;
		// We might get lucky and find that the barrier has ended by now
	} while (!locked);

	_dispatch_queue_push(dq, dc);
}

static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width == 1) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
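
/*
 * Illustrative plain-C usage of dispatch_async_f() (added example; work_func
 * and its context constructor are hypothetical):
 *
 *	static void work_func(void *ctxt) { process(ctxt); free(ctxt); }
 *	...
 *	dispatch_async_f(q, make_context(), work_func);
 */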
#pragma mark dispatch_group_async

void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	_dispatch_retain(dg);
	dispatch_group_enter(dg);

	dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = dg;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width != 1 && dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
			_dispatch_call_block_and_release);
}
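
/*
 * Illustrative usage of dispatch_group_async() (added example):
 *
 *	dispatch_group_t g = dispatch_group_create();
 *	dispatch_group_async(g, q, ^{ part_one(); });
 *	dispatch_group_async(g, q, ^{ part_two(); });
 *	dispatch_group_notify(g, q, ^{ all_parts_done(); });
 *	dispatch_release(g);
 */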
#pragma mark dispatch_function_invoke

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

struct dispatch_function_recurse_s {
	dispatch_queue_t dfr_dq;
	void *dfr_ctxt;
	dispatch_function_t dfr_func;
};

static void
_dispatch_function_recurse_invoke(void *ctxt)
{
	struct dispatch_function_recurse_s *dfr = ctxt;
	_dispatch_function_invoke(dfr->dfr_dq, dfr->dfr_ctxt, dfr->dfr_func);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	struct dispatch_function_recurse_s dfr = {
		.dfr_dq = dq,
		.dfr_func = func,
		.dfr_ctxt = ctxt,
	};
	dispatch_sync_f(dq->do_targetq, &dfr, _dispatch_function_recurse_invoke);
}
#pragma mark dispatch_barrier_sync

struct dispatch_barrier_sync_slow_s {
	DISPATCH_CONTINUATION_HEADER(barrier_sync_slow);
};

struct dispatch_barrier_sync_slow2_s {
	dispatch_queue_t dbss2_dq;
#if DISPATCH_COCOA_COMPAT
	dispatch_function_t dbss2_func;
	void *dbss2_ctxt;
#endif
	_dispatch_thread_semaphore_t dbss2_sema;
};
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
		bool lock)
{
	dispatch_continuation_t dc = dou._dc;

	if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
		return 0;
	}
	_dispatch_trace_continuation_pop(dq, dc);
	_dispatch_workitem_inc();

	struct dispatch_barrier_sync_slow_s *dbssp = (void *)dc;
	struct dispatch_barrier_sync_slow2_s *dbss2 = dbssp->dc_ctxt;
	if (lock) {
		(void)dispatch_atomic_add2o(dbss2->dbss2_dq, do_suspend_cnt,
				DISPATCH_OBJECT_SUSPEND_INTERVAL);
		// rdar://problem/9032024 running lock must be held until sync_f_slow
		// returns
		(void)dispatch_atomic_add2o(dbss2->dbss2_dq, dq_running, 2);
	}
	return dbss2->dbss2_sema ? dbss2->dbss2_sema : MACH_PORT_DEAD;
}
static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
	struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;

	dispatch_assert(dbss2->dbss2_dq == _dispatch_queue_get_current());
#if DISPATCH_COCOA_COMPAT
	// When the main queue is bound to the main thread
	if (dbss2->dbss2_dq == &_dispatch_main_q && pthread_main_np()) {
		dbss2->dbss2_func(dbss2->dbss2_ctxt);
		dbss2->dbss2_func = NULL;
		dispatch_atomic_barrier();
		_dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
		return;
	}
#endif
	(void)dispatch_atomic_add2o(dbss2->dbss2_dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL);
	// rdar://9032024 running lock must be held until sync_f_slow returns
	(void)dispatch_atomic_add2o(dbss2->dbss2_dq, dq_running, 2);
	dispatch_atomic_barrier();
	_dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
}
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	struct dispatch_barrier_sync_slow2_s dbss2 = {
		.dbss2_dq = dq,
#if DISPATCH_COCOA_COMPAT
		.dbss2_func = func,
		.dbss2_ctxt = ctxt,
#endif
		.dbss2_sema = _dispatch_get_thread_semaphore(),
	};
	struct dispatch_barrier_sync_slow_s dbss = {
		.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
				DISPATCH_OBJ_SYNC_SLOW_BIT),
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dbss2,
	};
	_dispatch_queue_push(dq, (void *)&dbss);

	_dispatch_thread_semaphore_wait(dbss2.dbss2_sema);
	_dispatch_put_thread_semaphore(dbss2.dbss2_sema);

#if DISPATCH_COCOA_COMPAT
	// Main queue bound to main thread
	if (dbss2.dbss2_func == NULL) {
		return;
	}
#endif
	dispatch_atomic_acquire_barrier();
	if (slowpath(dq->do_targetq) && slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	dispatch_atomic_release_barrier();
	if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
			dq->dq_running == 2) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	(void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_barrier_sync_f2(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
					DISPATCH_OBJECT_SUSPEND_INTERVAL);
			// rdar://9032024 running lock must be held until sync_f_slow
			// returns: increment by 2 and decrement by 1
			(void)dispatch_atomic_inc2o(dq, dq_running);
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_atomic_acquire_barrier();
	_dispatch_function_invoke(dq, ctxt, func);
	dispatch_atomic_release_barrier();
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_atomic_acquire_barrier();
	_dispatch_function_recurse(dq, ctxt, func);
	dispatch_atomic_release_barrier();
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
		_dispatch_wakeup(dq);
	}
}
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
		// global queues and main queue bound to main thread always falls into
		// the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_barrier_sync_f(dq, block,
				_dispatch_call_block_and_release);
	}
	struct Block_basic *bb = (void *)work;
	dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq == &_dispatch_main_q)) {
		return _dispatch_barrier_sync_slow(dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#pragma mark dispatch_sync

static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_sync_slow_s {
		DISPATCH_CONTINUATION_HEADER(sync_slow);
	} dss = {
		.do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
		.dc_ctxt = (void*)sema,
	};
	_dispatch_queue_push(dq, (void *)&dss);

	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);

	if (slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_sync_f_slow2(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
	_dispatch_sync_f_slow(dq, ctxt, func);
}
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_recurse(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}
static void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) {
		return _dispatch_sync_f_slow2(dq, ctxt, func);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_sync_f_invoke(dq, ctxt, func);
}
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (fastpath(dq->dq_width == 1)) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}
	if (slowpath(!dq->do_targetq)) {
		// the global root queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	_dispatch_sync_f2(dq, ctxt, func);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
	}
	struct Block_basic *bb = (void *)work;
	dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq == &_dispatch_main_q)) {
		return _dispatch_sync_slow(dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
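
/*
 * Illustrative usage of dispatch_sync() (added example). Submitting a
 * synchronous block to the queue the caller is already running on deadlocks,
 * because the queue cannot drain the new item until the current one returns.
 *
 *	__block int result = 0;
 *	dispatch_sync(q, ^{ result = compute(); });
 */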
#pragma mark dispatch_after

struct _dispatch_after_time_s {
	void *datc_ctxt;
	void (*datc_func)(void *);
	dispatch_source_t ds;
};

static void
_dispatch_after_timer_callback(void *ctxt)
{
	struct _dispatch_after_time_s *datc = ctxt;

	dispatch_assert(datc->datc_func);
	_dispatch_client_callout(datc->datc_ctxt, datc->datc_func);

	dispatch_source_t ds = datc->ds;
	free(datc);

	dispatch_source_cancel(ds); // Needed until 7287561 gets integrated
	dispatch_release(ds);
}

void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	uint64_t delta;
	struct _dispatch_after_time_s *datc = NULL;
	dispatch_source_t ds;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after_f() called with 'when' == infinity");
#endif
		return;
	}

	// this function can and should be optimized to not use a dispatch source
	delta = _dispatch_timeout(when);
	if (delta == 0) {
		return dispatch_async_f(queue, ctxt, func);
	}
	// on successful creation, source owns malloc-ed context (which it frees in
	// the event handler)
	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
	dispatch_assert(ds);

	datc = malloc(sizeof(*datc));
	dispatch_assert(datc);
	datc->datc_ctxt = ctxt;
	datc->datc_func = func;
	datc->ds = ds;

	dispatch_set_context(ds, datc);
	dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, 0);
	dispatch_resume(ds);
}

void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	// test before the copy of the block
	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after() called with 'when' == infinity");
#endif
		return;
	}
	dispatch_after_f(when, queue, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
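
/*
 * Illustrative usage of dispatch_after() (added example):
 *
 *	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC);
 *	dispatch_after(when, dispatch_get_main_queue(), ^{ timer_fired(); });
 */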
#pragma mark dispatch_wakeup

static void
_dispatch_queue_push_list_slow2(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	// The queue must be retained before dq_items_head is written in order
	// to ensure that the reference is still valid when _dispatch_wakeup is
	// called. Otherwise, if preempted between the assignment to
	// dq_items_head and _dispatch_wakeup, the blocks submitted to the
	// queue may release the last reference to the queue when invoked by
	// _dispatch_queue_drain. <rdar://problem/6932776>
	_dispatch_retain(dq);
	dq->dq_items_head = obj;
	_dispatch_wakeup(dq);
	_dispatch_release(dq);
}

void
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj, unsigned int n)
{
	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_TYPE) {
		dq->dq_items_head = obj;
		return _dispatch_queue_wakeup_global2(dq, n);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}

void
_dispatch_queue_push_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_TYPE) {
		dq->dq_items_head = obj;
		return _dispatch_queue_wakeup_global(dq);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	dispatch_queue_t tq;

	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
		return NULL;
	}

	// _dispatch_source_invoke() relies on this testing the whole suspend count
	// word, not just the lock bit. In other words, no point taking the lock
	// if the source is suspended or canceled.
	if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT
		if (dou._dq == &_dispatch_main_q) {
			return _dispatch_queue_wakeup_main();
		}
#endif
		return NULL;
	}
	dispatch_atomic_acquire_barrier();
	_dispatch_retain(dou._do);
	tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq; // libdispatch does not need this, but the Instrument DTrace
			// probe does
}
#if DISPATCH_COCOA_COMPAT
static dispatch_queue_t
_dispatch_queue_wakeup_main(void)
{
	kern_return_t kr;

	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);

	kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}
	return NULL;
}
#endif
static void
_dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
{
	static dispatch_once_t pred;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	int r;

	dispatch_debug_queue(dq, __func__);
	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
	{
		_dispatch_debug("requesting new worker thread");
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (qc->dgq_kworkqueue) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt, i = n;
			do {
				r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
						_dispatch_worker_thread3, dq, &wh, &gen_cnt);
				(void)dispatch_assume_zero(r);
			} while (--i);
			return;
		}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
		r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
				qc->dgq_wq_options, n);
		(void)dispatch_assume_zero(r);
#endif
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
		return;
	}

	pthread_t pthr;
	int t_count;

	do {
		t_count = qc->dgq_thread_pool_size;
		if (!t_count) {
			_dispatch_debug("The thread pool is full: %p", dq);
			return;
		}
	} while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count,
			t_count - 1));

	while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
		if (r != EAGAIN) {
			(void)dispatch_assume_zero(r);
		}
		sleep(1);
	}
	r = pthread_detach(pthr);
	(void)dispatch_assume_zero(r);
#endif // DISPATCH_ENABLE_THREAD_POOL
}
static inline void
_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
{
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

	if (!dq->dq_items_tail) {
		return;
	}
#if HAVE_PTHREAD_WORKQUEUES
	if (
#if DISPATCH_ENABLE_THREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n)) {
		_dispatch_debug("work thread request still pending on global queue: "
				"%p", dq);
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	return _dispatch_queue_wakeup_global_slow(dq, n);
}

static inline void
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	return _dispatch_queue_wakeup_global2(dq, 1);
}

bool
_dispatch_queue_probe_root(dispatch_queue_t dq)
{
	_dispatch_queue_wakeup_global2(dq, 1);
	return false;
}
#pragma mark dispatch_queue_drain

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
			fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
		dispatch_atomic_acquire_barrier();
		dispatch_queue_t otq = dq->do_targetq, tq = NULL;
		_dispatch_thread_semaphore_t sema = _dispatch_queue_drain(dq);
		if (dq->do_vtable->do_invoke) {
			// Assume that object invoke checks it is executing on correct queue
			tq = dq->do_targetq;
		} else if (slowpath(otq != dq->do_targetq)) {
			// An item on the queue changed the target queue
			tq = dq->do_targetq;
		}
		// We do not need to check the result.
		// When the suspend-count lock is dropped, then the check will happen.
		dispatch_atomic_release_barrier();
		(void)dispatch_atomic_dec2o(dq, dq_running);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema);
		} else if (tq) {
			return _dispatch_queue_push(tq, dq);
		}
	}

	dq->do_next = DISPATCH_OBJECT_LISTLESS;
	dispatch_atomic_release_barrier();
	if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK)) {
		if (dq->dq_running == 0) {
			_dispatch_wakeup(dq); // verify that the queue is idle
		}
	}
	_dispatch_release(dq); // added when the queue is put on the list
}
static _dispatch_thread_semaphore_t
_dispatch_queue_drain(dispatch_queue_t dq)
{
	dispatch_queue_t orig_tq, old_dq;
	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	_dispatch_thread_semaphore_t sema = 0;

	// Continue draining sources after target queue change rdar://8928171
	bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);

	orig_tq = dq->do_targetq;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	//dispatch_debug_queue(dq, __func__);

	while (dq->dq_items_tail) {
		while (!(dc = fastpath(dq->dq_items_head))) {
			_dispatch_hardware_pause();
		}
		dq->dq_items_head = NULL;
		do {
			next_dc = fastpath(dc->do_next);
			if (!next_dc &&
					!dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
				// Enqueue is TIGHTLY controlled, we won't wait long.
				while (!(next_dc = fastpath(dc->do_next))) {
					_dispatch_hardware_pause();
				}
			}
			if (DISPATCH_OBJECT_SUSPENDED(dq)) {
				goto out;
			}
			if (dq->dq_running > dq->dq_width) {
				goto out;
			}
			if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
				goto out;
			}
			if (!fastpath(dq->dq_width == 1)) {
				if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
						(long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
					if (dq->dq_running > 1) {
						goto out;
					}
				} else {
					_dispatch_continuation_redirect(dq, dc);
					continue;
				}
			}
			if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
				goto out;
			}
			_dispatch_continuation_pop(dc);
			_dispatch_workitem_inc();
		} while ((dc = next_dc));
	}

out:
	// if this is not a complete drain, we must undo some things
	if (slowpath(dc)) {
		// 'dc' must NOT be "popped"
		// 'dc' might be the last item
		if (next_dc ||
				!dispatch_atomic_cmpxchg2o(dq, dq_items_tail, NULL, dc)) {
			// wait for enqueue slow path to finish
			while (!(next_dc = fastpath(dq->dq_items_head))) {
				_dispatch_hardware_pause();
			}
			dc->do_next = next_dc;
		}
		dq->dq_items_head = dc;
	}

	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	return sema;
}
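// Note: _dispatch_queue_drain() stops early (leaving 'dc' unpopped) whenever
// the queue gets suspended, the width is exhausted, the target queue changes,
// or a barrier cannot run yet; the tail of the function re-links the unpopped
// item so a later drain can resume exactly where this one stopped.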
static void
_dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
{
#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	_dispatch_thread_semaphore_t sema = _dispatch_queue_drain(dq);
	if (sema) {
		dispatch_atomic_barrier();
		_dispatch_thread_semaphore_signal(sema);
	}
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif
	_dispatch_force_cache_cleanup();
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_items_tail) {
		return;
	}
	struct dispatch_main_queue_drain_marker_s {
		DISPATCH_CONTINUATION_HEADER(main_queue_drain_marker);
	} marker = {
		.do_vtable = NULL,
	};
	struct dispatch_object_s *dmarker = (void*)&marker;
	_dispatch_queue_push_notrace(dq, dmarker);

#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	while (dq->dq_items_tail) {
		while (!(dc = fastpath(dq->dq_items_head))) {
			_dispatch_hardware_pause();
		}
		dq->dq_items_head = NULL;
		do {
			next_dc = fastpath(dc->do_next);
			if (!next_dc &&
					!dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
				// Enqueue is TIGHTLY controlled, we won't wait long.
				while (!(next_dc = fastpath(dc->do_next))) {
					_dispatch_hardware_pause();
				}
			}
			if (dc == dmarker) {
				if (next_dc) {
					dq->dq_items_head = next_dc;
					_dispatch_queue_wakeup_main();
				}
				goto out;
			}
			_dispatch_continuation_pop(dc);
			_dispatch_workitem_inc();
		} while ((dc = next_dc));
	}
	dispatch_assert(dc); // did not encounter marker

out:
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif
	_dispatch_force_cache_cleanup();
}
#endif
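// Note: the main-queue drain above pushes a stack-allocated marker and only
// pops items that were already enqueued when the drain began; anything pushed
// after the marker is left for the next runloop callout, which is re-armed via
// _dispatch_queue_wakeup_main() when the marker is reached with work pending.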
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
{
	// rdar://problem/8290662 "lock transfer"
	struct dispatch_object_s *dc, *next_dc;
	_dispatch_thread_semaphore_t sema;

	// queue is locked, or suspended and not being drained
	dc = dq->dq_items_head;
	if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
		return 0;
	}
	// dequeue dc, it is a barrier sync
	next_dc = fastpath(dc->do_next);
	dq->dq_items_head = next_dc;
	if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
		// Enqueue is TIGHTLY controlled, we won't wait long.
		while (!(next_dc = fastpath(dc->do_next))) {
			_dispatch_hardware_pause();
		}
		dq->dq_items_head = next_dc;
	}
	return sema;
}
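// Note: "lock transfer" means the semaphore returned here is signaled by the
// drainer instead of the barrier-sync continuation being run in place, handing
// the queue over to the thread blocked in dispatch_barrier_sync() without a
// detour through another enqueue.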
#ifndef DISPATCH_HEAD_CONTENTION_SPINS
#define DISPATCH_HEAD_CONTENTION_SPINS 10000
#endif

static struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		(void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL);
		_dispatch_debug("no work on global work queue");
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		// Spin for a short while in case many threads have started draining at
		// once as part of a dispatch_apply
		unsigned int i = DISPATCH_HEAD_CONTENTION_SPINS;
		do {
			_dispatch_hardware_pause();
			if (dq->dq_items_head != mediator) goto start;
		} while (--i);
		// The ratio of work to libdispatch overhead must be bad. This
		// scenario implies that there are too many threads in the pool.
		// Create a new pending thread and then exit this thread.
		// The kernel will grant a new thread when the load subsides.
		_dispatch_debug("Contention on queue: %p", dq);
		_dispatch_queue_wakeup_global(dq);
#if DISPATCH_PERF_MON
		dispatch_atomic_inc(&_dispatch_bad_ratio);
#endif
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dq->dq_items_head = NULL;

		if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL)) {
			// both head and tail are NULL now
			goto out;
		}

		// There must be a next item now. This thread won't wait long.
		while (!(next = head->do_next)) {
			_dispatch_hardware_pause();
		}
	}

	dq->dq_items_head = next;
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}
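// Note: the drain-one fast path above relies on the ~0ul mediator value: an
// xchg on dq_items_head both claims the queue (lock) and tells concurrent
// drainers that somebody else is already dequeuing (signal). Losing the race
// spins briefly and then trades this thread for a freshly requested one so the
// pool does not stall.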
#pragma mark dispatch_worker_thread

static void
_dispatch_worker_thread4(dispatch_queue_t dq)
{
	struct dispatch_object_s *item;

#if DISPATCH_DEBUG
	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}
#endif
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

#if DISPATCH_COCOA_COMPAT
	(void)dispatch_atomic_inc(&_dispatch_worker_threads);
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		_dispatch_continuation_pop(item);
	}
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif

#if DISPATCH_COCOA_COMPAT
	_dispatch_autorelease_pool_pop(pool);
	if (dispatch_end_thread_4GC) {
		dispatch_end_thread_4GC();
	}
	if (!dispatch_atomic_dec(&_dispatch_worker_threads) &&
			dispatch_no_worker_threads_4GC) {
		dispatch_no_worker_threads_4GC();
	}
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);

	_dispatch_force_cache_cleanup();
}
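// Note: _dispatch_worker_thread4() is the common body for all worker entry
// points below; it brackets the drain loop with the GC/autorelease-pool hooks
// (Cocoa compatibility) and clears dispatch_queue_key again so the thread can
// be recycled for an unrelated queue afterwards.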
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
static void
_dispatch_worker_thread3(void *context)
{
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

	(void)dispatch_atomic_dec2o(qc, dgq_pending);
	_dispatch_worker_thread4(dq);
}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

	(void)dispatch_atomic_dec2o(qc, dgq_pending);
	_dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_ENABLE_THREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	sigset_t mask;
	int r;

	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);

	do {
		_dispatch_worker_thread4(dq);
		// we use 65 seconds in case there are any timers that run once a minute
	} while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
			dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);

	(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size);
	if (dq->dq_items_tail) {
		_dispatch_queue_wakeup_global(dq);
	}

	return NULL;
}
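// Note: thread-pool workers park on dgq_thread_mediator for 65 seconds between
// drains (slightly longer than once-a-minute timers) before giving their slot
// back to dgq_thread_pool_size; the final dq_items_tail check closes the race
// where new work arrives just as the thread decides to retire.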
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif // DISPATCH_ENABLE_THREAD_POOL
#pragma mark dispatch_main_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_q_port_init(void *ctxt DISPATCH_UNUSED)
{
	kern_return_t kr;

	_dispatch_safe_fork = false;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
			&main_q_port);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);

	_dispatch_program_is_probably_callback_driven = true;
}

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);
	return main_q_port;
}
static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}
#endif

void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
#endif
}
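// Illustrative sketch (not part of this file): a callback-driven program that
// relies on dispatch_main() parking the main thread, under the assumption that
// all further work arrives via the main queue or other dispatch sources.
//
//	int main(int argc, char *argv[])
//	{
//		dispatch_async(dispatch_get_main_queue(), ^{
//			printf("running on the main queue\n");
//		});
//		dispatch_main(); // never returns
//	}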
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

#if DISPATCH_COCOA_COMPAT
	// Do not count the signal handling thread as a worker thread
	(void)dispatch_atomic_dec(&_dispatch_worker_threads);
#endif
	for (;;) {
		sigsuspend(&mask);
	}
}

static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}
static void
_dispatch_queue_cleanup2(void)
{
	(void)dispatch_atomic_dec(&_dispatch_main_q.dq_running);

	dispatch_atomic_release_barrier();
	if (dispatch_atomic_sub2o(&_dispatch_main_q, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
		_dispatch_wakeup(&_dispatch_main_q);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		dispatch_async_f(_dispatch_get_root_queue(0, true), NULL,
				_dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);

	mach_port_t mp = main_q_port;
	kern_return_t kr;

	main_q_port = 0;

	if (mp) {
		kr = mach_port_deallocate(mach_task_self(), mp);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
		kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE,
				-1);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
#endif
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}
#pragma mark dispatch_manager_queue

static unsigned int _dispatch_select_workaround;
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
static void **_dispatch_rfd_ptrs;
static void **_dispatch_wfd_ptrs;

static int _dispatch_kq;

static void
_dispatch_get_kq_init(void *context DISPATCH_UNUSED)
{
	static const struct kevent kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.flags = EV_ADD|EV_CLEAR,
	};

	_dispatch_safe_fork = false;
	_dispatch_kq = kqueue();
	if (_dispatch_kq == -1) {
		DISPATCH_CLIENT_CRASH("kqueue() create failed: "
				"probably out of file descriptors");
	} else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
		// in case we fall back to select()
		FD_SET(_dispatch_kq, &_dispatch_rfds);
	}

	(void)dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));

	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
}
static int
_dispatch_get_kq(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);

	return _dispatch_kq;
}
long
_dispatch_update_kq(const struct kevent *kev)
{
	int rval;
	struct kevent kev_copy = *kev;
	// This ensures we don't get a pending kevent back while registering
	// a new kevent
	kev_copy.flags |= EV_RECEIPT;

	if (_dispatch_select_workaround && (kev_copy.flags & EV_DELETE)) {
		// Only executed on manager queue
		switch (kev_copy.filter) {
		case EVFILT_READ:
			if (kev_copy.ident < FD_SETSIZE &&
					FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
				FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
				_dispatch_rfd_ptrs[kev_copy.ident] = 0;
				(void)dispatch_atomic_dec(&_dispatch_select_workaround);
				return 0;
			}
			break;
		case EVFILT_WRITE:
			if (kev_copy.ident < FD_SETSIZE &&
					FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
				FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
				_dispatch_wfd_ptrs[kev_copy.ident] = 0;
				(void)dispatch_atomic_dec(&_dispatch_select_workaround);
				return 0;
			}
			break;
		default:
			break;
		}
	}

retry:
	rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
	if (rval == -1) {
		// If we fail to register with kevents, for other reasons aside from
		// changelist elements.
		int err = errno;
		switch (err) {
		case EINTR:
			goto retry;
		case EBADF:
			_dispatch_bug_client("Do not close random Unix descriptors");
			break;
		default:
			(void)dispatch_assume_zero(err);
			break;
		}
		//kev_copy.flags |= EV_ERROR;
		//kev_copy.data = err;
		return err;
	}

	// The following select workaround only applies to adding kevents
	if ((kev->flags & (EV_DISABLE|EV_DELETE)) ||
			!(kev->flags & (EV_ADD|EV_ENABLE))) {
		return 0;
	}

	// Only executed on manager queue
	switch (kev_copy.data) {
	case 0:
		return 0;
	case EBADF:
		break;
	default:
		// If an error occurred while registering with kevent, and it was
		// because of a kevent changelist processing && the kevent involved
		// either doing a read or write, it would indicate we were trying
		// to register a /dev/* port; fall back to select
		switch (kev_copy.filter) {
		case EVFILT_READ:
			if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
				if (!_dispatch_rfd_ptrs) {
					_dispatch_rfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
				}
				_dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
				FD_SET((int)kev_copy.ident, &_dispatch_rfds);
				(void)dispatch_atomic_inc(&_dispatch_select_workaround);
				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
						(int)kev_copy.ident, (long)kev_copy.data);
				return 0;
			}
			break;
		case EVFILT_WRITE:
			if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
				if (!_dispatch_wfd_ptrs) {
					_dispatch_wfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
				}
				_dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
				FD_SET((int)kev_copy.ident, &_dispatch_wfds);
				(void)dispatch_atomic_inc(&_dispatch_select_workaround);
				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
						(int)kev_copy.ident, (long)kev_copy.data);
				return 0;
			}
			break;
		default:
			// kevent error, _dispatch_source_merge_kevent() will handle it
			_dispatch_source_drain_kevent(&kev_copy);
			break;
		}
		break;
	}

	return kev_copy.data;
}
void
_dispatch_mgr_wakeup(dispatch_queue_t dq)
{
	static const struct kevent kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.fflags = NOTE_TRIGGER,
	};

	_dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);

	_dispatch_update_kq(&kev);
}
static void
_dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
	size_t i;

	for (i = 0; i < cnt; i++) {
		// EVFILT_USER isn't used by sources
		if (kev[i].filter == EVFILT_USER) {
			// If _dispatch_mgr_thread2() ever is changed to return to the
			// caller, then this should become _dispatch_queue_drain()
			_dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
		} else {
			_dispatch_source_drain_kevent(&kev[i]);
		}
	}
}
#if DISPATCH_USE_VM_PRESSURE && DISPATCH_USE_MALLOC_VM_PRESSURE_SOURCE
// VM Pressure source for malloc <rdar://problem/7805121>
static dispatch_source_t _dispatch_malloc_vm_pressure_source;

static void
_dispatch_malloc_vm_pressure_handler(void *context DISPATCH_UNUSED)
{
	malloc_zone_pressure_relief(0,0);
}

static void
_dispatch_malloc_vm_pressure_setup(void)
{
	_dispatch_malloc_vm_pressure_source = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_VM, 0, DISPATCH_VM_PRESSURE,
			_dispatch_get_root_queue(0, true));
	dispatch_source_set_event_handler_f(_dispatch_malloc_vm_pressure_source,
			_dispatch_malloc_vm_pressure_handler);
	dispatch_resume(_dispatch_malloc_vm_pressure_source);
}
#else
#define _dispatch_malloc_vm_pressure_setup()
#endif
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
	static const struct timespec timeout_immediately = { 0, 0 };
	struct timespec timeout;
	const struct timespec *timeoutp;
	struct timeval sel_timeout, *sel_timeoutp;
	fd_set tmp_rfds, tmp_wfds;
	struct kevent kev[1];
	int k_cnt, err, i, r;

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
#if DISPATCH_COCOA_COMPAT
	// Do not count the manager thread as a worker thread
	(void)dispatch_atomic_dec(&_dispatch_worker_threads);
#endif
	_dispatch_malloc_vm_pressure_setup();

	for (;;) {
		_dispatch_run_timers();

		timeoutp = _dispatch_get_next_timer_fire(&timeout);

		if (_dispatch_select_workaround) {
			FD_COPY(&_dispatch_rfds, &tmp_rfds);
			FD_COPY(&_dispatch_wfds, &tmp_wfds);
			if (timeoutp) {
				sel_timeout.tv_sec = timeoutp->tv_sec;
				sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))
						(timeoutp->tv_nsec / 1000u);
				sel_timeoutp = &sel_timeout;
			} else {
				sel_timeoutp = NULL;
			}

			r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
			if (r == -1) {
				err = errno;
				if (err != EBADF) {
					if (err != EINTR) {
						(void)dispatch_assume_zero(err);
					}
					continue;
				}
				for (i = 0; i < FD_SETSIZE; i++) {
					if (i == _dispatch_kq) {
						continue;
					}
					if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i,
							&_dispatch_wfds)) {
						continue;
					}
					r = dup(i);
					if (r != -1) {
						close(r);
					} else {
						if (FD_ISSET(i, &_dispatch_rfds)) {
							FD_CLR(i, &_dispatch_rfds);
							_dispatch_rfd_ptrs[i] = 0;
							(void)dispatch_atomic_dec(
									&_dispatch_select_workaround);
						}
						if (FD_ISSET(i, &_dispatch_wfds)) {
							FD_CLR(i, &_dispatch_wfds);
							_dispatch_wfd_ptrs[i] = 0;
							(void)dispatch_atomic_dec(
									&_dispatch_select_workaround);
						}
					}
				}
				continue;
			}

			if (r > 0) {
				for (i = 0; i < FD_SETSIZE; i++) {
					if (i == _dispatch_kq) {
						continue;
					}
					if (FD_ISSET(i, &tmp_rfds)) {
						FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
						EV_SET(&kev[0], i, EVFILT_READ,
								EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
								_dispatch_rfd_ptrs[i]);
						_dispatch_rfd_ptrs[i] = 0;
						(void)dispatch_atomic_dec(&_dispatch_select_workaround);
						_dispatch_mgr_thread2(kev, 1);
					}
					if (FD_ISSET(i, &tmp_wfds)) {
						FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
						EV_SET(&kev[0], i, EVFILT_WRITE,
								EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
								_dispatch_wfd_ptrs[i]);
						_dispatch_wfd_ptrs[i] = 0;
						(void)dispatch_atomic_dec(&_dispatch_select_workaround);
						_dispatch_mgr_thread2(kev, 1);
					}
				}
			}

			timeoutp = &timeout_immediately;
		}

		k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]),
				timeoutp);
		err = errno;

		switch (k_cnt) {
		case -1:
			if (err == EBADF) {
				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
			}
			if (err != EINTR) {
				(void)dispatch_assume_zero(err);
			}
			continue;
		default:
			_dispatch_mgr_thread2(kev, (size_t)k_cnt);
			// fall through
		case 0:
			_dispatch_force_cache_cleanup();
			continue;
		}
	}
}

DISPATCH_NORETURN
void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
}
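// Note: the manager queue runs on this dedicated, never-returning thread; it
// multiplexes timers, the kqueue, and the select() workaround, and drains
// _dispatch_mgr_q itself whenever the EVFILT_USER wakeup registered in
// _dispatch_get_kq_init() fires.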