#include "internal.h"
#if HAVE_MACH
-#include "protocol.h"
+#include "protocol.h" // _dispatch_send_wakeup_runloop_thread
#endif
+#if HAVE_PTHREAD_WORKQUEUES || DISPATCH_USE_INTERNAL_WORKQUEUE
+#define DISPATCH_USE_WORKQUEUES 1
+#endif
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
-#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
- && !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
-#define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
-#endif
-#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
- !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
+#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || \
+ DISPATCH_DEBUG) && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
-#if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
-#undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
-#define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
+#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && (DISPATCH_DEBUG || \
+ (!DISPATCH_USE_KEVENT_WORKQUEUE && !HAVE_PTHREAD_WORKQUEUE_QOS)) && \
+ !defined(DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP)
+#define DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 1
+#endif
+#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || \
+ DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || \
+ DISPATCH_USE_INTERNAL_WORKQUEUE
+#if !DISPATCH_USE_INTERNAL_WORKQUEUE
+#define DISPATCH_USE_WORKQ_PRIORITY 1
+#endif
+#define DISPATCH_USE_WORKQ_OPTIONS 1
#endif
-#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
+
+#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_sig_thread(void *ctxt);
static void _dispatch_cache_cleanup(void *value);
-static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp);
static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
+static void _dispatch_wlh_cleanup(void *ctxt);
static void _dispatch_deferred_items_cleanup(void *ctxt);
static void _dispatch_frame_cleanup(void *ctxt);
static void _dispatch_context_cleanup(void *ctxt);
-static void _dispatch_non_barrier_complete(dispatch_queue_t dq);
-static inline void _dispatch_global_queue_poke(dispatch_queue_t dq);
+static void _dispatch_queue_barrier_complete(dispatch_queue_t dq,
+ dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
+static void _dispatch_queue_non_barrier_complete(dispatch_queue_t dq);
+static void _dispatch_queue_push_sync_waiter(dispatch_queue_t dq,
+ dispatch_sync_context_t dsc, dispatch_qos_t qos);
+#if HAVE_PTHREAD_WORKQUEUE_QOS
+static void _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
+ dispatch_queue_t dq, dispatch_qos_t qos);
+static inline void _dispatch_queue_class_wakeup_with_override(dispatch_queue_t,
+ uint64_t dq_state, dispatch_wakeup_flags_t flags);
+#endif
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread4(void *context);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_worker_thread3(pthread_priority_t priority);
#endif
-#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
-static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif
#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_handle_pred;
static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
- pthread_priority_t pp, dispatch_wakeup_flags_t flags);
+ dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
static void _dispatch_runloop_queue_handle_init(void *ctxt);
static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
#endif
-static void _dispatch_root_queues_init_once(void *context);
-static dispatch_once_t _dispatch_root_queues_pred;
-
#pragma mark -
#pragma mark dispatch_root_queue
};
#endif
-#define MAX_PTHREAD_COUNT 255
+#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
+#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
+#endif
struct dispatch_root_queue_context_s {
union {
struct {
- unsigned int volatile dgq_pending;
-#if HAVE_PTHREAD_WORKQUEUES
+ int volatile dgq_pending;
+#if DISPATCH_USE_WORKQUEUES
qos_class_t dgq_qos;
- int dgq_wq_priority, dgq_wq_options;
+#if DISPATCH_USE_WORKQ_PRIORITY
+ int dgq_wq_priority;
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
+ int dgq_wq_options;
+#endif
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
pthread_workqueue_t dgq_kworkqueue;
#endif
-#endif // HAVE_PTHREAD_WORKQUEUES
+#endif // DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
void *dgq_ctxt;
- uint32_t volatile dgq_thread_pool_size;
+ int32_t volatile dgq_thread_pool_size;
#endif
};
char _dgq_pad[DISPATCH_CACHELINE_SIZE];
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
-#if HAVE_PTHREAD_WORKQUEUES
- .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQUEUES
+ .dgq_qos = QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
+#endif
+#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
+#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
-#define _DISPATCH_ROOT_QUEUE_ENTRY(n, ...) \
- [DISPATCH_ROOT_QUEUE_IDX_##n] = { \
+// _DISPATCH_ROOT_QUEUE_IDX(n, flags): index of the root queue for QoS name
+// `n`. Selects the _QOS_OVERCOMMIT slot when `flags` contains
+// DISPATCH_PRIORITY_FLAG_OVERCOMMIT and the plain _QOS slot otherwise.
+// `flags` is parenthesized at every use: callers pass compound `A | B`
+// expressions and `&` binds tighter than `|`.
+#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
+ (((flags) & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
+ DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
+ DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
+// _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...): designated initializer for the
+// root queue selected by (n, flags). dq_priority combines the QoS of `n`,
+// the caller's flags and DISPATCH_PRIORITY_FLAG_ROOTQUEUE; unless
+// DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE is set, the QoS is also stored shifted
+// by DISPATCH_PRIORITY_OVERRIDE_SHIFT. Extra designators (label, serial
+// number) come in through __VA_ARGS__.
+#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
+ [_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
.do_ctxt = &_dispatch_root_queue_contexts[ \
- DISPATCH_ROOT_QUEUE_IDX_##n], \
- .dq_width = DISPATCH_QUEUE_WIDTH_POOL, \
- .dq_override_voucher = DISPATCH_NO_VOUCHER, \
- .dq_override = DISPATCH_SATURATED_OVERRIDE, \
+ _DISPATCH_ROOT_QUEUE_IDX(n, flags)], \
+ .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
+ .dq_priority = _dispatch_priority_make(DISPATCH_QOS_##n, 0) | (flags) | \
+ DISPATCH_PRIORITY_FLAG_ROOTQUEUE | \
+ (((flags) & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE) ? 0 : \
+ (DISPATCH_QOS_##n << DISPATCH_PRIORITY_OVERRIDE_SHIFT)), \
__VA_ARGS__ \
}
- _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS,
+ _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
.dq_label = "com.apple.root.maintenance-qos",
.dq_serialnum = 4,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS_OVERCOMMIT,
+ _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.maintenance-qos.overcommit",
.dq_serialnum = 5,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS,
+ _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
.dq_label = "com.apple.root.background-qos",
.dq_serialnum = 6,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS_OVERCOMMIT,
+ _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.background-qos.overcommit",
.dq_serialnum = 7,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS,
+ _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
.dq_label = "com.apple.root.utility-qos",
.dq_serialnum = 8,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS_OVERCOMMIT,
+ _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.utility-qos.overcommit",
.dq_serialnum = 9,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS,
+ _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE,
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 10,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS_OVERCOMMIT,
+ _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
+ DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.default-qos.overcommit",
.dq_serialnum = 11,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS,
+ _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
.dq_label = "com.apple.root.user-initiated-qos",
.dq_serialnum = 12,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS_OVERCOMMIT,
+ _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-initiated-qos.overcommit",
.dq_serialnum = 13,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS,
+ _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
.dq_label = "com.apple.root.user-interactive-qos",
.dq_serialnum = 14,
),
- _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS_OVERCOMMIT,
+ _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-interactive-qos.overcommit",
.dq_serialnum = 15,
),
};
-#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
&_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
};
-#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
-
-#define DISPATCH_PRIORITY_COUNT 5
-
-enum {
- // No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
- // maintenance priority
- DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
- DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
- DISPATCH_PRIORITY_IDX_LOW,
- DISPATCH_PRIORITY_IDX_DEFAULT,
- DISPATCH_PRIORITY_IDX_HIGH,
-};
-
-static qos_class_t _dispatch_priority2qos[] = {
- [DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
- [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
- [DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
- [DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
- [DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
-};
-
-#if HAVE_PTHREAD_WORKQUEUE_QOS
-static const int _dispatch_priority2wq[] = {
- [DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
- [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
- [DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
- [DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
- [DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
-};
-#endif
+#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
- .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1),
+ // initial state value combined with the BASE_ANON role bits
+ .dq_state = DISPATCH_QUEUE_ROLE_BASE_ANON |
+ DISPATCH_QUEUE_STATE_INIT_VALUE(1),
.do_targetq = &_dispatch_mgr_root_queue,
.dq_label = "com.apple.libdispatch-manager",
- .dq_width = 1,
- .dq_override_voucher = DISPATCH_NO_VOUCHER,
- .dq_override = DISPATCH_SATURATED_OVERRIDE,
+ // the width (1) is now encoded in dq_atomic_flags; the manager flag and
+ // the saturated override are both carried by dq_priority
+ .dq_atomic_flags = DQF_WIDTH(1),
+ .dq_priority = DISPATCH_PRIORITY_SATURATED_OVERRIDE |
+ DISPATCH_PRIORITY_FLAG_MANAGER,
.dq_serialnum = 2,
};
if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
return DISPATCH_BAD_INPUT;
}
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
- qos_class_t qos;
- switch (priority) {
-#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- case _DISPATCH_QOS_CLASS_MAINTENANCE:
- if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
- .dq_priority) {
- // map maintenance to background on old kernel
- qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
- } else {
- qos = (qos_class_t)priority;
- }
- break;
-#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
- qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
- break;
- case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
- qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
- break;
- case DISPATCH_QUEUE_PRIORITY_LOW:
- qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
- break;
- case DISPATCH_QUEUE_PRIORITY_DEFAULT:
- qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
- break;
- case DISPATCH_QUEUE_PRIORITY_HIGH:
- qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
- break;
- case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
-#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
- .dq_priority) {
- qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
- break;
- }
+ // translate the caller's priority argument into a dispatch_qos_t
+ dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
+#if !HAVE_PTHREAD_WORKQUEUE_QOS
+ // Without QoS workqueues, fold maintenance into background and
+ // user-interactive into user-initiated. `qos` is a dispatch_qos_t here,
+ // so it has to be matched against DISPATCH_QOS_* constants; QOS_CLASS_*
+ // values belong to qos_class_t and must not be compared with it.
+ if (qos == DISPATCH_QOS_MAINTENANCE) {
+ qos = DISPATCH_QOS_BACKGROUND;
+ } else if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
+ qos = DISPATCH_QOS_USER_INITIATED;
+ }
#endif
- // fallthrough
- default:
- qos = (qos_class_t)priority;
- break;
+ // reject priorities for which no QoS was produced
+ if (qos == DISPATCH_QOS_UNSPECIFIED) {
+ return DISPATCH_BAD_INPUT;
}
return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
_dispatch_get_current_queue(void)
{
return _dispatch_queue_get_current() ?:
- _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
+ _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
}
dispatch_queue_t
"dispatch_assert_queue()");
}
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
- if (unlikely(_dq_state_drain_pended(dq_state))) {
- goto fail;
- }
- if (likely(_dq_state_drain_owner(dq_state) == _dispatch_tid_self())) {
+ if (likely(_dq_state_drain_locked_by_self(dq_state))) {
return;
}
- if (likely(dq->dq_width > 1)) {
- // we can look at the width: if it is changing while we read it,
- // it means that a barrier is running on `dq` concurrently, which
- // proves that we're not on `dq`. Hence reading a stale '1' is ok.
- if (fastpath(_dispatch_thread_frame_find_queue(dq))) {
+ // we can look at the width: if it is changing while we read it,
+ // it means that a barrier is running on `dq` concurrently, which
+ // proves that we're not on `dq`. Hence reading a stale '1' is ok.
+ //
+ // However if we can have thread bound queues, these mess with lock
+ // ownership and we always have to take the slowpath
+ if (likely(DISPATCH_COCOA_COMPAT || dq->dq_width > 1)) {
+ if (likely(_dispatch_thread_frame_find_queue(dq))) {
return;
}
}
-fail:
_dispatch_assert_queue_fail(dq, true);
}
"dispatch_assert_queue_not()");
}
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
- if (_dq_state_drain_pended(dq_state)) {
- return;
- }
- if (likely(_dq_state_drain_owner(dq_state) != _dispatch_tid_self())) {
- if (likely(dq->dq_width == 1)) {
- // we can look at the width: if it is changing while we read it,
- // it means that a barrier is running on `dq` concurrently, which
- // proves that we're not on `dq`. Hence reading a stale '1' is ok.
+ if (likely(!_dq_state_drain_locked_by_self(dq_state))) {
+ // we can look at the width: if it is changing while we read it,
+ // it means that a barrier is running on `dq` concurrently, which
+ // proves that we're not on `dq`. Hence reading a stale '1' is ok.
+ //
+ // However if we can have thread bound queues, these mess with lock
+ // ownership and we always have to take the slowpath
+ if (likely(!DISPATCH_COCOA_COMPAT && dq->dq_width == 1)) {
return;
}
if (likely(!_dispatch_thread_frame_find_queue(dq))) {
#pragma mark -
#pragma mark dispatch_init
-#if HAVE_PTHREAD_WORKQUEUE_QOS
-pthread_priority_t _dispatch_background_priority;
-pthread_priority_t _dispatch_user_initiated_priority;
-
-static void
-_dispatch_root_queues_init_qos(int supported)
-{
- pthread_priority_t p;
- qos_class_t qos;
- unsigned int i;
- for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
- p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
- qos = _pthread_qos_class_decode(p, NULL, NULL);
- dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
- _dispatch_priority2qos[i] = qos;
- }
- for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
- qos = _dispatch_root_queue_contexts[i].dgq_qos;
- if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
- !(supported & WORKQ_FEATURE_MAINTENANCE)) {
- continue;
- }
- unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
- flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
- if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
- i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
- flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
- }
- p = _pthread_qos_class_encode(qos, 0, flags);
- _dispatch_root_queues[i].dq_priority = (dispatch_priority_t)p;
- }
-}
-#endif // HAVE_PTHREAD_WORKQUEUE_QOS
-
static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
- int r;
+ int r; (void)r;
bool result = false;
*wq_supported = 0;
-#if HAVE_PTHREAD_WORKQUEUES
- bool disable_wq = false;
+#if DISPATCH_USE_WORKQUEUES
+ bool disable_wq = false; (void)disable_wq;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE
bool disable_kevent_wq = false;
-#if DISPATCH_DEBUG
+#if DISPATCH_DEBUG || DISPATCH_PROFILE
disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
#endif
#endif
+
if (!disable_wq && !disable_qos) {
*wq_supported = _pthread_workqueue_supported();
#if DISPATCH_USE_KEVENT_WORKQUEUE
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#if DISPATCH_USE_MGR_THREAD
_dispatch_kevent_workqueue_enabled = !r;
-#endif
-#if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
- _dispatch_evfilt_machport_direct_enabled = !r;
#endif
result = !r;
} else
-#endif
+#endif // DISPATCH_USE_KEVENT_WORKQUEUE
if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
#if DISPATCH_USE_MGR_THREAD
r = _pthread_workqueue_init(_dispatch_worker_thread3,
result = !r;
#endif
}
- if (result) _dispatch_root_queues_init_qos(*wq_supported);
+ if (!(*wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
+ DISPATCH_INTERNAL_CRASH(*wq_supported,
+ "QoS Maintenance support required");
+ }
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
-#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
if (!result && !disable_wq) {
pthread_workqueue_setdispatchoffset_np(
offsetof(struct dispatch_queue_s, dq_serialnum));
#endif
result = !r;
}
-#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
(void)dispatch_assume_zero(r);
}
#endif
- int i;
+ size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
pthread_workqueue_t pwq = NULL;
dispatch_root_queue_context_t qc;
result = result || dispatch_assume(pwq);
}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
- qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
+ if (pwq) {
+ qc->dgq_kworkqueue = pwq;
+ } else {
+ qc->dgq_kworkqueue = (void*)(~0ul);
+ // because the fastpath of _dispatch_global_queue_poke didn't
+ // know yet that we're using the internal pool implementation
+ // we have to undo its setting of dgq_pending
+ qc->dgq_pending = 0;
+ }
}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
if (!disable_wq) {
#endif
}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
-#endif // HAVE_PTHREAD_WORKQUEUES
+#endif // DISPATCH_USE_WORKQUEUES
return result;
}
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
- uint8_t pool_size, bool overcommit)
+ int32_t pool_size, bool overcommit)
{
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
- uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
- dispatch_hw_config(active_cpus);
+ int32_t thread_pool_size = overcommit ? DISPATCH_WORKQ_MAX_PTHREAD_COUNT :
+ (int32_t)dispatch_hw_config(active_cpus);
if (slowpath(pool_size) && pool_size < thread_pool_size) {
thread_pool_size = pool_size;
}
qc->dgq_thread_pool_size = thread_pool_size;
-#if HAVE_PTHREAD_WORKQUEUES
+#if DISPATCH_USE_WORKQUEUES
if (qc->dgq_qos) {
(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
(void)dispatch_assume_zero(pthread_attr_setdetachstate(
#endif
}
#endif // HAVE_PTHREAD_WORKQUEUES
- _os_semaphore_t *sema = &pqc->dpq_thread_mediator.dsema_sema;
- _os_semaphore_init(sema, _OS_SEM_POLICY_LIFO);
- _os_semaphore_create(sema, _OS_SEM_POLICY_LIFO);
+ _dispatch_sema4_t *sema = &pqc->dpq_thread_mediator.dsema_sema;
+ _dispatch_sema4_init(sema, _DSEMA4_POLICY_LIFO);
+ _dispatch_sema4_create(sema, _DSEMA4_POLICY_LIFO);
}
#endif // DISPATCH_USE_PTHREAD_POOL
-static dispatch_once_t _dispatch_root_queues_pred;
-
-void
-_dispatch_root_queues_init(void)
-{
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
-}
-
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
_dispatch_fork_becomes_unsafe();
if (!_dispatch_root_queues_init_workq(&wq_supported)) {
#if DISPATCH_ENABLE_THREAD_POOL
- int i;
+ size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
bool overcommit = true;
-#if TARGET_OS_EMBEDDED
+#if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING)
// some software hangs if the non-overcommitting queues do not
// overcommit when threads block. Someday, this behavior should
// apply to all platforms
}
}
+// Runs _dispatch_root_queues_init_once() through dispatch_once_f(), guarded
+// by a once-predicate that is private to this function.
+void
+_dispatch_root_queues_init(void)
+{
+ static dispatch_once_t once_pred;
+
+ dispatch_once_f(&once_pred, NULL, _dispatch_root_queues_init_once);
+}
+
DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
- dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
- dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);
+ dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 2 * DISPATCH_QOS_MAX);
dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
-DISPATCH_QUEUE_PRIORITY_HIGH);
DISPATCH_ROOT_QUEUE_COUNT);
dispatch_assert(countof(_dispatch_root_queue_contexts) ==
DISPATCH_ROOT_QUEUE_COUNT);
- dispatch_assert(countof(_dispatch_priority2qos) ==
- DISPATCH_PRIORITY_COUNT);
-#if HAVE_PTHREAD_WORKQUEUE_QOS
- dispatch_assert(countof(_dispatch_priority2wq) ==
- DISPATCH_PRIORITY_COUNT);
-#endif
-#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
dispatch_assert(sizeof(_dispatch_wq2root_queues) /
sizeof(_dispatch_wq2root_queues[0][0]) ==
WORKQ_NUM_PRIOQUEUE * 2);
dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
DISPATCH_CACHELINE_SIZE == 0);
-
#if HAVE_PTHREAD_WORKQUEUE_QOS
- // 26497968 _dispatch_user_initiated_priority should be set for qos
- // propagation to work properly
- pthread_priority_t p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
- _dispatch_main_q.dq_priority = (dispatch_priority_t)p;
- _dispatch_main_q.dq_override = p & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
- p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_USER_INITIATED, 0, 0);
- _dispatch_user_initiated_priority = p;
- p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_BACKGROUND, 0, 0);
- _dispatch_background_priority = p;
+ dispatch_qos_t qos = _dispatch_qos_from_qos_class(qos_class_main());
+ dispatch_priority_t pri = _dispatch_priority_make(qos, 0);
+ _dispatch_main_q.dq_priority = _dispatch_priority_with_override_qos(pri, qos);
#if DISPATCH_DEBUG
if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
_dispatch_set_qos_class_enabled = 1;
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
_dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
#else
+ _dispatch_thread_key_create(&dispatch_priority_key, NULL);
+ _dispatch_thread_key_create(&dispatch_r2k_key, NULL);
_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
- _dispatch_thread_key_create(&dispatch_deferred_items_key,
- _dispatch_deferred_items_cleanup);
_dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
- _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
_dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
- _dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
_dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
NULL);
-#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
+ _dispatch_thread_key_create(&dispatch_basepri_key, NULL);
+#if DISPATCH_INTROSPECTION
+ _dispatch_thread_key_create(&dispatch_introspection_key , NULL);
+#elif DISPATCH_PERF_MON
_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
-#if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
- if (DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK) {
- _dispatch_thread_key_create(&dispatch_sema4_key,
- _dispatch_thread_semaphore_dispose);
- }
-#endif
+ _dispatch_thread_key_create(&dispatch_wlh_key, _dispatch_wlh_cleanup);
+ _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
+ _dispatch_thread_key_create(&dispatch_deferred_items_key,
+ _dispatch_deferred_items_cleanup);
#endif
#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
dispatch_atfork_parent, dispatch_atfork_child));
#endif
_dispatch_hw_config_init();
+ _dispatch_time_init();
_dispatch_vtable_init();
_os_object_init();
_voucher_init();
_dispatch_introspection_init();
}
-#if HAVE_MACH
-static dispatch_once_t _dispatch_mach_host_port_pred;
-static mach_port_t _dispatch_mach_host_port;
-
-static void
-_dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
-{
- kern_return_t kr;
- mach_port_t mp, mhp = mach_host_self();
- kr = host_get_host_port(mhp, &mp);
- DISPATCH_VERIFY_MIG(kr);
- if (fastpath(!kr)) {
- // mach_host_self returned the HOST_PRIV port
- kr = mach_port_deallocate(mach_task_self(), mhp);
- DISPATCH_VERIFY_MIG(kr);
- mhp = mp;
- } else if (kr != KERN_INVALID_ARGUMENT) {
- (void)dispatch_assume_zero(kr);
- }
- if (!fastpath(mhp)) {
- DISPATCH_CLIENT_CRASH(kr, "Could not get unprivileged host port");
- }
- _dispatch_mach_host_port = mhp;
-}
-
-mach_port_t
-_dispatch_get_mach_host_port(void)
-{
- dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
- _dispatch_mach_host_port_init);
- return _dispatch_mach_host_port;
-}
-#endif
-
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
#include <unistd.h>
#include <sys/syscall.h>
+#ifndef __ANDROID__
#ifdef SYS_gettid
DISPATCH_ALWAYS_INLINE
static inline pid_t
}
#else
#error "SYS_gettid unavailable on this system"
-#endif
+#endif /* SYS_gettid */
+#endif /* ! __ANDROID__ */
#define _tsd_call_cleanup(k, f) do { \
if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
- } while (0)
+ } while (0)
+
+#ifdef __ANDROID__
+static void (*_dispatch_thread_detach_callback)(void);
+
+// Installs `cb` as the thread detach callback by atomically exchanging it
+// into the file-scope _dispatch_thread_detach_callback slot. If the exchange
+// hands back a non-NULL value, a callback had already been installed and the
+// client is crashed.
+void
+_dispatch_install_thread_detach_callback(dispatch_function_t cb)
+{
+	bool already_installed = os_atomic_xchg(
+			&_dispatch_thread_detach_callback, cb, relaxed) != NULL;
+
+	if (already_installed) {
+		DISPATCH_CLIENT_CRASH(0, "Installing a thread detach callback twice");
+	}
+}
+#endif
void
_libdispatch_tsd_cleanup(void *ctx)
{
struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;
+ _tsd_call_cleanup(dispatch_priority_key, NULL);
+ _tsd_call_cleanup(dispatch_r2k_key, NULL);
+
_tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
_tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
_tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
_tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
_tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
NULL);
- _tsd_call_cleanup(dispatch_defaultpriority_key, NULL);
-#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
+ _tsd_call_cleanup(dispatch_basepri_key, NULL);
+#if DISPATCH_INTROSPECTION
+ _tsd_call_cleanup(dispatch_introspection_key, NULL);
+#elif DISPATCH_PERF_MON
_tsd_call_cleanup(dispatch_bcounter_key, NULL);
#endif
-#if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
- _tsd_call_cleanup(dispatch_sema4_key, _dispatch_thread_semaphore_dispose);
-#endif
- _tsd_call_cleanup(dispatch_priority_key, NULL);
+ _tsd_call_cleanup(dispatch_wlh_key, _dispatch_wlh_cleanup);
_tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
_tsd_call_cleanup(dispatch_deferred_items_key,
_dispatch_deferred_items_cleanup);
+#ifdef __ANDROID__
+ if (_dispatch_thread_detach_callback) {
+ _dispatch_thread_detach_callback();
+ }
+#endif
tsd->tid = 0;
}
void
_dispatch_queue_atfork_child(void)
{
+ dispatch_queue_t main_q = &_dispatch_main_q;
void *crash = (void *)0x100;
size_t i;
-#if HAVE_MACH
- _dispatch_mach_host_port_pred = 0;
- _dispatch_mach_host_port = MACH_PORT_NULL;
-#endif
+ if (_dispatch_queue_is_thread_bound(main_q)) {
+ _dispatch_queue_set_bound_thread(main_q);
+ }
if (!_dispatch_is_multithreaded_inline()) return;
- _dispatch_main_q.dq_items_head = crash;
- _dispatch_main_q.dq_items_tail = crash;
+ main_q->dq_items_head = crash;
+ main_q->dq_items_tail = crash;
_dispatch_mgr_q.dq_items_head = crash;
_dispatch_mgr_q.dq_items_tail = crash;
}
}
+// Out-of-line path that ORs _DISPATCH_UNSAFE_FORK_MULTITHREADED into
+// _dispatch_unsafe_fork. If the value returned by os_atomic_or has
+// _DISPATCH_UNSAFE_FORK_PROHIBIT set, the client is crashed.
+DISPATCH_NOINLINE
+void
+_dispatch_fork_becomes_unsafe_slow(void)
+{
+	uint8_t fork_flags = os_atomic_or(&_dispatch_unsafe_fork,
+			_DISPATCH_UNSAFE_FORK_MULTITHREADED, relaxed);
+
+	if ((fork_flags & _DISPATCH_UNSAFE_FORK_PROHIBIT) == 0) {
+		return;
+	}
+	DISPATCH_CLIENT_CRASH(0, "Transition to multithreaded is prohibited");
+}
+
+// Sets or clears _DISPATCH_UNSAFE_FORK_PROHIBIT in _dispatch_unsafe_fork.
+//
+// prohibit == false: the PROHIBIT bit is cleared, nothing else is checked.
+// prohibit == true:  the PROHIBIT bit is set; if the value returned by
+//                    os_atomic_or has _DISPATCH_UNSAFE_FORK_MULTITHREADED
+//                    set, the client is crashed.
+DISPATCH_NOINLINE
+void
+_dispatch_prohibit_transition_to_multithreaded(bool prohibit)
+{
+	if (!prohibit) {
+		os_atomic_and(&_dispatch_unsafe_fork,
+				(uint8_t)~_DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
+		return;
+	}
+
+	uint8_t fork_flags = os_atomic_or(&_dispatch_unsafe_fork,
+			_DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
+	if ((fork_flags & _DISPATCH_UNSAFE_FORK_MULTITHREADED) != 0) {
+		DISPATCH_CLIENT_CRASH(0, "The executable is already multithreaded");
+	}
+}
+
#pragma mark -
#pragma mark dispatch_queue_attr_t
{
qos_class_t qos = (qos_class_t)qos_class;
switch (qos) {
- case _DISPATCH_QOS_CLASS_MAINTENANCE:
- case _DISPATCH_QOS_CLASS_BACKGROUND:
- case _DISPATCH_QOS_CLASS_UTILITY:
- case _DISPATCH_QOS_CLASS_DEFAULT:
- case _DISPATCH_QOS_CLASS_USER_INITIATED:
- case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
- case _DISPATCH_QOS_CLASS_UNSPECIFIED:
+ case QOS_CLASS_MAINTENANCE:
+ case QOS_CLASS_BACKGROUND:
+ case QOS_CLASS_UTILITY:
+ case QOS_CLASS_DEFAULT:
+ case QOS_CLASS_USER_INITIATED:
+ case QOS_CLASS_USER_INTERACTIVE:
+ case QOS_CLASS_UNSPECIFIED:
break;
default:
return false;
return true;
}
-#define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
- [_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos
-
-static const
-_dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
- DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
- DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
- DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
- DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
- DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
- DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
- DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
-};
-
#define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
DQA_INDEX_NON_OVERCOMMIT : \
#define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
-#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
+#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (qos)
static inline dispatch_queue_attr_t
-_dispatch_get_queue_attr(qos_class_t qos, int prio,
+_dispatch_get_queue_attr(dispatch_qos_t qos, int prio,
_dispatch_queue_attr_overcommit_t overcommit,
dispatch_autorelease_frequency_t frequency,
bool concurrent, bool inactive)
dispatch_queue_attr_t
_dispatch_get_default_queue_attr(void)
{
- return _dispatch_get_queue_attr(_DISPATCH_QOS_CLASS_UNSPECIFIED, 0,
+ return _dispatch_get_queue_attr(DISPATCH_QOS_UNSPECIFIED, 0,
_dispatch_queue_attr_overcommit_unspecified,
DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
- dispatch_qos_class_t qos_class, int relative_priority)
+ dispatch_qos_class_t qos_class, int relpri)
{
- if (!_dispatch_qos_class_valid(qos_class, relative_priority)) {
+ if (!_dispatch_qos_class_valid(qos_class, relpri)) {
return DISPATCH_BAD_INPUT;
}
if (!slowpath(dqa)) {
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
- return _dispatch_get_queue_attr(qos_class, relative_priority,
- dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
+ return _dispatch_get_queue_attr(_dispatch_qos_from_qos_class(qos_class),
+ relpri, dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
dqa->dqa_concurrent, dqa->dqa_inactive);
}
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
- return _dispatch_get_queue_attr(dqa->dqa_qos_class,
- dqa->dqa_relative_priority, dqa->dqa_overcommit,
+ dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
+ return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
+ _dispatch_priority_relpri(pri), dqa->dqa_overcommit,
dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
}
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
- return _dispatch_get_queue_attr(dqa->dqa_qos_class,
- dqa->dqa_relative_priority, overcommit ?
+ dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
+ return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
+ _dispatch_priority_relpri(pri), overcommit ?
_dispatch_queue_attr_overcommit_enabled :
_dispatch_queue_attr_overcommit_disabled,
dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
- return _dispatch_get_queue_attr(dqa->dqa_qos_class,
- dqa->dqa_relative_priority, dqa->dqa_overcommit,
+ dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
+ return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
+ _dispatch_priority_relpri(pri), dqa->dqa_overcommit,
frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
}
#pragma mark -
#pragma mark dispatch_queue_t
-// skip zero
-// 1 - main_q
-// 2 - mgr_q
-// 3 - mgr_root_q
-// 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
-// we use 'xadd' on Intel, so the initial value == next assigned
-unsigned long volatile _dispatch_queue_serial_numbers = 16;
+// Points dq->dq_label at `label` without copying the string.
+//
+// Queues whose do_ref_cnt equals DISPATCH_OBJECT_GLOBAL_REFCNT are left
+// untouched. A queue whose atomic flags carry DQF_LABEL_NEEDS_FREE crashes
+// the client instead of being relabelled.
+void
+dispatch_queue_set_label_nocopy(dispatch_queue_t dq, const char *label)
+{
+	if (dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT) {
+		dispatch_queue_flags_t flags = _dispatch_queue_atomic_flags(dq);
+
+		if (unlikely(flags & DQF_LABEL_NEEDS_FREE)) {
+			DISPATCH_CLIENT_CRASH(dq, "Cannot change label for this queue");
+		}
+		dq->dq_label = label;
+	}
+}
+
+// Always returns false; both parameters are deliberately unused.
+static inline bool
+_dispatch_base_queue_is_wlh(dispatch_queue_t dq, dispatch_queue_t tq)
+{
+	(void)dq;
+	(void)tq;
+	return false;
+}
+
+// Rewrites the role bits of dq->dq_state according to the target queue `tq`,
+// then flags a non-root `tq` as targeted.
+//
+// Role selection:
+//  - tq lacks the QUEUE_ROOT type flag        -> DISPATCH_QUEUE_ROLE_INNER
+//  - _dispatch_base_queue_is_wlh(dq, tq) true -> DISPATCH_QUEUE_ROLE_BASE_WLH
+//  - otherwise                                -> DISPATCH_QUEUE_ROLE_BASE_ANON
+//
+// If _dispatch_get_wlh() returns dq itself and the resulting state is not a
+// base wlh state, _dispatch_event_loop_leave_immediate() is called with it.
+static void
+_dispatch_queue_inherit_wlh_from_target(dispatch_queue_t dq,
+		dispatch_queue_t tq)
+{
+	const bool tq_is_root = dx_hastypeflag(tq, QUEUE_ROOT);
+	uint64_t prev_state, next_state;
+	uint64_t role;
+
+	if (tq_is_root) {
+		role = _dispatch_base_queue_is_wlh(dq, tq) ?
+				DISPATCH_QUEUE_ROLE_BASE_WLH : DISPATCH_QUEUE_ROLE_BASE_ANON;
+	} else {
+		role = DISPATCH_QUEUE_ROLE_INNER;
+	}
+
+	os_atomic_rmw_loop2o(dq, dq_state, prev_state, next_state, relaxed, {
+		next_state = (prev_state & ~DISPATCH_QUEUE_ROLE_MASK) | role;
+		if (next_state == prev_state) {
+			// role bits already match, no store needed
+			os_atomic_rmw_loop_give_up(break);
+		}
+	});
+
+	dispatch_wlh_t self_wlh = (dispatch_wlh_t)dq;
+	dispatch_wlh_t cur_wlh = _dispatch_get_wlh();
+	if (cur_wlh == self_wlh && !_dq_state_is_base_wlh(next_state)) {
+		_dispatch_event_loop_leave_immediate(cur_wlh, next_state);
+	}
+
+	if (tq_is_root) {
+		return;
+	}
+#if DISPATCH_ALLOW_NON_LEAF_RETARGET
+	_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
+#else
+	_dispatch_queue_atomic_flags_set_and_clear(tq, DQF_TARGETED, DQF_LEGACY);
+#endif
+}
+
+unsigned long volatile _dispatch_queue_serial_numbers =
+ DISPATCH_QUEUE_SERIAL_NUMBER_INIT;
+
+// Walks dq's target-queue chain up to the first queue carrying the
+// QUEUE_ROOT type flag and computes a priority for dq. When wlh_out is
+// non-NULL a wlh is reported through it as well.
+//
+// Outcomes, as (return value, *wlh_out):
+//  - the chain reaches _dispatch_mgr_q, or the root queue has no requested
+//    priority bits: (DISPATCH_PRIORITY_FLAG_MANAGER, DISPATCH_WLH_ANON)
+//  - a thread-bound queue, a suspended queue, or a legacy queue that is not
+//    a base wlh is met before the root: (0, NULL)
+//  - otherwise: (_dispatch_priority_inherit_from_root_queue(pri, root), wlh)
+//    where pri is dq's requested priority raised to that of every traversed
+//    queue lacking DISPATCH_PRIORITY_FLAG_INHERIT, and wlh is the last queue
+//    (dq included) whose state is a base wlh, or DISPATCH_WLH_ANON if none.
+dispatch_priority_t
+_dispatch_queue_compute_priority_and_wlh(dispatch_queue_t dq,
+		dispatch_wlh_t *wlh_out)
+{
+	dispatch_priority_t pri =
+			dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
+	dispatch_queue_t tq = dq->do_targetq;
+	dispatch_priority_t tq_pri =
+			tq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
+	dispatch_wlh_t wlh = DISPATCH_WLH_ANON;
+	dispatch_priority_t result;
+
+	if (_dq_state_is_base_wlh(dq->dq_state)) {
+		wlh = (dispatch_wlh_t)dq;
+	}
+
+	while (unlikely(!dx_hastypeflag(tq, QUEUE_ROOT))) {
+		if (unlikely(tq == &_dispatch_mgr_q)) {
+			wlh = DISPATCH_WLH_ANON;
+			result = DISPATCH_PRIORITY_FLAG_MANAGER;
+			goto out;
+		}
+		if (unlikely(_dispatch_queue_is_thread_bound(tq))) {
+			// thread-bound hierarchies are weird, we need to install
+			// from the context of the thread this hierarchy is bound to
+			goto unresolved;
+		}
+		if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(tq))) {
+			// this queue may not be activated yet, so the queue graph may
+			// not have stabilized yet
+			goto delayed;
+		}
+
+		if (_dq_state_is_base_wlh(tq->dq_state)) {
+			wlh = (dispatch_wlh_t)tq;
+		} else if (unlikely(_dispatch_queue_is_legacy(tq))) {
+			// we're not allowed to dereference tq->do_targetq
+			goto delayed;
+		}
+
+		if (!(tq->dq_priority & DISPATCH_PRIORITY_FLAG_INHERIT) &&
+				pri < tq_pri) {
+			pri = tq_pri;
+		}
+		tq = tq->do_targetq;
+		tq_pri = tq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
+	}
+
+	if (unlikely(!tq_pri)) {
+		// pthread root queues opt out of QoS
+		wlh = DISPATCH_WLH_ANON;
+		result = DISPATCH_PRIORITY_FLAG_MANAGER;
+	} else {
+		result = _dispatch_priority_inherit_from_root_queue(pri, tq);
+	}
+	goto out;
+
+delayed:
+	_dispatch_ktrace1(DISPATCH_PERF_delayed_registration, dq);
+unresolved:
+	wlh = NULL;
+	result = 0;
+out:
+	if (wlh_out) *wlh_out = wlh;
+	return result;
+}
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
-#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- // Be sure the root queue priorities are set
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
-#endif
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
// Step 1: Normalize arguments (qos, overcommit, tq)
//
- qos_class_t qos = dqa->dqa_qos_class;
-#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
- !_dispatch_root_queues[
- DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
- qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
+ dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
+#if !HAVE_PTHREAD_WORKQUEUE_QOS
+ if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
+ qos = DISPATCH_QOS_USER_INITIATED;
}
-#endif
- bool maintenance_fallback = false;
-#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- maintenance_fallback = true;
-#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- if (maintenance_fallback) {
- if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
- !_dispatch_root_queues[
- DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
- qos = _DISPATCH_QOS_CLASS_BACKGROUND;
- }
+ if (qos == DISPATCH_QOS_MAINTENANCE) {
+ qos = DISPATCH_QOS_BACKGROUND;
}
+#endif // !HAVE_PTHREAD_WORKQUEUE_QOS
_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
// Handle discrepancies between attr and target queue, attributes win
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
- if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
+ if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
overcommit = _dispatch_queue_attr_overcommit_enabled;
} else {
overcommit = _dispatch_queue_attr_overcommit_disabled;
}
}
- if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
- tq = _dispatch_get_root_queue_with_overcommit(tq,
+ if (qos == DISPATCH_QOS_UNSPECIFIED) {
+ dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
+ tq = _dispatch_get_root_queue(tq_qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
} else {
tq = NULL;
DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
"and use this kind of target queue");
}
- if (qos != _DISPATCH_QOS_CLASS_UNSPECIFIED) {
+ if (qos != DISPATCH_QOS_UNSPECIFIED) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
"and use this kind of target queue");
}
}
}
if (!tq) {
- qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ?
- _DISPATCH_QOS_CLASS_DEFAULT : qos;
- tq = _dispatch_get_root_queue(tq_qos, overcommit ==
- _dispatch_queue_attr_overcommit_enabled);
+ tq = _dispatch_get_root_queue(
+ qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
+ overcommit == _dispatch_queue_attr_overcommit_enabled);
if (slowpath(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
+ if (legacy) {
+ dqf |= DQF_LEGACY;
+ }
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
}
}
- dispatch_queue_t dq = _dispatch_alloc(vtable,
+ dispatch_queue_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
- DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);
+ DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
+ (dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
dq->dq_label = label;
-
#if HAVE_PTHREAD_WORKQUEUE_QOS
- dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos,
- dqa->dqa_relative_priority,
- overcommit == _dispatch_queue_attr_overcommit_enabled ?
- _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
+ dq->dq_priority = dqa->dqa_qos_and_relpri;
+ if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
+ dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
+ }
#endif
_dispatch_retain(tq);
- if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
+ if (qos == QOS_CLASS_UNSPECIFIED) {
// legacy way of inherithing the QoS from the target
_dispatch_queue_priority_inherit_from_target(dq, tq);
}
if (!dqa->dqa_inactive) {
- _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
+ _dispatch_queue_inherit_wlh_from_target(dq, tq);
}
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
dispatch_queue_create_with_accounting_override_voucher(const char *label,
dispatch_queue_attr_t attr, voucher_t voucher)
{
- dispatch_queue_t dq = dispatch_queue_create_with_target(label, attr,
- DISPATCH_TARGET_QUEUE_DEFAULT);
- dq->dq_override_voucher = _voucher_create_accounting_voucher(voucher);
- return dq;
+ (void)label; (void)attr; (void)voucher;
+ DISPATCH_CLIENT_CRASH(0, "Unsupported interface");
}
void
-_dispatch_queue_destroy(dispatch_queue_t dq)
+_dispatch_queue_destroy(dispatch_queue_t dq, bool *allow_free)
{
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
- if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
+ if (dx_hastypeflag(dq, QUEUE_ROOT)) {
initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
}
- if (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE) {
- // dispatch_cancel_and_wait may apply overrides in a racy way with
- // the source cancellation finishing. This race is expensive and not
- // really worthwhile to resolve since the source becomes dead anyway.
- dq_state &= ~DISPATCH_QUEUE_HAS_OVERRIDE;
- }
+ dq_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
+ dq_state &= ~DISPATCH_QUEUE_DIRTY;
+ dq_state &= ~DISPATCH_QUEUE_ROLE_MASK;
if (slowpath(dq_state != initial_state)) {
if (_dq_state_drain_locked(dq_state)) {
- DISPATCH_CLIENT_CRASH(dq, "Release of a locked queue");
+ DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
+ "Release of a locked queue");
}
#ifndef __LP64__
dq_state >>= 32;
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"Release of a queue with corrupt state");
}
- if (slowpath(dq == _dispatch_queue_get_current())) {
- DISPATCH_CLIENT_CRASH(dq, "Release of a queue by itself");
- }
if (slowpath(dq->dq_items_tail)) {
DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
"Release of a queue while items are enqueued");
// trash the queue so that use after free will crash
dq->dq_items_head = (void *)0x200;
dq->dq_items_tail = (void *)0x200;
- // poison the state with something that is suspended and is easy to spot
- dq->dq_state = 0xdead000000000000;
dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
(void *)0x200, relaxed);
if (dqsq) {
_dispatch_release(dqsq);
}
- if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
- if (dq->dq_override_voucher) _voucher_release(dq->dq_override_voucher);
- dq->dq_override_voucher = DISPATCH_NO_VOUCHER;
+
+ // fastpath for queues that never got their storage retained
+ if (likely(os_atomic_load2o(dq, dq_sref_cnt, relaxed) == 0)) {
+ // poison the state with something that is suspended and is easy to spot
+ dq->dq_state = 0xdead000000000000;
+ return;
}
+
+ // Take over freeing the memory from _dispatch_object_dealloc()
+ //
+ // As soon as we call _dispatch_queue_release_storage(), we forfeit
+ // the possibility for the caller of dx_dispose() to finalize the object
+ // so that responsibility is ours.
+ _dispatch_object_finalize(dq);
+ *allow_free = false;
+ dq->dq_label = "<released queue, pending free>";
+ dq->do_targetq = NULL;
+ dq->do_finalizer = NULL;
+ dq->do_ctxt = NULL;
+ return _dispatch_queue_release_storage(dq);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
-_dispatch_queue_dispose(dispatch_queue_t dq)
+_dispatch_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{
_dispatch_object_debug(dq, "%s", __func__);
_dispatch_introspection_queue_dispose(dq);
if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
free((void*)dq->dq_label);
}
- _dispatch_queue_destroy(dq);
+ _dispatch_queue_destroy(dq, allow_free);
+}
+
+// Checks dq_state when dq's xref is disposed of.
+//
+// A queue that is still suspended crashes the client, with a dedicated
+// message when it is also inactive. Otherwise DQF_RELEASED is OR-ed into
+// dq_atomic_flags.
+void
+_dispatch_queue_xref_dispose(dispatch_queue_t dq)
+{
+	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
+	if (unlikely(_dq_state_is_suspended(dq_state))) {
+		// Where long is narrower than 64 bits, hand the crash macro the
+		// upper 32 bits of dq_state instead of a truncated low half.
+		long state = (long)dq_state;
+		if (sizeof(long) < sizeof(uint64_t)) state = (long)(dq_state >> 32);
+		if (unlikely(_dq_state_is_inactive(dq_state))) {
+			// Arguments for and against this assert are within 6705399
+			DISPATCH_CLIENT_CRASH(state, "Release of an inactive object");
+		}
+		// Pass the narrowed value here too, matching the inactive case,
+		// rather than the raw 64-bit dq_state.
+		DISPATCH_CLIENT_CRASH(state, "Release of a suspended object");
+	}
+	os_atomic_or2o(dq, dq_atomic_flags, DQF_RELEASED, relaxed);
}
DISPATCH_NOINLINE
// threads could have touched this value while we were trying to acquire
// the lock, or because another thread raced us to do the same operation
// and got to the lock first.
- if (slowpath(os_sub_overflow(dq_state, delta, &value))) {
+ if (unlikely(os_sub_overflow(dq_state, delta, &value))) {
os_atomic_rmw_loop_give_up(goto retry);
}
});
- if (slowpath(os_add_overflow(dq->dq_side_suspend_cnt,
+ if (unlikely(os_add_overflow(dq->dq_side_suspend_cnt,
DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
}
os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
- if (slowpath(os_add_overflow(dq_state, value, &value))) {
+ if (unlikely(os_add_overflow(dq_state, value, &value))) {
os_atomic_rmw_loop_give_up({
return _dispatch_queue_suspend_slow(dq);
});
if (!_dq_state_is_suspended(dq_state)) {
// rdar://8181908 we need to extend the queue life for the duration
// of the call to wakeup at _dispatch_queue_resume() time.
- _dispatch_retain(dq);
+ _dispatch_retain_2(dq);
}
}
// threads could have touched this value while we were trying to acquire
// the lock, or because another thread raced us to do the same operation
// and got to the lock first.
- if (slowpath(os_add_overflow(dq_state, delta, &value))) {
+ if (unlikely(os_add_overflow(dq_state, delta, &value))) {
os_atomic_rmw_loop_give_up(goto retry);
}
});
static void
_dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
{
+ bool allow_resume = true;
// Step 2: run the activation finalizer
if (dx_vtable(dq)->do_finalize_activation) {
- dx_vtable(dq)->do_finalize_activation(dq);
+ dx_vtable(dq)->do_finalize_activation(dq, &allow_resume);
}
// Step 3: consume the suspend count
- return dx_vtable(dq)->do_resume(dq, false);
+ if (allow_resume) {
+ return dx_vtable(dq)->do_resume(dq, false);
+ }
}
void
{
// covers all suspend and inactive bits, including side suspend bit
const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
+ uint64_t pending_barrier_width =
+ (dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
+ uint64_t set_owner_and_set_full_width_and_in_barrier =
+ _dispatch_lock_value_for_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
+ DISPATCH_QUEUE_IN_BARRIER;
+
// backward compatibility: only dispatch sources can abuse
// dispatch_resume() to really mean dispatch_activate()
- bool resume_can_activate = (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE);
+ bool is_source = (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
uint64_t dq_state, value;
dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
+ DISPATCH_QUEUE_NEEDS_ACTIVATION) {
// { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
- } else if (resume_can_activate && (dq_state & suspend_bits) ==
+ } else if (is_source && (dq_state & suspend_bits) ==
DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
value = dq_state - DISPATCH_QUEUE_INACTIVE
- DISPATCH_QUEUE_NEEDS_ACTIVATION
+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
- } else {
- value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
- if (slowpath(os_sub_overflow(dq_state, value, &value))) {
- // underflow means over-resume or a suspend count transfer
- // to the side count is needed
- os_atomic_rmw_loop_give_up({
- if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
- goto over_resume;
- }
- return _dispatch_queue_resume_slow(dq);
- });
- }
- if (_dq_state_is_runnable(value) &&
- !_dq_state_drain_locked(value)) {
- uint64_t full_width = value;
- if (_dq_state_has_pending_barrier(value)) {
- full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
- full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
- full_width += DISPATCH_QUEUE_IN_BARRIER;
- } else {
- full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
- full_width += DISPATCH_QUEUE_IN_BARRIER;
- }
- if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
- DISPATCH_QUEUE_WIDTH_FULL_BIT) {
- value = full_width;
- value &= ~DISPATCH_QUEUE_DIRTY;
- value |= _dispatch_tid_self();
+ } else if (unlikely(os_sub_overflow(dq_state,
+ DISPATCH_QUEUE_SUSPEND_INTERVAL, &value))) {
+ // underflow means over-resume or a suspend count transfer
+ // to the side count is needed
+ os_atomic_rmw_loop_give_up({
+ if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
+ goto over_resume;
}
- }
+ return _dispatch_queue_resume_slow(dq);
+ });
+ //
+ // below this, value = dq_state - DISPATCH_QUEUE_SUSPEND_INTERVAL
+ //
+ } else if (!_dq_state_is_runnable(value)) {
+ // Out of width or still suspended.
+ // For the former, force _dispatch_queue_non_barrier_complete
+ // to reconsider whether it has work to do
+ value |= DISPATCH_QUEUE_DIRTY;
+ } else if (_dq_state_drain_locked(value)) {
+ // still locked by someone else, make drain_try_unlock() fail
+ // and reconsider whether it has work to do
+ value |= DISPATCH_QUEUE_DIRTY;
+ } else if (!is_source && (_dq_state_has_pending_barrier(value) ||
+ value + pending_barrier_width <
+ DISPATCH_QUEUE_WIDTH_FULL_BIT)) {
+ // if we can, acquire the full width drain lock
+ // and then perform a lock transfer
+ //
+ // However this is never useful for a source where there are no
+ // sync waiters, so never take the lock and do a plain wakeup
+ value &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
+ value |= set_owner_and_set_full_width_and_in_barrier;
+ } else {
+ // clear overrides and force a wakeup
+ value &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
+ value &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
}
});
}
if (activate) {
// if we're still in an activate codepath here we should have
// { sc:>0 na:1 }, if not we've got a corrupt state
- if (!fastpath(_dq_state_is_suspended(value))) {
+ if (unlikely(!_dq_state_is_suspended(value))) {
DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
}
return;
return;
}
- if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
- _dispatch_try_lock_transfer_or_wakeup(dq);
- } else if (_dq_state_should_wakeup(value)) {
+ if (_dq_state_is_dirty(dq_state)) {
// <rdar://problem/14637483>
- // seq_cst wrt state changes that were flushed and not acted upon
- os_atomic_thread_fence(acquire);
- pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq,
- _dispatch_queue_is_thread_bound(dq));
- // Balancing the retain() done in suspend() for rdar://8181908
- return dx_wakeup(dq, pp, DISPATCH_WAKEUP_CONSUME);
+ // dependency ordering for dq state changes that were flushed
+ // and not acted upon
+ os_atomic_thread_fence(dependency);
+ dq = os_atomic_force_dependency_on(dq, dq_state);
}
-
- // Balancing the retain() done in suspend() for rdar://8181908
- return _dispatch_release_tailcall(dq);
+ // Balancing the retain_2 done in suspend() for rdar://8181908
+ dispatch_wakeup_flags_t flags = DISPATCH_WAKEUP_CONSUME_2;
+ if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
+ flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
+ } else if (!_dq_state_is_runnable(value)) {
+ if (_dq_state_is_base_wlh(dq_state)) {
+ _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
+ }
+ return _dispatch_release_2(dq);
+ }
+ dispatch_assert(!_dq_state_received_sync_wait(dq_state));
+ dispatch_assert(!_dq_state_in_sync_transfer(dq_state));
+ return dx_wakeup(dq, _dq_state_max_qos(dq_state), flags);
over_resume:
- if (slowpath(_dq_state_is_inactive(dq_state))) {
+ if (unlikely(_dq_state_is_inactive(dq_state))) {
DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
}
DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
}
qos_class_t
-dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
+dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relpri_ptr)
{
- qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
- int relative_priority = 0;
-#if HAVE_PTHREAD_WORKQUEUE_QOS
- pthread_priority_t dqp = dq->dq_priority;
- if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
- qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
-#else
- (void)dq;
-#endif
- if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
- return qos;
+ dispatch_qos_class_t qos = _dispatch_priority_qos(dq->dq_priority);
+ if (relpri_ptr) {
+ *relpri_ptr = qos ? _dispatch_priority_relpri(dq->dq_priority) : 0;
+ }
+ return _dispatch_qos_to_qos_class(qos);
}
static void
uint32_t tmp;
dispatch_queue_t dq = _dispatch_queue_get_current();
- if (w > 0) {
- tmp = (unsigned int)w;
- } else switch (w) {
- case 0:
- tmp = 1;
- break;
- case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
- tmp = dispatch_hw_config(physical_cpus);
- break;
- case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
- tmp = dispatch_hw_config(active_cpus);
- break;
- default:
- // fall through
- case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
- tmp = dispatch_hw_config(logical_cpus);
- break;
+ if (w >= 0) {
+ tmp = w ? (unsigned int)w : 1;
+ } else {
+ dispatch_qos_t qos = _dispatch_qos_from_pp(_dispatch_get_priority());
+ switch (w) {
+ case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
+ tmp = _dispatch_qos_max_parallelism(qos,
+ DISPATCH_MAX_PARALLELISM_PHYSICAL);
+ break;
+ case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
+ tmp = _dispatch_qos_max_parallelism(qos,
+ DISPATCH_MAX_PARALLELISM_ACTIVE);
+ break;
+ case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
+ default:
+ tmp = _dispatch_qos_max_parallelism(qos, 0);
+ break;
+ }
}
if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
tmp = DISPATCH_QUEUE_WIDTH_MAX;
dispatch_queue_flags_t old_dqf, new_dqf;
os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
- new_dqf = old_dqf & ~DQF_WIDTH_MASK;
- new_dqf |= (tmp << DQF_WIDTH_SHIFT);
+ new_dqf = (old_dqf & DQF_FLAGS_MASK) | DQF_WIDTH(tmp);
});
+ _dispatch_queue_inherit_wlh_from_target(dq, dq->do_targetq);
_dispatch_object_debug(dq, "%s", __func__);
}
void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
- if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
- slowpath(dx_hastypeflag(dq, QUEUE_ROOT))) {
+ if (unlikely(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT ||
+ dx_hastypeflag(dq, QUEUE_ROOT) ||
+ dx_hastypeflag(dq, QUEUE_BASE))) {
return;
}
DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
}
- _dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
- _dispatch_queue_set_width2);
+ if (likely((int)width >= 0)) {
+ _dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
+ _dispatch_queue_set_width2, DISPATCH_BARRIER_TRYSYNC_SUSPEND);
+ } else {
+ // The negative width constants need to execute on the queue to
+ // query the queue QoS
+ _dispatch_barrier_async_detached_f(dq, (void*)(intptr_t)width,
+ _dispatch_queue_set_width2);
+ }
}
static void
dispatch_queue_t otq = dq->do_targetq;
if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
+#if DISPATCH_ALLOW_NON_LEAF_RETARGET
_dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
_dispatch_bug_deprecated("Changing the target of a queue "
"already targeted by other dispatch objects");
+#else
+ DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
+ "already targeted by other dispatch objects");
+#endif
}
_dispatch_queue_priority_inherit_from_target(dq, tq);
- _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
+ _dispatch_queue_inherit_wlh_from_target(dq, tq);
#if HAVE_PTHREAD_WORKQUEUE_QOS
// see _dispatch_queue_class_wakeup()
_dispatch_queue_sidelock_lock(dq);
dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
dq->do_targetq);
- if (slowpath(!tq)) {
+ if (unlikely(!tq)) {
bool is_concurrent_q = (dq->dq_width > 1);
- tq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
- !is_concurrent_q);
+ tq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, !is_concurrent_q);
}
if (_dispatch_queue_try_inactive_suspend(dq)) {
return dx_vtable(dq)->do_resume(dq, false);
}
- if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
- DISPATCH_CLIENT_CRASH(dq, "Cannot change the target of a queue or "
- "source with an accounting override voucher "
+#if !DISPATCH_ALLOW_NON_LEAF_RETARGET
+ if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
+ DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
+ "already targeted by other dispatch objects");
+ }
+#endif
+
+ if (unlikely(!_dispatch_queue_is_legacy(dq))) {
+#if DISPATCH_ALLOW_NON_LEAF_RETARGET
+ if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
+ DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
+ "already targeted by other dispatch objects");
+ }
+#endif
+ DISPATCH_CLIENT_CRASH(0, "Cannot change the target of this object "
"after it has been activated");
}
unsigned long type = dx_type(dq);
switch (type) {
case DISPATCH_QUEUE_LEGACY_TYPE:
+#if DISPATCH_ALLOW_NON_LEAF_RETARGET
if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
_dispatch_bug_deprecated("Changing the target of a queue "
"already targeted by other dispatch objects");
}
+#endif
break;
case DISPATCH_SOURCE_KEVENT_TYPE:
case DISPATCH_MACH_CHANNEL_TYPE:
_dispatch_bug_deprecated("Changing the target of a source "
"after it has been activated");
break;
-
- case DISPATCH_QUEUE_SERIAL_TYPE:
- case DISPATCH_QUEUE_CONCURRENT_TYPE:
- DISPATCH_CLIENT_CRASH(type, "Cannot change the target of this queue "
- "after it has been activated");
default:
DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
}
_dispatch_retain(tq);
return _dispatch_barrier_trysync_or_async_f(dq, tq,
- _dispatch_queue_legacy_set_target_queue);
+ _dispatch_queue_legacy_set_target_queue,
+ DISPATCH_BARRIER_TRYSYNC_SUSPEND);
}
#pragma mark -
_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
_dispatch_mgr_root_queue_context = {{{
-#if HAVE_PTHREAD_WORKQUEUES
+#if DISPATCH_USE_WORKQUEUES
.dgq_kworkqueue = (void*)(~0ul),
#endif
.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
.do_ctxt = &_dispatch_mgr_root_queue_context,
.dq_label = "com.apple.root.libdispatch-manager",
- .dq_width = DISPATCH_QUEUE_WIDTH_POOL,
- .dq_override = DISPATCH_SATURATED_OVERRIDE,
- .dq_override_voucher = DISPATCH_NO_VOUCHER,
+ .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
+ .dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
+ DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = 3,
};
#endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static dispatch_once_t _dispatch_mgr_sched_pred;
-// TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
-
#if HAVE_PTHREAD_WORKQUEUE_QOS
+// TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
// Must be kept in sync with list of qos classes in sys/qos.h
static const int _dispatch_mgr_sched_qos2prio[] = {
- [_DISPATCH_QOS_CLASS_MAINTENANCE] = 4,
- [_DISPATCH_QOS_CLASS_BACKGROUND] = 4,
- [_DISPATCH_QOS_CLASS_UTILITY] = 20,
- [_DISPATCH_QOS_CLASS_DEFAULT] = 31,
- [_DISPATCH_QOS_CLASS_USER_INITIATED] = 37,
- [_DISPATCH_QOS_CLASS_USER_INTERACTIVE] = 47,
+ [QOS_CLASS_MAINTENANCE] = 4,
+ [QOS_CLASS_BACKGROUND] = 4,
+ [QOS_CLASS_UTILITY] = 20,
+ [QOS_CLASS_DEFAULT] = 31,
+ [QOS_CLASS_USER_INITIATED] = 37,
+ [QOS_CLASS_USER_INTERACTIVE] = 47,
};
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, ¶m));
#if HAVE_PTHREAD_WORKQUEUE_QOS
qos_class_t qos = qos_class_main();
- if (qos == _DISPATCH_QOS_CLASS_DEFAULT) {
- qos = _DISPATCH_QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
+ if (qos == QOS_CLASS_DEFAULT) {
+ qos = QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
}
if (qos) {
_dispatch_mgr_sched.qos = qos;
(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
qos, 0));
}
- _dispatch_mgr_q.dq_priority =
- (dispatch_priority_t)_pthread_qos_class_encode(qos, 0, 0);
}
#endif
param.sched_priority = _dispatch_mgr_sched.prio;
if (p >= prio) os_atomic_rmw_loop_give_up(return);
});
#if DISPATCH_USE_KEVENT_WORKQUEUE
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
+ _dispatch_root_queues_init();
if (_dispatch_kevent_workqueue_enabled) {
pthread_priority_t pp = 0;
if (prio > _dispatch_mgr_sched.default_prio) {
_dispatch_kevent_workqueue_init(void)
{
// Initialize kevent workqueue support
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
+ _dispatch_root_queues_init();
if (!_dispatch_kevent_workqueue_enabled) return;
dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
qos_class_t qos = _dispatch_mgr_sched.qos;
pthread_priority_t pp = 0;
if (qos) {
pp = _pthread_qos_class_encode(qos, 0, 0);
- _dispatch_mgr_q.dq_priority = (dispatch_priority_t)pp;
}
if (prio > _dispatch_mgr_sched.default_prio) {
pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
(void)dispatch_assume_zero(r);
}
}
-#endif
+#endif // DISPATCH_USE_KEVENT_WORKQUEUE
#pragma mark -
#pragma mark dispatch_pthread_root_queue
dispatch_pthread_root_queue_context_t pqc;
dispatch_queue_flags_t dqf = 0;
size_t dqs;
- uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
- (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
+ int32_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
+ (int8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
- dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
+ dq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_root), dqs +
sizeof(struct dispatch_root_queue_context_s) +
sizeof(struct dispatch_pthread_root_queue_context_s));
qc = (void*)dq + dqs;
}
}
- _dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, false);
+ _dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, 0);
dq->dq_label = label;
- dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
- dq->dq_override = DISPATCH_SATURATED_OVERRIDE;
+ dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
dq->do_ctxt = qc;
- dq->do_targetq = NULL;
+ dq->dq_priority = DISPATCH_PRIORITY_SATURATED_OVERRIDE;
pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
qc->dgq_ctxt = pqc;
-#if HAVE_PTHREAD_WORKQUEUES
+#if DISPATCH_USE_WORKQUEUES
qc->dgq_kworkqueue = (void*)(~0ul);
#endif
_dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
{
dispatch_queue_t dq = _dispatch_queue_get_current();
if (!dq) return NULL;
- while (slowpath(dq->do_targetq)) {
+ while (unlikely(dq->do_targetq)) {
dq = dq->do_targetq;
}
if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
void
-_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
+_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{
if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
pthread_attr_destroy(&pqc->dpq_thread_attr);
- _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
+ _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator, NULL);
if (pqc->dpq_thread_configure) {
Block_release(pqc->dpq_thread_configure);
}
- dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
- false);
+ dq->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
#endif
if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
free((void*)dq->dq_label);
}
- _dispatch_queue_destroy(dq);
+ _dispatch_queue_destroy(dq, allow_free);
}
#pragma mark -
DISPATCH_QUEUE_HEADER(queue_specific_queue);
TAILQ_HEAD(dispatch_queue_specific_head_s,
dispatch_queue_specific_s) dqsq_contexts;
-} DISPATCH_QUEUE_ALIGN;
+} DISPATCH_ATOMIC64_ALIGN;
struct dispatch_queue_specific_s {
const void *dqs_key;
DISPATCH_DECL(dispatch_queue_specific);
+// Disposes of a queue-specific-data queue: walks dqsq_contexts, hands each
+// entry's destructor (when one is set) to the DISPATCH_QOS_DEFAULT root queue
+// via dispatch_async_f() together with the entry's context, frees the entry,
+// then destroys the underlying queue, forwarding `allow_free` unchanged to
+// _dispatch_queue_destroy().
void
-_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
+_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq,
+ bool *allow_free)
{
dispatch_queue_specific_t dqs, tmp;
+ dispatch_queue_t rq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
if (dqs->dqs_destructor) {
- dispatch_async_f(_dispatch_get_root_queue(
- _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
- dqs->dqs_destructor);
+ // the destructor is queued on rq rather than called inline here
+ dispatch_async_f(rq, dqs->dqs_ctxt, dqs->dqs_destructor);
}
free(dqs);
}
- _dispatch_queue_destroy(dqsq->_as_dq);
+ _dispatch_queue_destroy(dqsq->_as_dq, allow_free);
}
static void
{
dispatch_queue_specific_queue_t dqsq;
- dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
+ dqsq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_specific_queue),
sizeof(struct dispatch_queue_specific_queue_s));
- _dispatch_queue_init(dqsq->_as_dq, DQF_NONE,
- DISPATCH_QUEUE_WIDTH_MAX, false);
+ _dispatch_queue_init(dqsq->_as_dq, DQF_NONE, DISPATCH_QUEUE_WIDTH_MAX,
+ DISPATCH_QUEUE_ROLE_BASE_ANON);
dqsq->do_xref_cnt = -1;
dqsq->do_targetq = _dispatch_get_root_queue(
- _DISPATCH_QOS_CLASS_USER_INITIATED, true);
+ DISPATCH_QOS_USER_INITIATED, true);
dqsq->dq_label = "queue-specific";
TAILQ_INIT(&dqsq->dqsq_contexts);
if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
// Destroy previous context for existing key
if (dqs->dqs_destructor) {
dispatch_async_f(_dispatch_get_root_queue(
- _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
+ DISPATCH_QOS_DEFAULT, false), dqs->dqs_ctxt,
dqs->dqs_destructor);
}
if (dqsn->dqs_ctxt) {
_dispatch_queue_init_specific(dq);
}
_dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
- _dispatch_queue_set_specific);
+ _dispatch_queue_set_specific, 0);
}
static void
*ctxtp = NULL;
}
+// Shared lookup helper for the queue-specific getters.
+// Returns NULL unless `dq` has metatype _DISPATCH_QUEUE_TYPE and already owns
+// a dq_specific_q. Otherwise `ctxt` is seeded with `key` and
+// _dispatch_queue_get_specific() is run synchronously on dq_specific_q with
+// &ctxt as its argument; whatever that callback leaves in `ctxt` is returned.
+// NOTE(review): presumably the callback replaces the key with the stored
+// context, or NULL when the key is absent -- confirm against its definition.
+DISPATCH_ALWAYS_INLINE
+static inline void *
+_dispatch_queue_get_specific_inline(dispatch_queue_t dq, const void *key)
+{
+ void *ctxt = NULL;
+ if (fastpath(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE && dq->dq_specific_q)){
+ // in/out parameter: carries the key in, the lookup result out
+ ctxt = (void *)key;
+ dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
+ }
+ return ctxt;
+}
+
DISPATCH_NOINLINE
void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
if (slowpath(!key)) {
return NULL;
}
- void *ctxt = NULL;
-
- if (fastpath(dq->dq_specific_q)) {
- ctxt = (void *)key;
- dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
- }
- return ctxt;
+ return _dispatch_queue_get_specific_inline(dq, key);
}
DISPATCH_NOINLINE
dispatch_queue_t dq = _dispatch_queue_get_current();
while (slowpath(dq)) {
- if (slowpath(dq->dq_specific_q)) {
- ctxt = (void *)key;
- dispatch_sync_f(dq->dq_specific_q, &ctxt,
- _dispatch_queue_get_specific);
- if (ctxt) break;
- }
+ ctxt = _dispatch_queue_get_specific_inline(dq, key);
+ if (ctxt) break;
dq = dq->do_targetq;
}
return ctxt;
DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
}
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
- return _dq_state_drain_locked_by(dq_state, _dispatch_tid_self());
+ return _dq_state_drain_locked_by_self(dq_state);
}
#endif
{
size_t offset = 0;
dispatch_queue_t target = dq->do_targetq;
+ const char *tlabel = target && target->dq_label ? target->dq_label : "";
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
- offset += dsnprintf(&buf[offset], bufsiz - offset,
+ offset += dsnprintf(&buf[offset], bufsiz - offset, "sref = %d, "
"target = %s[%p], width = 0x%x, state = 0x%016llx",
- target && target->dq_label ? target->dq_label : "", target,
- dq->dq_width, (unsigned long long)dq_state);
+ dq->dq_sref_cnt + 1, tlabel, target, dq->dq_width,
+ (unsigned long long)dq_state);
if (_dq_state_is_suspended(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", suspended = %d",
_dq_state_suspend_cnt(dq_state));
if (_dq_state_is_dirty(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", dirty");
}
- if (_dq_state_has_override(dq_state)) {
- offset += dsnprintf(&buf[offset], bufsiz - offset, ", async-override");
+ dispatch_qos_t qos = _dq_state_max_qos(dq_state);
+ if (qos) {
+ offset += dsnprintf(&buf[offset], bufsiz - offset, ", max qos %d", qos);
}
mach_port_t owner = _dq_state_drain_owner(dq_state);
if (!_dispatch_queue_is_thread_bound(dq) && owner) {
}
#endif
-#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
-static OSSpinLock _dispatch_stats_lock;
+#if DISPATCH_PERF_MON
+
+#define DISPATCH_PERF_MON_BUCKETS 8
+
+// Per-bucket performance-monitoring totals, updated by
+// _dispatch_queue_merge_stats() with relaxed atomic adds/increments.
static struct {
- uint64_t time_total;
- uint64_t count_total;
- uint64_t thread_total;
-} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
+ // sum of the elapsed-time deltas merged into this bucket
+ uint64_t volatile time_total;
+ // sum of the per-thread work counters merged into this bucket
+ uint64_t volatile count_total;
+ // number of merges (one per _dispatch_queue_merge_stats() call)
+ uint64_t volatile thread_total;
+} _dispatch_stats[DISPATCH_PERF_MON_BUCKETS];
+// NOTE(review): not read in the code visible here; DISPATCH_USED presumably
+// keeps the symbol alive for external tooling -- confirm.
+DISPATCH_USED static size_t _dispatch_stat_buckets = DISPATCH_PERF_MON_BUCKETS;
-static void
-_dispatch_queue_merge_stats(uint64_t start)
+// Folds the calling thread's work counter into the global _dispatch_stats
+// table.
+//   start: absolute time to measure from; the elapsed delta is
+//          _dispatch_absolute_time() - start.
+//   trace: when true, also emits ktrace events for this merge.
+//   type:  thread type, only forwarded to the ktrace events.
+// The counter is read from the dispatch_bcounter_key thread-specific slot,
+// which is then reset to NULL.
+void
+_dispatch_queue_merge_stats(uint64_t start, bool trace, perfmon_thread_type type)
{
uint64_t delta = _dispatch_absolute_time() - start;
unsigned long count;
-
+ int bucket = 0;
count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
-
- int bucket = flsl((long)count);
-
- // 64-bit counters on 32-bit require a lock or a queue
- OSSpinLockLock(&_dispatch_stats_lock);
-
- _dispatch_stats[bucket].time_total += delta;
- _dispatch_stats[bucket].count_total += count;
- _dispatch_stats[bucket].thread_total++;
-
- OSSpinLockUnlock(&_dispatch_stats_lock);
+ if (count == 0) {
+ // no work items were counted: bucket 0, and __builtin_clzl() below is
+ // never evaluated with a zero argument (its result is undefined for 0)
+ bucket = 0;
+ if (trace) _dispatch_ktrace1(DISPATCH_PERF_MON_worker_useless, type);
+ } else {
+ // bucket = 1-based position of the highest set bit of count,
+ // clamped to the last bucket
+ bucket = MIN(DISPATCH_PERF_MON_BUCKETS - 1,
+ (int)sizeof(count) * CHAR_BIT - __builtin_clzl(count));
+ os_atomic_add(&_dispatch_stats[bucket].count_total, count, relaxed);
+ }
+ os_atomic_add(&_dispatch_stats[bucket].time_total, delta, relaxed);
+ os_atomic_inc(&_dispatch_stats[bucket].thread_total, relaxed);
+ if (trace) {
+ _dispatch_ktrace3(DISPATCH_PERF_MON_worker_thread_end, count, delta, type);
+ }
}
+
#endif
#pragma mark -
pflags |= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND;
// when we unbind, overcomitness can flip, so we need to learn
// it from the defaultpri, see _dispatch_priority_compute_update
- pp |= (_dispatch_get_defaultpriority() &
- _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
+ pp |= (_dispatch_get_basepri() &
+ DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
} else {
// else we need to keep the one that is set in the current pri
pp |= (old_pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
if (likely(old_pri & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
pflags |= _PTHREAD_SET_SELF_QOS_FLAG;
}
- if (unlikely(DISPATCH_QUEUE_DRAIN_OWNER(&_dispatch_mgr_q) ==
- _dispatch_tid_self())) {
+ uint64_t mgr_dq_state =
+ os_atomic_load2o(&_dispatch_mgr_q, dq_state, relaxed);
+ if (unlikely(_dq_state_drain_locked_by_self(mgr_dq_state))) {
DISPATCH_INTERNAL_CRASH(pp,
"Changing the QoS while on the manager queue");
}
DISPATCH_NOINLINE
voucher_t
_dispatch_set_priority_and_voucher_slow(pthread_priority_t priority,
- voucher_t v, _dispatch_thread_set_self_t flags)
+ voucher_t v, dispatch_thread_set_self_t flags)
{
voucher_t ov = DISPATCH_NO_VOUCHER;
mach_voucher_t kv = VOUCHER_NO_MACH_VOUCHER;
kv = _voucher_swap_and_get_mach_voucher(ov, v);
}
}
-#if !PTHREAD_WORKQUEUE_RESETS_VOUCHER_AND_PRIORITY_ON_PARK
- flags &= ~(_dispatch_thread_set_self_t)DISPATCH_THREAD_PARK;
-#endif
if (!(flags & DISPATCH_THREAD_PARK)) {
_dispatch_set_priority_and_mach_voucher_slow(priority, kv);
}
#pragma mark -
#pragma mark dispatch_continuation_t
+// Table of vtables for the special continuation kinds. Each entry names the
+// kind (.do_kind) and the function that invokes it (.do_invoke).
+// The Mach entries are only compiled when HAVE_MACH is set, and the two
+// override entries only when HAVE_PTHREAD_WORKQUEUE_QOS is set.
+// NOTE(review): MACH_SEND_BARRRIER_DRAIN is spelled with three R's; it
+// presumably has to match a constant declared elsewhere -- verify before
+// "fixing" the spelling here.
+const struct dispatch_continuation_vtable_s _dispatch_continuation_vtables[] = {
+ DC_VTABLE_ENTRY(ASYNC_REDIRECT,
+ .do_kind = "dc-redirect",
+ .do_invoke = _dispatch_async_redirect_invoke),
+#if HAVE_MACH
+ DC_VTABLE_ENTRY(MACH_SEND_BARRRIER_DRAIN,
+ .do_kind = "dc-mach-send-drain",
+ .do_invoke = _dispatch_mach_send_barrier_drain_invoke),
+ // send and receive barriers share the same invoke function
+ DC_VTABLE_ENTRY(MACH_SEND_BARRIER,
+ .do_kind = "dc-mach-send-barrier",
+ .do_invoke = _dispatch_mach_barrier_invoke),
+ DC_VTABLE_ENTRY(MACH_RECV_BARRIER,
+ .do_kind = "dc-mach-recv-barrier",
+ .do_invoke = _dispatch_mach_barrier_invoke),
+ DC_VTABLE_ENTRY(MACH_ASYNC_REPLY,
+ .do_kind = "dc-mach-async-reply",
+ .do_invoke = _dispatch_mach_msg_async_reply_invoke),
+#endif
+#if HAVE_PTHREAD_WORKQUEUE_QOS
+ // both override kinds are handled by _dispatch_queue_override_invoke
+ DC_VTABLE_ENTRY(OVERRIDE_STEALING,
+ .do_kind = "dc-override-stealing",
+ .do_invoke = _dispatch_queue_override_invoke),
+ DC_VTABLE_ENTRY(OVERRIDE_OWNING,
+ .do_kind = "dc-override-owning",
+ .do_invoke = _dispatch_queue_override_invoke),
+#endif
+};
+
static void
_dispatch_force_cache_cleanup(void)
{
dc = _dispatch_thread_getspecific(dispatch_cache_key);
int cnt;
if (!dc || (cnt = dc->dc_cache_cnt -
- _dispatch_continuation_cache_limit) <= 0){
+ _dispatch_continuation_cache_limit) <= 0) {
return;
}
do {
}
#endif
-DISPATCH_ALWAYS_INLINE_NDEBUG
-static inline void
-_dispatch_continuation_slow_item_signal(dispatch_queue_t dq,
- dispatch_object_t dou)
-{
- dispatch_continuation_t dc = dou._dc;
- pthread_priority_t pp = dq->dq_override;
-
- _dispatch_trace_continuation_pop(dq, dc);
- if (pp > (dc->dc_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
- _dispatch_wqthread_override_start((mach_port_t)dc->dc_data, pp);
- }
- _dispatch_thread_event_signal((dispatch_thread_event_t)dc->dc_other);
- _dispatch_introspection_queue_item_complete(dc);
-}
-
DISPATCH_NOINLINE
static void
_dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
{
- _dispatch_queue_push(dq, dc,
- _dispatch_continuation_get_override_priority(dq, dc));
-}
-
-DISPATCH_NOINLINE
-static void
-_dispatch_continuation_push_sync_slow(dispatch_queue_t dq,
- dispatch_continuation_t dc)
-{
- _dispatch_queue_push_inline(dq, dc,
- _dispatch_continuation_get_override_priority(dq, dc),
- DISPATCH_WAKEUP_SLOW_WAITER);
+ // Out-of-line push: enqueue dc on dq through dx_push, passing the QoS
+ // computed by _dispatch_continuation_override_qos(dq, dc).
+ dx_push(dq, dc, _dispatch_continuation_override_qos(dq, dc));
}
DISPATCH_ALWAYS_INLINE
}
if (atomic_flags & DBF_CANCELED) goto out;
- pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
- _dispatch_thread_set_self_t adopt_flags = 0;
- if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
- op = _dispatch_get_priority();
+ pthread_priority_t op = 0, p = 0;
+ op = _dispatch_block_invoke_should_set_priority(flags, dbpd->dbpd_priority);
+ if (op) {
p = dbpd->dbpd_priority;
- if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
- adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
- }
}
voucher_t ov, v = DISPATCH_NO_VOUCHER;
if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
v = dbpd->dbpd_voucher;
}
- ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
+ ov = _dispatch_set_priority_and_voucher(p, v, 0);
dbpd->dbpd_thread = _dispatch_tid_self();
_dispatch_client_callout(dbpd->dbpd_block,
_dispatch_Block_invoke(dbpd->dbpd_block));
dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
dispatch_block_flags_t flags = dbpd->dbpd_flags;
unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
- if (slowpath(atomic_flags & DBF_WAITED)) {
+ if (unlikely(atomic_flags & DBF_WAITED)) {
DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
"run more than once and waited for");
}
if (atomic_flags & DBF_CANCELED) goto out;
- pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
- _dispatch_thread_set_self_t adopt_flags = 0;
- if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
- op = _dispatch_get_priority();
- p = dbpd->dbpd_priority;
- if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
- adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
- }
- }
- voucher_t ov, v = DISPATCH_NO_VOUCHER;
+ voucher_t ov = DISPATCH_NO_VOUCHER;
if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
- v = dbpd->dbpd_voucher;
+ ov = _dispatch_adopt_priority_and_set_voucher(0, dbpd->dbpd_voucher, 0);
}
- ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
dbpd->dbpd_block();
- _dispatch_reset_priority_and_voucher(op, ov);
+ _dispatch_reset_voucher(ov, 0);
out:
if ((atomic_flags & DBF_PERFORM) == 0) {
if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
if (oq) {
// balances dispatch_{,barrier_,}sync
- _os_object_release_internal(oq->_as_os_obj);
+ _os_object_release_internal_n(oq->_as_os_obj, 2);
}
}
-DISPATCH_ALWAYS_INLINE
+// Lowers the max-QoS field of dq's dq_state to `qos`.
+// Returns without modifying the field when the recorded max QoS is already
+// at or below `qos`, or when the DIRTY bit is set and items turn out to be
+// enqueued (dq_items_tail != NULL). After a successful update, the current
+// deferred-items context is flagged with ddi_wlh_needs_update and the event
+// loop is drained with KEVENT_FLAG_IMMEDIATE.
static void
-_dispatch_block_async_invoke2(dispatch_block_t b, bool release)
+_dispatch_block_async_invoke_reset_max_qos(dispatch_queue_t dq,
+ dispatch_qos_t qos)
+{
+ uint64_t old_state, new_state, qos_bits = _dq_state_from_qos(qos);
+
+ // Only dispatch queues can reach this point (as opposed to sources or more
+ // complex objects) which allows us to handle the DIRTY bit protocol by only
+ // looking at the tail
+ dispatch_assert(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE);
+
+again:
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
+ dispatch_assert(_dq_state_is_base_wlh(old_state));
+ if ((old_state & DISPATCH_QUEUE_MAX_QOS_MASK) <= qos_bits) {
+ // Nothing to do if the QoS isn't going down
+ os_atomic_rmw_loop_give_up(return);
+ }
+ if (_dq_state_is_dirty(old_state)) {
+ os_atomic_rmw_loop_give_up({
+ // just renew the drain lock with an acquire barrier, to see
+ // what the enqueuer that set DIRTY has done.
+ // the xor generates better assembly as DISPATCH_QUEUE_DIRTY
+ // is already in a register
+ os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
+ // tail still empty: restart the loop; otherwise leave the max
+ // QoS untouched and return
+ if (!dq->dq_items_tail) {
+ goto again;
+ }
+ return;
+ });
+ }
+
+ // replace only the max-QoS bits, keep every other state bit as-is
+ new_state = old_state;
+ new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
+ new_state |= qos_bits;
+ });
+
+ _dispatch_deferred_items_get()->ddi_wlh_needs_update = true;
+ _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
+}
+
+#define DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE 0x1
+#define DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET 0x2
+
+DISPATCH_NOINLINE
+static void
+_dispatch_block_async_invoke2(dispatch_block_t b, unsigned long invoke_flags)
{
dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
"run more than once and waited for");
}
+
+ if (unlikely((dbpd->dbpd_flags &
+ DISPATCH_BLOCK_IF_LAST_RESET_QUEUE_QOS_OVERRIDE) &&
+ !(invoke_flags & DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET))) {
+ dispatch_queue_t dq = _dispatch_get_current_queue();
+ dispatch_qos_t qos = _dispatch_qos_from_pp(_dispatch_get_priority());
+ if ((dispatch_wlh_t)dq == _dispatch_get_wlh() && !dq->dq_items_tail) {
+ _dispatch_block_async_invoke_reset_max_qos(dq, qos);
+ }
+ }
+
if (!slowpath(atomic_flags & DBF_CANCELED)) {
dbpd->dbpd_block();
}
dispatch_group_leave(_dbpd_group(dbpd));
}
}
- os_mpsc_queue_t oq;
- oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
+
+ os_mpsc_queue_t oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
if (oq) {
// balances dispatch_{,barrier_,group_}async
- _os_object_release_internal_inline(oq->_as_os_obj);
+ _os_object_release_internal_n_inline(oq->_as_os_obj, 2);
}
- if (release) {
+
+ if (invoke_flags & DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE) {
Block_release(b);
}
}
+// Continuation function for a block with private data that must not be
+// released afterwards: calls _dispatch_block_async_invoke2() with no flags.
static void
_dispatch_block_async_invoke(void *block)
{
- _dispatch_block_async_invoke2(block, false);
+ _dispatch_block_async_invoke2(block, 0);
}
+// Same as _dispatch_block_async_invoke(), but passes
+// DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE so that _dispatch_block_async_invoke2()
+// calls Block_release() on the block when it is done.
static void
_dispatch_block_async_invoke_and_release(void *block)
{
- _dispatch_block_async_invoke2(block, true);
+ _dispatch_block_async_invoke2(block, DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE);
+}
+
+// Variant installed for continuations flagged DISPATCH_OBJ_MACH_BARRIER:
+// releases the block after invoking it and additionally passes
+// DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET, which makes
+// _dispatch_block_async_invoke2() skip its queue max-QoS reset step.
+static void
+_dispatch_block_async_invoke_and_release_mach_barrier(void *block)
+{
+ _dispatch_block_async_invoke2(block, DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE |
+ DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET);
+}
+
+// Returns true when `dbpd` is non-NULL and its flags do not include
+// DISPATCH_BLOCK_IF_LAST_RESET_QUEUE_QOS_OVERRIDE. dispatch_block_cancel(),
+// dispatch_block_testcancel() and dispatch_block_wait() crash the client
+// with an "Invalid block object" message when this returns false.
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_block_supports_wait_and_cancel(dispatch_block_private_data_t dbpd)
+{
+ return dbpd && !(dbpd->dbpd_flags &
+ DISPATCH_BLOCK_IF_LAST_RESET_QUEUE_QOS_OVERRIDE);
}
void
dispatch_block_cancel(dispatch_block_t db)
{
dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
- if (!dbpd) {
+ if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd))) {
DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
"dispatch_block_cancel()");
}
dispatch_block_testcancel(dispatch_block_t db)
{
dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
- if (!dbpd) {
+ if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd))) {
DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
"dispatch_block_testcancel()");
}
dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
{
dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
- if (!dbpd) {
+ if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd))) {
DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
"dispatch_block_wait()");
}
// neither of us would ever release. Side effect: After a _wait
// that times out, subsequent waits will not boost the qos of the
// still-running block.
- dx_wakeup(boost_oq, pp, DISPATCH_WAKEUP_OVERRIDING |
- DISPATCH_WAKEUP_CONSUME);
+ dx_wakeup(boost_oq, _dispatch_qos_from_pp(pp),
+ DISPATCH_WAKEUP_BLOCK_WAIT | DISPATCH_WAKEUP_CONSUME_2);
}
mach_port_t boost_th = dbpd->dbpd_thread;
// balanced in d_block_async_invoke_and_release or d_block_wait
if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, oq, relaxed)) {
- _os_object_retain_internal_inline(oq->_as_os_obj);
+ _os_object_retain_internal_n_inline(oq->_as_os_obj, 2);
}
- if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
+ if (dc_flags & DISPATCH_OBJ_MACH_BARRIER) {
+ dispatch_assert(dc_flags & DISPATCH_OBJ_CONSUME_BIT);
+ dc->dc_func = _dispatch_block_async_invoke_and_release_mach_barrier;
+ } else if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
dc->dc_func = _dispatch_block_async_invoke_and_release;
} else {
dc->dc_func = _dispatch_block_async_invoke;
dc->dc_flags = dc_flags;
}
-void
-_dispatch_continuation_update_bits(dispatch_continuation_t dc,
- uintptr_t dc_flags)
-{
- dc->dc_flags = dc_flags;
- if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
- if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
- dc->dc_func = _dispatch_block_async_invoke_and_release;
- } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
- dc->dc_func = _dispatch_call_block_and_release;
- }
- } else {
- if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
- dc->dc_func = _dispatch_block_async_invoke;
- } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
- dc->dc_func = _dispatch_Block_invoke(dc->dc_ctxt);
- }
- }
-}
-
#endif // __BLOCKS__
-
#pragma mark -
#pragma mark dispatch_barrier_async
dc->dc_ctxt = ctxt;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
dc->dc_priority = DISPATCH_NO_PRIORITY;
- _dispatch_queue_push(dq, dc, 0);
+ dx_push(dq, dc, 0);
}
#ifdef __BLOCKS__
void
-dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
+dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
void
_dispatch_async_redirect_invoke(dispatch_continuation_t dc,
- dispatch_invoke_flags_t flags)
+ dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
dispatch_thread_frame_s dtf;
struct dispatch_continuation_s *other_dc = dc->dc_other;
// the "right" root queue was stuffed into dc_func
dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
dispatch_queue_t dq = dc->dc_data, rq, old_dq;
- struct _dispatch_identity_s di;
-
- pthread_priority_t op, dp, old_dp;
+ dispatch_priority_t old_dbp;
if (ctxt_flags) {
flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
}
old_dq = _dispatch_get_current_queue();
if (assumed_rq) {
- _dispatch_queue_set_current(assumed_rq);
- _dispatch_root_queue_identity_assume(&di, 0);
- }
-
- old_dp = _dispatch_set_defaultpriority(dq->dq_priority, &dp);
- op = dq->dq_override;
- if (op > (dp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
- _dispatch_wqthread_override_start(_dispatch_tid_self(), op);
- // Ensure that the root queue sees that this thread was overridden.
- _dispatch_set_defaultpriority_override();
+ old_dbp = _dispatch_root_queue_identity_assume(assumed_rq);
+ _dispatch_set_basepri(dq->dq_priority);
+ } else {
+ old_dbp = _dispatch_set_basepri(dq->dq_priority);
}
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
DISPATCH_OBJ_CONSUME_BIT, {
- _dispatch_continuation_pop(other_dc, dq, flags);
+ _dispatch_continuation_pop(other_dc, dic, flags, dq);
});
_dispatch_thread_frame_pop(&dtf);
- if (assumed_rq) {
- _dispatch_root_queue_identity_restore(&di);
- _dispatch_queue_set_current(old_dq);
- }
- _dispatch_reset_defaultpriority(old_dp);
+ if (assumed_rq) _dispatch_queue_set_current(old_dq);
+ _dispatch_reset_basepri(old_dbp);
rq = dq->do_targetq;
while (slowpath(rq->do_targetq) && rq != old_dq) {
- _dispatch_non_barrier_complete(rq);
+ _dispatch_queue_non_barrier_complete(rq);
rq = rq->do_targetq;
}
- _dispatch_non_barrier_complete(dq);
-
- if (dtf.dtf_deferred) {
- struct dispatch_object_s *dou = dtf.dtf_deferred;
- return _dispatch_queue_drain_deferred_invoke(dq, flags, 0, dou);
- }
-
- _dispatch_release_tailcall(dq);
+ _dispatch_queue_non_barrier_complete(dq);
+ _dispatch_release_tailcall(dq); // pairs with _dispatch_async_redirect_wrap
}
DISPATCH_ALWAYS_INLINE
dc->dc_other = dou._do;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
dc->dc_priority = DISPATCH_NO_PRIORITY;
- _dispatch_retain(dq);
+ _dispatch_retain(dq); // released in _dispatch_async_redirect_invoke
return dc;
}
DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
- dispatch_object_t dou, pthread_priority_t pp)
+ dispatch_object_t dou, dispatch_qos_t qos)
{
if (!slowpath(_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dq, dou);
dq = dq->do_targetq;
}
- _dispatch_queue_push(dq, dou, pp);
+ dx_push(dq, dou, qos);
}
DISPATCH_ALWAYS_INLINE
// by _dispatch_async_f2.
// However we want to end up on the root queue matching `dc` qos, so pick up
// the current override of `dq` which includes dc's overrde (and maybe more)
- _dispatch_async_f_redirect(dq, dc, dq->dq_override);
+ uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
+ _dispatch_async_f_redirect(dq, dc, _dq_state_max_qos(dq_state));
_dispatch_introspection_queue_item_complete(dc);
}
}
return _dispatch_async_f_redirect(dq, dc,
- _dispatch_continuation_get_override_priority(dq, dc));
+ _dispatch_continuation_override_qos(dq, dc));
}
DISPATCH_ALWAYS_INLINE
#ifdef __BLOCKS__
void
-dispatch_async(dispatch_queue_t dq, void (^work)(void))
+dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
#endif
#pragma mark -
-#pragma mark dispatch_sync / dispatch_barrier_sync recurse and invoke
+#pragma mark _dispatch_sync_invoke / _dispatch_sync_complete
DISPATCH_NOINLINE
static void
-_dispatch_sync_function_invoke_slow(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func)
+_dispatch_queue_non_barrier_complete(dispatch_queue_t dq) // subtracts one width interval from dq_state, then acts on the resulting state change
{
- voucher_t ov;
- dispatch_thread_frame_s dtf;
- _dispatch_thread_frame_push(&dtf, dq);
- ov = _dispatch_set_priority_and_voucher(0, dq->dq_override_voucher, 0);
- _dispatch_client_callout(ctxt, func);
- _dispatch_perfmon_workitem_inc();
- _dispatch_reset_voucher(ov, 0);
- _dispatch_thread_frame_pop(&dtf);
+ uint64_t old_state, new_state, owner_self = _dispatch_lock_value_for_self();
+
+ // see _dispatch_queue_resume()
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
+ new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL; // one width interval is given back
+ if (unlikely(_dq_state_drain_locked(old_state))) {
+ // make drain_try_unlock() fail and reconsider whether there's
+ // enough width now for a new item
+ new_state |= DISPATCH_QUEUE_DIRTY;
+ } else if (likely(_dq_state_is_runnable(new_state))) {
+ uint64_t full_width = new_state; // candidate state with IN_BARRIER added, built below
+ if (_dq_state_has_pending_barrier(old_state)) {
+ full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
+ full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
+ full_width += DISPATCH_QUEUE_IN_BARRIER;
+ } else {
+ full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
+ full_width += DISPATCH_QUEUE_IN_BARRIER;
+ }
+ if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
+ DISPATCH_QUEUE_WIDTH_FULL_BIT) { // candidate width field is exactly WIDTH_FULL_BIT: commit it
+ new_state = full_width;
+ new_state &= ~DISPATCH_QUEUE_DIRTY;
+ new_state |= owner_self; // record this thread's lock value in the state
+ } else if (_dq_state_is_dirty(old_state)) {
+ new_state |= DISPATCH_QUEUE_ENQUEUED; // acted upon after the loop (push to the target queue)
+ }
+ }
+ });
+
+ if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) { // IN_BARRIER was toggled by the update above
+ if (_dq_state_is_dirty(old_state)) {
+ // <rdar://problem/14637483>
+ // dependency ordering for dq state changes that were flushed
+ // and not acted upon
+ os_atomic_thread_fence(dependency);
+ dq = os_atomic_force_dependency_on(dq, old_state);
+ }
+ return _dispatch_queue_barrier_complete(dq, 0, 0);
+ }
+
+ if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) { // ENQUEUED was toggled by the update above
+ _dispatch_retain_2(dq);
+ dispatch_assert(!_dq_state_is_base_wlh(new_state));
+ return dx_push(dq->do_targetq, dq, _dq_state_max_qos(new_state)); // push dq itself onto its target queue
+ }
}
+
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
- if (slowpath(dq->dq_override_voucher != DISPATCH_NO_VOUCHER)) {
- return _dispatch_sync_function_invoke_slow(dq, ctxt, func);
- }
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
}
-void
-_dispatch_sync_recurse_invoke(void *ctxt)
+DISPATCH_NOINLINE
+static void
+_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq, // completes dq, then each queue down the do_targetq chain
+ uintptr_t dc_flags)
{
- dispatch_continuation_t dc = ctxt;
- _dispatch_sync_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
+ bool barrier = (dc_flags & DISPATCH_OBJ_BARRIER_BIT); // first queue: barrier completion iff the barrier bit is set
+ do {
+ if (dq == stop_dq) return; // stop_dq itself is not completed
+ if (barrier) {
+ _dispatch_queue_barrier_complete(dq, 0, 0);
+ } else {
+ _dispatch_queue_non_barrier_complete(dq);
+ }
+ dq = dq->do_targetq;
+ barrier = (dq->dq_width == 1); // later queues: barrier completion iff their width is 1
+ } while (unlikely(dq->do_targetq)); // a queue without a target queue ends the walk and is not completed
}
-DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_sync_function_recurse(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
+DISPATCH_NOINLINE
+static void
+_dispatch_sync_invoke_and_complete_recurse(dispatch_queue_t dq, void *ctxt, // invokes func for dq, then completes dq and the queues below it
+ dispatch_function_t func, uintptr_t dc_flags)
{
- struct dispatch_continuation_s dc = {
- .dc_data = dq,
- .dc_func = func,
- .dc_ctxt = ctxt,
- };
- _dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp);
+ _dispatch_sync_function_invoke_inline(dq, ctxt, func);
+ _dispatch_sync_complete_recurse(dq, NULL, dc_flags); // NULL stop_dq: no queue is skipped early
}
DISPATCH_NOINLINE
static void
-_dispatch_non_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
+_dispatch_sync_invoke_and_complete(dispatch_queue_t dq, void *ctxt, // invokes func for dq, then does a non-barrier completion of dq only
dispatch_function_t func)
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
- _dispatch_non_barrier_complete(dq);
+ _dispatch_queue_non_barrier_complete(dq); // no walk of the target queue chain here
}
+/*
+ * For queues we can cheat and inline the unlock code, which is invalid
+ * for objects with a more complex state machine (sources or mach channels)
+ *
+ * Invokes func for dq, then attempts to drop the barrier with one atomic
+ * update of dq_state; every case that update does not handle is delegated
+ * to _dispatch_queue_barrier_complete().
+ */
DISPATCH_NOINLINE
static void
-_dispatch_non_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
+_dispatch_queue_barrier_sync_invoke_and_complete(dispatch_queue_t dq,
+ void *ctxt, dispatch_function_t func)
{
- _dispatch_sync_function_recurse(dq, ctxt, func, pp);
- _dispatch_non_barrier_complete(dq);
-}
+ _dispatch_sync_function_invoke_inline(dq, ctxt, func);
+ if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) { // items are queued or dq is not serial: use the general path
+ return _dispatch_queue_barrier_complete(dq, 0, 0);
+ }
-DISPATCH_ALWAYS_INLINE
-static void
-_dispatch_non_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
-{
- _dispatch_introspection_non_barrier_sync_begin(dq, func);
- if (slowpath(dq->do_targetq->do_targetq)) {
- return _dispatch_non_barrier_sync_f_recurse(dq, ctxt, func, pp);
+ // Presence of any of these bits requires more work that only
+ // _dispatch_queue_barrier_complete() handles properly
+ //
+ // Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
+ // checking the role is sloppy, but is a super fast check, and neither of
+ // these bits should be set if the lock was never contended/discovered.
+ const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
+ DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
+ DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
+ DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
+ uint64_t old_state, new_state;
+
+ // similar to _dispatch_queue_drain_try_unlock
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
+ new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
+ new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
+ new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
+ if (unlikely(old_state & fail_unlock_mask)) { // fast unlock not applicable: abandon the loop without storing
+ os_atomic_rmw_loop_give_up({
+ return _dispatch_queue_barrier_complete(dq, 0, 0);
+ });
+ }
+ });
+ if (_dq_state_is_base_wlh(old_state)) { // only reached when the fast unlock was stored
+ _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
}
- _dispatch_non_barrier_sync_f_invoke(dq, ctxt, func);
}
#pragma mark -
-#pragma mark dispatch_barrier_sync
+#pragma mark _dispatch_sync_wait / _dispatch_sync_waiter_wake
+
+#define DISPATCH_SYNC_WAITER_NO_UNLOCK (~0ull) // sentinel 'owned' value: _dispatch_sync_waiter_redirect_or_wake() then only loads dq_state instead of updating it
DISPATCH_NOINLINE
static void
-_dispatch_barrier_complete(dispatch_queue_t dq)
+_dispatch_sync_waiter_wake(dispatch_sync_context_t dsc, // wakes the sync waiter described by dsc
+ dispatch_wlh_t wlh, uint64_t old_state, uint64_t new_state)
{
- uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
- dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
+ dispatch_wlh_t waiter_wlh = dsc->dc_data; // DISPATCH_WLH_ANON or a queue, as stored by _dispatch_sync_wait()
- if (slowpath(dq->dq_items_tail)) {
- return _dispatch_try_lock_transfer_or_wakeup(dq);
+ if (_dq_state_in_sync_transfer(old_state) ||
+ _dq_state_in_sync_transfer(new_state) ||
+ (waiter_wlh != DISPATCH_WLH_ANON)) { // a sync transfer is involved or the waiter has a real wlh: go through the event loop
+ _dispatch_event_loop_wake_owner(dsc, wlh, old_state, new_state);
}
-
- if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
- // someone enqueued a slow item at the head
- // looping may be its last chance
- return _dispatch_try_lock_transfer_or_wakeup(dq);
+ if (waiter_wlh == DISPATCH_WLH_ANON) { // anonymous waiters block on dsc_event in _dispatch_sync_wait()
+ if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) { // override only when it exceeds the waiter's recorded floor
+ _dispatch_wqthread_override_start(dsc->dsc_waiter,
+ dsc->dsc_override_qos);
+ }
+ _dispatch_thread_event_signal(&dsc->dsc_event);
}
+ _dispatch_introspection_queue_item_complete(dsc->_as_dc);
}
DISPATCH_NOINLINE
static void
-_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
+_dispatch_sync_waiter_redirect_or_wake(dispatch_queue_t dq, uint64_t owned, // releases 'owned' from dq_state, then re-pushes the waiter on dq's target queue or wakes it
+ dispatch_object_t dou)
{
- _dispatch_sync_function_recurse(dq, ctxt, func, pp);
- _dispatch_barrier_complete(dq);
-}
+ dispatch_sync_context_t dsc = (dispatch_sync_context_t)dou._dc;
+ uint64_t next_owner = 0, old_state, new_state;
+ dispatch_wlh_t wlh = DISPATCH_WLH_ANON;
-DISPATCH_NOINLINE
-static void
-_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func)
-{
- _dispatch_sync_function_invoke_inline(dq, ctxt, func);
- _dispatch_barrier_complete(dq);
-}
+ _dispatch_trace_continuation_pop(dq, dsc->_as_dc);
-DISPATCH_ALWAYS_INLINE
-static void
-_dispatch_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
-{
- _dispatch_introspection_barrier_sync_begin(dq, func);
- if (slowpath(dq->do_targetq->do_targetq)) {
- return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp);
+ if (owned == DISPATCH_SYNC_WAITER_NO_UNLOCK) { // sentinel: dq_state is only read, never modified
+ dispatch_assert(!(dsc->dc_flags & DISPATCH_OBJ_BARRIER_BIT));
+ new_state = old_state = os_atomic_load2o(dq, dq_state, relaxed);
+ } else {
+ if (dsc->dc_flags & DISPATCH_OBJ_BARRIER_BIT) {
+ next_owner = _dispatch_lock_value_from_tid(dsc->dsc_waiter); // a barrier waiter is recorded in the new state
+ }
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
+ new_state = old_state - owned;
+ new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
+ new_state &= ~DISPATCH_QUEUE_DIRTY;
+ new_state |= next_owner; // 0 for non-barrier waiters
+ if (_dq_state_is_base_wlh(old_state)) {
+ new_state |= DISPATCH_QUEUE_SYNC_TRANSFER;
+ }
+ });
+ if (_dq_state_is_base_wlh(old_state)) {
+ wlh = (dispatch_wlh_t)dq; // passed on to _dispatch_sync_waiter_wake() below
+ } else if (_dq_state_received_override(old_state)) {
+ // Ensure that the root queue sees that this thread was overridden.
+ _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
+ }
+ }
+
+ if (dsc->dc_data == DISPATCH_WLH_ANON) { // anonymous waiter: raise its override qos to the old state's max qos
+ if (dsc->dsc_override_qos < _dq_state_max_qos(old_state)) {
+ dsc->dsc_override_qos = _dq_state_max_qos(old_state);
+ }
+ }
+
+ if (unlikely(_dq_state_is_inner_queue(old_state))) { // inner queue: the waiter moves on to the target queue instead of waking
+ dispatch_queue_t tq = dq->do_targetq;
+ if (likely(tq->dq_width == 1)) { // flags are rewritten for tq: barrier iff tq has width 1
+ dsc->dc_flags = DISPATCH_OBJ_BARRIER_BIT |
+ DISPATCH_OBJ_SYNC_WAITER_BIT;
+ } else {
+ dsc->dc_flags = DISPATCH_OBJ_SYNC_WAITER_BIT;
+ }
+ _dispatch_introspection_queue_item_complete(dsc->_as_dc);
+ return _dispatch_queue_push_sync_waiter(tq, dsc, 0);
}
- _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
-}
-typedef struct dispatch_barrier_sync_context_s {
- struct dispatch_continuation_s dbsc_dc;
- dispatch_thread_frame_s dbsc_dtf;
-} *dispatch_barrier_sync_context_t;
+ return _dispatch_sync_waiter_wake(dsc, wlh, old_state, new_state);
+}
+DISPATCH_NOINLINE // releases 'owned' from dq_state and, when a wakeup target is given, enqueues dq on it
static void
-_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
+_dispatch_queue_class_barrier_complete(dispatch_queue_t dq, dispatch_qos_t qos,
+ dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
+ uint64_t owned)
{
- dispatch_barrier_sync_context_t dbsc = ctxt;
- dispatch_continuation_t dc = &dbsc->dbsc_dc;
- dispatch_queue_t dq = dc->dc_data;
- dispatch_thread_event_t event = (dispatch_thread_event_t)dc->dc_other;
+ uint64_t old_state, new_state, enqueue;
+ dispatch_queue_t tq;
- dispatch_assert(dq == _dispatch_queue_get_current());
-#if DISPATCH_COCOA_COMPAT
- if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
- dispatch_assert(_dispatch_thread_frame_get_current() == NULL);
+ if (target == DISPATCH_QUEUE_WAKEUP_MGR) { // pick the queue to push on (tq) and the matching state bit (enqueue)
+ tq = &_dispatch_mgr_q;
+ enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
+ } else if (target) {
+ tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
+ enqueue = DISPATCH_QUEUE_ENQUEUED;
+ } else { // no wakeup target: nothing will be pushed
+ tq = NULL;
+ enqueue = 0;
+ }
- // the block runs on the thread the queue is bound to and not
- // on the calling thread, but we mean to see the calling thread
- // dispatch thread frames, so we fake the link, and then undo it
- _dispatch_thread_frame_set_current(&dbsc->dbsc_dtf);
- // The queue is bound to a non-dispatch thread (e.g. main thread)
- _dispatch_continuation_voucher_adopt(dc, DISPATCH_NO_VOUCHER,
- DISPATCH_OBJ_CONSUME_BIT);
- _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
- os_atomic_store2o(dc, dc_func, NULL, release);
- _dispatch_thread_frame_set_current(NULL);
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
+ new_state = _dq_state_merge_qos(old_state - owned, qos); // drop 'owned' and merge the requested qos
+ new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
+ if (unlikely(_dq_state_is_suspended(old_state))) {
+ if (likely(_dq_state_is_base_wlh(old_state))) {
+ new_state &= ~DISPATCH_QUEUE_ENQUEUED;
+ }
+ } else if (enqueue) {
+ if (!_dq_state_is_enqueued(old_state)) { // set the enqueue bit only if dq is not already enqueued
+ new_state |= enqueue;
+ }
+ } else if (unlikely(_dq_state_is_dirty(old_state))) {
+ os_atomic_rmw_loop_give_up({
+ // just renew the drain lock with an acquire barrier, to see
+ // what the enqueuer that set DIRTY has done.
+ // the xor generates better assembly as DISPATCH_QUEUE_DIRTY
+ // is already in a register
+ os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
+ flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
+ return dx_wakeup(dq, qos, flags);
+ });
+ } else if (likely(_dq_state_is_base_wlh(old_state))) {
+ new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
+ new_state &= ~DISPATCH_QUEUE_ENQUEUED;
+ } else {
+ new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
+ }
+ });
+ old_state -= owned; // same subtraction as applied to new_state, so the checks below compare like with like
+ dispatch_assert(_dq_state_drain_locked_by_self(old_state));
+ dispatch_assert(!_dq_state_is_enqueued_on_manager(old_state));
+
+
+ if (_dq_state_received_override(old_state)) {
+ // Ensure that the root queue sees that this thread was overridden.
+ _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
}
+
+ if (tq) {
+ if (likely((old_state ^ new_state) & enqueue)) { // this update set the enqueue bit: push dq onto tq
+ dispatch_assert(_dq_state_is_enqueued(new_state));
+ dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
+ return _dispatch_queue_push_queue(tq, dq, new_state);
+ }
+#if HAVE_PTHREAD_WORKQUEUE_QOS
+ // <rdar://problem/27694093> when doing sync to async handoff
+ // if the queue received an override we have to forcefully redrive
+ // the same override so that a new stealer is enqueued because
+ // the previous one may be gone already
+ if (_dq_state_should_override(new_state)) {
+ return _dispatch_queue_class_wakeup_with_override(dq, new_state,
+ flags);
+ }
#endif
- _dispatch_thread_event_signal(event); // release
+ }
+ if (flags & DISPATCH_WAKEUP_CONSUME_2) { // reached only when dq was neither pushed nor re-overridden above
+ return _dispatch_release_2_tailcall(dq);
+ }
}
DISPATCH_NOINLINE
static void
-_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
+_dispatch_queue_barrier_complete(dispatch_queue_t dq, dispatch_qos_t qos, // ends a barrier on dq: hands off to sync waiter(s) at the head if any, else completes via the class helper
+ dispatch_wakeup_flags_t flags)
{
- if (slowpath(!dq->do_targetq)) {
- // see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
- return _dispatch_sync_function_invoke(dq, ctxt, func);
- }
+ dispatch_continuation_t dc_tmp, dc_start = NULL, dc_end = NULL;
+ dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
+ struct dispatch_object_s *dc = NULL;
+ uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
+ dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL; // the barrier bit plus dq's whole width
+ size_t count = 0; // number of sync waiters selected for hand-off
- if (!pp) {
- pp = _dispatch_get_priority();
- pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
- pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
- }
- dispatch_thread_event_s event;
- _dispatch_thread_event_init(&event);
- struct dispatch_barrier_sync_context_s dbsc = {
- .dbsc_dc = {
- .dc_data = dq,
-#if DISPATCH_COCOA_COMPAT
- .dc_func = func,
- .dc_ctxt = ctxt,
-#endif
- .dc_other = &event,
+ dispatch_assert(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE);
+
+ if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
+ dc = _dispatch_queue_head(dq);
+ if (!_dispatch_object_is_sync_waiter(dc)) {
+ // not a slow item, needs to wake up
+ } else if (likely(dq->dq_width == 1) ||
+ _dispatch_object_is_barrier(dc)) {
+ // rdar://problem/8290662 "barrier/writer lock transfer"
+ dc_start = dc_end = (dispatch_continuation_t)dc;
+ owned = 0; // nothing is subtracted from dq_state for a single barrier waiter
+ count = 1;
+ dc = _dispatch_queue_next(dq, dc);
+ } else {
+ // <rdar://problem/10164594> "reader lock transfer"
+ // we must not wake waiters immediately because our right
+ // for dequeuing is granted through holding the full "barrier" width
+ // which a signaled work item could relinquish out from our feet
+ dc_start = (dispatch_continuation_t)dc;
+ do {
+ // no check on width here because concurrent queues
+ // do not respect width for blocked readers, the thread
+ // is already spent anyway
+ dc_end = (dispatch_continuation_t)dc;
+ owned -= DISPATCH_QUEUE_WIDTH_INTERVAL; // one width interval per selected reader is not given back
+ count++;
+ dc = _dispatch_queue_next(dq, dc);
+ } while (dc && _dispatch_object_is_sync_waiter_non_barrier(dc));
}
- };
-#if DISPATCH_COCOA_COMPAT
- // It's preferred to execute synchronous blocks on the current thread
- // due to thread-local side effects, etc. However, blocks submitted
- // to the main thread MUST be run on the main thread
- if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
- // consumed by _dispatch_barrier_sync_f_slow_invoke
- // or in the DISPATCH_COCOA_COMPAT hunk below
- _dispatch_continuation_voucher_set(&dbsc.dbsc_dc, dq, 0);
- // save frame linkage for _dispatch_barrier_sync_f_slow_invoke
- _dispatch_thread_frame_save_state(&dbsc.dbsc_dtf);
- // thread bound queues cannot mutate their target queue hierarchy
- // so it's fine to look now
- _dispatch_introspection_barrier_sync_begin(dq, func);
- }
-#endif
- uint32_t th_self = _dispatch_tid_self();
- struct dispatch_continuation_s dbss = {
- .dc_flags = DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT,
- .dc_func = _dispatch_barrier_sync_f_slow_invoke,
- .dc_ctxt = &dbsc,
- .dc_data = (void*)(uintptr_t)th_self,
- .dc_priority = pp,
- .dc_other = &event,
- .dc_voucher = DISPATCH_NO_VOUCHER,
- };
- uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
- if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
- DISPATCH_CLIENT_CRASH(dq, "dispatch_barrier_sync called on queue "
- "already owned by current thread");
+ if (count) { // hand off to the selected waiters, dc_start through dc_end
+ do {
+ dc_tmp = dc_start;
+ dc_start = dc_start->do_next; // read the link before dc_tmp is handed off
+ _dispatch_sync_waiter_redirect_or_wake(dq, owned, dc_tmp);
+ owned = DISPATCH_SYNC_WAITER_NO_UNLOCK; // only the first call updates dq_state
+ } while (dc_tmp != dc_end);
+ if (flags & DISPATCH_WAKEUP_CONSUME_2) {
+ return _dispatch_release_2_tailcall(dq);
+ }
+ return;
+ }
+ if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) { // the push path in the class helper asserts CONSUME_2, so retain and set it here
+ _dispatch_retain_2(dq);
+ flags |= DISPATCH_WAKEUP_CONSUME_2;
+ }
+ target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
- _dispatch_continuation_push_sync_slow(dq, &dbss);
- _dispatch_thread_event_wait(&event); // acquire
- _dispatch_thread_event_destroy(&event);
- if (_dispatch_queue_received_override(dq, pp)) {
- // Ensure that the root queue sees that this thread was overridden.
- // pairs with the _dispatch_wqthread_override_start in
- // _dispatch_continuation_slow_item_signal
- _dispatch_set_defaultpriority_override();
- }
+ return _dispatch_queue_class_barrier_complete(dq, qos, flags, target,owned);
+}
#if DISPATCH_COCOA_COMPAT
- // Queue bound to a non-dispatch thread
- if (dbsc.dbsc_dc.dc_func == NULL) {
- return;
- } else if (dbsc.dbsc_dc.dc_voucher) {
- // this almost never happens, unless a dispatch_sync() onto a thread
- // bound queue went to the slow path at the same time dispatch_main()
- // is called, or the queue is detached from the runloop.
- _voucher_release(dbsc.dbsc_dc.dc_voucher);
- }
-#endif
+static void
+_dispatch_sync_thread_bound_invoke(void *ctxt) // installed as dsc.dc_func by _dispatch_sync_wait(); ctxt is that dispatch_sync_context_t
+{
+ dispatch_sync_context_t dsc = ctxt;
+ dispatch_queue_t cq = _dispatch_queue_get_current();
+ dispatch_queue_t orig_dq = dsc->dc_other; // _dispatch_sync_wait() stores top_dq in dc_other
+ dispatch_thread_frame_s dtf;
+ dispatch_assert(_dispatch_queue_is_thread_bound(cq));
+
+ // the block runs on the thread the queue is bound to and not
+ // on the calling thread, but we mean to see the calling thread
+ // dispatch thread frames, so we fake the link, and then undo it
+ _dispatch_thread_frame_push_and_rebase(&dtf, orig_dq, &dsc->dsc_dtf);
+ _dispatch_client_callout(dsc->dsc_ctxt, dsc->dsc_func);
+ _dispatch_thread_frame_pop(&dtf);
- _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
+ // communicate back to _dispatch_sync_wait who the thread bound queue
+ // was so that we skip it during _dispatch_sync_complete_recurse
+ dsc->dc_other = cq;
+ dsc->dsc_func = NULL; // _dispatch_sync_wait() tests for NULL to learn the work already ran
+ _dispatch_thread_event_signal(&dsc->dsc_event); // release
}
+#endif
DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_barrier_sync_f2(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
+static inline uint64_t
+_dispatch_sync_wait_prepare(dispatch_queue_t dq) // tries to set RECEIVED_SYNC_WAIT on dq; returns the stored state, or the unchanged state when it gives up
{
- if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
- // global concurrent queues and queues bound to non-dispatch threads
- // always fall into the slow case
- return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
- }
- //
- // TODO: the more correct thing to do would be to set dq_override to the qos
- // of the thread that just acquired the barrier lock here. Unwinding that
- // would slow down the uncontended fastpath however.
- //
- // The chosen tradeoff is that if an enqueue on a lower priority thread
- // contends with this fastpath, this thread may receive a useless override.
- // Improving this requires the override level to be part of the atomic
- // dq_state
- //
- _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
+ uint64_t old_state, new_state;
+
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
+ if (_dq_state_is_suspended(old_state) ||
+ !_dq_state_is_base_wlh(old_state)) { // the bit is only set on an unsuspended base-wlh queue
+ os_atomic_rmw_loop_give_up(return old_state);
+ }
+ if (!_dq_state_drain_locked(old_state) ||
+ _dq_state_in_sync_transfer(old_state)) { // ...that is drain-locked and not in a sync transfer
+ os_atomic_rmw_loop_give_up(return old_state);
+ }
+ new_state = old_state | DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
+ });
+ return new_state;
}
-DISPATCH_NOINLINE
static void
-_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func, pthread_priority_t pp)
+_dispatch_sync_waiter_compute_wlh(dispatch_queue_t dq, // sets dsc->dc_data by walking up from dq's target queue
+ dispatch_sync_context_t dsc)
{
- _dispatch_barrier_sync_f2(dq, ctxt, func, pp);
-}
+ bool needs_locking = _dispatch_queue_is_legacy(dq); // legacy queues hold their sidelock across this function
-DISPATCH_NOINLINE
-void
-dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func)
-{
- _dispatch_barrier_sync_f2(dq, ctxt, func, 0);
+ if (needs_locking) {
+ dsc->dsc_release_storage = true;
+ _dispatch_queue_sidelock_lock(dq);
+ }
+
+ dispatch_queue_t tq = dq->do_targetq;
+ uint64_t dq_state = _dispatch_sync_wait_prepare(tq);
+
+ if (_dq_state_is_suspended(dq_state) ||
+ _dq_state_is_base_anon(dq_state)) { // suspended or anonymous base: the waiter is anonymous
+ dsc->dsc_release_storage = false;
+ dsc->dc_data = DISPATCH_WLH_ANON;
+ } else if (_dq_state_is_base_wlh(dq_state)) { // tq is a base wlh: record it as the waiter's wlh
+ if (dsc->dsc_release_storage) { // set above if a legacy queue was crossed on the way here
+ _dispatch_queue_retain_storage(tq);
+ }
+ dsc->dc_data = (dispatch_wlh_t)tq;
+ } else {
+ _dispatch_sync_waiter_compute_wlh(tq, dsc); // neither case applies: recurse one level further up
+ }
+ if (needs_locking) _dispatch_queue_sidelock_unlock(dq);
}
-#ifdef __BLOCKS__
DISPATCH_NOINLINE
static void
-_dispatch_sync_block_with_private_data(dispatch_queue_t dq,
- void (^work)(void), dispatch_block_flags_t flags)
+_dispatch_sync_wait(dispatch_queue_t top_dq, void *ctxt, // slow path: enqueues this thread as a sync waiter on dq, waits, then runs func for top_dq
+ dispatch_function_t func, uintptr_t top_dc_flags,
+ dispatch_queue_t dq, uintptr_t dc_flags)
{
- pthread_priority_t pp = _dispatch_block_get_priority(work);
+ pthread_priority_t pp = _dispatch_get_priority();
+ dispatch_tid tid = _dispatch_tid_self();
+ dispatch_qos_t qos;
+ uint64_t dq_state;
- flags |= _dispatch_block_get_flags(work);
- if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
- pthread_priority_t tp = _dispatch_get_priority();
- tp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
- if (pp < tp) {
- pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
- } else if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
- pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
- }
+ dq_state = _dispatch_sync_wait_prepare(dq);
+ if (unlikely(_dq_state_drain_locked_by(dq_state, tid))) { // dq is drain-locked by this very thread
+ DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
+ "dispatch_sync called on queue "
+ "already owned by current thread");
}
- // balanced in d_block_sync_invoke or d_block_wait
- if (os_atomic_cmpxchg2o(_dispatch_block_get_data(work),
- dbpd_queue, NULL, dq->_as_oq, relaxed)) {
- _dispatch_retain(dq);
+
+ struct dispatch_sync_context_s dsc = {
+ .dc_flags = dc_flags | DISPATCH_OBJ_SYNC_WAITER_BIT,
+ .dc_other = top_dq, // read back as orig_dq by _dispatch_sync_thread_bound_invoke()
+ .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
+ .dc_voucher = DISPATCH_NO_VOUCHER,
+ .dsc_func = func,
+ .dsc_ctxt = ctxt,
+ .dsc_waiter = tid,
+ };
+ if (_dq_state_is_suspended(dq_state) ||
+ _dq_state_is_base_anon(dq_state)) { // dc_data selects how this thread waits below
+ dsc.dc_data = DISPATCH_WLH_ANON;
+ } else if (_dq_state_is_base_wlh(dq_state)) {
+ dsc.dc_data = (dispatch_wlh_t)dq;
+ } else {
+ _dispatch_sync_waiter_compute_wlh(dq, &dsc);
}
- if (flags & DISPATCH_BLOCK_BARRIER) {
- _dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
+#if DISPATCH_COCOA_COMPAT
+ // It's preferred to execute synchronous blocks on the current thread
+ // due to thread-local side effects, etc. However, blocks submitted
+ // to the main thread MUST be run on the main thread
+ //
+ // Since we don't know whether that will happen, save the frame linkage
+ // for the sake of _dispatch_sync_thread_bound_invoke
+ _dispatch_thread_frame_save_state(&dsc.dsc_dtf);
+
+ // Since the continuation doesn't have the CONSUME bit, the voucher will be
+ // retained on adoption on the thread bound queue if it happens so we can
+ // borrow this thread's reference
+ dsc.dc_voucher = _voucher_get();
+ dsc.dc_func = _dispatch_sync_thread_bound_invoke;
+ dsc.dc_ctxt = &dsc;
+#endif
+
+ if (dsc.dc_data == DISPATCH_WLH_ANON) { // anonymous waiter: set up the thread event it will block on
+ dsc.dsc_override_qos_floor = dsc.dsc_override_qos =
+ _dispatch_get_basepri_override_qos_floor();
+ qos = _dispatch_qos_from_pp(pp);
+ _dispatch_thread_event_init(&dsc.dsc_event);
} else {
- _dispatch_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
+ qos = 0;
+ }
+ _dispatch_queue_push_sync_waiter(dq, &dsc, qos);
+ if (dsc.dc_data == DISPATCH_WLH_ANON) {
+ _dispatch_thread_event_wait(&dsc.dsc_event); // acquire
+ _dispatch_thread_event_destroy(&dsc.dsc_event);
+ // If _dispatch_sync_waiter_wake() gave this thread an override,
+ // ensure that the root queue sees it.
+ if (dsc.dsc_override_qos > dsc.dsc_override_qos_floor) {
+ _dispatch_set_basepri_override_qos(dsc.dsc_override_qos);
+ }
+ } else {
+ _dispatch_event_loop_wait_for_ownership(&dsc); // non-anonymous waiters wait through the event loop instead
}
-}
-
-void
-dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
-{
- if (slowpath(_dispatch_block_has_private_data(work))) {
- dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
- return _dispatch_sync_block_with_private_data(dq, work, flags);
+ _dispatch_introspection_sync_begin(top_dq);
+#if DISPATCH_COCOA_COMPAT
+ if (unlikely(dsc.dsc_func == NULL)) { // cleared by _dispatch_sync_thread_bound_invoke() after it ran the work
+ // Queue bound to a non-dispatch thread, the continuation already ran
+ // so just unlock all the things, except for the thread bound queue
+ dispatch_queue_t bound_dq = dsc.dc_other;
+ return _dispatch_sync_complete_recurse(top_dq, bound_dq, top_dc_flags);
}
- dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
-}
#endif
+ _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags);
+}
DISPATCH_NOINLINE
-void
-_dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t func)
+static void
+_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, // slow path shared by dispatch_sync_f() and dispatch_barrier_sync_f()
+ dispatch_function_t func, uintptr_t dc_flags)
{
- // Use for mutation of queue-/source-internal state only, ignores target
- // queue hierarchy!
- if (!fastpath(_dispatch_queue_try_acquire_barrier_sync(dq))) {
- return _dispatch_barrier_async_detached_f(dq, ctxt, func);
+ if (unlikely(!dq->do_targetq)) { // no target queue: invoke directly, without waiting
+ return _dispatch_sync_function_invoke(dq, ctxt, func);
}
- // skip the recursion because it's about the queue state only
- _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
+ _dispatch_sync_wait(dq, ctxt, func, dc_flags, dq, dc_flags); // dq is both the top queue and the queue to wait on
}
#pragma mark -
-#pragma mark dispatch_sync
+#pragma mark dispatch_sync / dispatch_barrier_sync
DISPATCH_NOINLINE
static void
-_dispatch_non_barrier_complete(dispatch_queue_t dq)
+_dispatch_sync_recurse(dispatch_queue_t dq, void *ctxt, // acquires each queue along dq's target chain, then invokes func and completes the chain
+ dispatch_function_t func, uintptr_t dc_flags)
{
- uint64_t old_state, new_state;
+ dispatch_tid tid = _dispatch_tid_self();
+ dispatch_queue_t tq = dq->do_targetq;
- os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
- new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
- if (_dq_state_is_runnable(new_state)) {
- if (!_dq_state_is_runnable(old_state)) {
- // we're making a FULL -> non FULL transition
- new_state |= DISPATCH_QUEUE_DIRTY;
+ do {
+ if (likely(tq->dq_width == 1)) { // width-1 target: needs the barrier lock
+ if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
+ return _dispatch_sync_wait(dq, ctxt, func, dc_flags, tq, // could not acquire tq: wait on it as a barrier waiter
+ DISPATCH_OBJ_BARRIER_BIT);
}
- if (!_dq_state_drain_locked(new_state)) {
- uint64_t full_width = new_state;
- if (_dq_state_has_pending_barrier(new_state)) {
- full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
- full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
- full_width += DISPATCH_QUEUE_IN_BARRIER;
- } else {
- full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
- full_width += DISPATCH_QUEUE_IN_BARRIER;
- }
- if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
- DISPATCH_QUEUE_WIDTH_FULL_BIT) {
- new_state = full_width;
- new_state &= ~DISPATCH_QUEUE_DIRTY;
- new_state |= _dispatch_tid_self();
- }
+ } else { // wider target: only sync width is reserved
+ if (unlikely(!_dispatch_queue_try_reserve_sync_width(tq))) {
+ return _dispatch_sync_wait(dq, ctxt, func, dc_flags, tq, 0);
}
}
- });
+ tq = tq->do_targetq;
+ } while (unlikely(tq->do_targetq)); // a queue without a target queue is never acquired here
- if (_dq_state_is_in_barrier(new_state)) {
- return _dispatch_try_lock_transfer_or_wakeup(dq);
- }
- if (!_dq_state_is_runnable(old_state)) {
- _dispatch_queue_try_wakeup(dq, new_state,
- DISPATCH_WAKEUP_WAITER_HANDOFF);
- }
+ return _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, func, dc_flags);
}
DISPATCH_NOINLINE
-static void
-_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
- pthread_priority_t pp)
-{
- dispatch_assert(dq->do_targetq);
- if (!pp) {
- pp = _dispatch_get_priority();
- pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
- pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
- }
- dispatch_thread_event_s event;
- _dispatch_thread_event_init(&event);
- uint32_t th_self = _dispatch_tid_self();
- struct dispatch_continuation_s dc = {
- .dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
-#if DISPATCH_INTROSPECTION
- .dc_func = func,
- .dc_ctxt = ctxt,
-#endif
- .dc_data = (void*)(uintptr_t)th_self,
- .dc_other = &event,
- .dc_priority = pp,
- .dc_voucher = DISPATCH_NO_VOUCHER,
- };
+void
+dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, // runs func(ctxt) synchronously as a barrier on dq
+ dispatch_function_t func)
+{
+ dispatch_tid tid = _dispatch_tid_self();
- uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
- if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
- DISPATCH_CLIENT_CRASH(dq, "dispatch_sync called on queue "
- "already owned by current thread");
+ // The more correct thing to do would be to merge the qos of the thread
+ // that just acquired the barrier lock into the queue state.
+ //
+ // However this is too expensive for the fastpath, so skip doing it.
+ // The chosen tradeoff is that if an enqueue on a lower priority thread
+ // contends with this fastpath, this thread may receive a useless override.
+ //
+ // Global concurrent queues and queues bound to non-dispatch threads
+ // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
+ if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
+ return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
}
- _dispatch_continuation_push_sync_slow(dq, &dc);
- _dispatch_thread_event_wait(&event); // acquire
- _dispatch_thread_event_destroy(&event);
- if (_dispatch_queue_received_override(dq, pp)) {
- // Ensure that the root queue sees that this thread was overridden.
- // pairs with the _dispatch_wqthread_override_start in
- // _dispatch_continuation_slow_item_signal
- _dispatch_set_defaultpriority_override();
+ _dispatch_introspection_sync_begin(dq);
+ if (unlikely(dq->do_targetq->do_targetq)) { // dq's target has a target of its own: the chain must be acquired too
+ return _dispatch_sync_recurse(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
}
- _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
+ _dispatch_queue_barrier_sync_invoke_and_complete(dq, ctxt, func); // common case: invoke, then inline unlock
}
-DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
- pthread_priority_t pp)
+DISPATCH_NOINLINE
+void
+dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) // runs func(ctxt) synchronously on dq
{
- // <rdar://problem/24738102&24743140> reserving non barrier width
- // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
- // equivalent), so we have to check that this thread hasn't enqueued
- // anything ahead of this call or we can break ordering
- if (slowpath(dq->dq_items_tail)) {
- return _dispatch_sync_f_slow(dq, ctxt, func, pp);
+ if (likely(dq->dq_width == 1)) { // width-1 queues take the barrier path
+ return dispatch_barrier_sync_f(dq, ctxt, func);
}
- // concurrent queues do not respect width on sync
- if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
- return _dispatch_sync_f_slow(dq, ctxt, func, pp);
+
+ // Global concurrent queues and queues bound to non-dispatch threads
+ // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
+ if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
+ return _dispatch_sync_f_slow(dq, ctxt, func, 0); // dc_flags 0: not a barrier
+ }
+
+ _dispatch_introspection_sync_begin(dq);
+ if (unlikely(dq->do_targetq->do_targetq)) { // dq's target has a target of its own: the chain must be acquired too
+ return _dispatch_sync_recurse(dq, ctxt, func, 0);
}
- _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
+ _dispatch_sync_invoke_and_complete(dq, ctxt, func);
}
+#ifdef __BLOCKS__
DISPATCH_NOINLINE
static void
-_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
- pthread_priority_t pp)
+_dispatch_sync_block_with_private_data(dispatch_queue_t dq, // sync path for blocks carrying private data (flags, priority, voucher)
+ dispatch_block_t work, dispatch_block_flags_t flags)
{
- if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
- return _dispatch_sync_f2(dq, ctxt, func, pp);
+ dispatch_block_private_data_t dbpd = _dispatch_block_get_data(work);
+ pthread_priority_t op = 0, p = 0;
+
+ flags |= dbpd->dbpd_flags; // caller's flags are combined with the block's own
+ op = _dispatch_block_invoke_should_set_priority(flags, dbpd->dbpd_priority);
+ if (op) { // the block's priority is applied only when op is non-zero
+ p = dbpd->dbpd_priority;
+ }
+ voucher_t ov, v = DISPATCH_NO_VOUCHER;
+ if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
+ v = dbpd->dbpd_voucher;
+ }
+ ov = _dispatch_set_priority_and_voucher(p, v, 0); // ov is passed to the reset at the end of this function
+
+ // balanced in d_block_sync_invoke or d_block_wait
+ if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, dq->_as_oq, relaxed)) { // retain only if this call stored dq into dbpd_queue
+ _dispatch_retain_2(dq);
}
- return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
+ if (flags & DISPATCH_BLOCK_BARRIER) {
+ dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke);
+ } else {
+ dispatch_sync_f(dq, work, _dispatch_block_sync_invoke);
+ }
+ _dispatch_reset_priority_and_voucher(op, ov);
}
-DISPATCH_NOINLINE
void
-dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
+dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work) // block-based wrapper around dispatch_barrier_sync_f
{
- if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
- return _dispatch_sync_f2(dq, ctxt, func, 0);
+ if (unlikely(_dispatch_block_has_private_data(work))) {
+ dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER; // forces the barrier branch in the private-data path
+ return _dispatch_sync_block_with_private_data(dq, work, flags);
}
- return dispatch_barrier_sync_f(dq, ctxt, func);
+ dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work)); // the block itself is passed as the context argument
}
-#ifdef __BLOCKS__
void
-dispatch_sync(dispatch_queue_t dq, void (^work)(void))
+dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) // block-based wrapper around dispatch_sync_f
{
- if (slowpath(_dispatch_block_has_private_data(work))) {
+ if (unlikely(_dispatch_block_has_private_data(work))) { // blocks with private data take the dedicated path, with no extra flags
return _dispatch_sync_block_with_private_data(dq, work, 0);
}
dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
-#endif
+#endif // __BLOCKS__
#pragma mark -
#pragma mark dispatch_trysync
-struct trysync_context {
- dispatch_queue_t tc_dq;
- void *tc_ctxt;
- dispatch_function_t tc_func;
-};
-
DISPATCH_NOINLINE
-static int
-_dispatch_trysync_recurse(dispatch_queue_t dq,
- struct trysync_context *tc, bool barrier)
+static void
+_dispatch_barrier_trysync_or_async_f_complete(dispatch_queue_t dq,
+ void *ctxt, dispatch_function_t func, uint32_t flags) // invokes func inline, then completes the barrier through dx_wakeup
{
- dispatch_queue_t tq = dq->do_targetq;
+ dispatch_wakeup_flags_t wflags = DISPATCH_WAKEUP_BARRIER_COMPLETE;
- if (barrier) {
- if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
- return EWOULDBLOCK;
- }
- } else {
- // <rdar://problem/24743140> check nothing was queued by the current
- // thread ahead of this call. _dispatch_queue_try_reserve_sync_width
- // ignores the ENQUEUED bit which could cause it to miss a barrier_async
- // made by the same thread just before.
- if (slowpath(dq->dq_items_tail)) {
- return EWOULDBLOCK;
- }
- // concurrent queues do not respect width on sync
- if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
- return EWOULDBLOCK;
+ _dispatch_sync_function_invoke_inline(dq, ctxt, func);
+ if (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) { // give back one suspend interval; presumably the one taken by the caller's acquire-and-suspend -- TODO confirm
+ uint64_t dq_state = os_atomic_sub2o(dq, dq_state,
+ DISPATCH_QUEUE_SUSPEND_INTERVAL, relaxed);
+ if (!_dq_state_is_suspended(dq_state)) { // checked on the state returned by the subtraction
+ wflags |= DISPATCH_WAKEUP_CONSUME_2; // presumably balances the caller's _dispatch_retain_2 -- TODO confirm
}
}
-
- int rc = 0;
- if (_dispatch_queue_cannot_trysync(tq)) {
- _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
- rc = ENOTSUP;
- } else if (tq->do_targetq) {
- rc = _dispatch_trysync_recurse(tq, tc, tq->dq_width == 1);
- if (rc == ENOTSUP) {
- _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
- }
- } else {
- dispatch_thread_frame_s dtf;
- _dispatch_thread_frame_push(&dtf, tq);
- _dispatch_sync_function_invoke(tc->tc_dq, tc->tc_ctxt, tc->tc_func);
- _dispatch_thread_frame_pop(&dtf);
- }
- if (barrier) {
- _dispatch_barrier_complete(dq);
- } else {
- _dispatch_non_barrier_complete(dq);
- }
- return rc;
+ dx_wakeup(dq, 0, wflags); // wflags always carries DISPATCH_WAKEUP_BARRIER_COMPLETE
}
+// Use for mutation of queue-/source-internal state only:
+// this ignores the target queue hierarchy!
DISPATCH_NOINLINE
-bool
-_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
- dispatch_function_t f)
+void
+_dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
+ dispatch_function_t func, uint32_t flags)
{
- if (slowpath(!dq->do_targetq)) {
- _dispatch_sync_function_invoke(dq, ctxt, f);
- return true;
+ dispatch_tid tid = _dispatch_tid_self();
+ uint64_t suspend_count = (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) ? 1 : 0; // ask for one suspension along with the barrier when the flag is set
+ if (unlikely(!_dispatch_queue_try_acquire_barrier_sync_and_suspend(dq, tid,
+ suspend_count))) {
+ return _dispatch_barrier_async_detached_f(dq, ctxt, func); // could not take the barrier: fall back to the async path
}
- if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
- return false;
+ if (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) {
+ _dispatch_retain_2(dq); // see _dispatch_queue_suspend
}
- struct trysync_context tc = {
- .tc_dq = dq,
- .tc_func = f,
- .tc_ctxt = ctxt,
- };
- return _dispatch_trysync_recurse(dq, &tc, true) == 0;
+ _dispatch_barrier_trysync_or_async_f_complete(dq, ctxt, func, flags); // barrier held: run func inline and complete
}
DISPATCH_NOINLINE
-bool
-_dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
-{
- if (slowpath(!dq->do_targetq)) {
- _dispatch_sync_function_invoke(dq, ctxt, f);
- return true;
- }
- if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
- return false;
- }
- struct trysync_context tc = {
- .tc_dq = dq,
- .tc_func = f,
- .tc_ctxt = ctxt,
- };
- return _dispatch_trysync_recurse(dq, &tc, dq->dq_width == 1) == 0;
-}
-
-#pragma mark -
-#pragma mark dispatch_after
-
-DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
- void *ctxt, void *handler, bool block)
+static long
+_dispatch_trysync_recurse(dispatch_queue_t dq, void *ctxt,
+ dispatch_function_t f, uintptr_t dc_flags) // returns true/false through a long; both callers in this file acquire dq before calling
{
- dispatch_source_t ds;
- uint64_t leeway, delta;
+ dispatch_tid tid = _dispatch_tid_self();
+ dispatch_queue_t q, tq = dq->do_targetq;
- if (when == DISPATCH_TIME_FOREVER) {
-#if DISPATCH_DEBUG
- DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
-#endif
- return;
- }
-
- delta = _dispatch_timeout(when);
- if (delta == 0) {
- if (block) {
- return dispatch_async(queue, handler);
+ for (;;) { // climb the target-queue chain, acquiring each intermediate queue on the way
+ if (likely(tq->do_targetq == NULL)) { // tq has no target of its own: stop climbing and invoke
+ _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, f, dc_flags);
+ return true;
+ }
+ if (unlikely(_dispatch_queue_cannot_trysync(tq))) {
+ for (q = dq; q != tq; q = q->do_targetq) { // flag every queue from dq up to (not including) tq
+ _dispatch_queue_atomic_flags_set(q, DQF_CANNOT_TRYSYNC);
+ }
+ break;
+ }
+ if (likely(tq->dq_width == 1)) { // width-1 queues need the barrier, others only a sync width reservation
+ if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
+ break;
+ }
+ } else {
+ if (unlikely(!_dispatch_queue_try_reserve_sync_width(tq))) {
+ break;
+ }
}
- return dispatch_async_f(queue, ctxt, handler);
+ tq = tq->do_targetq;
}
- leeway = delta / 10; // <rdar://problem/13447496>
- if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
- if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
-
- // this function can and should be optimized to not use a dispatch source
- ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
- dispatch_assert(ds);
-
- dispatch_continuation_t dc = _dispatch_continuation_alloc();
- if (block) {
- _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
- } else {
- _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
- }
- // reference `ds` so that it doesn't show up as a leak
- dc->dc_data = ds;
- _dispatch_source_set_event_handler_continuation(ds, dc);
- dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
- dispatch_activate(ds);
+ _dispatch_sync_complete_recurse(dq, tq, dc_flags); // failure path: tq is the queue where the climb stopped
+ return false;
}
DISPATCH_NOINLINE
-void
-dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
- dispatch_function_t func)
+long
+_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
+ dispatch_function_t f) // returns false without invoking f when dq cannot be acquired as a barrier
{
- _dispatch_after(when, queue, ctxt, func, false);
+ dispatch_tid tid = _dispatch_tid_self();
+ if (unlikely(!dq->do_targetq)) { // a queue with no target is treated as a root queue here
+ DISPATCH_CLIENT_CRASH(dq, "_dispatch_trsync called on a root queue"); // NOTE(review): message spells "trsync"; probably meant "trysync"
+ }
+ if (unlikely(_dispatch_queue_cannot_trysync(dq))) {
+ return false;
+ }
+ if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
+ return false;
+ }
+ return _dispatch_trysync_recurse(dq, ctxt, f, DISPATCH_OBJ_BARRIER_BIT); // dq is held as a barrier from here on
}
-
-#ifdef __BLOCKS__
-void
-dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
- dispatch_block_t work)
+
+DISPATCH_NOINLINE
+long
+_dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f) // non-barrier counterpart of _dispatch_barrier_trysync_f
{
- _dispatch_after(when, queue, NULL, work, true);
+ if (likely(dq->dq_width == 1)) { // width-1 queues are delegated to the barrier variant
+ return _dispatch_barrier_trysync_f(dq, ctxt, f);
+ }
+ if (unlikely(!dq->do_targetq)) {
+ DISPATCH_CLIENT_CRASH(dq, "_dispatch_trsync called on a root queue"); // NOTE(review): message spells "trsync"; probably meant "trysync"
+ }
+ if (unlikely(_dispatch_queue_cannot_trysync(dq))) {
+ return false;
+ }
+ if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
+ return false;
+ }
+ return _dispatch_trysync_recurse(dq, ctxt, f, 0); // 0: no barrier bit, only sync width was reserved on dq
}
-#endif
#pragma mark -
#pragma mark dispatch_queue_wakeup
DISPATCH_NOINLINE
void
-_dispatch_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
+_dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
+ if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) { // barrier completion is handled separately and never reaches the probe below
+ return _dispatch_queue_barrier_complete(dq, qos, flags);
+ }
if (_dispatch_queue_class_probe(dq)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
- if (target) {
- return _dispatch_queue_class_wakeup(dq, pp, flags, target);
- } else if (pp) {
- return _dispatch_queue_class_override_drainer(dq, pp, flags);
- } else if (flags & DISPATCH_WAKEUP_CONSUME) {
- return _dispatch_release_tailcall(dq);
- }
+ return _dispatch_queue_class_wakeup(dq, qos, flags, target); // now called unconditionally, even when target is still DISPATCH_QUEUE_WAKEUP_NONE
}
#if DISPATCH_COCOA_COMPAT
}
#endif // DISPATCH_COCOA_COMPAT
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_qos_t
+_dispatch_runloop_queue_reset_max_qos(dispatch_queue_class_t dqu) // clears the max-QoS and received-override bits; returns the max QoS held before the clear
+{
+ uint64_t old_state, clear_bits = DISPATCH_QUEUE_MAX_QOS_MASK |
+ DISPATCH_QUEUE_RECEIVED_OVERRIDE;
+ old_state = os_atomic_and_orig2o(dqu._dq, dq_state, ~clear_bits, relaxed); // atomic AND; the returned old_state is the pre-clear value, as its use below relies on
+ return _dq_state_max_qos(old_state);
+}
+
void
-_dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
+_dispatch_runloop_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
// <rdar://problem/14026816>
- return _dispatch_queue_wakeup(dq, pp, flags);
+ return _dispatch_queue_wakeup(dq, qos, flags);
}
+ if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
+ os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release); // sets the DIRTY bit in dq_state with release ordering
+ }
if (_dispatch_queue_class_probe(dq)) {
- return _dispatch_runloop_queue_poke(dq, pp, flags);
+ return _dispatch_runloop_queue_poke(dq, qos, flags);
}
- pp = _dispatch_queue_reset_override_priority(dq, true);
- if (pp) {
+ qos = _dispatch_runloop_queue_reset_max_qos(dq); // from here on qos is the value returned by the reset, not the caller's argument
+ if (qos) { // non-zero result: end the override on the drain owner after an optional re-poke
mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
if (_dispatch_queue_class_probe(dq)) {
- _dispatch_runloop_queue_poke(dq, pp, flags);
+ _dispatch_runloop_queue_poke(dq, qos, flags);
}
_dispatch_thread_override_end(owner, dq);
return;
}
- if (flags & DISPATCH_WAKEUP_CONSUME) {
- return _dispatch_release_tailcall(dq);
+ if (flags & DISPATCH_WAKEUP_CONSUME_2) {
+ return _dispatch_release_2_tailcall(dq); // nothing else to do: only drop the references the flag asks to consume
}
#else
- return _dispatch_queue_wakeup(dq, pp, flags);
+ return _dispatch_queue_wakeup(dq, qos, flags);
#endif
}
void
-_dispatch_main_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
+_dispatch_main_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
if (_dispatch_queue_is_thread_bound(dq)) {
- return _dispatch_runloop_queue_wakeup(dq, pp, flags);
+ return _dispatch_runloop_queue_wakeup(dq, qos, flags); // thread-bound: use the runloop wakeup path
}
#endif
- return _dispatch_queue_wakeup(dq, pp, flags);
-}
-
-void
-_dispatch_root_queue_wakeup(dispatch_queue_t dq,
- pthread_priority_t pp DISPATCH_UNUSED,
- dispatch_wakeup_flags_t flags)
-{
- if (flags & DISPATCH_WAKEUP_CONSUME) {
- // see _dispatch_queue_push_set_head
- dispatch_assert(flags & DISPATCH_WAKEUP_FLUSH);
- }
- _dispatch_global_queue_poke(dq);
+ return _dispatch_queue_wakeup(dq, qos, flags); // otherwise behave like a regular queue
}
#pragma mark -
return;
}
-#if TARGET_OS_MAC
+#if HAVE_MACH
mach_port_t mp = handle;
kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
switch (kr) {
DISPATCH_NOINLINE
static void
-_dispatch_runloop_queue_poke(dispatch_queue_t dq,
- pthread_priority_t pp, dispatch_wakeup_flags_t flags)
+_dispatch_runloop_queue_poke(dispatch_queue_t dq, dispatch_qos_t qos,
+ dispatch_wakeup_flags_t flags)
{
- // it's not useful to handle WAKEUP_FLUSH because mach_msg() will have
- // a release barrier and that when runloop queues stop being thread bound
+ // it's not useful to handle WAKEUP_MAKE_DIRTY because mach_msg() will have
+ // a release barrier, and because when runloop queues stop being thread-bound
// they have a non optional wake-up to start being a "normal" queue
// either in _dispatch_runloop_queue_xref_dispose,
// or in _dispatch_queue_cleanup2() for the main thread.
+ uint64_t old_state, new_state;
if (dq == &_dispatch_main_q) {
dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
_dispatch_runloop_queue_handle_init);
}
- _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
- if (flags & DISPATCH_WAKEUP_OVERRIDING) {
- mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
+
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
+ new_state = _dq_state_merge_qos(old_state, qos); // fold the requested qos into the queue state
+ if (old_state == new_state) { // merge changed nothing: skip the override handling entirely
+ os_atomic_rmw_loop_give_up(goto no_change);
+ }
+ });
+
+ dispatch_qos_t dq_qos = _dispatch_priority_qos(dq->dq_priority);
+ if (qos > dq_qos) { // requested qos exceeds the queue's own: override the drain owner
+ mach_port_t owner = _dq_state_drain_owner(new_state);
+ pthread_priority_t pp = _dispatch_qos_to_pp(qos);
_dispatch_thread_override_start(owner, pp, dq);
- if (flags & DISPATCH_WAKEUP_WAS_OVERRIDDEN) {
+ if (_dq_state_max_qos(old_state) > dq_qos) { // an earlier override was already in place: end it after starting the new one
_dispatch_thread_override_end(owner, dq);
}
}
+no_change:
_dispatch_runloop_queue_class_poke(dq);
- if (flags & DISPATCH_WAKEUP_CONSUME) {
- return _dispatch_release_tailcall(dq);
+ if (flags & DISPATCH_WAKEUP_CONSUME_2) {
+ return _dispatch_release_2_tailcall(dq);
}
}
#endif
DISPATCH_NOINLINE
static void
-_dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
+_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
{
dispatch_root_queue_context_t qc = dq->do_ctxt;
- uint32_t i = n;
- int r;
-
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
+ int remaining = n;
+ int r = ENOSYS;
+ _dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
-#if HAVE_PTHREAD_WORKQUEUES
+#if DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
_dispatch_worker_thread4, dq, &wh, &gen_cnt);
(void)dispatch_assume_zero(r);
- } while (--i);
+ } while (--remaining);
return;
}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
-#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
- if (!dq->dq_priority) {
- r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
- qc->dgq_wq_options, (int)i);
- (void)dispatch_assume_zero(r);
- return;
- }
-#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
- r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
- (void)dispatch_assume_zero(r);
+ r = _pthread_workqueue_addthreads(remaining,
+ _dispatch_priority_to_pp(dq->dq_priority));
+#elif DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+ r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
+ qc->dgq_wq_options, remaining);
#endif
+ (void)dispatch_assume_zero(r);
return;
}
-#endif // HAVE_PTHREAD_WORKQUEUES
+#endif // DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
- if (!--i) {
+ _dispatch_root_queue_debug("signaled sleeping worker for "
+ "global queue: %p", dq);
+ if (!--remaining) {
return;
}
}
}
- uint32_t j, t_count;
+
+ bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
+ if (overcommit) {
+ os_atomic_add2o(qc, dgq_pending, remaining, relaxed);
+ } else {
+ if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, remaining, relaxed)) {
+ _dispatch_root_queue_debug("worker thread request still pending for "
+ "global queue: %p", dq);
+ return;
+ }
+ }
+
+ int32_t can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
do {
- if (!t_count) {
+ can_request = t_count < floor ? 0 : t_count - floor;
+ if (remaining > can_request) {
+ _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
+ remaining, can_request);
+ os_atomic_sub2o(qc, dgq_pending, remaining - can_request, relaxed);
+ remaining = can_request;
+ }
+ if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
- j = i > t_count ? t_count : i;
} while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
- t_count - j, &t_count, acquire));
+ t_count - remaining, &t_count, acquire));
pthread_attr_t *attr = &pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
}
#endif
do {
- _dispatch_retain(dq);
+ _dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
- } while (--j);
+ } while (--remaining);
#endif // DISPATCH_USE_PTHREAD_POOL
}
-static inline void
-_dispatch_global_queue_poke_n(dispatch_queue_t dq, unsigned int n)
+DISPATCH_NOINLINE
+void
+_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
{
if (!_dispatch_queue_class_probe(dq)) {
return;
}
-#if HAVE_PTHREAD_WORKQUEUES
+#if DISPATCH_USE_WORKQUEUES
dispatch_root_queue_context_t qc = dq->do_ctxt;
if (
#if DISPATCH_USE_PTHREAD_POOL
"global queue: %p", dq);
return;
}
-#endif // HAVE_PTHREAD_WORKQUEUES
- return _dispatch_global_queue_poke_slow(dq, n);
+#endif // DISPATCH_USE_WORKQUEUES
+ return _dispatch_global_queue_poke_slow(dq, n, floor);
}
-static inline void
-_dispatch_global_queue_poke(dispatch_queue_t dq)
+#pragma mark -
+#pragma mark dispatch_queue_drain
+
+void
+_dispatch_continuation_pop(dispatch_object_t dou, dispatch_invoke_context_t dic,
+ dispatch_invoke_flags_t flags, dispatch_queue_t dq) // out-of-line wrapper around the inline implementation
{
- return _dispatch_global_queue_poke_n(dq, 1);
+ _dispatch_continuation_pop_inline(dou, dic, flags, dq); // arguments forwarded unchanged
}
-DISPATCH_NOINLINE
void
-_dispatch_queue_push_list_slow(dispatch_queue_t dq, unsigned int n)
+_dispatch_continuation_invoke(dispatch_object_t dou, voucher_t ov,
+ dispatch_invoke_flags_t flags) // out-of-line wrapper around the inline implementation
{
- return _dispatch_global_queue_poke_n(dq, n);
+ _dispatch_continuation_invoke_inline(dou, ov, flags); // arguments forwarded unchanged
}
-#pragma mark -
-#pragma mark dispatch_queue_drain
+DISPATCH_NOINLINE
+static void
+_dispatch_return_to_kernel(void) // called from the drain loop when _dispatch_needs_to_return_to_kernel() is true
+{
+ if (unlikely(_dispatch_get_wlh() == DISPATCH_WLH_ANON)) { // anonymous wlh: only clear the pending request
+ _dispatch_clear_return_to_kernel();
+ } else {
+ _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE); // otherwise drain the event loop with the IMMEDIATE flag
+ }
+}
void
-_dispatch_continuation_pop(dispatch_object_t dou, dispatch_queue_t dq,
- dispatch_invoke_flags_t flags)
+_dispatch_poll_for_events_4launchd(void) // compiles to an empty function unless DISPATCH_USE_KEVENT_WORKQUEUE is set
+{
+#if DISPATCH_USE_KEVENT_WORKQUEUE
+ if (_dispatch_get_wlh()) { // nothing to do when the current thread has no wlh
+ dispatch_assert(_dispatch_deferred_items_get()->ddi_wlh_servicing);
+ _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
+ }
+#endif
+}
+
+#if HAVE_PTHREAD_WORKQUEUE_NARROWING
+static os_atomic(uint64_t) _dispatch_narrowing_deadlines[DISPATCH_QOS_MAX]; // shared deadlines; indexed by qos - 1 in _dispatch_queue_drain_should_narrow_slow
+#if !DISPATCH_TIME_UNIT_USES_NANOSECONDS
+static uint64_t _dispatch_narrow_check_interval_cache; // 0 means "not computed yet"
+#endif
+
+DISPATCH_ALWAYS_INLINE
+static inline uint64_t
+_dispatch_narrow_check_interval(void) // 50ms interval between narrowing checks, in the unit of the dispatch clock
{
- _dispatch_continuation_pop_inline(dou, dq, flags);
+#if DISPATCH_TIME_UNIT_USES_NANOSECONDS
+ return 50 * NSEC_PER_MSEC;
+#else
+ if (_dispatch_narrow_check_interval_cache == 0) { // converted lazily on first use; the cache is read and written without atomics
+ _dispatch_narrow_check_interval_cache =
+ _dispatch_time_nano2mach(50 * NSEC_PER_MSEC);
+ }
+ return _dispatch_narrow_check_interval_cache;
+#endif
}
-void
-_dispatch_continuation_invoke(dispatch_object_t dou, voucher_t override_voucher,
- dispatch_invoke_flags_t flags)
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_queue_drain_init_narrowing_check_deadline(dispatch_invoke_context_t dic,
+ dispatch_priority_t pri) // arms the first narrowing check for this invoke context
+{
+ if (_dispatch_priority_qos(pri) &&
+ !(pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT)) { // only for priorities with a QoS and without the overcommit flag
+ dic->dic_next_narrow_check = _dispatch_approximate_time() +
+ _dispatch_narrow_check_interval();
+ }
+}
+
+DISPATCH_NOINLINE
+static bool
+_dispatch_queue_drain_should_narrow_slow(uint64_t now,
+ dispatch_invoke_context_t dic) // slow path of _dispatch_queue_drain_should_narrow, taken once the per-context deadline has passed
+{
+ if (dic->dic_next_narrow_check != DISPATCH_THREAD_IS_NARROWING) { // a context already marked as narrowing skips straight to "return true"
+ pthread_priority_t pp = _dispatch_get_priority();
+ dispatch_qos_t qos = _dispatch_qos_from_pp(pp);
+ if (unlikely(!qos || qos > countof(_dispatch_narrowing_deadlines))) {
+ DISPATCH_CLIENT_CRASH(pp, "Thread QoS corruption");
+ }
+ size_t idx = qos - 1; // no entry needed for DISPATCH_QOS_UNSPECIFIED
+ os_atomic(uint64_t) *deadline = &_dispatch_narrowing_deadlines[idx];
+ uint64_t oldval, newval = now + _dispatch_narrow_check_interval();
+
+ dic->dic_next_narrow_check = newval; // the per-context deadline is pushed out whatever happens to the shared one
+ os_atomic_rmw_loop(deadline, oldval, newval, relaxed, {
+ if (now < oldval) { // shared deadline for this QoS has not expired yet: do not narrow
+ os_atomic_rmw_loop_give_up(return false);
+ }
+ });
+
+ if (!_pthread_workqueue_should_narrow(pp)) { // only asked after this thread moved the shared deadline forward
+ return false;
+ }
+ dic->dic_next_narrow_check = DISPATCH_THREAD_IS_NARROWING;
+ }
+ return true;
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_queue_drain_should_narrow(dispatch_invoke_context_t dic) // cheap inline check; defers to the _slow variant once the deadline has passed
{
- _dispatch_continuation_invoke_inline(dou, override_voucher, flags);
+ uint64_t next_check = dic->dic_next_narrow_check;
+ if (unlikely(next_check)) { // 0 means no narrowing check is armed for this context
+ uint64_t now = _dispatch_approximate_time();
+ if (unlikely(next_check < now)) {
+ return _dispatch_queue_drain_should_narrow_slow(now, dic);
+ }
+ }
+ return false;
}
+#else
+#define _dispatch_queue_drain_init_narrowing_check_deadline(rq, dic) ((void)0) // NOTE(review): parameter names differ from the function form (dic, pri); harmless since both are unused
+#define _dispatch_queue_drain_should_narrow(dic) false
+#endif
/*
* Drain comes in 2 flavours (serial/concurrent) and 2 modes
* queue drain moves to the more efficient serial mode.
*/
DISPATCH_ALWAYS_INLINE
-static dispatch_queue_t
-_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
- uint64_t *owned_ptr, struct dispatch_object_s **dc_out,
- bool serial_drain)
+static dispatch_queue_wakeup_target_t
+_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
+ dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
dispatch_queue_t orig_tq = dq->do_targetq;
dispatch_thread_frame_s dtf;
struct dispatch_object_s *dc = NULL, *next_dc;
- uint64_t owned = *owned_ptr;
+ uint64_t dq_state, owned = *owned_ptr;
+
+ if (unlikely(!dq->dq_items_tail)) return NULL;
_dispatch_thread_frame_push(&dtf, dq);
- if (_dq_state_is_in_barrier(owned)) {
+ if (serial_drain || _dq_state_is_in_barrier(owned)) {
// we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
// but width can change while draining barrier work items, so we only
// convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
owned = DISPATCH_QUEUE_IN_BARRIER;
+ } else {
+ owned &= DISPATCH_QUEUE_WIDTH_MASK;
}
- while (dq->dq_items_tail) {
- dc = _dispatch_queue_head(dq);
- do {
- if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(dq))) {
- goto out;
+ dc = _dispatch_queue_head(dq);
+ goto first_iteration;
+
+ for (;;) {
+ dc = next_dc;
+ if (unlikely(dic->dic_deferred)) {
+ goto out_with_deferred_compute_owned;
+ }
+ if (unlikely(_dispatch_needs_to_return_to_kernel())) {
+ _dispatch_return_to_kernel();
+ }
+ if (unlikely(!dc)) {
+ if (!dq->dq_items_tail) {
+ break;
}
- if (unlikely(orig_tq != dq->do_targetq)) {
- goto out;
+ dc = _dispatch_queue_head(dq);
+ }
+ if (unlikely(serial_drain != (dq->dq_width == 1))) {
+ break;
+ }
+ if (unlikely(_dispatch_queue_drain_should_narrow(dic))) {
+ break;
+ }
+
+first_iteration:
+ dq_state = os_atomic_load(&dq->dq_state, relaxed);
+ if (unlikely(_dq_state_is_suspended(dq_state))) {
+ break;
+ }
+ if (unlikely(orig_tq != dq->do_targetq)) {
+ break;
+ }
+
+ if (serial_drain || _dispatch_object_is_barrier(dc)) {
+ if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
+ if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
+ goto out_with_no_width;
+ }
+ owned = DISPATCH_QUEUE_IN_BARRIER;
}
- if (unlikely(serial_drain != (dq->dq_width == 1))) {
- goto out;
+ next_dc = _dispatch_queue_next(dq, dc);
+ if (_dispatch_object_is_sync_waiter(dc)) {
+ owned = 0;
+ dic->dic_deferred = dc;
+ goto out_with_deferred;
}
- if (serial_drain || _dispatch_object_is_barrier(dc)) {
- if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
- goto out;
- }
- next_dc = _dispatch_queue_next(dq, dc);
- if (_dispatch_object_is_slow_item(dc)) {
- owned = 0;
- goto out_with_deferred;
- }
- } else {
- if (owned == DISPATCH_QUEUE_IN_BARRIER) {
- // we just ran barrier work items, we have to make their
- // effect visible to other sync work items on other threads
- // that may start coming in after this point, hence the
- // release barrier
- os_atomic_and2o(dq, dq_state, ~owned, release);
- owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
- } else if (unlikely(owned == 0)) {
- if (_dispatch_object_is_slow_item(dc)) {
- // sync "readers" don't observe the limit
- _dispatch_queue_reserve_sync_width(dq);
- } else if (!_dispatch_queue_try_acquire_async(dq)) {
- goto out_with_no_width;
- }
- owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
- }
-
- next_dc = _dispatch_queue_next(dq, dc);
- if (_dispatch_object_is_slow_item(dc)) {
- owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
- _dispatch_continuation_slow_item_signal(dq, dc);
- continue;
+ } else {
+ if (owned == DISPATCH_QUEUE_IN_BARRIER) {
+ // we just ran barrier work items, we have to make their
+ // effect visible to other sync work items on other threads
+ // that may start coming in after this point, hence the
+ // release barrier
+ os_atomic_xor2o(dq, dq_state, owned, release);
+ owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
+ } else if (unlikely(owned == 0)) {
+ if (_dispatch_object_is_sync_waiter(dc)) {
+ // sync "readers" don't observe the limit
+ _dispatch_queue_reserve_sync_width(dq);
+ } else if (!_dispatch_queue_try_acquire_async(dq)) {
+ goto out_with_no_width;
}
+ owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
+ }
- if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
- owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
- _dispatch_continuation_redirect(dq, dc);
- continue;
- }
+ next_dc = _dispatch_queue_next(dq, dc);
+ if (_dispatch_object_is_sync_waiter(dc)) {
+ owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
+ _dispatch_sync_waiter_redirect_or_wake(dq,
+ DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
+ continue;
}
- _dispatch_continuation_pop_inline(dc, dq, flags);
- _dispatch_perfmon_workitem_inc();
- if (unlikely(dtf.dtf_deferred)) {
- goto out_with_deferred_compute_owned;
+ if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
+ owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
+ _dispatch_continuation_redirect(dq, dc);
+ continue;
}
- } while ((dc = next_dc));
+ }
+
+ _dispatch_continuation_pop_inline(dc, dic, flags, dq);
}
-out:
if (owned == DISPATCH_QUEUE_IN_BARRIER) {
// if we're IN_BARRIER we really own the full width too
owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
if (dc) {
owned = _dispatch_queue_adjust_owned(dq, owned, dc);
}
- *owned_ptr = owned;
+ *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
+ *owned_ptr |= owned;
_dispatch_thread_frame_pop(&dtf);
return dc ? dq->do_targetq : NULL;
out_with_no_width:
- *owned_ptr = 0;
+ *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
_dispatch_thread_frame_pop(&dtf);
- return NULL;
+ return DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
out_with_deferred_compute_owned:
if (serial_drain) {
// if we're IN_BARRIER we really own the full width too
owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
}
- if (next_dc) {
- owned = _dispatch_queue_adjust_owned(dq, owned, next_dc);
+ if (dc) {
+ owned = _dispatch_queue_adjust_owned(dq, owned, dc);
}
}
out_with_deferred:
- *owned_ptr = owned;
- if (unlikely(!dc_out)) {
+ *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
+ *owned_ptr |= owned;
+ if (unlikely(flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS)) {
DISPATCH_INTERNAL_CRASH(dc,
"Deferred continuation on source, mach channel or mgr");
}
- *dc_out = dc;
_dispatch_thread_frame_pop(&dtf);
return dq->do_targetq;
}
DISPATCH_NOINLINE
-static dispatch_queue_t
+static dispatch_queue_wakeup_target_t
_dispatch_queue_concurrent_drain(dispatch_queue_t dq,
- dispatch_invoke_flags_t flags, uint64_t *owned,
- struct dispatch_object_s **dc_ptr)
+ dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
+ uint64_t *owned)
{
- return _dispatch_queue_drain(dq, flags, owned, dc_ptr, false);
+ return _dispatch_queue_drain(dq, dic, flags, owned, false); // false selects the non-serial flavour (serial_drain parameter)
}
DISPATCH_NOINLINE
-dispatch_queue_t
-_dispatch_queue_serial_drain(dispatch_queue_t dq,
- dispatch_invoke_flags_t flags, uint64_t *owned,
- struct dispatch_object_s **dc_ptr)
+dispatch_queue_wakeup_target_t
+_dispatch_queue_serial_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
+ dispatch_invoke_flags_t flags, uint64_t *owned) // serial flavour of _dispatch_queue_drain; always clears the REDIRECTING_DRAIN flag first
{
flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
- return _dispatch_queue_drain(dq, flags, owned, dc_ptr, true);
+ return _dispatch_queue_drain(dq, dic, flags, owned, true); // true selects the serial flavour (serial_drain parameter)
}
#if DISPATCH_COCOA_COMPAT
+DISPATCH_NOINLINE
+static void
+_dispatch_main_queue_update_priority_from_thread(void) // re-derives _dispatch_main_q.dq_priority from the calling thread and adjusts the override on the drain owner
+{
+ dispatch_queue_t dq = &_dispatch_main_q;
+ uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
+ mach_port_t owner = _dq_state_drain_owner(dq_state);
+
+ dispatch_priority_t main_pri =
+ _dispatch_priority_from_pp_strip_flags(_dispatch_get_priority());
+ dispatch_qos_t main_qos = _dispatch_priority_qos(main_pri); // QoS the thread has now
+ dispatch_qos_t max_qos = _dq_state_max_qos(dq_state); // max QoS recorded in the queue state
+ dispatch_qos_t old_qos = _dispatch_priority_qos(dq->dq_priority); // QoS the queue had before this update
+
+ // the main thread QoS was adjusted by someone else, learn the new QoS
+ // and reinitialize _dispatch_main_q.dq_priority
+ dq->dq_priority = _dispatch_priority_with_override_qos(main_pri, main_qos);
+
+ if (old_qos < max_qos && main_qos == DISPATCH_QOS_UNSPECIFIED) {
+ // main thread is opted out of QoS and we had an override
+ return _dispatch_thread_override_end(owner, dq);
+ }
+
+ if (old_qos < max_qos && max_qos <= main_qos) {
+ // main QoS was raised, and we had an override which is now useless
+ return _dispatch_thread_override_end(owner, dq);
+ }
+
+ if (main_qos < max_qos && max_qos <= old_qos) {
+ // main thread QoS was lowered, and we actually need an override
+ pthread_priority_t pp = _dispatch_qos_to_pp(max_qos);
+ return _dispatch_thread_override_start(owner, pp, dq);
+ } // any other combination leaves the override state untouched
+}
+
static void
_dispatch_main_queue_drain(void)
{
return;
}
+ _dispatch_perfmon_start_notrace();
if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
" after dispatch_main()");
}
- mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
- if (slowpath(owner != _dispatch_tid_self())) {
- DISPATCH_CLIENT_CRASH(owner, "_dispatch_main_queue_callback_4CF called"
+ uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
+ if (unlikely(!_dq_state_drain_locked_by_self(dq_state))) {
+ DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
+ "_dispatch_main_queue_callback_4CF called"
" from the wrong thread");
}
dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
_dispatch_runloop_queue_handle_init);
- _dispatch_perfmon_start();
// <rdar://problem/23256682> hide the frame chaining when CFRunLoop
// drains the main runloop, as this should not be observable that way
+ _dispatch_adopt_wlh_anon();
_dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);
- pthread_priority_t old_pri = _dispatch_get_priority();
- pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
+ pthread_priority_t pp = _dispatch_get_priority();
+ dispatch_priority_t pri = _dispatch_priority_from_pp(pp);
+ dispatch_qos_t qos = _dispatch_priority_qos(pri);
voucher_t voucher = _voucher_copy();
+ if (unlikely(qos != _dispatch_priority_qos(dq->dq_priority))) {
+ _dispatch_main_queue_update_priority_from_thread();
+ }
+ dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
+ _dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED);
+
+ dispatch_invoke_context_s dic = { };
struct dispatch_object_s *dc, *next_dc, *tail;
dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
do {
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
- _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
- _dispatch_perfmon_workitem_inc();
+ _dispatch_continuation_pop_inline(dc, &dic, DISPATCH_INVOKE_NONE, dq);
} while ((dc = next_dc));
- // runloop based queues use their port for the queue PUBLISH pattern
- // so this raw call to dx_wakeup(0) is valid
dx_wakeup(dq, 0, 0);
_dispatch_voucher_debug("main queue restore", voucher);
- _dispatch_reset_defaultpriority(old_dp);
- _dispatch_reset_priority_and_voucher(old_pri, voucher);
+ _dispatch_reset_basepri(old_dbp);
+ _dispatch_reset_basepri_override();
+ _dispatch_reset_priority_and_voucher(pp, voucher);
_dispatch_thread_frame_pop(&dtf);
- _dispatch_perfmon_end();
+ _dispatch_reset_wlh();
_dispatch_force_cache_cleanup();
+ _dispatch_perfmon_end_notrace();
}
static bool
if (!dq->dq_items_tail) {
return false;
}
+ _dispatch_perfmon_start_notrace();
dispatch_thread_frame_s dtf;
- _dispatch_perfmon_start();
+ bool should_reset_wlh = _dispatch_adopt_wlh_anon_recurse();
_dispatch_thread_frame_push(&dtf, dq);
- pthread_priority_t old_pri = _dispatch_get_priority();
- pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
+ pthread_priority_t pp = _dispatch_get_priority();
+ dispatch_priority_t pri = _dispatch_priority_from_pp(pp);
voucher_t voucher = _voucher_copy();
+ dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
+ _dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED);
+ dispatch_invoke_context_s dic = { };
struct dispatch_object_s *dc, *next_dc;
dc = _dispatch_queue_head(dq);
next_dc = _dispatch_queue_next(dq, dc);
- _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
- _dispatch_perfmon_workitem_inc();
+ _dispatch_continuation_pop_inline(dc, &dic, DISPATCH_INVOKE_NONE, dq);
if (!next_dc) {
- // runloop based queues use their port for the queue PUBLISH pattern
- // so this raw call to dx_wakeup(0) is valid
dx_wakeup(dq, 0, 0);
}
_dispatch_voucher_debug("runloop queue restore", voucher);
- _dispatch_reset_defaultpriority(old_dp);
- _dispatch_reset_priority_and_voucher(old_pri, voucher);
+ _dispatch_reset_basepri(old_dbp);
+ _dispatch_reset_basepri_override();
+ _dispatch_reset_priority_and_voucher(pp, voucher);
_dispatch_thread_frame_pop(&dtf);
- _dispatch_perfmon_end();
+ if (should_reset_wlh) _dispatch_reset_wlh();
_dispatch_force_cache_cleanup();
+ _dispatch_perfmon_end_notrace();
return next_dc;
}
#endif
-DISPATCH_NOINLINE
-void
-_dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq)
-{
- dispatch_continuation_t dc_tmp, dc_start, dc_end;
- struct dispatch_object_s *dc = NULL;
- uint64_t dq_state, owned;
- size_t count = 0;
-
- owned = DISPATCH_QUEUE_IN_BARRIER;
- owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
-attempt_running_slow_head:
- if (slowpath(dq->dq_items_tail) && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
- dc = _dispatch_queue_head(dq);
- if (!_dispatch_object_is_slow_item(dc)) {
- // not a slow item, needs to wake up
- } else if (fastpath(dq->dq_width == 1) ||
- _dispatch_object_is_barrier(dc)) {
- // rdar://problem/8290662 "barrier/writer lock transfer"
- dc_start = dc_end = (dispatch_continuation_t)dc;
- owned = 0;
- count = 1;
- dc = _dispatch_queue_next(dq, dc);
- } else {
- // <rdar://problem/10164594> "reader lock transfer"
- // we must not signal semaphores immediately because our right
- // for dequeuing is granted through holding the full "barrier" width
- // which a signaled work item could relinquish out from our feet
- dc_start = (dispatch_continuation_t)dc;
- do {
- // no check on width here because concurrent queues
- // do not respect width for blocked readers, the thread
- // is already spent anyway
- dc_end = (dispatch_continuation_t)dc;
- owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
- count++;
- dc = _dispatch_queue_next(dq, dc);
- } while (dc && _dispatch_object_is_slow_non_barrier(dc));
- }
-
- if (count) {
- _dispatch_queue_drain_transfer_lock(dq, owned, dc_start);
- do {
- // signaled job will release the continuation
- dc_tmp = dc_start;
- dc_start = dc_start->do_next;
- _dispatch_continuation_slow_item_signal(dq, dc_tmp);
- } while (dc_tmp != dc_end);
- return;
- }
- }
-
- if (dc || dx_metatype(dq) != _DISPATCH_QUEUE_TYPE) {
- // <rdar://problem/23336992> the following wakeup is needed for sources
- // or mach channels: when ds_pending_data is set at the same time
- // as a trysync_f happens, lock transfer code above doesn't know about
- // ds_pending_data or the wakeup logic, but lock transfer is useless
- // for sources and mach channels in the first place.
- owned = _dispatch_queue_adjust_owned(dq, owned, dc);
- dq_state = _dispatch_queue_drain_unlock(dq, owned, NULL);
- return _dispatch_queue_try_wakeup(dq, dq_state,
- DISPATCH_WAKEUP_WAITER_HANDOFF);
- } else if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
- // someone enqueued a slow item at the head
- // looping may be its last chance
- goto attempt_running_slow_head;
- }
-}
-
void
_dispatch_mgr_queue_drain(void)
{
const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
+ dispatch_invoke_context_s dic = { }; // empty invoke context, passed by address to the serial drain below
dispatch_queue_t dq = &_dispatch_mgr_q;
uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
- if (dq->dq_items_tail) {
- _dispatch_perfmon_start();
- if (slowpath(_dispatch_queue_serial_drain(dq, flags, &owned, NULL))) {
- DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
- }
- _dispatch_voucher_debug("mgr queue clear", NULL);
- _voucher_clear();
- _dispatch_reset_defaultpriority_override();
- _dispatch_perfmon_end();
- }
-
-#if DISPATCH_USE_KEVENT_WORKQUEUE
- if (!_dispatch_kevent_workqueue_enabled)
-#endif
- {
- _dispatch_force_cache_cleanup();
- }
-}
-
-#pragma mark -
-#pragma mark dispatch_queue_invoke
-
-void
-_dispatch_queue_drain_deferred_invoke(dispatch_queue_t dq,
- dispatch_invoke_flags_t flags, uint64_t to_unlock,
- struct dispatch_object_s *dc)
-{
- if (_dispatch_object_is_slow_item(dc)) {
- dispatch_assert(to_unlock == 0);
- _dispatch_queue_drain_transfer_lock(dq, to_unlock, dc);
- _dispatch_continuation_slow_item_signal(dq, dc);
- return _dispatch_release_tailcall(dq);
- }
-
- bool should_defer_again = false, should_pend_queue = true;
- uint64_t old_state, new_state;
-
- if (_dispatch_get_current_queue()->do_targetq) {
- _dispatch_thread_frame_get_current()->dtf_deferred = dc;
- should_defer_again = true;
- should_pend_queue = false;
- }
-
- if (dq->dq_width > 1) {
- should_pend_queue = false;
- } else if (should_pend_queue) {
- dispatch_assert(to_unlock ==
- DISPATCH_QUEUE_WIDTH_INTERVAL + DISPATCH_QUEUE_IN_BARRIER);
- os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
- new_state = old_state;
- if (_dq_state_has_waiters(old_state) ||
- _dq_state_is_enqueued(old_state)) {
- os_atomic_rmw_loop_give_up(break);
- }
- new_state += DISPATCH_QUEUE_DRAIN_PENDED;
- new_state -= DISPATCH_QUEUE_IN_BARRIER;
- new_state -= DISPATCH_QUEUE_WIDTH_INTERVAL;
- });
- should_pend_queue = (new_state & DISPATCH_QUEUE_DRAIN_PENDED);
- }
-
- if (!should_pend_queue) {
- if (to_unlock & DISPATCH_QUEUE_IN_BARRIER) {
- _dispatch_try_lock_transfer_or_wakeup(dq);
- _dispatch_release(dq);
- } else if (to_unlock) {
- uint64_t dq_state = _dispatch_queue_drain_unlock(dq, to_unlock, NULL);
- _dispatch_queue_try_wakeup(dq, dq_state, DISPATCH_WAKEUP_CONSUME);
- } else {
- _dispatch_release(dq);
+ if (dq->dq_items_tail) { // nothing is drained when the manager queue's tail pointer is NULL
+ _dispatch_perfmon_start();
+ _dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED); // undone by _dispatch_reset_basepri_override() after the drain
+ if (slowpath(_dispatch_queue_serial_drain(dq, &dic, flags, &owned))) { // a truthy return means the drain was interrupted, which is fatal here
+ DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
}
- dq = NULL;
+ _dispatch_voucher_debug("mgr queue clear", NULL);
+ _voucher_clear();
+ _dispatch_reset_basepri_override();
+ _dispatch_perfmon_end(perfmon_thread_manager);
}
- if (!should_defer_again) {
- dx_invoke(dc, flags & _DISPATCH_INVOKE_PROPAGATE_MASK);
+#if DISPATCH_USE_KEVENT_WORKQUEUE
+ if (!_dispatch_kevent_workqueue_enabled) // guards the cleanup only in kevent-workqueue builds; otherwise the cleanup is unconditional
+#endif
+ {
+ _dispatch_force_cache_cleanup();
}
+}
- if (dq) {
- uint32_t self = _dispatch_tid_self();
- os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
- new_state = old_state;
- if (!_dq_state_drain_pended(old_state) ||
- _dq_state_drain_owner(old_state) != self) {
- os_atomic_rmw_loop_give_up({
- // We may have been overridden, so inform the root queue
- _dispatch_set_defaultpriority_override();
- return _dispatch_release_tailcall(dq);
- });
- }
- new_state = DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT(new_state);
- });
- if (_dq_state_has_override(old_state)) {
- // Ensure that the root queue sees that this thread was overridden.
- _dispatch_set_defaultpriority_override();
- }
- return dx_invoke(dq, flags | DISPATCH_INVOKE_STEALING);
+#pragma mark -
+#pragma mark dispatch_queue_invoke
+
+void
+_dispatch_queue_drain_sync_waiter(dispatch_queue_t dq,
+ dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
+ uint64_t owned)
+{ // takes the deferred sync waiter out of dic, drops references on dq, then hands off to _dispatch_sync_waiter_redirect_or_wake()
+ struct dispatch_object_s *dc = dic->dic_deferred;
+ dispatch_assert(_dispatch_object_is_sync_waiter(dc));
+ dic->dic_deferred = NULL; // dc now holds the deferred item; clear the slot in the invoke context
+ if (flags & DISPATCH_INVOKE_WLH) {
+ // Leave the enqueued bit in place, completion of the last sync waiter
+ // in the handoff chain is responsible for dequeuing
+ //
+ // We currently have a +2 to consume, but we need to keep a +1
+ // for the thread request
+ dispatch_assert(_dq_state_is_enqueued_on_target(owned));
+ dispatch_assert(!_dq_state_is_enqueued_on_manager(owned));
+ owned &= ~DISPATCH_QUEUE_ENQUEUED; // strip the enqueued bit from what is handed off as owned
+ _dispatch_release_no_dispose(dq);
+ } else {
+ // The sync waiter must own a reference
+ _dispatch_release_2_no_dispose(dq);
}
+ return _dispatch_sync_waiter_redirect_or_wake(dq, owned, dc);
}
void
-_dispatch_queue_finalize_activation(dispatch_queue_t dq)
+_dispatch_queue_finalize_activation(dispatch_queue_t dq,
+ DISPATCH_UNUSED bool *allow_resume) // allow_resume is accepted but not used by this implementation
{
dispatch_queue_t tq = dq->do_targetq;
_dispatch_queue_priority_inherit_from_target(dq, tq);
- _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
- if (dq->dq_override_voucher == DISPATCH_NO_VOUCHER) {
- voucher_t v = tq->dq_override_voucher;
- if (v != DISPATCH_NO_VOUCHER) {
- if (v) _voucher_retain(v);
- dq->dq_override_voucher = v;
- }
- }
+ _dispatch_queue_inherit_wlh_from_target(dq, tq); // after priority, the wlh is inherited from the same target queue
}
DISPATCH_ALWAYS_INLINE
-static inline dispatch_queue_t
-dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
- uint64_t *owned, struct dispatch_object_s **dc_ptr)
+static inline dispatch_queue_wakeup_target_t
+dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_context_t dic,
+ dispatch_invoke_flags_t flags, uint64_t *owned)
{
dispatch_queue_t otq = dq->do_targetq;
dispatch_queue_t cq = _dispatch_queue_get_current();
return otq;
}
if (dq->dq_width == 1) {
- return _dispatch_queue_serial_drain(dq, flags, owned, dc_ptr);
+ return _dispatch_queue_serial_drain(dq, dic, flags, owned);
}
- return _dispatch_queue_concurrent_drain(dq, flags, owned, dc_ptr);
+ return _dispatch_queue_concurrent_drain(dq, dic, flags, owned);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
void
-_dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_flags_t flags)
+_dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_context_t dic,
+ dispatch_invoke_flags_t flags)
{
- _dispatch_queue_class_invoke(dq, flags, dispatch_queue_invoke2);
+ _dispatch_queue_class_invoke(dq, dic, flags, 0, dispatch_queue_invoke2); // thin wrapper: forwards dq/dic/flags with a literal 0 and dispatch_queue_invoke2
}
#pragma mark -
#if HAVE_PTHREAD_WORKQUEUE_QOS
void
_dispatch_queue_override_invoke(dispatch_continuation_t dc,
- dispatch_invoke_flags_t flags)
+ dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
dispatch_queue_t old_rq = _dispatch_queue_get_current();
dispatch_queue_t assumed_rq = dc->dc_other;
+ dispatch_priority_t old_dp;
voucher_t ov = DISPATCH_NO_VOUCHER;
dispatch_object_t dou;
dou._do = dc->dc_data;
- _dispatch_queue_set_current(assumed_rq);
- flags |= DISPATCH_INVOKE_OVERRIDING;
+ old_dp = _dispatch_root_queue_identity_assume(assumed_rq);
if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
flags |= DISPATCH_INVOKE_STEALING;
} else {
}
_dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
if (_dispatch_object_has_vtable(dou._do)) {
- dx_invoke(dou._do, flags);
+ dx_invoke(dou._do, dic, flags);
} else {
_dispatch_continuation_invoke_inline(dou, ov, flags);
}
});
+ _dispatch_reset_basepri(old_dp);
_dispatch_queue_set_current(old_rq);
}
DISPATCH_ALWAYS_INLINE
static inline bool
-_dispatch_need_global_root_queue_override(dispatch_queue_t rq,
- pthread_priority_t pp)
+_dispatch_root_queue_push_needs_override(dispatch_queue_t rq,
+ dispatch_qos_t qos)
{
- pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
- bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
+ dispatch_qos_t rqos = _dispatch_priority_qos(rq->dq_priority);
+ bool defaultqueue = rq->dq_priority & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE;
- if (unlikely(!rqp)) return false;
+ if (unlikely(!rqos)) return false; // a root queue whose QoS is zero never asks for an override
- pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
- return defaultqueue ? pp && pp != rqp : pp > rqp;
+ return defaultqueue ? qos && qos != rqos : qos > rqos; // default queue: any non-zero qos that differs; otherwise only a strictly higher qos
}
DISPATCH_ALWAYS_INLINE
static inline bool
-_dispatch_need_global_root_queue_override_stealer(dispatch_queue_t rq,
- pthread_priority_t pp, dispatch_wakeup_flags_t wflags)
+_dispatch_root_queue_push_queue_override_needed(dispatch_queue_t rq,
+ dispatch_qos_t qos)
{
- pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
- bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
-
- if (unlikely(!rqp)) return false;
-
- if (wflags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
- if (!(wflags & _DISPATCH_WAKEUP_OVERRIDE_BITS)) {
- return false;
- }
- }
-
- pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
- return defaultqueue || pp > rqp;
+ // for root queues, the override is the guaranteed minimum override level
+ return qos > _dispatch_priority_override_qos(rq->dq_priority); // strictly greater: a qos equal to rq's override QoS yields false
}
DISPATCH_NOINLINE
static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
- dispatch_object_t dou, pthread_priority_t pp)
+ dispatch_object_t dou, dispatch_qos_t qos)
{
- bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
- dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
+ bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
+ dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit); // root queue looked up from qos plus orig_rq's overcommit flag
dispatch_continuation_t dc = dou._dc;
if (_dispatch_object_is_redirection(dc)) {
dc->dc_priority = DISPATCH_NO_PRIORITY;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
}
-
- DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
- _dispatch_queue_push_inline(rq, dc, 0, 0);
+ _dispatch_root_queue_push_inline(rq, dc, dc, 1); // pushed onto rq, not orig_rq; dc is passed as both item arguments with a count of 1
}
DISPATCH_NOINLINE
static void
_dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
- dispatch_queue_t dq, pthread_priority_t pp)
+ dispatch_queue_t dq, dispatch_qos_t qos)
{
- bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
- dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
+ bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
+ dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit); // root queue looked up from qos plus orig_rq's overcommit flag
dispatch_continuation_t dc = _dispatch_continuation_alloc();
dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
- _dispatch_retain(dq);
+ _dispatch_retain_2(dq); // NOTE(review): presumably the references consumed when the stealer continuation is invoked - confirm
dc->dc_func = NULL;
dc->dc_ctxt = dc;
dc->dc_other = orig_rq;
dc->dc_data = dq;
dc->dc_priority = DISPATCH_NO_PRIORITY;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
-
- DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
- _dispatch_queue_push_inline(rq, dc, 0, 0);
+ _dispatch_root_queue_push_inline(rq, dc, dc, 1); // push the freshly allocated stealer continuation (carrying dq and orig_rq) onto rq
}
DISPATCH_NOINLINE
static void
-_dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
- pthread_priority_t pp, dispatch_wakeup_flags_t flags, uint64_t dq_state)
+_dispatch_queue_class_wakeup_with_override_slow(dispatch_queue_t dq,
+ uint64_t dq_state, dispatch_wakeup_flags_t flags)
{
- mach_port_t owner = _dq_state_drain_owner(dq_state);
- pthread_priority_t pp2;
+ dispatch_qos_t oqos, qos = _dq_state_max_qos(dq_state);
dispatch_queue_t tq;
bool locked;
- if (owner) {
- int rc = _dispatch_wqthread_override_start_check_owner(owner, pp,
+ if (_dq_state_is_base_anon(dq_state)) {
+ mach_port_t owner = _dq_state_drain_owner(dq_state);
+ if (owner) {
+ (void)_dispatch_wqthread_override_start_check_owner(owner, qos,
&dq->dq_state_lock);
- // EPERM means the target of the override is not a work queue thread
- // and could be a thread bound queue such as the main queue.
- // When that happens we must get to that queue and wake it up if we
- // want the override to be appplied and take effect.
- if (rc != EPERM) {
goto out;
}
}
- if (_dq_state_is_suspended(dq_state)) {
- goto out;
- }
-
tq = dq->do_targetq;
- if (_dispatch_queue_has_immutable_target(dq)) {
+ if (likely(!_dispatch_queue_is_legacy(dq))) {
locked = false;
} else if (_dispatch_is_in_root_queues_array(tq)) {
// avoid locking when we recognize the target queue as a global root
// queue it is gross, but is a very common case. The locking isn't
// needed because these target queues cannot go away.
locked = false;
- } else if (_dispatch_queue_sidelock_trylock(dq, pp)) {
+ } else if (_dispatch_queue_sidelock_trylock(dq, qos)) {
// <rdar://problem/17735825> to traverse the tq chain safely we must
// lock it to ensure it cannot change
locked = true;
//
// Leading to being there, the current thread has:
// 1. enqueued an object on `dq`
- // 2. raised the dq_override value of `dq`
- // 3. set the HAS_OVERRIDE bit and not seen an owner
- // 4. tried and failed to acquire the side lock
- //
+ // 2. raised the max_qos value, set RECEIVED_OVERRIDE on `dq`
+ // and didn't see an owner
+ // 3. tried and failed to acquire the side lock
//
// The side lock owner can only be one of three things:
//
// the eventual dispatch_resume().
//
// - A dispatch_set_target_queue() call. The fact that we saw no `owner`
- // means that the trysync it does wasn't being drained when (3)
+ // means that the trysync it does wasn't being drained when (2)
// happened which can only be explained by one of these interleavings:
//
// o `dq` became idle between when the object queued in (1) ran and
// the set_target_queue call and we were unlucky enough that our
- // step (3) happened while this queue was idle. There is no reason
+ // step (2) happened while this queue was idle. There is no reason
// to override anything anymore, the queue drained to completion
// while we were preempted, our job is done.
//
- // o `dq` is queued but not draining during (1-3), then when we try
- // to lock at (4) the queue is now draining a set_target_queue.
- // Since we set HAS_OVERRIDE with a release barrier, the effect of
- // (2) was visible to the drainer when he acquired the drain lock,
- // and that guy has applied our override. Our job is done.
+ // o `dq` is queued but not draining during (1-2), then when we try
+ // to lock at (3) the queue is now draining a set_target_queue.
+ // This drainer must have seen the effects of (2) and that guy has
+ // applied our override. Our job is done.
//
// - Another instance of _dispatch_queue_class_wakeup_with_override(),
// which is fine because trylock leaves a hint that we failed our
}
apply_again:
- if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
- if (_dispatch_need_global_root_queue_override_stealer(tq, pp, flags)) {
- _dispatch_root_queue_push_override_stealer(tq, dq, pp);
+ if (dx_hastypeflag(tq, QUEUE_ROOT)) {
+ if (_dispatch_root_queue_push_queue_override_needed(tq, qos)) {
+ _dispatch_root_queue_push_override_stealer(tq, dq, qos);
}
- } else if (flags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
- dx_wakeup(tq, pp, flags);
- } else if (_dispatch_queue_need_override(tq, pp)) {
- dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING);
+ } else if (_dispatch_queue_need_override(tq, qos)) {
+ dx_wakeup(tq, qos, 0);
}
while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
// rdar://problem/24081326
// tried to acquire the side lock while we were running, and could have
// had a better override than ours to apply.
//
- pp2 = dq->dq_override;
- if (pp2 > pp) {
- pp = pp2;
+ oqos = _dq_state_max_qos(os_atomic_load2o(dq, dq_state, relaxed));
+ if (oqos > qos) {
+ qos = oqos;
// The other instance had a better priority than ours, override
// our thread, and apply the override that wasn't applied to `dq`
// because of us.
}
out:
- if (flags & DISPATCH_WAKEUP_CONSUME) {
- return _dispatch_release_tailcall(dq);
+ if (flags & DISPATCH_WAKEUP_CONSUME_2) {
+ return _dispatch_release_2_tailcall(dq);
}
}
+
+
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
+ uint64_t dq_state, dispatch_wakeup_flags_t flags)
+{
+ dispatch_assert(_dq_state_should_override(dq_state)); // callers are expected to have checked that dq_state warrants an override
+
+ return _dispatch_queue_class_wakeup_with_override_slow(dq, dq_state, flags); // inline shim: all the work is in the DISPATCH_NOINLINE slow variant
+}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
DISPATCH_NOINLINE
void
-_dispatch_queue_class_override_drainer(dispatch_queue_t dq,
- pthread_priority_t pp, dispatch_wakeup_flags_t flags)
+_dispatch_root_queue_push(dispatch_queue_t rq, dispatch_object_t dou,
+ dispatch_qos_t qos)
{
-#if HAVE_PTHREAD_WORKQUEUE_QOS
- uint64_t dq_state, value;
-
- //
- // Someone is trying to override the last work item of the queue.
- // Do not remember this override on the queue because we know the precise
- // duration the override is required for: until the current drain unlocks.
- //
- // That is why this function only tries to set HAS_OVERRIDE if we can
- // still observe a drainer, and doesn't need to set the DIRTY bit
- // because oq_override wasn't touched and there is no race to resolve
- //
- os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
- if (!_dq_state_drain_locked(dq_state)) {
- os_atomic_rmw_loop_give_up(break);
+#if DISPATCH_USE_KEVENT_WORKQUEUE
+ dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
+ if (unlikely(ddi && ddi->ddi_can_stash)) { // a deferred-items context exists and still allows stashing
+ dispatch_object_t old_dou = ddi->ddi_stashed_dou;
+ dispatch_priority_t rq_overcommit;
+ rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
+
+ if (likely(!old_dou._do || rq_overcommit)) { // stash when the slot is empty, or replace the stashed item when rq is overcommit
+ dispatch_queue_t old_rq = ddi->ddi_stashed_rq;
+ dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
+ ddi->ddi_stashed_rq = rq;
+ ddi->ddi_stashed_dou = dou;
+ ddi->ddi_stashed_qos = qos;
+ _dispatch_debug("deferring item %p, rq %p, qos %d",
+ dou._do, rq, qos);
+ if (rq_overcommit) { // once an overcommit item is stashed, no further stashing is allowed
+ ddi->ddi_can_stash = false;
+ }
+ if (likely(!old_dou._do)) { // slot was empty: the new item is only stashed, nothing is pushed now
+ return;
+ }
+ // push the previously stashed item
+ qos = old_qos;
+ rq = old_rq;
+ dou = old_dou;
}
- value = dq_state | DISPATCH_QUEUE_HAS_OVERRIDE;
- });
- if (_dq_state_drain_locked(dq_state)) {
- return _dispatch_queue_class_wakeup_with_override(dq, pp,
- flags, dq_state);
}
-#else
- (void)pp;
-#endif // HAVE_PTHREAD_WORKQUEUE_QOS
- if (flags & DISPATCH_WAKEUP_CONSUME) {
- return _dispatch_release_tailcall(dq);
+#endif
+#if HAVE_PTHREAD_WORKQUEUE_QOS
+ if (_dispatch_root_queue_push_needs_override(rq, qos)) { // when an override is needed, go through the override push instead
+ return _dispatch_root_queue_push_override(rq, dou, qos);
}
+#else
+ (void)qos;
+#endif
+ _dispatch_root_queue_push_inline(rq, dou, dou, 1); // plain push: dou is passed as both item arguments with a count of 1
}
-#if DISPATCH_USE_KEVENT_WORKQUEUE
-DISPATCH_NOINLINE
-static void
-_dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou,
- pthread_priority_t pp, dispatch_deferred_items_t ddi)
-{
- dispatch_priority_t old_pp = ddi->ddi_stashed_pp;
- dispatch_queue_t old_dq = ddi->ddi_stashed_dq;
- struct dispatch_object_s *old_dou = ddi->ddi_stashed_dou;
- dispatch_priority_t rq_overcommit;
-
- rq_overcommit = dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
- if (likely(!old_pp || rq_overcommit)) {
- ddi->ddi_stashed_dq = dq;
- ddi->ddi_stashed_dou = dou._do;
- ddi->ddi_stashed_pp = (dispatch_priority_t)pp | rq_overcommit |
- _PTHREAD_PRIORITY_PRIORITY_MASK;
- if (likely(!old_pp)) {
- return;
- }
- // push the previously stashed item
- pp = old_pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
- dq = old_dq;
- dou._do = old_dou;
+void
+_dispatch_root_queue_wakeup(dispatch_queue_t dq,
+ DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
+{
+ if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) { // only wakeups carrying BLOCK_WAIT are tolerated on a root queue
+ DISPATCH_INTERNAL_CRASH(dq->dq_priority,
+ "Don't try to wake up or override a root queue");
}
- if (_dispatch_need_global_root_queue_override(dq, pp)) {
- return _dispatch_root_queue_push_override(dq, dou, pp);
+ if (flags & DISPATCH_WAKEUP_CONSUME_2) { // the caller handed in references to consume: release them, nothing else to do
+ return _dispatch_release_2_tailcall(dq);
}
- // bit of cheating: we should really pass `pp` but we know that we are
- // pushing onto a global queue at this point, and we just checked that
- // `pp` doesn't matter.
- DISPATCH_COMPILER_CAN_ASSUME(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
- _dispatch_queue_push_inline(dq, dou, 0, 0);
}
-#endif
DISPATCH_NOINLINE
-static void
-_dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou,
- pthread_priority_t pp)
+void
+_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
+ dispatch_qos_t qos)
{
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
- _dispatch_queue_push(dq, dou, pp);
+ _dispatch_queue_push_inline(dq, dou, qos); // DISPATCH_NOINLINE entry point that only forwards to the inline push
}
DISPATCH_NOINLINE
void
-_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
- pthread_priority_t pp)
+_dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
+ dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
- _dispatch_assert_is_valid_qos_override(pp);
- if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
-#if DISPATCH_USE_KEVENT_WORKQUEUE
- dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
- if (unlikely(ddi && !(ddi->ddi_stashed_pp &
- (dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) {
- dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE);
- return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi);
+ dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
+
+ if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) { // no references were handed in: take them here so every exit can release under CONSUME_2
+ _dispatch_retain_2(dq);
+ flags |= DISPATCH_WAKEUP_CONSUME_2;
+ }
+
+ if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
+ //
+ // _dispatch_queue_class_barrier_complete() is about what both regular
+ // queues and sources need to evaluate, but the former can have sync
+ // handoffs to perform which _dispatch_queue_class_barrier_complete()
+ // doesn't handle, only _dispatch_queue_barrier_complete() does.
+ //
+ // _dispatch_queue_wakeup() is the one for plain queues that calls
+ // _dispatch_queue_barrier_complete(), and this is only taken for non
+ // queue types.
+ //
+ dispatch_assert(dx_metatype(dq) != _DISPATCH_QUEUE_TYPE);
+ qos = _dispatch_queue_override_qos(dq, qos);
+ return _dispatch_queue_class_barrier_complete(dq, qos, flags, target,
+ DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
+ }
+
+ if (target) { // wakeup with an enqueue target: merge qos into dq_state and try to set the enqueue bit
+ uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
+ if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
+ enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
+ }
+ qos = _dispatch_queue_override_qos(dq, qos);
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
+ new_state = _dq_state_merge_qos(old_state, qos);
+ if (likely(!_dq_state_is_suspended(old_state) &&
+ !_dq_state_is_enqueued(old_state) &&
+ (!_dq_state_drain_locked(old_state) ||
+ (enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR &&
+ _dq_state_is_base_wlh(old_state))))) {
+ new_state |= enqueue; // only when not suspended, not already enqueued, and either not drain-locked or (non-manager target and base-wlh state)
+ }
+ if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
+ new_state |= DISPATCH_QUEUE_DIRTY;
+ } else if (new_state == old_state) { // nothing to change and no dirty bit requested: abandon the update
+ os_atomic_rmw_loop_give_up(goto done);
+ }
+ });
+
+ if (likely((old_state ^ new_state) & enqueue)) { // the enqueue bit changed in this update, so this caller pushes dq onto its target
+ dispatch_queue_t tq;
+ if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
+ // the rmw_loop above has no acquire barrier, as the last block
+ // of a queue asyncing to that queue is not an uncommon pattern
+ // and in that case the acquire would be completely useless
+ //
+ // so instead use dependency ordering to read
+ // the targetq pointer.
+ os_atomic_thread_fence(dependency);
+ tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
+ (long)new_state);
+ } else {
+ tq = target;
+ }
+ dispatch_assert(_dq_state_is_enqueued(new_state));
+ return _dispatch_queue_push_queue(tq, dq, new_state);
}
-#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
- // can't use dispatch_once_f() as it would create a frame
- if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) {
- return _dispatch_queue_push_slow(dq, dou, pp);
+ if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) { // we did not enqueue, but the max QoS bits changed
+ if (_dq_state_should_override(new_state)) {
+ return _dispatch_queue_class_wakeup_with_override(dq, new_state,
+ flags);
+ }
}
- if (_dispatch_need_global_root_queue_override(dq, pp)) {
- return _dispatch_root_queue_push_override(dq, dou, pp);
+ } else if (qos) { // no target but a non-zero qos: override-only wakeup
+ //
+ // Someone is trying to override the last work item of the queue.
+ //
+ uint64_t old_state, new_state;
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
+ if (!_dq_state_drain_locked(old_state) ||
+ !_dq_state_is_enqueued(old_state)) {
+ os_atomic_rmw_loop_give_up(goto done); // give up unless the queue is both drain-locked and enqueued
+ }
+ new_state = _dq_state_merge_qos(old_state, qos);
+ if (new_state == old_state) {
+ os_atomic_rmw_loop_give_up(goto done);
+ }
+ });
+ if (_dq_state_should_override(new_state)) {
+ return _dispatch_queue_class_wakeup_with_override(dq, new_state,
+ flags);
}
-#endif
+#endif // HAVE_PTHREAD_WORKQUEUE_QOS
+ }
+done:
+ if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) { // release the references taken above or handed in by the caller
+ return _dispatch_release_2_tailcall(dq);
}
- _dispatch_queue_push_inline(dq, dou, pp, 0);
}
DISPATCH_NOINLINE // push sync waiter `dsc` on `dq`; then wake the queue, try to take its barrier lock, or only merge `qos`
static void
-_dispatch_queue_class_wakeup_enqueue(dispatch_queue_t dq, pthread_priority_t pp,
- dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
+_dispatch_queue_push_sync_waiter(dispatch_queue_t dq,
+ dispatch_sync_context_t dsc, dispatch_qos_t qos)
{
- dispatch_queue_t tq;
+ uint64_t old_state, new_state;
- if (flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAS_OVERRIDDEN)) {
- // _dispatch_queue_drain_try_unlock may have reset the override while
- // we were becoming the enqueuer
- _dispatch_queue_reinstate_override_priority(dq, (dispatch_priority_t)pp);
- }
- if (!(flags & DISPATCH_WAKEUP_CONSUME)) {
- _dispatch_retain(dq);
+ if (unlikely(dx_type(dq) == DISPATCH_QUEUE_NETWORK_EVENT_TYPE)) { // sync waiters are rejected on network event queues
+ DISPATCH_CLIENT_CRASH(0,
+ "dispatch_sync onto a network event queue");
}
- if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
- // try_become_enqueuer has no acquire barrier, as the last block
- // of a queue asyncing to that queue is not an uncommon pattern
- // and in that case the acquire is completely useless
- //
- // so instead use a thread fence here when we will read the targetq
- // pointer because that is the only thing that really requires
- // that barrier.
- os_atomic_thread_fence(acquire);
- tq = dq->do_targetq;
- } else {
- dispatch_assert(target == DISPATCH_QUEUE_WAKEUP_MGR);
- tq = &_dispatch_mgr_q;
- }
- return _dispatch_queue_push(tq, dq, pp);
-}
-
-DISPATCH_NOINLINE
-void
-_dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
- dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
-{
- uint64_t old_state, new_state, bits = 0;
-#if HAVE_PTHREAD_WORKQUEUE_QOS
- _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
-#endif
+ _dispatch_trace_continuation_push(dq, dsc->_as_dc);
- if (flags & DISPATCH_WAKEUP_FLUSH) {
- bits = DISPATCH_QUEUE_DIRTY;
- }
- if (flags & DISPATCH_WAKEUP_OVERRIDING) {
- //
- // Setting the dirty bit here is about forcing callers of
- // _dispatch_queue_drain_try_unlock() to loop again when an override
- // has just been set to close the following race:
- //
- // Drainer (in drain_try_unlokc():
- // override_reset();
- // preempted....
- //
- // Enqueuer:
- // atomic_or(oq_override, override, relaxed);
- // atomic_or(dq_state, HAS_OVERRIDE, release);
- //
- // Drainer:
- // ... resumes
- // successful drain_unlock() and leaks `oq_override`
- //
- bits = DISPATCH_QUEUE_DIRTY | DISPATCH_QUEUE_HAS_OVERRIDE;
- }
+ if (unlikely(_dispatch_queue_push_update_tail(dq, dsc->_as_do))) { // NOTE(review): presumably true when dsc became the first item, so the head must be published here — confirm
+ // for slow waiters, we borrow the reference of the caller
+ // so we don't need to protect the wakeup with a temporary retain
+ _dispatch_queue_push_update_head(dq, dsc->_as_do);
+ if (unlikely(_dispatch_queue_is_thread_bound(dq))) {
+ return dx_wakeup(dq, qos, DISPATCH_WAKEUP_MAKE_DIRTY); // thread-bound queues are only woken here, never locked by this path
+ }
- if (flags & DISPATCH_WAKEUP_SLOW_WAITER) {
uint64_t pending_barrier_width =
(dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
- uint64_t xor_owner_and_set_full_width_and_in_barrier =
- _dispatch_tid_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
- DISPATCH_QUEUE_IN_BARRIER;
-
-#ifdef DLOCK_NOWAITERS_BIT
- bits |= DLOCK_NOWAITERS_BIT;
-#else
- bits |= DLOCK_WAITERS_BIT;
-#endif
- flags ^= DISPATCH_WAKEUP_SLOW_WAITER;
- dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME));
-
+ uint64_t set_owner_and_set_full_width_and_in_barrier =
+ _dispatch_lock_value_for_self() |
+ DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
+ // similar to _dispatch_queue_drain_try_unlock()
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
- new_state = old_state | bits;
- if (_dq_state_drain_pended(old_state)) {
- // same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT
- // but we want to be more efficient wrt the WAITERS_BIT
- new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK;
- new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED;
- }
- if (unlikely(_dq_state_drain_locked(new_state))) {
-#ifdef DLOCK_NOWAITERS_BIT
- new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT;
-#endif
- } else if (unlikely(!_dq_state_is_runnable(new_state) ||
- !(flags & DISPATCH_WAKEUP_FLUSH))) {
- // either not runnable, or was not for the first item (26700358)
- // so we should not try to lock and handle overrides instead
+ new_state = _dq_state_merge_qos(old_state, qos); // every branch below keeps the merged qos ...
+ new_state |= DISPATCH_QUEUE_DIRTY; // ... and the DIRTY bit
+ if (unlikely(_dq_state_drain_locked(old_state) ||
+ !_dq_state_is_runnable(old_state))) {
+ // not runnable, so we should just handle overrides
+ } else if (_dq_state_is_base_wlh(old_state) &&
+ _dq_state_is_enqueued(old_state)) {
+ // 32123779 let the event thread redrive since it's out already
} else if (_dq_state_has_pending_barrier(old_state) ||
new_state + pending_barrier_width <
DISPATCH_QUEUE_WIDTH_FULL_BIT) {
// see _dispatch_queue_drain_try_lock
new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
- new_state ^= xor_owner_and_set_full_width_and_in_barrier;
- } else {
- new_state |= DISPATCH_QUEUE_ENQUEUED;
+ new_state |= set_owner_and_set_full_width_and_in_barrier; // install this thread's lock value with WIDTH_FULL and IN_BARRIER set
}
});
+
+ if (_dq_state_is_base_wlh(old_state) &&
+ (dsc->dsc_waiter == _dispatch_tid_self())) {
+ dsc->dsc_wlh_was_first = true; // recorded only when the waiter is the calling thread
+ }
+
if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) { // IN_BARRIER changed; the lock-taking branch above is the one that sets it
- return _dispatch_try_lock_transfer_or_wakeup(dq);
+ return _dispatch_queue_barrier_complete(dq, qos, 0);
}
- } else if (bits) {
- os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
- new_state = old_state | bits;
- if (likely(_dq_state_should_wakeup(old_state))) {
- new_state |= DISPATCH_QUEUE_ENQUEUED;
+#if HAVE_PTHREAD_WORKQUEUE_QOS
+ if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) { // the max-QoS bits were changed by the merge
+ if (_dq_state_should_override(new_state)) {
+ return _dispatch_queue_class_wakeup_with_override(dq,
+ new_state, 0);
}
- });
- } else {
- os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed,{
- new_state = old_state;
- if (likely(_dq_state_should_wakeup(old_state))) {
- new_state |= DISPATCH_QUEUE_ENQUEUED;
- } else {
- os_atomic_rmw_loop_give_up(break);
+ }
+ } else if (unlikely(qos)) { // tail update returned false: only merge qos into dq_state
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
+ new_state = _dq_state_merge_qos(old_state, qos);
+ if (old_state == new_state) {
+ os_atomic_rmw_loop_give_up(return); // merge is a no-op: leave the function without storing
}
});
- }
-
- if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
- return _dispatch_queue_class_wakeup_enqueue(dq, pp, flags, target);
- }
-
-#if HAVE_PTHREAD_WORKQUEUE_QOS
- if ((flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAITER_HANDOFF))
- && target == DISPATCH_QUEUE_WAKEUP_TARGET) {
- return _dispatch_queue_class_wakeup_with_override(dq, pp,
- flags, new_state);
- }
-#endif
-
- if (flags & DISPATCH_WAKEUP_CONSUME) {
- return _dispatch_release_tailcall(dq);
+ if (_dq_state_should_override(new_state)) {
+ return _dispatch_queue_class_wakeup_with_override(dq, new_state, 0);
+ }
+#endif // HAVE_PTHREAD_WORKQUEUE_QOS
}
}
(void)os_atomic_dec2o(qc, dgq_pending, relaxed);
}
if (!available) {
- _dispatch_global_queue_poke(dq);
+ _dispatch_global_queue_poke(dq, 1, 0);
}
return available;
}
goto out;
}
// There must be a next item now.
- _dispatch_wait_until(next = head->do_next);
+ next = os_mpsc_get_next(head, do_next);
}
os_atomic_store2o(dq, dq_items_head, next, relaxed);
- _dispatch_global_queue_poke(dq);
+ _dispatch_global_queue_poke(dq, 1, 0);
out:
return head;
}
+#if DISPATCH_USE_KEVENT_WORKQUEUE
void
-_dispatch_root_queue_drain_deferred_item(dispatch_queue_t dq,
- struct dispatch_object_s *dou, pthread_priority_t pp)
-{
- struct _dispatch_identity_s di;
+_dispatch_root_queue_drain_deferred_wlh(dispatch_deferred_items_t ddi
+ DISPATCH_PERF_MON_ARGS_PROTO)
+{ // drains the stashed queue ddi->ddi_stashed_dou._dq with DISPATCH_INVOKE_WLH, with ddi->ddi_stashed_rq installed as the current queue
+ dispatch_queue_t rq = ddi->ddi_stashed_rq;
+ dispatch_queue_t dq = ddi->ddi_stashed_dou._dq;
+ _dispatch_queue_set_current(rq);
+ dispatch_priority_t old_pri = _dispatch_set_basepri_wlh(rq->dq_priority); // saved so the park: epilogue can restore it
+ dispatch_invoke_context_s dic = { };
+ dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
+ DISPATCH_INVOKE_REDIRECTING_DRAIN | DISPATCH_INVOKE_WLH;
+ _dispatch_queue_drain_init_narrowing_check_deadline(&dic, rq->dq_priority);
+ uint64_t dq_state; // NOTE(review): presumably filled in by _dispatch_queue_drain_try_lock_wlh() even on failure — confirm
+
+ ddi->ddi_wlh_servicing = true;
+ if (unlikely(_dispatch_needs_to_return_to_kernel())) {
+ _dispatch_return_to_kernel();
+ }
+retry: // jumped back to when dq is still enqueued on its target after dx_invoke()
+ dispatch_assert(ddi->ddi_wlh_needs_delete);
+ _dispatch_trace_continuation_pop(rq, dq);
+
+ if (_dispatch_queue_drain_try_lock_wlh(dq, &dq_state)) {
+ dx_invoke(dq, &dic, flags);
+ if (!ddi->ddi_wlh_needs_delete) { // flag was cleared since the assert at retry: skip the leave call below
+ goto park;
+ }
+ dq_state = os_atomic_load2o(dq, dq_state, relaxed);
+ if (unlikely(!_dq_state_is_base_wlh(dq_state))) { // rdar://32671286
+ goto park;
+ }
+ if (unlikely(_dq_state_is_enqueued_on_target(dq_state))) {
+ _dispatch_retain(dq);
+ _dispatch_trace_continuation_push(dq->do_targetq, dq); // balances the trace pop performed again at retry:
+ goto retry;
+ }
+ } else { // lock not taken: drop reference(s) according to the observed state
+ if (_dq_state_is_suspended(dq_state)) {
+ dispatch_assert(!_dq_state_is_enqueued(dq_state));
+ _dispatch_release_2_no_dispose(dq);
+ } else {
+ dispatch_assert(_dq_state_is_enqueued(dq_state));
+ dispatch_assert(_dq_state_drain_locked(dq_state));
+ _dispatch_release_no_dispose(dq);
+ }
+ }
- // fake that we queued `dou` on `dq` for introspection purposes
- _dispatch_trace_continuation_push(dq, dou);
+ _dispatch_event_loop_leave_deferred((dispatch_wlh_t)dq, dq_state); // reached only when no branch above jumped to park or retry
- pp = _dispatch_priority_inherit_from_root_queue(pp, dq);
- _dispatch_queue_set_current(dq);
- _dispatch_root_queue_identity_assume(&di, pp);
+park: // common epilogue: end perfmon, restore base priority, clear current queue and voucher
+ // event thread that could steal
+ _dispatch_perfmon_end(perfmon_thread_event_steal);
+ _dispatch_reset_basepri(old_pri);
+ _dispatch_reset_basepri_override();
+ _dispatch_queue_set_current(NULL);
+
+ _dispatch_voucher_debug("root queue clear", NULL);
+ _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
+}
+
+void
+_dispatch_root_queue_drain_deferred_item(dispatch_deferred_items_t ddi
+ DISPATCH_PERF_MON_ARGS_PROTO)
+{ // pops the single stashed item ddi->ddi_stashed_dou with ddi->ddi_stashed_rq as the current queue, then restores thread state
+ dispatch_queue_t rq = ddi->ddi_stashed_rq;
+ _dispatch_queue_set_current(rq);
+ dispatch_priority_t old_pri = _dispatch_set_basepri(rq->dq_priority); // previous base priority, restored after the pop
+
+ dispatch_invoke_context_s dic = { };
+ dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
+ DISPATCH_INVOKE_REDIRECTING_DRAIN;
#if DISPATCH_COCOA_COMPAT
- void *pool = _dispatch_last_resort_autorelease_pool_push();
+ _dispatch_last_resort_autorelease_pool_push(&dic); // paired with the pool_pop(&dic) after the item has run
#endif // DISPATCH_COCOA_COMPAT
+ _dispatch_queue_drain_init_narrowing_check_deadline(&dic, rq->dq_priority);
+ _dispatch_continuation_pop_inline(ddi->ddi_stashed_dou, &dic, flags, rq); // runs the stashed item against the root queue
- _dispatch_perfmon_start();
- _dispatch_continuation_pop_inline(dou, dq,
- DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
- _dispatch_perfmon_workitem_inc();
- _dispatch_perfmon_end();
-
+ // event thread that could steal
+ _dispatch_perfmon_end(perfmon_thread_event_steal);
#if DISPATCH_COCOA_COMPAT
- _dispatch_last_resort_autorelease_pool_pop(pool);
+ _dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
- _dispatch_reset_defaultpriority(di.old_pp);
+ _dispatch_reset_basepri(old_pri);
+ _dispatch_reset_basepri_override();
_dispatch_queue_set_current(NULL);
_dispatch_voucher_debug("root queue clear", NULL);
_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
+#endif
DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
static void
-_dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pri)
+_dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
#if DISPATCH_DEBUG
dispatch_queue_t cq;
}
#endif
_dispatch_queue_set_current(dq);
- if (dq->dq_priority) pri = dq->dq_priority;
- pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri, NULL);
-#if DISPATCH_COCOA_COMPAT
- void *pool = _dispatch_last_resort_autorelease_pool_push();
-#endif // DISPATCH_COCOA_COMPAT
+ dispatch_priority_t pri = dq->dq_priority;
+ if (!pri) pri = _dispatch_priority_from_pp(pp);
+ dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
+ _dispatch_adopt_wlh_anon();
- _dispatch_perfmon_start();
struct dispatch_object_s *item;
bool reset = false;
+ dispatch_invoke_context_s dic = { };
+#if DISPATCH_COCOA_COMPAT
+ _dispatch_last_resort_autorelease_pool_push(&dic);
+#endif // DISPATCH_COCOA_COMPAT
+ dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
+ DISPATCH_INVOKE_REDIRECTING_DRAIN;
+ _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
+ _dispatch_perfmon_start();
while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
if (reset) _dispatch_wqthread_override_reset();
- _dispatch_continuation_pop_inline(item, dq,
- DISPATCH_INVOKE_WORKER_DRAIN|DISPATCH_INVOKE_REDIRECTING_DRAIN);
- _dispatch_perfmon_workitem_inc();
- reset = _dispatch_reset_defaultpriority_override();
+ _dispatch_continuation_pop_inline(item, &dic, flags, dq);
+ reset = _dispatch_reset_basepri_override();
+ if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
+ break;
+ }
+ }
+
+ // overcommit or not. worker thread
+ if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
+ _dispatch_perfmon_end(perfmon_thread_worker_oc);
+ } else {
+ _dispatch_perfmon_end(perfmon_thread_worker_non_oc);
}
- _dispatch_perfmon_end();
#if DISPATCH_COCOA_COMPAT
- _dispatch_last_resort_autorelease_pool_pop(pool);
+ _dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
- _dispatch_reset_defaultpriority(old_dp);
+ _dispatch_reset_wlh();
+ _dispatch_reset_basepri(old_dbp);
+ _dispatch_reset_basepri_override();
_dispatch_queue_set_current(NULL);
}
dispatch_root_queue_context_t qc = dq->do_ctxt;
_dispatch_introspection_thread_add();
- int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
+ int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
dispatch_assert(pending >= 0);
_dispatch_root_queue_drain(dq, _dispatch_get_priority());
_dispatch_voucher_debug("root queue clear", NULL);
dispatch_queue_t dq;
pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
- dq = _dispatch_get_root_queue_for_priority(pp, overcommit);
+ dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
-#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
return _dispatch_worker_thread4(dq);
}
-#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
+#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
dispatch_root_queue_context_t qc = dq->do_ctxt;
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
+ int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
+ if (unlikely(pending < 0)) {
+ DISPATCH_INTERNAL_CRASH(pending, "Pending thread request underflow");
+ }
+
if (pqc->dpq_observer_hooks.queue_will_execute) {
_dispatch_set_pthread_root_queue_observer_hooks(
&pqc->dpq_observer_hooks);
pqc->dpq_thread_configure();
}
- sigset_t mask;
- int r;
// workaround tweaks the kernel workqueue does for us
- r = sigfillset(&mask);
- (void)dispatch_assume_zero(r);
- r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
- (void)dispatch_assume_zero(r);
+ _dispatch_sigmask();
_dispatch_introspection_thread_add();
+#if DISPATCH_USE_INTERNAL_WORKQUEUE
+ bool overcommit = (qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
+ bool manager = (dq == &_dispatch_mgr_root_queue);
+ bool monitored = !(overcommit || manager);
+ if (monitored) {
+ _dispatch_workq_worker_register(dq, qc->dgq_qos);
+ }
+#endif
+
const int64_t timeout = 5ull * NSEC_PER_SEC;
pthread_priority_t old_pri = _dispatch_get_priority();
do {
} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
dispatch_time(0, timeout)) == 0);
+#if DISPATCH_USE_INTERNAL_WORKQUEUE
+ if (monitored) {
+ _dispatch_workq_worker_unregister(dq, qc->dgq_qos);
+ }
+#endif
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
- _dispatch_global_queue_poke(dq);
- _dispatch_release(dq);
-
+ _dispatch_global_queue_poke(dq, 1, 0);
+ _dispatch_release(dq); // retained in _dispatch_global_queue_poke_slow
return NULL;
}
+#endif // DISPATCH_USE_PTHREAD_POOL
-int
-_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
-{
- int r;
-
- /* Workaround: 6269619 Not all signals can be delivered on any thread */
-
- r = sigdelset(set, SIGILL);
- (void)dispatch_assume_zero(r);
- r = sigdelset(set, SIGTRAP);
- (void)dispatch_assume_zero(r);
-#if HAVE_DECL_SIGEMT
- r = sigdelset(set, SIGEMT);
- (void)dispatch_assume_zero(r);
-#endif
- r = sigdelset(set, SIGFPE);
- (void)dispatch_assume_zero(r);
- r = sigdelset(set, SIGBUS);
- (void)dispatch_assume_zero(r);
- r = sigdelset(set, SIGSEGV);
- (void)dispatch_assume_zero(r);
- r = sigdelset(set, SIGSYS);
- (void)dispatch_assume_zero(r);
- r = sigdelset(set, SIGPIPE);
- (void)dispatch_assume_zero(r);
+#pragma mark -
+#pragma mark dispatch_network_root_queue
+#if TARGET_OS_MAC
- return pthread_sigmask(how, set, oset);
+dispatch_queue_t
+_dispatch_network_root_queue_create_4NW(const char *label,
+ const pthread_attr_t *attrs, dispatch_block_t configure)
+{ // thin wrapper over dispatch_pthread_root_queue_create() that fixes the flags to a pool size of 1
+ unsigned long flags = dispatch_pthread_root_queue_flags_pool_size(1); // flags value encoding pool size 1
+ return dispatch_pthread_root_queue_create(label, flags, attrs, configure); // label, attrs and configure are forwarded unchanged
}
-#endif // DISPATCH_USE_PTHREAD_POOL
+#endif // TARGET_OS_MAC
#pragma mark -
#pragma mark dispatch_runloop_queue
return DISPATCH_BAD_INPUT;
}
dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
- dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
- _dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1, false);
- dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,true);
+ dq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
+ _dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1,
+ DISPATCH_QUEUE_ROLE_BASE_ANON);
+ dq->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
_dispatch_runloop_queue_handle_init(dq);
_dispatch_queue_set_bound_thread(dq);
{
_dispatch_object_debug(dq, "%s", __func__);
- pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq, true);
+ dispatch_qos_t qos = _dispatch_runloop_queue_reset_max_qos(dq);
_dispatch_queue_clear_bound_thread(dq);
- dx_wakeup(dq, pp, DISPATCH_WAKEUP_FLUSH);
- if (pp) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
+ dx_wakeup(dq, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
+ if (qos) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
}
void
-_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
+_dispatch_runloop_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{ // dispose path for a runloop queue: introspection hook, runloop handle teardown, then generic queue destroy
_dispatch_object_debug(dq, "%s", __func__);
_dispatch_introspection_queue_dispose(dq);
_dispatch_runloop_queue_handle_dispose(dq);
- _dispatch_queue_destroy(dq);
+ _dispatch_queue_destroy(dq, allow_free); // allow_free is forwarded untouched; its meaning is defined by _dispatch_queue_destroy()
}
bool
_dispatch_runloop_queue_wakeup(dq, 0, false);
}
+#if TARGET_OS_MAC
dispatch_runloop_handle_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
}
return _dispatch_runloop_queue_get_handle(dq);
}
+#endif
static void
_dispatch_runloop_queue_handle_init(void *ctxt)
void
dispatch_main(void)
{
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
-
+ _dispatch_root_queues_init();
#if HAVE_PTHREAD_MAIN_NP
if (pthread_main_np()) {
#endif
pthread_key_t dispatch_main_key;
pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
pthread_setspecific(dispatch_main_key, &dispatch_main_key);
+ _dispatch_sigmask();
#endif
pthread_exit(NULL);
DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
_dispatch_queue_cleanup2(void)
{
dispatch_queue_t dq = &_dispatch_main_q;
- _dispatch_queue_clear_bound_thread(dq);
+ uint64_t old_state, new_state;
- // <rdar://problem/22623242>
- // Here is what happens when both this cleanup happens because of
- // dispatch_main() being called, and a concurrent enqueuer makes the queue
- // non empty.
- //
- // _dispatch_queue_cleanup2:
- // atomic_and(dq_is_thread_bound, ~DQF_THREAD_BOUND, relaxed);
- // maximal_barrier();
- // if (load(dq_items_tail, seq_cst)) {
- // // do the wake up the normal serial queue way
- // } else {
- // // do no wake up <----
- // }
- //
- // enqueuer:
- // store(dq_items_tail, new_tail, release);
- // if (load(dq_is_thread_bound, relaxed)) {
- // // do the wake up the runloop way <----
- // } else {
- // // do the wake up the normal serial way
- // }
+ // Turning the main queue from a runloop queue into an ordinary serial queue
+ // is a 3-step operation:
+ // 1. finish taking the main queue lock the usual way
+ // 2. clear the THREAD_BOUND flag
+ // 3. do a handoff
//
- // what would be bad is to take both paths marked <---- because the queue
- // wouldn't be woken up until the next time it's used (which may never
- // happen)
- //
- // An enqueuer that speculates the load of the old value of thread_bound
- // and then does the store may wake up the main queue the runloop way.
- // But then, the cleanup thread will see that store because the load
- // of dq_items_tail is sequentially consistent, and we have just thrown away
- // our pipeline.
- //
- // By the time cleanup2() is out of the maximally synchronizing barrier,
- // no other thread can speculate the wrong load anymore, and both cleanup2()
- // and a concurrent enqueuer would treat the queue in the standard non
- // thread bound way
-
- _dispatch_queue_atomic_flags_clear(dq,
- DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC);
- os_atomic_maximally_synchronizing_barrier();
- // no need to drop the override, the thread will die anyway
- // the barrier above includes an acquire, so it's ok to do this raw
- // call to dx_wakeup(0)
- dx_wakeup(dq, 0, 0);
+ // If an enqueuer executes concurrently, it may do the wakeup the runloop
+ // way, because it still believes the queue to be thread-bound, but the
+ // dirty bit will force this codepath to notice the enqueue, and the usual
+ // lock transfer will do the proper wakeup.
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
+ new_state = old_state & ~DISPATCH_QUEUE_DIRTY;
+ new_state += DISPATCH_QUEUE_WIDTH_INTERVAL;
+ new_state += DISPATCH_QUEUE_IN_BARRIER;
+ });
+ _dispatch_queue_atomic_flags_clear(dq, DQF_THREAD_BOUND|DQF_CANNOT_TRYSYNC);
+ _dispatch_queue_barrier_complete(dq, 0, 0);
// overload the "probably" variable to mean that dispatch_main() or
// similar non-POSIX API was called
#ifndef __linux__
if (_dispatch_program_is_probably_callback_driven) {
_dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
- _DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
+ DISPATCH_QOS_DEFAULT, true), NULL, _dispatch_sig_thread);
sleep(1); // workaround 6778970
}
#endif
"Premature thread exit while a dispatch queue is running");
}
+static void
+_dispatch_wlh_cleanup(void *ctxt)
+{ // cleanup callback: recovers the queue pointer stored in ctxt and releases its storage
+ // POSIX defines that destructors are only called if 'ctxt' is non-null
+ dispatch_queue_t wlh;
+ wlh = (dispatch_queue_t)((uintptr_t)ctxt & ~DISPATCH_WLH_STORAGE_REF); // mask off the DISPATCH_WLH_STORAGE_REF tag bits to get the raw pointer
+ _dispatch_queue_release_storage(wlh);
+}
+
+DISPATCH_NORETURN
static void
_dispatch_deferred_items_cleanup(void *ctxt)
{
"Premature thread exit with unhandled deferred items");
}
+DISPATCH_NORETURN
static void
_dispatch_frame_cleanup(void *ctxt)
{
"Premature thread exit while a dispatch frame is active");
}
+DISPATCH_NORETURN
static void
_dispatch_context_cleanup(void *ctxt)
{