/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
static void
dummy_function(void)
{
}

static long
dummy_function_r0(void)
{
    return 0;
}
static bool _dispatch_select_workaround;
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
static void *_dispatch_rfd_ptrs[FD_SETSIZE];
static void *_dispatch_wfd_ptrs[FD_SETSIZE];
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
};
static struct dispatch_queue_s _dispatch_root_queues[];
static inline dispatch_queue_t
_dispatch_get_root_queue(long priority, bool overcommit)
{
    if (overcommit) switch (priority) {
    case DISPATCH_QUEUE_PRIORITY_LOW:
        return &_dispatch_root_queues[1];
    case DISPATCH_QUEUE_PRIORITY_DEFAULT:
        return &_dispatch_root_queues[3];
    case DISPATCH_QUEUE_PRIORITY_HIGH:
        return &_dispatch_root_queues[5];
    }
    switch (priority) {
    case DISPATCH_QUEUE_PRIORITY_LOW:
        return &_dispatch_root_queues[0];
    case DISPATCH_QUEUE_PRIORITY_DEFAULT:
        return &_dispatch_root_queues[2];
    case DISPATCH_QUEUE_PRIORITY_HIGH:
        return &_dispatch_root_queues[4];
    default:
        return NULL;
    }
}
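// Added commentary (not in the original source): the six root queues form
// non-overcommit/overcommit pairs in priority order --
//   [0] low, [1] low-overcommit, [2] default, [3] default-overcommit,
//   [4] high, [5] high-overcommit --
// which is the layout the switch above indexes into.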
#ifdef __BLOCKS__
dispatch_block_t
_dispatch_Block_copy(dispatch_block_t db)
{
    dispatch_block_t rval;

    while (!(rval = Block_copy(db))) {
        sleep(1);
    }
    return rval;
}
#define _dispatch_Block_copy(x) ((typeof(x))_dispatch_Block_copy(x))

void
_dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b();
    Block_release(b);
}

void
_dispatch_call_block_and_release2(void *block, void *ctxt)
{
    void (^b)(void*) = block;
    b(ctxt);
    Block_release(b);
}
#endif /* __BLOCKS__ */
struct dispatch_queue_attr_vtable_s {
    DISPATCH_VTABLE_HEADER(dispatch_queue_attr_s);
};

struct dispatch_queue_attr_s {
    DISPATCH_STRUCT_HEADER(dispatch_queue_attr_s, dispatch_queue_attr_vtable_s);

    void* finalizer_ctxt;
    dispatch_queue_finalizer_function_t finalizer_func;

    int qa_priority;
    unsigned long qa_flags;
};
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);

#define _dispatch_queue_trylock(dq) dispatch_atomic_cmpxchg(&(dq)->dq_running, 0, 1)
static inline void _dispatch_queue_unlock(dispatch_queue_t dq);
static void _dispatch_queue_invoke(dispatch_queue_t dq);
static void _dispatch_queue_drain(dispatch_queue_t dq);
static void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq);
static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static struct dispatch_object_s *_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq);

static bool _dispatch_program_is_probably_callback_driven;
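// Added commentary (not in the original source): dq_running doubles as the
// queue's lock word. _dispatch_queue_trylock() above claims it by moving it
// from 0 to 1 with a compare-and-swap, and _dispatch_queue_unlock() (defined
// later in this file) releases it with an atomic decrement, waking the queue
// if items arrived while it was held.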
#if DISPATCH_COCOA_COMPAT
void (*dispatch_begin_thread_4GC)(void) = dummy_function;
void (*dispatch_end_thread_4GC)(void) = dummy_function;
void *(*_dispatch_begin_NSAutoReleasePool)(void) = (void *)dummy_function;
void (*_dispatch_end_NSAutoReleasePool)(void *) = (void *)dummy_function;
static void _dispatch_queue_wakeup_main(void);

static dispatch_once_t _dispatch_main_q_port_pred;
static bool main_q_is_draining;
static mach_port_t main_q_port;
#endif

static void _dispatch_cache_cleanup2(void *value);
static void _dispatch_force_cache_cleanup(void);
static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
    .do_type = DISPATCH_QUEUE_TYPE,
    .do_dispose = _dispatch_queue_dispose,
    .do_invoke = (void *)dummy_function_r0,
    .do_probe = (void *)dummy_function_r0,
    .do_debug = dispatch_queue_debug,
};

static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
    .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
    .do_kind = "global-queue",
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_queue_wakeup_global,
};
#define MAX_THREAD_COUNT 255

struct dispatch_root_queue_context_s {
    pthread_workqueue_t dgq_kworkqueue;
    uint32_t dgq_pending;
    uint32_t dgq_thread_pool_size;
    dispatch_semaphore_t dgq_thread_mediator;
};
#define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2)
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[0],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[1],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[2],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[3],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[4],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[5],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
};
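// Added commentary (not in the original source): context i backs root queue i;
// even slots are the plain priority queues and odd slots their overcommit
// variants (see _dispatch_get_root_queue() above). dgq_thread_pool_size only
// matters on the fallback path where libdispatch manages its own pthreads
// instead of a kernel workqueue.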
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
static struct dispatch_queue_s _dispatch_root_queues[] = {
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[0],

        .dq_label = "com.apple.root.low-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[1],

        .dq_label = "com.apple.root.low-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[2],

        .dq_label = "com.apple.root.default-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[3],

        .dq_label = "com.apple.root.default-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[4],

        .dq_label = "com.apple.root.high-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[5],

        .dq_label = "com.apple.root.high-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
    },
};
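// Added commentary (not in the original source): dq_width = UINT32_MAX makes
// these queues effectively unbounded (fully concurrent), and starting
// dq_running at 2 keeps barrier operations on the slow path, as the comment
// above the array notes.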
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
struct dispatch_queue_s _dispatch_main_q = {
    .do_vtable = &_dispatch_queue_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],

    .dq_label = "com.apple.main-thread",
};
#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static size_t _dispatch_bad_ratio;
static struct {
    uint64_t time_total;
    uint64_t count_total;
    uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
static void _dispatch_queue_merge_stats(uint64_t start);
#endif

static void *_dispatch_worker_thread(void *context);
static void _dispatch_worker_thread2(void *context);

malloc_zone_t *_dispatch_ccache_zone;

static void
_dispatch_continuation_free(dispatch_continuation_t dc)
{
    dispatch_continuation_t prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
    dc->do_next = prev_dc;
    _dispatch_thread_setspecific(dispatch_cache_key, dc);
}

static void
_dispatch_continuation_pop(dispatch_object_t dou)
{
    dispatch_continuation_t dc = dou._dc;
    dispatch_group_t dg;

    if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
        return _dispatch_queue_invoke(dou._dq);
    }

    // Add the item back to the cache before calling the function. This
    // allows the 'hot' continuation to be used for a quick callback.
    //
    // The ccache version is per-thread.
    // Therefore, the object has not been reused yet.
    // This generates better assembly.
    if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
        _dispatch_continuation_free(dc);
    }
    if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
        dg = dc->dc_group;
    } else {
        dg = NULL;
    }
    dc->dc_func(dc->dc_ctxt);
    if (dg) {
        dispatch_group_leave(dg);
        _dispatch_release(dg);
    }
}

struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
    struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

    // The mediator value acts both as a "lock" and a signal
    head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);

    if (slowpath(head == NULL)) {
        // The first xchg on the tail will tell the enqueueing thread that it
        // is safe to blindly write out to the head pointer. A cmpxchg honors
        // the algorithm.
        dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
        _dispatch_debug("no work on global work queue");
        return NULL;
    }

    if (slowpath(head == mediator)) {
        // This thread lost the race for ownership of the queue.
        //
        // The ratio of work to libdispatch overhead must be bad. This
        // scenario implies that there are too many threads in the pool.
        // Create a new pending thread and then exit this thread.
        // The kernel will grant a new thread when the load subsides.
        _dispatch_debug("Contention on queue: %p", dq);
        _dispatch_queue_wakeup_global(dq);
#if DISPATCH_PERF_MON
        dispatch_atomic_inc(&_dispatch_bad_ratio);
#endif
        return NULL;
    }

    // Restore the head pointer to a sane value before returning.
    // If 'next' is NULL, then this item _might_ be the last item.
    next = fastpath(head->do_next);

    if (slowpath(!next)) {
        dq->dq_items_head = NULL;

        if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
            // both head and tail are NULL now
            goto out;
        }

        // There must be a next item now. This thread won't wait long.
        while (!(next = head->do_next)) {
            _dispatch_hardware_pause();
        }
    }

    dq->dq_items_head = next;
    _dispatch_queue_wakeup_global(dq);
out:
    return head;
}
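// Added commentary (not in the original source): the lock-free pop above has
// three outcomes: NULL because the queue looked empty, NULL after signalling
// for another worker because this thread saw the mediator (it lost the xchg
// race), or a dequeued item with dq_items_head repaired to point at the rest
// of the list.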
dispatch_queue_t
dispatch_get_current_queue(void)
{
    return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}

#undef dispatch_get_main_queue
__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_NA)
dispatch_queue_t dispatch_get_main_queue(void);

dispatch_queue_t
dispatch_get_main_queue(void)
{
    return &_dispatch_main_q;
}
#define dispatch_get_main_queue() (&_dispatch_main_q)

struct _dispatch_hw_config_s _dispatch_hw_config;

static void
_dispatch_queue_set_width_init(void)
{
    size_t valsz = sizeof(uint32_t);

    sysctlbyname("hw.activecpu", &_dispatch_hw_config.cc_max_active, &valsz, NULL, 0);
    dispatch_assume_zero(errno);
    dispatch_assume(valsz == sizeof(uint32_t));

    sysctlbyname("hw.logicalcpu_max", &_dispatch_hw_config.cc_max_logical, &valsz, NULL, 0);
    dispatch_assume_zero(errno);
    dispatch_assume(valsz == sizeof(uint32_t));

    sysctlbyname("hw.physicalcpu_max", &_dispatch_hw_config.cc_max_physical, &valsz, NULL, 0);
    dispatch_assume_zero(errno);
    dispatch_assume(valsz == sizeof(uint32_t));
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
    int w = (int)width; // intentional truncation
    uint32_t tmp;

    if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
        return;
    }
    if (w == 1 || w == 0) {
        dq->dq_width = 1;
        return;
    }
    if (w > 0) {
        tmp = w;
    } else switch (w) {
    case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
        tmp = _dispatch_hw_config.cc_max_physical;
        break;
    case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
        tmp = _dispatch_hw_config.cc_max_active;
        break;
    default:
    case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
        tmp = _dispatch_hw_config.cc_max_logical;
        break;
    }
    // multiply by two since the running count is inc/dec by two (the low bit == barrier)
    dq->dq_width = tmp * 2;

    // XXX if the queue has items and the width is increased, we should try to wake the queue
}

// 4,5,6,7,8,9 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
static unsigned long _dispatch_queue_serial_numbers = 10;

// Note to later developers: ensure that any initialization changes are
// made for statically allocated queues (i.e. _dispatch_main_q).
static void
_dispatch_queue_init(dispatch_queue_t dq)
{
    dq->do_vtable = &_dispatch_queue_vtable;
    dq->do_next = DISPATCH_OBJECT_LISTLESS;

    dq->do_targetq = _dispatch_get_root_queue(0, true);

    dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    dispatch_queue_t dq;
    size_t label_len;

    if (!label) {
        label = "";
    }

    label_len = strlen(label);
    if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
        label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
    }

    // XXX switch to malloc()
    dq = calloc(1ul, sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE + label_len + 1);
    if (slowpath(!dq)) {
        return dq;
    }

    _dispatch_queue_init(dq);
    strcpy(dq->dq_label, label);

#ifndef DISPATCH_NO_LEGACY
    if (slowpath(attr)) {
        dq->do_targetq = _dispatch_get_root_queue(attr->qa_priority, attr->qa_flags & DISPATCH_QUEUE_OVERCOMMIT);
        dq->dq_finalizer_ctxt = attr->finalizer_ctxt;
        dq->dq_finalizer_func = attr->finalizer_func;
        if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
            // if finalizer_ctxt is a Block, retain it.
            dq->dq_finalizer_ctxt = Block_copy(dq->dq_finalizer_ctxt);
            if (!(dq->dq_finalizer_ctxt)) {
                goto out_bad;
            }
        }
    }
#endif

    return dq;

out_bad:
    free(dq);
    return NULL;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
    if (slowpath(dq == _dispatch_queue_get_current())) {
        DISPATCH_CRASH("Release of a queue by itself");
    }
    if (slowpath(dq->dq_items_tail)) {
        DISPATCH_CRASH("Release of a queue while items are enqueued");
    }

#ifndef DISPATCH_NO_LEGACY
    if (dq->dq_finalizer_func) {
        dq->dq_finalizer_func(dq->dq_finalizer_ctxt, dq);
    }
#endif

    // trash the tail queue so that use after free will crash
    dq->dq_items_tail = (void *)0x200;

    _dispatch_dispose(dq);
}
static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());

    dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
    dc->dc_func = func;
    dc->dc_ctxt = context;

    _dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
    dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif

void
dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

    if (!dc) {
        return _dispatch_barrier_async_f_slow(dq, context, func);
    }

    dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
    dc->dc_func = func;
    dc->dc_ctxt = context;

    _dispatch_queue_push(dq, dc);
}

static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());

    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = func;
    dc->dc_ctxt = context;

    _dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
    dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

    // unlike dispatch_sync_f(), we do NOT need to check the queue width,
    // the "drain" function will do this test

    if (!dc) {
        return _dispatch_async_f_slow(dq, ctxt, func);
    }

    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = func;
    dc->dc_ctxt = ctxt;

    _dispatch_queue_push(dq, dc);
}
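// Added illustrative example (not part of the original source). A typical
// caller submits work to a root queue like this; `work_cb` is a hypothetical
// client function with the dispatch_function_t signature:
//
//     static void work_cb(void *ctxt __attribute__((unused))) {
//         printf("running on a global concurrent queue\n");
//     }
//
//     dispatch_async_f(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
//             NULL, work_cb);
//
// dispatch_async_f() never runs work_cb synchronously; it only enqueues the
// continuation and wakes the target queue.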
struct dispatch_barrier_sync_slow2_s {
    dispatch_function_t dbss2_func;
    dispatch_function_t dbss2_ctxt;
    dispatch_semaphore_t dbss2_sema;
};

static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
    struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;

    dbss2->dbss2_func(dbss2->dbss2_ctxt);
    dispatch_semaphore_signal(dbss2->dbss2_sema);
}

static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    struct dispatch_barrier_sync_slow2_s dbss2 = {
        .dbss2_func = func,
        .dbss2_ctxt = ctxt,
        .dbss2_sema = _dispatch_get_thread_semaphore(),
    };
    struct dispatch_barrier_sync_slow_s {
        DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
    } dbss = {
        .do_vtable = (void *)DISPATCH_OBJ_BARRIER_BIT,
        .dc_func = _dispatch_barrier_sync_f_slow_invoke,
        .dc_ctxt = &dbss2,
    };

    _dispatch_queue_push(dq, (void *)&dbss);

    while (dispatch_semaphore_wait(dbss2.dbss2_sema, dispatch_time(0, 3ull * NSEC_PER_SEC))) {
        if (DISPATCH_OBJECT_SUSPENDED(dq)) {
            continue;
        }
        if (_dispatch_queue_trylock(dq)) {
            _dispatch_queue_drain(dq);
            _dispatch_queue_unlock(dq);
        }
    }
    _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
}

#ifdef __BLOCKS__
void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
    struct Block_basic *bb = (void *)work;

    dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

    // 1) ensure that this thread hasn't enqueued anything ahead of this call
    // 2) the queue is not suspended
    // 3) the queue is not weird
    if (slowpath(dq->dq_items_tail)
            || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
            || slowpath(!_dispatch_queue_trylock(dq))) {
        return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
    }

    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    func(ctxt);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
    _dispatch_queue_unlock(dq);
}

static void
_dispatch_sync_f_slow2(void *ctxt)
{
    dispatch_queue_t dq = _dispatch_queue_get_current();
    dispatch_atomic_add(&dq->dq_running, 2);
    dispatch_semaphore_signal(ctxt);
}

static void
_dispatch_sync_f_slow(dispatch_queue_t dq)
{
    // the global root queues do not need strict ordering
    if (dq->do_targetq == NULL) {
        dispatch_atomic_add(&dq->dq_running, 2);
        return;
    }

    struct dispatch_sync_slow_s {
        DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
    } dss = {
        .dc_func = _dispatch_sync_f_slow2,
        .dc_ctxt = _dispatch_get_thread_semaphore(),
    };

    // XXX FIXME -- concurrent queues can become serial again
    _dispatch_queue_push(dq, (void *)&dss);

    dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
    _dispatch_put_thread_semaphore(dss.dc_ctxt);
}

#ifdef __BLOCKS__
void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
    struct Block_basic *bb = (void *)work;
    dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    typeof(dq->dq_running) prev_cnt;
    dispatch_queue_t old_dq;

    if (dq->dq_width == 1) {
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }

    // 1) ensure that this thread hasn't enqueued anything ahead of this call
    // 2) the queue is not suspended
    if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
        _dispatch_sync_f_slow(dq);
    } else {
        prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;

        if (slowpath(prev_cnt & 1)) {
            if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
                _dispatch_wakeup(dq);
            }
            _dispatch_sync_f_slow(dq);
        }
    }

    old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    func(ctxt);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);

    if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
        _dispatch_wakeup(dq);
    }
}
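// Added commentary (not in the original source): dq_running is incremented and
// decremented by 2 for each non-barrier participant, leaving the low bit free
// to act as the barrier/lock indicator (see the "multiply by two" comment in
// dispatch_queue_set_width() and the trylock macro near the top of the file).
// That is why prev_cnt & 1 above means a barrier currently owns the queue and
// the caller has to take the slow path.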
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
    return dq->dq_label;
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_q_port_init(void *ctxt __attribute__((unused)))
{
    kern_return_t kr;

    kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &main_q_port);
    DISPATCH_VERIFY_MIG(kr);
    dispatch_assume_zero(kr);
    kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port, MACH_MSG_TYPE_MAKE_SEND);
    DISPATCH_VERIFY_MIG(kr);
    dispatch_assume_zero(kr);

    _dispatch_program_is_probably_callback_driven = true;
    _dispatch_safe_fork = false;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
    main_q_is_draining = arg;
}
#endif

void
dispatch_main(void)
{
    if (pthread_main_np()) {
        _dispatch_program_is_probably_callback_driven = true;
        pthread_exit(NULL);
        DISPATCH_CRASH("pthread_exit() returned");
    }
    DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
}

static void
_dispatch_sigsuspend(void *ctxt __attribute__((unused)))
{
    static const sigset_t mask;

    for (;;) {
        sigsuspend(&mask);
    }
}

static void
_dispatch_queue_cleanup2(void)
{
    dispatch_atomic_dec(&_dispatch_main_q.dq_running);

    if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
        _dispatch_wakeup(&_dispatch_main_q);
    }

    // overload the "probably" variable to mean that dispatch_main() or
    // similar non-POSIX API was called
    // this has to run before the DISPATCH_COCOA_COMPAT below
    if (_dispatch_program_is_probably_callback_driven) {
        dispatch_async_f(_dispatch_get_root_queue(0, 0), NULL, _dispatch_sigsuspend);
        sleep(1); // workaround 6778970
    }

#if DISPATCH_COCOA_COMPAT
    dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);

    mach_port_t mp = main_q_port;
    kern_return_t kr;

    if (mp) {
        kr = mach_port_deallocate(mach_task_self(), mp);
        DISPATCH_VERIFY_MIG(kr);
        dispatch_assume_zero(kr);
        kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
        DISPATCH_VERIFY_MIG(kr);
        dispatch_assume_zero(kr);
    }
#endif
}

dispatch_queue_t
dispatch_get_concurrent_queue(long pri)
{
    if (pri > 0) {
        pri = DISPATCH_QUEUE_PRIORITY_HIGH;
    } else if (pri < 0) {
        pri = DISPATCH_QUEUE_PRIORITY_LOW;
    }
    return _dispatch_get_root_queue(pri, false);
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
    if (ctxt == &_dispatch_main_q) {
        return _dispatch_queue_cleanup2();
    }
    // POSIX defines that destructors are only called if 'ctxt' is non-null
    DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}

dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
    if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
        return NULL;
    }
    return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
}

#define countof(x) (sizeof(x) / sizeof(x[0]))
void
libdispatch_init(void)
{
    dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 3);
    dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 6);

    dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW == -DISPATCH_QUEUE_PRIORITY_HIGH);
    dispatch_assert(countof(_dispatch_root_queues) == DISPATCH_ROOT_QUEUE_COUNT);
    dispatch_assert(countof(_dispatch_thread_mediator) == DISPATCH_ROOT_QUEUE_COUNT);
    dispatch_assert(countof(_dispatch_root_queue_contexts) == DISPATCH_ROOT_QUEUE_COUNT);

    _dispatch_thread_key_init_np(dispatch_queue_key, _dispatch_queue_cleanup);
    _dispatch_thread_key_init_np(dispatch_sema4_key, (void (*)(void *))dispatch_release); // use the extern release
    _dispatch_thread_key_init_np(dispatch_cache_key, _dispatch_cache_cleanup2);
#if DISPATCH_PERF_MON
    _dispatch_thread_key_init_np(dispatch_bcounter_key, NULL);
#endif

    _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);

    _dispatch_queue_set_width_init();
}

static inline void
_dispatch_queue_unlock(dispatch_queue_t dq)
{
    if (slowpath(dispatch_atomic_dec(&dq->dq_running))) {
        return;
    }

    _dispatch_wakeup(dq);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
    dispatch_queue_t tq;

    if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
        return NULL;
    }
    if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
        return NULL;
    }

    if (!_dispatch_trylock(dou._do)) {
#if DISPATCH_COCOA_COMPAT
        if (dou._dq == &_dispatch_main_q) {
            _dispatch_queue_wakeup_main();
        }
#endif
        return NULL;
    }
    _dispatch_retain(dou._do);
    tq = dou._do->do_targetq;
    _dispatch_queue_push(tq, dou._do);
    return tq; // libdispatch doesn't need this, but the Instrument DTrace probe does
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_queue_wakeup_main(void)
{
    kern_return_t kr;

    dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);

    kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

    switch (kr) {
    case MACH_SEND_TIMEOUT:
    case MACH_SEND_TIMED_OUT:
    case MACH_SEND_INVALID_DEST:
        break;
    default:
        dispatch_assume_zero(kr);
        break;
    }

    _dispatch_safe_fork = false;
}
#endif

static int
_dispatch_rootq2wq_pri(long idx)
{
#ifdef WORKQ_DEFAULT_PRIOQUEUE
    switch (idx) {
    case 0:
    case 1:
        return WORKQ_LOW_PRIOQUEUE;
    case 2:
    case 3:
    default:
        return WORKQ_DEFAULT_PRIOQUEUE;
    case 4:
    case 5:
        return WORKQ_HIGH_PRIOQUEUE;
    }
#endif
}
static void
_dispatch_root_queues_init(void *context __attribute__((unused)))
{
    bool disable_wq = getenv("LIBDISPATCH_DISABLE_KWQ");
    pthread_workqueue_attr_t pwq_attr;
    kern_return_t kr;
    int i, r = 0;

    r = pthread_workqueue_attr_init_np(&pwq_attr);
    dispatch_assume_zero(r);

    for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
        r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, _dispatch_rootq2wq_pri(i));
        dispatch_assume_zero(r);
        r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
        dispatch_assume_zero(r);
        // some software hangs if the non-overcommitting queues do not overcommit when threads block
        _dispatch_root_queue_contexts[i].dgq_thread_pool_size = _dispatch_hw_config.cc_max_active;

        if (disable_wq || (r = pthread_workqueue_create_np(&_dispatch_root_queue_contexts[i].dgq_kworkqueue, &pwq_attr))) {
            dispatch_assume_zero(r);
            // override the default FIFO behavior for the pool semaphores
            kr = semaphore_create(mach_task_self(), &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
            DISPATCH_VERIFY_MIG(kr);
            dispatch_assume_zero(kr);
            dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
        } else {
            dispatch_assume(_dispatch_root_queue_contexts[i].dgq_kworkqueue);
        }
    }

    r = pthread_workqueue_attr_destroy_np(&pwq_attr);
    dispatch_assume_zero(r);
}

static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
    static dispatch_once_t pred;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
    pthread_workitem_handle_t wh;
    unsigned int gen_cnt;
    pthread_t pthr;
    int r, t_count;

    if (!dq->dq_items_tail) {
        return false;
    }

    _dispatch_safe_fork = false;

    dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

    dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

    if (qc->dgq_kworkqueue) {
        if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
            _dispatch_debug("requesting new worker thread");

            r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
            dispatch_assume_zero(r);
        } else {
            _dispatch_debug("work thread request still pending on global queue: %p", dq);
        }
        goto out;
    }

    if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
        goto out;
    }

    do {
        t_count = qc->dgq_thread_pool_size;
        if (!t_count) {
            _dispatch_debug("The thread pool is full: %p", dq);
            goto out;
        }
    } while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));

    while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
        if (r != EAGAIN) {
            dispatch_assume_zero(r);
        }
        sleep(1);
    }
    r = pthread_detach(pthr);
    dispatch_assume_zero(r);

out:
    return false;
}
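// Added commentary (not in the original source): _dispatch_queue_wakeup_global()
// has two strategies. When a kernel pthread workqueue backs the root queue it
// just requests one work item (dgq_pending keeps at most one request
// outstanding); otherwise it signals the LIFO thread-mediator semaphore and,
// failing that, spawns a detached pthread of its own, bounded by
// dgq_thread_pool_size.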
static void
_dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
{
#if DISPATCH_PERF_MON
    uint64_t start = mach_absolute_time();
#endif
    _dispatch_queue_drain(dq);
#if DISPATCH_PERF_MON
    _dispatch_queue_merge_stats(start);
#endif
    _dispatch_force_cache_cleanup();
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
    dispatch_queue_t tq = dq->do_targetq;

    if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(_dispatch_queue_trylock(dq))) {
        _dispatch_queue_drain(dq);
        if (tq == dq->do_targetq) {
            tq = NULL;
        } else {
            tq = dq->do_targetq;
        }
        // We do not need to check the result.
        // When the suspend-count lock is dropped, then the check will happen.
        dispatch_atomic_dec(&dq->dq_running);
        if (tq) {
            return _dispatch_queue_push(tq, dq);
        }
    }

    dq->do_next = DISPATCH_OBJECT_LISTLESS;
    if (dispatch_atomic_sub(&dq->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
        if (dq->dq_running == 0) {
            _dispatch_wakeup(dq); // verify that the queue is idle
        }
    }
    _dispatch_release(dq); // added when the queue is put on the list
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
    dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

    prev_dq = dq->do_targetq;
    dq->do_targetq = ctxt;
    _dispatch_release(prev_dq);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
    if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
        return;
    }
    // NOTE: we test for NULL target queues internally to detect root queues
    // therefore, if the retain crashes due to a bad input, that is OK
    _dispatch_retain(dq);
    dispatch_barrier_async_f(dou._dq, dq, _dispatch_set_target_queue2);
}

static void
_dispatch_async_f_redirect2(void *_ctxt)
{
    struct dispatch_continuation_s *dc = _ctxt;
    struct dispatch_continuation_s *other_dc = dc->dc_data[1];
    dispatch_queue_t old_dq, dq = dc->dc_data[0];

    old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    _dispatch_continuation_pop(other_dc);
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);

    if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
        _dispatch_wakeup(dq);
    }
    _dispatch_release(dq);
}

static void
_dispatch_async_f_redirect(dispatch_queue_t dq, struct dispatch_object_s *other_dc)
{
    dispatch_continuation_t dc = (void *)other_dc;
    dispatch_queue_t root_dq = dq;

    if (dc->dc_func == _dispatch_sync_f_slow2) {
        return dc->dc_func(dc->dc_ctxt);
    }

    dispatch_atomic_add(&dq->dq_running, 2);
    _dispatch_retain(dq);

    dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();

    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = _dispatch_async_f_redirect2;
    dc->dc_ctxt = dc;
    dc->dc_data[0] = dq;
    dc->dc_data[1] = other_dc;

    do {
        root_dq = root_dq->do_targetq;
    } while (root_dq->do_targetq);

    _dispatch_queue_push(root_dq, dc);
}

static void
_dispatch_queue_drain(dispatch_queue_t dq)
{
    dispatch_queue_t orig_tq, old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    struct dispatch_object_s *dc = NULL, *next_dc = NULL;

    orig_tq = dq->do_targetq;

    _dispatch_thread_setspecific(dispatch_queue_key, dq);

    while (dq->dq_items_tail) {
        while (!fastpath(dq->dq_items_head)) {
            _dispatch_hardware_pause();
        }

        dc = dq->dq_items_head;
        dq->dq_items_head = NULL;

        do {
            // Enqueue is TIGHTLY controlled, we won't wait long.
            do {
                next_dc = fastpath(dc->do_next);
            } while (!next_dc && !dispatch_atomic_cmpxchg(&dq->dq_items_tail, dc, NULL));
            if (DISPATCH_OBJECT_SUSPENDED(dq)) {
                goto out;
            }
            if (dq->dq_running > dq->dq_width) {
                goto out;
            }
            if (orig_tq != dq->do_targetq) {
                goto out;
            }
            if (fastpath(dq->dq_width == 1)) {
                _dispatch_continuation_pop(dc);
                _dispatch_workitem_inc();
            } else if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
                if (dq->dq_running > 1) {
                    goto out;
                }
                _dispatch_continuation_pop(dc);
                _dispatch_workitem_inc();
            } else {
                _dispatch_async_f_redirect(dq, dc);
            }
        } while ((dc = next_dc));
    }

out:
    // if this is not a complete drain, we must undo some things
    if (slowpath(dc)) {
        // 'dc' must NOT be "popped"
        // 'dc' might be the last item
        if (next_dc || dispatch_atomic_cmpxchg(&dq->dq_items_tail, NULL, dc)) {
            dq->dq_items_head = dc;
        } else {
            while (!(next_dc = dq->dq_items_head)) {
                _dispatch_hardware_pause();
            }
            dq->dq_items_head = dc;
            dc->do_next = next_dc;
        }
    }

    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
    dispatch_queue_t dq = context;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
    sigset_t mask;
    int r;

    // work around tweaks the kernel workqueue does for us
    r = sigfillset(&mask);
    dispatch_assume_zero(r);
    r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
    dispatch_assume_zero(r);

    do {
        _dispatch_worker_thread2(context);
        // we use 65 seconds in case there are any timers that run once a minute
    } while (dispatch_semaphore_wait(qc->dgq_thread_mediator, dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);

    dispatch_atomic_inc(&qc->dgq_thread_pool_size);
    if (dq->dq_items_tail) {
        _dispatch_queue_wakeup_global(dq);
    }

    return NULL;
}
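// Added commentary (not in the original source): a self-managed worker keeps
// draining the root queue until it has been idle for about 65 seconds (long
// enough that once-a-minute timers keep a thread warm, per the comment above),
// then gives its slot back to dgq_thread_pool_size and exits.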
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_worker_thread2(void *context)
{
    struct dispatch_object_s *item;
    dispatch_queue_t dq = context;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

    if (_dispatch_thread_getspecific(dispatch_queue_key)) {
        DISPATCH_CRASH("Premature thread recycling");
    }

    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    qc->dgq_pending = 0;

#if DISPATCH_COCOA_COMPAT
    // ensure that high-level memory management techniques do not leak/crash
    dispatch_begin_thread_4GC();
    void *pool = _dispatch_begin_NSAutoReleasePool();
#endif

#if DISPATCH_PERF_MON
    uint64_t start = mach_absolute_time();
#endif
    while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
        _dispatch_continuation_pop(item);
    }
#if DISPATCH_PERF_MON
    _dispatch_queue_merge_stats(start);
#endif

#if DISPATCH_COCOA_COMPAT
    _dispatch_end_NSAutoReleasePool(pool);
    dispatch_end_thread_4GC();
#endif

    _dispatch_thread_setspecific(dispatch_queue_key, NULL);

    _dispatch_force_cache_cleanup();
}

#if DISPATCH_PERF_MON
static void
_dispatch_queue_merge_stats(uint64_t start)
{
    uint64_t avg, delta = mach_absolute_time() - start;
    unsigned long count, bucket;

    count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
    _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

    if (count) {
        avg = delta / count;
        bucket = flsll(avg);
    } else {
        bucket = 0;
    }

    // 64-bit counters on 32-bit require a lock or a queue
    OSSpinLockLock(&_dispatch_stats_lock);

    _dispatch_stats[bucket].time_total += delta;
    _dispatch_stats[bucket].count_total += count;
    _dispatch_stats[bucket].thread_total++;

    OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif

static size_t
dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
    return snprintf(buf, bufsiz, "parent = %p ", dq->do_targetq);
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
    size_t offset = 0;

    offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dq->dq_label, dq);
    offset += dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
    offset += dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
    offset += snprintf(&buf[offset], bufsiz - offset, "}");

    return offset;
}

void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
    if (fastpath(dq)) {
        dispatch_debug(dq, "%s", str);
    } else {
        _dispatch_log("queue[NULL]: %s", str);
    }
}

#if DISPATCH_COCOA_COMPAT
void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg __attribute__((unused)))
{
    if (main_q_is_draining) {
        return;
    }
    _dispatch_queue_set_mainq_drain_state(true);
    _dispatch_queue_serial_drain_till_empty(&_dispatch_main_q);
    _dispatch_queue_set_mainq_drain_state(false);
}

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
    dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
    return main_q_port;
}
#endif
static void
dispatch_queue_attr_dispose(dispatch_queue_attr_t attr)
{
    dispatch_queue_attr_set_finalizer(attr, NULL);
    _dispatch_dispose(attr);
}

static const struct dispatch_queue_attr_vtable_s dispatch_queue_attr_vtable = {
    .do_type = DISPATCH_QUEUE_ATTR_TYPE,
    .do_kind = "queue-attr",
    .do_dispose = dispatch_queue_attr_dispose,
};

dispatch_queue_attr_t
dispatch_queue_attr_create(void)
{
    dispatch_queue_attr_t a = calloc(1, sizeof(struct dispatch_queue_attr_s));

    if (a) {
        a->do_vtable = &dispatch_queue_attr_vtable;
        a->do_next = DISPATCH_OBJECT_LISTLESS;
        a->do_ref_cnt = 1;
        a->do_xref_cnt = 1;
        a->do_targetq = _dispatch_get_root_queue(0, 0);
        a->qa_flags = DISPATCH_QUEUE_OVERCOMMIT;
    }
    return a;
}

void
dispatch_queue_attr_set_flags(dispatch_queue_attr_t attr, uint64_t flags)
{
    dispatch_assert_zero(flags & ~DISPATCH_QUEUE_FLAGS_MASK);
    attr->qa_flags = (unsigned long)flags & DISPATCH_QUEUE_FLAGS_MASK;
}

void
dispatch_queue_attr_set_priority(dispatch_queue_attr_t attr, int priority)
{
    dispatch_debug_assert(attr, "NULL pointer");
    dispatch_debug_assert(priority <= DISPATCH_QUEUE_PRIORITY_HIGH && priority >= DISPATCH_QUEUE_PRIORITY_LOW, "Invalid priority");

    if (priority > 0) {
        priority = DISPATCH_QUEUE_PRIORITY_HIGH;
    } else if (priority < 0) {
        priority = DISPATCH_QUEUE_PRIORITY_LOW;
    }

    attr->qa_priority = priority;
}

void
dispatch_queue_attr_set_finalizer_f(dispatch_queue_attr_t attr,
    void *context, dispatch_queue_finalizer_function_t finalizer)
{
#ifdef __BLOCKS__
    if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
        Block_release(attr->finalizer_ctxt);
    }
#endif
    attr->finalizer_ctxt = context;
    attr->finalizer_func = finalizer;
}
#ifdef __BLOCKS__
long
dispatch_queue_attr_set_finalizer(dispatch_queue_attr_t attr,
    dispatch_queue_finalizer_t finalizer)
{
    void *ctxt;
    dispatch_queue_finalizer_function_t func;

    if (finalizer) {
        if (!(ctxt = Block_copy(finalizer))) {
            return 1;
        }
        func = (void *)_dispatch_call_block_and_release2;
    } else {
        ctxt = NULL;
        func = NULL;
    }

    dispatch_queue_attr_set_finalizer_f(attr, ctxt, func);

    return 0;
}
#endif
static void
_dispatch_ccache_init(void *context __attribute__((unused)))
{
    _dispatch_ccache_zone = malloc_create_zone(0, 0);
    dispatch_assert(_dispatch_ccache_zone);
    malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
}

dispatch_continuation_t
_dispatch_continuation_alloc_from_heap(void)
{
    static dispatch_once_t pred;
    dispatch_continuation_t dc;

    dispatch_once_f(&pred, NULL, _dispatch_ccache_init);

    while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1, ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
        sleep(1);
    }

    return dc;
}

static void
_dispatch_force_cache_cleanup(void)
{
    dispatch_continuation_t dc = _dispatch_thread_getspecific(dispatch_cache_key);

    if (dc) {
        _dispatch_thread_setspecific(dispatch_cache_key, NULL);
        _dispatch_cache_cleanup2(dc);
    }
}

static void
_dispatch_cache_cleanup2(void *value)
{
    dispatch_continuation_t dc, next_dc = value;

    while ((dc = next_dc)) {
        next_dc = dc->do_next;
        malloc_zone_free(_dispatch_ccache_zone, dc);
    }
}

static char _dispatch_build[16];

static void
_dispatch_bug_init(void *context __attribute__((unused)))
{
    int mib[] = { CTL_KERN, KERN_OSVERSION };
    size_t bufsz = sizeof(_dispatch_build);

    sysctl(mib, 2, _dispatch_build, &bufsz, NULL, 0);
}

void
_dispatch_bug(size_t line, long val)
{
    static dispatch_once_t pred;
    static void *last_seen;
    void *ra = __builtin_return_address(0);

    dispatch_once_f(&pred, NULL, _dispatch_bug_init);
    if (last_seen != ra) {
        last_seen = ra;
        _dispatch_log("BUG in libdispatch: %s - %lu - 0x%lx", _dispatch_build, line, val);
    }
}

void
_dispatch_abort(size_t line, long val)
{
    _dispatch_bug(line, val);
    abort();
}

void
_dispatch_log(const char *msg, ...)
{
    va_list ap;

    va_start(ap, msg);
    _dispatch_logv(msg, ap);
    va_end(ap);
}

void
_dispatch_logv(const char *msg, va_list ap)
{
#if DISPATCH_DEBUG
    static FILE *logfile, *tmp;
    char newbuf[strlen(msg) + 2];
    char path[PATH_MAX];

    sprintf(newbuf, "%s\n", msg);

    if (!logfile) {
        snprintf(path, sizeof(path), "/var/tmp/libdispatch.%d.log", getpid());
        tmp = fopen(path, "a");
        assert(tmp);
        if (!dispatch_atomic_cmpxchg(&logfile, NULL, tmp)) {
            fclose(tmp);
        } else {
            struct timeval tv;
            gettimeofday(&tv, NULL);
            fprintf(logfile, "=== log file opened for %s[%u] at %ld.%06u ===\n",
                getprogname() ?: "", getpid(), tv.tv_sec, tv.tv_usec);
        }
    }
    vfprintf(logfile, newbuf, ap);
    fflush(logfile);
#else
    vsyslog(LOG_NOTICE, msg, ap);
#endif
}

static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
    int r;

    /* Workaround: 6269619 Not all signals can be delivered on any thread */

    r = sigdelset(set, SIGILL);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGTRAP);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGEMT);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGFPE);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGBUS);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGSEGV);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGSYS);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGPIPE);
    dispatch_assume_zero(r);

    return pthread_sigmask(how, set, oset);
}
bool _dispatch_safe_fork = true;
void
dispatch_atfork_prepare(void)
{
}

void
dispatch_atfork_parent(void)
{
}

void
dispatch_atfork_child(void)
{
    void *crash = (void *)0x100;
    size_t i;

    if (_dispatch_safe_fork) {
        return;
    }

    _dispatch_main_q.dq_items_head = crash;
    _dispatch_main_q.dq_items_tail = crash;

    _dispatch_mgr_q.dq_items_head = crash;
    _dispatch_mgr_q.dq_items_tail = crash;

    for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
        _dispatch_root_queues[i].dq_items_head = crash;
        _dispatch_root_queues[i].dq_items_tail = crash;
    }
}
void
dispatch_init_pthread(pthread_t pthr __attribute__((unused)))
{
}
static int _dispatch_kq;

static void
_dispatch_get_kq_init(void *context __attribute__((unused)))
{
    static const struct kevent kev = {
        .ident = 1,
        .filter = EVFILT_USER,
        .flags = EV_ADD|EV_CLEAR,
    };

    _dispatch_kq = kqueue();
    _dispatch_safe_fork = false;
    // in case we fall back to select()
    FD_SET(_dispatch_kq, &_dispatch_rfds);

    if (_dispatch_kq == -1) {
        dispatch_assert_zero(errno);
    }

    dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));

    _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
}

static int
_dispatch_get_kq(void)
{
    static dispatch_once_t pred;

    dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);

    return _dispatch_kq;
}
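// Added commentary (not in the original source): the manager kqueue is created
// lazily via dispatch_once. The init routine above also registers the
// EVFILT_USER wakeup event and pushes _dispatch_mgr_q onto its target root
// queue, which is what gets the manager thread running.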
static void
_dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
    size_t i;

    for (i = 0; i < cnt; i++) {
        // EVFILT_USER isn't used by sources
        if (kev[i].filter == EVFILT_USER) {
            // If _dispatch_mgr_thread2() ever is changed to return to the
            // caller, then this should become _dispatch_queue_drain()
            _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
        } else {
            _dispatch_source_drain_kevent(&kev[i]);
        }
    }
}

static dispatch_queue_t
_dispatch_mgr_invoke(dispatch_queue_t dq)
{
    static const struct timespec timeout_immediately = { 0, 0 };
    struct timespec timeout;
    const struct timespec *timeoutp;
    struct timeval sel_timeout, *sel_timeoutp;
    fd_set tmp_rfds, tmp_wfds;
    struct kevent kev[1];
    int k_cnt, k_err, i, r;

    _dispatch_thread_setspecific(dispatch_queue_key, dq);

    for (;;) {
        _dispatch_run_timers();

        timeoutp = _dispatch_get_next_timer_fire(&timeout);

        if (_dispatch_select_workaround) {
            FD_COPY(&_dispatch_rfds, &tmp_rfds);
            FD_COPY(&_dispatch_wfds, &tmp_wfds);
            if (timeoutp) {
                sel_timeout.tv_sec = timeoutp->tv_sec;
                sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
                sel_timeoutp = &sel_timeout;
            } else {
                sel_timeoutp = NULL;
            }

            r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
            if (r == -1) {
                if (errno != EBADF) {
                    dispatch_assume_zero(errno);
                    continue;
                }
                for (i = 0; i < FD_SETSIZE; i++) {
                    if (i == _dispatch_kq) {
                        continue;
                    }
                    if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
                        continue;
                    }
                    r = dup(i);
                    if (r != -1) {
                        close(r);
                    } else {
                        FD_CLR(i, &_dispatch_rfds);
                        FD_CLR(i, &_dispatch_wfds);
                        _dispatch_rfd_ptrs[i] = 0;
                        _dispatch_wfd_ptrs[i] = 0;
                    }
                }
                continue;
            }

            if (r > 0) {
                for (i = 0; i < FD_SETSIZE; i++) {
                    if (i == _dispatch_kq) {
                        continue;
                    }
                    if (FD_ISSET(i, &tmp_rfds)) {
                        FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
                        EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
                        _dispatch_rfd_ptrs[i] = 0;
                        _dispatch_mgr_thread2(kev, 1);
                    }
                    if (FD_ISSET(i, &tmp_wfds)) {
                        FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
                        EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
                        _dispatch_wfd_ptrs[i] = 0;
                        _dispatch_mgr_thread2(kev, 1);
                    }
                }
            }

            timeoutp = &timeout_immediately;
        }

        k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
        k_err = errno;

        switch (k_cnt) {
        case -1:
            if (k_err == EBADF) {
                DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
            }
            dispatch_assume_zero(k_err);
            continue;
        default:
            _dispatch_mgr_thread2(kev, (size_t)k_cnt);
            // fall through
        case 0:
            _dispatch_force_cache_cleanup();
            continue;
        }
    }

    return NULL;
}

static bool
_dispatch_mgr_wakeup(dispatch_queue_t dq)
{
    static const struct kevent kev = {
        .ident = 1,
        .filter = EVFILT_USER,
#ifdef EV_TRIGGER
        .flags = EV_TRIGGER,
#else
        .fflags = NOTE_TRIGGER,
#endif
    };

    _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);

    _dispatch_update_kq(&kev);

    return false;
}

void
_dispatch_update_kq(const struct kevent *kev)
{
    struct kevent kev_copy = *kev;
    kev_copy.flags |= EV_RECEIPT;

    if (kev_copy.flags & EV_DELETE) {
        switch (kev_copy.filter) {
        case EVFILT_READ:
            if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
                FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
                _dispatch_rfd_ptrs[kev_copy.ident] = 0;
            }
            break;
        case EVFILT_WRITE:
            if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
                FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
                _dispatch_wfd_ptrs[kev_copy.ident] = 0;
            }
            break;
        default:
            break;
        }
    }

    int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
    if (rval == -1) {
        // kevent() itself failed, for a reason other than an error on a changelist element
        dispatch_assume_zero(errno);
        //kev_copy.flags |= EV_ERROR;
        //kev_copy.data = error;
        return;
    }

    // The following select workaround only applies to adding kevents
    if (!(kev->flags & EV_ADD)) {
        return;
    }

    switch (kev_copy.data) {
    case 0:
        return;
    case EBADF:
        break;
    default:
        // If an error occurred while registering with kevent, and it was
        // because of a kevent changelist processing && the kevent involved
        // either doing a read or write, it would indicate we were trying
        // to register a /dev/* port; fall back to select
        switch (kev_copy.filter) {
        case EVFILT_READ:
            _dispatch_select_workaround = true;
            FD_SET((int)kev_copy.ident, &_dispatch_rfds);
            _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
            break;
        case EVFILT_WRITE:
            _dispatch_select_workaround = true;
            FD_SET((int)kev_copy.ident, &_dispatch_wfds);
            _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
            break;
        default:
            _dispatch_source_drain_kevent(&kev_copy);
            break;
        }
        break;
    }
}
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
    .do_type = DISPATCH_QUEUE_MGR_TYPE,
    .do_kind = "mgr-queue",
    .do_invoke = _dispatch_mgr_invoke,
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_mgr_wakeup,
};

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
struct dispatch_queue_s _dispatch_mgr_q = {
    .do_vtable = &_dispatch_queue_mgr_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],

    .dq_label = "com.apple.libdispatch-manager",
};
const struct dispatch_queue_offsets_s dispatch_queue_offsets = {
    .dqo_label = offsetof(struct dispatch_queue_s, dq_label),
    .dqo_label_size = sizeof(_dispatch_main_q.dq_label),
    .dqo_flags_size = 0,
    .dqo_width = offsetof(struct dispatch_queue_s, dq_width),
    .dqo_width_size = sizeof(_dispatch_main_q.dq_width),
    .dqo_serialnum = offsetof(struct dispatch_queue_s, dq_serialnum),
    .dqo_serialnum_size = sizeof(_dispatch_main_q.dq_serialnum),
    .dqo_running = offsetof(struct dispatch_queue_s, dq_running),
    .dqo_running_size = sizeof(_dispatch_main_q.dq_running),
};
#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t work)
{
    // test before the copy of the block
    if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
        DISPATCH_CLIENT_CRASH("dispatch_after() called with 'when' == infinity");
#endif
        return;
    }
    dispatch_after_f(when, queue, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, void (*func)(void *))
{
    uint64_t delta;

    if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
        DISPATCH_CLIENT_CRASH("dispatch_after_f() called with 'when' == infinity");
#endif
        return;
    }

    // this function can and should be optimized to not use a dispatch source

    delta = _dispatch_timeout(when);
    if (delta == 0) {
        return dispatch_async_f(queue, ctxt, func);
    }
    if (!dispatch_source_timer_create(DISPATCH_TIMER_INTERVAL, delta, 0, NULL, queue, ^(dispatch_source_t ds) {
        long err_dom, err_val;
        if ((err_dom = dispatch_source_get_error(ds, &err_val))) {
            dispatch_assert(err_dom == DISPATCH_ERROR_DOMAIN_POSIX);
            dispatch_assert(err_val == ECANCELED);
            func(ctxt);
            dispatch_release(ds); // MUST NOT be _dispatch_release()
        } else {
            dispatch_source_cancel(ds);