/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
static void
dummy_function(void)
{
}

static long
dummy_function_r0(void)
{
	return 0;
}

static bool _dispatch_select_workaround;
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
static void *_dispatch_rfd_ptrs[FD_SETSIZE];
static void *_dispatch_wfd_ptrs[FD_SETSIZE];

static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
	{
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	{
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	{
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	{
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	{
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	{
		.do_vtable = &_dispatch_semaphore_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
};

static struct dispatch_queue_s _dispatch_root_queues[];
static inline dispatch_queue_t
_dispatch_get_root_queue(long priority, bool overcommit)
{
	if (overcommit) switch (priority) {
	case DISPATCH_QUEUE_PRIORITY_LOW:
		return &_dispatch_root_queues[1];
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		return &_dispatch_root_queues[3];
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		return &_dispatch_root_queues[5];
	}
	switch (priority) {
	case DISPATCH_QUEUE_PRIORITY_LOW:
		return &_dispatch_root_queues[0];
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		return &_dispatch_root_queues[2];
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		return &_dispatch_root_queues[4];
	default:
		return NULL;
	}
}
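/*
 * Illustrative sketch (not from the original source): the six root queues are
 * laid out as {low, low-overcommit, default, default-overcommit, high,
 * high-overcommit}, so a (priority, overcommit) pair maps to one entry:
 *
 *	dispatch_queue_t q;
 *	q = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, false);
 *	// q == &_dispatch_root_queues[2], "com.apple.root.default-priority"
 *	q = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, true);
 *	// q == &_dispatch_root_queues[3], the overcommit variant
 */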
#ifdef __BLOCKS__
dispatch_block_t
_dispatch_Block_copy(dispatch_block_t db)
{
	dispatch_block_t rval;

	while (!(rval = Block_copy(db))) {
		sleep(1);
	}

	return rval;
}
#define _dispatch_Block_copy(x) ((typeof(x))_dispatch_Block_copy(x))

void
_dispatch_call_block_and_release(void *block)
{
	void (^b)(void) = block;
	b();
	Block_release(b);
}

void
_dispatch_call_block_and_release2(void *block, void *ctxt)
{
	void (^b)(void*) = block;
	b(ctxt);
	Block_release(b);
}
#endif /* __BLOCKS__ */
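/*
 * Usage sketch (illustrative; do_work() stands in for arbitrary client code):
 * the Block-based entry points copy the Block to the heap and hand the copy
 * to the function-pointer API together with the trampoline above:
 *
 *	dispatch_async(q, ^{ do_work(); });
 *	// is roughly equivalent to:
 *	dispatch_async_f(q, _dispatch_Block_copy(^{ do_work(); }),
 *			_dispatch_call_block_and_release);
 *
 * The trampoline invokes the Block once and then Block_release()s it, so the
 * heap copy lives exactly as long as the continuation.
 */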
struct dispatch_queue_attr_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_queue_attr_s);
};

struct dispatch_queue_attr_s {
	DISPATCH_STRUCT_HEADER(dispatch_queue_attr_s, dispatch_queue_attr_vtable_s);

	// Public:
	int qa_priority;
	void* finalizer_ctxt;
	dispatch_queue_finalizer_function_t finalizer_func;

	// Private:
	unsigned long qa_flags;
};

static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);

#define _dispatch_queue_trylock(dq) dispatch_atomic_cmpxchg(&(dq)->dq_running, 0, 1)
static inline void _dispatch_queue_unlock(dispatch_queue_t dq);
static void _dispatch_queue_invoke(dispatch_queue_t dq);
static void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq);
static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static struct dispatch_object_s *_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq);

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT
// dispatch_begin_thread_4GC having a non-default value triggers GC-only slow
// paths and is checked frequently; testing against NULL is faster than
// comparing for equality with "dummy_function"
void (*dispatch_begin_thread_4GC)(void) = NULL;
void (*dispatch_end_thread_4GC)(void) = dummy_function;
void *(*_dispatch_begin_NSAutoReleasePool)(void) = (void *)dummy_function;
void (*_dispatch_end_NSAutoReleasePool)(void *) = (void *)dummy_function;
static void _dispatch_queue_wakeup_main(void);

static dispatch_once_t _dispatch_main_q_port_pred;
static bool main_q_is_draining;
static mach_port_t main_q_port;
#endif

static void _dispatch_cache_cleanup2(void *value);
static void _dispatch_force_cache_cleanup(void);

static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
	.do_type = DISPATCH_QUEUE_TYPE,
	.do_kind = "queue",
	.do_dispose = _dispatch_queue_dispose,
	.do_invoke = (void *)dummy_function_r0,
	.do_probe = (void *)dummy_function_r0,
	.do_debug = dispatch_queue_debug,
};

static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
	.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
	.do_kind = "global-queue",
	.do_debug = dispatch_queue_debug,
	.do_probe = _dispatch_queue_wakeup_global,
};

#define MAX_THREAD_COUNT 255

struct dispatch_root_queue_context_s {
	pthread_workqueue_t dgq_kworkqueue;
	uint32_t dgq_pending;
	uint32_t dgq_thread_pool_size;
	dispatch_semaphore_t dgq_thread_mediator;
};
#define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2)
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	{
		.dgq_thread_mediator = &_dispatch_thread_mediator[0],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
	},
	{
		.dgq_thread_mediator = &_dispatch_thread_mediator[1],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
	},
	{
		.dgq_thread_mediator = &_dispatch_thread_mediator[2],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
	},
	{
		.dgq_thread_mediator = &_dispatch_thread_mediator[3],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
	},
	{
		.dgq_thread_mediator = &_dispatch_thread_mediator[4],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
	},
	{
		.dgq_thread_mediator = &_dispatch_thread_mediator[5],
		.dgq_thread_pool_size = MAX_THREAD_COUNT,
	},
};

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
static struct dispatch_queue_s _dispatch_root_queues[] = {
	{
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[0],

		.dq_label = "com.apple.root.low-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 4,
	},
	{
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[1],

		.dq_label = "com.apple.root.low-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 5,
	},
	{
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[2],

		.dq_label = "com.apple.root.default-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 6,
	},
	{
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[3],

		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 7,
	},
	{
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[4],

		.dq_label = "com.apple.root.high-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 8,
	},
	{
		.do_vtable = &_dispatch_queue_root_vtable,
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[5],

		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 9,
	},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
struct dispatch_queue_s _dispatch_main_q = {
	.do_vtable = &_dispatch_queue_vtable,
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],

	.dq_label = "com.apple.main-thread",
	.dq_running = 1,
	.dq_width = 1,
	.dq_serialnum = 1,
};

#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static size_t _dispatch_bad_ratio;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
static void _dispatch_queue_merge_stats(uint64_t start);
#endif

static void *_dispatch_worker_thread(void *context);
static void _dispatch_worker_thread2(void *context);

malloc_zone_t *_dispatch_ccache_zone;
static inline void
_dispatch_continuation_free(dispatch_continuation_t dc)
{
	dispatch_continuation_t prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
	dc->do_next = prev_dc;
	_dispatch_thread_setspecific(dispatch_cache_key, dc);
}
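/*
 * Sketch (illustrative): dispatch_cache_key heads a per-thread, singly-linked
 * free list of continuations threaded through do_next. Freeing dc4 into a
 * cache that already holds dc3 -> dc2 -> dc1:
 *
 *	before:  TSD(dispatch_cache_key) -> dc3 -> dc2 -> dc1 -> NULL
 *	after:   TSD(dispatch_cache_key) -> dc4 -> dc3 -> dc2 -> dc1 -> NULL
 *
 * No locking is needed because the list is only ever touched by its own
 * thread.
 */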
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;
	dispatch_group_t dg;

	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
		return _dispatch_queue_invoke(dou._dq);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		_dispatch_continuation_free(dc);
	}
	if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_group;
	} else {
		dg = NULL;
	}
	dc->dc_func(dc->dc_ctxt);
	if (dg) {
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
}
struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
		_dispatch_debug("no work on global work queue");
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		//
		// The ratio of work to libdispatch overhead must be bad. This
		// scenario implies that there are too many threads in the pool.
		// Create a new pending thread and then exit this thread.
		// The kernel will grant a new thread when the load subsides.
		_dispatch_debug("Contention on queue: %p", dq);
		_dispatch_queue_wakeup_global(dq);
#if DISPATCH_PERF_MON
		dispatch_atomic_inc(&_dispatch_bad_ratio);
#endif
		pthread_exit(NULL);
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dq->dq_items_head = NULL;

		if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
			// both head and tail are NULL now
			goto out;
		}

		// There must be a next item now. This thread won't wait long.
		while (!(next = head->do_next)) {
			_dispatch_hardware_pause();
		}
	}

	dq->dq_items_head = next;
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}
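/*
 * Walkthrough (illustrative): the three states of dq_items_head as seen by a
 * draining thread, assuming items A -> B are enqueued:
 *
 *	head == NULL      empty queue; undo the xchg with a cmpxchg and leave
 *	head == mediator  another drainer got here first; request a pending
 *	                  thread and exit this one
 *	head == A         this thread owns the list; publish A->do_next (B) as
 *	                  the new head, poke the queue again, and return A
 */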
dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}

#undef dispatch_get_main_queue
__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_NA)
dispatch_queue_t dispatch_get_main_queue(void);

dispatch_queue_t
dispatch_get_main_queue(void)
{
	return &_dispatch_main_q;
}
#define dispatch_get_main_queue() (&_dispatch_main_q)

struct _dispatch_hw_config_s _dispatch_hw_config;

static void
_dispatch_queue_set_width_init(void)
{
	size_t valsz = sizeof(uint32_t);

	errno = 0;
	sysctlbyname("hw.activecpu", &_dispatch_hw_config.cc_max_active, &valsz, NULL, 0);
	dispatch_assume_zero(errno);
	dispatch_assume(valsz == sizeof(uint32_t));

	errno = 0;
	sysctlbyname("hw.logicalcpu_max", &_dispatch_hw_config.cc_max_logical, &valsz, NULL, 0);
	dispatch_assume_zero(errno);
	dispatch_assume(valsz == sizeof(uint32_t));

	errno = 0;
	sysctlbyname("hw.physicalcpu_max", &_dispatch_hw_config.cc_max_physical, &valsz, NULL, 0);
	dispatch_assume_zero(errno);
	dispatch_assume(valsz == sizeof(uint32_t));
}
void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	int w = (int)width;	// intentional truncation
	uint32_t tmp;

	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return;
	}
	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		return;
	}
	if (w > 0) {
		tmp = w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_physical;
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = _dispatch_hw_config.cc_max_active;
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_logical;
		break;
	}
	// multiply by two since the running count is inc/dec by two (the low bit == barrier)
	dq->dq_width = tmp * 2;

	// XXX if the queue has items and the width is increased, we should try to wake the queue
}
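/*
 * Usage sketch (illustrative; the label is hypothetical): sizing a queue to
 * the machine rather than to a constant; the symbolic widths resolve against
 * _dispatch_hw_config above:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.scaled", NULL);
 *	dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
 *	// dq_width is now 2 * hw.activecpu (the low bit is reserved for barriers)
 */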
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
static unsigned long _dispatch_queue_serial_numbers = 10;

// Note to later developers: ensure that any initialization changes are
// made for statically allocated queues (i.e. _dispatch_main_q).
static void
_dispatch_queue_init(dispatch_queue_t dq)
{
	dq->do_vtable = &_dispatch_queue_vtable;
	dq->do_next = DISPATCH_OBJECT_LISTLESS;
	dq->do_ref_cnt = 1;
	dq->do_xref_cnt = 1;
	dq->do_targetq = _dispatch_get_root_queue(0, true);
	dq->dq_running = 0;
	dq->dq_width = 1;
	dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	dispatch_queue_t dq;
	size_t label_len;

	if (!label) {
		label = "";
	}

	label_len = strlen(label);
	if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
		label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
	}

	// XXX switch to malloc()
	dq = calloc(1ul, sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE + label_len + 1);
	if (slowpath(!dq)) {
		return dq;
	}

	_dispatch_queue_init(dq);
	strcpy(dq->dq_label, label);

#ifndef DISPATCH_NO_LEGACY
	if (slowpath(attr)) {
		dq->do_targetq = _dispatch_get_root_queue(attr->qa_priority, attr->qa_flags & DISPATCH_QUEUE_OVERCOMMIT);
		dq->dq_finalizer_ctxt = attr->finalizer_ctxt;
		dq->dq_finalizer_func = attr->finalizer_func;
#ifdef __BLOCKS__
		if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
			// if finalizer_ctxt is a Block, retain it.
			dq->dq_finalizer_ctxt = Block_copy(dq->dq_finalizer_ctxt);
			if (!(dq->dq_finalizer_ctxt)) {
				goto out_bad;
			}
		}
#endif
	}
#endif

	return dq;

out_bad:
	free(dq);
	return NULL;
}
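/*
 * Usage sketch (illustrative; label, ctx and work_func are hypothetical):
 * labels shorter than DISPATCH_QUEUE_MIN_LABEL_SIZE still get the minimum
 * allocation, so the trailing dq_label array is always safe to strcpy() into:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);
 *	if (q) {
 *		dispatch_async_f(q, ctx, work_func);
 *		dispatch_release(q);	// the pending work keeps q alive
 *	}
 */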
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

#ifndef DISPATCH_NO_LEGACY
	if (dq->dq_finalizer_func) {
		dq->dq_finalizer_func(dq->dq_finalizer_ctxt, dq);
	}
#endif

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	_dispatch_dispose(dq);
}

DISPATCH_NOINLINE
void
_dispatch_queue_push_list_slow(dispatch_queue_t dq, struct dispatch_object_s *obj)
{
	// The queue must be retained before dq_items_head is written in order
	// to ensure that the reference is still valid when _dispatch_wakeup is
	// called. Otherwise, if preempted between the assignment to
	// dq_items_head and _dispatch_wakeup, the blocks submitted to the
	// queue may release the last reference to the queue when invoked by
	// _dispatch_queue_drain. <rdar://problem/6932776>
	_dispatch_retain(dq);
	dq->dq_items_head = obj;
	_dispatch_wakeup(dq);
	_dispatch_release(dq);
}
DISPATCH_NOINLINE
static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
	dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = context;

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif

DISPATCH_NOINLINE
void
dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
	dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

	if (!dc) {
		return _dispatch_barrier_async_f_slow(dq, context, func);
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = context;

	_dispatch_queue_push(dq, dc);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
	dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = context;

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif

DISPATCH_NOINLINE
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

	// unlike dispatch_sync_f(), we do NOT need to check the queue width,
	// the "drain" function will do this test

	if (!dc) {
		return _dispatch_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}
struct dispatch_barrier_sync_slow2_s {
	dispatch_queue_t dbss2_dq;
#if DISPATCH_COCOA_COMPAT
	dispatch_function_t dbss2_func;
	dispatch_function_t dbss2_ctxt;
#endif
	dispatch_semaphore_t dbss2_sema;
};

static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
	struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;

	dispatch_assert(dbss2->dbss2_dq == dispatch_get_current_queue());
#if DISPATCH_COCOA_COMPAT
	// When the main queue is bound to the main thread
	if (dbss2->dbss2_dq == &_dispatch_main_q && pthread_main_np()) {
		dbss2->dbss2_func(dbss2->dbss2_ctxt);
		dbss2->dbss2_func = NULL;
		dispatch_semaphore_signal(dbss2->dbss2_sema);
		return;
	}
#endif
	dispatch_suspend(dbss2->dbss2_dq);
	dispatch_semaphore_signal(dbss2->dbss2_sema);
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	struct dispatch_barrier_sync_slow2_s dbss2 = {
		.dbss2_dq = dq,
#if DISPATCH_COCOA_COMPAT
		.dbss2_func = func,
		.dbss2_ctxt = ctxt,
#endif
		.dbss2_sema = _dispatch_get_thread_semaphore(),
	};
	struct dispatch_barrier_sync_slow_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
	} dbss = {
		.do_vtable = (void *)DISPATCH_OBJ_BARRIER_BIT,
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dbss2,
	};

	_dispatch_queue_push(dq, (void *)&dbss);
	dispatch_semaphore_wait(dbss2.dbss2_sema, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(dbss2.dbss2_sema);

#if DISPATCH_COCOA_COMPAT
	// Main queue bound to main thread
	if (dbss2.dbss2_func == NULL) {
		return;
	}
#endif

	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	func(ctxt);
	_dispatch_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	dispatch_resume(dq);
}
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_barrier_sync_f(dq, block, _dispatch_call_block_and_release);
	}
	struct Block_basic *bb = (void *)work;
	dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

#ifdef __BLOCKS__
void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq == &_dispatch_main_q)) {
		return _dispatch_barrier_sync_slow(dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	// 3) the queue is not weird
	if (slowpath(dq->dq_items_tail)
			|| slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
			|| slowpath(!_dispatch_queue_trylock(dq))) {
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	func(ctxt);
	_dispatch_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_queue_unlock(dq);
}
static void
_dispatch_sync_f_slow2(void *ctxt)
{
	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_atomic_add(&dq->dq_running, 2);
	dispatch_semaphore_signal(ctxt);
}

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_t dq)
{
	// the global root queues do not need strict ordering
	if (dq->do_targetq == NULL) {
		dispatch_atomic_add(&dq->dq_running, 2);
		return;
	}

	struct dispatch_sync_slow_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
	} dss = {
		.do_vtable = NULL,
		.dc_func = _dispatch_sync_f_slow2,
		.dc_ctxt = _dispatch_get_thread_semaphore(),
	};

	// XXX FIXME -- concurrent queues can become serial again
	_dispatch_queue_push(dq, (void *)&dss);

	dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(dss.dc_ctxt);
}

#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
	}
	struct Block_basic *bb = (void *)work;
	dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

#ifdef __BLOCKS__
void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq == &_dispatch_main_q)) {
		return _dispatch_sync_slow(dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif
DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	typeof(dq->dq_running) prev_cnt;
	dispatch_queue_t old_dq;

	if (dq->dq_width == 1) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}

	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		_dispatch_sync_f_slow(dq);
	} else {
		prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;

		if (slowpath(prev_cnt & 1)) {
			if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
				_dispatch_wakeup(dq);
			}
			_dispatch_sync_f_slow(dq);
		}
	}

	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	func(ctxt);
	_dispatch_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}
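/*
 * Accounting sketch (illustrative): dq_running moves in steps of 2 per
 * concurrent item, and the low bit marks a barrier holding the queue (see
 * _dispatch_queue_trylock above). On a width-4 queue:
 *
 *	idle                     dq_running == 0
 *	two dispatch_sync_f()s   dq_running == 4 (2 + 2)
 *	barrier trylock          dq_running: 0 -> 1 via cmpxchg
 *
 * which is why the prev_cnt & 1 test above detects a barrier in flight.
 */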
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	return dq->dq_label;
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_q_port_init(void *ctxt __attribute__((unused)))
{
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &main_q_port);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port, MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);

	_dispatch_program_is_probably_callback_driven = true;
	_dispatch_safe_fork = false;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}
#endif

void
dispatch_main(void)
{
	if (pthread_main_np()) {
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
}

static void
_dispatch_sigsuspend(void *ctxt __attribute__((unused)))
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_atomic_dec(&_dispatch_main_q.dq_running);

	if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
		_dispatch_wakeup(&_dispatch_main_q);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		dispatch_async_f(_dispatch_get_root_queue(0, 0), NULL, _dispatch_sigsuspend);
		sleep(1);	// workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);

	mach_port_t mp = main_q_port;
	kern_return_t kr;

	main_q_port = 0;

	if (mp) {
		kr = mach_port_deallocate(mach_task_self(), mp);
		DISPATCH_VERIFY_MIG(kr);
		dispatch_assume_zero(kr);
		kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
		DISPATCH_VERIFY_MIG(kr);
		dispatch_assume_zero(kr);
	}
#endif
}
dispatch_queue_t
dispatch_get_concurrent_queue(long pri)
{
	if (pri > 0) {
		pri = DISPATCH_QUEUE_PRIORITY_HIGH;
	} else if (pri < 0) {
		pri = DISPATCH_QUEUE_PRIORITY_LOW;
	}
	return _dispatch_get_root_queue(pri, false);
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}

dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
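/*
 * Usage sketch (illustrative): flag bits other than DISPATCH_QUEUE_OVERCOMMIT
 * are reserved, so callers passing garbage get NULL back instead of a queue:
 *
 *	dispatch_queue_t q;
 *	q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);  // ok
 *	q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, ~0ul);
 *	// q == NULL: reserved bits were set
 */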
#define countof(x) (sizeof(x) / sizeof(x[0]))

void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 3);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 6);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW == -DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) == DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_thread_mediator) == DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) == DISPATCH_ROOT_QUEUE_COUNT);

	_dispatch_thread_key_init_np(dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_init_np(dispatch_sema4_key, (void (*)(void *))dispatch_release);	// use the extern release
	_dispatch_thread_key_init_np(dispatch_cache_key, _dispatch_cache_cleanup2);
#if DISPATCH_PERF_MON
	_dispatch_thread_key_init_np(dispatch_bcounter_key, NULL);
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);

	_dispatch_queue_set_width_init();
}

static inline void
_dispatch_queue_unlock(dispatch_queue_t dq)
{
	if (slowpath(dispatch_atomic_dec(&dq->dq_running))) {
		return;
	}

	_dispatch_wakeup(dq);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	dispatch_queue_t tq;

	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
		return NULL;
	}

	if (!_dispatch_trylock(dou._do)) {
#if DISPATCH_COCOA_COMPAT
		if (dou._dq == &_dispatch_main_q) {
			_dispatch_queue_wakeup_main();
		}
#endif
		return NULL;
	}
	_dispatch_retain(dou._do);
	tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq;	// libdispatch doesn't need this, but the Instrument DTrace probe does
}

#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
void
_dispatch_queue_wakeup_main(void)
{
	kern_return_t kr;

	dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);

	kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		dispatch_assume_zero(kr);
		break;
	}

	_dispatch_safe_fork = false;
}
#endif
static inline long
_dispatch_rootq2wq_pri(long idx)
{
#ifdef WORKQ_DEFAULT_PRIOQUEUE
	switch (idx) {
	case 0:
	case 1:
		return WORKQ_LOW_PRIOQUEUE;
	default:
	case 2:
	case 3:
		return WORKQ_DEFAULT_PRIOQUEUE;
	case 4:
	case 5:
		return WORKQ_HIGH_PRIOQUEUE;
	}
#else
	return idx;
#endif
}
static void
_dispatch_root_queues_init(void *context __attribute__((unused)))
{
	bool disable_wq = getenv("LIBDISPATCH_DISABLE_KWQ");
	pthread_workqueue_attr_t pwq_attr;
	kern_return_t kr;
	int i, r;

	r = pthread_workqueue_attr_init_np(&pwq_attr);
	dispatch_assume_zero(r);

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, _dispatch_rootq2wq_pri(i));
		dispatch_assume_zero(r);
		r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
		dispatch_assume_zero(r);
		// some software hangs if the non-overcommitting queues do not overcommit when threads block
#if 0
		if (!(i & 1)) {
			dispatch_root_queue_contexts[i].dgq_thread_pool_size = _dispatch_hw_config.cc_max_active;
		}
#endif

		r = 0;
		if (disable_wq || (r = pthread_workqueue_create_np(&_dispatch_root_queue_contexts[i].dgq_kworkqueue, &pwq_attr))) {
			if (r != ENOTSUP) {
				dispatch_assume_zero(r);
			}
			// override the default FIFO behavior for the pool semaphores
			kr = semaphore_create(mach_task_self(), &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
			DISPATCH_VERIFY_MIG(kr);
			dispatch_assume_zero(kr);
			dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
		} else {
			dispatch_assume(_dispatch_root_queue_contexts[i].dgq_kworkqueue);
		}
	}

	r = pthread_workqueue_attr_destroy_np(&pwq_attr);
	dispatch_assume_zero(r);
}
static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	static dispatch_once_t pred;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	pthread_workitem_handle_t wh;
	unsigned int gen_cnt;
	pthread_t pthr;
	int r, t_count;

	if (!dq->dq_items_tail) {
		return false;
	}

	_dispatch_safe_fork = false;

	dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

	if (qc->dgq_kworkqueue) {
		if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
			_dispatch_debug("requesting new worker thread");

			r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
			dispatch_assume_zero(r);
		} else {
			_dispatch_debug("work thread request still pending on global queue: %p", dq);
		}
		goto out;
	}

	if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
		goto out;
	}

	do {
		t_count = qc->dgq_thread_pool_size;
		if (!t_count) {
			_dispatch_debug("The thread pool is full: %p", dq);
			goto out;
		}
	} while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));

	while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
		if (r != EAGAIN) {
			dispatch_assume_zero(r);
		}
		sleep(1);
	}
	r = pthread_detach(pthr);
	dispatch_assume_zero(r);

out:
	return false;
}
static void
_dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
{
#if DISPATCH_PERF_MON
	uint64_t start = mach_absolute_time();
#endif
	_dispatch_queue_drain(dq);
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif
	_dispatch_force_cache_cleanup();
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	dispatch_queue_t tq = dq->do_targetq;

	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(_dispatch_queue_trylock(dq))) {
		_dispatch_queue_drain(dq);
		if (tq == dq->do_targetq) {
			tq = NULL;
		} else {
			tq = dq->do_targetq;
		}
		// We do not need to check the result.
		// When the suspend-count lock is dropped, then the check will happen.
		dispatch_atomic_dec(&dq->dq_running);
		if (tq) {
			return _dispatch_queue_push(tq, dq);
		}
	}

	dq->do_next = DISPATCH_OBJECT_LISTLESS;
	if (dispatch_atomic_sub(&dq->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
		if (dq->dq_running == 0) {
			_dispatch_wakeup(dq);	// verify that the queue is idle
		}
	}
	_dispatch_release(dq);	// added when the queue is put on the list
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
	dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

	prev_dq = dq->do_targetq;
	dq->do_targetq = ctxt;
	_dispatch_release(prev_dq);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
	if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		return;
	}
	// NOTE: we test for NULL target queues internally to detect root queues
	// therefore, if the retain crashes due to a bad input, that is OK
	_dispatch_retain(dq);
	dispatch_barrier_async_f(dou._dq, dq, _dispatch_set_target_queue2);
}
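/*
 * Usage sketch (illustrative; the label is hypothetical): the retarget is
 * submitted with dispatch_barrier_async_f() above, so work already enqueued
 * drains against the old target before the switch takes effect:
 *
 *	dispatch_queue_t inner = dispatch_queue_create("com.example.inner", NULL);
 *	dispatch_set_target_queue(inner,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0));
 */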
static void
_dispatch_async_f_redirect2(void *_ctxt)
{
	struct dispatch_continuation_s *dc = _ctxt;
	struct dispatch_continuation_s *other_dc = dc->dc_data[1];
	dispatch_queue_t old_dq, dq = dc->dc_data[0];

	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_continuation_pop(other_dc);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
		_dispatch_wakeup(dq);
	}
	_dispatch_release(dq);
}

static void
_dispatch_async_f_redirect(dispatch_queue_t dq, struct dispatch_object_s *other_dc)
{
	dispatch_continuation_t dc = (void *)other_dc;
	dispatch_queue_t root_dq = dq;

	if (dc->dc_func == _dispatch_sync_f_slow2) {
		return dc->dc_func(dc->dc_ctxt);
	}

	dispatch_atomic_add(&dq->dq_running, 2);
	_dispatch_retain(dq);

	dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = _dispatch_async_f_redirect2;
	dc->dc_ctxt = dc;
	dc->dc_data[0] = dq;
	dc->dc_data[1] = other_dc;

	do {
		root_dq = root_dq->do_targetq;
	} while (root_dq->do_targetq);

	_dispatch_queue_push(root_dq, dc);
}
static void
_dispatch_queue_drain(dispatch_queue_t dq)
{
	dispatch_queue_t orig_tq, old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	struct dispatch_object_s *dc = NULL, *next_dc = NULL;

	orig_tq = dq->do_targetq;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	while (dq->dq_items_tail) {
		while (!fastpath(dq->dq_items_head)) {
			_dispatch_hardware_pause();
		}

		dc = dq->dq_items_head;
		dq->dq_items_head = NULL;

		do {
			// Enqueue is TIGHTLY controlled, we won't wait long.
			do {
				next_dc = fastpath(dc->do_next);
			} while (!next_dc && !dispatch_atomic_cmpxchg(&dq->dq_items_tail, dc, NULL));
			if (DISPATCH_OBJECT_SUSPENDED(dq)) {
				goto out;
			}
			if (dq->dq_running > dq->dq_width) {
				goto out;
			}
			if (orig_tq != dq->do_targetq) {
				goto out;
			}
			if (fastpath(dq->dq_width == 1)) {
				_dispatch_continuation_pop(dc);
				_dispatch_workitem_inc();
			} else if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
				if (dq->dq_running > 1) {
					goto out;
				}
				_dispatch_continuation_pop(dc);
				_dispatch_workitem_inc();
			} else {
				_dispatch_async_f_redirect(dq, dc);
			}
		} while ((dc = next_dc));
	}

out:
	// if this is not a complete drain, we must undo some things
	if (slowpath(dc)) {
		// 'dc' must NOT be "popped"
		// 'dc' might be the last item
		if (next_dc || dispatch_atomic_cmpxchg(&dq->dq_items_tail, NULL, dc)) {
			dq->dq_items_head = dc;
		} else {
			// an enqueuer raced in; prepend 'dc' to the new head
			while (!(next_dc = dq->dq_items_head)) {
				_dispatch_hardware_pause();
			}
			dq->dq_items_head = dc;
			dc->do_next = next_dc;
		}
	}

	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	sigset_t mask;
	int r;

	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	dispatch_assume_zero(r);

	do {
		_dispatch_worker_thread2(context);
		// we use 65 seconds in case there are any timers that run once a minute
	} while (dispatch_semaphore_wait(qc->dgq_thread_mediator, dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);

	dispatch_atomic_inc(&qc->dgq_thread_pool_size);
	if (dq->dq_items_tail) {
		_dispatch_queue_wakeup_global(dq);
	}

	return NULL;
}
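/*
 * Lifetime sketch (illustrative): a pooled thread alternates between draining
 * and lingering on the LIFO mediator semaphore. The 65-second timeout in
 * dispatch_time(0, 65ull * NSEC_PER_SEC) keeps a warm thread available across
 * once-a-minute timers; when the wait finally times out, the pool slot is
 * returned by incrementing dgq_thread_pool_size and the thread exits.
 */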
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_worker_thread2(void *context)
{
	struct dispatch_object_s *item;
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	qc->dgq_pending = 0;

#if DISPATCH_COCOA_COMPAT
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_begin_NSAutoReleasePool();
#endif

#if DISPATCH_PERF_MON
	uint64_t start = mach_absolute_time();
#endif
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		_dispatch_continuation_pop(item);
	}
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif

#if DISPATCH_COCOA_COMPAT
	_dispatch_end_NSAutoReleasePool(pool);
	dispatch_end_thread_4GC();
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);

	_dispatch_force_cache_cleanup();
}

#if DISPATCH_PERF_MON
static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t avg, delta = mach_absolute_time() - start;
	unsigned long count, bucket;

	count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	if (count) {
		avg = delta / count;
		bucket = flsll(avg);
	} else {
		bucket = 0;
	}

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
size_t
dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	return snprintf(buf, bufsiz, "parent = %p ", dq->do_targetq);
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;

	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dq->dq_label, dq);
	offset += dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += snprintf(&buf[offset], bufsiz - offset, "}");

	return offset;
}

#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		dispatch_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#endif

#if DISPATCH_COCOA_COMPAT
void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg __attribute__((unused)))
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_queue_serial_drain_till_empty(&_dispatch_main_q);
	_dispatch_queue_set_mainq_drain_state(false);
}

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
	return main_q_port;
}
#endif
#ifndef DISPATCH_NO_LEGACY
static void
dispatch_queue_attr_dispose(dispatch_queue_attr_t attr)
{
	dispatch_queue_attr_set_finalizer(attr, NULL);
	_dispatch_dispose(attr);
}

static const struct dispatch_queue_attr_vtable_s dispatch_queue_attr_vtable = {
	.do_type = DISPATCH_QUEUE_ATTR_TYPE,
	.do_kind = "queue-attr",
	.do_dispose = dispatch_queue_attr_dispose,
};

dispatch_queue_attr_t
dispatch_queue_attr_create(void)
{
	dispatch_queue_attr_t a = calloc(1, sizeof(struct dispatch_queue_attr_s));

	if (a) {
		a->do_vtable = &dispatch_queue_attr_vtable;
		a->do_next = DISPATCH_OBJECT_LISTLESS;
		a->do_ref_cnt = 1;
		a->do_xref_cnt = 1;
		a->do_targetq = _dispatch_get_root_queue(0, 0);
		a->qa_flags = DISPATCH_QUEUE_OVERCOMMIT;
	}

	return a;
}

void
dispatch_queue_attr_set_flags(dispatch_queue_attr_t attr, uint64_t flags)
{
	dispatch_assert_zero(flags & ~DISPATCH_QUEUE_FLAGS_MASK);
	attr->qa_flags = (unsigned long)flags & DISPATCH_QUEUE_FLAGS_MASK;
}

void
dispatch_queue_attr_set_priority(dispatch_queue_attr_t attr, int priority)
{
	dispatch_debug_assert(attr, "NULL pointer");
	dispatch_debug_assert(priority <= DISPATCH_QUEUE_PRIORITY_HIGH && priority >= DISPATCH_QUEUE_PRIORITY_LOW, "Invalid priority");

	if (priority > 0) {
		priority = DISPATCH_QUEUE_PRIORITY_HIGH;
	} else if (priority < 0) {
		priority = DISPATCH_QUEUE_PRIORITY_LOW;
	}

	attr->qa_priority = priority;
}

void
dispatch_queue_attr_set_finalizer_f(dispatch_queue_attr_t attr,
	void *context, dispatch_queue_finalizer_function_t finalizer)
{
#ifdef __BLOCKS__
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		Block_release(attr->finalizer_ctxt);
	}
#endif
	attr->finalizer_ctxt = context;
	attr->finalizer_func = finalizer;
}

#ifdef __BLOCKS__
long
dispatch_queue_attr_set_finalizer(dispatch_queue_attr_t attr,
	dispatch_queue_finalizer_t finalizer)
{
	void *ctxt;
	dispatch_queue_finalizer_function_t func;

	if (finalizer) {
		if (!(ctxt = Block_copy(finalizer))) {
			return 1;
		}
		func = (void *)_dispatch_call_block_and_release2;
	} else {
		ctxt = NULL;
		func = NULL;
	}

	dispatch_queue_attr_set_finalizer_f(attr, ctxt, func);

	return 0;
}
#endif
#endif /* DISPATCH_NO_LEGACY */
static void
_dispatch_ccache_init(void *context __attribute__((unused)))
{
	_dispatch_ccache_zone = malloc_create_zone(0, 0);
	dispatch_assert(_dispatch_ccache_zone);
	malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
}

dispatch_continuation_t
_dispatch_continuation_alloc_from_heap(void)
{
	static dispatch_once_t pred;
	dispatch_continuation_t dc;

	dispatch_once_f(&pred, NULL, _dispatch_ccache_init);

	while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1, ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
		sleep(1);
	}

	return dc;
}

void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc = _dispatch_thread_getspecific(dispatch_cache_key);

	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup2(dc);
	}
}

static void
_dispatch_cache_cleanup2(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		malloc_zone_free(_dispatch_ccache_zone, dc);
	}
}
static char _dispatch_build[16];

static void
_dispatch_bug_init(void *context __attribute__((unused)))
{
	int mib[] = { CTL_KERN, KERN_OSVERSION };
	size_t bufsz = sizeof(_dispatch_build);

	sysctl(mib, 2, _dispatch_build, &bufsz, NULL, 0);
}

void
_dispatch_bug(size_t line, long val)
{
	static dispatch_once_t pred;
	static void *last_seen;
	void *ra = __builtin_return_address(0);

	dispatch_once_f(&pred, NULL, _dispatch_bug_init);
	if (last_seen != ra) {
		last_seen = ra;
		_dispatch_log("BUG in libdispatch: %s - %lu - 0x%lx", _dispatch_build, line, val);
	}
}

void
_dispatch_abort(size_t line, long val)
{
	_dispatch_bug(line, val);
	abort();
}

void
_dispatch_log(const char *msg, ...)
{
	va_list ap;

	va_start(ap, msg);
	_dispatch_logv(msg, ap);
	va_end(ap);
}

void
_dispatch_logv(const char *msg, va_list ap)
{
#if DISPATCH_DEBUG
	static FILE *logfile, *tmp;
	char newbuf[strlen(msg) + 2];
	char path[PATH_MAX];

	sprintf(newbuf, "%s\n", msg);

	if (!logfile) {
		snprintf(path, sizeof(path), "/var/tmp/libdispatch.%d.log", getpid());
		tmp = fopen(path, "a");
		assert(tmp);
		if (!dispatch_atomic_cmpxchg(&logfile, NULL, tmp)) {
			fclose(tmp);
		} else {
			struct timeval tv;
			gettimeofday(&tv, NULL);
			fprintf(logfile, "=== log file opened for %s[%u] at %ld.%06u ===\n",
					getprogname() ?: "", getpid(), tv.tv_sec, tv.tv_usec);
		}
	}
	vfprintf(logfile, newbuf, ap);
	fflush(logfile);
#else
	vsyslog(LOG_NOTICE, msg, ap);
#endif
}
static int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	dispatch_assume_zero(r);
	r = sigdelset(set, SIGEMT);
	dispatch_assume_zero(r);
	r = sigdelset(set, SIGFPE);
	dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
bool _dispatch_safe_fork = true;

void
dispatch_atfork_prepare(void)
{
}

void
dispatch_atfork_parent(void)
{
}

void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

	if (_dispatch_safe_fork) {
		return;
	}

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}

void
dispatch_init_pthread(pthread_t pthr __attribute__((unused)))
{
}
static int _dispatch_kq;

static void
_dispatch_get_kq_init(void *context __attribute__((unused)))
{
	static const struct kevent kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.flags = EV_ADD|EV_CLEAR,
	};

	_dispatch_kq = kqueue();
	_dispatch_safe_fork = false;
	// in case we fall back to select()
	FD_SET(_dispatch_kq, &_dispatch_rfds);

	if (_dispatch_kq == -1) {
		dispatch_assert_zero(errno);
	}

	dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));

	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
}

static int
_dispatch_get_kq(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);

	return _dispatch_kq;
}

static void
_dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
	size_t i;

	for (i = 0; i < cnt; i++) {
		// EVFILT_USER isn't used by sources
		if (kev[i].filter == EVFILT_USER) {
			// If _dispatch_mgr_thread2() ever is changed to return to the
			// caller, then this should become _dispatch_queue_drain()
			_dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
		} else {
			_dispatch_source_drain_kevent(&kev[i]);
		}
	}
}
static dispatch_queue_t
_dispatch_mgr_invoke(dispatch_queue_t dq)
{
	static const struct timespec timeout_immediately = { 0, 0 };
	struct timespec timeout;
	const struct timespec *timeoutp;
	struct timeval sel_timeout, *sel_timeoutp;
	fd_set tmp_rfds, tmp_wfds;
	struct kevent kev[1];
	int k_cnt, k_err, i, r;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	for (;;) {
		_dispatch_run_timers();

		timeoutp = _dispatch_get_next_timer_fire(&timeout);

		if (_dispatch_select_workaround) {
			FD_COPY(&_dispatch_rfds, &tmp_rfds);
			FD_COPY(&_dispatch_wfds, &tmp_wfds);
			if (timeoutp) {
				sel_timeout.tv_sec = timeoutp->tv_sec;
				sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
				sel_timeoutp = &sel_timeout;
			} else {
				sel_timeoutp = NULL;
			}

			r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
			if (r == -1) {
				if (errno != EBADF) {
					dispatch_assume_zero(errno);
					continue;
				}
				for (i = 0; i < FD_SETSIZE; i++) {
					if (i == _dispatch_kq) {
						continue;
					}
					if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
						continue;
					}
					r = dup(i);
					if (r != -1) {
						close(r);
					} else {
						// the fd is bad; drop it from the sets and
						// forget the stashed kevent udata for it
						FD_CLR(i, &_dispatch_rfds);
						FD_CLR(i, &_dispatch_wfds);
						_dispatch_rfd_ptrs[i] = 0;
						_dispatch_wfd_ptrs[i] = 0;
					}
				}
				continue;
			}

			if (r > 0) {
				for (i = 0; i < FD_SETSIZE; i++) {
					if (i == _dispatch_kq) {
						continue;
					}
					if (FD_ISSET(i, &tmp_rfds)) {
						FD_CLR(i, &_dispatch_rfds);	// emulate EV_DISABLE
						EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
						_dispatch_rfd_ptrs[i] = 0;
						_dispatch_mgr_thread2(kev, 1);
					}
					if (FD_ISSET(i, &tmp_wfds)) {
						FD_CLR(i, &_dispatch_wfds);	// emulate EV_DISABLE
						EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
						_dispatch_wfd_ptrs[i] = 0;
						_dispatch_mgr_thread2(kev, 1);
					}
				}
			}

			timeoutp = &timeout_immediately;
		}

		k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
		k_err = errno;

		switch (k_cnt) {
		case -1:
			if (k_err == EBADF) {
				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
			}
			dispatch_assume_zero(k_err);
			continue;
		default:
			_dispatch_mgr_thread2(kev, (size_t)k_cnt);
			// fall through
		case 0:
			_dispatch_force_cache_cleanup();
			continue;
		}
	}

	return NULL;
}
static bool
_dispatch_mgr_wakeup(dispatch_queue_t dq)
{
	static const struct kevent kev = {
		.ident = 1,
		.filter = EVFILT_USER,
#ifdef EV_TRIGGER
		.flags = EV_TRIGGER,
#endif
#ifdef NOTE_TRIGGER
		.fflags = NOTE_TRIGGER,
#endif
	};

	_dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);

	_dispatch_update_kq(&kev);

	return false;
}
void
_dispatch_update_kq(const struct kevent *kev)
{
	struct kevent kev_copy = *kev;
	kev_copy.flags |= EV_RECEIPT;

	if (kev_copy.flags & EV_DELETE) {
		switch (kev_copy.filter) {
		case EVFILT_READ:
			if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
				FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
				_dispatch_rfd_ptrs[kev_copy.ident] = 0;
				return;
			}
			break;
		case EVFILT_WRITE:
			if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
				FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
				_dispatch_wfd_ptrs[kev_copy.ident] = 0;
				return;
			}
			break;
		default:
			break;
		}
	}

	int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
	if (rval == -1) {
		// If we fail to register with kevent for reasons other than
		// changelist processing, there is nothing more to do here.
		dispatch_assume_zero(errno);
		//kev_copy.flags |= EV_ERROR;
		//kev_copy.data = error;
		return;
	}

	// The following select workaround only applies to adding kevents
	if (!(kev->flags & EV_ADD)) {
		return;
	}

	switch (kev_copy.data) {
	case 0:
		return;
	case EBADF:
		break;
	default:
		// If an error occurred while registering with kevent, and it was
		// because of a kevent changelist processing && the kevent involved
		// either doing a read or write, it would indicate we were trying
		// to register a /dev/* port; fall back to select
		switch (kev_copy.filter) {
		case EVFILT_READ:
			_dispatch_select_workaround = true;
			FD_SET((int)kev_copy.ident, &_dispatch_rfds);
			_dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
			break;
		case EVFILT_WRITE:
			_dispatch_select_workaround = true;
			FD_SET((int)kev_copy.ident, &_dispatch_wfds);
			_dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
			break;
		default:
			_dispatch_source_drain_kevent(&kev_copy);
			break;
		}
		break;
	}
}
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
	.do_type = DISPATCH_QUEUE_MGR_TYPE,
	.do_kind = "mgr-queue",
	.do_invoke = _dispatch_mgr_invoke,
	.do_debug = dispatch_queue_debug,
	.do_probe = _dispatch_mgr_wakeup,
};

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = &_dispatch_queue_mgr_vtable,
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],

	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_serialnum = 2,
};

const struct dispatch_queue_offsets_s dispatch_queue_offsets = {
	.dqo_label = offsetof(struct dispatch_queue_s, dq_label),
	.dqo_label_size = sizeof(_dispatch_main_q.dq_label),
	.dqo_flags = 0,
	.dqo_flags_size = 0,
	.dqo_width = offsetof(struct dispatch_queue_s, dq_width),
	.dqo_width_size = sizeof(_dispatch_main_q.dq_width),
	.dqo_serialnum = offsetof(struct dispatch_queue_s, dq_serialnum),
	.dqo_serialnum_size = sizeof(_dispatch_main_q.dq_serialnum),
	.dqo_running = offsetof(struct dispatch_queue_s, dq_running),
	.dqo_running_size = sizeof(_dispatch_main_q.dq_running),
};
#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t work)
{
	// test before the copy of the block
	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH("dispatch_after() called with 'when' == infinity");
#endif
		return;
	}
	dispatch_after_f(when, queue, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
#endif
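/*
 * Usage sketch (illustrative; update_ui() stands in for arbitrary client
 * code):
 *
 *	dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC),
 *			dispatch_get_main_queue(), ^{ update_ui(); });
 *
 * DISPATCH_TIME_FOREVER is rejected up front, before the Block is copied.
 */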
DISPATCH_NOINLINE
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, void (*func)(void *))
{
	uint64_t delta;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH("dispatch_after_f() called with 'when' == infinity");
#endif
		return;
	}

	// this function can and should be optimized to not use a dispatch source
	delta = _dispatch_timeout(when);
	if (delta == 0) {
		return dispatch_async_f(queue, ctxt, func);
	}
	if (!dispatch_source_timer_create(DISPATCH_TIMER_INTERVAL, delta, 0, NULL, queue, ^(dispatch_source_t ds) {
		long err_dom, err_val;
		if ((err_dom = dispatch_source_get_error(ds, &err_val))) {
			dispatch_assert(err_dom == DISPATCH_ERROR_DOMAIN_POSIX);
			dispatch_assert(err_val == ECANCELED);
			func(ctxt);
			dispatch_release(ds);	// MUST NOT be _dispatch_release()
		} else {
			// the timer fired; cancel the source so this handler runs
			// once more with ECANCELED and performs the work above
			dispatch_source_cancel(ds);