/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "internal.h"

static void
dummy_function(void)
{
}

static long
dummy_function_r0(void)
{
    return 0;
}

static bool _dispatch_select_workaround;
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
static void *_dispatch_rfd_ptrs[FD_SETSIZE];
static void *_dispatch_wfd_ptrs[FD_SETSIZE];
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
};
static struct dispatch_queue_s _dispatch_root_queues[];

static inline dispatch_queue_t
_dispatch_get_root_queue(long priority, bool overcommit)
{
    if (overcommit) switch (priority) {
    case DISPATCH_QUEUE_PRIORITY_LOW:
        return &_dispatch_root_queues[1];
    case DISPATCH_QUEUE_PRIORITY_DEFAULT:
        return &_dispatch_root_queues[3];
    case DISPATCH_QUEUE_PRIORITY_HIGH:
        return &_dispatch_root_queues[5];
    }
    switch (priority) {
    case DISPATCH_QUEUE_PRIORITY_LOW:
        return &_dispatch_root_queues[0];
    case DISPATCH_QUEUE_PRIORITY_DEFAULT:
        return &_dispatch_root_queues[2];
    case DISPATCH_QUEUE_PRIORITY_HIGH:
        return &_dispatch_root_queues[4];
    default:
        return NULL;
    }
}
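
/*
 * Illustrative sketch (not part of the library): the root-queue table below is
 * laid out in priority pairs -- {low, low-overcommit, default,
 * default-overcommit, high, high-overcommit} -- so the overcommit variant of a
 * priority always sits one slot after its non-overcommit sibling.  The helper
 * name _example_root_queue_layout is hypothetical.
 */
#if 0
static void
_example_root_queue_layout(void)
{
    // default priority, non-overcommit -> index 2
    dispatch_assert(_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, false)
            == &_dispatch_root_queues[2]);
    // same priority, overcommit -> the adjacent slot, index 3
    dispatch_assert(_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, true)
            == &_dispatch_root_queues[3]);
}
#endif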
#ifdef __BLOCKS__
dispatch_block_t
_dispatch_Block_copy(dispatch_block_t db)
{
    dispatch_block_t rval;

    while (!(rval = Block_copy(db))) {
        sleep(1);
    }

    return rval;
}
#define _dispatch_Block_copy(x) ((typeof(x))_dispatch_Block_copy(x))

void
_dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b();
    Block_release(b);
}

void
_dispatch_call_block_and_release2(void *block, void *ctxt)
{
    void (^b)(void*) = block;
    b(ctxt);
    Block_release(b);
}
#endif /* __BLOCKS__ */
struct dispatch_queue_attr_vtable_s {
    DISPATCH_VTABLE_HEADER(dispatch_queue_attr_s);
};

struct dispatch_queue_attr_s {
    DISPATCH_STRUCT_HEADER(dispatch_queue_attr_s, dispatch_queue_attr_vtable_s);

    int qa_priority;
    void* finalizer_ctxt;
    dispatch_queue_finalizer_function_t finalizer_func;

    unsigned long qa_flags;
};

static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
static void _dispatch_queue_dispose(dispatch_queue_t dq);
#define _dispatch_queue_trylock(dq) dispatch_atomic_cmpxchg(&(dq)->dq_running, 0, 1)
static inline void _dispatch_queue_unlock(dispatch_queue_t dq);
static void _dispatch_queue_invoke(dispatch_queue_t dq);
static void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq);
static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static struct dispatch_object_s *_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq);

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT
void (*dispatch_begin_thread_4GC)(void) = dummy_function;
void (*dispatch_end_thread_4GC)(void) = dummy_function;
void *(*_dispatch_begin_NSAutoReleasePool)(void) = (void *)dummy_function;
void (*_dispatch_end_NSAutoReleasePool)(void *) = (void *)dummy_function;
static void _dispatch_queue_wakeup_main(void);

static dispatch_once_t _dispatch_main_q_port_pred;
static bool main_q_is_draining;
static mach_port_t main_q_port;
#endif

static void _dispatch_cache_cleanup2(void *value);
static void _dispatch_force_cache_cleanup(void);
static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
    .do_type = DISPATCH_QUEUE_TYPE,
    .do_kind = "queue",
    .do_dispose = _dispatch_queue_dispose,
    .do_invoke = (void *)dummy_function_r0,
    .do_probe = (void *)dummy_function_r0,
    .do_debug = dispatch_queue_debug,
};

static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
    .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
    .do_kind = "global-queue",
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_queue_wakeup_global,
};

#define MAX_THREAD_COUNT 255
struct dispatch_root_queue_context_s {
    pthread_workqueue_t dgq_kworkqueue;
    uint32_t dgq_pending;
    uint32_t dgq_thread_pool_size;
    dispatch_semaphore_t dgq_thread_mediator;
};

#define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2)
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[0],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[1],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[2],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[3],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[4],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[5],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
};
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
static struct dispatch_queue_s _dispatch_root_queues[] = {
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[0],

        .dq_label = "com.apple.root.low-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 4,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[1],

        .dq_label = "com.apple.root.low-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 5,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[2],

        .dq_label = "com.apple.root.default-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 6,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[3],

        .dq_label = "com.apple.root.default-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 7,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[4],

        .dq_label = "com.apple.root.high-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 8,
    },
    {
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[5],

        .dq_label = "com.apple.root.high-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 9,
    },
};

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
struct dispatch_queue_s _dispatch_main_q = {
    .do_vtable = &_dispatch_queue_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],

    .dq_label = "com.apple.main-thread",
    .dq_running = 1,
    .dq_width = 1,
    .dq_serialnum = 1,
};
#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static size_t _dispatch_bad_ratio;
static struct {
    uint64_t time_total;
    uint64_t count_total;
    uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
static void _dispatch_queue_merge_stats(uint64_t start);
#endif

static void *_dispatch_worker_thread(void *context);
static void _dispatch_worker_thread2(void *context);

malloc_zone_t *_dispatch_ccache_zone;
static inline void
_dispatch_continuation_free(dispatch_continuation_t dc)
{
    dispatch_continuation_t prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
    dc->do_next = prev_dc;
    _dispatch_thread_setspecific(dispatch_cache_key, dc);
}

static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
    dispatch_continuation_t dc = dou._dc;
    dispatch_group_t dg;

    if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
        return _dispatch_queue_invoke(dou._dq);
    }

    // Add the item back to the cache before calling the function. This
    // allows the 'hot' continuation to be used for a quick callback.
    //
    // The ccache version is per-thread.
    // Therefore, the object has not been reused yet.
    // This generates better assembly.
    if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
        _dispatch_continuation_free(dc);
    }
    if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
        dg = dc->dc_group;
    } else {
        dg = NULL;
    }
    dc->dc_func(dc->dc_ctxt);
    if (dg) {
        dispatch_group_leave(dg);
        _dispatch_release(dg);
    }
}
struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
    struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

    // The mediator value acts both as a "lock" and a signal
    head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);

    if (slowpath(head == NULL)) {
        // The first xchg on the tail will tell the enqueueing thread that it
        // is safe to blindly write out to the head pointer. A cmpxchg honors
        // the algorithm.
        dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
        _dispatch_debug("no work on global work queue");
        return NULL;
    }

    if (slowpath(head == mediator)) {
        // This thread lost the race for ownership of the queue.
        //
        // The ratio of work to libdispatch overhead must be bad. This
        // scenario implies that there are too many threads in the pool.
        // Create a new pending thread and then exit this thread.
        // The kernel will grant a new thread when the load subsides.
        _dispatch_debug("Contention on queue: %p", dq);
        _dispatch_queue_wakeup_global(dq);
#if DISPATCH_PERF_MON
        dispatch_atomic_inc(&_dispatch_bad_ratio);
#endif
        return NULL;
    }

    // Restore the head pointer to a sane value before returning.
    // If 'next' is NULL, then this item _might_ be the last item.
    next = fastpath(head->do_next);

    if (slowpath(!next)) {
        dq->dq_items_head = NULL;

        if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
            // both head and tail are NULL now
            goto out;
        }

        // There must be a next item now. This thread won't wait long.
        while (!(next = head->do_next)) {
            _dispatch_hardware_pause();
        }
    }

    dq->dq_items_head = next;
    _dispatch_queue_wakeup_global(dq);
out:
    return head;
}
dispatch_queue_t
dispatch_get_current_queue(void)
{
    return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}

#undef dispatch_get_main_queue
__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_NA)
dispatch_queue_t dispatch_get_main_queue(void);

dispatch_queue_t
dispatch_get_main_queue(void)
{
    return &_dispatch_main_q;
}
#define dispatch_get_main_queue() (&_dispatch_main_q)
struct _dispatch_hw_config_s _dispatch_hw_config;

static void
_dispatch_queue_set_width_init(void)
{
    size_t valsz = sizeof(uint32_t);

    errno = 0;
    sysctlbyname("hw.activecpu", &_dispatch_hw_config.cc_max_active, &valsz, NULL, 0);
    dispatch_assume_zero(errno);
    dispatch_assume(valsz == sizeof(uint32_t));

    errno = 0;
    sysctlbyname("hw.logicalcpu_max", &_dispatch_hw_config.cc_max_logical, &valsz, NULL, 0);
    dispatch_assume_zero(errno);
    dispatch_assume(valsz == sizeof(uint32_t));

    errno = 0;
    sysctlbyname("hw.physicalcpu_max", &_dispatch_hw_config.cc_max_physical, &valsz, NULL, 0);
    dispatch_assume_zero(errno);
    dispatch_assume(valsz == sizeof(uint32_t));
}
void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
    int w = (int)width; // intentional truncation
    uint32_t tmp;

    if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
        return;
    }
    if (w == 1 || w == 0) {
        dq->dq_width = 1;
        return;
    }
    if (w > 0) {
        tmp = w;
    } else switch (w) {
    case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
        tmp = _dispatch_hw_config.cc_max_physical;
        break;
    case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
        tmp = _dispatch_hw_config.cc_max_active;
        break;
    default:
        // fall through
    case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
        tmp = _dispatch_hw_config.cc_max_logical;
        break;
    }
    // multiply by two since the running count is inc/dec by two (the low bit == barrier)
    dq->dq_width = tmp * 2;

    // XXX if the queue has items and the width is increased, we should try to wake the queue
}
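
/*
 * Usage sketch (illustrative, not part of the library): dispatch_queue_set_width()
 * is the legacy way to let a private queue run more than one item at a time.
 * The helper name is hypothetical; assumes a queue created with
 * dispatch_queue_create() elsewhere.
 */
#if 0
static void
_example_widen_queue(dispatch_queue_t q)
{
    // let the queue run as many items concurrently as there are active CPUs
    dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
}
#endif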
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
static unsigned long _dispatch_queue_serial_numbers = 10;

// Note to later developers: ensure that any initialization changes are
// made for statically allocated queues (i.e. _dispatch_main_q).
static void
_dispatch_queue_init(dispatch_queue_t dq)
{
    dq->do_vtable = &_dispatch_queue_vtable;
    dq->do_next = DISPATCH_OBJECT_LISTLESS;
    dq->do_ref_cnt = 1;
    dq->do_xref_cnt = 1;
    dq->do_targetq = _dispatch_get_root_queue(0, true);
    dq->dq_running = 0;
    dq->dq_width = 1;
    dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    dispatch_queue_t dq;
    size_t label_len;

    if (!label) {
        label = "";
    }

    label_len = strlen(label);
    if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
        label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
    }

    // XXX switch to malloc()
    dq = calloc(1ul, sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE + label_len + 1);
    if (slowpath(!dq)) {
        return dq;
    }

    _dispatch_queue_init(dq);
    strcpy(dq->dq_label, label);

#ifndef DISPATCH_NO_LEGACY
    if (slowpath(attr)) {
        dq->do_targetq = _dispatch_get_root_queue(attr->qa_priority, attr->qa_flags & DISPATCH_QUEUE_OVERCOMMIT);
        dq->dq_finalizer_ctxt = attr->finalizer_ctxt;
        dq->dq_finalizer_func = attr->finalizer_func;
#ifdef __BLOCKS__
        if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
            // if finalizer_ctxt is a Block, retain it.
            dq->dq_finalizer_ctxt = Block_copy(dq->dq_finalizer_ctxt);
            if (!(dq->dq_finalizer_ctxt)) {
                goto out_bad;
            }
        }
#endif
    }
#endif

    return dq;

out_bad:
    free(dq);
    return NULL;
}
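
/*
 * Usage sketch (illustrative, not part of the library): create a serial queue,
 * hand it an asynchronous Block, and drop the creator's reference.  The label
 * "com.example.worker" and the helper name are hypothetical.
 */
#if 0
static void
_example_create_and_use_queue(void)
{
    dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);

    dispatch_async(q, ^{
        // runs later, in FIFO order with everything else queued on 'q'
    });

    // the queue stays alive until the pending Block has run
    dispatch_release(q);
}
#endif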
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
    if (slowpath(dq == _dispatch_queue_get_current())) {
        DISPATCH_CRASH("Release of a queue by itself");
    }
    if (slowpath(dq->dq_items_tail)) {
        DISPATCH_CRASH("Release of a queue while items are enqueued");
    }

#ifndef DISPATCH_NO_LEGACY
    if (dq->dq_finalizer_func) {
        dq->dq_finalizer_func(dq->dq_finalizer_ctxt, dq);
    }
#endif

    // trash the tail queue so that use after free will crash
    dq->dq_items_tail = (void *)0x200;

    _dispatch_dispose(dq);
}

void
_dispatch_queue_push_list_slow(dispatch_queue_t dq, struct dispatch_object_s *obj)
{
    // The queue must be retained before dq_items_head is written in order
    // to ensure that the reference is still valid when _dispatch_wakeup is
    // called. Otherwise, if preempted between the assignment to
    // dq_items_head and _dispatch_wakeup, the blocks submitted to the
    // queue may release the last reference to the queue when invoked by
    // _dispatch_queue_drain. <rdar://problem/6932776>
    _dispatch_retain(dq);
    dq->dq_items_head = obj;
    _dispatch_wakeup(dq);
    _dispatch_release(dq);
}
static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());

    dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
    dc->dc_func = func;
    dc->dc_ctxt = context;

    _dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
    dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif

void
dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

    if (!dc) {
        return _dispatch_barrier_async_f_slow(dq, context, func);
    }

    dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
    dc->dc_func = func;
    dc->dc_ctxt = context;

    _dispatch_queue_push(dq, dc);
}

static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());

    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = func;
    dc->dc_ctxt = context;

    _dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
    dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

    // unlike dispatch_sync_f(), we do NOT need to check the queue width,
    // the "drain" function will do this test

    if (!dc) {
        return _dispatch_async_f_slow(dq, ctxt, func);
    }

    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = func;
    dc->dc_ctxt = ctxt;

    _dispatch_queue_push(dq, dc);
}
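
/*
 * Usage sketch (illustrative, not part of the library): the function-pointer
 * variant avoids Blocks entirely, which is useful from plain C.  The names
 * _example_work and _example_submit are hypothetical.
 */
#if 0
static void
_example_work(void *ctxt)
{
    free(ctxt); // the context pointer is owned by the work function
}

static void
_example_submit(dispatch_queue_t q)
{
    void *buf = calloc(1, 64);

    if (buf) {
        dispatch_async_f(q, buf, _example_work);
    }
}
#endif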
struct dispatch_barrier_sync_slow2_s {
    dispatch_queue_t dbss2_dq;
    dispatch_function_t dbss2_func;
    void *dbss2_ctxt;
    dispatch_semaphore_t dbss2_sema;
};

static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
    struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;

    dispatch_assert(dbss2->dbss2_dq == dispatch_get_current_queue());
    // ALL blocks on the main queue, must be run on the main thread
    if (dbss2->dbss2_dq == dispatch_get_main_queue()) {
        dbss2->dbss2_func(dbss2->dbss2_ctxt);
    } else {
        dispatch_suspend(dbss2->dbss2_dq);
    }
    dispatch_semaphore_signal(dbss2->dbss2_sema);
}

static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    // It's preferred to execute synchronous blocks on the current thread
    // due to thread-local side effects, garbage collection, etc. However,
    // blocks submitted to the main thread MUST be run on the main thread

    struct dispatch_barrier_sync_slow2_s dbss2 = {
        .dbss2_dq = dq,
        .dbss2_func = func,
        .dbss2_ctxt = ctxt,
        .dbss2_sema = _dispatch_get_thread_semaphore(),
    };
    struct dispatch_barrier_sync_slow_s {
        DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
    } dbss = {
        .do_vtable = (void *)DISPATCH_OBJ_BARRIER_BIT,
        .dc_func = _dispatch_barrier_sync_f_slow_invoke,
        .dc_ctxt = &dbss2,
    };

    dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_queue_push(dq, (void *)&dbss);
    dispatch_semaphore_wait(dbss2.dbss2_sema, DISPATCH_TIME_FOREVER);

    if (dq != dispatch_get_main_queue()) {
        _dispatch_thread_setspecific(dispatch_queue_key, dq);
        func(ctxt);
        _dispatch_workitem_inc();
        _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
        dispatch_resume(dq);
    }
    _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
}
#ifdef __BLOCKS__
void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
    // Blocks submitted to the main queue MUST be run on the main thread,
    // therefore we must Block_copy in order to notify the thread-local
    // garbage collector that the objects are transferring to the main thread
    if (dq == dispatch_get_main_queue()) {
        dispatch_block_t block = Block_copy(work);
        return dispatch_barrier_sync_f(dq, block, _dispatch_call_block_and_release);
    }
    struct Block_basic *bb = (void *)work;

    dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

    // 1) ensure that this thread hasn't enqueued anything ahead of this call
    // 2) the queue is not suspended
    // 3) the queue is not weird
    if (slowpath(dq->dq_items_tail)
            || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
            || slowpath(!_dispatch_queue_trylock(dq))) {
        return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
    }

    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    func(ctxt);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
    _dispatch_queue_unlock(dq);
}
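
/*
 * Usage sketch (illustrative, not part of the library): calling
 * dispatch_barrier_sync_f() with a no-op function is a simple way to wait for
 * everything already enqueued on a queue to finish.  The helper names are
 * hypothetical.
 */
#if 0
static void
_example_noop(void *ctxt __attribute__((unused)))
{
}

static void
_example_flush_queue(dispatch_queue_t q)
{
    // returns only after all previously submitted work on 'q' has completed
    dispatch_barrier_sync_f(q, NULL, _example_noop);
}
#endif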
static void
_dispatch_sync_f_slow2(void *ctxt)
{
    dispatch_queue_t dq = _dispatch_queue_get_current();
    dispatch_atomic_add(&dq->dq_running, 2);
    dispatch_semaphore_signal(ctxt);
}

static void
_dispatch_sync_f_slow(dispatch_queue_t dq)
{
    // the global root queues do not need strict ordering
    if (dq->do_targetq == NULL) {
        dispatch_atomic_add(&dq->dq_running, 2);
        return;
    }

    struct dispatch_sync_slow_s {
        DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
    } dss = {
        .dc_func = _dispatch_sync_f_slow2,
        .dc_ctxt = _dispatch_get_thread_semaphore(),
    };

    // XXX FIXME -- concurrent queues can become serial again
    _dispatch_queue_push(dq, (void *)&dss);

    dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
    _dispatch_put_thread_semaphore(dss.dc_ctxt);
}

#ifdef __BLOCKS__
void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
    struct Block_basic *bb = (void *)work;
    dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    typeof(dq->dq_running) prev_cnt;
    dispatch_queue_t old_dq;

    if (dq->dq_width == 1) {
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }

    // 1) ensure that this thread hasn't enqueued anything ahead of this call
    // 2) the queue is not suspended
    if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
        _dispatch_sync_f_slow(dq);
    } else {
        prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;

        if (slowpath(prev_cnt & 1)) {
            if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
                _dispatch_wakeup(dq);
            }
            _dispatch_sync_f_slow(dq);
        }
    }

    old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    func(ctxt);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);

    if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
        _dispatch_wakeup(dq);
    }
}
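
/*
 * Usage sketch (illustrative, not part of the library): dispatch_sync() runs the
 * Block on the queue and does not return until it has finished, so it can be
 * used to read state that is only mutated on that queue.  The names are
 * hypothetical.
 */
#if 0
static long
_example_read_counter(dispatch_queue_t q, const long *counter)
{
    __block long snapshot;

    dispatch_sync(q, ^{
        snapshot = *counter;
    });

    return snapshot;
}
#endif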
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
    return dq->dq_label;
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_q_port_init(void *ctxt __attribute__((unused)))
{
    kern_return_t kr;

    kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &main_q_port);
    DISPATCH_VERIFY_MIG(kr);
    dispatch_assume_zero(kr);
    kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port, MACH_MSG_TYPE_MAKE_SEND);
    DISPATCH_VERIFY_MIG(kr);
    dispatch_assume_zero(kr);

    _dispatch_program_is_probably_callback_driven = true;
    _dispatch_safe_fork = false;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
    main_q_is_draining = arg;
}
#endif

void
dispatch_main(void)
{
    if (pthread_main_np()) {
        _dispatch_program_is_probably_callback_driven = true;
        pthread_exit(NULL);
        DISPATCH_CRASH("pthread_exit() returned");
    }
    DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
}

static void
_dispatch_sigsuspend(void *ctxt __attribute__((unused)))
{
    static const sigset_t mask;

    for (;;) {
        sigsuspend(&mask);
    }
}

static void
_dispatch_queue_cleanup2(void)
{
    dispatch_atomic_dec(&_dispatch_main_q.dq_running);

    if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
        _dispatch_wakeup(&_dispatch_main_q);
    }

    // overload the "probably" variable to mean that dispatch_main() or
    // similar non-POSIX API was called
    // this has to run before the DISPATCH_COCOA_COMPAT below
    if (_dispatch_program_is_probably_callback_driven) {
        dispatch_async_f(_dispatch_get_root_queue(0, 0), NULL, _dispatch_sigsuspend);
        sleep(1); // workaround 6778970
    }

#if DISPATCH_COCOA_COMPAT
    dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);

    mach_port_t mp = main_q_port;
    kern_return_t kr;

    main_q_port = 0;

    if (mp) {
        kr = mach_port_deallocate(mach_task_self(), mp);
        DISPATCH_VERIFY_MIG(kr);
        dispatch_assume_zero(kr);
        kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
        DISPATCH_VERIFY_MIG(kr);
        dispatch_assume_zero(kr);
    }
#endif
}
dispatch_queue_t
dispatch_get_concurrent_queue(long pri)
{
    if (pri > 0) {
        pri = DISPATCH_QUEUE_PRIORITY_HIGH;
    } else if (pri < 0) {
        pri = DISPATCH_QUEUE_PRIORITY_LOW;
    }
    return _dispatch_get_root_queue(pri, false);
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
    if (ctxt == &_dispatch_main_q) {
        return _dispatch_queue_cleanup2();
    }
    // POSIX defines that destructors are only called if 'ctxt' is non-null
    DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}

dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
    if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
        return NULL;
    }
    return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
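
/*
 * Usage sketch (illustrative, not part of the library): global queues are
 * fetched by priority; passing DISPATCH_QUEUE_OVERCOMMIT in 'flags' selects the
 * overcommitting variant.  The helper name is hypothetical.
 */
#if 0
static void
_example_use_global_queue(void)
{
    dispatch_queue_t q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

    dispatch_async(q, ^{
        // concurrent work; global queues are never released by the caller
    });
}
#endif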
#define countof(x) (sizeof(x) / sizeof(x[0]))

void
libdispatch_init(void)
{
    dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 3);
    dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 6);

    dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW == -DISPATCH_QUEUE_PRIORITY_HIGH);
    dispatch_assert(countof(_dispatch_root_queues) == DISPATCH_ROOT_QUEUE_COUNT);
    dispatch_assert(countof(_dispatch_thread_mediator) == DISPATCH_ROOT_QUEUE_COUNT);
    dispatch_assert(countof(_dispatch_root_queue_contexts) == DISPATCH_ROOT_QUEUE_COUNT);

    _dispatch_thread_key_init_np(dispatch_queue_key, _dispatch_queue_cleanup);
    _dispatch_thread_key_init_np(dispatch_sema4_key, (void (*)(void *))dispatch_release); // use the extern release
    _dispatch_thread_key_init_np(dispatch_cache_key, _dispatch_cache_cleanup2);
#if DISPATCH_PERF_MON
    _dispatch_thread_key_init_np(dispatch_bcounter_key, NULL);
#endif

    _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);

    _dispatch_queue_set_width_init();
}
static inline void
_dispatch_queue_unlock(dispatch_queue_t dq)
{
    if (slowpath(dispatch_atomic_dec(&dq->dq_running))) {
        return;
    }

    _dispatch_wakeup(dq);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
    dispatch_queue_t tq;

    if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
        return NULL;
    }
    if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
        return NULL;
    }

    if (!_dispatch_trylock(dou._do)) {
#if DISPATCH_COCOA_COMPAT
        if (dou._dq == &_dispatch_main_q) {
            _dispatch_queue_wakeup_main();
        }
#endif
        return NULL;
    }
    _dispatch_retain(dou._do);
    tq = dou._do->do_targetq;
    _dispatch_queue_push(tq, dou._do);
    return tq; // libdispatch doesn't need this, but the Instrument DTrace probe does
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_queue_wakeup_main(void)
{
    kern_return_t kr;

    dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);

    kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

    switch (kr) {
    case MACH_SEND_TIMEOUT:
    case MACH_SEND_TIMED_OUT:
    case MACH_SEND_INVALID_DEST:
        break;
    default:
        dispatch_assume_zero(kr);
        break;
    }

    _dispatch_safe_fork = false;
}
#endif
static inline int
_dispatch_rootq2wq_pri(long idx)
{
#ifdef WORKQ_DEFAULT_PRIOQUEUE
    switch (idx) {
    case 0:
    case 1:
        return WORKQ_LOW_PRIOQUEUE;
    case 2:
    case 3:
    default:
        return WORKQ_DEFAULT_PRIOQUEUE;
    case 4:
    case 5:
        return WORKQ_HIGH_PRIOQUEUE;
    }
#else
    return (int)idx;
#endif
}

static void
_dispatch_root_queues_init(void *context __attribute__((unused)))
{
    bool disable_wq = getenv("LIBDISPATCH_DISABLE_KWQ");
    pthread_workqueue_attr_t pwq_attr;
    kern_return_t kr;
    int i, r;

    r = pthread_workqueue_attr_init_np(&pwq_attr);
    dispatch_assume_zero(r);

    for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
        r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, _dispatch_rootq2wq_pri(i));
        dispatch_assume_zero(r);
        r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
        dispatch_assume_zero(r);
        // some software hangs if the non-overcommitting queues do not overcommit when threads block
#if 0
        if (!(i & 1)) {
            dispatch_root_queue_contexts[i].dgq_thread_pool_size = _dispatch_hw_config.cc_max_active;
        }
#endif

        r = 0;
        if (disable_wq || (r = pthread_workqueue_create_np(&_dispatch_root_queue_contexts[i].dgq_kworkqueue, &pwq_attr))) {
            dispatch_assume_zero(r);
            // override the default FIFO behavior for the pool semaphores
            kr = semaphore_create(mach_task_self(), &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
            DISPATCH_VERIFY_MIG(kr);
            dispatch_assume_zero(kr);
            dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
        } else {
            dispatch_assume(_dispatch_root_queue_contexts[i].dgq_kworkqueue);
        }
    }

    r = pthread_workqueue_attr_destroy_np(&pwq_attr);
    dispatch_assume_zero(r);
}
static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
    static dispatch_once_t pred;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
    pthread_workitem_handle_t wh;
    unsigned int gen_cnt;
    pthread_t pthr;
    uint32_t t_count;
    int r;

    if (!dq->dq_items_tail) {
        return false;
    }

    _dispatch_safe_fork = false;

    dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

    dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

    if (qc->dgq_kworkqueue) {
        if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
            _dispatch_debug("requesting new worker thread");

            r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
            dispatch_assume_zero(r);
        } else {
            _dispatch_debug("work thread request still pending on global queue: %p", dq);
        }
        goto out;
    }

    if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
        goto out;
    }

    do {
        t_count = qc->dgq_thread_pool_size;
        if (!t_count) {
            _dispatch_debug("The thread pool is full: %p", dq);
            goto out;
        }
    } while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));

    while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
        dispatch_assume_zero(r);
        sleep(1);
    }
    r = pthread_detach(pthr);
    dispatch_assume_zero(r);

out:
    return false;
}
static void
_dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
{
#if DISPATCH_PERF_MON
    uint64_t start = mach_absolute_time();
#endif
    _dispatch_queue_drain(dq);
#if DISPATCH_PERF_MON
    _dispatch_queue_merge_stats(start);
#endif
    _dispatch_force_cache_cleanup();
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
    dispatch_queue_t tq = dq->do_targetq;

    if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(_dispatch_queue_trylock(dq))) {
        _dispatch_queue_drain(dq);
        if (tq == dq->do_targetq) {
            tq = NULL;
        } else {
            tq = dq->do_targetq;
        }
        // We do not need to check the result.
        // When the suspend-count lock is dropped, then the check will happen.
        dispatch_atomic_dec(&dq->dq_running);
        if (tq) {
            return _dispatch_queue_push(tq, dq);
        }
    }

    dq->do_next = DISPATCH_OBJECT_LISTLESS;
    if (dispatch_atomic_sub(&dq->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
        if (dq->dq_running == 0) {
            _dispatch_wakeup(dq); // verify that the queue is idle
        }
    }
    _dispatch_release(dq); // added when the queue is put on the list
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
    dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

    prev_dq = dq->do_targetq;
    dq->do_targetq = ctxt;
    _dispatch_release(prev_dq);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
    if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
        return;
    }
    // NOTE: we test for NULL target queues internally to detect root queues
    // therefore, if the retain crashes due to a bad input, that is OK
    _dispatch_retain(dq);
    dispatch_barrier_async_f(dou._dq, dq, _dispatch_set_target_queue2);
}

static void
_dispatch_async_f_redirect2(void *_ctxt)
{
    struct dispatch_continuation_s *dc = _ctxt;
    struct dispatch_continuation_s *other_dc = dc->dc_data[1];
    dispatch_queue_t old_dq, dq = dc->dc_data[0];

    old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    _dispatch_continuation_pop(other_dc);
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);

    if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
        _dispatch_wakeup(dq);
    }
    _dispatch_release(dq);
}

static void
_dispatch_async_f_redirect(dispatch_queue_t dq, struct dispatch_object_s *other_dc)
{
    dispatch_continuation_t dc = (void *)other_dc;
    dispatch_queue_t root_dq = dq;

    if (dc->dc_func == _dispatch_sync_f_slow2) {
        return dc->dc_func(dc->dc_ctxt);
    }

    dispatch_atomic_add(&dq->dq_running, 2);
    _dispatch_retain(dq);

    dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();

    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = _dispatch_async_f_redirect2;
    dc->dc_ctxt = dc;
    dc->dc_data[0] = dq;
    dc->dc_data[1] = other_dc;

    do {
        root_dq = root_dq->do_targetq;
    } while (root_dq->do_targetq);

    _dispatch_queue_push(root_dq, dc);
}
static void
_dispatch_queue_drain(dispatch_queue_t dq)
{
    dispatch_queue_t orig_tq, old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    struct dispatch_object_s *dc = NULL, *next_dc = NULL;

    orig_tq = dq->do_targetq;

    _dispatch_thread_setspecific(dispatch_queue_key, dq);

    while (dq->dq_items_tail) {
        while (!fastpath(dq->dq_items_head)) {
            _dispatch_hardware_pause();
        }

        dc = dq->dq_items_head;
        dq->dq_items_head = NULL;

        do {
            // Enqueue is TIGHTLY controlled, we won't wait long.
            do {
                next_dc = fastpath(dc->do_next);
            } while (!next_dc && !dispatch_atomic_cmpxchg(&dq->dq_items_tail, dc, NULL));

            if (DISPATCH_OBJECT_SUSPENDED(dq)) {
                goto out;
            }
            if (dq->dq_running > dq->dq_width) {
                goto out;
            }
            if (orig_tq != dq->do_targetq) {
                goto out;
            }

            if (fastpath(dq->dq_width == 1)) {
                _dispatch_continuation_pop(dc);
                _dispatch_workitem_inc();
            } else if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
                if (dq->dq_running > 1) {
                    goto out;
                }
                _dispatch_continuation_pop(dc);
                _dispatch_workitem_inc();
            } else {
                _dispatch_async_f_redirect(dq, dc);
            }
        } while ((dc = next_dc));
    }

out:
    // if this is not a complete drain, we must undo some things
    if (slowpath(dc)) {
        // 'dc' must NOT be "popped"
        // 'dc' might be the last item
        if (next_dc || dispatch_atomic_cmpxchg(&dq->dq_items_tail, NULL, dc)) {
            dq->dq_items_head = dc;
        } else {
            while (!(next_dc = dq->dq_items_head)) {
                _dispatch_hardware_pause();
            }
            dq->dq_items_head = dc;
            dc->do_next = next_dc;
        }
    }

    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
    dispatch_queue_t dq = context;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
    sigset_t mask;
    int r;

    // workaround tweaks the kernel workqueue does for us
    r = sigfillset(&mask);
    dispatch_assume_zero(r);
    r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
    dispatch_assume_zero(r);

    do {
        _dispatch_worker_thread2(context);
        // we use 65 seconds in case there are any timers that run once a minute
    } while (dispatch_semaphore_wait(qc->dgq_thread_mediator, dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);

    dispatch_atomic_inc(&qc->dgq_thread_pool_size);
    if (dq->dq_items_tail) {
        _dispatch_queue_wakeup_global(dq);
    }

    return NULL;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_worker_thread2(void *context)
{
    struct dispatch_object_s *item;
    dispatch_queue_t dq = context;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

    if (_dispatch_thread_getspecific(dispatch_queue_key)) {
        DISPATCH_CRASH("Premature thread recycling");
    }

    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    qc->dgq_pending = 0;

#if DISPATCH_COCOA_COMPAT
    // ensure that high-level memory management techniques do not leak/crash
    dispatch_begin_thread_4GC();
    void *pool = _dispatch_begin_NSAutoReleasePool();
#endif

#if DISPATCH_PERF_MON
    uint64_t start = mach_absolute_time();
#endif
    while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
        _dispatch_continuation_pop(item);
    }
#if DISPATCH_PERF_MON
    _dispatch_queue_merge_stats(start);
#endif

#if DISPATCH_COCOA_COMPAT
    _dispatch_end_NSAutoReleasePool(pool);
    dispatch_end_thread_4GC();
#endif

    _dispatch_thread_setspecific(dispatch_queue_key, NULL);

    _dispatch_force_cache_cleanup();
}
#if DISPATCH_PERF_MON
static void
_dispatch_queue_merge_stats(uint64_t start)
{
    uint64_t avg, delta = mach_absolute_time() - start;
    unsigned long count, bucket;

    count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
    _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

    if (count) {
        avg = delta / count;
        bucket = flsll(avg);
    } else {
        bucket = 0;
    }

    // 64-bit counters on 32-bit require a lock or a queue
    OSSpinLockLock(&_dispatch_stats_lock);

    _dispatch_stats[bucket].time_total += delta;
    _dispatch_stats[bucket].count_total += count;
    _dispatch_stats[bucket].thread_total++;

    OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
size_t
dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
    return snprintf(buf, bufsiz, "parent = %p ", dq->do_targetq);
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
    size_t offset = 0;

    offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dq->dq_label, dq);
    offset += dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
    offset += dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
    offset += snprintf(&buf[offset], bufsiz - offset, "}");

    return offset;
}

void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
    if (fastpath(dq)) {
        dispatch_debug(dq, "%s", str);
    } else {
        _dispatch_log("queue[NULL]: %s", str);
    }
}

#if DISPATCH_COCOA_COMPAT
void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg __attribute__((unused)))
{
    if (main_q_is_draining) {
        return;
    }
    _dispatch_queue_set_mainq_drain_state(true);
    _dispatch_queue_serial_drain_till_empty(&_dispatch_main_q);
    _dispatch_queue_set_mainq_drain_state(false);
}

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
    dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
    return main_q_port;
}
#endif
static void
dispatch_queue_attr_dispose(dispatch_queue_attr_t attr)
{
    dispatch_queue_attr_set_finalizer(attr, NULL);
    _dispatch_dispose(attr);
}

static const struct dispatch_queue_attr_vtable_s dispatch_queue_attr_vtable = {
    .do_type = DISPATCH_QUEUE_ATTR_TYPE,
    .do_kind = "queue-attr",
    .do_dispose = dispatch_queue_attr_dispose,
};

dispatch_queue_attr_t
dispatch_queue_attr_create(void)
{
    dispatch_queue_attr_t a = calloc(1, sizeof(struct dispatch_queue_attr_s));

    if (a) {
        a->do_vtable = &dispatch_queue_attr_vtable;
        a->do_next = DISPATCH_OBJECT_LISTLESS;
        a->do_ref_cnt = 1;
        a->do_xref_cnt = 1;
        a->do_targetq = _dispatch_get_root_queue(0, 0);
        a->qa_flags = DISPATCH_QUEUE_OVERCOMMIT;
    }

    return a;
}

void
dispatch_queue_attr_set_flags(dispatch_queue_attr_t attr, uint64_t flags)
{
    dispatch_assert_zero(flags & ~DISPATCH_QUEUE_FLAGS_MASK);
    attr->qa_flags = (unsigned long)flags & DISPATCH_QUEUE_FLAGS_MASK;
}

void
dispatch_queue_attr_set_priority(dispatch_queue_attr_t attr, int priority)
{
    dispatch_debug_assert(attr, "NULL pointer");
    dispatch_debug_assert(priority <= DISPATCH_QUEUE_PRIORITY_HIGH && priority >= DISPATCH_QUEUE_PRIORITY_LOW, "Invalid priority");

    if (priority > 0) {
        priority = DISPATCH_QUEUE_PRIORITY_HIGH;
    } else if (priority < 0) {
        priority = DISPATCH_QUEUE_PRIORITY_LOW;
    }

    attr->qa_priority = priority;
}

void
dispatch_queue_attr_set_finalizer_f(dispatch_queue_attr_t attr,
    void *context, dispatch_queue_finalizer_function_t finalizer)
{
#ifdef __BLOCKS__
    if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
        Block_release(attr->finalizer_ctxt);
    }
#endif
    attr->finalizer_ctxt = context;
    attr->finalizer_func = finalizer;
}

#ifdef __BLOCKS__
long
dispatch_queue_attr_set_finalizer(dispatch_queue_attr_t attr,
    dispatch_queue_finalizer_t finalizer)
{
    void *ctxt;
    dispatch_queue_finalizer_function_t func;

    if (finalizer) {
        if (!(ctxt = Block_copy(finalizer))) {
            return 1;
        }
        func = (void *)_dispatch_call_block_and_release2;
    } else {
        ctxt = NULL;
        func = NULL;
    }

    dispatch_queue_attr_set_finalizer_f(attr, ctxt, func);

    return 0;
}
#endif
static void
_dispatch_ccache_init(void *context __attribute__((unused)))
{
    _dispatch_ccache_zone = malloc_create_zone(0, 0);
    dispatch_assert(_dispatch_ccache_zone);
    malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
}

dispatch_continuation_t
_dispatch_continuation_alloc_from_heap(void)
{
    static dispatch_once_t pred;
    dispatch_continuation_t dc;

    dispatch_once_f(&pred, NULL, _dispatch_ccache_init);

    while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1, ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
        sleep(1);
    }

    return dc;
}

static void
_dispatch_force_cache_cleanup(void)
{
    dispatch_continuation_t dc = _dispatch_thread_getspecific(dispatch_cache_key);

    if (dc) {
        _dispatch_thread_setspecific(dispatch_cache_key, NULL);
        _dispatch_cache_cleanup2(dc);
    }
}

static void
_dispatch_cache_cleanup2(void *value)
{
    dispatch_continuation_t dc, next_dc = value;

    while ((dc = next_dc)) {
        next_dc = dc->do_next;
        malloc_zone_free(_dispatch_ccache_zone, dc);
    }
}
static char _dispatch_build[16];

static void
_dispatch_bug_init(void *context __attribute__((unused)))
{
    int mib[] = { CTL_KERN, KERN_OSVERSION };
    size_t bufsz = sizeof(_dispatch_build);

    sysctl(mib, 2, _dispatch_build, &bufsz, NULL, 0);
}

void
_dispatch_bug(size_t line, long val)
{
    static dispatch_once_t pred;
    static void *last_seen;
    void *ra = __builtin_return_address(0);

    dispatch_once_f(&pred, NULL, _dispatch_bug_init);
    if (last_seen != ra) {
        last_seen = ra;
        _dispatch_log("BUG in libdispatch: %s - %lu - 0x%lx", _dispatch_build, line, val);
    }
}

void
_dispatch_abort(size_t line, long val)
{
    _dispatch_bug(line, val);
    abort();
}

void
_dispatch_log(const char *msg, ...)
{
    va_list ap;

    va_start(ap, msg);
    _dispatch_logv(msg, ap);
    va_end(ap);
}

void
_dispatch_logv(const char *msg, va_list ap)
{
#if DISPATCH_DEBUG
    static FILE *logfile, *tmp;
    char newbuf[strlen(msg) + 2];
    char path[PATH_MAX];

    sprintf(newbuf, "%s\n", msg);

    if (!logfile) {
        snprintf(path, sizeof(path), "/var/tmp/libdispatch.%d.log", getpid());
        tmp = fopen(path, "a");
        if (!dispatch_atomic_cmpxchg(&logfile, NULL, tmp)) {
            fclose(tmp);
        } else {
            struct timeval tv;
            gettimeofday(&tv, NULL);
            fprintf(logfile, "=== log file opened for %s[%u] at %ld.%06u ===\n",
                    getprogname() ?: "", getpid(), tv.tv_sec, tv.tv_usec);
        }
    }
    vfprintf(logfile, newbuf, ap);
    fflush(logfile);
#else
    vsyslog(LOG_NOTICE, msg, ap);
#endif
}
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
    int r;

    /* Workaround: 6269619 Not all signals can be delivered on any thread */

    r = sigdelset(set, SIGILL);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGTRAP);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGEMT);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGFPE);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGBUS);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGSEGV);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGSYS);
    dispatch_assume_zero(r);
    r = sigdelset(set, SIGPIPE);
    dispatch_assume_zero(r);

    return pthread_sigmask(how, set, oset);
}
bool _dispatch_safe_fork = true;

void
dispatch_atfork_prepare(void)
{
}

void
dispatch_atfork_parent(void)
{
}

void
dispatch_atfork_child(void)
{
    void *crash = (void *)0x100;
    size_t i;

    if (_dispatch_safe_fork) {
        return;
    }

    _dispatch_main_q.dq_items_head = crash;
    _dispatch_main_q.dq_items_tail = crash;

    _dispatch_mgr_q.dq_items_head = crash;
    _dispatch_mgr_q.dq_items_tail = crash;

    for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
        _dispatch_root_queues[i].dq_items_head = crash;
        _dispatch_root_queues[i].dq_items_tail = crash;
    }
}

void
dispatch_init_pthread(pthread_t pthr __attribute__((unused)))
{
}
static int _dispatch_kq;

static void
_dispatch_get_kq_init(void *context __attribute__((unused)))
{
    static const struct kevent kev = {
        .ident = 1,
        .filter = EVFILT_USER,
        .flags = EV_ADD|EV_CLEAR,
    };

    _dispatch_kq = kqueue();
    _dispatch_safe_fork = false;
    // in case we fall back to select()
    FD_SET(_dispatch_kq, &_dispatch_rfds);

    if (_dispatch_kq == -1) {
        dispatch_assert_zero(errno);
    }

    dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));

    _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
}

static int
_dispatch_get_kq(void)
{
    static dispatch_once_t pred;

    dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);

    return _dispatch_kq;
}

static void
_dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
    size_t i;

    for (i = 0; i < cnt; i++) {
        // EVFILT_USER isn't used by sources
        if (kev[i].filter == EVFILT_USER) {
            // If _dispatch_mgr_thread2() ever is changed to return to the
            // caller, then this should become _dispatch_queue_drain()
            _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
        } else {
            _dispatch_source_drain_kevent(&kev[i]);
        }
    }
}
static dispatch_queue_t
_dispatch_mgr_invoke(dispatch_queue_t dq)
{
    static const struct timespec timeout_immediately = { 0, 0 };
    struct timespec timeout;
    const struct timespec *timeoutp;
    struct timeval sel_timeout, *sel_timeoutp;
    fd_set tmp_rfds, tmp_wfds;
    struct kevent kev[1];
    int k_cnt, k_err, i, r;

    _dispatch_thread_setspecific(dispatch_queue_key, dq);

    for (;;) {
        _dispatch_run_timers();

        timeoutp = _dispatch_get_next_timer_fire(&timeout);

        if (_dispatch_select_workaround) {
            FD_COPY(&_dispatch_rfds, &tmp_rfds);
            FD_COPY(&_dispatch_wfds, &tmp_wfds);
            if (timeoutp) {
                sel_timeout.tv_sec = timeoutp->tv_sec;
                sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
                sel_timeoutp = &sel_timeout;
            } else {
                sel_timeoutp = NULL;
            }

            r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
            if (r == -1) {
                if (errno != EBADF) {
                    dispatch_assume_zero(errno);
                    continue;
                }
                for (i = 0; i < FD_SETSIZE; i++) {
                    if (i == _dispatch_kq) {
                        continue;
                    }
                    if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
                        continue;
                    }
                    r = dup(i);
                    if (r != -1) {
                        close(r);
                    } else {
                        FD_CLR(i, &_dispatch_rfds);
                        FD_CLR(i, &_dispatch_wfds);
                        _dispatch_rfd_ptrs[i] = 0;
                        _dispatch_wfd_ptrs[i] = 0;
                    }
                }
                continue;
            }

            if (r > 0) {
                for (i = 0; i < FD_SETSIZE; i++) {
                    if (i == _dispatch_kq) {
                        continue;
                    }
                    if (FD_ISSET(i, &tmp_rfds)) {
                        FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
                        EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
                        _dispatch_rfd_ptrs[i] = 0;
                        _dispatch_mgr_thread2(kev, 1);
                    }
                    if (FD_ISSET(i, &tmp_wfds)) {
                        FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
                        EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
                        _dispatch_wfd_ptrs[i] = 0;
                        _dispatch_mgr_thread2(kev, 1);
                    }
                }
            }

            timeoutp = &timeout_immediately;
        }

        k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
        k_err = errno;

        switch (k_cnt) {
        case -1:
            if (k_err == EBADF) {
                DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
            }
            dispatch_assume_zero(k_err);
            continue;
        default:
            _dispatch_mgr_thread2(kev, (size_t)k_cnt);
            // fall through
        case 0:
            _dispatch_force_cache_cleanup();
            continue;
        }
    }

    return NULL;
}
static bool
_dispatch_mgr_wakeup(dispatch_queue_t dq)
{
    static const struct kevent kev = {
        .ident = 1,
        .filter = EVFILT_USER,
#ifdef EV_TRIGGER
        .flags = EV_TRIGGER,
#endif
#ifdef NOTE_TRIGGER
        .fflags = NOTE_TRIGGER,
#endif
    };

    _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);

    _dispatch_update_kq(&kev);

    return false;
}
void
_dispatch_update_kq(const struct kevent *kev)
{
    struct kevent kev_copy = *kev;
    kev_copy.flags |= EV_RECEIPT;

    if (kev_copy.flags & EV_DELETE) {
        switch (kev_copy.filter) {
        case EVFILT_READ:
            if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
                FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
                _dispatch_rfd_ptrs[kev_copy.ident] = 0;
                return;
            }
            break;
        case EVFILT_WRITE:
            if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
                FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
                _dispatch_wfd_ptrs[kev_copy.ident] = 0;
                return;
            }
            break;
        default:
            break;
        }
    }

    int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
    if (rval == -1) {
        // If we fail to register with kevents, for other reasons aside from
        // changelist elements.
        dispatch_assume_zero(errno);
        //kev_copy.flags |= EV_ERROR;
        //kev_copy.data = error;
        return;
    }

    // The following select workaround only applies to adding kevents
    if (!(kev->flags & EV_ADD)) {
        return;
    }

    switch (kev_copy.data) {
    case 0:
        return;
    case EBADF:
        break;
    default:
        // If an error occurred while registering with kevent, and it was
        // because of a kevent changelist processing && the kevent involved
        // either doing a read or write, it would indicate we were trying
        // to register a /dev/* port; fall back to select
        switch (kev_copy.filter) {
        case EVFILT_READ:
            _dispatch_select_workaround = true;
            FD_SET((int)kev_copy.ident, &_dispatch_rfds);
            _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
            break;
        case EVFILT_WRITE:
            _dispatch_select_workaround = true;
            FD_SET((int)kev_copy.ident, &_dispatch_wfds);
            _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
            break;
        default:
            _dispatch_source_drain_kevent(&kev_copy);
            break;
        }
        break;
    }
}
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
    .do_type = DISPATCH_QUEUE_MGR_TYPE,
    .do_kind = "mgr-queue",
    .do_invoke = _dispatch_mgr_invoke,
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_mgr_wakeup,
};

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
struct dispatch_queue_s _dispatch_mgr_q = {
    .do_vtable = &_dispatch_queue_mgr_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],

    .dq_label = "com.apple.libdispatch-manager",
    .dq_width = 1,
    .dq_serialnum = 2,
};

const struct dispatch_queue_offsets_s dispatch_queue_offsets = {
    .dqo_label = offsetof(struct dispatch_queue_s, dq_label),
    .dqo_label_size = sizeof(_dispatch_main_q.dq_label),
    .dqo_flags = 0,
    .dqo_flags_size = 0,
    .dqo_width = offsetof(struct dispatch_queue_s, dq_width),
    .dqo_width_size = sizeof(_dispatch_main_q.dq_width),
    .dqo_serialnum = offsetof(struct dispatch_queue_s, dq_serialnum),
    .dqo_serialnum_size = sizeof(_dispatch_main_q.dq_serialnum),
    .dqo_running = offsetof(struct dispatch_queue_s, dq_running),
    .dqo_running_size = sizeof(_dispatch_main_q.dq_running),
};
#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t work)
{
    // test before the copy of the block
    if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
        DISPATCH_CLIENT_CRASH("dispatch_after() called with 'when' == infinity");
#endif
        return;
    }

    dispatch_after_f(when, queue, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif
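
/*
 * Usage sketch (illustrative, not part of the library): schedule a Block on the
 * main queue roughly three seconds from now.  The helper name is hypothetical.
 */
#if 0
static void
_example_delayed_work(void)
{
    dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, 3ull * NSEC_PER_SEC);

    dispatch_after(when, dispatch_get_main_queue(), ^{
        // runs on the main thread after the delay
    });
}
#endif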
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, void (*func)(void *))
{
    uint64_t delta;

    if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
        DISPATCH_CLIENT_CRASH("dispatch_after_f() called with 'when' == infinity");
#endif
        return;
    }

    // this function can and should be optimized to not use a dispatch source
again:
    delta = _dispatch_timeout(when);
    if (delta == 0) {
        return dispatch_async_f(queue, ctxt, func);
    }
    if (!dispatch_source_timer_create(DISPATCH_TIMER_INTERVAL, delta, 0, NULL, queue, ^(dispatch_source_t ds) {
        long err_dom, err_val;
        if ((err_dom = dispatch_source_get_error(ds, &err_val))) {
            dispatch_assert(err_dom == DISPATCH_ERROR_DOMAIN_POSIX);
            dispatch_assert(err_val == ECANCELED);
            func(ctxt);
            dispatch_release(ds); // MUST NOT be _dispatch_release()
        } else {
            dispatch_source_cancel(ds);
        }
    })) {
        goto again;
    }
}