/*
 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static void _dispatch_queue_drain(dispatch_queue_t dq);
static inline _dispatch_thread_semaphore_t
		_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
static void _dispatch_worker_thread2(void *context);
#if DISPATCH_ENABLE_THREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif
static bool _dispatch_mgr_wakeup(dispatch_queue_t dq);
static dispatch_queue_t _dispatch_mgr_thread(dispatch_queue_t dq);
#if DISPATCH_COCOA_COMPAT
static unsigned int _dispatch_worker_threads;
static dispatch_once_t _dispatch_main_q_port_pred;
static mach_port_t main_q_port;

static void _dispatch_main_q_port_init(void *ctxt);
static void _dispatch_queue_wakeup_main(void);
static void _dispatch_main_queue_drain(void);
#endif
#pragma mark dispatch_queue_vtable

const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
	.do_type = DISPATCH_QUEUE_TYPE,
	.do_dispose = _dispatch_queue_dispose,
	.do_probe = (void *)dummy_function_r0,
	.do_debug = dispatch_queue_debug,
};

static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
	.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
	.do_kind = "global-queue",
	.do_debug = dispatch_queue_debug,
	.do_probe = _dispatch_queue_wakeup_global,
};

static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
	.do_type = DISPATCH_QUEUE_MGR_TYPE,
	.do_kind = "mgr-queue",
	.do_invoke = _dispatch_mgr_thread,
	.do_debug = dispatch_queue_debug,
	.do_probe = _dispatch_mgr_wakeup,
};
#pragma mark dispatch_root_queue

#if HAVE_PTHREAD_WORKQUEUES
static const int _dispatch_root_queue_wq_priorities[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = WORKQ_LOW_PRIOQUEUE,
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = WORKQ_LOW_PRIOQUEUE,
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = WORKQ_DEFAULT_PRIOQUEUE,
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] =
			WORKQ_DEFAULT_PRIOQUEUE,
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = WORKQ_HIGH_PRIOQUEUE,
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = WORKQ_HIGH_PRIOQUEUE,
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = WORKQ_BG_PRIOQUEUE,
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] =
			WORKQ_BG_PRIOQUEUE,
};
#endif
#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
};
#endif
#define MAX_THREAD_COUNT 255

struct dispatch_root_queue_context_s {
#if HAVE_PTHREAD_WORKQUEUES
	pthread_workqueue_t dgq_kworkqueue;
#endif
	uint32_t dgq_pending;
#if DISPATCH_ENABLE_THREAD_POOL
	uint32_t dgq_thread_pool_size;
	dispatch_semaphore_t dgq_thread_mediator;
#endif
};
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
	},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dq_label = "com.apple.root.low-priority",
		.dq_width = UINT32_MAX,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.low-overcommit-priority",
		.dq_width = UINT32_MAX,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dq_label = "com.apple.root.default-priority",
		.dq_width = UINT32_MAX,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_width = UINT32_MAX,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dq_label = "com.apple.root.high-priority",
		.dq_width = UINT32_MAX,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_width = UINT32_MAX,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dq_label = "com.apple.root.background-priority",
		.dq_width = UINT32_MAX,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.background-overcommit-priority",
		.dq_width = UINT32_MAX,
	},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = &_dispatch_queue_mgr_vtable,
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
	.dq_label = "com.apple.libdispatch-manager",
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	return _dispatch_get_root_queue(priority,
			flags & DISPATCH_QUEUE_OVERCOMMIT);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}
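
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch): obtain
 * a global concurrent queue and run a plain C callback on it synchronously.
 * The callback name print_label and its context string are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *	#include <stdio.h>
 *
 *	static void print_label(void *ctxt) {
 *		printf("running: %s\n", (const char *)ctxt);
 *	}
 *
 *	int main(void) {
 *		dispatch_queue_t q = dispatch_get_global_queue(
 *				DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *		dispatch_sync_f(q, (void *)"on a global queue", print_label);
 *		return 0;
 *	}
 */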
#pragma mark dispatch_init

static void
_dispatch_hw_config_init(void)
{
	_dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
	_dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
	_dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
}
static bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	if (slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"))) return result;
#endif
	int i, r;
	pthread_workqueue_attr_t pwq_attr;
	r = pthread_workqueue_attr_init_np(&pwq_attr);
	(void)dispatch_assume_zero(r);
	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		pthread_workqueue_t pwq = NULL;
		const int prio = _dispatch_root_queue_wq_priorities[i];

		r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, prio);
		(void)dispatch_assume_zero(r);
		r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
		(void)dispatch_assume_zero(r);
		r = pthread_workqueue_create_np(&pwq, &pwq_attr);
		(void)dispatch_assume_zero(r);
		result = result || dispatch_assume(pwq);
		_dispatch_root_queue_contexts[i].dgq_kworkqueue = pwq;
	}
	r = pthread_workqueue_attr_destroy_np(&pwq_attr);
	(void)dispatch_assume_zero(r);
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
static void
_dispatch_root_queues_init_thread_pool(void)
{
#if DISPATCH_ENABLE_THREAD_POOL
	int i;
	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
#if TARGET_OS_EMBEDDED
		// some software hangs if the non-overcommitting queues do not
		// overcommit when threads block. Someday, this behavior should apply
		// to all platforms
		_dispatch_root_queue_contexts[i].dgq_thread_pool_size =
				_dispatch_hw_config.cc_max_active;
#endif
#if USE_MACH_SEM
		// override the default FIFO behavior for the pool semaphores
		kern_return_t kr = semaphore_create(mach_task_self(),
				&_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
		(void)dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
#elif USE_POSIX_SEM
		/* XXXRW: POSIX semaphores don't support LIFO? */
		int ret = sem_init(&_dispatch_thread_mediator[i].dsema_sem, 0, 0);
		(void)dispatch_assume_zero(ret);
#endif
	}
#else
	DISPATCH_CRASH("Thread pool creation failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
}
static void
_dispatch_root_queues_init(void *context DISPATCH_UNUSED)
{
	if (!_dispatch_root_queues_init_workq()) {
		_dispatch_root_queues_init_thread_pool();
	}
}

#define countof(x) (sizeof(x) / sizeof(x[0]))
DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_assert(countof(_dispatch_root_queue_wq_priorities) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_thread_mediator) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif
	dispatch_assert(sizeof(struct dispatch_source_s) ==
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);

	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_create(&dispatch_sema4_key,
			(void (*)(void *))_dispatch_thread_semaphore_dispose);
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_io_key, NULL);
	_dispatch_thread_key_create(&dispatch_apply_key, NULL);
#if DISPATCH_PERF_MON
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_vtable = &_dispatch_queue_vtable;
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
	_dispatch_data_empty.do_vtable = &_dispatch_data_vtable;
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif

	_dispatch_hw_config_init();
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

	if (_dispatch_safe_fork) {
		return;
	}

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
#pragma mark dispatch_queue_t

// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long _dispatch_queue_serial_numbers = 12;

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	dispatch_queue_t dq;
	size_t label_len;

	if (!label) {
		label = "";
	}

	label_len = strlen(label);
	if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
		label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
	}

	// XXX switch to malloc()
	dq = calloc(1ul, sizeof(struct dispatch_queue_s) -
			DISPATCH_QUEUE_MIN_LABEL_SIZE - DISPATCH_QUEUE_CACHELINE_PAD +
			label_len + 1);
	if (slowpath(!dq)) {
		return dq;
	}

	_dispatch_queue_init(dq);
	strcpy(dq->dq_label, label);

	if (fastpath(!attr)) {
		return dq;
	}
	if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
		dq->dq_width = UINT32_MAX;
		dq->do_targetq = _dispatch_get_root_queue(0, false);
	} else {
		dispatch_debug_assert(!attr, "Invalid attribute");
	}
	return dq;
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200);
	if (dqsq) {
		_dispatch_release(dqsq);
	}

	_dispatch_dispose(dq);
}

const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	return dq->dq_label;
}
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		return;
	}
	if (w > 0) {
		tmp = w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_physical;
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = _dispatch_hw_config.cc_max_active;
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_logical;
		break;
	}
	// multiply by two since the running count is inc/dec by two
	// (the low bit == barrier)
	dq->dq_width = tmp * 2;
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return;
	}
	dispatch_barrier_async_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
	dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

	prev_dq = dq->do_targetq;
	dq->do_targetq = ctxt;
	_dispatch_release(prev_dq);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
	dispatch_queue_t prev_dq;
	unsigned long type;

	if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return;
	}
	type = dx_type(dou._do) & _DISPATCH_META_TYPE_MASK;
	if (slowpath(!dq)) {
		bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
				slowpath(dou._dq->dq_width > 1));
		dq = _dispatch_get_root_queue(0, !is_concurrent_q);
	}
	// TODO: put into the vtable
	switch (type) {
	case _DISPATCH_QUEUE_TYPE:
	case _DISPATCH_SOURCE_TYPE:
		_dispatch_retain(dq);
		return dispatch_barrier_async_f(dou._dq, dq,
				_dispatch_set_target_queue2);
	case _DISPATCH_IO_TYPE:
		return _dispatch_io_set_target_queue(dou._dchannel, dq);
	default:
		_dispatch_retain(dq);
		dispatch_atomic_store_barrier();
		prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq);
		if (prev_dq) _dispatch_release(prev_dq);
		return;
	}
}

void
dispatch_set_current_target_queue(dispatch_queue_t dq)
{
	dispatch_queue_t queue = _dispatch_queue_get_current();

	if (slowpath(!queue)) {
		DISPATCH_CLIENT_CRASH("SPI not called from a queue");
	}
	if (slowpath(queue->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_CLIENT_CRASH("SPI not supported on this queue");
	}
	if (slowpath(queue->dq_width != 1)) {
		DISPATCH_CLIENT_CRASH("SPI not called from a serial queue");
	}
	if (!dq) {
		dq = _dispatch_get_root_queue(0, true);
	}
	_dispatch_retain(dq);
	_dispatch_set_target_queue2(dq);
}
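
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch):
 * retargeting a caller-created serial queue at the background global queue
 * so its work runs at background priority. Names are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	void retarget(void) {
 *		dispatch_queue_t q =
 *				dispatch_queue_create("com.example.logging", NULL);
 *		dispatch_queue_t bg = dispatch_get_global_queue(
 *				DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
 *		dispatch_set_target_queue(q, bg);
 *		dispatch_release(q);
 *	}
 */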
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(dispatch_queue_specific_queue_s,
			dispatch_queue_specific_queue_vtable_s);
	DISPATCH_QUEUE_HEADER;
	char _dqsq_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
};

DISPATCH_DECL(dispatch_queue_specific_queue);

static void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq);

struct dispatch_queue_specific_queue_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_queue_specific_queue_s);
};

static const struct dispatch_queue_specific_queue_vtable_s
		_dispatch_queue_specific_queue_vtable = {
	.do_type = DISPATCH_QUEUE_SPECIFIC_TYPE,
	.do_kind = "queue-context",
	.do_dispose = _dispatch_queue_specific_queue_dispose,
	.do_probe = (void *)dummy_function_r0,
	.do_debug = (void *)dispatch_queue_debug,
};

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};

DISPATCH_DECL(dispatch_queue_specific);

static void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_dispose((dispatch_queue_t)dqsq);
}

static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = calloc(1ul, sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init((dispatch_queue_t)dqsq);
	dqsq->do_vtable = &_dispatch_queue_specific_queue_vtable;
	dqsq->do_xref_cnt = 0;
	dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
			true);
	dqsq->dq_width = UINT32_MAX;
	strlcpy(dqsq->dq_label, "queue-specific", sizeof(dqsq->dq_label));
	TAILQ_INIT(&dqsq->dqsq_contexts);
	dispatch_atomic_store_barrier();
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL, dqsq))) {
		_dispatch_release((dispatch_queue_t)dqsq);
	}
}

static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			free(dqsn);
			return;
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	dispatch_barrier_async_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific);
}

static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}

void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;

	if (fastpath(dq->dq_specific_q)) {
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}

void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		if (slowpath(dq->dq_specific_q)) {
			ctxt = (void *)key;
			dispatch_sync_f(dq->dq_specific_q, &ctxt,
					_dispatch_queue_get_specific);
			if (ctxt) {
				break;
			}
		}
		dq = dq->do_targetq;
	}
	return ctxt;
}
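
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch):
 * associating per-queue data with a key and reading it back. The key only
 * needs a unique address; ctx_key and the strings are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *	#include <stdio.h>
 *
 *	static char ctx_key; // its address serves as the key
 *
 *	void queue_specific_demo(void) {
 *		dispatch_queue_t q =
 *				dispatch_queue_create("com.example.worker", NULL);
 *		dispatch_queue_set_specific(q, &ctx_key,
 *				(void *)"worker-context", NULL);
 *		printf("%s\n",
 *				(const char *)dispatch_queue_get_specific(q, &ctx_key));
 *		dispatch_release(q);
 *	}
 */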
#pragma mark dispatch_queue_debug

size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = dq->do_targetq;
	return snprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
			"running = 0x%x, barrier = %d ", target ? target->dq_label : "",
			target, dq->dq_width / 2, dq->dq_running / 2, dq->dq_running & 1);
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dq->dq_label, dq);
	offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += snprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		dispatch_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static size_t _dispatch_bad_ratio;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t avg, delta = _dispatch_absolute_time() - start;
	unsigned long count, bucket;

	count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	if (count) {
		avg = delta / count;
		bucket = flsll(avg);
	} else {
		bucket = 0;
	}

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
#pragma mark dispatch_continuation_t

static malloc_zone_t *_dispatch_ccache_zone;

static void
_dispatch_ccache_init(void *context DISPATCH_UNUSED)
{
	_dispatch_ccache_zone = malloc_create_zone(0, 0);
	dispatch_assert(_dispatch_ccache_zone);
	malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
}

static dispatch_continuation_t
_dispatch_continuation_alloc_from_heap(void)
{
	static dispatch_once_t pred;
	dispatch_continuation_t dc;

	dispatch_once_f(&pred, NULL, _dispatch_ccache_init);

	while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1,
			ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
		sleep(1);
	}

	return dc;
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_continuation_alloc_cacheonly(void)
{
	dispatch_continuation_t dc;
	dc = fastpath(_dispatch_thread_getspecific(dispatch_cache_key));
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, dc->do_next);
	}
	return dc;
}

static void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup(dc);
	}
}

static void
_dispatch_cache_cleanup(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		malloc_zone_free(_dispatch_ccache_zone, dc);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_free(dispatch_continuation_t dc)
{
	dispatch_continuation_t prev_dc;
	prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
	dc->do_next = prev_dc;
	_dispatch_thread_setspecific(dispatch_cache_key, dc);
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;

	_dispatch_trace_continuation_pop(dq, dou);
	(void)dispatch_atomic_add2o(dq, dq_running, 2);
	if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
			(long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
		dispatch_atomic_barrier();
		_dispatch_thread_semaphore_signal(
				(_dispatch_thread_semaphore_t)dc->dc_ctxt);
	} else {
		_dispatch_async_f_redirect(dq, dc);
	}
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;
	dispatch_group_t dg;

	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
		return _dispatch_queue_invoke(dou._dq);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		_dispatch_continuation_free(dc);
	}
	if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_group;
	} else {
		dg = NULL;
	}
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	if (dg) {
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
}
#pragma mark dispatch_barrier_async

static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_barrier_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
#pragma mark dispatch_async

static void
_dispatch_async_f_redirect_invoke(void *_ctxt)
{
	struct dispatch_continuation_s *dc = _ctxt;
	struct dispatch_continuation_s *other_dc = dc->dc_data[1];
	dispatch_queue_t old_dq, dq = dc->dc_data[0], rq;

	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_continuation_pop(other_dc);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	rq = dq->do_targetq;
	while (slowpath(rq->do_targetq) && rq != old_dq) {
		if (dispatch_atomic_sub2o(rq, dq_running, 2) == 0) {
			_dispatch_wakeup(rq);
		}
		rq = rq->do_targetq;
	}

	if (dispatch_atomic_sub2o(dq, dq_running, 2) == 0) {
		_dispatch_wakeup(dq);
	}
	_dispatch_release(dq);
}

static void
_dispatch_async_f2_slow(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	_dispatch_wakeup(dq);
	_dispatch_queue_push(dq, dc);
}

static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t other_dc)
{
	dispatch_continuation_t dc;
	dispatch_queue_t rq;

	_dispatch_retain(dq);

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		dc = _dispatch_continuation_alloc_from_heap();
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = _dispatch_async_f_redirect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data[0] = dq;
	dc->dc_data[1] = other_dc;

	// Find the queue to redirect to
	rq = dq->do_targetq;
	while (slowpath(rq->do_targetq)) {
		uint32_t running;

		if (slowpath(rq->dq_items_tail) ||
				slowpath(DISPATCH_OBJECT_SUSPENDED(rq)) ||
				slowpath(rq->dq_width == 1)) {
			break;
		}
		running = dispatch_atomic_add2o(rq, dq_running, 2) - 2;
		if (slowpath(running & 1) || slowpath(running + 2 > rq->dq_width)) {
			if (slowpath(dispatch_atomic_sub2o(rq, dq_running, 2) == 0)) {
				return _dispatch_async_f2_slow(rq, dc);
			}
			break;
		}
		rq = rq->do_targetq;
	}
	_dispatch_queue_push(rq, dc);
}

static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running;
	bool locked;

	do {
		if (slowpath(dq->dq_items_tail)
				|| slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2);
		if (slowpath(running > dq->dq_width)) {
			if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
				return _dispatch_async_f2_slow(dq, dc);
			}
			break;
		}
		locked = running & 1;
		if (fastpath(!locked)) {
			return _dispatch_async_f_redirect(dq, dc);
		}
		locked = dispatch_atomic_sub2o(dq, dq_running, 2) & 1;
		// We might get lucky and find that the barrier has ended by now
	} while (!locked);

	_dispatch_queue_push(dq, dc);
}

static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width == 1) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
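
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch):
 * dispatch_async_f() with a heap-allocated context that the callback owns
 * and frees. The struct and function names are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *	#include <stdio.h>
 *	#include <stdlib.h>
 *
 *	struct job { int id; };
 *
 *	static void run_job(void *ctxt) {
 *		struct job *j = ctxt;
 *		printf("job %d\n", j->id);
 *		free(j); // the callback owns the context
 *	}
 *
 *	void submit_job(dispatch_queue_t q, int id) {
 *		struct job *j = malloc(sizeof(*j));
 *		j->id = id;
 *		dispatch_async_f(q, j, run_job);
 *	}
 */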
#pragma mark dispatch_group_async

void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	_dispatch_retain(dg);
	dispatch_group_enter(dg);

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		dc = _dispatch_continuation_alloc_from_heap();
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_group = dg;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width != 1 && dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
			_dispatch_call_block_and_release);
}
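
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch):
 * fanning work out with dispatch_group_async_f() and waiting for all of it
 * to finish. process_chunk and the loop bound are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static void process_chunk(void *ctxt) {
 *		(void)ctxt; // do some work on the chunk index
 *	}
 *
 *	void fan_out(void) {
 *		dispatch_queue_t q = dispatch_get_global_queue(
 *				DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *		dispatch_group_t g = dispatch_group_create();
 *		for (long i = 0; i < 8; i++) {
 *			dispatch_group_async_f(g, q, (void *)i, process_chunk);
 *		}
 *		dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
 *		dispatch_release(g);
 *	}
 */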
#pragma mark dispatch_function_invoke

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

struct dispatch_function_recurse_s {
	dispatch_queue_t dfr_dq;
	void *dfr_ctxt;
	dispatch_function_t dfr_func;
};

static void
_dispatch_function_recurse_invoke(void *ctxt)
{
	struct dispatch_function_recurse_s *dfr = ctxt;
	_dispatch_function_invoke(dfr->dfr_dq, dfr->dfr_ctxt, dfr->dfr_func);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	struct dispatch_function_recurse_s dfr = {
		.dfr_dq = dq,
		.dfr_ctxt = ctxt,
		.dfr_func = func,
	};
	dispatch_sync_f(dq->do_targetq, &dfr, _dispatch_function_recurse_invoke);
}
#pragma mark dispatch_barrier_sync

struct dispatch_barrier_sync_slow_s {
	DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
};

struct dispatch_barrier_sync_slow2_s {
	dispatch_queue_t dbss2_dq;
#if DISPATCH_COCOA_COMPAT
	dispatch_function_t dbss2_func;
	void *dbss2_ctxt;
#endif
	_dispatch_thread_semaphore_t dbss2_sema;
};

static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
	struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;

	dispatch_assert(dbss2->dbss2_dq == _dispatch_queue_get_current());
#if DISPATCH_COCOA_COMPAT
	// When the main queue is bound to the main thread
	if (dbss2->dbss2_dq == &_dispatch_main_q && pthread_main_np()) {
		dbss2->dbss2_func(dbss2->dbss2_ctxt);
		dbss2->dbss2_func = NULL;
		dispatch_atomic_barrier();
		_dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
		return;
	}
#endif
	(void)dispatch_atomic_add2o(dbss2->dbss2_dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL);
	// rdar://9032024 running lock must be held until sync_f_slow returns
	(void)dispatch_atomic_add2o(dbss2->dbss2_dq, dq_running, 2);
	dispatch_atomic_barrier();
	_dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
}

static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	struct dispatch_barrier_sync_slow2_s dbss2 = {
		.dbss2_dq = dq,
#if DISPATCH_COCOA_COMPAT
		.dbss2_func = func,
		.dbss2_ctxt = ctxt,
#endif
		.dbss2_sema = _dispatch_get_thread_semaphore(),
	};
	struct dispatch_barrier_sync_slow_s dbss = {
		.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
				DISPATCH_OBJ_SYNC_SLOW_BIT),
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dbss2,
	};
	_dispatch_queue_push(dq, (void *)&dbss);

	_dispatch_thread_semaphore_wait(dbss2.dbss2_sema);
	_dispatch_put_thread_semaphore(dbss2.dbss2_sema);

#if DISPATCH_COCOA_COMPAT
	// Main queue bound to main thread
	if (dbss2.dbss2_func == NULL) {
		return;
	}
#endif
	dispatch_atomic_acquire_barrier();
	if (slowpath(dq->do_targetq) && slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	dispatch_atomic_release_barrier();
	if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL)) {
		// rdar://problem/8290662 "lock transfer"
		// ensure drain of current barrier sync has finished
		while (slowpath(dq->dq_running > 2)) {
			_dispatch_hardware_pause();
		}
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	(void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_barrier_sync_f2(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
					DISPATCH_OBJECT_SUSPEND_INTERVAL);
			// rdar://9032024 running lock must be held until sync_f_slow
			// returns: increment by 2 and decrement by 1
			(void)dispatch_atomic_inc2o(dq, dq_running);
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_atomic_acquire_barrier();
	_dispatch_function_invoke(dq, ctxt, func);
	dispatch_atomic_release_barrier();
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_atomic_acquire_barrier();
	_dispatch_function_recurse(dq, ctxt, func);
	dispatch_atomic_release_barrier();
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
		_dispatch_wakeup(dq);
	}
}

void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
		// global queues and main queue bound to main thread always falls into
		// the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_barrier_sync_f(dq, block,
				_dispatch_call_block_and_release);
	}
	struct Block_basic *bb = (void *)work;
	dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq == &_dispatch_main_q)) {
		return _dispatch_barrier_sync_slow(dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
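
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch):
 * dispatch_barrier_sync_f() blocks the caller until the function has run
 * exclusively on the (concurrent) queue, which is handy for taking a
 * consistent snapshot of state guarded by that queue. Names are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static int balance;
 *
 *	static void read_balance(void *ctxt) {
 *		*(int *)ctxt = balance;
 *	}
 *
 *	int snapshot_balance(dispatch_queue_t concurrent_q) {
 *		int copy = 0;
 *		dispatch_barrier_sync_f(concurrent_q, &copy, read_balance);
 *		return copy;
 *	}
 */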
#pragma mark dispatch_sync

static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_sync_slow_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
	} dss = {
		.do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
		.dc_ctxt = (void*)sema,
	};
	_dispatch_queue_push(dq, (void *)&dss);

	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);

	if (slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_sync_f_slow2(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
	_dispatch_sync_f_slow(dq, ctxt, func);
}

static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_recurse(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) {
		return _dispatch_sync_f_slow2(dq, ctxt, func);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_sync_f_invoke(dq, ctxt, func);
}

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (fastpath(dq->dq_width == 1)) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}
	if (slowpath(!dq->do_targetq)) {
		// the global root queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	_dispatch_sync_f2(dq, ctxt, func);
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
	}
	struct Block_basic *bb = (void *)work;
	dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq == &_dispatch_main_q)) {
		return _dispatch_sync_slow(dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
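
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch):
 * dispatch_sync_f() runs the function and waits for it. Calling it against
 * the serial queue the caller is already executing on (or against a serial
 * queue in the caller's target chain) deadlocks. Names are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static void touch_state(void *ctxt) {
 *		(*(int *)ctxt)++;
 *	}
 *
 *	void update(dispatch_queue_t serial_q, int *state) {
 *		// must not be called from code already running on serial_q
 *		dispatch_sync_f(serial_q, state, touch_state);
 *	}
 */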
#pragma mark dispatch_after

struct _dispatch_after_time_s {
	void *datc_ctxt;
	void (*datc_func)(void *);
	dispatch_source_t ds;
};

static void
_dispatch_after_timer_callback(void *ctxt)
{
	struct _dispatch_after_time_s *datc = ctxt;

	dispatch_assert(datc->datc_func);
	_dispatch_client_callout(datc->datc_ctxt, datc->datc_func);

	dispatch_source_t ds = datc->ds;
	free(datc);

	dispatch_source_cancel(ds); // Needed until 7287561 gets integrated
	dispatch_release(ds);
}

void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	uint64_t delta;
	struct _dispatch_after_time_s *datc = NULL;
	dispatch_source_t ds;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after_f() called with 'when' == infinity");
#endif
		return;
	}

	// this function can and should be optimized to not use a dispatch source
	delta = _dispatch_timeout(when);
	if (delta == 0) {
		return dispatch_async_f(queue, ctxt, func);
	}
	// on successful creation, source owns malloc-ed context (which it frees in
	// the event handler)
	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
	dispatch_assert(ds);

	datc = malloc(sizeof(*datc));
	dispatch_assert(datc);
	datc->datc_ctxt = ctxt;
	datc->datc_func = func;
	datc->ds = ds;

	dispatch_set_context(ds, datc);
	dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, 0);
	dispatch_resume(ds);
}

void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	// test before the copy of the block
	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after() called with 'when' == infinity");
#endif
		return;
	}
	dispatch_after_f(when, queue, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
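
/*
 * Editor's usage sketch (illustrative only, not part of libdispatch):
 * scheduling a callback roughly two seconds from now with dispatch_after_f().
 * The callback and context string are hypothetical.
 *
 *	#include <dispatch/dispatch.h>
 *	#include <stdio.h>
 *
 *	static void fire(void *ctxt) {
 *		printf("%s\n", (const char *)ctxt);
 *	}
 *
 *	void schedule(void) {
 *		dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW,
 *				2 * NSEC_PER_SEC);
 *		dispatch_after_f(when, dispatch_get_main_queue(),
 *				(void *)"fired", fire);
 *	}
 */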
#pragma mark dispatch_wakeup

void
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	// The queue must be retained before dq_items_head is written in order
	// to ensure that the reference is still valid when _dispatch_wakeup is
	// called. Otherwise, if preempted between the assignment to
	// dq_items_head and _dispatch_wakeup, the blocks submitted to the
	// queue may release the last reference to the queue when invoked by
	// _dispatch_queue_drain. <rdar://problem/6932776>
	_dispatch_retain(dq);
	dq->dq_items_head = obj;
	_dispatch_wakeup(dq);
	_dispatch_release(dq);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	dispatch_queue_t tq;

	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
		return NULL;
	}

	// _dispatch_source_invoke() relies on this testing the whole suspend count
	// word, not just the lock bit. In other words, no point taking the lock
	// if the source is suspended or canceled.
	if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT
		if (dou._dq == &_dispatch_main_q) {
			_dispatch_queue_wakeup_main();
		}
#endif
		return NULL;
	}
	_dispatch_retain(dou._do);
	tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq;	// libdispatch does not need this, but the Instrument DTrace
				// probe does
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_queue_wakeup_main(void)
{
	kern_return_t kr;

	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);

	kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}

	_dispatch_safe_fork = false;
}
#endif
static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	static dispatch_once_t pred;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	int r;

	if (!dq->dq_items_tail) {
		return false;
	}

	_dispatch_safe_fork = false;

	dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	if (qc->dgq_kworkqueue)
#endif
	{
		if (dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, 1)) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			_dispatch_debug("requesting new worker thread");

			r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
					_dispatch_worker_thread2, dq, &wh, &gen_cnt);
			(void)dispatch_assume_zero(r);
		} else {
			_dispatch_debug("work thread request still pending on global "
					"queue: %p", dq);
		}
		goto out;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
		goto out;
	}

	pthread_t pthr;
	int t_count;
	do {
		t_count = qc->dgq_thread_pool_size;
		if (!t_count) {
			_dispatch_debug("The thread pool is full: %p", dq);
			goto out;
		}
	} while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count,
			t_count - 1));

	while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
		(void)dispatch_assume_zero(r);
		sleep(1);
	}
	r = pthread_detach(pthr);
	(void)dispatch_assume_zero(r);
#endif // DISPATCH_ENABLE_THREAD_POOL

out:
	return false;
}
#pragma mark dispatch_queue_drain

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
			fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
		dispatch_atomic_acquire_barrier();
		dispatch_queue_t otq = dq->do_targetq, tq = NULL;
		_dispatch_queue_drain(dq);
		if (dq->do_vtable->do_invoke) {
			// Assume that object invoke checks it is executing on correct queue
			tq = dx_invoke(dq);
		} else if (slowpath(otq != dq->do_targetq)) {
			// An item on the queue changed the target queue
			tq = dq->do_targetq;
		}
		// We do not need to check the result.
		// When the suspend-count lock is dropped, then the check will happen.
		dispatch_atomic_release_barrier();
		(void)dispatch_atomic_dec2o(dq, dq_running);
		if (tq) {
			return _dispatch_queue_push(tq, dq);
		}
	}

	dq->do_next = DISPATCH_OBJECT_LISTLESS;
	if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK)) {
		if (dq->dq_running == 0) {
			_dispatch_wakeup(dq); // verify that the queue is idle
		}
	}
	_dispatch_release(dq); // added when the queue is put on the list
}
static void
_dispatch_queue_drain(dispatch_queue_t dq)
{
	dispatch_queue_t orig_tq, old_dq;
	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	struct dispatch_object_s *dc = NULL, *next_dc = NULL;

	// Continue draining sources after target queue change rdar://8928171
	bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);

	orig_tq = dq->do_targetq;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	//dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

	while (dq->dq_items_tail) {
		while (!(dc = fastpath(dq->dq_items_head))) {
			_dispatch_hardware_pause();
		}
		dq->dq_items_head = NULL;
		do {
			next_dc = fastpath(dc->do_next);
			if (!next_dc &&
					!dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
				// Enqueue is TIGHTLY controlled, we won't wait long.
				while (!(next_dc = fastpath(dc->do_next))) {
					_dispatch_hardware_pause();
				}
			}
			if (DISPATCH_OBJECT_SUSPENDED(dq)) {
				goto out;
			}
			if (dq->dq_running > dq->dq_width) {
				goto out;
			}
			if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
				goto out;
			}
			if (fastpath(dq->dq_width == 1)) {
				_dispatch_continuation_pop(dc);
				_dispatch_workitem_inc();
			} else if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
					(long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
				if (dq->dq_running > 1) {
					goto out;
				}
				_dispatch_continuation_pop(dc);
				_dispatch_workitem_inc();
			} else {
				_dispatch_continuation_redirect(dq, dc);
			}
		} while ((dc = next_dc));
	}

out:
	// if this is not a complete drain, we must undo some things
	if (slowpath(dc)) {
		// 'dc' must NOT be "popped"
		// 'dc' might be the last item
		if (next_dc ||
				!dispatch_atomic_cmpxchg2o(dq, dq_items_tail, NULL, dc)) {
			// wait for enqueue slow path to finish
			while (!(next_dc = fastpath(dq->dq_items_head))) {
				_dispatch_hardware_pause();
			}
			dc->do_next = next_dc;
		}
		dq->dq_items_head = dc;
	}

	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}
void
_dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
{
#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	_dispatch_queue_drain(dq);
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif
	_dispatch_force_cache_cleanup();
}
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_items_tail) {
		return;
	}
	struct dispatch_main_queue_drain_marker_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_main_queue_drain_marker_s);
	} marker = {
		.do_vtable = NULL,
	};
	struct dispatch_object_s *dmarker = (void*)&marker;
	_dispatch_queue_push_notrace(dq, dmarker);

#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	while (dq->dq_items_tail) {
		while (!(dc = fastpath(dq->dq_items_head))) {
			_dispatch_hardware_pause();
		}
		dq->dq_items_head = NULL;
		do {
			next_dc = fastpath(dc->do_next);
			if (!next_dc &&
					!dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
				// Enqueue is TIGHTLY controlled, we won't wait long.
				while (!(next_dc = fastpath(dc->do_next))) {
					_dispatch_hardware_pause();
				}
			}
			if (dc == dmarker) {
				if (next_dc) {
					dq->dq_items_head = next_dc;
					_dispatch_queue_wakeup_main();
				}
				goto out;
			}
			_dispatch_continuation_pop(dc);
			_dispatch_workitem_inc();
		} while ((dc = next_dc));
	}
	dispatch_assert(dc); // did not encounter marker

out:
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif
	_dispatch_force_cache_cleanup();
}
#endif
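/*
 * Illustrative note (an interpretation, not from the upstream comments):
 * pushing 'dmarker' before draining bounds the drain to the items that were
 * already enqueued, so a callout that keeps re-enqueueing work onto the main
 * queue cannot starve the runloop; leftover items simply trigger another
 * wakeup. Sketch of such a client, public API only:
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static void tick(void *ctxt)
 *	{
 *		// Re-enqueue ourselves; each pass still returns control to the
 *		// runloop because the drain stops at the marker.
 *		dispatch_async_f(dispatch_get_main_queue(), ctxt, tick);
 *	}
 */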
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
{
	// rdar://problem/8290662 "lock transfer"
	struct dispatch_object_s *dc, *next_dc;

	// queue is locked, or suspended and not being drained
	dc = dq->dq_items_head;
	if (slowpath(!dc) || DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
		return 0;
	}
	// dequeue dc, it is a barrier sync
	next_dc = fastpath(dc->do_next);
	dq->dq_items_head = next_dc;
	if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
		// Enqueue is TIGHTLY controlled, we won't wait long.
		while (!(next_dc = fastpath(dc->do_next))) {
			_dispatch_hardware_pause();
		}
		dq->dq_items_head = next_dc;
	}
	_dispatch_trace_continuation_pop(dq, dc);
	_dispatch_workitem_inc();

	struct dispatch_barrier_sync_slow_s *dbssp = (void *)dc;
	struct dispatch_barrier_sync_slow2_s *dbss2p = dbssp->dc_ctxt;
	return dbss2p->dbss2_sema;
}
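/*
 * Illustrative note (an interpretation, not from the upstream comments): the
 * semaphore returned above implements the rdar://8290662 "lock transfer":
 * rather than waking a worker thread, the drain hands the queue directly to
 * a thread parked in dispatch_sync()/dispatch_barrier_sync(), e.g.:
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static void critical(void *ctxt) { (void)ctxt; }
 *
 *	void example(dispatch_queue_t q)
 *	{
 *		// The caller blocks until 'critical' has run with respect to q.
 *		dispatch_sync_f(q, NULL, critical);
 *	}
 */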
static struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		(void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL);
		_dispatch_debug("no work on global work queue");
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		//
		// The ratio of work to libdispatch overhead must be bad. This
		// scenario implies that there are too many threads in the pool.
		// Create a new pending thread and then exit this thread.
		// The kernel will grant a new thread when the load subsides.
		_dispatch_debug("Contention on queue: %p", dq);
		_dispatch_queue_wakeup_global(dq);
#if DISPATCH_PERF_MON
		dispatch_atomic_inc(&_dispatch_bad_ratio);
#endif
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dq->dq_items_head = NULL;

		if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL)) {
			// both head and tail are NULL now
			goto out;
		}

		// There must be a next item now. This thread won't wait long.
		while (!(next = head->do_next)) {
			_dispatch_hardware_pause();
		}
	}

	dq->dq_items_head = next;
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}
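/*
 * Illustrative sketch of the same "mediator" trick in isolation (assumes
 * GCC/clang __sync builtins; 'node', 'head' and 'tail' are hypothetical
 * names, not part of libdispatch). The consumer exchanges the head with an
 * impossible pointer value that doubles as a lock and as a signal to racing
 * consumers:
 *
 *	struct node { struct node *next; };
 *	static struct node *head, *tail;
 *
 *	static struct node *
 *	drain_one(void)
 *	{
 *		struct node *const mediator = (void *)~0ul;
 *		struct node *h = __sync_lock_test_and_set(&head, mediator);
 *		if (!h) {
 *			// empty: put the head back unless a producer already wrote it
 *			(void)__sync_bool_compare_and_swap(&head, mediator, NULL);
 *			return NULL;
 *		}
 *		if (h == mediator) {
 *			return NULL; // another consumer currently owns the list
 *		}
 *		struct node *n = h->next;
 *		if (!n && !__sync_bool_compare_and_swap(&tail, h, NULL)) {
 *			while (!(n = h->next)) {
 *				; // the producer is about to link it
 *			}
 *		}
 *		head = n; // releases the "lock" taken by the xchg above
 *		return h;
 *	}
 */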
#pragma mark dispatch_worker_thread

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(void *context)
{
	struct dispatch_object_s *item;
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	qc->dgq_pending = 0;

#if DISPATCH_COCOA_COMPAT
	(void)dispatch_atomic_inc(&_dispatch_worker_threads);
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_begin_NSAutoReleasePool();
#endif

#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		_dispatch_continuation_pop(item);
	}
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif

#if DISPATCH_COCOA_COMPAT
	_dispatch_end_NSAutoReleasePool(pool);
	dispatch_end_thread_4GC();
	if (!dispatch_atomic_dec(&_dispatch_worker_threads) &&
			dispatch_no_worker_threads_4GC) {
		dispatch_no_worker_threads_4GC();
	}
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);

	_dispatch_force_cache_cleanup();
}
#if DISPATCH_ENABLE_THREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	sigset_t mask;
	int r;

	// work around tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);

	do {
		_dispatch_worker_thread2(context);
		// we use 65 seconds in case there are any timers that run once a minute
	} while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
			dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);

	(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size);
	if (dq->dq_items_tail) {
		_dispatch_queue_wakeup_global(dq);
	}

	return NULL;
}
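/*
 * Illustrative note (not from the upstream comments): in the thread-pool
 * fallback each worker parks on the per-priority dgq_thread_mediator
 * semaphore and exits only after 65 idle seconds, so a once-a-minute timer
 * keeps reusing the same thread. The timeout is built with the same public
 * API a client would use:
 *
 *	#include <dispatch/dispatch.h>
 *
 *	long example(dispatch_semaphore_t sema)
 *	{
 *		// returns 0 on wakeup, non-zero if 65 seconds elapsed first
 *		return dispatch_semaphore_wait(sema,
 *				dispatch_time(DISPATCH_TIME_NOW, 65ull * NSEC_PER_SEC));
 *	}
 */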
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif
#pragma mark dispatch_main_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_q_port_init(void *ctxt DISPATCH_UNUSED)
{
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
			&main_q_port);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);

	_dispatch_program_is_probably_callback_driven = true;
	_dispatch_safe_fork = false;
}

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);
	return main_q_port;
}

static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}
#endif

void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
#endif
}
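/*
 * Illustrative client-side sketch (public API only, not part of this file):
 * dispatch_main() parks the main thread via pthread_exit(), so it must be
 * the last thing main() does, and work targeting the main queue should
 * already be scheduled:
 *
 *	#include <dispatch/dispatch.h>
 *	#include <stdio.h>
 *
 *	static void hello(void *ctxt)
 *	{
 *		(void)ctxt;
 *		printf("hello from the main queue\n");
 *	}
 *
 *	int main(void)
 *	{
 *		dispatch_async_f(dispatch_get_main_queue(), NULL, hello);
 *		dispatch_main(); // never returns
 *	}
 */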
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

#if DISPATCH_COCOA_COMPAT
	// Do not count the signal handling thread as a worker thread
	(void)dispatch_atomic_dec(&_dispatch_worker_threads);
#endif
	for (;;) {
		sigsuspend(&mask);
	}
}

DISPATCH_NORETURN
static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}
static void
_dispatch_queue_cleanup2(void)
{
	(void)dispatch_atomic_dec(&_dispatch_main_q.dq_running);

	if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
		_dispatch_wakeup(&_dispatch_main_q);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		dispatch_async_f(_dispatch_get_root_queue(0, false), NULL,
				_dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);

	mach_port_t mp = main_q_port;
	kern_return_t kr;

	main_q_port = 0;

	if (mp) {
		kr = mach_port_deallocate(mach_task_self(), mp);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
		kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE,
				-1);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
#endif
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}
#pragma mark dispatch_manager_queue

static unsigned int _dispatch_select_workaround;
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
static void **_dispatch_rfd_ptrs;
static void **_dispatch_wfd_ptrs;

static int _dispatch_kq;
static void
_dispatch_get_kq_init(void *context DISPATCH_UNUSED)
{
	static const struct kevent kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.flags = EV_ADD|EV_CLEAR,
	};

	_dispatch_kq = kqueue();

	_dispatch_safe_fork = false;

	if (_dispatch_kq == -1) {
		DISPATCH_CLIENT_CRASH("kqueue() create failed: "
				"probably out of file descriptors");
	} else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
		// in case we fall back to select()
		FD_SET(_dispatch_kq, &_dispatch_rfds);
	}

	(void)dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));

	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
}

static int
_dispatch_get_kq(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);

	return _dispatch_kq;
}
static long
_dispatch_update_kq(const struct kevent *kev)
{
	struct kevent kev_copy = *kev;
	// This ensures we don't get a pending kevent back while registering
	// a new source.
	kev_copy.flags |= EV_RECEIPT;

	if (_dispatch_select_workaround && (kev_copy.flags & EV_DELETE)) {
		// Only executed on manager queue
		switch (kev_copy.filter) {
		case EVFILT_READ:
			if (kev_copy.ident < FD_SETSIZE &&
					FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
				FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
				_dispatch_rfd_ptrs[kev_copy.ident] = 0;
				(void)dispatch_atomic_dec(&_dispatch_select_workaround);
				return 0;
			}
			break;
		case EVFILT_WRITE:
			if (kev_copy.ident < FD_SETSIZE &&
					FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
				FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
				_dispatch_wfd_ptrs[kev_copy.ident] = 0;
				(void)dispatch_atomic_dec(&_dispatch_select_workaround);
				return 0;
			}
			break;
		default:
			break;
		}
	}

	int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
	if (rval == -1) {
		// If we fail to register with kevent for reasons other than
		// changelist element errors.
		(void)dispatch_assume_zero(errno);
		//kev_copy.flags |= EV_ERROR;
		//kev_copy.data = error;
		return errno;
	}

	// The following select workaround only applies to adding kevents
	if ((kev->flags & (EV_DISABLE|EV_DELETE)) ||
			!(kev->flags & (EV_ADD|EV_ENABLE))) {
		return 0;
	}

	// Only executed on manager queue
	switch (kev_copy.data) {
	case 0:
		return 0;
	case EBADF:
		break;
	default:
		// If an error occurred while registering with kevent, and it was
		// because of a kevent changelist processing && the kevent involved
		// either doing a read or write, it would indicate we were trying
		// to register a /dev/* port; fall back to select
		switch (kev_copy.filter) {
		case EVFILT_READ:
			if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
				if (!_dispatch_rfd_ptrs) {
					_dispatch_rfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
				}
				_dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
				FD_SET((int)kev_copy.ident, &_dispatch_rfds);
				(void)dispatch_atomic_inc(&_dispatch_select_workaround);
				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
						(int)kev_copy.ident, (long)kev_copy.data);
				return 0;
			}
			break;
		case EVFILT_WRITE:
			if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
				if (!_dispatch_wfd_ptrs) {
					_dispatch_wfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
				}
				_dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
				FD_SET((int)kev_copy.ident, &_dispatch_wfds);
				(void)dispatch_atomic_inc(&_dispatch_select_workaround);
				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
						(int)kev_copy.ident, (long)kev_copy.data);
				return 0;
			}
			break;
		default:
			// kevent error, _dispatch_source_merge_kevent() will handle it
			_dispatch_source_drain_kevent(&kev_copy);
			break;
		}
		break;
	}

	return kev_copy.data;
}
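/*
 * Illustrative note (an interpretation, not from the upstream comments): the
 * fallback above exists because kqueue() rejects EVFILT_READ/EVFILT_WRITE
 * registrations for certain /dev/* descriptors, so a source built on such a
 * descriptor is silently demoted to the manager thread's select() loop, e.g.:
 *
 *	#include <dispatch/dispatch.h>
 *	#include <fcntl.h>
 *
 *	dispatch_source_t watch_tty(dispatch_queue_t q)
 *	{
 *		int fd = open("/dev/tty", O_RDONLY | O_NONBLOCK);
 *		dispatch_source_t ds = dispatch_source_create(
 *				DISPATCH_SOURCE_TYPE_READ, fd, 0, q);
 *		dispatch_resume(ds); // event handler omitted for brevity
 *		return ds;
 *	}
 */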
static bool
_dispatch_mgr_wakeup(dispatch_queue_t dq)
{
	static const struct kevent kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.fflags = NOTE_TRIGGER,
	};

	_dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);

	_dispatch_update_kq(&kev);

	return false;
}
static void
_dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
	size_t i;

	for (i = 0; i < cnt; i++) {
		// EVFILT_USER isn't used by sources
		if (kev[i].filter == EVFILT_USER) {
			// If _dispatch_mgr_thread2() ever is changed to return to the
			// caller, then this should become _dispatch_queue_drain()
			_dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
		} else {
			_dispatch_source_drain_kevent(&kev[i]);
		}
	}
}
#if DISPATCH_USE_VM_PRESSURE && DISPATCH_USE_MALLOC_VM_PRESSURE_SOURCE
// VM Pressure source for malloc <rdar://problem/7805121>
static dispatch_source_t _dispatch_malloc_vm_pressure_source;

static void
_dispatch_malloc_vm_pressure_handler(void *context DISPATCH_UNUSED)
{
	malloc_zone_pressure_relief(0,0);
}

static void
_dispatch_malloc_vm_pressure_setup(void)
{
	_dispatch_malloc_vm_pressure_source = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_VM, 0, DISPATCH_VM_PRESSURE,
			_dispatch_get_root_queue(0, true));
	dispatch_source_set_event_handler_f(_dispatch_malloc_vm_pressure_source,
			_dispatch_malloc_vm_pressure_handler);
	dispatch_resume(_dispatch_malloc_vm_pressure_source);
}
#else
#define _dispatch_malloc_vm_pressure_setup()
#endif
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
	static const struct timespec timeout_immediately = { 0, 0 };
	struct timespec timeout;
	const struct timespec *timeoutp;
	struct timeval sel_timeout, *sel_timeoutp;
	fd_set tmp_rfds, tmp_wfds;
	struct kevent kev[1];
	int k_cnt, err, i, r;

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
#if DISPATCH_COCOA_COMPAT
	// Do not count the manager thread as a worker thread
	(void)dispatch_atomic_dec(&_dispatch_worker_threads);
#endif
	_dispatch_malloc_vm_pressure_setup();

	for (;;) {
		_dispatch_run_timers();

		timeoutp = _dispatch_get_next_timer_fire(&timeout);

		if (_dispatch_select_workaround) {
			FD_COPY(&_dispatch_rfds, &tmp_rfds);
			FD_COPY(&_dispatch_wfds, &tmp_wfds);
			if (timeoutp) {
				sel_timeout.tv_sec = timeoutp->tv_sec;
				sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))
						(timeoutp->tv_nsec / 1000u);
				sel_timeoutp = &sel_timeout;
			} else {
				sel_timeoutp = NULL;
			}

			r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
			if (r == -1) {
				err = errno;
				if (err != EBADF) {
					if (err != EINTR) {
						(void)dispatch_assume_zero(err);
					}
					continue;
				}
				for (i = 0; i < FD_SETSIZE; i++) {
					if (i == _dispatch_kq) {
						continue;
					}
					if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i,
							&_dispatch_wfds)) {
						continue;
					}
					r = dup(i);
					if (r != -1) {
						close(r);
					} else {
						if (FD_ISSET(i, &_dispatch_rfds)) {
							FD_CLR(i, &_dispatch_rfds);
							_dispatch_rfd_ptrs[i] = 0;
							(void)dispatch_atomic_dec(
									&_dispatch_select_workaround);
						}
						if (FD_ISSET(i, &_dispatch_wfds)) {
							FD_CLR(i, &_dispatch_wfds);
							_dispatch_wfd_ptrs[i] = 0;
							(void)dispatch_atomic_dec(
									&_dispatch_select_workaround);
						}
					}
				}
				continue;
			}

			if (r > 0) {
				for (i = 0; i < FD_SETSIZE; i++) {
					if (i == _dispatch_kq) {
						continue;
					}
					if (FD_ISSET(i, &tmp_rfds)) {
						FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
						EV_SET(&kev[0], i, EVFILT_READ,
								EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
								_dispatch_rfd_ptrs[i]);
						_dispatch_rfd_ptrs[i] = 0;
						(void)dispatch_atomic_dec(&_dispatch_select_workaround);
						_dispatch_mgr_thread2(kev, 1);
					}
					if (FD_ISSET(i, &tmp_wfds)) {
						FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
						EV_SET(&kev[0], i, EVFILT_WRITE,
								EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
								_dispatch_wfd_ptrs[i]);
						_dispatch_wfd_ptrs[i] = 0;
						(void)dispatch_atomic_dec(&_dispatch_select_workaround);
						_dispatch_mgr_thread2(kev, 1);
					}
				}
			}

			timeoutp = &timeout_immediately;
		}

		k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]),
				timeoutp);
		err = errno;

		switch (k_cnt) {
		case -1:
			if (err == EBADF) {
				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
			}
			if (err != EINTR) {
				(void)dispatch_assume_zero(err);
			}
			continue;
		default:
			_dispatch_mgr_thread2(kev, (size_t)k_cnt);
			// fall through
		case 0:
			_dispatch_force_cache_cleanup();
			continue;
		}
	}
}
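/*
 * Illustrative note (not from the upstream comments): the loop above is the
 * event loop of the manager queue. Each pass runs expired timers, waits in
 * kevent() (or in select() while the workaround is armed) until the next
 * timer deadline, and feeds the results to _dispatch_mgr_thread2(), which
 * drains &_dispatch_mgr_q and the kevent-backed sources.
 */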
static dispatch_queue_t
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
}