X-Git-Url: https://git.saurik.com/apple/libdispatch.git/blobdiff_plain/98cf8cd208104c07d83445c24626e57ca11b5775..refs/heads/master:/src/source.c

diff --git a/src/source.c b/src/source.c
index b593ae0..a5573b6 100644
--- a/src/source.c
+++ b/src/source.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
+ * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
@@ -19,244 +19,204 @@
 */

 #include "internal.h"
-#if HAVE_MACH
-#include "protocol.h"
-#include "protocolServer.h"
-#endif
-#include <sys/mount.h>
-
-static void _dispatch_source_merge_kevent(dispatch_source_t ds,
-        const struct kevent64_s *ke);
-static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
-static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg);
-static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
-        uint32_t del_flags);
-static void _dispatch_kevent_drain(struct kevent64_s *ke);
-static void _dispatch_kevent_merge(struct kevent64_s *ke);
-static void _dispatch_timers_kevent(struct kevent64_s *ke);
-static void _dispatch_timers_unregister(dispatch_source_t ds,
-        dispatch_kevent_t dk);
-static void _dispatch_timers_update(dispatch_source_t ds);
-static void _dispatch_timer_aggregates_check(void);
-static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
-static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
-        unsigned int tidx);
-static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
-        unsigned int tidx);
+
+static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
+static void _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval);
+
+#define DISPATCH_TIMERS_UNREGISTER 0x1
+#define DISPATCH_TIMERS_RETAIN_2 0x2
+static void _dispatch_timers_update(dispatch_unote_t du, uint32_t flags);
+static void _dispatch_timers_unregister(dispatch_timer_source_refs_t dt);
+
+static void _dispatch_source_timer_configure(dispatch_source_t ds);
 static inline unsigned long _dispatch_source_timer_data(
-        dispatch_source_refs_t dr, unsigned long prev);
-static long _dispatch_kq_update(const struct kevent64_s *);
-static void _dispatch_memorystatus_init(void);
-#if HAVE_MACH
-static void _dispatch_mach_host_calendar_change_register(void);
-static void _dispatch_mach_recv_msg_buf_init(void);
-static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
-        uint32_t new_flags, uint32_t del_flags);
-static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
-        uint32_t new_flags, uint32_t del_flags);
-static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke);
-#else
-static inline void _dispatch_mach_host_calendar_change_register(void) {}
-static inline void _dispatch_mach_recv_msg_buf_init(void) {}
-#endif
-static const char * _evfiltstr(short filt);
-#if DISPATCH_DEBUG
-static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str);
-static void _dispatch_kevent_debugger(void *context);
-#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
-    dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
-#else
-static inline void
-_dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED,
-        const char* str DISPATCH_UNUSED) {}
-#define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
-#endif
+        dispatch_source_t ds, dispatch_unote_t du);

 #pragma mark -
 #pragma mark dispatch_source_t

 dispatch_source_t
-dispatch_source_create(dispatch_source_type_t type,
-    uintptr_t handle,
-    unsigned long mask,
-    dispatch_queue_t q)
+dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
+        unsigned long mask, dispatch_queue_t dq)
 {
-    const struct kevent64_s *proto_kev = &type->ke;
+    dispatch_source_refs_t dr;
     dispatch_source_t ds;
-    dispatch_kevent_t dk;
-
-    // input validation
-    if (type == NULL || (mask & ~type->mask)) {
-        return NULL;
-    }
-    switch (type->ke.filter) {
-    case EVFILT_SIGNAL:
-        if (handle >= NSIG) {
-            return NULL;
-        }
-        break;
-    case EVFILT_FS:
-#if DISPATCH_USE_VM_PRESSURE
-    case EVFILT_VM:
-#endif
-#if DISPATCH_USE_MEMORYSTATUS
-    case EVFILT_MEMORYSTATUS:
-#endif
-    case DISPATCH_EVFILT_CUSTOM_ADD:
-    case DISPATCH_EVFILT_CUSTOM_OR:
-        if (handle) {
-            return NULL;
-        }
-        break;
-    case DISPATCH_EVFILT_TIMER:
-        if (!!handle ^ !!type->ke.ident) {
-            return NULL;
-        }
-        break;
-    default:
-        break;
+    dr = dux_create(dst, handle, mask)._dr;
+    if (unlikely(!dr)) {
+        return DISPATCH_BAD_INPUT;
     }

-    ds = _dispatch_alloc(DISPATCH_VTABLE(source),
+    ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
             sizeof(struct dispatch_source_s));
     // Initialize as a queue first, then override some settings below.
-    _dispatch_queue_init((dispatch_queue_t)ds);
+    _dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
+            DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
     ds->dq_label = "source";
-    ds->do_ref_cnt++; // the reference the manager queue holds
-    ds->do_ref_cnt++; // since source is created suspended
-    ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
-    // The initial target queue is the manager queue, in order to get
-    // the source installed.
-    ds->do_targetq = &_dispatch_mgr_q;
-
-    dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
-    dk->dk_kevent = *proto_kev;
-    dk->dk_kevent.ident = handle;
-    dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
-    dk->dk_kevent.fflags |= (uint32_t)mask;
-    dk->dk_kevent.udata = (uintptr_t)dk;
-    TAILQ_INIT(&dk->dk_sources);
-
-    ds->ds_dkev = dk;
-    ds->ds_pending_data_mask = dk->dk_kevent.fflags;
-    ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
-    if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
-        ds->ds_is_level = true;
-        ds->ds_needs_rearm = true;
-    } else if (!(EV_CLEAR & proto_kev->flags)) {
-        // we cheat and use EV_CLEAR to mean a "flag thingy"
-        ds->ds_is_adder = true;
-    }
-    // Some sources require special processing
-    if (type->init != NULL) {
-        type->init(ds, type, handle, mask, q);
-    }
-    dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
-
-    if (fastpath(!ds->ds_refs)) {
-        ds->ds_refs = _dispatch_calloc(1ul,
-                sizeof(struct dispatch_source_refs_s));
-    }
-    ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
-
-    // First item on the queue sets the user-specified target queue
-    dispatch_set_target_queue(ds, q);
+    ds->ds_refs = dr;
+    dr->du_owner_wref = _dispatch_ptr2wref(ds);
+
+    if (slowpath(!dq)) {
+        dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
+    } else {
+        _dispatch_retain((dispatch_queue_t _Nonnull)dq);
+    }
+    ds->do_targetq = dq;
+    if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
+        _dispatch_source_set_interval(ds, handle);
+    }
    _dispatch_object_debug(ds, "%s", __func__);
    return ds;
 }
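A minimal caller-side sketch of the refactored creation path (public API only). With the unote-based code above, a NULL queue is legal and falls back to the default-QoS root queue rather than being retargeted through the manager queue:

#include <dispatch/dispatch.h>
#include <stdio.h>

// Creates and activates a data-add source. dux_create() now performs the
// type/handle/mask validation that the removed switch used to do inline.
static dispatch_source_t
make_counter_source(void)
{
    dispatch_source_t ds = dispatch_source_create(
            DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, NULL);
    dispatch_source_set_event_handler(ds, ^{
        printf("accumulated: %lu\n", dispatch_source_get_data(ds));
    });
    dispatch_activate(ds);
    return ds;
}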

 void
-_dispatch_source_dispose(dispatch_source_t ds)
+_dispatch_source_dispose(dispatch_source_t ds, bool *allow_free)
 {
    _dispatch_object_debug(ds, "%s", __func__);
-    free(ds->ds_refs);
-    _dispatch_queue_destroy(ds);
+    _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
+    _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
+    _dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
+    _dispatch_unote_dispose(ds->ds_refs);
+    ds->ds_refs = NULL;
+    _dispatch_queue_destroy(ds->_as_dq, allow_free);
 }

 void
 _dispatch_source_xref_dispose(dispatch_source_t ds)
 {
-    _dispatch_wakeup(ds);
-}
-
-void
-dispatch_source_cancel(dispatch_source_t ds)
-{
-    _dispatch_object_debug(ds, "%s", __func__);
-    // Right after we set the cancel flag, someone else
-    // could potentially invoke the source, do the cancelation,
-    // unregister the source, and deallocate it. We would
-    // need to therefore retain/release before setting the bit
-
-    _dispatch_retain(ds);
-    (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
-    _dispatch_wakeup(ds);
-    _dispatch_release(ds);
+    dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+    if (unlikely(!(dqf & (DQF_LEGACY|DSF_CANCELED)))) {
+        DISPATCH_CLIENT_CRASH(ds, "Release of a source that has not been "
+                "cancelled, but has a mandatory cancel handler");
+    }
+    dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
 }

 long
 dispatch_source_testcancel(dispatch_source_t ds)
 {
-    return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
+    return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
 }

 unsigned long
 dispatch_source_get_mask(dispatch_source_t ds)
 {
-    unsigned long mask = ds->ds_pending_data_mask;
-    if (ds->ds_vmpressure_override) {
-        mask = NOTE_VM_PRESSURE;
+    dispatch_source_refs_t dr = ds->ds_refs;
+    if (ds->dq_atomic_flags & DSF_CANCELED) {
+        return 0;
+    }
+#if DISPATCH_USE_MEMORYSTATUS
+    if (dr->du_vmpressure_override) {
+        return NOTE_VM_PRESSURE;
     }
 #if TARGET_IPHONE_SIMULATOR
-    else if (ds->ds_memorystatus_override) {
-        mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
+    if (dr->du_memorypressure_override) {
+        return NOTE_MEMORYSTATUS_PRESSURE_WARN;
     }
 #endif
-    return mask;
+#endif // DISPATCH_USE_MEMORYSTATUS
+    return dr->du_fflags;
 }

 uintptr_t
 dispatch_source_get_handle(dispatch_source_t ds)
 {
-    unsigned int handle = (unsigned int)ds->ds_ident_hack;
+    dispatch_source_refs_t dr = ds->ds_refs;
 #if TARGET_IPHONE_SIMULATOR
-    if (ds->ds_memorystatus_override) {
-        handle = 0;
+    if (dr->du_memorypressure_override) {
+        return 0;
     }
 #endif
-    return handle;
+    return dr->du_ident;
 }
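A short usage sketch of these accessors: the handle/mask/data getters now read du_ident/du_fflags/ds_data through the source refs, but behavior from the handler's point of view is unchanged. The logging helper below is hypothetical:

// For a DISPATCH_SOURCE_TYPE_SIGNAL source created with ident == signo.
static void
log_signal_event(dispatch_source_t ds)
{
    if (dispatch_source_testcancel(ds)) return; // DSF_CANCELED was set
    unsigned long signo = (unsigned long)dispatch_source_get_handle(ds);
    unsigned long count = dispatch_source_get_data(ds); // deliveries latched
    printf("signal %lu fired %lu time(s)\n", signo, count);
}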

 unsigned long
 dispatch_source_get_data(dispatch_source_t ds)
 {
-    unsigned long data = ds->ds_data;
-    if (ds->ds_vmpressure_override) {
-        data = NOTE_VM_PRESSURE;
+#if DISPATCH_USE_MEMORYSTATUS
+    dispatch_source_refs_t dr = ds->ds_refs;
+    if (dr->du_vmpressure_override) {
+        return NOTE_VM_PRESSURE;
     }
 #if TARGET_IPHONE_SIMULATOR
-    else if (ds->ds_memorystatus_override) {
-        data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
+    if (dr->du_memorypressure_override) {
+        return NOTE_MEMORYSTATUS_PRESSURE_WARN;
     }
 #endif
-    return data;
+#endif // DISPATCH_USE_MEMORYSTATUS
+    uint64_t value = os_atomic_load2o(ds, ds_data, relaxed);
+    return (unsigned long)(
+        ds->ds_refs->du_data_action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET
+        ? DISPATCH_SOURCE_GET_DATA(value) : value);
+}
+
+size_t
+dispatch_source_get_extended_data(dispatch_source_t ds,
+        dispatch_source_extended_data_t edata, size_t size)
+{
+    size_t target_size = MIN(size,
+        sizeof(struct dispatch_source_extended_data_s));
+    if (size > 0) {
+        unsigned long data, status = 0;
+        if (ds->ds_refs->du_data_action
+                == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
+            uint64_t combined = os_atomic_load(&ds->ds_data, relaxed);
+            data = DISPATCH_SOURCE_GET_DATA(combined);
+            status = DISPATCH_SOURCE_GET_STATUS(combined);
+        } else {
+            data = dispatch_source_get_data(ds);
+        }
+        if (size >= offsetof(struct dispatch_source_extended_data_s, data)
+                + sizeof(edata->data)) {
+            edata->data = data;
+        }
+        if (size >= offsetof(struct dispatch_source_extended_data_s, status)
+                + sizeof(edata->status)) {
+            edata->status = status;
+        }
+        if (size > sizeof(struct dispatch_source_extended_data_s)) {
+            memset(
+                (char *)edata + sizeof(struct dispatch_source_extended_data_s),
+                0, size - sizeof(struct dispatch_source_extended_data_s));
+        }
+    }
+    return target_size;
 }

+DISPATCH_NOINLINE
 void
-dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
+_dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
+        unsigned long val)
 {
-    struct kevent64_s kev = {
-        .fflags = (typeof(kev.fflags))val,
-        .data = (typeof(kev.data))val,
-    };
+    dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+    int filter = ds->ds_refs->du_filter;
+
+    if (unlikely(dqf & (DSF_CANCELED | DSF_DELETED))) {
+        return;
+    }
+
+    switch (filter) {
+    case DISPATCH_EVFILT_CUSTOM_ADD:
+        os_atomic_add2o(ds, ds_pending_data, val, relaxed);
+        break;
+    case DISPATCH_EVFILT_CUSTOM_OR:
+        os_atomic_or2o(ds, ds_pending_data, val, relaxed);
+        break;
+    case DISPATCH_EVFILT_CUSTOM_REPLACE:
+        os_atomic_store2o(ds, ds_pending_data, val, relaxed);
+        break;
+    default:
+        DISPATCH_CLIENT_CRASH(filter, "Invalid source type");
+    }

-    dispatch_assert(
-            ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
-            ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
+    dx_wakeup(ds, _dispatch_qos_from_pp(pp), DISPATCH_WAKEUP_MAKE_DIRTY);
+}

-    _dispatch_source_merge_kevent(ds, &kev);
+void
+dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
+{
+    _dispatch_source_merge_data(ds, 0, val);
 }
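The three custom filters map one-to-one onto the atomic merge strategies above (add, or, store). A sketch of the store flavor, whose public type is DISPATCH_SOURCE_TYPE_DATA_REPLACE; update_ui() and bytes_done are hypothetical:

// Coalescing progress updates: REPLACE keeps only the newest value, so a
// slow main queue sees one handler invocation with the latest pending data.
dispatch_source_t progress = dispatch_source_create(
        DISPATCH_SOURCE_TYPE_DATA_REPLACE, 0, 0, dispatch_get_main_queue());
dispatch_source_set_event_handler(progress, ^{
    update_ui(dispatch_source_get_data(progress));
});
dispatch_activate(progress);
// producer side: a cheap atomic store plus a wakeup at the caller's QoS
dispatch_source_merge_data(progress, bytes_done);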

 #pragma mark -
@@ -264,91 +224,130 @@ dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)

 DISPATCH_ALWAYS_INLINE
 static inline dispatch_continuation_t
-_dispatch_source_handler_alloc(dispatch_source_t ds, void *handler, long kind,
+_dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
+{
+    return os_atomic_load(&dr->ds_handler[kind], relaxed);
+}
+#define _dispatch_source_get_event_handler(dr) \
+    _dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
+#define _dispatch_source_get_cancel_handler(dr) \
+    _dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
+#define _dispatch_source_get_registration_handler(dr) \
+    _dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)
+
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_continuation_t
+_dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
        bool block)
 {
+    // sources don't propagate priority by default
+    const dispatch_block_flags_t flags =
+            DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
-    if (handler) {
-        dc->do_vtable = (void *)((block ? DISPATCH_OBJ_BLOCK_RELEASE_BIT :
-                DISPATCH_OBJ_CTXT_FETCH_BIT) | (kind != DS_EVENT_HANDLER ?
-                DISPATCH_OBJ_ASYNC_BIT : 0l));
-        dc->dc_priority = 0;
-        dc->dc_voucher = NULL;
+    if (func) {
+        uintptr_t dc_flags = 0;
+
+        if (kind != DS_EVENT_HANDLER) {
+            dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
+        }
        if (block) {
 #ifdef __BLOCKS__
-            if (slowpath(_dispatch_block_has_private_data(handler))) {
-                // sources don't propagate priority by default
-                dispatch_block_flags_t flags = DISPATCH_BLOCK_NO_QOS_CLASS;
-                flags |= _dispatch_block_get_flags(handler);
-                _dispatch_continuation_priority_set(dc,
-                        _dispatch_block_get_priority(handler), flags);
-            }
-            if (kind != DS_EVENT_HANDLER) {
-                dc->dc_func = _dispatch_call_block_and_release;
-            } else {
-                dc->dc_func = _dispatch_Block_invoke(handler);
-            }
-            dc->dc_ctxt = _dispatch_Block_copy(handler);
+            _dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
 #endif /* __BLOCKS__ */
        } else {
-            dc->dc_func = handler;
-            dc->dc_ctxt = ds->do_ctxt;
+            dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
+            _dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
+                    0, flags, dc_flags);
        }
-        _dispatch_trace_continuation_push((dispatch_queue_t)ds, dc);
+        _dispatch_trace_continuation_push(ds->_as_dq, dc);
    } else {
+        dc->dc_flags = 0;
        dc->dc_func = NULL;
    }
-    dc->dc_data = (void*)kind;
    return dc;
 }

-static inline void
-_dispatch_source_handler_replace(dispatch_source_refs_t dr, long kind,
-        dispatch_continuation_t dc_new)
+DISPATCH_NOINLINE
+static void
+_dispatch_source_handler_dispose(dispatch_continuation_t dc)
 {
-    dispatch_continuation_t dc = dr->ds_handler[kind];
-    if (dc) {
 #ifdef __BLOCKS__
-        if ((long)dc->do_vtable & DISPATCH_OBJ_BLOCK_RELEASE_BIT) {
-            Block_release(dc->dc_ctxt);
-        }
+    if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
+        Block_release(dc->dc_ctxt);
+    }
 #endif /* __BLOCKS__ */
-        if (dc->dc_voucher) {
-            _voucher_release(dc->dc_voucher);
-            dc->dc_voucher = NULL;
-        }
-        _dispatch_continuation_free(dc);
+    if (dc->dc_voucher) {
+        _voucher_release(dc->dc_voucher);
+        dc->dc_voucher = VOUCHER_INVALID;
    }
-    dr->ds_handler[kind] = dc_new;
+    _dispatch_continuation_free(dc);
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_continuation_t
+_dispatch_source_handler_take(dispatch_source_t ds, long kind)
+{
+    return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_source_handler_free(dispatch_source_t ds, long kind)
+{
+    dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
+    if (dc) _dispatch_source_handler_dispose(dc);
 }

+DISPATCH_ALWAYS_INLINE
 static inline void
-_dispatch_source_handler_free(dispatch_source_refs_t dr, long kind)
+_dispatch_source_handler_replace(dispatch_source_t ds, long kind,
+        dispatch_continuation_t dc)
 {
-    _dispatch_source_handler_replace(dr, kind, NULL);
+    if (!dc->dc_func) {
+        _dispatch_continuation_free(dc);
+        dc = NULL;
+    } else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+        dc->dc_ctxt = ds->do_ctxt;
+    }
+    dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
+    if (dc) _dispatch_source_handler_dispose(dc);
 }

+DISPATCH_NOINLINE
 static void
-_dispatch_source_set_handler(void *context)
+_dispatch_source_set_handler_slow(void *context)
 {
    dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
    dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
+
    dispatch_continuation_t dc = context;
    long kind = (long)dc->dc_data;
-    dc->dc_data = 0;
-    if (!dc->dc_func) {
-        _dispatch_continuation_free(dc);
-        dc = NULL;
-    } else if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
-        dc->dc_ctxt = ds->do_ctxt;
+    dc->dc_data = NULL;
+    _dispatch_source_handler_replace(ds, kind, dc);
+}
+
+DISPATCH_NOINLINE
+static void
+_dispatch_source_set_handler(dispatch_source_t ds, long kind,
+        dispatch_continuation_t dc)
+{
+    dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
+    if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
+        _dispatch_source_handler_replace(ds, kind, dc);
+        return dx_vtable(ds)->do_resume(ds, false);
    }
-    _dispatch_source_handler_replace(ds->ds_refs, kind, dc);
-    if (kind == DS_EVENT_HANDLER && dc && dc->dc_priority) {
-#if HAVE_PTHREAD_WORKQUEUE_QOS
-        ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
-        _dispatch_queue_set_override_priority((dispatch_queue_t)ds);
-#endif
+    if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
+        DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
+                "after it has been activated");
+    }
+    _dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
+    if (kind == DS_REGISTN_HANDLER) {
+        _dispatch_bug_deprecated("Setting registration handler after "
+                "the source has been activated");
    }
+    dc->dc_data = (void *)kind;
+    _dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
+            _dispatch_source_set_handler_slow, 0);
 }

 #ifdef __BLOCKS__
@@ -358,8 +357,7 @@ dispatch_source_set_event_handler(dispatch_source_t ds,
 {
    dispatch_continuation_t dc;
    dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
-    _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
-            _dispatch_source_set_handler);
+    _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
 }
 #endif /* __BLOCKS__ */

@@ -369,43 +367,67 @@ dispatch_source_set_event_handler_f(dispatch_source_t ds,
 {
    dispatch_continuation_t dc;
    dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
-    _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
-            _dispatch_source_set_handler);
+    _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
 }

-void
-_dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds,
-        void *ctxt, dispatch_function_t handler)
+#ifdef __BLOCKS__
+DISPATCH_NOINLINE
+static void
+_dispatch_source_set_cancel_handler(dispatch_source_t ds,
+        dispatch_block_t handler)
 {
    dispatch_continuation_t dc;
-    dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
-    dc->do_vtable = (void *)((long)dc->do_vtable &~DISPATCH_OBJ_CTXT_FETCH_BIT);
-    dc->dc_other = dc->dc_ctxt;
-    dc->dc_ctxt = ctxt;
-    _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
-            _dispatch_source_set_handler);
+    dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
+    _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
 }

-#ifdef __BLOCKS__
 void
 dispatch_source_set_cancel_handler(dispatch_source_t ds,
        dispatch_block_t handler)
 {
-    dispatch_continuation_t dc;
-    dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
-    _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
-            _dispatch_source_set_handler);
+    if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
+        DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
+                "this source");
+    }
+    return _dispatch_source_set_cancel_handler(ds, handler);
 }
-#endif /* __BLOCKS__ */

 void
-dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
+dispatch_source_set_mandatory_cancel_handler(dispatch_source_t ds,
+        dispatch_block_t handler)
+{
+    _dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
+    return _dispatch_source_set_cancel_handler(ds, handler);
+}
+#endif /* __BLOCKS__ */
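Client-side view of the legacy/mandatory split above: the plain setter keeps DQF_LEGACY semantics, while the mandatory variant (a private interface) clears DQF_LEGACY so that releasing an uncancelled source crashes in _dispatch_source_xref_dispose(). A sketch of the usual descriptor-owning pattern with the plain handler; drain_input() is hypothetical:

#include <unistd.h>

// The cancel handler is the only safe place to close the fd: it runs only
// after the unote has been unregistered from the kevent queue.
static dispatch_source_t
watch_fd(int fd, dispatch_queue_t q)
{
    dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
            (uintptr_t)fd, 0, q);
    dispatch_source_set_event_handler(ds, ^{
        drain_input(fd, dispatch_source_get_data(ds)); // est. bytes available
    });
    dispatch_source_set_cancel_handler(ds, ^{ close(fd); });
    dispatch_activate(ds);
    return ds;
}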

+DISPATCH_NOINLINE
+static void
+_dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
        dispatch_function_t handler)
 {
    dispatch_continuation_t dc;
    dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
-    _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
-            _dispatch_source_set_handler);
+    _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
+}
+
+void
+dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
+        dispatch_function_t handler)
+{
+    if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
+        DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
+                "this source");
+    }
+    return _dispatch_source_set_cancel_handler_f(ds, handler);
+}
+
+void
+dispatch_source_set_mandatory_cancel_handler_f(dispatch_source_t ds,
+        dispatch_function_t handler)
+{
+    _dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
+    return _dispatch_source_set_cancel_handler_f(ds, handler);
 }

 #ifdef __BLOCKS__
@@ -415,8 +437,7 @@ dispatch_source_set_registration_handler(dispatch_source_t ds,
 {
    dispatch_continuation_t dc;
    dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
-    _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
-            _dispatch_source_set_handler);
+    _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
 }
 #endif /* __BLOCKS__ */

@@ -426,151 +447,294 @@ dispatch_source_set_registration_handler_f(dispatch_source_t ds,
 {
    dispatch_continuation_t dc;
    dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
-    _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
-            _dispatch_source_set_handler);
+    _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
 }

 #pragma mark -
 #pragma mark dispatch_source_invoke

+bool
+_dispatch_source_will_reenable_kevent_4NW(dispatch_source_t ds)
+{
+    uint64_t dq_state = os_atomic_load2o(ds, dq_state, relaxed);
+    dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+
+    if (unlikely(!_dq_state_drain_locked_by_self(dq_state))) {
+        DISPATCH_CLIENT_CRASH(0, "_dispatch_source_will_reenable_kevent_4NW "
+                "not called from within the event handler");
+    }
+
+    return _dispatch_unote_needs_rearm(ds->ds_refs) && !(dqf & DSF_ARMED);
+}
+
 static void
-_dispatch_source_registration_callout(dispatch_source_t ds)
+_dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
+        dispatch_invoke_flags_t flags)
 {
-    dispatch_source_refs_t dr = ds->ds_refs;
-    dispatch_continuation_t dc = dr->ds_handler[DS_REGISTN_HANDLER];
-    if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
+    dispatch_continuation_t dc;
+
+    dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
+    if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
        // no registration callout if source is canceled rdar://problem/8955246
-        return _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
+        return _dispatch_source_handler_dispose(dc);
    }
-    pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
-    if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+    if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
        dc->dc_ctxt = ds->do_ctxt;
    }
-    _dispatch_continuation_pop(dc);
-    dr->ds_handler[DS_REGISTN_HANDLER] = NULL;
-    _dispatch_reset_defaultpriority(old_dp);
+    _dispatch_continuation_pop(dc, NULL, flags, cq);
 }
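The registration callout now runs through _dispatch_continuation_pop() on the target queue; from the API side the pattern is unchanged, but per the deprecation warning in the setter above it must be installed before activation. A sketch, where q, on_registered() and on_tick() are hypothetical:

dispatch_source_t t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
        0, 0, q);
dispatch_source_set_registration_handler(t, ^{
    // runs once on q after installation; skipped entirely if the source
    // was cancelled first (rdar://problem/8955246)
    on_registered();
});
dispatch_source_set_event_handler(t, ^{ on_tick(); });
dispatch_source_set_timer(t, DISPATCH_TIME_NOW, NSEC_PER_SEC, NSEC_PER_MSEC);
dispatch_activate(t);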

 static void
-_dispatch_source_cancel_callout(dispatch_source_t ds)
+_dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
+        dispatch_invoke_flags_t flags)
 {
-    dispatch_source_refs_t dr = ds->ds_refs;
-    dispatch_continuation_t dc = dr->ds_handler[DS_CANCEL_HANDLER];
-    ds->ds_pending_data_mask = 0;
+    dispatch_continuation_t dc;
+
+    dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
    ds->ds_pending_data = 0;
    ds->ds_data = 0;
-    _dispatch_source_handler_free(dr, DS_EVENT_HANDLER);
-    _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
+    _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
+    _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
    if (!dc) {
        return;
    }
-    if (!(ds->ds_atomic_flags & DSF_CANCELED)) {
-        return _dispatch_source_handler_free(dr, DS_CANCEL_HANDLER);
+    if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
+        return _dispatch_source_handler_dispose(dc);
    }
-    pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
-    if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+    if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
        dc->dc_ctxt = ds->do_ctxt;
    }
-    _dispatch_continuation_pop(dc);
-    dr->ds_handler[DS_CANCEL_HANDLER] = NULL;
-    _dispatch_reset_defaultpriority(old_dp);
+    _dispatch_continuation_pop(dc, NULL, flags, cq);
 }

 static void
-_dispatch_source_latch_and_call(dispatch_source_t ds)
+_dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
+        dispatch_invoke_flags_t flags)
 {
-    unsigned long prev;
+    dispatch_source_refs_t dr = ds->ds_refs;
+    dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
+    uint64_t prev;

-    if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
-        return;
+    if (dr->du_is_timer && !(dr->du_fflags & DISPATCH_TIMER_AFTER)) {
+        prev = _dispatch_source_timer_data(ds, dr);
+    } else {
+        prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
    }
-    dispatch_source_refs_t dr = ds->ds_refs;
-    dispatch_continuation_t dc = dr->ds_handler[DS_EVENT_HANDLER];
-    prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
-    if (ds->ds_is_level) {
+    if (dr->du_data_action == DISPATCH_UNOTE_ACTION_DATA_SET) {
        ds->ds_data = ~prev;
-    } else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
-        ds->ds_data = _dispatch_source_timer_data(dr, prev);
    } else {
        ds->ds_data = prev;
    }
-    if (!dispatch_assume(prev) || !dc) {
+    if (!dispatch_assume(prev != 0) || !dc) {
        return;
    }
-    pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
-    _dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dc);
-    voucher_t voucher = dc->dc_voucher ? _voucher_retain(dc->dc_voucher) : NULL;
-    _dispatch_continuation_voucher_adopt(dc); // consumes voucher reference
-    _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
-    _dispatch_introspection_queue_item_complete(dc);
-    if (voucher) dc->dc_voucher = voucher;
-    _dispatch_reset_defaultpriority(old_dp);
+    _dispatch_continuation_pop(dc, NULL, flags, cq);
+    if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_AFTER)) {
+        _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
+        dispatch_release(ds); // dispatch_after sources are one-shot
+    }
 }

+DISPATCH_NOINLINE
 static void
-_dispatch_source_kevent_unregister(dispatch_source_t ds)
+_dispatch_source_refs_finalize_unregistration(dispatch_source_t ds)
+{
+    dispatch_queue_flags_t dqf;
+    dispatch_source_refs_t dr = ds->ds_refs;
+
+    dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
+            DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
+    if (dqf & DSF_CANCEL_WAITER) {
+        _dispatch_wake_by_address(&ds->dq_atomic_flags);
+    }
+    _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
+    _dispatch_release_tailcall(ds); // the retain is done at creation time
+}
+
+void
+_dispatch_source_refs_unregister(dispatch_source_t ds, uint32_t options)
 {
    _dispatch_object_debug(ds, "%s", __func__);
-    dispatch_kevent_t dk = ds->ds_dkev;
-    ds->ds_dkev = NULL;
-    switch (dk->dk_kevent.filter) {
-    case DISPATCH_EVFILT_TIMER:
-        _dispatch_timers_unregister(ds, dk);
-        break;
-    default:
-        TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
-        _dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask);
-        break;
+    dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+    dispatch_source_refs_t dr = ds->ds_refs;
+
+    if (dr->du_is_timer) {
+        // Because of the optimization to unregister fired oneshot timers
+        // from the target queue, we can't trust _dispatch_unote_registered()
+        // to tell the truth, it may not have happened yet
+        if (dqf & DSF_ARMED) {
+            _dispatch_timers_unregister(ds->ds_timer_refs);
+            _dispatch_release_2(ds);
+        }
+        dr->du_ident = DISPATCH_TIMER_IDENT_CANCELED;
+    } else {
+        if (_dispatch_unote_needs_rearm(dr) && !(dqf & DSF_ARMED)) {
+            options |= DU_UNREGISTER_IMMEDIATE_DELETE;
+        }
+        if (!_dispatch_unote_unregister(dr, options)) {
+            _dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
+                    ds, dr);
+            _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
+            return; // deferred unregistration
+        }
    }
-    (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
-    ds->ds_needs_rearm = false; // re-arm is pointless and bad now
-    _dispatch_release(ds); // the retain is done at creation time
+    ds->ds_is_installed = true;
+    _dispatch_source_refs_finalize_unregistration(ds);
 }

-static void
-_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_source_tryarm(dispatch_source_t ds)
 {
-    switch (ds->ds_dkev->dk_kevent.filter) {
-    case DISPATCH_EVFILT_TIMER:
-        return _dispatch_timers_update(ds);
-    case EVFILT_MACHPORT:
-        if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
-            new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
+    dispatch_queue_flags_t oqf, nqf;
+    return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
+        if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
+            // the test is inside the loop because it's convenient but the
+            // result should not change for the duration of the rmw_loop
+            os_atomic_rmw_loop_give_up(break);
        }
-        break;
+        nqf = oqf | DSF_ARMED;
+    });
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_source_refs_resume(dispatch_source_t ds)
+{
+    dispatch_source_refs_t dr = ds->ds_refs;
+    if (dr->du_is_timer) {
+        _dispatch_timers_update(dr, 0);
+        return true;
    }
-    if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
-        _dispatch_source_kevent_unregister(ds);
+    if (unlikely(!_dispatch_source_tryarm(ds))) {
+        return false;
    }
+    _dispatch_unote_resume(dr);
+    _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dr);
+    return true;
 }

-static void
-_dispatch_source_kevent_register(dispatch_source_t ds)
+void
+_dispatch_source_refs_register(dispatch_source_t ds, dispatch_wlh_t wlh,
+        dispatch_priority_t pri)
 {
-    dispatch_assert_zero(ds->ds_is_installed);
-    switch (ds->ds_dkev->dk_kevent.filter) {
-    case DISPATCH_EVFILT_TIMER:
-        return _dispatch_timers_update(ds);
-    }
-    uint32_t flags;
-    bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
-    TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
-    if (do_resume || ds->ds_needs_rearm) {
-        _dispatch_source_kevent_resume(ds, flags);
-    }
-    (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
+    dispatch_source_refs_t dr = ds->ds_refs;
+    dispatch_priority_t kbp;
+
+    dispatch_assert(!ds->ds_is_installed);
+
+    if (dr->du_is_timer) {
+        dispatch_queue_t dq = ds->_as_dq;
+        kbp = _dispatch_queue_compute_priority_and_wlh(dq, NULL);
+        // aggressively coalesce background/maintenance QoS timers
+        //
+        if (_dispatch_qos_is_background(_dispatch_priority_qos(kbp))) {
+            if (dr->du_fflags & DISPATCH_TIMER_STRICT) {
+                _dispatch_ktrace1(DISPATCH_PERF_strict_bg_timer, ds);
+            } else {
+                dr->du_fflags |= DISPATCH_TIMER_BACKGROUND;
+                dr->du_ident = _dispatch_source_timer_idx(dr);
+            }
+        }
+        _dispatch_timers_update(dr, 0);
+        return;
+    }
+
+    if (unlikely(!_dispatch_source_tryarm(ds) ||
+            !_dispatch_unote_register(dr, wlh, pri))) {
+        // Do the parts of dispatch_source_refs_unregister() that
+        // are required after this partial initialization.
+        _dispatch_source_refs_finalize_unregistration(ds);
+    } else {
+        _dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, dr);
+    }
    _dispatch_object_debug(ds, "%s", __func__);
 }

+static void
+_dispatch_source_set_event_handler_context(void *ctxt)
+{
+    dispatch_source_t ds = ctxt;
+    dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);
+
+    if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
+        dc->dc_ctxt = ds->do_ctxt;
+    }
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_source_install(dispatch_source_t ds, dispatch_wlh_t wlh,
+        dispatch_priority_t pri)
+{
+    _dispatch_source_refs_register(ds, wlh, pri);
+    ds->ds_is_installed = true;
+}
+
+void
+_dispatch_source_finalize_activation(dispatch_source_t ds, bool *allow_resume)
+{
+    dispatch_continuation_t dc;
+    dispatch_source_refs_t dr = ds->ds_refs;
+    dispatch_priority_t pri;
+    dispatch_wlh_t wlh;
+
+    if (unlikely(dr->du_is_direct &&
+            (_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
+        return _dispatch_source_refs_unregister(ds, 0);
+    }
+
+    dc = _dispatch_source_get_event_handler(dr);
+    if (dc) {
+        if (_dispatch_object_is_barrier(dc)) {
+            _dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
+        }
+        ds->dq_priority = _dispatch_priority_from_pp_strip_flags(dc->dc_priority);
+        if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+            _dispatch_barrier_async_detached_f(ds->_as_dq, ds,
+                    _dispatch_source_set_event_handler_context);
+        }
+    }
+
+    // call "super"
+    _dispatch_queue_finalize_activation(ds->_as_dq, allow_resume);
+
+    if (dr->du_is_direct && !ds->ds_is_installed) {
+        dispatch_queue_t dq = ds->_as_dq;
+        pri = _dispatch_queue_compute_priority_and_wlh(dq, &wlh);
+        if (pri) _dispatch_source_install(ds, wlh, pri);
+    }
+}
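_dispatch_source_finalize_activation() is the moment a direct unote is first installed, which is why the setters above refuse handler changes once the source is active. The supported lifecycle, sketched with hypothetical handler blocks:

// configure-then-activate: handlers and timer parameters are set while the
// source is inactive; activation computes priority/wlh and installs the unote.
static dispatch_source_t
make_source(dispatch_source_type_t type, uintptr_t handle, unsigned long mask,
        dispatch_queue_t q, dispatch_block_t event, dispatch_block_t cancel)
{
    dispatch_source_t ds = dispatch_source_create(type, handle, mask, q);
    dispatch_source_set_event_handler(ds, event);
    dispatch_source_set_cancel_handler(ds, cancel);
    dispatch_activate(ds); // handlers are frozen from this point on
    return ds;
}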

 DISPATCH_ALWAYS_INLINE
-static inline dispatch_queue_t
-_dispatch_source_invoke2(dispatch_object_t dou,
-        _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
+static inline dispatch_queue_wakeup_target_t
+_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
+        dispatch_invoke_flags_t flags, uint64_t *owned)
 {
    dispatch_source_t ds = dou._ds;
-    if (slowpath(_dispatch_queue_drain(ds))) {
-        DISPATCH_CLIENT_CRASH("Sync onto source");
+    dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
+    dispatch_queue_t dq = _dispatch_queue_get_current();
+    dispatch_source_refs_t dr = ds->ds_refs;
+    dispatch_queue_flags_t dqf;
+
+    if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN) &&
+            _dispatch_unote_wlh_changed(dr, _dispatch_get_wlh())) {
+        dqf = _dispatch_queue_atomic_flags_set_orig(ds->_as_dq,
+                DSF_WLH_CHANGED);
+        if (!(dqf & DSF_WLH_CHANGED)) {
+            _dispatch_bug_deprecated("Changing target queue "
+                    "hierarchy after source was activated");
+        }
+    }
+
+    if (_dispatch_queue_class_probe(ds)) {
+        // Intentionally always drain even when on the manager queue
+        // and not the source's regular target queue: we need to be able
+        // to drain timer setting and the like there.
+        dispatch_with_disabled_narrowing(dic, {
+            retq = _dispatch_queue_serial_drain(ds->_as_dq, dic, flags, owned);
+        });
    }

    // This function performs all source actions. Each action is responsible
@@ -578,510 +742,476 @@ _dispatch_source_invoke2(dispatch_object_t dou,
    // current queue is not the correct queue for this action, the correct queue
    // will be returned and the invoke will be re-driven on that queue.

-    // The order of tests here in invoke and in probe should be consistent.
+    // The order of tests here in invoke and in wakeup should be consistent.

-    dispatch_queue_t dq = _dispatch_queue_get_current();
-    dispatch_source_refs_t dr = ds->ds_refs;
+    dispatch_queue_t dkq = &_dispatch_mgr_q;
+    bool prevent_starvation = false;

-    if (!ds->ds_is_installed) {
-        // The source needs to be installed on the manager queue.
-        if (dq != &_dispatch_mgr_q) {
-            return &_dispatch_mgr_q;
-        }
-        _dispatch_source_kevent_register(ds);
-        ds->ds_is_installed = true;
-        if (dr->ds_handler[DS_REGISTN_HANDLER]) {
-            return ds->do_targetq;
+    if (dr->du_is_direct) {
+        dkq = ds->do_targetq;
+    }
+
+    if (dr->du_is_timer &&
+            os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
+        dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+        if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
+            // timer has to be configured on the kevent queue
+            if (dq != dkq) {
+                return dkq;
+            }
+            _dispatch_source_timer_configure(ds);
        }
-        if (slowpath(ds->do_xref_cnt == -1)) {
-            return &_dispatch_mgr_q; // rdar://problem/9558246
+    }
+
+    if (!ds->ds_is_installed) {
+        // The source needs to be installed on the kevent queue.
+        if (dq != dkq) {
+            return dkq;
        }
-    } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
+        _dispatch_source_install(ds, _dispatch_get_wlh(),
+                _dispatch_get_basepri());
+    }
+
+    if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
        // Source suspended by an item drained from the source queue.
-        return NULL;
-    } else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
+        return ds->do_targetq;
+    }
+
+    if (_dispatch_source_get_registration_handler(dr)) {
        // The source has been registered and the registration handler needs
        // to be delivered on the target queue.
        if (dq != ds->do_targetq) {
            return ds->do_targetq;
        }
        // clears ds_registration_handler
-        _dispatch_source_registration_callout(ds);
-        if (slowpath(ds->do_xref_cnt == -1)) {
-            return &_dispatch_mgr_q; // rdar://problem/9558246
+        _dispatch_source_registration_callout(ds, dq, flags);
+    }
+
+    dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+    if ((dqf & DSF_DEFERRED_DELETE) && !(dqf & DSF_ARMED)) {
+unregister_event:
+        // DSF_DELETE: Pending source kevent unregistration has been completed
+        // !DSF_ARMED: event was delivered and can safely be unregistered
+        if (dq != dkq) {
+            return dkq;
+        }
+        _dispatch_source_refs_unregister(ds, DU_UNREGISTER_IMMEDIATE_DELETE);
+        dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+    }
+
+    if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
+            os_atomic_load2o(ds, ds_pending_data, relaxed)) {
+        // The source has pending data to deliver via the event handler callback
+        // on the target queue. Some sources need to be rearmed on the kevent
+        // queue after event delivery.
+        if (dq == ds->do_targetq) {
+            _dispatch_source_latch_and_call(ds, dq, flags);
+            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+
+            // starvation avoidance: if the source triggers itself then force a
+            // re-queue to give other things already queued on the target queue
+            // a chance to run.
+            //
+            // however, if the source is directly targeting an overcommit root
+            // queue, this would requeue the source and ask for a new overcommit
+            // thread right away.
+            prevent_starvation = dq->do_targetq ||
+                    !(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
+            if (prevent_starvation &&
+                    os_atomic_load2o(ds, ds_pending_data, relaxed)) {
+                retq = ds->do_targetq;
+            }
+        } else {
+            // there is no point trying to be eager, the next thing to do is
+            // to deliver the event
+            return ds->do_targetq;
        }
-    } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
+    }
+
+    if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
        // The source has been cancelled and needs to be uninstalled from the
-        // manager queue. After uninstallation, the cancellation handler needs
+        // kevent queue. After uninstallation, the cancellation handler needs
        // to be delivered to the target queue.
-        if (ds->ds_dkev) {
-            if (dq != &_dispatch_mgr_q) {
-                return &_dispatch_mgr_q;
+        if (!(dqf & DSF_DELETED)) {
+            if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
+                // timers can cheat if not armed because there's nothing left
+                // to do on the manager queue and unregistration can happen
+                // on the regular target queue
+            } else if (dq != dkq) {
+                return dkq;
            }
-            _dispatch_source_kevent_unregister(ds);
-        }
-        if (dr->ds_handler[DS_EVENT_HANDLER] ||
-                dr->ds_handler[DS_CANCEL_HANDLER] ||
-                dr->ds_handler[DS_REGISTN_HANDLER]) {
-            if (dq != ds->do_targetq) {
-                return ds->do_targetq;
+            _dispatch_source_refs_unregister(ds, 0);
+            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+            if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
+                if (!(dqf & DSF_ARMED)) {
+                    goto unregister_event;
+                }
+                // we need to wait for the EV_DELETE
+                return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
            }
        }
-        _dispatch_source_cancel_callout(ds);
-    } else if (ds->ds_pending_data) {
-        // The source has pending data to deliver via the event handler callback
-        // on the target queue. Some sources need to be rearmed on the manager
-        // queue after event delivery.
-        if (dq != ds->do_targetq) {
+        if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
+                _dispatch_source_get_cancel_handler(dr) ||
+                _dispatch_source_get_registration_handler(dr))) {
+            retq = ds->do_targetq;
+        } else {
+            _dispatch_source_cancel_callout(ds, dq, flags);
+            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+        }
+        prevent_starvation = false;
+    }
+
+    if (_dispatch_unote_needs_rearm(dr) &&
+            !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
+        // The source needs to be rearmed on the kevent queue.
+        if (dq != dkq) {
+            return dkq;
+        }
+        if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
+            // no need for resume when we can directly unregister the kevent
+            goto unregister_event;
+        }
+        if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
+            // do not try to rearm the kevent if the source is suspended
+            // from the source handler
+            return ds->do_targetq;
+        }
+        if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
+            // keep the old behavior to force re-enqueue to our target queue
+            // for the rearm.
+            //
+            // if the handler didn't run, or this is a pending delete
+            // or our target queue is a global queue, then starvation is
+            // not a concern and we can rearm right away.
            return ds->do_targetq;
        }
-        _dispatch_source_latch_and_call(ds);
-        if (ds->ds_needs_rearm) {
-            return &_dispatch_mgr_q;
+        if (unlikely(!_dispatch_source_refs_resume(ds))) {
+            goto unregister_event;
        }
-    } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
-        // The source needs to be rearmed on the manager queue.
-        if (dq != &_dispatch_mgr_q) {
-            return &_dispatch_mgr_q;
+        if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
+            // try to redrive the drain from under the lock for sources
+            // targeting an overcommit root queue to avoid parking
+            // when the next event has already fired
+            _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
        }
-        _dispatch_source_kevent_resume(ds, 0);
-        (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
    }
-    return NULL;
+    return retq;
 }

 DISPATCH_NOINLINE
 void
-_dispatch_source_invoke(dispatch_source_t ds)
+_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
+        dispatch_invoke_flags_t flags)
 {
-    _dispatch_queue_class_invoke(ds, _dispatch_source_invoke2);
+    _dispatch_queue_class_invoke(ds, dic, flags,
+            DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
 }
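One user-visible consequence of the invoke/rearm ordering above: EV_DISPATCH-style unotes stay disarmed from event delivery until after the handler returns, so event-handler invocations of a single source never overlap. A sketch relying on that guarantee:

// Per-source state needs no locking: the next event cannot be delivered
// until this handler returns and the unote is rearmed on the kevent queue.
__block uint64_t total_bytes = 0;
dispatch_source_set_event_handler(ds, ^{
    total_bytes += dispatch_source_get_data(ds);
});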

-unsigned long
-_dispatch_source_probe(dispatch_source_t ds)
+void
+_dispatch_source_wakeup(dispatch_source_t ds, dispatch_qos_t qos,
+        dispatch_wakeup_flags_t flags)
 {
    // This function determines whether the source needs to be invoked.
-    // The order of tests here in probe and in invoke should be consistent.
+    // The order of tests here in wakeup and in invoke should be consistent.
    dispatch_source_refs_t dr = ds->ds_refs;
-    if (!ds->ds_is_installed) {
-        // The source needs to be installed on the manager queue.
-        return true;
-    } else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
+    dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
+    dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
+    dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+    bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);
+
+    if (dr->du_is_direct) {
+        dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
+    }
+
+    if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && dr->du_is_timer &&
+            os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
+        // timer has to be configured on the kevent queue
+        tq = dkq;
+    } else if (!ds->ds_is_installed) {
+        // The source needs to be installed on the kevent queue.
+        tq = dkq;
+    } else if (_dispatch_source_get_registration_handler(dr)) {
        // The registration handler needs to be delivered to the target queue.
-        return true;
-    } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
-        // The source needs to be uninstalled from the manager queue, or the
+        tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+    } else if (deferred_delete && !(dqf & DSF_ARMED)) {
+        // Pending source kevent unregistration has been completed
+        // or EV_ONESHOT event can be acknowledged
+        tq = dkq;
+    } else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
+            os_atomic_load2o(ds, ds_pending_data, relaxed)) {
+        // The source has pending data to deliver to the target queue.
+        tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+    } else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
+        // The source needs to be uninstalled from the kevent queue, or the
        // cancellation handler needs to be delivered to the target queue.
        // Note: cancellation assumes installation.
-        if (ds->ds_dkev || dr->ds_handler[DS_EVENT_HANDLER] ||
-                dr->ds_handler[DS_CANCEL_HANDLER] ||
-                dr->ds_handler[DS_REGISTN_HANDLER]) {
-            return true;
+        if (!(dqf & DSF_DELETED)) {
+            if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
+                // timers can cheat if not armed because there's nothing left
+                // to do on the manager queue and unregistration can happen
+                // on the regular target queue
+                tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+            } else {
+                tq = dkq;
+            }
+        } else if (_dispatch_source_get_event_handler(dr) ||
+                _dispatch_source_get_cancel_handler(dr) ||
+                _dispatch_source_get_registration_handler(dr)) {
+            tq = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
-    } else if (ds->ds_pending_data) {
-        // The source has pending data to deliver to the target queue.
-        return true;
-    } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
-        // The source needs to be rearmed on the manager queue.
-        return true;
+    } else if (_dispatch_unote_needs_rearm(dr) &&
+            !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
+        // The source needs to be rearmed on the kevent queue.
+        tq = dkq;
    }
-    return _dispatch_queue_class_probe(ds);
-}
-
-static void
-_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke)
-{
-    if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
-        return;
+    if (!tq && _dispatch_queue_class_probe(ds)) {
+        tq = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
-    if (ds->ds_is_level) {
-        // ke->data is signed and "negative available data" makes no sense
-        // zero bytes happens when EV_EOF is set
-        // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
-        dispatch_assert(ke->data >= 0l);
-        dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
-                relaxed);
-    } else if (ds->ds_is_adder) {
-        (void)dispatch_atomic_add2o(ds, ds_pending_data,
-                (unsigned long)ke->data, relaxed);
-    } else if (ke->fflags & ds->ds_pending_data_mask) {
-        (void)dispatch_atomic_or2o(ds, ds_pending_data,
-                ke->fflags & ds->ds_pending_data_mask, relaxed);
-    }
-    // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
-    if (ds->ds_needs_rearm) {
-        (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
-    }
-
-    _dispatch_wakeup(ds);
-}
-
-#pragma mark -
-#pragma mark dispatch_kevent_t
-
-#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
-static void _dispatch_kevent_guard(dispatch_kevent_t dk);
-static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
-#else
-static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
-static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
-#endif
-
-static struct dispatch_kevent_s _dispatch_kevent_data_or = {
-    .dk_kevent = {
-        .filter = DISPATCH_EVFILT_CUSTOM_OR,
-        .flags = EV_CLEAR,
-    },
-    .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
-};
-static struct dispatch_kevent_s _dispatch_kevent_data_add = {
-    .dk_kevent = {
-        .filter = DISPATCH_EVFILT_CUSTOM_ADD,
-    },
-    .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
-};
-
-#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
-
-DISPATCH_CACHELINE_ALIGN
-static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
-
-static void
-_dispatch_kevent_init()
-{
-    unsigned int i;
-    for (i = 0; i < DSL_HASH_SIZE; i++) {
-        TAILQ_INIT(&_dispatch_sources[i]);
-    }
-
-    TAILQ_INSERT_TAIL(&_dispatch_sources[0],
-            &_dispatch_kevent_data_or, dk_list);
-    TAILQ_INSERT_TAIL(&_dispatch_sources[0],
-            &_dispatch_kevent_data_add, dk_list);
-    _dispatch_kevent_data_or.dk_kevent.udata =
-            (uintptr_t)&_dispatch_kevent_data_or;
-    _dispatch_kevent_data_add.dk_kevent.udata =
-            (uintptr_t)&_dispatch_kevent_data_add;
-}
-
-static inline uintptr_t
-_dispatch_kevent_hash(uint64_t ident, short filter)
-{
-    uint64_t value;
-#if HAVE_MACH
-    value = (filter == EVFILT_MACHPORT ||
-            filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
-            MACH_PORT_INDEX(ident) : ident);
-#else
-    value = ident;
-#endif
-    return DSL_HASH((uintptr_t)value);
-}
-
-static dispatch_kevent_t
-_dispatch_kevent_find(uint64_t ident, short filter)
-{
-    uintptr_t hash = _dispatch_kevent_hash(ident, filter);
-    dispatch_kevent_t dki;
-    TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
-        if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
-            break;
-        }
+    if ((tq == DISPATCH_QUEUE_WAKEUP_TARGET) &&
+            ds->do_targetq == &_dispatch_mgr_q) {
+        tq = DISPATCH_QUEUE_WAKEUP_MGR;
    }
-    return dki;
-}
-
-static void
-_dispatch_kevent_insert(dispatch_kevent_t dk)
-{
-    _dispatch_kevent_guard(dk);
-    uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
-            dk->dk_kevent.filter);
-    TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
+
+    return _dispatch_queue_class_wakeup(ds->_as_dq, qos, flags, tq);
 }

-// Find existing kevents, and merge any new flags if necessary
-static bool
-_dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
+void
+dispatch_source_cancel(dispatch_source_t ds)
 {
-    dispatch_kevent_t dk, ds_dkev = *dkp;
-    uint32_t new_flags;
-    bool do_resume = false;
-
-    dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
-            ds_dkev->dk_kevent.filter);
-    if (dk) {
-        // If an existing dispatch kevent is found, check to see if new flags
-        // need to be added to the existing kevent
-        new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
-        dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
-        free(ds_dkev);
-        *dkp = dk;
-        do_resume = new_flags;
-    } else {
-        dk = ds_dkev;
-        _dispatch_kevent_insert(dk);
-        new_flags = dk->dk_kevent.fflags;
-        do_resume = true;
-    }
-    // Re-register the kevent with the kernel if new flags were added
-    // by the dispatch kevent
-    if (do_resume) {
-        dk->dk_kevent.flags |= EV_ADD;
-    }
-    *flgp = new_flags;
-    return do_resume;
-}
+    _dispatch_object_debug(ds, "%s", __func__);
+    // Right after we set the cancel flag, someone else
+    // could potentially invoke the source, do the cancelation,
+    // unregister the source, and deallocate it. We would
+    // need to therefore retain/release before setting the bit
+    _dispatch_retain_2(ds);

-static bool
-_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
-        uint32_t del_flags)
-{
-    long r;
-    switch (dk->dk_kevent.filter) {
-    case DISPATCH_EVFILT_TIMER:
-    case DISPATCH_EVFILT_CUSTOM_ADD:
-    case DISPATCH_EVFILT_CUSTOM_OR:
-        // these types not registered with kevent
-        return 0;
-#if HAVE_MACH
-    case EVFILT_MACHPORT:
-        return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
-    case DISPATCH_EVFILT_MACH_NOTIFICATION:
-        return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
-#endif
-    case EVFILT_PROC:
-        if (dk->dk_kevent.flags & EV_ONESHOT) {
-            return 0;
-        }
-        // fall through
-    default:
-        r = _dispatch_kq_update(&dk->dk_kevent);
-        if (dk->dk_kevent.flags & EV_DISPATCH) {
-            dk->dk_kevent.flags &= ~EV_ADD;
-        }
-        return r;
+    dispatch_queue_t q = ds->_as_dq;
+    if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
+        _dispatch_release_2_tailcall(ds);
+    } else {
+        dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
    }
 }

-static void
-_dispatch_kevent_dispose(dispatch_kevent_t dk)
+void
+dispatch_source_cancel_and_wait(dispatch_source_t ds)
 {
-    uintptr_t hash;
+    dispatch_queue_flags_t old_dqf, dqf, new_dqf;
+    dispatch_source_refs_t dr = ds->ds_refs;

-    switch (dk->dk_kevent.filter) {
-    case DISPATCH_EVFILT_TIMER:
-    case DISPATCH_EVFILT_CUSTOM_ADD:
-    case DISPATCH_EVFILT_CUSTOM_OR:
-        // these sources live on statically allocated lists
-        return;
-#if HAVE_MACH
-    case EVFILT_MACHPORT:
-        _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
-        break;
-    case DISPATCH_EVFILT_MACH_NOTIFICATION:
-        _dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
-        break;
-#endif
-    case EVFILT_PROC:
-        if (dk->dk_kevent.flags & EV_ONESHOT) {
-            break; // implicitly deleted
-        }
-        // fall through
-    default:
-        if (~dk->dk_kevent.flags & EV_DELETE) {
-            dk->dk_kevent.flags |= EV_DELETE;
-            dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
-            _dispatch_kq_update(&dk->dk_kevent);
-        }
-        break;
+    if (unlikely(_dispatch_source_get_cancel_handler(dr))) {
+        DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
    }
-    hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
-            dk->dk_kevent.filter);
-    TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
-    _dispatch_kevent_unguard(dk);
-    free(dk);
-}

-static void
-_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg)
-{
-    dispatch_source_refs_t dri;
-    uint32_t del_flags, fflags = 0;
-
-    if (TAILQ_EMPTY(&dk->dk_sources)) {
-        _dispatch_kevent_dispose(dk);
-    } else {
-        TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
-            dispatch_source_t dsi = _dispatch_source_from_refs(dri);
-            uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
-            fflags |= mask;
+    _dispatch_object_debug(ds, "%s", __func__);
+    os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
+        new_dqf = old_dqf | DSF_CANCELED;
+        if (old_dqf & DSF_CANCEL_WAITER) {
+            os_atomic_rmw_loop_give_up(break);
        }
-        del_flags = flg & ~fflags;
-        if (del_flags) {
-            dk->dk_kevent.flags |= EV_ADD;
-            dk->dk_kevent.fflags = fflags;
-            _dispatch_kevent_resume(dk, 0, del_flags);
+        if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
+            // just add DSF_CANCELED
+        } else if ((old_dqf & DSF_DEFERRED_DELETE) || !dr->du_is_direct) {
+            new_dqf |= DSF_CANCEL_WAITER;
        }
-    }
-}

 DISPATCH_NOINLINE
 static void
-_dispatch_kevent_proc_exit(struct kevent64_s *ke)
-{
-    // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
-    // . As a workaround, we simulate an exit event for
-    // any EVFILT_PROC with an invalid pid .
-    struct kevent64_s fake;
-    fake = *ke;
-    fake.flags &= ~EV_ERROR;
-    fake.fflags = NOTE_EXIT;
-    fake.data = 0;
-    _dispatch_kevent_drain(&fake);
-}
+    });
+    dqf = new_dqf;

 DISPATCH_NOINLINE
 static void
-_dispatch_kevent_error(struct kevent64_s *ke)
-{
-    _dispatch_kevent_debug(ke, __func__);
-    if (ke->data) {
-        // log the unexpected error
-        _dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
-                ke->flags & EV_DELETE ? "delete" :
-                ke->flags & EV_ADD ? "add" :
-                ke->flags & EV_ENABLE ? "enable" : "monitor",
-                (int)ke->data);
+    if (old_dqf & DQF_RELEASED) {
+        DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
    }
-}

-static void
-_dispatch_kevent_drain(struct kevent64_s *ke)
-{
-#if DISPATCH_DEBUG
-    static dispatch_once_t pred;
-    dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
-#endif
-    if (ke->filter == EVFILT_USER) {
+    if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
        return;
    }
-    if (slowpath(ke->flags & EV_ERROR)) {
-        if (ke->filter == EVFILT_PROC) {
-            if (ke->flags & EV_DELETE) {
-                // Process exited while monitored
-                return;
-            } else if (ke->data == ESRCH) {
-                return _dispatch_kevent_proc_exit(ke);
-            }
-        }
-        return _dispatch_kevent_error(ke);
-    }
-    _dispatch_kevent_debug(ke, __func__);
-    if (ke->filter == EVFILT_TIMER) {
-        return _dispatch_timers_kevent(ke);
+    if (dqf & DSF_CANCEL_WAITER) {
+        goto wakeup;
    }
-#if HAVE_MACH
-    if (ke->filter == EVFILT_MACHPORT) {
-        return _dispatch_kevent_mach_portset(ke);
-    }
-#endif
-    return _dispatch_kevent_merge(ke);
-}

-DISPATCH_NOINLINE
-static void
-_dispatch_kevent_merge(struct kevent64_s *ke)
-{
-    dispatch_kevent_t dk;
-    dispatch_source_refs_t dri;
-
-    dk = (void*)ke->udata;
-    dispatch_assert(dk);
-
-    if (ke->flags & EV_ONESHOT) {
-        dk->dk_kevent.flags |= EV_ONESHOT;
-    }
-    TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
-        _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
-    }
-}
+    // simplified version of _dispatch_queue_drain_try_lock
+    // that also sets the DIRTY bit on failure to lock
+    uint64_t set_owner_and_set_full_width = _dispatch_lock_value_for_self() |
+            DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
+    uint64_t old_state, new_state;

-#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
-static void
-_dispatch_kevent_guard(dispatch_kevent_t dk)
-{
-    guardid_t guard;
-    const unsigned int guard_flags = GUARD_CLOSE;
-    int r, fd_flags = 0;
-    switch (dk->dk_kevent.filter) {
-    case EVFILT_READ:
-    case EVFILT_WRITE:
-    case EVFILT_VNODE:
-        guard = &dk->dk_kevent;
-        r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
-                &guard, guard_flags, &fd_flags);
-        if (slowpath(r == -1)) {
-            int err = errno;
-            if (err != EPERM) {
-                (void)dispatch_assume_zero(err);
+    os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
+        new_state = old_state;
+        if (likely(_dq_state_is_runnable(old_state) &&
+                !_dq_state_drain_locked(old_state))) {
+            new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
+            new_state |= set_owner_and_set_full_width;
+        } else if (old_dqf & DSF_CANCELED) {
+            os_atomic_rmw_loop_give_up(break);
+        } else {
+            // this case needs a release barrier, hence the seq_cst above
+            new_state |= DISPATCH_QUEUE_DIRTY;
        }
+    });
+
+    if (unlikely(_dq_state_is_suspended(old_state))) {
+        if (unlikely(_dq_state_suspend_cnt(old_state))) {
+            DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
        }
-            return;
+        // inactive sources have never been registered and there is no need
+        // to wait here because activation will notice and mark the source
+        // as deleted without ever trying to use the fd or mach port.
+        return dispatch_activate(ds);
+    }
+
+    if (likely(_dq_state_is_runnable(old_state) &&
+            !_dq_state_drain_locked(old_state))) {
+        // same thing _dispatch_source_invoke2() does when handling cancellation
+        dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+        if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
+            _dispatch_source_refs_unregister(ds, 0);
+            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+            if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
+                _dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
            }
-        dk->dk_kevent.ext[0] = guard_flags;
-        dk->dk_kevent.ext[1] = fd_flags;
-        break;
+        }
+        dx_wakeup(ds, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
+    } else if (unlikely(_dq_state_drain_locked_by_self(old_state))) {
+        DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
+                "called from a source handler");
+    } else {
+        dispatch_qos_t qos;
+wakeup:
+        qos = _dispatch_qos_from_pp(_dispatch_get_priority());
+        dx_wakeup(ds, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
+        dispatch_activate(ds);
+    }
+
+    dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+    while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
+        if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
+            if (!os_atomic_cmpxchgv2o(ds, dq_atomic_flags,
+                    dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
+                continue;
+            }
+            dqf |= DSF_CANCEL_WAITER;
+        }
+        _dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
+        dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
    }
 }
"vanished" : + "deferred delete oneshot", dr); + } else if (flags & (EV_DELETE | EV_ONESHOT)) { + _dispatch_source_refs_unregister(ds, DU_UNREGISTER_ALREADY_DELETED); + _dispatch_debug("kevent-source[%p]: deleted kevent[%p]", ds, dr); + if (flags & EV_DELETE) goto done; + dqf = _dispatch_queue_atomic_flags(ds->_as_dq); + } else if (_dispatch_unote_needs_rearm(dr)) { + dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED); + _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr); + } else { + dqf = _dispatch_queue_atomic_flags(ds->_as_dq); + } + + if (dqf & (DSF_CANCELED | DQF_RELEASED)) { + goto done; // rdar://20204025 } + + dispatch_unote_action_t action = dr->du_data_action; + if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) && + (flags & EV_VANISHED)) { + // if the resource behind the ident vanished, the event handler can't + // do anything useful anymore, so do not try to call it at all + // + // Note: if the kernel doesn't support EV_VANISHED we always get it + // back unchanged from the flags passed at EV_ADD (registration) time + // Since we never ask for both EV_ONESHOT and EV_VANISHED for sources, + // if we get both bits it was a real EV_VANISHED delivery + os_atomic_store2o(ds, ds_pending_data, 0, relaxed); +#if HAVE_MACH + } else if (dr->du_filter == EVFILT_MACHPORT) { + os_atomic_store2o(ds, ds_pending_data, data, relaxed); +#endif + } else if (action == DISPATCH_UNOTE_ACTION_DATA_SET) { + os_atomic_store2o(ds, ds_pending_data, data, relaxed); + } else if (action == DISPATCH_UNOTE_ACTION_DATA_ADD) { + os_atomic_add2o(ds, ds_pending_data, data, relaxed); + } else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR) { + os_atomic_or2o(ds, ds_pending_data, data, relaxed); + } else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) { + // We combine the data and status into a single 64-bit value. 
+ uint64_t odata, ndata; + uint64_t value = DISPATCH_SOURCE_COMBINE_DATA_AND_STATUS(data, status); + os_atomic_rmw_loop2o(ds, ds_pending_data, odata, ndata, relaxed, { + ndata = DISPATCH_SOURCE_GET_DATA(odata) | value; + }); + } else if (data) { + DISPATCH_INTERNAL_CRASH(action, "Unexpected source action value"); + } + _dispatch_debug("kevent-source[%p]: merged kevent[%p]", ds, dr); + +done: + _dispatch_object_debug(ds, "%s", __func__); + dx_wakeup(ds, _dispatch_qos_from_pp(pp), wflags | DISPATCH_WAKEUP_MAKE_DIRTY); } -#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD #pragma mark - #pragma mark dispatch_source_timer #if DISPATCH_USE_DTRACE -static dispatch_source_refs_t +static dispatch_timer_source_refs_t _dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT]; #define _dispatch_trace_next_timer_set(x, q) \ _dispatch_trace_next_timer[(q)] = (x) #define _dispatch_trace_next_timer_program(d, q) \ _dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d)) -#define _dispatch_trace_next_timer_wake(q) \ - _dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)]) +DISPATCH_ALWAYS_INLINE +static inline void +_dispatch_mgr_trace_timers_wakes(void) +{ + uint32_t qos; + + if (_dispatch_timers_will_wake) { + if (slowpath(DISPATCH_TIMER_WAKE_ENABLED())) { + for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) { + if (_dispatch_timers_will_wake & (1 << qos)) { + _dispatch_trace_timer_wake(_dispatch_trace_next_timer[qos]); + } + } + } + _dispatch_timers_will_wake = 0; + } +} #else #define _dispatch_trace_next_timer_set(x, q) #define _dispatch_trace_next_timer_program(d, q) -#define _dispatch_trace_next_timer_wake(q) +#define _dispatch_mgr_trace_timers_wakes() #endif #define _dispatch_source_timer_telemetry_enabled() false @@ -1089,117 +1219,59 @@ static dispatch_source_refs_t DISPATCH_NOINLINE static void _dispatch_source_timer_telemetry_slow(dispatch_source_t ds, - uintptr_t ident, struct dispatch_timer_source_s *values) + dispatch_clock_t clock, struct dispatch_timer_source_s *values) { if (_dispatch_trace_timer_configure_enabled()) { - _dispatch_trace_timer_configure(ds, ident, values); + _dispatch_trace_timer_configure(ds, clock, values); } } DISPATCH_ALWAYS_INLINE static inline void -_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident, +_dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock, struct dispatch_timer_source_s *values) { if (_dispatch_trace_timer_configure_enabled() || _dispatch_source_timer_telemetry_enabled()) { - _dispatch_source_timer_telemetry_slow(ds, ident, values); + _dispatch_source_timer_telemetry_slow(ds, clock, values); asm(""); // prevent tailcall } } -// approx 1 year (60s * 60m * 24h * 365d) -#define FOREVER_NSEC 31536000000000000ull - -DISPATCH_ALWAYS_INLINE -static inline uint64_t -_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx) -{ - unsigned int tk = DISPATCH_TIMER_KIND(tidx); - if (nows && fastpath(nows[tk])) { - return nows[tk]; - } - uint64_t now; - switch (tk) { - case DISPATCH_TIMER_KIND_MACH: - now = _dispatch_absolute_time(); - break; - case DISPATCH_TIMER_KIND_WALL: - now = _dispatch_get_nanoseconds(); - break; - } - if (nows) { - nows[tk] = now; - } - return now; -} - -static inline unsigned long -_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev) -{ - // calculate the number of intervals since last fire - unsigned long data, missed; - uint64_t now; - now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr)); - missed = (unsigned long)((now - 
ds_timer(dr).last_fire) / - ds_timer(dr).interval); - // correct for missed intervals already delivered last time - data = prev - ds_timer(dr).missed + missed; - ds_timer(dr).missed = missed; - return data; -} - -struct dispatch_set_timer_params { - dispatch_source_t ds; - uintptr_t ident; - struct dispatch_timer_source_s values; -}; - +DISPATCH_NOINLINE static void -_dispatch_source_set_timer3(void *context) +_dispatch_source_timer_configure(dispatch_source_t ds) { - // Called on the _dispatch_mgr_q - struct dispatch_set_timer_params *params = context; - dispatch_source_t ds = params->ds; - ds->ds_ident_hack = params->ident; - ds_timer(ds->ds_refs) = params->values; - // Clear any pending data that might have accumulated on - // older timer params - ds->ds_pending_data = 0; - // Re-arm in case we got disarmed because of pending set_timer suspension - (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release); - dispatch_resume(ds); - // Must happen after resume to avoid getting disarmed due to suspension - _dispatch_timers_update(ds); - dispatch_release(ds); - if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) { - _dispatch_mach_host_calendar_change_register(); - } - free(params); -} + dispatch_timer_source_refs_t dt = ds->ds_timer_refs; + dispatch_timer_config_t dtc; -static void -_dispatch_source_set_timer2(void *context) -{ - // Called on the source queue - struct dispatch_set_timer_params *params = context; - dispatch_suspend(params->ds); - _dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params, - _dispatch_source_set_timer3); + dtc = os_atomic_xchg2o(dt, dt_pending_config, NULL, dependency); + if (dtc->dtc_clock == DISPATCH_CLOCK_MACH) { + dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH; + } else { + dt->du_fflags &= ~(uint32_t)DISPATCH_TIMER_CLOCK_MACH; + } + dt->dt_timer = dtc->dtc_timer; + free(dtc); + if (ds->ds_is_installed) { + // Clear any pending data that might have accumulated on + // older timer params + os_atomic_store2o(ds, ds_pending_data, 0, relaxed); + _dispatch_timers_update(dt, 0); + } } -DISPATCH_NOINLINE -static struct dispatch_set_timer_params * -_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start, +static dispatch_timer_config_t +_dispatch_source_timer_config_create(dispatch_time_t start, uint64_t interval, uint64_t leeway) { - struct dispatch_set_timer_params *params; - params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params)); - params->ds = ds; - params->values.flags = ds_timer(ds->ds_refs).flags; - - if (interval == 0) { - // we use zero internally to mean disabled + dispatch_timer_config_t dtc; + dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s)); + if (unlikely(interval == 0)) { + if (start != DISPATCH_TIME_FOREVER) { + _dispatch_bug_deprecated("Setting timer interval to 0 requests " + "a 1ns timer, did you mean FOREVER (a one-shot timer)?"); + } interval = 1; } else if ((int64_t)interval < 0) { // 6866347 - make sure nanoseconds won't overflow @@ -1217,7 +1289,7 @@ _dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start, if ((int64_t)start < 0) { // wall clock start = (dispatch_time_t)-((int64_t)start); - params->values.flags |= DISPATCH_TIMER_WALL_CLOCK; + dtc->dtc_clock = DISPATCH_CLOCK_WALL; } else { // absolute clock interval = _dispatch_time_nano2mach(interval); @@ -1229,64 +1301,50 @@ _dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start, interval = 1; } leeway = _dispatch_time_nano2mach(leeway); - params->values.flags &= ~(unsigned 
long)DISPATCH_TIMER_WALL_CLOCK; - } - params->ident = DISPATCH_TIMER_IDENT(params->values.flags); - params->values.target = start; - params->values.deadline = (start < UINT64_MAX - leeway) ? - start + leeway : UINT64_MAX; - params->values.interval = interval; - params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ? - leeway : interval / 2; - return params; -} - -DISPATCH_ALWAYS_INLINE -static inline void -_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start, - uint64_t interval, uint64_t leeway, bool source_sync) -{ - if (slowpath(!ds->ds_is_timer) || - slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) { - DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source"); + dtc->dtc_clock = DISPATCH_CLOCK_MACH; + } + if (interval < INT64_MAX && leeway > interval / 2) { + leeway = interval / 2; } - struct dispatch_set_timer_params *params; - params = _dispatch_source_timer_params(ds, start, interval, leeway); - - _dispatch_source_timer_telemetry(ds, params->ident, ¶ms->values); - // Suspend the source so that it doesn't fire with pending changes - // The use of suspend/resume requires the external retain/release - dispatch_retain(ds); - if (source_sync) { - return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params, - _dispatch_source_set_timer2); + dtc->dtc_timer.target = start; + dtc->dtc_timer.interval = interval; + if (start + leeway < INT64_MAX) { + dtc->dtc_timer.deadline = start + leeway; } else { - return _dispatch_source_set_timer2(params); + dtc->dtc_timer.deadline = INT64_MAX; } + return dtc; } +DISPATCH_NOINLINE void dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start, uint64_t interval, uint64_t leeway) { - _dispatch_source_set_timer(ds, start, interval, leeway, true); -} + dispatch_timer_source_refs_t dt = ds->ds_timer_refs; + dispatch_timer_config_t dtc; -void -_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds, - dispatch_time_t start, uint64_t interval, uint64_t leeway) -{ - // Don't serialize through the source queue for CF timers - _dispatch_source_set_timer(ds, start, interval, leeway, false); + if (unlikely(!dt->du_is_timer || (dt->du_fflags&DISPATCH_TIMER_INTERVAL))) { + DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source"); + } + + dtc = _dispatch_source_timer_config_create(start, interval, leeway); + _dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer); + dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release); + if (dtc) free(dtc); + dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY); } -void +static void _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval) { - dispatch_source_refs_t dr = ds->ds_refs; - #define NSEC_PER_FRAME (NSEC_PER_SEC/60) - const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION; +#define NSEC_PER_FRAME (NSEC_PER_SEC/60) +// approx 1 year (60s * 60m * 24h * 365d) +#define FOREVER_NSEC 31536000000000000ull + + dispatch_timer_source_refs_t dr = ds->ds_timer_refs; + const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION; if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME : FOREVER_NSEC/NSEC_PER_MSEC))) { interval *= animation ? 
NSEC_PER_FRAME : NSEC_PER_MSEC; @@ -1295,3402 +1353,1215 @@ _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval) } interval = _dispatch_time_nano2mach(interval); uint64_t target = _dispatch_absolute_time() + interval; - target = (target / interval) * interval; + target -= (target % interval); const uint64_t leeway = animation ? _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2; - ds_timer(dr).target = target; - ds_timer(dr).deadline = target + leeway; - ds_timer(dr).interval = interval; - ds_timer(dr).leeway = leeway; - _dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr)); + dr->dt_timer.target = target; + dr->dt_timer.deadline = target + leeway; + dr->dt_timer.interval = interval; + _dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer); } #pragma mark - -#pragma mark dispatch_timers +#pragma mark dispatch_after -#define DISPATCH_TIMER_STRUCT(refs) \ - uint64_t target, deadline; \ - TAILQ_HEAD(, refs) dt_sources - -typedef struct dispatch_timer_s { - DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s); -} *dispatch_timer_t; - -#define DISPATCH_TIMER_INITIALIZER(tidx) \ - [tidx] = { \ - .target = UINT64_MAX, \ - .deadline = UINT64_MAX, \ - .dt_sources = TAILQ_HEAD_INITIALIZER( \ - _dispatch_timer[tidx].dt_sources), \ - } -#define DISPATCH_TIMER_INIT(kind, qos) \ - DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \ - DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos)) - -struct dispatch_timer_s _dispatch_timer[] = { - DISPATCH_TIMER_INIT(WALL, NORMAL), - DISPATCH_TIMER_INIT(WALL, CRITICAL), - DISPATCH_TIMER_INIT(WALL, BACKGROUND), - DISPATCH_TIMER_INIT(MACH, NORMAL), - DISPATCH_TIMER_INIT(MACH, CRITICAL), - DISPATCH_TIMER_INIT(MACH, BACKGROUND), -}; -#define DISPATCH_TIMER_COUNT \ - ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0]))) - -#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \ - (uintptr_t)&_dispatch_kevent_timer[tidx] -#ifdef __LP64__ -#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \ - .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx) -#else // __LP64__ -// dynamic initialization in _dispatch_timers_init() -#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \ - .udata = 0 -#endif // __LP64__ -#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \ - [tidx] = { \ - .dk_kevent = { \ - .ident = tidx, \ - .filter = DISPATCH_EVFILT_TIMER, \ - DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \ - }, \ - .dk_sources = TAILQ_HEAD_INITIALIZER( \ - _dispatch_kevent_timer[tidx].dk_sources), \ - } -#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \ - DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \ - DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos)) - -struct dispatch_kevent_s _dispatch_kevent_timer[] = { - DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL), - DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL), - DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND), - DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL), - DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL), - DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND), - DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM), -}; -#define DISPATCH_KEVENT_TIMER_COUNT \ - ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0]))) - -#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8) -#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \ - [qos] = { \ - .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \ - .filter = EVFILT_TIMER, \ - .flags = EV_ONESHOT, \ - .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \ - } -#define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \ - 
DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note) - -struct kevent64_s _dispatch_kevent_timeout[] = { - DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0), - DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL), - DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND), -}; +DISPATCH_ALWAYS_INLINE +static inline void +_dispatch_after(dispatch_time_t when, dispatch_queue_t queue, + void *ctxt, void *handler, bool block) +{ + dispatch_timer_source_refs_t dt; + dispatch_source_t ds; + uint64_t leeway, delta; -#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \ - [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC + if (when == DISPATCH_TIME_FOREVER) { +#if DISPATCH_DEBUG + DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity"); +#endif + return; + } -static const uint64_t _dispatch_kevent_coalescing_window[] = { - DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75), - DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1), - DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100), -}; + delta = _dispatch_timeout(when); + if (delta == 0) { + if (block) { + return dispatch_async(queue, handler); + } + return dispatch_async_f(queue, ctxt, handler); + } + leeway = delta / 10; // -#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \ - typeof(dr) dri = NULL; typeof(dt) dti; \ - if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \ - TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \ - if (ds_timer(dr).target < ds_timer(dri).target) { \ - break; \ - } \ - } \ - TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \ - if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \ - break; \ - } \ - } \ - if (dti) { \ - TAILQ_INSERT_BEFORE(dti, dt, dt_list); \ - } else { \ - TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \ - } \ - } \ - if (dri) { \ - TAILQ_INSERT_BEFORE(dri, dr, dr_list); \ - } else { \ - TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \ - } \ - }) - -#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \ - ({ \ - if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \ - TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \ - } \ - TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \ - dr_list); }) - -#define _dispatch_timers_check(dra, dta) ({ \ - unsigned int qosm = _dispatch_timers_qos_mask; \ - bool update = false; \ - unsigned int tidx; \ - for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \ - if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \ - continue; \ - } \ - dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \ - TAILQ_FIRST(&dra[tidx].dk_sources); \ - dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \ - TAILQ_FIRST(&dta[tidx].dt_sources); \ - uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \ - uint64_t deadline = dr ? 
ds_timer(dt).deadline : UINT64_MAX; \ - if (target != dta[tidx].target) { \ - dta[tidx].target = target; \ - update = true; \ - } \ - if (deadline != dta[tidx].deadline) { \ - dta[tidx].deadline = deadline; \ - update = true; \ - } \ - } \ - update; }) - -static bool _dispatch_timers_reconfigure, _dispatch_timer_expired; -static unsigned int _dispatch_timers_qos_mask; -static bool _dispatch_timers_force_max_leeway; + if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC; + if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC; -static void -_dispatch_timers_init(void) -{ -#ifndef __LP64__ - unsigned int tidx; - for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { - _dispatch_kevent_timer[tidx].dk_kevent.udata = \ - DISPATCH_KEVENT_TIMER_UDATA(tidx); + // this function can and should be optimized to not use a dispatch source + ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue); + dt = ds->ds_timer_refs; + + dispatch_continuation_t dc = _dispatch_continuation_alloc(); + if (block) { + _dispatch_continuation_init(dc, ds, handler, 0, 0, 0); + } else { + _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0); } -#endif // __LP64__ - if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) { - _dispatch_timers_force_max_leeway = true; + // reference `ds` so that it doesn't show up as a leak + dc->dc_data = ds; + _dispatch_trace_continuation_push(ds->_as_dq, dc); + os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed); + + if ((int64_t)when < 0) { + // wall clock + when = (dispatch_time_t)-((int64_t)when); + } else { + // absolute clock + dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH; + leeway = _dispatch_time_nano2mach(leeway); } + dt->dt_timer.target = when; + dt->dt_timer.interval = UINT64_MAX; + dt->dt_timer.deadline = when + leeway; + dispatch_activate(ds); } -static inline void -_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk) +DISPATCH_NOINLINE +void +dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, + dispatch_function_t func) { - dispatch_source_refs_t dr = ds->ds_refs; - unsigned int tidx = (unsigned int)dk->dk_kevent.ident; + _dispatch_after(when, queue, ctxt, func, false); +} - if (slowpath(ds_timer_aggregate(ds))) { - _dispatch_timer_aggregates_unregister(ds, tidx); - } - _dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list, - _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list); - if (tidx != DISPATCH_TIMER_INDEX_DISARM) { - _dispatch_timers_reconfigure = true; - _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx); - } +#ifdef __BLOCKS__ +void +dispatch_after(dispatch_time_t when, dispatch_queue_t queue, + dispatch_block_t work) +{ + _dispatch_after(when, queue, NULL, work, true); } +#endif -// Updates the ordered list of timers based on next fire date for changes to ds. -// Should only be called from the context of _dispatch_mgr_q. -static void -_dispatch_timers_update(dispatch_source_t ds) +#pragma mark - +#pragma mark dispatch_timers + +/* + * The dispatch_timer_heap_t structure is a double min-heap of timers, + * interleaving the by-target min-heap in the even slots, and the by-deadline + * in the odd ones. + * + * The min element of these is held inline in the dispatch_timer_heap_t + * structure, and further entries are held in segments. + * + * dth_segments is the number of allocated segments. 
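/*
 * For concreteness, a worked instance of the layout described here, assuming
 * the initial segment capacity of 8 defined below: with dth_segments == 3,
 * the segments hold 8, 8 and 16 pointers, the last segment donates its final
 * 2 slots to the back-pointers, and 2 timers sit inline in dth_min, giving
 * 2 + 8 + 8 + (16 - 2) == 32 timer slots, which matches
 * _dispatch_timer_heap_capacity(3) == 2 + (8 << 2) - 2.
 */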
+ * + * Segment 0 has a size of `DISPATCH_HEAP_INIT_SEGMENT_CAPACITY` pointers + * Segment k has a size of (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (k - 1)) + * + * Segment n (dth_segments - 1) is the last segment and points its final n + * entries to previous segments. Its address is held in the `dth_heap` field. + * + * segment n [ regular timer pointers | n-1 | k | 0 ] + * | | | + * segment n-1 <---------------------------' | | + * segment k <--------------------------------' | + * segment 0 <------------------------------------' + */ +#define DISPATCH_HEAP_INIT_SEGMENT_CAPACITY 8u + +/* + * There are two min-heaps stored interleaved in a single array, + * even indices are for the by-target min-heap, and odd indices for + * the by-deadline one. + */ +#define DTH_HEAP_ID_MASK (DTH_ID_COUNT - 1) +#define DTH_HEAP_ID(idx) ((idx) & DTH_HEAP_ID_MASK) +#define DTH_IDX_FOR_HEAP_ID(idx, heap_id) \ + (((idx) & ~DTH_HEAP_ID_MASK) | (heap_id)) + +DISPATCH_ALWAYS_INLINE +static inline uint32_t +_dispatch_timer_heap_capacity(uint32_t segments) { - dispatch_kevent_t dk = ds->ds_dkev; - dispatch_source_refs_t dr = ds->ds_refs; - unsigned int tidx; + if (segments == 0) return 2; + uint32_t seg_no = segments - 1; + // for C = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY, + // 2 + C + SUM(C << (i-1), i = 1..seg_no) - seg_no + return 2 + (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << seg_no) - seg_no; +} - DISPATCH_ASSERT_ON_MANAGER_QUEUE(); +DISPATCH_NOINLINE +static void +_dispatch_timer_heap_grow(dispatch_timer_heap_t dth) +{ + uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY; + uint32_t seg_no = dth->dth_segments++; + void **heap, **heap_prev = dth->dth_heap; - // Do not reschedule timers unregistered with _dispatch_kevent_unregister() - if (slowpath(!dk)) { - return; + if (seg_no > 0) { + seg_capacity <<= (seg_no - 1); } - // Move timers that are disabled, suspended or have missed intervals to the - // disarmed list, rearm after resume resp. 
source invoke will reenable them - if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) || - ds->ds_pending_data) { - tidx = DISPATCH_TIMER_INDEX_DISARM; - (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed); - } else { - tidx = _dispatch_source_timer_idx(dr); + heap = _dispatch_calloc(seg_capacity, sizeof(void *)); + if (seg_no > 1) { + uint32_t prev_seg_no = seg_no - 1; + uint32_t prev_seg_capacity = seg_capacity >> 1; + memcpy(&heap[seg_capacity - prev_seg_no], + &heap_prev[prev_seg_capacity - prev_seg_no], + prev_seg_no * sizeof(void *)); } - if (slowpath(ds_timer_aggregate(ds))) { - _dispatch_timer_aggregates_register(ds); + if (seg_no > 0) { + heap[seg_capacity - seg_no] = heap_prev; } - if (slowpath(!ds->ds_is_installed)) { - ds->ds_is_installed = true; - if (tidx != DISPATCH_TIMER_INDEX_DISARM) { - (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed); - } - _dispatch_object_debug(ds, "%s", __func__); - ds->ds_dkev = NULL; - free(dk); - } else { - _dispatch_timers_unregister(ds, dk); + dth->dth_heap = heap; +} + +DISPATCH_NOINLINE +static void +_dispatch_timer_heap_shrink(dispatch_timer_heap_t dth) +{ + uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY; + uint32_t seg_no = --dth->dth_segments; + void **heap = dth->dth_heap, **heap_prev = NULL; + + if (seg_no > 0) { + seg_capacity <<= (seg_no - 1); + heap_prev = heap[seg_capacity - seg_no]; } - if (tidx != DISPATCH_TIMER_INDEX_DISARM) { - _dispatch_timers_reconfigure = true; - _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx); + if (seg_no > 1) { + uint32_t prev_seg_no = seg_no - 1; + uint32_t prev_seg_capacity = seg_capacity >> 1; + memcpy(&heap_prev[prev_seg_capacity - prev_seg_no], + &heap[seg_capacity - prev_seg_no], + prev_seg_no * sizeof(void *)); } - if (dk != &_dispatch_kevent_timer[tidx]){ - ds->ds_dkev = &_dispatch_kevent_timer[tidx]; + dth->dth_heap = heap_prev; + free(heap); +} + +DISPATCH_ALWAYS_INLINE +static inline dispatch_timer_source_refs_t * +_dispatch_timer_heap_get_slot(dispatch_timer_heap_t dth, uint32_t idx) +{ + uint32_t seg_no, segments = dth->dth_segments; + void **segment; + + if (idx < DTH_ID_COUNT) { + return &dth->dth_min[idx]; + } + idx -= DTH_ID_COUNT; + + // Derive the segment number from the index. Naming + // DISPATCH_HEAP_INIT_SEGMENT_CAPACITY `C`, the segments index ranges are: + // 0: 0 .. (C - 1) + // 1: C .. 2 * C - 1 + // k: 2^(k-1) * C .. 
2^k * C - 1 + // so `k` can be derived from the first bit set in `idx` + seg_no = (uint32_t)(__builtin_clz(DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1) - + __builtin_clz(idx | (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1))); + if (seg_no + 1 == segments) { + segment = dth->dth_heap; + } else { + uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY; + seg_capacity <<= (segments - 2); + segment = dth->dth_heap[seg_capacity - seg_no - 1]; } - _dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list, - _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list); - if (slowpath(ds_timer_aggregate(ds))) { - _dispatch_timer_aggregates_update(ds, tidx); + if (seg_no) { + idx -= DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (seg_no - 1); } + return (dispatch_timer_source_refs_t *)(segment + idx); } +DISPATCH_ALWAYS_INLINE static inline void -_dispatch_timers_run2(uint64_t nows[], unsigned int tidx) +_dispatch_timer_heap_set(dispatch_timer_source_refs_t *slot, + dispatch_timer_source_refs_t dt, uint32_t idx) { - dispatch_source_refs_t dr; - dispatch_source_t ds; - uint64_t now, missed; + *slot = dt; + dt->dt_heap_entry[DTH_HEAP_ID(idx)] = idx; +} - now = _dispatch_source_timer_now(nows, tidx); - while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) { - ds = _dispatch_source_from_refs(dr); - // We may find timers on the wrong list due to a pending update from - // dispatch_source_set_timer. Force an update of the list in that case. - if (tidx != ds->ds_ident_hack) { - _dispatch_timers_update(ds); - continue; - } - if (!ds_timer(dr).target) { - // No configured timers on the list - break; - } - if (ds_timer(dr).target > now) { - // Done running timers for now. - break; - } - // Remove timers that are suspended or have missed intervals from the - // list, rearm after resume resp. source invoke will reenable them - if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) { - _dispatch_timers_update(ds); - continue; - } - // Calculate number of missed intervals. - missed = (now - ds_timer(dr).target) / ds_timer(dr).interval; - if (++missed > INT_MAX) { - missed = INT_MAX; - } - if (ds_timer(dr).interval < INT64_MAX) { - ds_timer(dr).target += missed * ds_timer(dr).interval; - ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway; +DISPATCH_ALWAYS_INLINE +static inline uint32_t +_dispatch_timer_heap_parent(uint32_t idx) +{ + uint32_t heap_id = DTH_HEAP_ID(idx); + idx = (idx - DTH_ID_COUNT) / 2; // go to the parent + return DTH_IDX_FOR_HEAP_ID(idx, heap_id); +} + +DISPATCH_ALWAYS_INLINE +static inline uint32_t +_dispatch_timer_heap_left_child(uint32_t idx) +{ + uint32_t heap_id = DTH_HEAP_ID(idx); + // 2 * (idx - heap_id) + DTH_ID_COUNT + heap_id + return 2 * idx + DTH_ID_COUNT - heap_id; +} + +#if DISPATCH_HAVE_TIMER_COALESCING +DISPATCH_ALWAYS_INLINE +static inline uint32_t +_dispatch_timer_heap_walk_skip(uint32_t idx, uint32_t count) +{ + uint32_t heap_id = DTH_HEAP_ID(idx); + + idx -= heap_id; + if (unlikely(idx + DTH_ID_COUNT == count)) { + // reaching `count` doesn't mean we're done, but there is a weird + // corner case if the last item of the heap is a left child: + // + // /\ + // / \ + // / __\ + // /__/ + // ^ + // + // The formula below would return the sibling of `idx` which is + // out of bounds. 
Fortunately, the correct answer is the same + // as for idx's parent + idx = _dispatch_timer_heap_parent(idx); + } + + // + // When considering the index in a non interleaved, 1-based array + // representation of a heap, hence looking at (idx / DTH_ID_COUNT + 1) + // for a given idx in our dual-heaps, that index is in one of two forms: + // + // (a) 1xxxx011111 or (b) 111111111 + // d i 0 d 0 + // + // The first bit set is the row of the binary tree node (0-based). + // The following digits from most to least significant represent the path + // to that node, where `0` is a left turn and `1` a right turn. + // + // For example 0b0101 (5) is a node on row 2 accessed going left then right: + // + // row 0 1 + // / . + // row 1 2 3 + // . \ . . + // row 2 4 5 6 7 + // : : : : : : : : + // + // Skipping a sub-tree in walk order means going to the sibling of the last + // node reached after we turned left. If the node was of the form (a), + // this node is 1xxxx1, which for the above example is 0b0011 (3). + // If the node was of the form (b) then we never took a left, meaning + // we reached the last element in traversal order. + // + + // + // we want to find + // - the least significant bit set to 0 in (idx / DTH_ID_COUNT + 1) + // - which is offset by log_2(DTH_ID_COUNT) from the position of the least + // significant 0 in (idx + DTH_ID_COUNT + DTH_ID_COUNT - 1) + // since idx is a multiple of DTH_ID_COUNT and DTH_ID_COUNT a power of 2. + // - which in turn is the same as the position of the least significant 1 in + // ~(idx + DTH_ID_COUNT + DTH_ID_COUNT - 1) + // + dispatch_static_assert(powerof2(DTH_ID_COUNT)); + idx += DTH_ID_COUNT + DTH_ID_COUNT - 1; + idx >>= __builtin_ctz(~idx); + + // + // `idx` is now either: + // - 0 if it was the (b) case above, in which case the walk is done + // - 1xxxx0 as the position in a 0 based array representation of a non + // interleaved heap, so we just have to compute the interleaved index. + // + return likely(idx) ? DTH_ID_COUNT * idx + heap_id : UINT32_MAX; +} + +DISPATCH_ALWAYS_INLINE +static inline uint32_t +_dispatch_timer_heap_walk_next(uint32_t idx, uint32_t count) +{ + // + // Goes to the next element in heap walk order, which is the prefix ordered + // walk of the tree. + // + // From a given node, the next item to return is the left child if it + // exists, else the first right sibling we find by walking our parent chain, + // which is exactly what _dispatch_timer_heap_walk_skip() returns. 
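/*
 * A worked instance of the bit trick in _dispatch_timer_heap_walk_skip(),
 * with DTH_ID_COUNT == 2 and the by-target heap (heap_id == 0): interleaved
 * idx == 8 is 1-based tree node 8/2 + 1 == 5 == 0b101, i.e. a left then a
 * right turn from the root. The code forms 8 + 3 == 0b1011, shifts out the
 * trailing ones (0b1011 >> 2 == 0b10), and returns 2 * 0b10 + 0 == 4, which
 * is node 4/2 + 1 == 3 == 0b11: exactly the sibling reached after undoing
 * the left turn.
 */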
+ // + uint32_t lchild = _dispatch_timer_heap_left_child(idx); + if (lchild < count) { + return lchild; + } + return _dispatch_timer_heap_walk_skip(idx, count); +} + +DISPATCH_NOINLINE +static uint64_t +_dispatch_timer_heap_max_target_before(dispatch_timer_heap_t dth, uint64_t limit) +{ + dispatch_timer_source_refs_t dri; + uint32_t idx = _dispatch_timer_heap_left_child(DTH_TARGET_ID); + uint32_t count = dth->dth_count; + uint64_t tmp, target = dth->dth_min[DTH_TARGET_ID]->dt_timer.target; + + while (idx < count) { + dri = *_dispatch_timer_heap_get_slot(dth, idx); + tmp = dri->dt_timer.target; + if (tmp > limit) { + // skip subtree since none of the targets below can be before limit + idx = _dispatch_timer_heap_walk_skip(idx, count); } else { - ds_timer(dr).target = UINT64_MAX; - ds_timer(dr).deadline = UINT64_MAX; + target = tmp; + idx = _dispatch_timer_heap_walk_next(idx, count); } - _dispatch_timers_update(ds); - ds_timer(dr).last_fire = now; - - unsigned long data; - data = dispatch_atomic_add2o(ds, ds_pending_data, - (unsigned long)missed, relaxed); - _dispatch_trace_timer_fire(dr, data, (unsigned long)missed); - _dispatch_wakeup(ds); } + return target; } +#endif // DISPATCH_HAVE_TIMER_COALESCING DISPATCH_NOINLINE static void -_dispatch_timers_run(uint64_t nows[]) -{ - unsigned int tidx; - for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { - if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) { - _dispatch_timers_run2(nows, tidx); +_dispatch_timer_heap_resift(dispatch_timer_heap_t dth, + dispatch_timer_source_refs_t dt, uint32_t idx) +{ + dispatch_static_assert(offsetof(struct dispatch_timer_source_s, target) == + offsetof(struct dispatch_timer_source_s, heap_key[DTH_TARGET_ID])); + dispatch_static_assert(offsetof(struct dispatch_timer_source_s, deadline) == + offsetof(struct dispatch_timer_source_s, heap_key[DTH_DEADLINE_ID])); +#define dth_cmp(hid, dt1, op, dt2) \ + (((dt1)->dt_timer.heap_key)[hid] op ((dt2)->dt_timer.heap_key)[hid]) + + dispatch_timer_source_refs_t *pslot, pdt; + dispatch_timer_source_refs_t *cslot, cdt; + dispatch_timer_source_refs_t *rslot, rdt; + uint32_t cidx, dth_count = dth->dth_count; + dispatch_timer_source_refs_t *slot; + int heap_id = DTH_HEAP_ID(idx); + bool sifted_up = false; + + // try to sift up + + slot = _dispatch_timer_heap_get_slot(dth, idx); + while (idx >= DTH_ID_COUNT) { + uint32_t pidx = _dispatch_timer_heap_parent(idx); + pslot = _dispatch_timer_heap_get_slot(dth, pidx); + pdt = *pslot; + if (dth_cmp(heap_id, pdt, <=, dt)) { + break; } + _dispatch_timer_heap_set(slot, pdt, idx); + slot = pslot; + idx = pidx; + sifted_up = true; + } + if (sifted_up) { + goto done; } -} -static inline unsigned int -_dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[], - uint64_t *delay, uint64_t *leeway, int qos) -{ - unsigned int tidx, ridx = DISPATCH_TIMER_COUNT; - uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX; + // try to sift down - for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { - if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){ - continue; - } - uint64_t target = timer[tidx].target; - if (target == UINT64_MAX) { - continue; - } - uint64_t deadline = timer[tidx].deadline; - if (qos >= 0) { - // Timer pre-coalescing - uint64_t window = _dispatch_kevent_coalescing_window[qos]; - uint64_t latest = deadline > window ? 
deadline - window : 0; - dispatch_source_refs_t dri; - TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources, - dr_list) { - tmp = ds_timer(dri).target; - if (tmp > latest) break; - target = tmp; + while ((cidx = _dispatch_timer_heap_left_child(idx)) < dth_count) { + uint32_t ridx = cidx + DTH_ID_COUNT; + cslot = _dispatch_timer_heap_get_slot(dth, cidx); + cdt = *cslot; + if (ridx < dth_count) { + rslot = _dispatch_timer_heap_get_slot(dth, ridx); + rdt = *rslot; + if (dth_cmp(heap_id, cdt, >, rdt)) { + cidx = ridx; + cdt = rdt; + cslot = rslot; } } - uint64_t now = _dispatch_source_timer_now(nows, tidx); - if (target <= now) { - delta = 0; + if (dth_cmp(heap_id, dt, <=, cdt)) { break; } - tmp = target - now; - if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) { - tmp = _dispatch_time_mach2nano(tmp); - } - if (tmp < INT64_MAX && tmp < delta) { - ridx = tidx; - delta = tmp; - } - dispatch_assert(target <= deadline); - tmp = deadline - now; - if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) { - tmp = _dispatch_time_mach2nano(tmp); - } - if (tmp < INT64_MAX && tmp < dldelta) { - dldelta = tmp; - } + _dispatch_timer_heap_set(slot, cdt, idx); + slot = cslot; + idx = cidx; } - *delay = delta; - *leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX; - return ridx; + +done: + _dispatch_timer_heap_set(slot, dt, idx); +#undef dth_cmp } -static bool -_dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke, - unsigned int qos) +DISPATCH_ALWAYS_INLINE +static void +_dispatch_timer_heap_insert(dispatch_timer_heap_t dth, + dispatch_timer_source_refs_t dt) { - unsigned int tidx; - bool poll; - uint64_t delay, leeway; + uint32_t idx = (dth->dth_count += DTH_ID_COUNT) - DTH_ID_COUNT; - tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway, - (int)qos); - poll = (delay == 0); - if (poll || delay == UINT64_MAX) { - _dispatch_trace_next_timer_set(NULL, qos); - if (!ke->data) { - return poll; - } - ke->data = 0; - ke->flags |= EV_DELETE; - ke->flags &= ~(EV_ADD|EV_ENABLE); - } else { - _dispatch_trace_next_timer_set( - TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos); - _dispatch_trace_next_timer_program(delay, qos); - delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL); - if (slowpath(_dispatch_timers_force_max_leeway)) { - ke->data = (int64_t)(delay + leeway); - ke->ext[1] = 0; - } else { - ke->data = (int64_t)delay; - ke->ext[1] = leeway; - } - ke->flags |= EV_ADD|EV_ENABLE; - ke->flags &= ~EV_DELETE; + DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], ==, + DTH_INVALID_ID, "target idx"); + DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], ==, + DTH_INVALID_ID, "deadline idx"); + + if (idx == 0) { + dt->dt_heap_entry[DTH_TARGET_ID] = DTH_TARGET_ID; + dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_DEADLINE_ID; + dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = dt; + return; } - _dispatch_kq_update(ke); - return poll; + + if (unlikely(idx + DTH_ID_COUNT > + _dispatch_timer_heap_capacity(dth->dth_segments))) { + _dispatch_timer_heap_grow(dth); + } + _dispatch_timer_heap_resift(dth, dt, idx + DTH_TARGET_ID); + _dispatch_timer_heap_resift(dth, dt, idx + DTH_DEADLINE_ID); } DISPATCH_NOINLINE -static bool -_dispatch_timers_program(uint64_t nows[]) +static void +_dispatch_timer_heap_remove(dispatch_timer_heap_t dth, + dispatch_timer_source_refs_t dt) { - bool poll = false; - unsigned int qos, qosm = _dispatch_timers_qos_mask; - for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) { - if (!(qosm & 1 << 
qos)){ - continue; + uint32_t idx = (dth->dth_count -= DTH_ID_COUNT); + + DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=, + DTH_INVALID_ID, "target idx"); + DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=, + DTH_INVALID_ID, "deadline idx"); + + if (idx == 0) { + DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_TARGET_ID], ==, dt, + "target slot"); + DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_DEADLINE_ID], ==, dt, + "deadline slot"); + dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = NULL; + goto clear_heap_entry; + } + + for (uint32_t heap_id = 0; heap_id < DTH_ID_COUNT; heap_id++) { + dispatch_timer_source_refs_t *slot, last_dt; + slot = _dispatch_timer_heap_get_slot(dth, idx + heap_id); + last_dt = *slot; *slot = NULL; + if (last_dt != dt) { + uint32_t removed_idx = dt->dt_heap_entry[heap_id]; + _dispatch_timer_heap_resift(dth, last_dt, removed_idx); } - poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos], - qos); } - return poll; -} + if (unlikely(idx <= _dispatch_timer_heap_capacity(dth->dth_segments - 1))) { + _dispatch_timer_heap_shrink(dth); + } -DISPATCH_NOINLINE -static bool -_dispatch_timers_configure(void) -{ - _dispatch_timer_aggregates_check(); - // Find out if there is a new target/deadline on the timer lists - return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer); +clear_heap_entry: + dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID; + dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID; } -static void -_dispatch_timers_calendar_change(void) +DISPATCH_ALWAYS_INLINE +static inline void +_dispatch_timer_heap_update(dispatch_timer_heap_t dth, + dispatch_timer_source_refs_t dt) { - // calendar change may have gone past the wallclock deadline - _dispatch_timer_expired = true; - _dispatch_timers_qos_mask = ~0u; -} + DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=, + DTH_INVALID_ID, "target idx"); + DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=, + DTH_INVALID_ID, "deadline idx"); -static void -_dispatch_timers_kevent(struct kevent64_s *ke) -{ - dispatch_assert(ke->data > 0); - dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) == - DISPATCH_KEVENT_TIMEOUT_IDENT_MASK); - unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK; - dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT); - dispatch_assert(_dispatch_kevent_timeout[qos].data); - _dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT - _dispatch_timer_expired = true; - _dispatch_timers_qos_mask |= 1 << qos; - _dispatch_trace_next_timer_wake(qos); + + _dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_TARGET_ID]); + _dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_DEADLINE_ID]); } -static inline bool -_dispatch_mgr_timers(void) +DISPATCH_ALWAYS_INLINE +static bool +_dispatch_timer_heap_has_new_min(dispatch_timer_heap_t dth, + uint32_t count, uint32_t mask) { - uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {}; - bool expired = slowpath(_dispatch_timer_expired); - if (expired) { - _dispatch_timers_run(nows); - } - bool reconfigure = slowpath(_dispatch_timers_reconfigure); - if (reconfigure || expired) { - if (reconfigure) { - reconfigure = _dispatch_timers_configure(); - _dispatch_timers_reconfigure = false; + dispatch_timer_source_refs_t dt; + bool changed = false; + uint64_t tmp; + uint32_t tidx; + + for (tidx = 0; tidx < count; tidx++) { + if (!(mask & (1u << tidx))) { + continue; } - if (reconfigure || expired) { - expired = _dispatch_timer_expired = _dispatch_timers_program(nows); -
expired = expired || _dispatch_mgr_q.dq_items_tail; + + dt = dth[tidx].dth_min[DTH_TARGET_ID]; + tmp = dt ? dt->dt_timer.target : UINT64_MAX; + if (dth[tidx].dth_target != tmp) { + dth[tidx].dth_target = tmp; + changed = true; + } + dt = dth[tidx].dth_min[DTH_DEADLINE_ID]; + tmp = dt ? dt->dt_timer.deadline : UINT64_MAX; + if (dth[tidx].dth_deadline != tmp) { + dth[tidx].dth_deadline = tmp; + changed = true; } - _dispatch_timers_qos_mask = 0; } - return expired; + return changed; } -#pragma mark - -#pragma mark dispatch_timer_aggregate - -typedef struct { - TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources; -} dispatch_timer_aggregate_refs_s; - -typedef struct dispatch_timer_aggregate_s { - DISPATCH_STRUCT_HEADER(queue); - DISPATCH_QUEUE_HEADER; - TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list; - dispatch_timer_aggregate_refs_s - dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT]; - struct { - DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s); - } dta_timer[DISPATCH_TIMER_COUNT]; - struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT]; - unsigned int dta_refcount; -} dispatch_timer_aggregate_s; - -typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s; -static dispatch_timer_aggregates_s _dispatch_timer_aggregates = - TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates); - -dispatch_timer_aggregate_t -dispatch_timer_aggregate_create(void) +static inline void +_dispatch_timers_unregister(dispatch_timer_source_refs_t dt) { - unsigned int tidx; - dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue), - sizeof(struct dispatch_timer_aggregate_s)); - _dispatch_queue_init((dispatch_queue_t)dta); - dta->do_targetq = _dispatch_get_root_queue( - _DISPATCH_QOS_CLASS_USER_INITIATED, true); - dta->dq_width = DISPATCH_QUEUE_WIDTH_MAX; - //FIXME: aggregates need custom vtable - //dta->dq_label = "timer-aggregate"; - for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) { - TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources); - } - for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { - TAILQ_INIT(&dta->dta_timer[tidx].dt_sources); - dta->dta_timer[tidx].target = UINT64_MAX; - dta->dta_timer[tidx].deadline = UINT64_MAX; - dta->dta_timer_data[tidx].target = UINT64_MAX; - dta->dta_timer_data[tidx].deadline = UINT64_MAX; - } - return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create( - (dispatch_queue_t)dta); -} - -typedef struct dispatch_timer_delay_s { - dispatch_timer_t timer; - uint64_t delay, leeway; -} *dispatch_timer_delay_t; + uint32_t tidx = dt->du_ident; + dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx]; -static void -_dispatch_timer_aggregate_get_delay(void *ctxt) -{ - dispatch_timer_delay_t dtd = ctxt; - struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {}; - _dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway, - -1); + _dispatch_timer_heap_remove(heap, dt); + _dispatch_timers_reconfigure = true; + _dispatch_timers_processing_mask |= 1 << tidx; + dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON); + dt->du_wlh = NULL; } -uint64_t -dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta, - uint64_t *leeway_ptr) +static inline void +_dispatch_timers_register(dispatch_timer_source_refs_t dt, uint32_t tidx) { - struct dispatch_timer_delay_s dtd = { - .timer = dta->dta_timer_data, - }; - dispatch_sync_f((dispatch_queue_t)dta, &dtd, - _dispatch_timer_aggregate_get_delay); - if (leeway_ptr) { - *leeway_ptr = dtd.leeway; + dispatch_timer_heap_t 
heap = &_dispatch_timers_heap[tidx]; + if (_dispatch_unote_registered(dt)) { + DISPATCH_TIMER_ASSERT(dt->du_ident, ==, tidx, "tidx"); + _dispatch_timer_heap_update(heap, dt); + } else { + dt->du_ident = tidx; + _dispatch_timer_heap_insert(heap, dt); } - return dtd.delay; + _dispatch_timers_reconfigure = true; + _dispatch_timers_processing_mask |= 1 << tidx; + dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON); + dt->du_wlh = DISPATCH_WLH_ANON; } -static void -_dispatch_timer_aggregate_update(void *ctxt) +DISPATCH_ALWAYS_INLINE +static inline bool +_dispatch_source_timer_tryarm(dispatch_source_t ds) { - dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current(); - dispatch_timer_t dtau = ctxt; - unsigned int tidx; - for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { - dta->dta_timer_data[tidx].target = dtau[tidx].target; - dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline; - } - free(dtau); + dispatch_queue_flags_t oqf, nqf; + return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, { + if (oqf & (DSF_CANCELED | DQF_RELEASED)) { + // do not install a cancelled timer + os_atomic_rmw_loop_give_up(break); + } + nqf = oqf | DSF_ARMED; + }); } -DISPATCH_NOINLINE +// Updates the ordered list of timers based on next fire date for changes to ds. +// Should only be called from the context of _dispatch_mgr_q. static void -_dispatch_timer_aggregates_configure(void) +_dispatch_timers_update(dispatch_unote_t du, uint32_t flags) { - dispatch_timer_aggregate_t dta; - dispatch_timer_t dtau; - TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) { - if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) { - continue; + dispatch_timer_source_refs_t dr = du._dt; + dispatch_source_t ds = _dispatch_source_from_refs(dr); + const char *verb = "updated"; + bool will_register, disarm = false; + + DISPATCH_ASSERT_ON_MANAGER_QUEUE(); + + if (unlikely(dr->du_ident == DISPATCH_TIMER_IDENT_CANCELED)) { + dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0); + return; + } + + // Unregister timers that are unconfigured, disabled, suspended or have + // missed intervals. Rearm after dispatch_set_timer(), resume or source + // invoke will reenable them + will_register = !(flags & DISPATCH_TIMERS_UNREGISTER) && + dr->dt_timer.target < INT64_MAX && + !os_atomic_load2o(ds, ds_pending_data, relaxed) && + !DISPATCH_QUEUE_IS_SUSPENDED(ds) && + !os_atomic_load2o(dr, dt_pending_config, relaxed); + if (likely(!_dispatch_unote_registered(dr))) { + dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0); + if (unlikely(!will_register || !_dispatch_source_timer_tryarm(ds))) { + return; } - dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau)); - memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer)); - _dispatch_barrier_async_detached_f((dispatch_queue_t)dta, dtau, - _dispatch_timer_aggregate_update); + verb = "armed"; + } else if (unlikely(!will_register)) { + disarm = true; + verb = "disarmed"; } -} -static inline void -_dispatch_timer_aggregates_check(void) -{ - if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) { - return; + // The heap owns a +2 on dispatch sources it references + // + // _dispatch_timers_run2() also sometimes passes DISPATCH_TIMERS_RETAIN_2 + // when it wants to take over this +2 at the same time we are unregistering + // the timer from the heap. 
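/*
 * Spelled out, the rules below produce four balances: arming a previously
 * unregistered timer gives refs == -2 (retain up front, the heap now owns
 * +2); moving a registered timer to a new heap index gives 0 (the heap's +2
 * carries over); a plain disarm gives +2 (released once we are done touching
 * the source); and a disarm with DISPATCH_TIMERS_RETAIN_2 gives 0 because
 * the caller takes the heap's +2 over.
 */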
+ // + // Compute our refcount balance according to these rules, if our balance + // would become negative we retain the source upfront, if it is positive, we + // get rid of the extraneous refcounts after we're done touching the source. + int refs = will_register ? -2 : 0; + if (_dispatch_unote_registered(dr) && !(flags & DISPATCH_TIMERS_RETAIN_2)) { + refs += 2; + } + if (refs < 0) { + dispatch_assert(refs == -2); + _dispatch_retain_2(ds); } - _dispatch_timer_aggregates_configure(); -} -static void -_dispatch_timer_aggregates_register(dispatch_source_t ds) -{ - dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); - if (!dta->dta_refcount++) { - TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list); + uint32_t tidx = _dispatch_source_timer_idx(dr); + if (unlikely(_dispatch_unote_registered(dr) && + (!will_register || dr->du_ident != tidx))) { + _dispatch_timers_unregister(dr); + } + if (likely(will_register)) { + _dispatch_timers_register(dr, tidx); } -} -DISPATCH_NOINLINE -static void -_dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx) -{ - dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); - dispatch_timer_source_aggregate_refs_t dr; - dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs; - _dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list, - dta->dta_timer, dr, dta_list); + if (disarm) { + _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED); + } + _dispatch_debug("kevent-source[%p]: %s timer[%p]", ds, verb, dr); + _dispatch_object_debug(ds, "%s", __func__); + if (refs > 0) { + dispatch_assert(refs == 2); + _dispatch_release_2_tailcall(ds); + } } -DISPATCH_NOINLINE -static void -_dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx) +#define DISPATCH_TIMER_MISSED_MARKER 1ul + +DISPATCH_ALWAYS_INLINE +static inline unsigned long +_dispatch_source_timer_compute_missed(dispatch_timer_source_refs_t dt, + uint64_t now, unsigned long prev) { - dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); - dispatch_timer_source_aggregate_refs_t dr; - dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs; - _dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL, - dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list); - if (!--dta->dta_refcount) { - TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list); + uint64_t missed = (now - dt->dt_timer.target) / dt->dt_timer.interval; + if (++missed + prev > LONG_MAX) { + missed = LONG_MAX - prev; + } + if (dt->dt_timer.interval < INT64_MAX) { + uint64_t push_by = missed * dt->dt_timer.interval; + dt->dt_timer.target += push_by; + dt->dt_timer.deadline += push_by; + } else { + dt->dt_timer.target = UINT64_MAX; + dt->dt_timer.deadline = UINT64_MAX; } + prev += missed; + return prev; } -#pragma mark - -#pragma mark dispatch_select +DISPATCH_ALWAYS_INLINE +static inline unsigned long +_dispatch_source_timer_data(dispatch_source_t ds, dispatch_unote_t du) +{ + dispatch_timer_source_refs_t dr = du._dt; + unsigned long data, prev, clear_prev = 0; -static int _dispatch_kq; + os_atomic_rmw_loop2o(ds, ds_pending_data, prev, clear_prev, relaxed, { + data = prev >> 1; + if (unlikely(prev & DISPATCH_TIMER_MISSED_MARKER)) { + os_atomic_rmw_loop_give_up(goto handle_missed_intervals); + } + }); + return data; -static unsigned int _dispatch_select_workaround; -static fd_set _dispatch_rfds; -static fd_set _dispatch_wfds; -static uint64_t*_dispatch_rfd_ptrs; -static uint64_t*_dispatch_wfd_ptrs; +handle_missed_intervals: + // The timer may be in 
_dispatch_source_invoke2() already for other + // reasons such as running the registration handler when ds_pending_data + // is changed by _dispatch_timers_run2() without holding the drain lock. + // + // We hence need dependency ordering to pair with the release barrier + // done by _dispatch_timers_run2() when setting the MISSED_MARKER bit. + os_atomic_thread_fence(dependency); + dr = os_atomic_force_dependency_on(dr, data); + + uint64_t now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(dr->du_ident)); + if (now >= dr->dt_timer.target) { + OS_COMPILER_CAN_ASSUME(dr->dt_timer.interval < INT64_MAX); + data = _dispatch_source_timer_compute_missed(dr, now, data); + } + + // When we see the MISSED_MARKER the manager has given up on this timer + // and expects the handler to call "resume". + // + // However, it may not have reflected this into the atomic flags yet + // so make sure _dispatch_source_invoke2() sees the timer is disarmed + // + // The subsequent _dispatch_source_refs_resume() will enqueue the source + // on the manager and make the changes to `ds_timer` above visible. + _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED); + os_atomic_store2o(ds, ds_pending_data, 0, relaxed); + return data; +} -DISPATCH_NOINLINE -static bool -_dispatch_select_register(struct kevent64_s *kev) +static inline void +_dispatch_timers_run2(dispatch_clock_now_cache_t nows, uint32_t tidx) { + dispatch_timer_source_refs_t dr; + dispatch_source_t ds; + uint64_t data, pending_data; + uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows); + + while ((dr = _dispatch_timers_heap[tidx].dth_min[DTH_TARGET_ID])) { + DISPATCH_TIMER_ASSERT(dr->du_filter, ==, DISPATCH_EVFILT_TIMER, + "invalid filter"); + DISPATCH_TIMER_ASSERT(dr->du_ident, ==, tidx, "tidx"); + DISPATCH_TIMER_ASSERT(dr->dt_timer.target, !=, 0, "missing target"); + ds = _dispatch_source_from_refs(dr); + if (dr->dt_timer.target > now) { + // Done running timers for now. 
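/*
 * dth_min[DTH_TARGET_ID] is the root of the by-target min-heap, i.e. the
 * earliest target of any armed timer in this bucket, so the first entry
 * whose target lies in the future bounds all the others and the scan can
 * stop here.
 */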
+ break; + } + if (dr->du_fflags & DISPATCH_TIMER_AFTER) { + _dispatch_trace_timer_fire(dr, 1, 1); + _dispatch_source_merge_evt(dr, EV_ONESHOT, 1, 0, 0); + _dispatch_debug("kevent-source[%p]: fired after timer[%p]", ds, dr); + _dispatch_object_debug(ds, "%s", __func__); + continue; + } - // Must execute on manager queue - DISPATCH_ASSERT_ON_MANAGER_QUEUE(); - - // If an EINVAL or ENOENT error occurred while adding/enabling a read or - // write kevent, assume it was due to a type of filedescriptor not - // supported by kqueue and fall back to select - switch (kev->filter) { - case EVFILT_READ: - if ((kev->data == EINVAL || kev->data == ENOENT) && - dispatch_assume(kev->ident < FD_SETSIZE)) { - FD_SET((int)kev->ident, &_dispatch_rfds); - if (slowpath(!_dispatch_rfd_ptrs)) { - _dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE, - sizeof(*_dispatch_rfd_ptrs)); - } - if (!_dispatch_rfd_ptrs[kev->ident]) { - _dispatch_rfd_ptrs[kev->ident] = kev->udata; - _dispatch_select_workaround++; - _dispatch_debug("select workaround used to read fd %d: 0x%lx", - (int)kev->ident, (long)kev->data); + data = os_atomic_load2o(ds, ds_pending_data, relaxed); + if (unlikely(data)) { + // the release barrier is required to make the changes + // to `ds_timer` visible to _dispatch_source_timer_data() + if (os_atomic_cmpxchg2o(ds, ds_pending_data, data, + data | DISPATCH_TIMER_MISSED_MARKER, release)) { + _dispatch_timers_update(dr, DISPATCH_TIMERS_UNREGISTER); + continue; } } - return true; - case EVFILT_WRITE: - if ((kev->data == EINVAL || kev->data == ENOENT) && - dispatch_assume(kev->ident < FD_SETSIZE)) { - FD_SET((int)kev->ident, &_dispatch_wfds); - if (slowpath(!_dispatch_wfd_ptrs)) { - _dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE, - sizeof(*_dispatch_wfd_ptrs)); - } - if (!_dispatch_wfd_ptrs[kev->ident]) { - _dispatch_wfd_ptrs[kev->ident] = kev->udata; - _dispatch_select_workaround++; - _dispatch_debug("select workaround used to write fd %d: 0x%lx", - (int)kev->ident, (long)kev->data); - } + + data = _dispatch_source_timer_compute_missed(dr, now, 0); + _dispatch_timers_update(dr, DISPATCH_TIMERS_RETAIN_2); + pending_data = data << 1; + if (!_dispatch_unote_registered(dr) && dr->dt_timer.target < INT64_MAX){ + // if we unregistered because of suspension we have to fake we + // missed events. 
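+			// (per _dispatch_source_timer_data() above: the marker tells the
+			// handler side that the manager gave up on this timer and that a
+			// resume is expected to rearm it)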
+ pending_data |= DISPATCH_TIMER_MISSED_MARKER; + os_atomic_store2o(ds, ds_pending_data, pending_data, release); + } else { + os_atomic_store2o(ds, ds_pending_data, pending_data, relaxed); } - return true; + _dispatch_trace_timer_fire(dr, data, data); + _dispatch_debug("kevent-source[%p]: fired timer[%p]", ds, dr); + _dispatch_object_debug(ds, "%s", __func__); + dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2); } - return false; } DISPATCH_NOINLINE -static bool -_dispatch_select_unregister(const struct kevent64_s *kev) +static void +_dispatch_timers_run(dispatch_clock_now_cache_t nows) { - // Must execute on manager queue - DISPATCH_ASSERT_ON_MANAGER_QUEUE(); - - switch (kev->filter) { - case EVFILT_READ: - if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE && - _dispatch_rfd_ptrs[kev->ident]) { - FD_CLR((int)kev->ident, &_dispatch_rfds); - _dispatch_rfd_ptrs[kev->ident] = 0; - _dispatch_select_workaround--; - return true; - } - break; - case EVFILT_WRITE: - if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE && - _dispatch_wfd_ptrs[kev->ident]) { - FD_CLR((int)kev->ident, &_dispatch_wfds); - _dispatch_wfd_ptrs[kev->ident] = 0; - _dispatch_select_workaround--; - return true; + uint32_t tidx; + for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { + if (_dispatch_timers_heap[tidx].dth_count) { + _dispatch_timers_run2(nows, tidx); } - break; } - return false; } -DISPATCH_NOINLINE -static bool -_dispatch_mgr_select(bool poll) -{ - static const struct timeval timeout_immediately = { 0, 0 }; - fd_set tmp_rfds, tmp_wfds; - struct kevent64_s kev; - int err, i, r; - bool kevent_avail = false; - - FD_COPY(&_dispatch_rfds, &tmp_rfds); - FD_COPY(&_dispatch_wfds, &tmp_wfds); - - r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, - poll ? (struct timeval*)&timeout_immediately : NULL); - if (slowpath(r == -1)) { - err = errno; - if (err != EBADF) { - if (err != EINTR) { - (void)dispatch_assume_zero(err); - } - return false; - } - for (i = 0; i < FD_SETSIZE; i++) { - if (i == _dispatch_kq) { - continue; - } - if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){ - continue; - } - r = dup(i); - if (dispatch_assume(r != -1)) { - close(r); - } else { - if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) { - FD_CLR(i, &_dispatch_rfds); - _dispatch_rfd_ptrs[i] = 0; - _dispatch_select_workaround--; - } - if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) { - FD_CLR(i, &_dispatch_wfds); - _dispatch_wfd_ptrs[i] = 0; - _dispatch_select_workaround--; - } - } +#if DISPATCH_HAVE_TIMER_COALESCING +#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \ + [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC + +static const uint64_t _dispatch_kevent_coalescing_window[] = { + DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75), +#if DISPATCH_HAVE_TIMER_QOS + DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1), + DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100), +#endif +}; +#endif // DISPATCH_HAVE_TIMER_COALESCING + +static inline dispatch_timer_delay_s +_dispatch_timers_get_delay(dispatch_timer_heap_t dth, dispatch_clock_t clock, + uint32_t qos, dispatch_clock_now_cache_t nows) +{ + uint64_t target = dth->dth_target, deadline = dth->dth_deadline; + uint64_t delta = INT64_MAX, dldelta = INT64_MAX; + dispatch_timer_delay_s rc; + + dispatch_assert(target <= deadline); + if (delta == 0 || target >= INT64_MAX) { + goto done; + } + + if (qos < DISPATCH_TIMER_QOS_COUNT && dth->dth_count > 2) { +#if DISPATCH_HAVE_TIMER_COALESCING + // Timer pre-coalescing + // When we have several 
timers with this target/deadline bracket: + // + // Target window Deadline + // V <-------V + // t1: [...........|.................] + // t2: [......|.......] + // t3: [..|..........] + // t4: | [.............] + // ^ + // Optimal Target + // + // Coalescing works better if the Target is delayed to "Optimal", by + // picking the latest target that isn't too close to the deadline. + uint64_t window = _dispatch_kevent_coalescing_window[qos]; + if (target + window < deadline) { + uint64_t latest = deadline - window; + target = _dispatch_timer_heap_max_target_before(dth, latest); } - return false; +#endif } - if (r > 0) { - for (i = 0; i < FD_SETSIZE; i++) { - if (FD_ISSET(i, &tmp_rfds)) { - if (i == _dispatch_kq) { - kevent_avail = true; - continue; - } - FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH - EV_SET64(&kev, i, EVFILT_READ, - EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, - _dispatch_rfd_ptrs[i], 0, 0); - _dispatch_kevent_drain(&kev); - } - if (FD_ISSET(i, &tmp_wfds)) { - FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH - EV_SET64(&kev, i, EVFILT_WRITE, - EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, - _dispatch_wfd_ptrs[i], 0, 0); - _dispatch_kevent_drain(&kev); - } - } + + uint64_t now = _dispatch_time_now_cached(clock, nows); + if (target <= now) { + delta = 0; + dldelta = 0; + goto done; } - return kevent_avail; -} -#pragma mark - -#pragma mark dispatch_kqueue + uint64_t tmp = target - now; + if (clock != DISPATCH_CLOCK_WALL) { + tmp = _dispatch_time_mach2nano(tmp); + } + if (tmp < delta) { + delta = tmp; + } -static void -_dispatch_kq_init(void *context DISPATCH_UNUSED) + tmp = deadline - now; + if (clock != DISPATCH_CLOCK_WALL) { + tmp = _dispatch_time_mach2nano(tmp); + } + if (tmp < dldelta) { + dldelta = tmp; + } + +done: + rc.delay = delta; + rc.leeway = delta < INT64_MAX ? 
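+			// (i.e. leeway = dldelta - delta, the remaining gap between the
+			// chosen target and the hard deadline; INT64_MAX when there is
+			// nothing to program)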
dldelta - delta : INT64_MAX; + return rc; +} + +static bool +_dispatch_timers_program2(dispatch_clock_now_cache_t nows, uint32_t tidx) { - static const struct kevent64_s kev = { - .ident = 1, - .filter = EVFILT_USER, - .flags = EV_ADD|EV_CLEAR, - }; + uint32_t qos = DISPATCH_TIMER_QOS(tidx); + dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx); + dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx]; + dispatch_timer_delay_s range; - _dispatch_safe_fork = false; -#if DISPATCH_USE_GUARDED_FD - guardid_t guard = (uintptr_t)&kev; - _dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP); -#else - _dispatch_kq = kqueue(); -#endif - if (_dispatch_kq == -1) { - int err = errno; - switch (err) { - case EMFILE: - DISPATCH_CLIENT_CRASH("kqueue() failure: " - "process is out of file descriptors"); - break; - case ENFILE: - DISPATCH_CLIENT_CRASH("kqueue() failure: " - "system is out of file descriptors"); - break; - case ENOMEM: - DISPATCH_CLIENT_CRASH("kqueue() failure: " - "kernel is out of memory"); - break; - default: - (void)dispatch_assume_zero(err); - DISPATCH_CRASH("kqueue() failure"); - break; + range = _dispatch_timers_get_delay(heap, clock, qos, nows); + if (range.delay == 0 || range.delay >= INT64_MAX) { + _dispatch_trace_next_timer_set(NULL, qos); + if (heap->dth_flags & DTH_ARMED) { + _dispatch_event_loop_timer_delete(tidx); } - } else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) { - // in case we fall back to select() - FD_SET(_dispatch_kq, &_dispatch_rfds); + return range.delay == 0; } - (void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0, - NULL)); - _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0); + _dispatch_trace_next_timer_set(heap->dth_min[DTH_TARGET_ID], qos); + _dispatch_trace_next_timer_program(range.delay, qos); + _dispatch_event_loop_timer_arm(tidx, range, nows); + return false; } -static int -_dispatch_get_kq(void) +DISPATCH_NOINLINE +static bool +_dispatch_timers_program(dispatch_clock_now_cache_t nows) { - static dispatch_once_t pred; - - dispatch_once_f(&pred, NULL, _dispatch_kq_init); + bool poll = false; + uint32_t tidx, timerm = _dispatch_timers_processing_mask; - return _dispatch_kq; + for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { + if (timerm & (1 << tidx)) { + poll |= _dispatch_timers_program2(nows, tidx); + } + } + return poll; } DISPATCH_NOINLINE -static long -_dispatch_kq_update(const struct kevent64_s *kev) +static bool +_dispatch_timers_configure(void) { - int r; - struct kevent64_s kev_copy; + // Find out if there is a new target/deadline on the timer lists + return _dispatch_timer_heap_has_new_min(_dispatch_timers_heap, + countof(_dispatch_timers_heap), _dispatch_timers_processing_mask); +} - if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) { - if (_dispatch_select_unregister(kev)) { - return 0; - } +static inline bool +_dispatch_mgr_timers(void) +{ + dispatch_clock_now_cache_s nows = { }; + bool expired = _dispatch_timers_expired; + if (unlikely(expired)) { + _dispatch_timers_run(&nows); } - kev_copy = *kev; - // This ensures we don't get a pending kevent back while registering - // a new kevent - kev_copy.flags |= EV_RECEIPT; -retry: - r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1, - &kev_copy, 1, 0, NULL)); - if (slowpath(r == -1)) { - int err = errno; - switch (err) { - case EINTR: - goto retry; - case EBADF: - DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors"); - break; - default: - (void)dispatch_assume_zero(err); - break; + 
_dispatch_mgr_trace_timers_wakes(); + bool reconfigure = _dispatch_timers_reconfigure; + if (unlikely(reconfigure || expired)) { + if (reconfigure) { + reconfigure = _dispatch_timers_configure(); + _dispatch_timers_reconfigure = false; } - return err; - } - switch (kev_copy.data) { - case 0: - return 0; - case EBADF: - case EPERM: - case EINVAL: - case ENOENT: - if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) { - if (_dispatch_select_register(&kev_copy)) { - return 0; - } + if (reconfigure || expired) { + expired = _dispatch_timers_expired = _dispatch_timers_program(&nows); } - // fall through - default: - kev_copy.flags |= kev->flags; - _dispatch_kevent_drain(&kev_copy); - break; + _dispatch_timers_processing_mask = 0; } - return (long)kev_copy.data; + return expired; } #pragma mark - #pragma mark dispatch_mgr -static struct kevent64_s *_dispatch_kevent_enable; - -static void inline -_dispatch_mgr_kevent_reenable(struct kevent64_s *ke) -{ - dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke); - _dispatch_kevent_enable = ke; -} - -unsigned long -_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED) +void +_dispatch_mgr_queue_push(dispatch_queue_t dq, dispatch_object_t dou, + DISPATCH_UNUSED dispatch_qos_t qos) { - if (_dispatch_queue_get_current() == &_dispatch_mgr_q) { - return false; + uint64_t dq_state; + _dispatch_trace_continuation_push(dq, dou._do); + if (unlikely(_dispatch_queue_push_update_tail(dq, dou._do))) { + _dispatch_queue_push_update_head(dq, dou._do); + dq_state = os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release); + if (!_dq_state_drain_locked_by_self(dq_state)) { + _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0); + } } - - static const struct kevent64_s kev = { - .ident = 1, - .filter = EVFILT_USER, - .fflags = NOTE_TRIGGER, - }; - -#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG - _dispatch_debug("waking up the dispatch manager queue: %p", dq); -#endif - - _dispatch_kq_update(&kev); - - return false; } -DISPATCH_NOINLINE -static void -_dispatch_mgr_init(void) +DISPATCH_NORETURN +void +_dispatch_mgr_queue_wakeup(DISPATCH_UNUSED dispatch_queue_t dq, + DISPATCH_UNUSED dispatch_qos_t qos, + DISPATCH_UNUSED dispatch_wakeup_flags_t flags) { - (void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed); - _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q); - _dispatch_queue_set_bound_thread(&_dispatch_mgr_q); - _dispatch_mgr_priority_init(); - _dispatch_kevent_init(); - _dispatch_timers_init(); - _dispatch_mach_recv_msg_buf_init(); - _dispatch_memorystatus_init(); + DISPATCH_INTERNAL_CRASH(0, "Don't try to wake up or override the manager"); } +#if DISPATCH_USE_MGR_THREAD DISPATCH_NOINLINE DISPATCH_NORETURN static void _dispatch_mgr_invoke(void) { - static const struct timespec timeout_immediately = { 0, 0 }; - struct kevent64_s kev; +#if DISPATCH_EVENT_BACKEND_KEVENT + dispatch_kevent_s evbuf[DISPATCH_DEFERRED_ITEMS_EVENT_COUNT]; +#endif + dispatch_deferred_items_s ddi = { +#if DISPATCH_EVENT_BACKEND_KEVENT + .ddi_maxevents = DISPATCH_DEFERRED_ITEMS_EVENT_COUNT, + .ddi_eventlist = evbuf, +#endif + }; bool poll; - int r; + _dispatch_deferred_items_set(&ddi); for (;;) { _dispatch_mgr_queue_drain(); poll = _dispatch_mgr_timers(); - if (slowpath(_dispatch_select_workaround)) { - poll = _dispatch_mgr_select(poll); - if (!poll) continue; - } poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q); - r = kevent64(_dispatch_kq, _dispatch_kevent_enable, - _dispatch_kevent_enable ? 
1 : 0, &kev, 1, 0, - poll ? &timeout_immediately : NULL); - _dispatch_kevent_enable = NULL; - if (slowpath(r == -1)) { - int err = errno; - switch (err) { - case EINTR: - break; - case EBADF: - DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors"); - break; - default: - (void)dispatch_assume_zero(err); - break; - } - } else if (r) { - _dispatch_kevent_drain(&kev); - } + _dispatch_event_loop_drain(poll ? KEVENT_FLAG_IMMEDIATE : 0); } } +#endif // DISPATCH_USE_MGR_THREAD DISPATCH_NORETURN void -_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED) +_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED, + dispatch_invoke_context_t dic DISPATCH_UNUSED, + dispatch_invoke_flags_t flags DISPATCH_UNUSED) { - _dispatch_mgr_init(); +#if DISPATCH_USE_KEVENT_WORKQUEUE + if (_dispatch_kevent_workqueue_enabled) { + DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with " + "kevent workqueue enabled"); + } +#endif +#if DISPATCH_USE_MGR_THREAD + _dispatch_queue_set_current(&_dispatch_mgr_q); + _dispatch_mgr_priority_init(); + _dispatch_queue_mgr_lock(&_dispatch_mgr_q); // never returns, so burn bridges behind us & clear stack 2k ahead _dispatch_clear_stack(2048); _dispatch_mgr_invoke(); -} - -#pragma mark - -#pragma mark dispatch_memorystatus - -#if DISPATCH_USE_MEMORYSTATUS_SOURCE -#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS -#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \ - DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \ - DISPATCH_MEMORYSTATUS_PRESSURE_WARN) -#elif DISPATCH_USE_VM_PRESSURE_SOURCE -#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM -#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE -#endif - -#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE -static dispatch_source_t _dispatch_memorystatus_source; - -static void -_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED) -{ -#if DISPATCH_USE_MEMORYSTATUS_SOURCE - unsigned long memorystatus; - memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source); - if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) { - _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT; - _voucher_activity_heap_pressure_normal(); - return; - } - _dispatch_continuation_cache_limit = - DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN; - _voucher_activity_heap_pressure_warn(); #endif - malloc_zone_pressure_relief(0,0); -} - -static void -_dispatch_memorystatus_init(void) -{ - _dispatch_memorystatus_source = dispatch_source_create( - DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0, - DISPATCH_MEMORYSTATUS_SOURCE_MASK, - _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true)); - dispatch_source_set_event_handler_f(_dispatch_memorystatus_source, - _dispatch_memorystatus_handler); - dispatch_resume(_dispatch_memorystatus_source); } -#else -static inline void _dispatch_memorystatus_init(void) {} -#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE - -#pragma mark - -#pragma mark dispatch_mach -#if HAVE_MACH - -#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG -#define _dispatch_debug_machport(name) \ - dispatch_debug_machport((name), __func__) -#else -#define _dispatch_debug_machport(name) ((void)(name)) -#endif +#if DISPATCH_USE_KEVENT_WORKQUEUE -// Flags for all notifications that are registered/unregistered when a -// send-possible notification is requested/delivered -#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \ - DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED) -#define 
_DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \ - DISPATCH_MACH_RECV_MESSAGE_DIRECT| \ - DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) -#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \ - DISPATCH_MACH_RECV_MESSAGE_DIRECT| \ - DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) - -#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v) -#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \ - (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y))) - -#define _DISPATCH_MACHPORT_HASH_SIZE 32 -#define _DISPATCH_MACHPORT_HASH(x) \ - _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE) - -#ifndef MACH_RCV_LARGE_IDENTITY -#define MACH_RCV_LARGE_IDENTITY 0x00000008 -#endif -#ifndef MACH_RCV_VOUCHER -#define MACH_RCV_VOUCHER 0x00000800 -#endif -#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX -#define DISPATCH_MACH_RCV_OPTIONS ( \ - MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \ - MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \ - MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \ - MACH_RCV_VOUCHER - -#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0]) - -static void _dispatch_kevent_machport_drain(struct kevent64_s *ke); -static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke); -static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr); -static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr); -static void _dispatch_source_merge_mach_msg(dispatch_source_t ds, - dispatch_source_refs_t dr, dispatch_kevent_t dk, - mach_msg_header_t *hdr, mach_msg_size_t siz); -static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk, - uint32_t new_flags, uint32_t del_flags, uint32_t mask, - mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync); -static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr); -static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm, - dispatch_mach_reply_refs_t dmr, bool disconnected); -static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm); -static inline void _dispatch_mach_msg_set_options(dispatch_object_t dou, - mach_msg_option_t options); -static void _dispatch_mach_msg_recv(dispatch_mach_t dm, - dispatch_mach_reply_refs_t dmr, mach_msg_header_t *hdr, - mach_msg_size_t siz); -static void _dispatch_mach_merge_kevent(dispatch_mach_t dm, - const struct kevent64_s *ke); -static inline mach_msg_option_t _dispatch_mach_checkin_options(void); - -static const size_t _dispatch_mach_recv_msg_size = - DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE; -static const size_t dispatch_mach_trailer_size = - sizeof(dispatch_mach_trailer_t); -static mach_msg_size_t _dispatch_mach_recv_msg_buf_size; -static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset; -static mach_port_t _dispatch_mach_notify_port; -static struct kevent64_s _dispatch_mach_recv_kevent = { - .filter = EVFILT_MACHPORT, - .flags = EV_ADD|EV_ENABLE|EV_DISPATCH, - .fflags = DISPATCH_MACH_RCV_OPTIONS, -}; -static dispatch_source_t _dispatch_mach_notify_source; -static const -struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = { - .ke = { - .filter = EVFILT_MACHPORT, - .flags = EV_CLEAR, - .fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT, - }, -}; +#define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((dispatch_priority_t)~0u) -static void -_dispatch_mach_recv_msg_buf_init(void) -{ - mach_vm_size_t vm_size = mach_vm_round_page( - _dispatch_mach_recv_msg_size + dispatch_mach_trailer_size); - _dispatch_mach_recv_msg_buf_size = (mach_msg_size_t)vm_size; - mach_vm_address_t vm_addr = 
vm_page_size;
-	kern_return_t kr;
-
-	while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
-			VM_FLAGS_ANYWHERE))) {
-		if (kr != KERN_NO_SPACE) {
-			(void)dispatch_assume_zero(kr);
-			DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
-		}
-		_dispatch_temporary_resource_shortage();
-		vm_addr = vm_page_size;
-	}
-	_dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
-	_dispatch_mach_recv_kevent.ext[1] = vm_size;
-}
+_Static_assert(WORKQ_KEVENT_EVENT_BUFFER_LEN >=
+		DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
+		"our list should not be longer than the kernel's");
 
-static inline void*
-_dispatch_get_mach_recv_msg_buf(void)
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_priority_t
+_dispatch_wlh_worker_thread_init(dispatch_wlh_t wlh,
+		dispatch_deferred_items_t ddi)
+{
+	dispatch_assert(wlh);
+	dispatch_priority_t old_dbp;
+
+	pthread_priority_t pp = _dispatch_get_priority();
+	if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
+		// If this thread does not have the event manager flag set, don't set
+		// up as the dispatch manager and let the caller know to only process
+		// the delivered events.
+		//
+		// Also add the NEEDS_UNBIND flag so that
+		// _dispatch_priority_compute_update knows it has to unbind
+		pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
+		if (wlh == DISPATCH_WLH_ANON) {
+			pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
+		} else {
+			// pthread sets the flag when it is an event delivery thread
+			// so we need to explicitly clear it
+			pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
+		}
+		_dispatch_thread_setspecific(dispatch_priority_key,
+				(void *)(uintptr_t)pp);
+		if (wlh != DISPATCH_WLH_ANON) {
+			_dispatch_debug("wlh[%p]: handling events", wlh);
+		} else {
+			ddi->ddi_can_stash = true;
+		}
+		return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
+	}
+
+	if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
+			!(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
+		// When the pthread kext is delivering kevents to us, and pthread
+		// root queues are in use, then the pthread priority TSD is set
+		// to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
+		//
+		// Given that this isn't a valid QoS, we need to fix up the TSD,
+		// and the best option is to clear the qos/priority bits which tells
+		// us to not do any QoS related calls on this thread.
+		//
+		// However, in that case the manager thread is opted out of QoS,
+		// as far as pthread is concerned, and can't be turned into
+		// something else, so we can't stash.
+		pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
+	}
+	// Managers always park without mutating to a regular worker thread, and
+	// hence never need to unbind from userland, and when draining a manager,
+	// the NEEDS_UNBIND flag would cause the mutation to happen. 
+ // So we need to strip this flag + pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG; + _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp); + + // ensure kevents registered from this thread are registered at manager QoS + old_dbp = _dispatch_set_basepri(DISPATCH_PRIORITY_FLAG_MANAGER); + _dispatch_queue_set_current(&_dispatch_mgr_q); + _dispatch_queue_mgr_lock(&_dispatch_mgr_q); + return old_dbp; +} + +DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT +static inline bool +_dispatch_wlh_worker_thread_reset(dispatch_priority_t old_dbp) { - return (void*)_dispatch_mach_recv_kevent.ext[0]; + bool needs_poll = _dispatch_queue_mgr_unlock(&_dispatch_mgr_q); + _dispatch_reset_basepri(old_dbp); + _dispatch_reset_basepri_override(); + _dispatch_queue_set_current(NULL); + return needs_poll; } +DISPATCH_ALWAYS_INLINE static void -_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED) -{ - kern_return_t kr; - - kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, - &_dispatch_mach_recv_portset); - DISPATCH_VERIFY_MIG(kr); - if (dispatch_assume_zero(kr)) { - DISPATCH_CLIENT_CRASH( - "mach_port_allocate() failed: cannot create port set"); - } - dispatch_assert(_dispatch_get_mach_recv_msg_buf()); - dispatch_assert(dispatch_mach_trailer_size == - REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS( - DISPATCH_MACH_RCV_TRAILER))); - _dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset; - _dispatch_kq_update(&_dispatch_mach_recv_kevent); - - kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, - &_dispatch_mach_notify_port); - DISPATCH_VERIFY_MIG(kr); - if (dispatch_assume_zero(kr)) { - DISPATCH_CLIENT_CRASH( - "mach_port_allocate() failed: cannot create receive right"); - } - _dispatch_mach_notify_source = dispatch_source_create( - &_dispatch_source_type_mach_recv_direct, - _dispatch_mach_notify_port, 0, &_dispatch_mgr_q); - static const struct dispatch_continuation_s dc = { - .dc_func = (void*)_dispatch_mach_notify_source_invoke, - }; - _dispatch_mach_notify_source->ds_refs->ds_handler[DS_EVENT_HANDLER] = - (dispatch_continuation_t)&dc; - dispatch_assert(_dispatch_mach_notify_source); - dispatch_resume(_dispatch_mach_notify_source); -} - -static mach_port_t -_dispatch_get_mach_recv_portset(void) +_dispatch_wlh_worker_thread(dispatch_wlh_t wlh, dispatch_kevent_t events, + int *nevents) { - static dispatch_once_t pred; - dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init); - return _dispatch_mach_recv_portset; -} + _dispatch_introspection_thread_add(); + DISPATCH_PERF_MON_VAR_INIT -static void -_dispatch_mach_portset_init(void *context DISPATCH_UNUSED) -{ - struct kevent64_s kev = { - .filter = EVFILT_MACHPORT, - .flags = EV_ADD, + dispatch_deferred_items_s ddi = { + .ddi_eventlist = events, }; - kern_return_t kr; - - kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, - &_dispatch_mach_portset); - DISPATCH_VERIFY_MIG(kr); - if (dispatch_assume_zero(kr)) { - DISPATCH_CLIENT_CRASH( - "mach_port_allocate() failed: cannot create port set"); - } - kev.ident = _dispatch_mach_portset; - _dispatch_kq_update(&kev); -} - -static mach_port_t -_dispatch_get_mach_portset(void) -{ - static dispatch_once_t pred; - dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init); - return _dispatch_mach_portset; -} - -static kern_return_t -_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps) -{ - mach_port_t mp = (mach_port_t)dk->dk_kevent.ident; - kern_return_t kr; - - _dispatch_debug_machport(mp); - kr 
= mach_port_move_member(mach_task_self(), mp, mps); - if (slowpath(kr)) { - DISPATCH_VERIFY_MIG(kr); - switch (kr) { - case KERN_INVALID_RIGHT: - if (mps) { - _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: " - "mach_port_move_member() failed ", kr); - break; - } - //fall through - case KERN_INVALID_NAME: -#if DISPATCH_DEBUG - _dispatch_log("Corruption: Mach receive right 0x%x destroyed " - "prematurely", mp); -#endif - break; - default: - (void)dispatch_assume_zero(kr); - break; - } - } - return mps ? kr : 0; -} + dispatch_priority_t old_dbp; -static void -_dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED) -{ -#if (TARGET_IPHONE_SIMULATOR && \ - IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \ - (!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090) - // delete and re-add kevent to workaround - if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) { - struct kevent64_s kev = _dispatch_mach_recv_kevent; - kev.flags = EV_DELETE; - _dispatch_kq_update(&kev); + old_dbp = _dispatch_wlh_worker_thread_init(wlh, &ddi); + if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) { + _dispatch_perfmon_start_impl(true); + } else { + dispatch_assert(wlh == DISPATCH_WLH_ANON); + wlh = DISPATCH_WLH_ANON; } -#endif - _dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent); -} + _dispatch_deferred_items_set(&ddi); + _dispatch_event_loop_merge(events, *nevents); -static kern_return_t -_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, - uint32_t del_flags) -{ - kern_return_t kr = 0; - dispatch_assert_zero(new_flags & del_flags); - if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) || - (del_flags & _DISPATCH_MACH_RECV_FLAGS)) { - mach_port_t mps; - if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) { - mps = _dispatch_get_mach_recv_portset(); - } else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) || - ((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) && - (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) { - mps = _dispatch_get_mach_portset(); + if (old_dbp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) { + _dispatch_mgr_queue_drain(); + bool poll = _dispatch_mgr_timers(); + if (_dispatch_wlh_worker_thread_reset(old_dbp)) { + poll = true; + } + if (poll) _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0); + } else if (ddi.ddi_stashed_dou._do) { + _dispatch_debug("wlh[%p]: draining deferred item %p", wlh, + ddi.ddi_stashed_dou._do); + if (wlh == DISPATCH_WLH_ANON) { + dispatch_assert(ddi.ddi_nevents == 0); + _dispatch_deferred_items_set(NULL); + _dispatch_root_queue_drain_deferred_item(&ddi + DISPATCH_PERF_MON_ARGS); } else { - mps = MACH_PORT_NULL; + _dispatch_root_queue_drain_deferred_wlh(&ddi + DISPATCH_PERF_MON_ARGS); } - kr = _dispatch_mach_portset_update(dk, mps); } - return kr; -} - -static kern_return_t -_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags, - uint32_t del_flags) -{ - kern_return_t kr = 0; - dispatch_assert_zero(new_flags & del_flags); - if ((new_flags & _DISPATCH_MACH_SP_FLAGS) || - (del_flags & _DISPATCH_MACH_SP_FLAGS)) { - // Requesting a (delayed) non-sync send-possible notification - // registers for both immediate dead-name notification and delayed-arm - // send-possible notification for the port. - // The send-possible notification is armed when a mach_msg() with the - // the MACH_SEND_NOTIFY to the port times out. 
- // If send-possible is unavailable, fall back to immediate dead-name - // registration rdar://problem/2527840&9008724 - kr = _dispatch_mach_notify_update(dk, new_flags, del_flags, - _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE, - MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0); - } - return kr; -} -static inline void -_dispatch_kevent_mach_portset(struct kevent64_s *ke) -{ - if (ke->ident == _dispatch_mach_recv_portset) { - return _dispatch_kevent_mach_msg_drain(ke); - } else if (ke->ident == _dispatch_mach_portset) { - return _dispatch_kevent_machport_drain(ke); - } else { - return _dispatch_kevent_error(ke); + _dispatch_deferred_items_set(NULL); + if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER && + !ddi.ddi_stashed_dou._do) { + _dispatch_perfmon_end(perfmon_thread_event_no_steal); } + _dispatch_debug("returning %d deferred kevents", ddi.ddi_nevents); + *nevents = ddi.ddi_nevents; } DISPATCH_NOINLINE -static void -_dispatch_kevent_machport_drain(struct kevent64_s *ke) +void +_dispatch_kevent_worker_thread(dispatch_kevent_t *events, int *nevents) { - mach_port_t name = (mach_port_name_t)ke->data; - dispatch_kevent_t dk; - struct kevent64_s kev; - - _dispatch_debug_machport(name); - dk = _dispatch_kevent_find(name, EVFILT_MACHPORT); - if (!dispatch_assume(dk)) { + if (!events && !nevents) { + // events for worker thread request have already been delivered earlier return; } - _dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH - - EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, - DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0); - _dispatch_kevent_debug(&kev, __func__); - _dispatch_kevent_merge(&kev); -} - -DISPATCH_NOINLINE -static void -_dispatch_kevent_mach_msg_drain(struct kevent64_s *ke) -{ - mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0]; - mach_msg_size_t siz, msgsiz; - mach_msg_return_t kr = (mach_msg_return_t)ke->fflags; - - _dispatch_kevent_mach_recv_reenable(ke); - if (!dispatch_assume(hdr)) { - DISPATCH_CRASH("EVFILT_MACHPORT with no message"); - } - if (fastpath(!kr)) { - return _dispatch_kevent_mach_msg_recv(hdr); - } else if (kr != MACH_RCV_TOO_LARGE) { - goto out; - } - if (!dispatch_assume(ke->ext[1] <= UINT_MAX - - dispatch_mach_trailer_size)) { - DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message"); - } - siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size; - hdr = malloc(siz); - if (ke->data) { - if (!dispatch_assume(hdr)) { - // Kernel will discard message too large to fit - hdr = _dispatch_get_mach_recv_msg_buf(); - siz = _dispatch_mach_recv_msg_buf_size; - } - mach_port_t name = (mach_port_name_t)ke->data; - const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS | - MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE); - kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE, - MACH_PORT_NULL); - if (fastpath(!kr)) { - return _dispatch_kevent_mach_msg_recv(hdr); - } else if (kr == MACH_RCV_TOO_LARGE) { - _dispatch_log("BUG in libdispatch client: " - "_dispatch_kevent_mach_msg_drain: dropped message too " - "large to fit in memory: id = 0x%x, size = %lld", - hdr->msgh_id, ke->ext[1]); - kr = MACH_MSG_SUCCESS; - } - } else { - // We don't know which port in the portset contains the large message, - // so need to receive all messages pending on the portset to ensure the - // large message is drained. 
- bool received = false; - for (;;) { - if (!dispatch_assume(hdr)) { - DISPATCH_CLIENT_CRASH("Message too large to fit in memory"); - } - const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS | - MACH_RCV_TIMEOUT); - kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset, - MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); - if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume( - hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) { - DISPATCH_CRASH("Overlarge message"); - } - if (fastpath(!kr)) { - msgsiz = hdr->msgh_size + dispatch_mach_trailer_size; - if (msgsiz < siz) { - void *shrink = realloc(hdr, msgsiz); - if (shrink) hdr = shrink; - } - _dispatch_kevent_mach_msg_recv(hdr); - hdr = NULL; - received = true; - } else if (kr == MACH_RCV_TOO_LARGE) { - siz = hdr->msgh_size + dispatch_mach_trailer_size; - } else { - if (kr == MACH_RCV_TIMED_OUT && received) { - kr = MACH_MSG_SUCCESS; - } - break; - } - hdr = reallocf(hdr, siz); - } - } - if (hdr != _dispatch_get_mach_recv_msg_buf()) { - free(hdr); - } -out: - if (slowpath(kr)) { - _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: " - "message reception failed", kr); - } + if (!dispatch_assume(*nevents && *events)) return; + _dispatch_adopt_wlh_anon(); + _dispatch_wlh_worker_thread(DISPATCH_WLH_ANON, *events, nevents); + _dispatch_reset_wlh(); } -static void -_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr) -{ - dispatch_source_refs_t dri; - dispatch_kevent_t dk; - mach_port_t name = hdr->msgh_local_port; - mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size; - - if (!dispatch_assume(hdr->msgh_size <= UINT_MAX - - dispatch_mach_trailer_size)) { - _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " - "received overlarge message"); - return _dispatch_kevent_mach_msg_destroy(hdr); - } - if (!dispatch_assume(name)) { - _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " - "received message with MACH_PORT_NULL port"); - return _dispatch_kevent_mach_msg_destroy(hdr); - } - _dispatch_debug_machport(name); - dk = _dispatch_kevent_find(name, EVFILT_MACHPORT); - if (!dispatch_assume(dk)) { - _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " - "received message with unknown kevent"); - return _dispatch_kevent_mach_msg_destroy(hdr); - } - _dispatch_kevent_debug(&dk->dk_kevent, __func__); - TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) { - dispatch_source_t dsi = _dispatch_source_from_refs(dri); - if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) { - return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz); - } - } - _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " - "received message with no listeners"); - return _dispatch_kevent_mach_msg_destroy(hdr); -} -static void -_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr) -{ - if (hdr) { - mach_msg_destroy(hdr); - if (hdr != _dispatch_get_mach_recv_msg_buf()) { - free(hdr); - } - } -} +#endif // DISPATCH_USE_KEVENT_WORKQUEUE +#pragma mark - +#pragma mark dispatch_source_debug -static void -_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr, - dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz) +static size_t +_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz) { - if (ds == _dispatch_mach_notify_source) { - _dispatch_mach_notify_source_invoke(hdr); - return _dispatch_kevent_mach_msg_destroy(hdr); - } - dispatch_mach_reply_refs_t dmr = NULL; - if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) { - dmr = 
(dispatch_mach_reply_refs_t)dr; - } - return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, hdr, siz); + dispatch_queue_t target = ds->do_targetq; + dispatch_source_refs_t dr = ds->ds_refs; + return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%x, " + "mask = 0x%x, pending_data = 0x%llx, registered = %d, " + "armed = %d, deleted = %d%s, canceled = %d, ", + target && target->dq_label ? target->dq_label : "", target, + dr->du_ident, dr->du_fflags, (unsigned long long)ds->ds_pending_data, + ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED), + (bool)(ds->dq_atomic_flags & DSF_DELETED), + (ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "", + (bool)(ds->dq_atomic_flags & DSF_CANCELED)); } -DISPATCH_ALWAYS_INLINE -static inline void -_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final) +static size_t +_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz) { - dispatch_source_refs_t dri, dr_next; - dispatch_kevent_t dk; - struct kevent64_s kev; - bool unreg; - - dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION); - if (!dk) { - return; - } - - // Update notification registration state. - dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS; - EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE, - flag, 0, (uintptr_t)dk, 0, 0); - if (final) { - // This can never happen again - unreg = true; - } else { - // Re-register for notification before delivery - unreg = _dispatch_kevent_resume(dk, flag, 0); - } - DISPATCH_MACH_KEVENT_ARMED(dk) = 0; - TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) { - dispatch_source_t dsi = _dispatch_source_from_refs(dri); - if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) { - dispatch_mach_t dm = (dispatch_mach_t)dsi; - _dispatch_mach_merge_kevent(dm, &kev); - if (unreg && dm->dm_dkev) { - _dispatch_mach_kevent_unregister(dm); - } - } else { - _dispatch_source_merge_kevent(dsi, &kev); - if (unreg) { - _dispatch_source_kevent_unregister(dsi); - } - } - if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) { - // current merge is last in list (dk might have been freed) - // or it re-armed the notification - return; - } - } + dispatch_timer_source_refs_t dr = ds->ds_timer_refs; + return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx" + ", interval = 0x%llx, flags = 0x%x }, ", + (unsigned long long)dr->dt_timer.target, + (unsigned long long)dr->dt_timer.deadline, + (unsigned long long)dr->dt_timer.interval, dr->du_fflags); } -static kern_return_t -_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags, - uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid, - mach_port_mscount_t notify_sync) +size_t +_dispatch_source_debug(dispatch_source_t ds, char *buf, size_t bufsiz) { - mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident; - typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data; - kern_return_t kr, krr = 0; - - // Update notification registration state. 
- dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask; - dk->dk_kevent.data &= ~(del_flags & mask); - - _dispatch_debug_machport(port); - if ((dk->dk_kevent.data & mask) && !(prev & mask)) { - // initialize _dispatch_mach_notify_port: - (void)_dispatch_get_mach_recv_portset(); - _dispatch_debug("machport[0x%08x]: registering for send-possible " - "notification", port); - previous = MACH_PORT_NULL; - krr = mach_port_request_notification(mach_task_self(), port, - notify_msgid, notify_sync, _dispatch_mach_notify_port, - MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous); - DISPATCH_VERIFY_MIG(krr); - - switch(krr) { - case KERN_INVALID_NAME: - case KERN_INVALID_RIGHT: - // Supress errors & clear registration state - dk->dk_kevent.data &= ~mask; - break; - default: - // Else, we dont expect any errors from mach. Log any errors - if (dispatch_assume_zero(krr)) { - // log the error & clear registration state - dk->dk_kevent.data &= ~mask; - } else if (dispatch_assume_zero(previous)) { - // Another subsystem has beat libdispatch to requesting the - // specified Mach notification on this port. We should - // technically cache the previous port and message it when the - // kernel messages our port. Or we can just say screw those - // subsystems and deallocate the previous port. - // They should adopt libdispatch :-P - kr = mach_port_deallocate(mach_task_self(), previous); - DISPATCH_VERIFY_MIG(kr); - (void)dispatch_assume_zero(kr); - previous = MACH_PORT_NULL; - } - } - } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) { - _dispatch_debug("machport[0x%08x]: unregistering for send-possible " - "notification", port); - previous = MACH_PORT_NULL; - kr = mach_port_request_notification(mach_task_self(), port, - notify_msgid, notify_sync, MACH_PORT_NULL, - MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous); - DISPATCH_VERIFY_MIG(kr); - - switch (kr) { - case KERN_INVALID_NAME: - case KERN_INVALID_RIGHT: - case KERN_INVALID_ARGUMENT: - break; - default: - if (dispatch_assume_zero(kr)) { - // log the error - } - } - } else { - return 0; - } - if (slowpath(previous)) { - // the kernel has not consumed the send-once right yet - (void)dispatch_assume_zero( - _dispatch_send_consume_send_once_right(previous)); + dispatch_source_refs_t dr = ds->ds_refs; + size_t offset = 0; + offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", + dx_kind(ds), ds); + offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset); + offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset); + if (dr->du_is_timer) { + offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset); } - return krr; -} - -static void -_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED) -{ - (void)_dispatch_get_mach_recv_portset(); - _dispatch_debug("registering for calendar-change notification"); - kern_return_t kr = host_request_notification(_dispatch_get_mach_host_port(), - HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port); - DISPATCH_VERIFY_MIG(kr); - (void)dispatch_assume_zero(kr); + offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, " + "filter = %s }", dr, dr->du_is_direct ? 
" (direct)" : "", + dr->du_type->dst_kind); + return offset; } - -static void -_dispatch_mach_host_calendar_change_register(void) -{ - static dispatch_once_t pred; - dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update); -} - -static void -_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr) -{ - mig_reply_error_t reply; - dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union - __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem)); - dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size); - boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head); - if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) { - // host_notify_reply.defs: host_calendar_changed - _dispatch_debug("calendar-change notification"); - _dispatch_timers_calendar_change(); - _dispatch_mach_host_notify_update(NULL); - success = TRUE; - reply.RetCode = KERN_SUCCESS; - } - if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) { - (void)dispatch_assume_zero(reply.RetCode); - } -} - -kern_return_t -_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED, - mach_port_name_t name) -{ -#if DISPATCH_DEBUG - _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x " - "deleted prematurely", name); -#endif - - _dispatch_debug_machport(name); - _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true); - - return KERN_SUCCESS; -} - -kern_return_t -_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED, - mach_port_name_t name) -{ - kern_return_t kr; - - _dispatch_debug("machport[0x%08x]: dead-name notification", name); - _dispatch_debug_machport(name); - _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true); - - // the act of receiving a dead name notification allocates a dead-name - // right that must be deallocated - kr = mach_port_deallocate(mach_task_self(), name); - DISPATCH_VERIFY_MIG(kr); - //(void)dispatch_assume_zero(kr); - - return KERN_SUCCESS; -} - -kern_return_t -_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED, - mach_port_name_t name) -{ - _dispatch_debug("machport[0x%08x]: send-possible notification", name); - _dispatch_debug_machport(name); - _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false); - - return KERN_SUCCESS; -} - -#pragma mark - -#pragma mark dispatch_mach_t - -#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2) -#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2 -#define DISPATCH_MACH_OPTIONS_MASK 0xffff - -static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou); -static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm, - mach_port_t local_port, mach_port_t remote_port); -static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected( - dispatch_object_t dou, dispatch_mach_reply_refs_t dmr); -static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm, - dispatch_object_t dou); -static inline mach_msg_header_t* _dispatch_mach_msg_get_msg( - dispatch_mach_msg_t dmsg); -static void _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou, - pthread_priority_t pp); - -static dispatch_mach_t -_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context, - dispatch_mach_handler_function_t handler, bool handler_is_block) -{ - dispatch_mach_t dm; - dispatch_mach_refs_t dr; - - dm = _dispatch_alloc(DISPATCH_VTABLE(mach), - sizeof(struct dispatch_mach_s)); - _dispatch_queue_init((dispatch_queue_t)dm); - dm->dq_label = label; - - dm->do_ref_cnt++; // the reference 
_dispatch_mach_cancel_invoke holds - dm->do_ref_cnt++; // since channel is created suspended - dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL; - dm->do_targetq = &_dispatch_mgr_q; - - dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s)); - dr->dr_source_wref = _dispatch_ptr2wref(dm); - dr->dm_handler_func = handler; - dr->dm_handler_ctxt = context; - dm->ds_refs = dr; - dm->dm_handler_is_block = handler_is_block; - - dm->dm_refs = _dispatch_calloc(1ul, - sizeof(struct dispatch_mach_send_refs_s)); - dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm); - dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED; - TAILQ_INIT(&dm->dm_refs->dm_replies); - - // First item on the channel sets the user-specified target queue - dispatch_set_target_queue(dm, q); - _dispatch_object_debug(dm, "%s", __func__); - return dm; -} - -dispatch_mach_t -dispatch_mach_create(const char *label, dispatch_queue_t q, - dispatch_mach_handler_t handler) -{ - dispatch_block_t bb = _dispatch_Block_copy((void*)handler); - return _dispatch_mach_create(label, q, bb, - (dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true); -} - -dispatch_mach_t -dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context, - dispatch_mach_handler_function_t handler) -{ - return _dispatch_mach_create(label, q, context, handler, false); -} - -void -_dispatch_mach_dispose(dispatch_mach_t dm) -{ - _dispatch_object_debug(dm, "%s", __func__); - dispatch_mach_refs_t dr = dm->ds_refs; - if (dm->dm_handler_is_block && dr->dm_handler_ctxt) { - Block_release(dr->dm_handler_ctxt); - } - free(dr); - free(dm->dm_refs); - _dispatch_queue_destroy(dm); -} - -void -dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive, - mach_port_t send, dispatch_mach_msg_t checkin) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - dispatch_kevent_t dk; - - if (MACH_PORT_VALID(receive)) { - dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s)); - dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke; - dk->dk_kevent.ident = receive; - dk->dk_kevent.flags |= EV_ADD|EV_ENABLE; - dk->dk_kevent.udata = (uintptr_t)dk; - TAILQ_INIT(&dk->dk_sources); - dm->ds_dkev = dk; - dm->ds_pending_data_mask = dk->dk_kevent.fflags; - _dispatch_retain(dm); // the reference the manager queue holds - } - dr->dm_send = send; - if (MACH_PORT_VALID(send)) { - if (checkin) { - dispatch_retain(checkin); - mach_msg_option_t options = _dispatch_mach_checkin_options(); - _dispatch_mach_msg_set_options(checkin, options); - dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin); - } - dr->dm_checkin = checkin; - } - // monitor message reply ports - dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE; - if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt, - DISPATCH_MACH_NEVER_CONNECTED, 0, release))) { - DISPATCH_CLIENT_CRASH("Channel already connected"); - } - _dispatch_object_debug(dm, "%s", __func__); - return dispatch_resume(dm); -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm, - dispatch_mach_reply_refs_t dmr, bool disconnected) -{ - dispatch_mach_msg_t dmsgr = NULL; - if (disconnected) { - dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr); - } - dispatch_kevent_t dk = dmr->dmr_dkev; - TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list); - _dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE); - TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list); - if (dmr->dmr_voucher) 
_voucher_release(dmr->dmr_voucher); - free(dmr); - if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority); -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply, - dispatch_mach_msg_t dmsg) -{ - dispatch_kevent_t dk; - dispatch_mach_reply_refs_t dmr; - - dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s)); - dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke; - dk->dk_kevent.ident = reply; - dk->dk_kevent.flags |= EV_ADD|EV_ENABLE; - dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE; - dk->dk_kevent.udata = (uintptr_t)dk; - TAILQ_INIT(&dk->dk_sources); - - dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s)); - dmr->dr_source_wref = _dispatch_ptr2wref(dm); - dmr->dmr_dkev = dk; - if (dmsg->dmsg_voucher) { - dmr->dmr_voucher =_voucher_retain(dmsg->dmsg_voucher); - } - dmr->dmr_priority = dmsg->dmsg_priority; - // make reply context visible to leaks rdar://11777199 - dmr->dmr_ctxt = dmsg->do_ctxt; - - _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply, - dmsg->do_ctxt); - uint32_t flags; - bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, &flags); - TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr, - dr_list); - TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list); - if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) { - _dispatch_mach_reply_kevent_unregister(dm, dmr, true); - } -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_kevent_unregister(dispatch_mach_t dm) -{ - dispatch_kevent_t dk = dm->dm_dkev; - dm->dm_dkev = NULL; - TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs, - dr_list); - dm->ds_pending_data_mask &= ~(unsigned long) - (DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD); - _dispatch_kevent_unregister(dk, - DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD); -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send) -{ - dispatch_kevent_t dk; - - dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s)); - dk->dk_kevent = _dispatch_source_type_mach_send.ke; - dk->dk_kevent.ident = send; - dk->dk_kevent.flags |= EV_ADD|EV_ENABLE; - dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD; - dk->dk_kevent.udata = (uintptr_t)dk; - TAILQ_INIT(&dk->dk_sources); - - dm->ds_pending_data_mask |= dk->dk_kevent.fflags; - - uint32_t flags; - bool do_resume = _dispatch_kevent_register(&dk, &flags); - TAILQ_INSERT_TAIL(&dk->dk_sources, - (dispatch_source_refs_t)dm->dm_refs, dr_list); - dm->dm_dkev = dk; - if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) { - _dispatch_mach_kevent_unregister(dm); - } -} - -static inline void -_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou, - pthread_priority_t pp) -{ - return _dispatch_queue_push(dm._dq, dou, pp); -} - -static inline void -_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options) -{ - dou._do->do_suspend_cnt = (unsigned int)options; -} - -static inline mach_msg_option_t -_dispatch_mach_msg_get_options(dispatch_object_t dou) -{ - mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt; - return options; -} - -static inline void -_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err, - unsigned long reason) -{ - dispatch_assert_zero(reason & ~(unsigned long)code_emask); - dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? 
err : - err_local|err_sub(0x3e0)|(mach_error_t)reason); -} - -static inline unsigned long -_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr) -{ - mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt; - dou._do->do_suspend_cnt = 0; - if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) { - *err_ptr = 0; - return err_get_code(err); - } - *err_ptr = err; - return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT; -} - -static void -_dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr, - mach_msg_header_t *hdr, mach_msg_size_t siz) -{ - _dispatch_debug_machport(hdr->msgh_remote_port); - _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x", - hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port); - if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) { - return _dispatch_kevent_mach_msg_destroy(hdr); - } - dispatch_mach_msg_t dmsg; - voucher_t voucher; - pthread_priority_t priority; - void *ctxt = NULL; - if (dmr) { - _voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher - voucher = dmr->dmr_voucher; - dmr->dmr_voucher = NULL; // transfer reference - priority = dmr->dmr_priority; - ctxt = dmr->dmr_ctxt; - _dispatch_mach_reply_kevent_unregister(dm, dmr, false); - } else { - voucher = voucher_create_with_mach_msg(hdr); - priority = _voucher_get_priority(voucher); - } - dispatch_mach_msg_destructor_t destructor; - destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ? - DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT : - DISPATCH_MACH_MSG_DESTRUCTOR_FREE; - dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL); - dmsg->dmsg_voucher = voucher; - dmsg->dmsg_priority = priority; - dmsg->do_ctxt = ctxt; - _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED); - _dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg); - _dispatch_voucher_ktrace_dmsg_push(dmsg); - return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority); -} - -static inline mach_port_t -_dispatch_mach_msg_get_remote_port(dispatch_object_t dou) -{ - mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg); - mach_port_t remote = hdr->msgh_remote_port; - return remote; -} - -static inline void -_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port, - mach_port_t remote_port) -{ - mach_msg_header_t *hdr; - dispatch_mach_msg_t dmsg; - dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t), - DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr); - if (local_port) hdr->msgh_local_port = local_port; - if (remote_port) hdr->msgh_remote_port = remote_port; - _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED); - return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority); -} - -static inline dispatch_mach_msg_t -_dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou, - dispatch_mach_reply_refs_t dmr) -{ - dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr; - if (dmsg && !dmsg->dmsg_reply) return NULL; - mach_msg_header_t *hdr; - dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t), - DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr); - if (dmsg) { - hdr->msgh_local_port = dmsg->dmsg_reply; - if (dmsg->dmsg_voucher) { - dmsgr->dmsg_voucher = _voucher_retain(dmsg->dmsg_voucher); - } - dmsgr->dmsg_priority = dmsg->dmsg_priority; - dmsgr->do_ctxt = dmsg->do_ctxt; - } else { - hdr->msgh_local_port = (mach_port_t)dmr->dmr_dkev->dk_kevent.ident; - dmsgr->dmsg_voucher = dmr->dmr_voucher; - dmr->dmr_voucher = NULL; // transfer reference - 
dmsgr->dmsg_priority = dmr->dmr_priority; - dmsgr->do_ctxt = dmr->dmr_ctxt; - } - _dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED); - return dmsgr; -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou) -{ - dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr; - dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL); - _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_NOT_SENT); - _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority); - if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority); -} - -DISPATCH_NOINLINE -static dispatch_object_t -_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL; - voucher_t voucher = dmsg->dmsg_voucher; - mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL; - bool clear_voucher = false, kvoucher_move_send = false; - dr->dm_needs_mgr = 0; - if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) { - // send initial checkin message - if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() != - &_dispatch_mgr_q)) { - // send kevent must be uninstalled on the manager queue - dr->dm_needs_mgr = 1; - goto out; - } - dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg; - if (slowpath(dr->dm_checkin)) { - goto out; - } - } - mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg); - mach_msg_return_t kr = 0; - mach_port_t reply = dmsg->dmsg_reply; - mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg); - if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) { - opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK); - if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) != - MACH_MSG_TYPE_MOVE_SEND_ONCE) { - if (dmsg != dr->dm_checkin) { - msg->msgh_remote_port = dr->dm_send; - } - if (_dispatch_queue_get_current() == &_dispatch_mgr_q) { - if (slowpath(!dm->dm_dkev)) { - _dispatch_mach_kevent_register(dm, msg->msgh_remote_port); - } - if (fastpath(dm->dm_dkev)) { - if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) { - goto out; - } - opts |= MACH_SEND_NOTIFY; - } - } - opts |= MACH_SEND_TIMEOUT; - if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) { - ipc_kvoucher = _voucher_create_mach_voucher_with_priority( - voucher, dmsg->dmsg_priority); - } - _dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg); - if (ipc_kvoucher) { - kvoucher_move_send = true; - clear_voucher = _voucher_mach_msg_set_mach_voucher(msg, - ipc_kvoucher, kvoucher_move_send); - } else { - clear_voucher = _voucher_mach_msg_set(msg, voucher); - } - } - _voucher_activity_trace_msg(voucher, msg, send); - _dispatch_debug_machport(msg->msgh_remote_port); - if (reply) _dispatch_debug_machport(reply); - kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0, - MACH_PORT_NULL); - _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, " - "opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: " - "%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt, - opts, msg_opts, msg->msgh_voucher_port, reply, - mach_error_string(kr), kr); - if (clear_voucher) { - if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) { - DISPATCH_CRASH("Voucher port corruption"); - } - mach_voucher_t kv; - kv = _voucher_mach_msg_clear(msg, kvoucher_move_send); - if (kvoucher_move_send) ipc_kvoucher = kv; - } - } - if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) { - if (opts & MACH_SEND_NOTIFY) { - 
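-			// The send timed out with MACH_SEND_NOTIFY set: the kernel has
-			// accepted a send-possible notification request, so mark the
-			// kevent armed; the notification will re-drive the send queue
-			// once the remote port has space again.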
_dispatch_debug("machport[0x%08x]: send-possible notification " - "armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident); - DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1; - } else { - // send kevent must be installed on the manager queue - dr->dm_needs_mgr = 1; - } - if (ipc_kvoucher) { - _dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher); - voucher_t ipc_voucher; - ipc_voucher = _voucher_create_with_priority_and_mach_voucher( - voucher, dmsg->dmsg_priority, ipc_kvoucher); - _dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]", - ipc_voucher, dmsg, voucher); - if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher); - dmsg->dmsg_voucher = ipc_voucher; - } - goto out; - } else if (ipc_kvoucher && (kr || !kvoucher_move_send)) { - _voucher_dealloc_mach_voucher(ipc_kvoucher); - } - if (fastpath(!kr) && reply && - !(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply)) { - if (_dispatch_queue_get_current() != &_dispatch_mgr_q) { - // reply receive kevent must be installed on the manager queue - dr->dm_needs_mgr = 1; - _dispatch_mach_msg_set_options(dmsg, msg_opts | - DISPATCH_MACH_REGISTER_FOR_REPLY); - goto out; - } - _dispatch_mach_reply_kevent_register(dm, reply, dmsg); - } - if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) { - _dispatch_mach_kevent_unregister(dm); - } - if (slowpath(kr)) { - // Send failed, so reply was never connected - dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL); - } - _dispatch_mach_msg_set_reason(dmsg, kr, 0); - _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority); - if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority); - dmsg = NULL; -out: - return (dispatch_object_t)dmsg; -} - -DISPATCH_ALWAYS_INLINE -static inline void -_dispatch_mach_send_push_wakeup(dispatch_mach_t dm, dispatch_object_t dou, - bool wakeup) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - struct dispatch_object_s *prev, *dc = dou._do; - dc->do_next = NULL; - - prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release); - if (fastpath(prev)) { - prev->do_next = dc; - } else { - dr->dm_head = dc; - } - if (wakeup || !prev) { - _dispatch_wakeup(dm); - } -} - -DISPATCH_ALWAYS_INLINE -static inline void -_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou) -{ - return _dispatch_mach_send_push_wakeup(dm, dou, false); -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_send_drain(dispatch_mach_t dm) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - struct dispatch_object_s *dc = NULL, *next_dc = NULL; - while (dr->dm_tail) { - _dispatch_wait_until(dc = fastpath(dr->dm_head)); - do { - next_dc = fastpath(dc->do_next); - dr->dm_head = next_dc; - if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL, - relaxed)) { - _dispatch_wait_until(next_dc = fastpath(dc->do_next)); - dr->dm_head = next_dc; - } - if (!DISPATCH_OBJ_IS_VTABLE(dc)) { - if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) { - // send barrier - // leave send queue locked until barrier has completed - return _dispatch_mach_push(dm, dc, - ((dispatch_continuation_t)dc)->dc_priority); - } -#if DISPATCH_MACH_SEND_SYNC - if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){ - _dispatch_thread_semaphore_signal( - (_dispatch_thread_semaphore_t)dc->do_ctxt); - continue; - } -#endif // DISPATCH_MACH_SEND_SYNC - if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) { - goto out; - } - continue; - } - _dispatch_voucher_ktrace_dmsg_pop((dispatch_mach_msg_t)dc); - if (slowpath(dr->dm_disconnect_cnt) || - slowpath(dm->ds_atomic_flags & DSF_CANCELED)) { - 
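-				// Once the channel is disconnected or canceled, queued
-				// messages can no longer reach the kernel; complete each one
-				// as DISPATCH_MACH_MESSAGE_NOT_SENT instead of sending it.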
_dispatch_mach_msg_not_sent(dm, dc); - continue; - } - if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) { - goto out; - } - } while ((dc = next_dc)); - } -out: - // if this is not a complete drain, we must undo some things - if (slowpath(dc)) { - if (!next_dc && - !dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) { - // wait for enqueue slow path to finish - _dispatch_wait_until(next_dc = fastpath(dr->dm_head)); - dc->do_next = next_dc; - } - dr->dm_head = dc; - } - (void)dispatch_atomic_dec2o(dr, dm_sending, release); - _dispatch_wakeup(dm); -} - -static inline void -_dispatch_mach_send(dispatch_mach_t dm) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr, - dm_sending, 0, 1, acquire))) { - return; - } - _dispatch_object_debug(dm, "%s", __func__); - _dispatch_mach_send_drain(dm); -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke) -{ - if (!(ke->fflags & dm->ds_pending_data_mask)) { - return; - } - _dispatch_mach_send(dm); -} - -static inline mach_msg_option_t -_dispatch_mach_checkin_options(void) -{ - mach_msg_option_t options = 0; -#if DISPATCH_USE_CHECKIN_NOIMPORTANCE - options = MACH_SEND_NOIMPORTANCE; // -#endif - return options; -} - - -static inline mach_msg_option_t -_dispatch_mach_send_options(void) -{ - mach_msg_option_t options = 0; - return options; -} - -DISPATCH_NOINLINE -void -dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg, - mach_msg_option_t options) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) { - DISPATCH_CLIENT_CRASH("Message already enqueued"); - } - dispatch_retain(dmsg); - dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK); - options |= _dispatch_mach_send_options(); - _dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK); - mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg); - dmsg->dmsg_reply = (MACH_MSGH_BITS_LOCAL(msg->msgh_bits) == - MACH_MSG_TYPE_MAKE_SEND_ONCE && - MACH_PORT_VALID(msg->msgh_local_port) ? 
msg->msgh_local_port : - MACH_PORT_NULL); - bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) == - MACH_MSG_TYPE_MOVE_SEND_ONCE); - dmsg->dmsg_priority = _dispatch_priority_propagate(); - dmsg->dmsg_voucher = _voucher_copy(); - _dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg); - if ((!is_reply && slowpath(dr->dm_tail)) || - slowpath(dr->dm_disconnect_cnt) || - slowpath(dm->ds_atomic_flags & DSF_CANCELED) || - slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, - acquire))) { - _dispatch_voucher_ktrace_dmsg_push(dmsg); - return _dispatch_mach_send_push(dm, dmsg); - } - if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) { - (void)dispatch_atomic_dec2o(dr, dm_sending, release); - _dispatch_voucher_ktrace_dmsg_push(dmsg); - return _dispatch_mach_send_push_wakeup(dm, dmsg, true); - } - if (!is_reply && slowpath(dr->dm_tail)) { - return _dispatch_mach_send_drain(dm); - } - (void)dispatch_atomic_dec2o(dr, dm_sending, release); - _dispatch_wakeup(dm); -} - -static void -_dispatch_mach_disconnect(dispatch_mach_t dm) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - if (dm->dm_dkev) { - _dispatch_mach_kevent_unregister(dm); - } - if (MACH_PORT_VALID(dr->dm_send)) { - _dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send); - } - dr->dm_send = MACH_PORT_NULL; - if (dr->dm_checkin) { - _dispatch_mach_msg_not_sent(dm, dr->dm_checkin); - dr->dm_checkin = NULL; - } - if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) { - dispatch_mach_reply_refs_t dmr, tmp; - TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp){ - _dispatch_mach_reply_kevent_unregister(dm, dmr, true); - } - } -} - -DISPATCH_NOINLINE -static bool -_dispatch_mach_cancel(dispatch_mach_t dm) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) { - return false; - } - _dispatch_object_debug(dm, "%s", __func__); - _dispatch_mach_disconnect(dm); - if (dm->ds_dkev) { - mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident; - _dispatch_source_kevent_unregister((dispatch_source_t)dm); - _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL); - } - (void)dispatch_atomic_dec2o(dr, dm_sending, release); - return true; -} - -DISPATCH_NOINLINE -static bool -_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou) -{ - if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) { - if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) { - // send/reply kevents must be uninstalled on the manager queue - return false; - } - } - _dispatch_mach_disconnect(dm); - dispatch_mach_send_refs_t dr = dm->dm_refs; - dr->dm_checkin = dou._dc->dc_data; - dr->dm_send = (mach_port_t)dou._dc->dc_other; - _dispatch_continuation_free(dou._dc); - (void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed); - _dispatch_object_debug(dm, "%s", __func__); - return true; -} - -DISPATCH_NOINLINE -void -dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send, - dispatch_mach_msg_t checkin) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - (void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed); - if (MACH_PORT_VALID(send) && checkin) { - dispatch_retain(checkin); - mach_msg_option_t options = _dispatch_mach_checkin_options(); - _dispatch_mach_msg_set_options(checkin, options); - dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin); - } else { - checkin = NULL; - dr->dm_checkin_port = MACH_PORT_NULL; - } - dispatch_continuation_t dc = _dispatch_continuation_alloc(); - dc->do_vtable = 
(void *)(DISPATCH_OBJ_ASYNC_BIT); - dc->dc_func = (void*)_dispatch_mach_reconnect_invoke; - dc->dc_ctxt = dc; - dc->dc_data = checkin; - dc->dc_other = (void*)(uintptr_t)send; - return _dispatch_mach_send_push(dm, dc); -} - -#if DISPATCH_MACH_SEND_SYNC -DISPATCH_NOINLINE -static void -_dispatch_mach_send_sync_slow(dispatch_mach_t dm) -{ - _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore(); - struct dispatch_object_s dc = { - .do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT), - .do_ctxt = (void*)sema, - }; - _dispatch_mach_send_push(dm, &dc); - _dispatch_thread_semaphore_wait(sema); - _dispatch_put_thread_semaphore(sema); -} -#endif // DISPATCH_MACH_SEND_SYNC - -DISPATCH_NOINLINE -mach_port_t -dispatch_mach_get_checkin_port(dispatch_mach_t dm) -{ - dispatch_mach_send_refs_t dr = dm->dm_refs; - if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) { - return MACH_PORT_DEAD; - } - return dr->dm_checkin_port; -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_connect_invoke(dispatch_mach_t dm) -{ - dispatch_mach_refs_t dr = dm->ds_refs; - _dispatch_client_callout4(dr->dm_handler_ctxt, - DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func); - dm->dm_connect_handler_called = 1; -} - -DISPATCH_NOINLINE -void -_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg) -{ - dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current(); - dispatch_mach_refs_t dr = dm->ds_refs; - mach_error_t err; - unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err); - - dmsg->do_next = DISPATCH_OBJECT_LISTLESS; - _dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq); - _dispatch_voucher_ktrace_dmsg_pop(dmsg); - _dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg); - _dispatch_adopt_priority_and_replace_voucher(dmsg->dmsg_priority, - dmsg->dmsg_voucher, DISPATCH_PRIORITY_ENFORCE); - dmsg->dmsg_voucher = NULL; - if (slowpath(!dm->dm_connect_handler_called)) { - _dispatch_mach_connect_invoke(dm); - } - _dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err, - dr->dm_handler_func); - _dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm); - _dispatch_introspection_queue_item_complete(dmsg); - dispatch_release(dmsg); -} - -DISPATCH_NOINLINE -void -_dispatch_mach_barrier_invoke(void *ctxt) -{ - dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current(); - dispatch_mach_refs_t dr = dm->ds_refs; - struct dispatch_continuation_s *dc = ctxt; - void *context = dc->dc_data; - dispatch_function_t barrier = dc->dc_other; - bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT); - - _dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq); - if (slowpath(!dm->dm_connect_handler_called)) { - _dispatch_mach_connect_invoke(dm); - } - _dispatch_client_callout(context, barrier); - _dispatch_client_callout4(dr->dm_handler_ctxt, - DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func); - _dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm); - if (send_barrier) { - (void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release); - } -} - -DISPATCH_NOINLINE -void -dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context, - dispatch_function_t barrier) -{ - dispatch_continuation_t dc = _dispatch_continuation_alloc(); - dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT); - dc->dc_func = _dispatch_mach_barrier_invoke; - dc->dc_ctxt = dc; - dc->dc_data = context; - dc->dc_other = barrier; - _dispatch_continuation_voucher_set(dc, 0); - _dispatch_continuation_priority_set(dc, 
0, 0); - - dispatch_mach_send_refs_t dr = dm->dm_refs; - if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr, - dm_sending, 0, 1, acquire))) { - return _dispatch_mach_send_push(dm, dc); - } - // leave send queue locked until barrier has completed - return _dispatch_mach_push(dm, dc, dc->dc_priority); -} - -DISPATCH_NOINLINE -void -dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context, - dispatch_function_t barrier) -{ - dispatch_continuation_t dc = _dispatch_continuation_alloc(); - dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT); - dc->dc_func = _dispatch_mach_barrier_invoke; - dc->dc_ctxt = dc; - dc->dc_data = context; - dc->dc_other = barrier; - _dispatch_continuation_voucher_set(dc, 0); - _dispatch_continuation_priority_set(dc, 0, 0); - - return _dispatch_mach_push(dm, dc, dc->dc_priority); -} - -DISPATCH_NOINLINE -void -dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier) -{ - dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier), - _dispatch_call_block_and_release); -} - -DISPATCH_NOINLINE -void -dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier) -{ - dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier), - _dispatch_call_block_and_release); -} - -DISPATCH_NOINLINE -static void -_dispatch_mach_cancel_invoke(dispatch_mach_t dm) -{ - dispatch_mach_refs_t dr = dm->ds_refs; - if (slowpath(!dm->dm_connect_handler_called)) { - _dispatch_mach_connect_invoke(dm); - } - _dispatch_client_callout4(dr->dm_handler_ctxt, - DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func); - dm->dm_cancel_handler_called = 1; - _dispatch_release(dm); // the retain is done at creation time -} - -DISPATCH_NOINLINE -void -dispatch_mach_cancel(dispatch_mach_t dm) -{ - dispatch_source_cancel((dispatch_source_t)dm); -} - -DISPATCH_ALWAYS_INLINE -static inline dispatch_queue_t -_dispatch_mach_invoke2(dispatch_object_t dou, - _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED) -{ - dispatch_mach_t dm = dou._dm; - - // This function performs all mach channel actions. Each action is - // responsible for verifying that it takes place on the appropriate queue. - // If the current queue is not the correct queue for this action, the - // correct queue will be returned and the invoke will be re-driven on that - // queue. - - // The order of tests here in invoke and in probe should be consistent. - - dispatch_queue_t dq = _dispatch_queue_get_current(); - dispatch_mach_send_refs_t dr = dm->dm_refs; - - if (slowpath(!dm->ds_is_installed)) { - // The channel needs to be installed on the manager queue. - if (dq != &_dispatch_mgr_q) { - return &_dispatch_mgr_q; - } - if (dm->ds_dkev) { - _dispatch_source_kevent_register((dispatch_source_t)dm); - } - dm->ds_is_installed = true; - _dispatch_mach_send(dm); - // Apply initial target queue change - _dispatch_queue_drain(dou); - if (dm->dq_items_tail) { - return dm->do_targetq; - } - } else if (dm->dq_items_tail) { - // The channel has pending messages to deliver to the target queue. 
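-		// Delivery must happen on the target queue; returning that queue
-		// here makes _dispatch_queue_class_invoke re-drive this invoke on it.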
- if (dq != dm->do_targetq) { - return dm->do_targetq; - } - dispatch_queue_t tq = dm->do_targetq; - if (slowpath(_dispatch_queue_drain(dou))) { - DISPATCH_CLIENT_CRASH("Sync onto mach channel"); - } - if (slowpath(tq != dm->do_targetq)) { - // An item on the channel changed the target queue - return dm->do_targetq; - } - } else if (dr->dm_sending) { - // Sending and uninstallation below require the send lock, the channel - // will be woken up when the lock is dropped - return NULL; - } else if (dr->dm_tail) { - if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) && - (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) { - // Send/reply kevents need to be installed or uninstalled - if (dq != &_dispatch_mgr_q) { - return &_dispatch_mgr_q; - } - } - if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) || - (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) { - // The channel has pending messages to send. - _dispatch_mach_send(dm); - } - } else if (dm->ds_atomic_flags & DSF_CANCELED){ - // The channel has been cancelled and needs to be uninstalled from the - // manager queue. After uninstallation, the cancellation handler needs - // to be delivered to the target queue. - if (dm->ds_dkev || dm->dm_dkev || dr->dm_send || - !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) { - if (dq != &_dispatch_mgr_q) { - return &_dispatch_mgr_q; - } - if (!_dispatch_mach_cancel(dm)) { - return NULL; - } - } - if (!dm->dm_cancel_handler_called) { - if (dq != dm->do_targetq) { - return dm->do_targetq; - } - _dispatch_mach_cancel_invoke(dm); - } - } - return NULL; -} - -DISPATCH_NOINLINE -void -_dispatch_mach_invoke(dispatch_mach_t dm) -{ - _dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2); -} - -unsigned long -_dispatch_mach_probe(dispatch_mach_t dm) -{ - // This function determines whether the mach channel needs to be invoked. - // The order of tests here in probe and in invoke should be consistent. - - dispatch_mach_send_refs_t dr = dm->dm_refs; - - if (slowpath(!dm->ds_is_installed)) { - // The channel needs to be installed on the manager queue. - return true; - } else if (_dispatch_queue_class_probe(dm)) { - // The source has pending messages to deliver to the target queue. - return true; - } else if (dr->dm_sending) { - // Sending and uninstallation below require the send lock, the channel - // will be woken up when the lock is dropped - return false; - } else if (dr->dm_tail && - (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) || - (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) { - // The channel has pending messages to send. - return true; - } else if (dm->ds_atomic_flags & DSF_CANCELED) { - if (dm->ds_dkev || dm->dm_dkev || dr->dm_send || - !TAILQ_EMPTY(&dm->dm_refs->dm_replies) || - !dm->dm_cancel_handler_called) { - // The channel needs to be uninstalled from the manager queue, or - // the cancellation handler needs to be delivered to the target - // queue. - return true; - } - } - // Nothing to do. - return false; -} - -#pragma mark - -#pragma mark dispatch_mach_msg_t - -dispatch_mach_msg_t -dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size, - dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr) -{ - if (slowpath(size < sizeof(mach_msg_header_t)) || - slowpath(destructor && !msg)) { - DISPATCH_CLIENT_CRASH("Empty message"); - } - dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg), - sizeof(struct dispatch_mach_msg_s) + - (destructor ? 
0 : size - sizeof(dmsg->dmsg_msg)));
-	if (destructor) {
-		dmsg->dmsg_msg = msg;
-	} else if (msg) {
-		memcpy(dmsg->dmsg_buf, msg, size);
-	}
-	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
-	dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
-			false);
-	dmsg->dmsg_destructor = destructor;
-	dmsg->dmsg_size = size;
-	if (msg_ptr) {
-		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
-	}
-	return dmsg;
-}
-
-void
-_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
-{
-	if (dmsg->dmsg_voucher) {
-		_voucher_release(dmsg->dmsg_voucher);
-		dmsg->dmsg_voucher = NULL;
-	}
-	switch (dmsg->dmsg_destructor) {
-	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
-		break;
-	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
-		free(dmsg->dmsg_msg);
-		break;
-	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
-		mach_vm_size_t vm_size = dmsg->dmsg_size;
-		mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
-		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
-				vm_addr, vm_size));
-		break;
-	}}
-}
-
-static inline mach_msg_header_t*
-_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
-{
-	return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
-			(mach_msg_header_t*)dmsg->dmsg_buf;
-}
-
-mach_msg_header_t*
-dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
-{
-	if (size_ptr) {
-		*size_ptr = dmsg->dmsg_size;
-	}
-	return _dispatch_mach_msg_get_msg(dmsg);
-}
-
-size_t
-_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
-{
-	size_t offset = 0;
-	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
-			dx_kind(dmsg), dmsg);
-	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
-			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
-	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
-			"msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->dmsg_buf);
-	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
-	if (hdr->msgh_id) {
-		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
-				hdr->msgh_id);
-	}
-	if (hdr->msgh_size) {
-		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
-				hdr->msgh_size);
-	}
-	if (hdr->msgh_bits) {
-		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
-				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
-				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
-		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
-			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
-					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
-		}
-		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
-	}
-	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
-		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
-				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
-	} else if (hdr->msgh_local_port) {
-		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
-				hdr->msgh_local_port);
-	} else if (hdr->msgh_remote_port) {
-		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
-				hdr->msgh_remote_port);
-	} else {
-		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
-	}
-	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
-	return offset;
-}
-
-#pragma mark -
-#pragma mark dispatch_mig_server
-
-mach_msg_return_t
-dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
-		dispatch_mig_callback_t callback)
-{
-	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
-			| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
-			| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
-	mach_msg_options_t tmp_options;
-	mig_reply_error_t *bufTemp, *bufRequest,
*bufReply; - mach_msg_return_t kr = 0; - uint64_t assertion_token = 0; - unsigned int cnt = 1000; // do not stall out serial queues - boolean_t demux_success; - bool received = false; - size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE; - - // XXX FIXME -- allocate these elsewhere - bufRequest = alloca(rcv_size); - bufReply = alloca(rcv_size); - bufReply->Head.msgh_size = 0; - bufRequest->RetCode = 0; - -#if DISPATCH_DEBUG - options |= MACH_RCV_LARGE; // rdar://problem/8422992 -#endif - tmp_options = options; - // XXX FIXME -- change this to not starve out the target queue - for (;;) { - if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) { - options &= ~MACH_RCV_MSG; - tmp_options &= ~MACH_RCV_MSG; - - if (!(tmp_options & MACH_SEND_MSG)) { - goto out; - } - } - kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size, - (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0); - - tmp_options = options; - - if (slowpath(kr)) { - switch (kr) { - case MACH_SEND_INVALID_DEST: - case MACH_SEND_TIMED_OUT: - if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) { - mach_msg_destroy(&bufReply->Head); - } - break; - case MACH_RCV_TIMED_OUT: - // Don't return an error if a message was sent this time or - // a message was successfully received previously - // rdar://problems/7363620&7791738 - if(bufReply->Head.msgh_remote_port || received) { - kr = MACH_MSG_SUCCESS; - } - break; - case MACH_RCV_INVALID_NAME: - break; -#if DISPATCH_DEBUG - case MACH_RCV_TOO_LARGE: - // receive messages that are too large and log their id and size - // rdar://problem/8422992 - tmp_options &= ~MACH_RCV_LARGE; - size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE; - void *large_buf = malloc(large_size); - if (large_buf) { - rcv_size = large_size; - bufReply = large_buf; - } - if (!mach_msg(&bufReply->Head, tmp_options, 0, - (mach_msg_size_t)rcv_size, - (mach_port_t)ds->ds_ident_hack, 0, 0)) { - _dispatch_log("BUG in libdispatch client: " - "dispatch_mig_server received message larger than " - "requested size %zd: id = 0x%x, size = %d", - maxmsgsz, bufReply->Head.msgh_id, - bufReply->Head.msgh_size); - } - if (large_buf) { - free(large_buf); - } - // fall through -#endif - default: - _dispatch_bug_mach_client( - "dispatch_mig_server: mach_msg() failed", kr); - break; - } - goto out; - } - - if (!(tmp_options & MACH_RCV_MSG)) { - goto out; - } - - if (assertion_token) { -#if DISPATCH_USE_IMPORTANCE_ASSERTION - int r = proc_importance_assertion_complete(assertion_token); - (void)dispatch_assume_zero(r); -#endif - assertion_token = 0; - } - received = true; - - bufTemp = bufRequest; - bufRequest = bufReply; - bufReply = bufTemp; - -#if DISPATCH_USE_IMPORTANCE_ASSERTION - int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head, - NULL, &assertion_token); - if (r && slowpath(r != EIO)) { - (void)dispatch_assume_zero(r); - } -#endif - _voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head)); - demux_success = callback(&bufRequest->Head, &bufReply->Head); - - if (!demux_success) { - // destroy the request - but not the reply port - bufRequest->Head.msgh_remote_port = 0; - mach_msg_destroy(&bufRequest->Head); - } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) { - // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode - // is present - if (slowpath(bufReply->RetCode)) { - if (bufReply->RetCode == MIG_NO_REPLY) { - continue; - } - - // destroy the request - but not the reply port - bufRequest->Head.msgh_remote_port = 0; - 
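-				// With msgh_remote_port cleared, mach_msg_destroy() releases
-				// the rights and out-of-line memory held by the request but
-				// leaves the reply port alone, so the MIG error reply can
-				// still be delivered below.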
mach_msg_destroy(&bufRequest->Head); - } - } - - if (bufReply->Head.msgh_remote_port) { - tmp_options |= MACH_SEND_MSG; - if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != - MACH_MSG_TYPE_MOVE_SEND_ONCE) { - tmp_options |= MACH_SEND_TIMEOUT; - } - } - } - -out: - if (assertion_token) { -#if DISPATCH_USE_IMPORTANCE_ASSERTION - int r = proc_importance_assertion_complete(assertion_token); - (void)dispatch_assume_zero(r); -#endif - } - - return kr; -} - -#endif /* HAVE_MACH */ - -#pragma mark - -#pragma mark dispatch_source_debug - -DISPATCH_NOINLINE -static const char * -_evfiltstr(short filt) -{ - switch (filt) { -#define _evfilt2(f) case (f): return #f - _evfilt2(EVFILT_READ); - _evfilt2(EVFILT_WRITE); - _evfilt2(EVFILT_AIO); - _evfilt2(EVFILT_VNODE); - _evfilt2(EVFILT_PROC); - _evfilt2(EVFILT_SIGNAL); - _evfilt2(EVFILT_TIMER); -#ifdef EVFILT_VM - _evfilt2(EVFILT_VM); -#endif -#ifdef EVFILT_MEMORYSTATUS - _evfilt2(EVFILT_MEMORYSTATUS); -#endif -#if HAVE_MACH - _evfilt2(EVFILT_MACHPORT); - _evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION); -#endif - _evfilt2(EVFILT_FS); - _evfilt2(EVFILT_USER); - - _evfilt2(DISPATCH_EVFILT_TIMER); - _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD); - _evfilt2(DISPATCH_EVFILT_CUSTOM_OR); - default: - return "EVFILT_missing"; - } -} - -static size_t -_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz) -{ - dispatch_queue_t target = ds->do_targetq; - return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, " - "pending_data = 0x%lx, pending_data_mask = 0x%lx, ", - target && target->dq_label ? target->dq_label : "", target, - ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask); -} - -static size_t -_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz) -{ - dispatch_source_refs_t dr = ds->ds_refs; - return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx," - " last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ", - ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire, - ds_timer(dr).interval, ds_timer(dr).flags); -} - -size_t -_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz) -{ - size_t offset = 0; - offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", - dx_kind(ds), ds); - offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset); - offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset); - if (ds->ds_is_timer) { - offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset); - } - offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }", - ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????"); - return offset; -} - -static size_t -_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz) -{ - dispatch_queue_t target = dm->do_targetq; - return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, " - "send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, " - "sending = %d, disconnected = %d, canceled = %d ", - target && target->dq_label ? target->dq_label : "", target, - dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0, - dm->dm_refs->dm_send, - dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0, - dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ? - " (armed)" : "", dm->dm_refs->dm_checkin_port, - dm->dm_refs->dm_checkin ? 
" (pending)" : "", - dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt, - (bool)(dm->ds_atomic_flags & DSF_CANCELED)); -} -size_t -_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz) -{ - size_t offset = 0; - offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", - dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label : - dx_kind(dm), dm); - offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset); - offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset); - offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); - return offset; -} - -#if DISPATCH_DEBUG -static void -_dispatch_kevent_debug(struct kevent64_s* kev, const char* str) -{ - _dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, " - "fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, " - "ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter), - kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0], - kev->ext[1], str); -} - -static void -_dispatch_kevent_debugger2(void *context) -{ - struct sockaddr sa; - socklen_t sa_len = sizeof(sa); - int c, fd = (int)(long)context; - unsigned int i; - dispatch_kevent_t dk; - dispatch_source_t ds; - dispatch_source_refs_t dr; - FILE *debug_stream; - - c = accept(fd, &sa, &sa_len); - if (c == -1) { - if (errno != EAGAIN) { - (void)dispatch_assume_zero(errno); - } - return; - } -#if 0 - int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO - if (r == -1) { - (void)dispatch_assume_zero(errno); - } -#endif - debug_stream = fdopen(c, "a"); - if (!dispatch_assume(debug_stream)) { - close(c); - return; - } - - fprintf(debug_stream, "HTTP/1.0 200 OK\r\n"); - fprintf(debug_stream, "Content-type: text/html\r\n"); - fprintf(debug_stream, "Pragma: nocache\r\n"); - fprintf(debug_stream, "\r\n"); - fprintf(debug_stream, "\n"); - fprintf(debug_stream, "PID %u\n", getpid()); - fprintf(debug_stream, "\n
    \n"); - - //fprintf(debug_stream, "DKDKDKDK" - // "DKDKDK\n"); - - for (i = 0; i < DSL_HASH_SIZE; i++) { - if (TAILQ_EMPTY(&_dispatch_sources[i])) { - continue; - } - TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) { - fprintf(debug_stream, "\t
  • DK %p ident %lu filter %s flags " - "0x%hx fflags 0x%x data 0x%lx udata %p\n", - dk, (unsigned long)dk->dk_kevent.ident, - _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags, - dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data, - (void*)dk->dk_kevent.udata); - fprintf(debug_stream, "\t\t
      \n"); - TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) { - ds = _dispatch_source_from_refs(dr); - fprintf(debug_stream, "\t\t\t
    • DS %p refcnt 0x%x suspend " - "0x%x data 0x%lx mask 0x%lx flags 0x%x
    • \n", - ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt, - ds->ds_pending_data, ds->ds_pending_data_mask, - ds->ds_atomic_flags); - if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) { - dispatch_queue_t dq = ds->do_targetq; - fprintf(debug_stream, "\t\t
      DQ: %p refcnt 0x%x suspend " - "0x%x label: %s\n", dq, dq->do_ref_cnt + 1, - dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:""); - } - } - fprintf(debug_stream, "\t\t
    \n"); - fprintf(debug_stream, "\t
  • \n"); - } - } - fprintf(debug_stream, "
\n\n\n"); - fflush(debug_stream); - fclose(debug_stream); -} - -static void -_dispatch_kevent_debugger2_cancel(void *context) -{ - int ret, fd = (int)(long)context; - - ret = close(fd); - if (ret != -1) { - (void)dispatch_assume_zero(errno); - } -} - -static void -_dispatch_kevent_debugger(void *context DISPATCH_UNUSED) -{ - union { - struct sockaddr_in sa_in; - struct sockaddr sa; - } sa_u = { - .sa_in = { - .sin_family = AF_INET, - .sin_addr = { htonl(INADDR_LOOPBACK), }, - }, - }; - dispatch_source_t ds; - const char *valstr; - int val, r, fd, sock_opt = 1; - socklen_t slen = sizeof(sa_u); - - if (issetugid()) { - return; - } - valstr = getenv("LIBDISPATCH_DEBUGGER"); - if (!valstr) { - return; - } - val = atoi(valstr); - if (val == 2) { - sa_u.sa_in.sin_addr.s_addr = 0; - } - fd = socket(PF_INET, SOCK_STREAM, 0); - if (fd == -1) { - (void)dispatch_assume_zero(errno); - return; - } - r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, - (socklen_t) sizeof sock_opt); - if (r == -1) { - (void)dispatch_assume_zero(errno); - goto out_bad; - } -#if 0 - r = fcntl(fd, F_SETFL, O_NONBLOCK); - if (r == -1) { - (void)dispatch_assume_zero(errno); - goto out_bad; - } -#endif - r = bind(fd, &sa_u.sa, sizeof(sa_u)); - if (r == -1) { - (void)dispatch_assume_zero(errno); - goto out_bad; - } - r = listen(fd, SOMAXCONN); - if (r == -1) { - (void)dispatch_assume_zero(errno); - goto out_bad; - } - r = getsockname(fd, &sa_u.sa, &slen); - if (r == -1) { - (void)dispatch_assume_zero(errno); - goto out_bad; - } - - ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0, - &_dispatch_mgr_q); - if (dispatch_assume(ds)) { - _dispatch_log("LIBDISPATCH: debug port: %hu", - (in_port_t)ntohs(sa_u.sa_in.sin_port)); - - /* ownership of fd transfers to ds */ - dispatch_set_context(ds, (void *)(long)fd); - dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2); - dispatch_source_set_cancel_handler_f(ds, - _dispatch_kevent_debugger2_cancel); - dispatch_resume(ds); - - return; - } -out_bad: - close(fd); -} - -#if HAVE_MACH - -#ifndef MACH_PORT_TYPE_SPREQUEST -#define MACH_PORT_TYPE_SPREQUEST 0x40000000 -#endif - -DISPATCH_NOINLINE -void -dispatch_debug_machport(mach_port_t name, const char* str) -{ - mach_port_type_t type; - mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0; - unsigned int dnreqs = 0, dnrsiz; - kern_return_t kr = mach_port_type(mach_task_self(), name, &type); - if (kr) { - _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name, - kr, mach_error_string(kr), str); - return; - } - if (type & MACH_PORT_TYPE_SEND) { - (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, - MACH_PORT_RIGHT_SEND, &ns)); - } - if (type & MACH_PORT_TYPE_SEND_ONCE) { - (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, - MACH_PORT_RIGHT_SEND_ONCE, &nso)); - } - if (type & MACH_PORT_TYPE_DEAD_NAME) { - (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, - MACH_PORT_RIGHT_DEAD_NAME, &nd)); - } - if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) { - kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs); - if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr); - } - if (type & MACH_PORT_TYPE_RECEIVE) { - mach_port_status_t status = { .mps_pset = 0, }; - mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT; - (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, - MACH_PORT_RIGHT_RECEIVE, &nr)); - (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(), - name, 
MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt)); - _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) " - "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) " - "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) " - "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs, - type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", - status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N", - status.mps_srights ? "Y":"N", status.mps_sorights, - status.mps_qlimit, status.mps_msgcount, status.mps_mscount, - status.mps_seqno, str); - } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE| - MACH_PORT_TYPE_DEAD_NAME)) { - _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) " - "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs, - type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str); - } else { - _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type, - str); - } -} - -#endif // HAVE_MACH - -#endif // DISPATCH_DEBUG
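
For reference, a minimal sketch of how a client drives the mach channel machinery removed above. dispatch_mach_create_f(), dispatch_mach_connect() and the dispatch_mach_reason_t type are assumed here from libdispatch's private mach channel interface, which this file implements but does not declare; treat their exact spellings and signatures as assumptions rather than documented API. The DISPATCH_MACH_* reason codes, dispatch_mach_msg_create(), dispatch_mach_msg_get_msg() and dispatch_mach_send() appear in the removed code itself.

#include <dispatch/dispatch.h>
#include <mach/mach.h>

// Hypothetical illustration; not part of this file.
static void
example_handler(void *context, dispatch_mach_reason_t reason,
		dispatch_mach_msg_t message, mach_error_t error)
{
	(void)context;
	switch (reason) {
	case DISPATCH_MACH_CONNECTED:
		// delivered once, before any other event
		// (see _dispatch_mach_connect_invoke above)
		break;
	case DISPATCH_MACH_MESSAGE_RECEIVED: {
		size_t size;
		mach_msg_header_t *hdr = dispatch_mach_msg_get_msg(message, &size);
		// inspect hdr/size; the runtime releases `message` after the
		// handler returns (see _dispatch_mach_msg_invoke above)
		(void)hdr; (void)size;
		break;
	}
	case DISPATCH_MACH_MESSAGE_NOT_SENT:
	case DISPATCH_MACH_MESSAGE_SEND_FAILED:
		// for SEND_FAILED, `error` carries the mach_msg() return code
		break;
	case DISPATCH_MACH_DISCONNECTED:
	case DISPATCH_MACH_CANCELED:
		break;
	default:
		break;
	}
	(void)error;
}

static void
example_send(mach_port_t send_right)
{
	dispatch_mach_t dm = dispatch_mach_create_f("example",
			dispatch_get_main_queue(), NULL, example_handler);
	dispatch_mach_connect(dm, MACH_PORT_NULL, send_right, NULL);

	mach_msg_header_t *hdr;
	dispatch_mach_msg_t dmsg = dispatch_mach_msg_create(NULL,
			sizeof(mach_msg_header_t), DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT,
			&hdr);
	hdr->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
	hdr->msgh_size = sizeof(mach_msg_header_t);
	hdr->msgh_remote_port = send_right; // _dispatch_mach_msg_send() also
	                                    // overwrites this with dm_send
	hdr->msgh_id = 0x12345;
	dispatch_mach_send(dm, dmsg, 0);
	dispatch_release(dmsg); // dispatch_mach_send() retained its own reference
}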