/*
- * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
+ * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
*
* @APPLE_APACHE_LICENSE_HEADER_START@
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* @APPLE_APACHE_LICENSE_HEADER_END@
*/
#include "internal.h"
-#include "protocol.h"
-#include "protocolServer.h"
-#include <sys/mount.h>
-
-#define DISPATCH_EVFILT_TIMER (-EVFILT_SYSCOUNT - 1)
-#define DISPATCH_EVFILT_CUSTOM_ADD (-EVFILT_SYSCOUNT - 2)
-#define DISPATCH_EVFILT_CUSTOM_OR (-EVFILT_SYSCOUNT - 3)
-#define DISPATCH_EVFILT_SYSCOUNT (EVFILT_SYSCOUNT + 3)
-
-#define DISPATCH_TIMER_INDEX_WALL 0
-#define DISPATCH_TIMER_INDEX_MACH 1
-static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
- {
- .dk_kevent = {
- .ident = DISPATCH_TIMER_INDEX_WALL,
- .filter = DISPATCH_EVFILT_TIMER,
- .udata = &_dispatch_kevent_timer[0],
- },
- .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
- },
- {
- .dk_kevent = {
- .ident = DISPATCH_TIMER_INDEX_MACH,
- .filter = DISPATCH_EVFILT_TIMER,
- .udata = &_dispatch_kevent_timer[1],
- },
- .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
- },
-};
-#define DISPATCH_TIMER_COUNT (sizeof _dispatch_kevent_timer / sizeof _dispatch_kevent_timer[0])
-
-static struct dispatch_kevent_s _dispatch_kevent_data_or = {
- .dk_kevent = {
- .filter = DISPATCH_EVFILT_CUSTOM_OR,
- .flags = EV_CLEAR,
- .udata = &_dispatch_kevent_data_or,
- },
- .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
-};
-static struct dispatch_kevent_s _dispatch_kevent_data_add = {
- .dk_kevent = {
- .filter = DISPATCH_EVFILT_CUSTOM_ADD,
- .udata = &_dispatch_kevent_data_add,
- },
- .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
-};
-
-#ifndef DISPATCH_NO_LEGACY
-struct dispatch_source_attr_vtable_s {
- DISPATCH_VTABLE_HEADER(dispatch_source_attr_s);
-};
-struct dispatch_source_attr_s {
- DISPATCH_STRUCT_HEADER(dispatch_source_attr_s, dispatch_source_attr_vtable_s);
- void* finalizer_ctxt;
- dispatch_source_finalizer_function_t finalizer_func;
- void* context;
-};
-#endif /* DISPATCH_NO_LEGACY */
-
-#define _dispatch_source_call_block ((void *)-1)
-static void _dispatch_source_latch_and_call(dispatch_source_t ds);
-static void _dispatch_source_cancel_callout(dispatch_source_t ds);
-static bool _dispatch_source_probe(dispatch_source_t ds);
-static void _dispatch_source_dispose(dispatch_source_t ds);
-static void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke);
-static size_t _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz);
-static size_t dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz);
-static dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds);
-
-static void _dispatch_kevent_merge(dispatch_source_t ds);
-static void _dispatch_kevent_release(dispatch_source_t ds);
-static void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
-static void _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
-static void _dispatch_kevent_machport_enable(dispatch_kevent_t dk);
-static void _dispatch_kevent_machport_disable(dispatch_kevent_t dk);
-
-static void _dispatch_drain_mach_messages(struct kevent *ke);
-static void _dispatch_timer_list_update(dispatch_source_t ds);
+static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
+static void _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval);
-static void
-_dispatch_mach_notify_source_init(void *context __attribute__((unused)));
-
-static const char *
-_evfiltstr(short filt)
-{
- switch (filt) {
-#define _evfilt2(f) case (f): return #f
- _evfilt2(EVFILT_READ);
- _evfilt2(EVFILT_WRITE);
- _evfilt2(EVFILT_AIO);
- _evfilt2(EVFILT_VNODE);
- _evfilt2(EVFILT_PROC);
- _evfilt2(EVFILT_SIGNAL);
- _evfilt2(EVFILT_TIMER);
- _evfilt2(EVFILT_MACHPORT);
- _evfilt2(EVFILT_FS);
- _evfilt2(EVFILT_USER);
- _evfilt2(EVFILT_SESSION);
-
- _evfilt2(DISPATCH_EVFILT_TIMER);
- _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
- _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
- default:
- return "EVFILT_missing";
- }
-}
+#define DISPATCH_TIMERS_UNREGISTER 0x1
+#define DISPATCH_TIMERS_RETAIN_2 0x2
+static void _dispatch_timers_update(dispatch_unote_t du, uint32_t flags);
+static void _dispatch_timers_unregister(dispatch_timer_source_refs_t dt);
-#define DSL_HASH_SIZE 256u // must be a power of two
-#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
+static void _dispatch_source_timer_configure(dispatch_source_t ds);
+static inline unsigned long _dispatch_source_timer_data(
+ dispatch_source_t ds, dispatch_unote_t du);
-static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
+#pragma mark -
+#pragma mark dispatch_source_t
-static dispatch_kevent_t
-_dispatch_kevent_find(uintptr_t ident, short filter)
+dispatch_source_t
+dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
+ unsigned long mask, dispatch_queue_t dq)
{
- uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
- dispatch_kevent_t dki;
+ dispatch_source_refs_t dr;
+ dispatch_source_t ds;
- TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
- if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
- break;
- }
+ dr = dux_create(dst, handle, mask)._dr;
+ if (unlikely(!dr)) {
+ return DISPATCH_BAD_INPUT;
}
- return dki;
-}
-
-static void
-_dispatch_kevent_insert(dispatch_kevent_t dk)
-{
- uintptr_t ident = dk->dk_kevent.ident;
- uintptr_t hash = DSL_HASH(dk->dk_kevent.filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
- TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
+ ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
+ sizeof(struct dispatch_source_s));
+ // Initialize as a queue first, then override some settings below.
+ _dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
+ DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
+ ds->dq_label = "source";
+ ds->do_ref_cnt++; // the reference the manager queue holds
+ ds->ds_refs = dr;
+ dr->du_owner_wref = _dispatch_ptr2wref(ds);
+
+ if (slowpath(!dq)) {
+ dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
+ } else {
+ _dispatch_retain((dispatch_queue_t _Nonnull)dq);
+ }
+ ds->do_targetq = dq;
+ if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
+ _dispatch_source_set_interval(ds, handle);
+ }
+ _dispatch_object_debug(ds, "%s", __func__);
+ return ds;
}
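+
+/*
+ * Illustrative usage sketch (not part of the library): sources come back
+ * from dispatch_source_create() inactive, so a client typically does
+ * something like the following (fd: any readable descriptor):
+ *
+ *	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
+ *			(uintptr_t)fd, 0, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
+ *	dispatch_source_set_event_handler(ds, ^{
+ *		// up to dispatch_source_get_data(ds) bytes are readable on fd
+ *	});
+ *	dispatch_resume(ds); // or dispatch_activate(ds)
+ */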
void
-dispatch_source_cancel(dispatch_source_t ds)
+_dispatch_source_dispose(dispatch_source_t ds, bool *allow_free)
{
-#if DISPATCH_DEBUG
- dispatch_debug(ds, __FUNCTION__);
-#endif
- dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);
- _dispatch_wakeup(ds);
+ _dispatch_object_debug(ds, "%s", __func__);
+ _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
+ _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
+ _dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
+ _dispatch_unote_dispose(ds->ds_refs);
+ ds->ds_refs = NULL;
+ _dispatch_queue_destroy(ds->_as_dq, allow_free);
}
-#ifndef DISPATCH_NO_LEGACY
void
-_dispatch_source_legacy_xref_release(dispatch_source_t ds)
+_dispatch_source_xref_dispose(dispatch_source_t ds)
{
- if (ds->ds_is_legacy) {
- if (!(ds->ds_timer.flags & DISPATCH_TIMER_ONESHOT)) {
- dispatch_source_cancel(ds);
- }
-
- // Clients often leave sources suspended at the last release
- dispatch_atomic_and(&ds->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK);
- } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
- // Arguments for and against this assert are within 6705399
- DISPATCH_CLIENT_CRASH("Release of a suspended object");
+ dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ if (unlikely(!(dqf & (DQF_LEGACY|DSF_CANCELED)))) {
+ DISPATCH_CLIENT_CRASH(ds, "Release of a source that has not been "
+ "cancelled, but has a mandatory cancel handler");
}
- _dispatch_wakeup(ds);
- _dispatch_release(ds);
+ dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
-#endif /* DISPATCH_NO_LEGACY */
long
dispatch_source_testcancel(dispatch_source_t ds)
{
- return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
+ return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
}
-
unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
- return ds->ds_pending_data_mask;
+ dispatch_source_refs_t dr = ds->ds_refs;
+ if (ds->dq_atomic_flags & DSF_CANCELED) {
+ return 0;
+ }
+#if DISPATCH_USE_MEMORYSTATUS
+ if (dr->du_vmpressure_override) {
+ return NOTE_VM_PRESSURE;
+ }
+#if TARGET_IPHONE_SIMULATOR
+ if (dr->du_memorypressure_override) {
+ return NOTE_MEMORYSTATUS_PRESSURE_WARN;
+ }
+#endif
+#endif // DISPATCH_USE_MEMORYSTATUS
+ return dr->du_fflags;
}
uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
- return (int)ds->ds_ident_hack;
+ dispatch_source_refs_t dr = ds->ds_refs;
+#if TARGET_IPHONE_SIMULATOR
+ if (dr->du_memorypressure_override) {
+ return 0;
+ }
+#endif
+ return dr->du_ident;
}
unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
- return ds->ds_data;
-}
-
-#if DISPATCH_DEBUG
-void
-dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
-{
- size_t i;
- for (i = 0; i < count; ++i) {
- _dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, fflags = 0x%x, data = %p, udata = %p }: %s",
- i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags, kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
+#if DISPATCH_USE_MEMORYSTATUS
+ dispatch_source_refs_t dr = ds->ds_refs;
+ if (dr->du_vmpressure_override) {
+ return NOTE_VM_PRESSURE;
+ }
+#if TARGET_IPHONE_SIMULATOR
+ if (dr->du_memorypressure_override) {
+ return NOTE_MEMORYSTATUS_PRESSURE_WARN;
}
-}
#endif
-
-static size_t
-_dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
-{
- size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
- offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
- ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
- return offset;
+#endif // DISPATCH_USE_MEMORYSTATUS
+ uint64_t value = os_atomic_load2o(ds, ds_data, relaxed);
+ return (unsigned long)(
+ ds->ds_refs->du_data_action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET
+ ? DISPATCH_SOURCE_GET_DATA(value) : value);
}
-static void
-_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
-{
- unsigned int i;
- for (i = 0; i < DSL_HASH_SIZE; i++) {
- TAILQ_INIT(&_dispatch_sources[i]);
+size_t
+dispatch_source_get_extended_data(dispatch_source_t ds,
+ dispatch_source_extended_data_t edata, size_t size)
+{
+ size_t target_size = MIN(size,
+ sizeof(struct dispatch_source_extended_data_s));
+ if (size > 0) {
+ unsigned long data, status = 0;
+ if (ds->ds_refs->du_data_action
+ == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
+ uint64_t combined = os_atomic_load(&ds->ds_data, relaxed);
+ data = DISPATCH_SOURCE_GET_DATA(combined);
+ status = DISPATCH_SOURCE_GET_STATUS(combined);
+ } else {
+ data = dispatch_source_get_data(ds);
+ }
+ if (size >= offsetof(struct dispatch_source_extended_data_s, data)
+ + sizeof(edata->data)) {
+ edata->data = data;
+ }
+ if (size >= offsetof(struct dispatch_source_extended_data_s, status)
+ + sizeof(edata->status)) {
+ edata->status = status;
+ }
+ if (size > sizeof(struct dispatch_source_extended_data_s)) {
+ memset(
+ (char *)edata + sizeof(struct dispatch_source_extended_data_s),
+ 0, size - sizeof(struct dispatch_source_extended_data_s));
+ }
}
-
- TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
- TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
- TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
- TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
+ return target_size;
}
-// Find existing kevents, and merge any new flags if necessary
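+// Fold the client-supplied value into ds_pending_data according to the
+// source's custom filter (ADD, OR or REPLACE), then mark the source dirty so
+// the event handler is driven on its target queue.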
+DISPATCH_NOINLINE
void
-_dispatch_kevent_merge(dispatch_source_t ds)
+_dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
+ unsigned long val)
{
- static dispatch_once_t pred;
- dispatch_kevent_t dk;
- typeof(dk->dk_kevent.fflags) new_flags;
- bool do_resume = false;
+ dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ int filter = ds->ds_refs->du_filter;
- if (ds->ds_is_installed) {
+ if (unlikely(dqf & (DSF_CANCELED | DSF_DELETED))) {
return;
}
- ds->ds_is_installed = true;
- dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array);
-
- dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);
-
- if (dk) {
- // If an existing dispatch kevent is found, check to see if new flags
- // need to be added to the existing kevent
- new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
- dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
- free(ds->ds_dkev);
- ds->ds_dkev = dk;
- do_resume = new_flags;
- } else {
- dk = ds->ds_dkev;
- _dispatch_kevent_insert(dk);
- new_flags = dk->dk_kevent.fflags;
- do_resume = true;
+ switch (filter) {
+ case DISPATCH_EVFILT_CUSTOM_ADD:
+ os_atomic_add2o(ds, ds_pending_data, val, relaxed);
+ break;
+ case DISPATCH_EVFILT_CUSTOM_OR:
+ os_atomic_or2o(ds, ds_pending_data, val, relaxed);
+ break;
+ case DISPATCH_EVFILT_CUSTOM_REPLACE:
+ os_atomic_store2o(ds, ds_pending_data, val, relaxed);
+ break;
+ default:
+ DISPATCH_CLIENT_CRASH(filter, "Invalid source type");
}
- TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);
-
- // Re-register the kevent with the kernel if new flags were added
- // by the dispatch kevent
- if (do_resume) {
- dk->dk_kevent.flags |= EV_ADD;
- _dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
- ds->ds_is_armed = true;
- }
+ dx_wakeup(ds, _dispatch_qos_from_pp(pp), DISPATCH_WAKEUP_MAKE_DIRTY);
}
-
void
-_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
+dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
- switch (dk->dk_kevent.filter) {
- case DISPATCH_EVFILT_TIMER:
- case DISPATCH_EVFILT_CUSTOM_ADD:
- case DISPATCH_EVFILT_CUSTOM_OR:
- // these types not registered with kevent
- return;
- case EVFILT_MACHPORT:
- _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
- break;
- case EVFILT_PROC:
- if (dk->dk_kevent.flags & EV_ONESHOT) {
- return;
- }
- // fall through
- default:
- _dispatch_update_kq(&dk->dk_kevent);
- if (dk->dk_kevent.flags & EV_DISPATCH) {
- dk->dk_kevent.flags &= ~EV_ADD;
- }
- break;
- }
+ _dispatch_source_merge_data(ds, 0, val);
}
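+
+/*
+ * Illustrative usage sketch (not part of the library): DATA_ADD/DATA_OR
+ * sources are purely software-driven; process() below is a placeholder.
+ *
+ *	dispatch_source_t ds = dispatch_source_create(
+ *			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
+ *	dispatch_source_set_event_handler(ds, ^{
+ *		process(dispatch_source_get_data(ds)); // coalesced sum of merges
+ *	});
+ *	dispatch_activate(ds);
+ *	dispatch_source_merge_data(ds, 1); // callable from any thread
+ */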
-dispatch_queue_t
-_dispatch_source_invoke(dispatch_source_t ds)
+#pragma mark -
+#pragma mark dispatch_source_handler
+
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_continuation_t
+_dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
{
- // This function performs all source actions. Each action is responsible
- // for verifying that it takes place on the appropriate queue. If the
- // current queue is not the correct queue for this action, the correct queue
- // will be returned and the invoke will be re-driven on that queue.
+ return os_atomic_load(&dr->ds_handler[kind], relaxed);
+}
+#define _dispatch_source_get_event_handler(dr) \
+ _dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
+#define _dispatch_source_get_cancel_handler(dr) \
+ _dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
+#define _dispatch_source_get_registration_handler(dr) \
+ _dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)
- // The order of tests here in invoke and in probe should be consistent.
-
- dispatch_queue_t dq = _dispatch_queue_get_current();
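+// Wrap a handler (block or function pointer) in a continuation so that it can
+// be stored in ds_handler[] and later popped on the appropriate queue.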
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_continuation_t
+_dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
+ bool block)
+{
+ // sources don't propagate priority by default
+ const dispatch_block_flags_t flags =
+ DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
+ dispatch_continuation_t dc = _dispatch_continuation_alloc();
+ if (func) {
+ uintptr_t dc_flags = 0;
- if (!ds->ds_is_installed) {
- // The source needs to be installed on the manager queue.
- if (dq != &_dispatch_mgr_q) {
- return &_dispatch_mgr_q;
- }
- _dispatch_kevent_merge(ds);
- } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
- // The source has been cancelled and needs to be uninstalled from the
- // manager queue. After uninstallation, the cancellation handler needs
- // to be delivered to the target queue.
- if (ds->ds_dkev) {
- if (dq != &_dispatch_mgr_q) {
- return &_dispatch_mgr_q;
- }
- _dispatch_kevent_release(ds);
- return ds->do_targetq;
- } else if (ds->ds_cancel_handler) {
- if (dq != ds->do_targetq) {
- return ds->do_targetq;
- }
- }
- _dispatch_source_cancel_callout(ds);
- } else if (ds->ds_pending_data) {
- // The source has pending data to deliver via the event handler callback
- // on the target queue. Some sources need to be rearmed on the manager
- // queue after event delivery.
- if (dq != ds->do_targetq) {
- return ds->do_targetq;
- }
- _dispatch_source_latch_and_call(ds);
- if (ds->ds_needs_rearm) {
- return &_dispatch_mgr_q;
+ if (kind != DS_EVENT_HANDLER) {
+ dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
}
- } else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
- // The source needs to be rearmed on the manager queue.
- if (dq != &_dispatch_mgr_q) {
- return &_dispatch_mgr_q;
+ if (block) {
+#ifdef __BLOCKS__
+ _dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
+#endif /* __BLOCKS__ */
+ } else {
+ dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
+ _dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
+ 0, flags, dc_flags);
}
- _dispatch_kevent_resume(ds->ds_dkev, 0, 0);
- ds->ds_is_armed = true;
+ _dispatch_trace_continuation_push(ds->_as_dq, dc);
+ } else {
+ dc->dc_flags = 0;
+ dc->dc_func = NULL;
}
-
- return NULL;
+ return dc;
}
-bool
-_dispatch_source_probe(dispatch_source_t ds)
+DISPATCH_NOINLINE
+static void
+_dispatch_source_handler_dispose(dispatch_continuation_t dc)
{
- // This function determines whether the source needs to be invoked.
- // The order of tests here in probe and in invoke should be consistent.
-
- if (!ds->ds_is_installed) {
- // The source needs to be installed on the manager queue.
- return true;
- } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
- // The source needs to be uninstalled from the manager queue, or the
- // cancellation handler needs to be delivered to the target queue.
- // Note: cancellation assumes installation.
- if (ds->ds_dkev || ds->ds_cancel_handler) {
- return true;
- }
- } else if (ds->ds_pending_data) {
- // The source has pending data to deliver to the target queue.
- return true;
- } else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
- // The source needs to be rearmed on the manager queue.
- return true;
+#ifdef __BLOCKS__
+ if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
+ Block_release(dc->dc_ctxt);
}
- // Nothing to do.
- return false;
+#endif /* __BLOCKS__ */
+ if (dc->dc_voucher) {
+ _voucher_release(dc->dc_voucher);
+ dc->dc_voucher = VOUCHER_INVALID;
+ }
+ _dispatch_continuation_free(dc);
}
-void
-_dispatch_source_dispose(dispatch_source_t ds)
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_continuation_t
+_dispatch_source_handler_take(dispatch_source_t ds, long kind)
{
- _dispatch_queue_dispose((dispatch_queue_t)ds);
+ return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
}
-static void
-_dispatch_kevent_debugger2(void *context, dispatch_source_t unused __attribute__((unused)))
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_source_handler_free(dispatch_source_t ds, long kind)
{
- struct sockaddr sa;
- socklen_t sa_len = sizeof(sa);
- int c, fd = (int)(long)context;
- unsigned int i;
- dispatch_kevent_t dk;
- dispatch_source_t ds;
- FILE *debug_stream;
+ dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
+ if (dc) _dispatch_source_handler_dispose(dc);
+}
- c = accept(fd, &sa, &sa_len);
- if (c == -1) {
- if (errno != EAGAIN) {
- dispatch_assume_zero(errno);
- }
- return;
- }
-#if 0
- int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
- if (r == -1) {
- dispatch_assume_zero(errno);
- }
-#endif
- debug_stream = fdopen(c, "a");
- if (!dispatch_assume(debug_stream)) {
- close(c);
- return;
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_source_handler_replace(dispatch_source_t ds, long kind,
+ dispatch_continuation_t dc)
+{
+ if (!dc->dc_func) {
+ _dispatch_continuation_free(dc);
+ dc = NULL;
+ } else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+ dc->dc_ctxt = ds->do_ctxt;
}
+ dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
+ if (dc) _dispatch_source_handler_dispose(dc);
+}
- fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
- fprintf(debug_stream, "Content-type: text/html\r\n");
- fprintf(debug_stream, "Pragma: nocache\r\n");
- fprintf(debug_stream, "\r\n");
- fprintf(debug_stream, "<html>\n<head><title>PID %u</title></head>\n<body>\n<ul>\n", getpid());
-
- //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td></tr>\n");
+DISPATCH_NOINLINE
+static void
+_dispatch_source_set_handler_slow(void *context)
+{
+ dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
+ dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
- for (i = 0; i < DSL_HASH_SIZE; i++) {
- if (TAILQ_EMPTY(&_dispatch_sources[i])) {
- continue;
- }
- TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
- fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags 0x%hx fflags 0x%x data 0x%lx udata %p\n",
- dk, dk->dk_kevent.ident, _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
- dk->dk_kevent.fflags, dk->dk_kevent.data, dk->dk_kevent.udata);
- fprintf(debug_stream, "\t\t<ul>\n");
- TAILQ_FOREACH(ds, &dk->dk_sources, ds_list) {
- fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend 0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
- ds, ds->do_ref_cnt, ds->do_suspend_cnt, ds->ds_pending_data, ds->ds_pending_data_mask,
- ds->ds_atomic_flags);
- if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
- dispatch_queue_t dq = ds->do_targetq;
- fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend 0x%x label: %s\n", dq, dq->do_ref_cnt, dq->do_suspend_cnt, dq->dq_label);
- }
- }
- fprintf(debug_stream, "\t\t</ul>\n");
- fprintf(debug_stream, "\t</li>\n");
- }
- }
- fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
- fflush(debug_stream);
- fclose(debug_stream);
+ dispatch_continuation_t dc = context;
+ long kind = (long)dc->dc_data;
+ dc->dc_data = NULL;
+ _dispatch_source_handler_replace(ds, kind, dc);
}
+DISPATCH_NOINLINE
static void
-_dispatch_kevent_debugger(void *context __attribute__((unused)))
-{
- union {
- struct sockaddr_in sa_in;
- struct sockaddr sa;
- } sa_u = {
- .sa_in = {
- .sin_family = AF_INET,
- .sin_addr = { htonl(INADDR_LOOPBACK), },
- },
- };
- dispatch_source_t ds;
- const char *valstr;
- int val, r, fd, sock_opt = 1;
- socklen_t slen = sizeof(sa_u);
-
- if (issetugid()) {
- return;
- }
- valstr = getenv("LIBDISPATCH_DEBUGGER");
- if (!valstr) {
- return;
- }
- val = atoi(valstr);
- if (val == 2) {
- sa_u.sa_in.sin_addr.s_addr = 0;
- }
- fd = socket(PF_INET, SOCK_STREAM, 0);
- if (fd == -1) {
- dispatch_assume_zero(errno);
- return;
- }
- r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, (socklen_t) sizeof sock_opt);
- if (r == -1) {
- dispatch_assume_zero(errno);
- goto out_bad;
+_dispatch_source_set_handler(dispatch_source_t ds, long kind,
+ dispatch_continuation_t dc)
+{
+ dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
+ if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
+ _dispatch_source_handler_replace(ds, kind, dc);
+ return dx_vtable(ds)->do_resume(ds, false);
}
-#if 0
- r = fcntl(fd, F_SETFL, O_NONBLOCK);
- if (r == -1) {
- dispatch_assume_zero(errno);
- goto out_bad;
+ if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
+ DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
+ "after it has been activated");
}
-#endif
- r = bind(fd, &sa_u.sa, sizeof(sa_u));
- if (r == -1) {
- dispatch_assume_zero(errno);
- goto out_bad;
- }
- r = listen(fd, SOMAXCONN);
- if (r == -1) {
- dispatch_assume_zero(errno);
- goto out_bad;
- }
- r = getsockname(fd, &sa_u.sa, &slen);
- if (r == -1) {
- dispatch_assume_zero(errno);
- goto out_bad;
- }
- ds = dispatch_source_read_create_f(fd, NULL, &_dispatch_mgr_q, (void *)(long)fd, _dispatch_kevent_debugger2);
- if (dispatch_assume(ds)) {
- _dispatch_log("LIBDISPATCH: debug port: %hu", ntohs(sa_u.sa_in.sin_port));
- return;
+ _dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
+ if (kind == DS_REGISTN_HANDLER) {
+ _dispatch_bug_deprecated("Setting registration handler after "
+ "the source has been activated");
}
-out_bad:
- close(fd);
+ dc->dc_data = (void *)kind;
+ _dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
+ _dispatch_source_set_handler_slow, 0);
}
+#ifdef __BLOCKS__
void
-_dispatch_source_drain_kevent(struct kevent *ke)
+dispatch_source_set_event_handler(dispatch_source_t ds,
+ dispatch_block_t handler)
{
- static dispatch_once_t pred;
- dispatch_kevent_t dk = ke->udata;
- dispatch_source_t dsi;
-
- dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
+ dispatch_continuation_t dc;
+ dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
+ _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
+}
+#endif /* __BLOCKS__ */
- dispatch_debug_kevents(ke, 1, __func__);
+void
+dispatch_source_set_event_handler_f(dispatch_source_t ds,
+ dispatch_function_t handler)
+{
+ dispatch_continuation_t dc;
+ dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
+ _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
+}
- if (ke->filter == EVFILT_MACHPORT) {
- return _dispatch_drain_mach_messages(ke);
- }
- dispatch_assert(dk);
+#ifdef __BLOCKS__
+DISPATCH_NOINLINE
+static void
+_dispatch_source_set_cancel_handler(dispatch_source_t ds,
+ dispatch_block_t handler)
+{
+ dispatch_continuation_t dc;
+ dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
+ _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
+}
- if (ke->flags & EV_ONESHOT) {
- dk->dk_kevent.flags |= EV_ONESHOT;
+void
+dispatch_source_set_cancel_handler(dispatch_source_t ds,
+ dispatch_block_t handler)
+{
+ if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
+ DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
+ "this source");
}
+ return _dispatch_source_set_cancel_handler(ds, handler);
+}
- TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
- _dispatch_source_merge_kevent(dsi, ke);
- }
+void
+dispatch_source_set_mandatory_cancel_handler(dispatch_source_t ds,
+ dispatch_block_t handler)
+{
+ _dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
+ return _dispatch_source_set_cancel_handler(ds, handler);
}
+#endif /* __BLOCKS__ */
+DISPATCH_NOINLINE
static void
-_dispatch_kevent_dispose(dispatch_kevent_t dk)
+_dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
+ dispatch_function_t handler)
{
- uintptr_t key;
+ dispatch_continuation_t dc;
+ dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
+ _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
+}
- switch (dk->dk_kevent.filter) {
- case DISPATCH_EVFILT_TIMER:
- case DISPATCH_EVFILT_CUSTOM_ADD:
- case DISPATCH_EVFILT_CUSTOM_OR:
- // these sources live on statically allocated lists
- return;
- case EVFILT_MACHPORT:
- _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
- break;
- case EVFILT_PROC:
- if (dk->dk_kevent.flags & EV_ONESHOT) {
- break; // implicitly deleted
- }
- // fall through
- default:
- if (~dk->dk_kevent.flags & EV_DELETE) {
- dk->dk_kevent.flags |= EV_DELETE;
- _dispatch_update_kq(&dk->dk_kevent);
- }
- break;
+void
+dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
+ dispatch_function_t handler)
+{
+ if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
+ DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
+ "this source");
}
+ return _dispatch_source_set_cancel_handler_f(ds, handler);
+}
- if (dk->dk_kevent.filter == EVFILT_MACHPORT) {
- key = MACH_PORT_INDEX(dk->dk_kevent.ident);
- } else {
- key = dk->dk_kevent.ident;
- }
+void
+dispatch_source_set_mandatory_cancel_handler_f(dispatch_source_t ds,
+ dispatch_function_t handler)
+{
+ _dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
+ return _dispatch_source_set_cancel_handler_f(ds, handler);
+}
- TAILQ_REMOVE(&_dispatch_sources[DSL_HASH(key)], dk, dk_list);
- free(dk);
+#ifdef __BLOCKS__
+void
+dispatch_source_set_registration_handler(dispatch_source_t ds,
+ dispatch_block_t handler)
+{
+ dispatch_continuation_t dc;
+ dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
+ _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
+#endif /* __BLOCKS__ */
void
-_dispatch_kevent_release(dispatch_source_t ds)
+dispatch_source_set_registration_handler_f(dispatch_source_t ds,
+ dispatch_function_t handler)
{
- dispatch_kevent_t dk = ds->ds_dkev;
- dispatch_source_t dsi;
- uint32_t del_flags, fflags = 0;
+ dispatch_continuation_t dc;
+ dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
+ _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
+}
- ds->ds_dkev = NULL;
+#pragma mark -
+#pragma mark dispatch_source_invoke
- TAILQ_REMOVE(&dk->dk_sources, ds, ds_list);
+bool
+_dispatch_source_will_reenable_kevent_4NW(dispatch_source_t ds)
+{
+ uint64_t dq_state = os_atomic_load2o(ds, dq_state, relaxed);
+ dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
- if (TAILQ_EMPTY(&dk->dk_sources)) {
- _dispatch_kevent_dispose(dk);
- } else {
- TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
- fflags |= (uint32_t)dsi->ds_pending_data_mask;
- }
- del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
- if (del_flags) {
- dk->dk_kevent.flags |= EV_ADD;
- dk->dk_kevent.fflags = fflags;
- _dispatch_kevent_resume(dk, 0, del_flags);
- }
+ if (unlikely(!_dq_state_drain_locked_by_self(dq_state))) {
+ DISPATCH_CLIENT_CRASH(0, "_dispatch_source_will_reenable_kevent_4NW "
+ "not called from within the event handler");
}
- ds->ds_is_armed = false;
- ds->ds_needs_rearm = false; // re-arm is pointless and bad now
- _dispatch_release(ds); // the retain is done at creation time
+ return _dispatch_unote_needs_rearm(ds->ds_refs) && !(dqf & DSF_ARMED);
}
-void
-_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
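+// Deliver the one-shot registration handler on the target queue, unless the
+// source has already been canceled or released.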
+static void
+_dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
+ dispatch_invoke_flags_t flags)
{
- struct kevent fake;
+ dispatch_continuation_t dc;
- if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
- return;
+ dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
+ if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
+ // no registration callout if source is canceled rdar://problem/8955246
+ return _dispatch_source_handler_dispose(dc);
}
-
- // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie.
- // We simulate an exit event in this case. <rdar://problem/5067725>
- if (ke->flags & EV_ERROR) {
- if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
- fake = *ke;
- fake.flags &= ~EV_ERROR;
- fake.fflags = NOTE_EXIT;
- fake.data = 0;
- ke = &fake;
- } else {
- // log the unexpected error
- dispatch_assume_zero(ke->data);
- return;
- }
+ if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+ dc->dc_ctxt = ds->do_ctxt;
}
+ _dispatch_continuation_pop(dc, NULL, flags, cq);
+}
- if (ds->ds_is_level) {
- // ke->data is signed and "negative available data" makes no sense
- // zero bytes happens when EV_EOF is set
- // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
- dispatch_assert(ke->data >= 0l);
- ds->ds_pending_data = ~ke->data;
- } else if (ds->ds_is_adder) {
- dispatch_atomic_add(&ds->ds_pending_data, ke->data);
- } else {
- dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
- }
+static void
+_dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
+ dispatch_invoke_flags_t flags)
+{
+ dispatch_continuation_t dc;
- // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
- if (ds->ds_needs_rearm) {
- ds->ds_is_armed = false;
+ dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
+ ds->ds_pending_data = 0;
+ ds->ds_data = 0;
+ _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
+ _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
+ if (!dc) {
+ return;
}
-
- _dispatch_wakeup(ds);
+ if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
+ return _dispatch_source_handler_dispose(dc);
+ }
+ if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+ dc->dc_ctxt = ds->do_ctxt;
+ }
+ _dispatch_continuation_pop(dc, NULL, flags, cq);
}
-void
-_dispatch_source_latch_and_call(dispatch_source_t ds)
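+// Atomically take a snapshot of the pending data for this delivery and invoke
+// the event handler with it; dispatch_after sources additionally drop their
+// handler and reference once they have fired.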
+static void
+_dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
+ dispatch_invoke_flags_t flags)
{
- unsigned long prev;
+ dispatch_source_refs_t dr = ds->ds_refs;
+ dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
+ uint64_t prev;
- if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
- return;
+ if (dr->du_is_timer && !(dr->du_fflags & DISPATCH_TIMER_AFTER)) {
+ prev = _dispatch_source_timer_data(ds, dr);
+ } else {
+ prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
}
- prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
- if (ds->ds_is_level) {
+ if (dr->du_data_action == DISPATCH_UNOTE_ACTION_DATA_SET) {
ds->ds_data = ~prev;
} else {
ds->ds_data = prev;
}
- if (dispatch_assume(prev)) {
- if (ds->ds_handler_func) {
- ds->ds_handler_func(ds->ds_handler_ctxt, ds);
- }
+ if (!dispatch_assume(prev != 0) || !dc) {
+ return;
+ }
+ _dispatch_continuation_pop(dc, NULL, flags, cq);
+ if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_AFTER)) {
+ _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
+ dispatch_release(ds); // dispatch_after sources are one-shot
}
}
-void
-_dispatch_source_cancel_callout(dispatch_source_t ds)
+DISPATCH_NOINLINE
+static void
+_dispatch_source_refs_finalize_unregistration(dispatch_source_t ds)
{
- ds->ds_pending_data_mask = 0;
- ds->ds_pending_data = 0;
- ds->ds_data = 0;
+ dispatch_queue_flags_t dqf;
+ dispatch_source_refs_t dr = ds->ds_refs;
-#ifdef __BLOCKS__
- if (ds->ds_handler_is_block) {
- Block_release(ds->ds_handler_ctxt);
- ds->ds_handler_is_block = false;
- ds->ds_handler_func = NULL;
- ds->ds_handler_ctxt = NULL;
+ dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
+ DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
+ if (dqf & DSF_CANCEL_WAITER) {
+ _dispatch_wake_by_address(&ds->dq_atomic_flags);
}
-#endif
+ _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
+ _dispatch_release_tailcall(ds); // the retain is done at creation time
+}
- if (!ds->ds_cancel_handler) {
- return;
- }
- if (ds->ds_cancel_is_block) {
-#ifdef __BLOCKS__
- dispatch_block_t b = ds->ds_cancel_handler;
- if (ds->ds_atomic_flags & DSF_CANCELED) {
- b();
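+// Tear down the kernel-side registration of this source. Timers are handled
+// by the timer subsystem directly; for other unotes the kevent deletion may
+// have to be deferred, in which case DSF_DEFERRED_DELETE is set and the
+// finalization is completed later.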
+void
+_dispatch_source_refs_unregister(dispatch_source_t ds, uint32_t options)
+{
+ _dispatch_object_debug(ds, "%s", __func__);
+ dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ dispatch_source_refs_t dr = ds->ds_refs;
+
+ if (dr->du_is_timer) {
+ // Because of the optimization to unregister fired oneshot timers
+ // from the target queue, we can't trust _dispatch_unote_registered()
+ // to tell the truth: the unregistration may not have happened yet
+ if (dqf & DSF_ARMED) {
+ _dispatch_timers_unregister(ds->ds_timer_refs);
+ _dispatch_release_2(ds);
}
- Block_release(ds->ds_cancel_handler);
- ds->ds_cancel_is_block = false;
-#endif
+ dr->du_ident = DISPATCH_TIMER_IDENT_CANCELED;
} else {
- dispatch_function_t f = ds->ds_cancel_handler;
- if (ds->ds_atomic_flags & DSF_CANCELED) {
- f(ds->do_ctxt);
+ if (_dispatch_unote_needs_rearm(dr) && !(dqf & DSF_ARMED)) {
+ options |= DU_UNREGISTER_IMMEDIATE_DELETE;
+ }
+ if (!_dispatch_unote_unregister(dr, options)) {
+ _dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
+ ds, dr);
+ _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
+ return; // deferred unregistration
}
}
- ds->ds_cancel_handler = NULL;
-}
-
-const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
- .do_type = DISPATCH_SOURCE_KEVENT_TYPE,
- .do_kind = "kevent-source",
- .do_invoke = _dispatch_source_invoke,
- .do_dispose = _dispatch_source_dispose,
- .do_probe = _dispatch_source_probe,
- .do_debug = _dispatch_source_kevent_debug,
-};
-
-void
-dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
-{
- struct kevent kev = {
- .fflags = (typeof(kev.fflags))val,
- .data = val,
- };
- dispatch_assert(ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
- ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
-
- _dispatch_source_merge_kevent(ds, &kev);
+ ds->ds_is_installed = true;
+ _dispatch_source_refs_finalize_unregistration(ds);
}
-size_t
-dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_source_tryarm(dispatch_source_t ds)
{
- dispatch_queue_t target = ds->do_targetq;
- return snprintf(buf, bufsiz,
- "target = %s[%p], pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
- target ? target->dq_label : "", target,
- ds->ds_pending_data, ds->ds_pending_data_mask);
+ dispatch_queue_flags_t oqf, nqf;
+ return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
+ if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
+ // the test is inside the loop because it's convenient but the
+ // result should not change for the duration of the rmw_loop
+ os_atomic_rmw_loop_give_up(break);
+ }
+ nqf = oqf | DSF_ARMED;
+ });
}
-size_t
-_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_source_refs_resume(dispatch_source_t ds)
{
- size_t offset = 0;
- offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dx_kind(ds), ds);
- offset += dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
- offset += dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
- return offset;
-}
-
-#ifndef DISPATCH_NO_LEGACY
-static void
-dispatch_source_attr_dispose(dispatch_source_attr_t attr)
-{
- // release the finalizer block if necessary
- dispatch_source_attr_set_finalizer(attr, NULL);
- _dispatch_dispose(attr);
-}
-
-static const struct dispatch_source_attr_vtable_s dispatch_source_attr_vtable = {
- .do_type = DISPATCH_SOURCE_ATTR_TYPE,
- .do_kind = "source-attr",
- .do_dispose = dispatch_source_attr_dispose,
-};
-
-dispatch_source_attr_t
-dispatch_source_attr_create(void)
-{
- dispatch_source_attr_t rval = calloc(1, sizeof(struct dispatch_source_attr_s));
-
- if (rval) {
- rval->do_vtable = &dispatch_source_attr_vtable;
- rval->do_next = DISPATCH_OBJECT_LISTLESS;
- rval->do_targetq = dispatch_get_global_queue(0, 0);
- rval->do_ref_cnt = 1;
- rval->do_xref_cnt = 1;
+ dispatch_source_refs_t dr = ds->ds_refs;
+ if (dr->du_is_timer) {
+ _dispatch_timers_update(dr, 0);
+ return true;
}
-
- return rval;
+ if (unlikely(!_dispatch_source_tryarm(ds))) {
+ return false;
+ }
+ _dispatch_unote_resume(dr);
+ _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dr);
+ return true;
}
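+// First-time registration of the source's unote: timers go through the timer
+// subsystem (with aggressive coalescing of background QoS timers), everything
+// else is armed and registered with the kevent machinery.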
void
-dispatch_source_attr_set_finalizer_f(dispatch_source_attr_t attr,
- void *context, dispatch_source_finalizer_function_t finalizer)
-{
-#ifdef __BLOCKS__
- if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
- Block_release(attr->finalizer_ctxt);
+_dispatch_source_refs_register(dispatch_source_t ds, dispatch_wlh_t wlh,
+ dispatch_priority_t pri)
+{
+ dispatch_source_refs_t dr = ds->ds_refs;
+ dispatch_priority_t kbp;
+
+ dispatch_assert(!ds->ds_is_installed);
+
+ if (dr->du_is_timer) {
+ dispatch_queue_t dq = ds->_as_dq;
+ kbp = _dispatch_queue_compute_priority_and_wlh(dq, NULL);
+ // aggressively coalesce background/maintenance QoS timers
+ // <rdar://problem/12200216&27342536>
+ if (_dispatch_qos_is_background(_dispatch_priority_qos(kbp))) {
+ if (dr->du_fflags & DISPATCH_TIMER_STRICT) {
+ _dispatch_ktrace1(DISPATCH_PERF_strict_bg_timer, ds);
+ } else {
+ dr->du_fflags |= DISPATCH_TIMER_BACKGROUND;
+ dr->du_ident = _dispatch_source_timer_idx(dr);
+ }
+ }
+ _dispatch_timers_update(dr, 0);
+ return;
}
-#endif
- attr->finalizer_ctxt = context;
- attr->finalizer_func = finalizer;
+ if (unlikely(!_dispatch_source_tryarm(ds) ||
+ !_dispatch_unote_register(dr, wlh, pri))) {
+ // Do the parts of dispatch_source_refs_unregister() that
+ // are required after this partial initialization.
+ _dispatch_source_refs_finalize_unregistration(ds);
+ } else {
+ _dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, dr);
+ }
+ _dispatch_object_debug(ds, "%s", __func__);
}
-#ifdef __BLOCKS__
-long
-dispatch_source_attr_set_finalizer(dispatch_source_attr_t attr,
- dispatch_source_finalizer_t finalizer)
+static void
+_dispatch_source_set_event_handler_context(void *ctxt)
{
- void *ctxt;
- dispatch_source_finalizer_function_t func;
+ dispatch_source_t ds = ctxt;
+ dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);
- if (finalizer) {
- if (!(ctxt = Block_copy(finalizer))) {
- return 1;
- }
- func = (void *)_dispatch_call_block_and_release2;
- } else {
- ctxt = NULL;
- func = NULL;
+ if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
+ dc->dc_ctxt = ds->do_ctxt;
}
-
- dispatch_source_attr_set_finalizer_f(attr, ctxt, func);
-
- return 0;
}
-dispatch_source_finalizer_t
-dispatch_source_attr_get_finalizer(dispatch_source_attr_t attr)
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_source_install(dispatch_source_t ds, dispatch_wlh_t wlh,
+ dispatch_priority_t pri)
{
- if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
- return (dispatch_source_finalizer_t)attr->finalizer_ctxt;
- } else if (attr->finalizer_func == NULL) {
- return NULL;
- } else {
- abort(); // finalizer is not a block...
- }
+ _dispatch_source_refs_register(ds, wlh, pri);
+ ds->ds_is_installed = true;
}
-#endif
void
-dispatch_source_attr_set_context(dispatch_source_attr_t attr, void *context)
+_dispatch_source_finalize_activation(dispatch_source_t ds, bool *allow_resume)
{
- attr->context = context;
-}
+ dispatch_continuation_t dc;
+ dispatch_source_refs_t dr = ds->ds_refs;
+ dispatch_priority_t pri;
+ dispatch_wlh_t wlh;
-dispatch_source_attr_t
-dispatch_source_attr_copy(dispatch_source_attr_t proto)
-{
- dispatch_source_attr_t rval = NULL;
+ if (unlikely(dr->du_is_direct &&
+ (_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
+ return _dispatch_source_refs_unregister(ds, 0);
+ }
- if (proto && (rval = malloc(sizeof(struct dispatch_source_attr_s)))) {
- memcpy(rval, proto, sizeof(struct dispatch_source_attr_s));
-#ifdef __BLOCKS__
- if (rval->finalizer_func == (void*)_dispatch_call_block_and_release2) {
- rval->finalizer_ctxt = Block_copy(rval->finalizer_ctxt);
+ dc = _dispatch_source_get_event_handler(dr);
+ if (dc) {
+ if (_dispatch_object_is_barrier(dc)) {
+ _dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
+ }
+ ds->dq_priority = _dispatch_priority_from_pp_strip_flags(dc->dc_priority);
+ if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
+ _dispatch_barrier_async_detached_f(ds->_as_dq, ds,
+ _dispatch_source_set_event_handler_context);
}
-#endif
- } else if (!proto) {
- rval = dispatch_source_attr_create();
}
- return rval;
-}
-#endif /* DISPATCH_NO_LEGACY */
+ // call "super"
+ _dispatch_queue_finalize_activation(ds->_as_dq, allow_resume);
-struct dispatch_source_type_s {
- struct kevent ke;
- uint64_t mask;
-};
-
-const struct dispatch_source_type_s _dispatch_source_type_timer = {
- .ke = {
- .filter = DISPATCH_EVFILT_TIMER,
- },
- .mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
-};
+ if (dr->du_is_direct && !ds->ds_is_installed) {
+ dispatch_queue_t dq = ds->_as_dq;
+ pri = _dispatch_queue_compute_priority_and_wlh(dq, &wlh);
+ if (pri) _dispatch_source_install(ds, wlh, pri);
+ }
+}
-const struct dispatch_source_type_s _dispatch_source_type_read = {
- .ke = {
- .filter = EVFILT_READ,
- .flags = EV_DISPATCH,
- },
-};
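+// Core of the source state machine: drains the source's queue, then installs,
+// delivers, unregisters or rearms the unote depending on the current state.
+// Returns the queue the invoke must be re-driven on whenever the current
+// queue is not the right one for the next action.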
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_queue_wakeup_target_t
+_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
+ dispatch_invoke_flags_t flags, uint64_t *owned)
+{
+ dispatch_source_t ds = dou._ds;
+ dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
+ dispatch_queue_t dq = _dispatch_queue_get_current();
+ dispatch_source_refs_t dr = ds->ds_refs;
+ dispatch_queue_flags_t dqf;
+
+ if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN) &&
+ _dispatch_unote_wlh_changed(dr, _dispatch_get_wlh())) {
+ dqf = _dispatch_queue_atomic_flags_set_orig(ds->_as_dq,
+ DSF_WLH_CHANGED);
+ if (!(dqf & DSF_WLH_CHANGED)) {
+ _dispatch_bug_deprecated("Changing target queue "
+ "hierarchy after source was activated");
+ }
+ }
-const struct dispatch_source_type_s _dispatch_source_type_write = {
- .ke = {
- .filter = EVFILT_WRITE,
- .flags = EV_DISPATCH,
- },
-};
+ if (_dispatch_queue_class_probe(ds)) {
+ // Intentionally always drain even when on the manager queue
+ // and not the source's regular target queue: we need to be able
+ // to drain timer setting and the like there.
+ dispatch_with_disabled_narrowing(dic, {
+ retq = _dispatch_queue_serial_drain(ds->_as_dq, dic, flags, owned);
+ });
+ }
-const struct dispatch_source_type_s _dispatch_source_type_proc = {
- .ke = {
- .filter = EVFILT_PROC,
- .flags = EV_CLEAR,
- },
- .mask = NOTE_EXIT|NOTE_FORK|NOTE_EXEC|NOTE_SIGNAL|NOTE_REAP,
-};
+ // This function performs all source actions. Each action is responsible
+ // for verifying that it takes place on the appropriate queue. If the
+ // current queue is not the correct queue for this action, the correct queue
+ // will be returned and the invoke will be re-driven on that queue.
-const struct dispatch_source_type_s _dispatch_source_type_signal = {
- .ke = {
- .filter = EVFILT_SIGNAL,
- },
-};
+ // The order of tests here in invoke and in wakeup should be consistent.
-const struct dispatch_source_type_s _dispatch_source_type_vnode = {
- .ke = {
- .filter = EVFILT_VNODE,
- .flags = EV_CLEAR,
- },
- .mask = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND|NOTE_ATTRIB|NOTE_LINK|NOTE_RENAME|NOTE_REVOKE|NOTE_NONE,
-};
+ dispatch_queue_t dkq = &_dispatch_mgr_q;
+ bool prevent_starvation = false;
-const struct dispatch_source_type_s _dispatch_source_type_vfs = {
- .ke = {
- .filter = EVFILT_FS,
- .flags = EV_CLEAR,
- },
- .mask = VQ_NOTRESP|VQ_NEEDAUTH|VQ_LOWDISK|VQ_MOUNT|VQ_UNMOUNT|VQ_DEAD|VQ_ASSIST|VQ_NOTRESPLOCK|VQ_UPDATE|VQ_VERYLOWDISK,
-};
+ if (dr->du_is_direct) {
+ dkq = ds->do_targetq;
+ }
-const struct dispatch_source_type_s _dispatch_source_type_mach_send = {
- .ke = {
- .filter = EVFILT_MACHPORT,
- .flags = EV_DISPATCH,
- .fflags = DISPATCH_MACHPORT_DEAD,
- },
- .mask = DISPATCH_MACH_SEND_DEAD,
-};
+ if (dr->du_is_timer &&
+ os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
+ // timer has to be configured on the kevent queue
+ if (dq != dkq) {
+ return dkq;
+ }
+ _dispatch_source_timer_configure(ds);
+ }
+ }
-const struct dispatch_source_type_s _dispatch_source_type_mach_recv = {
- .ke = {
- .filter = EVFILT_MACHPORT,
- .flags = EV_DISPATCH,
- .fflags = DISPATCH_MACHPORT_RECV,
- },
-};
+ if (!ds->ds_is_installed) {
+ // The source needs to be installed on the kevent queue.
+ if (dq != dkq) {
+ return dkq;
+ }
+ _dispatch_source_install(ds, _dispatch_get_wlh(),
+ _dispatch_get_basepri());
+ }
-const struct dispatch_source_type_s _dispatch_source_type_data_add = {
- .ke = {
- .filter = DISPATCH_EVFILT_CUSTOM_ADD,
- },
-};
+ if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
+ // Source suspended by an item drained from the source queue.
+ return ds->do_targetq;
+ }
-const struct dispatch_source_type_s _dispatch_source_type_data_or = {
- .ke = {
- .filter = DISPATCH_EVFILT_CUSTOM_OR,
- .flags = EV_CLEAR,
- .fflags = ~0,
- },
-};
+ if (_dispatch_source_get_registration_handler(dr)) {
+ // The source has been registered and the registration handler needs
+ // to be delivered on the target queue.
+ if (dq != ds->do_targetq) {
+ return ds->do_targetq;
+ }
+ // clears ds_registration_handler
+ _dispatch_source_registration_callout(ds, dq, flags);
+ }
-dispatch_source_t
-dispatch_source_create(dispatch_source_type_t type,
- uintptr_t handle,
- unsigned long mask,
- dispatch_queue_t q)
-{
- const struct kevent *proto_kev = &type->ke;
- dispatch_source_t ds = NULL;
- dispatch_kevent_t dk = NULL;
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ if ((dqf & DSF_DEFERRED_DELETE) && !(dqf & DSF_ARMED)) {
+unregister_event:
+ // DSF_DEFERRED_DELETE: a deferred kevent unregistration is pending
+ // !DSF_ARMED: event was delivered and can safely be unregistered
+ if (dq != dkq) {
+ return dkq;
+ }
+ _dispatch_source_refs_unregister(ds, DU_UNREGISTER_IMMEDIATE_DELETE);
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ }
- // input validation
- if (type == NULL || (mask & ~type->mask)) {
- goto out_bad;
+ if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
+ os_atomic_load2o(ds, ds_pending_data, relaxed)) {
+ // The source has pending data to deliver via the event handler callback
+ // on the target queue. Some sources need to be rearmed on the kevent
+ // queue after event delivery.
+ if (dq == ds->do_targetq) {
+ _dispatch_source_latch_and_call(ds, dq, flags);
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+
+ // starvation avoidance: if the source triggers itself then force a
+ // re-queue to give other things already queued on the target queue
+ // a chance to run.
+ //
+ // however, if the source is directly targeting an overcommit root
+ // queue, this would requeue the source and ask for a new overcommit
+ // thread right away.
+ prevent_starvation = dq->do_targetq ||
+ !(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
+ if (prevent_starvation &&
+ os_atomic_load2o(ds, ds_pending_data, relaxed)) {
+ retq = ds->do_targetq;
+ }
+ } else {
+ // there is no point trying to be eager, the next thing to do is
+ // to deliver the event
+ return ds->do_targetq;
+ }
}
- switch (type->ke.filter) {
- case EVFILT_SIGNAL:
- if (handle >= NSIG) {
- goto out_bad;
+ if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
+ // The source has been cancelled and needs to be uninstalled from the
+ // kevent queue. After uninstallation, the cancellation handler needs
+ // to be delivered to the target queue.
+ if (!(dqf & DSF_DELETED)) {
+ if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
+ // timers can cheat if not armed because there's nothing left
+ // to do on the manager queue and unregistration can happen
+ // on the regular target queue
+ } else if (dq != dkq) {
+ return dkq;
+ }
+ _dispatch_source_refs_unregister(ds, 0);
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
+ if (!(dqf & DSF_ARMED)) {
+ goto unregister_event;
+ }
+ // we need to wait for the EV_DELETE
+ return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
+ }
}
- break;
- case EVFILT_FS:
- case DISPATCH_EVFILT_CUSTOM_ADD:
- case DISPATCH_EVFILT_CUSTOM_OR:
- case DISPATCH_EVFILT_TIMER:
- if (handle) {
- goto out_bad;
+ if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
+ _dispatch_source_get_cancel_handler(dr) ||
+ _dispatch_source_get_registration_handler(dr))) {
+ retq = ds->do_targetq;
+ } else {
+ _dispatch_source_cancel_callout(ds, dq, flags);
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
}
- break;
- default:
- break;
- }
-
- ds = calloc(1ul, sizeof(struct dispatch_source_s));
- if (slowpath(!ds)) {
- goto out_bad;
+ prevent_starvation = false;
}
- dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
- if (slowpath(!dk)) {
- goto out_bad;
- }
-
- dk->dk_kevent = *proto_kev;
- dk->dk_kevent.ident = handle;
- dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
- dk->dk_kevent.fflags |= (uint32_t)mask;
- dk->dk_kevent.udata = dk;
- TAILQ_INIT(&dk->dk_sources);
-
- // Initialize as a queue first, then override some settings below.
- _dispatch_queue_init((dispatch_queue_t)ds);
- strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));
-
- // Dispatch Object
- ds->do_vtable = &_dispatch_source_kevent_vtable;
- ds->do_ref_cnt++; // the reference the manger queue holds
- ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
- // do_targetq will be retained below, past point of no-return
- ds->do_targetq = q;
-
- // Dispatch Source
- ds->ds_ident_hack = dk->dk_kevent.ident;
- ds->ds_dkev = dk;
- ds->ds_pending_data_mask = dk->dk_kevent.fflags;
- if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
- if (proto_kev->filter != EVFILT_MACHPORT) {
- ds->ds_is_level = true;
- }
- ds->ds_needs_rearm = true;
- } else if (!(EV_CLEAR & proto_kev->flags)) {
- // we cheat and use EV_CLEAR to mean a "flag thingy"
- ds->ds_is_adder = true;
- }
-
- // If its a timer source, it needs to be re-armed
- if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
- ds->ds_needs_rearm = true;
- }
-
- dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
-#if DISPATCH_DEBUG
- dispatch_debug(ds, __FUNCTION__);
-#endif
- // Some sources require special processing
- if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
- static dispatch_once_t pred;
- dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
- } else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
- ds->ds_timer.flags = mask;
+ if (_dispatch_unote_needs_rearm(dr) &&
+ !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
+ // The source needs to be rearmed on the kevent queue.
+ if (dq != dkq) {
+ return dkq;
+ }
+ if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
+ // no need for resume when we can directly unregister the kevent
+ goto unregister_event;
+ }
+ if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
+ // do not try to rearm the kevent if the source is suspended
+ // from the source handler
+ return ds->do_targetq;
+ }
+ if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
+ // keep the old behavior to force re-enqueue to our target queue
+ // for the rearm.
+ //
+ // if the handler didn't run, or this is a pending delete
+ // or our target queue is a global queue, then starvation is
+ // not a concern and we can rearm right away.
+ return ds->do_targetq;
+ }
+ if (unlikely(!_dispatch_source_refs_resume(ds))) {
+ goto unregister_event;
+ }
+ if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
+ // try to redrive the drain from under the lock for sources
+ // targeting an overcommit root queue to avoid parking
+ // when the next event has already fired
+ _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
+ }
}
- _dispatch_retain(ds->do_targetq);
- return ds;
-
-out_bad:
- free(ds);
- free(dk);
- return NULL;
+ return retq;
}
-// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
-static void
-_dispatch_source_set_event_handler2(void *context)
+DISPATCH_NOINLINE
+void
+_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
+ dispatch_invoke_flags_t flags)
{
- struct Block_layout *bl = context;
-
- dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
- dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
-
- if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
- Block_release(ds->ds_handler_ctxt);
- }
- ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
- ds->ds_handler_ctxt = bl;
- ds->ds_handler_is_block = true;
+ _dispatch_queue_class_invoke(ds, dic, flags,
+ DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
}
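+// Mirror of _dispatch_source_invoke2: computes which queue (target, manager
+// or kevent queue) the source needs to be woken up on, without performing
+// any of the actions itself.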
void
-dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
+_dispatch_source_wakeup(dispatch_source_t ds, dispatch_qos_t qos,
+ dispatch_wakeup_flags_t flags)
{
- dispatch_assert(!ds->ds_is_legacy);
- handler = _dispatch_Block_copy(handler);
- dispatch_barrier_async_f((dispatch_queue_t)ds,
- handler, _dispatch_source_set_event_handler2);
-}
+ // This function determines whether the source needs to be invoked.
+ // The order of tests here in wakeup and in invoke should be consistent.
+
+ dispatch_source_refs_t dr = ds->ds_refs;
+ dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
+ dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
+ dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);
+
+ if (dr->du_is_direct) {
+ dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
+ }
+
+ if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && dr->du_is_timer &&
+ os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
+ // timer has to be configured on the kevent queue
+ tq = dkq;
+ } else if (!ds->ds_is_installed) {
+ // The source needs to be installed on the kevent queue.
+ tq = dkq;
+ } else if (_dispatch_source_get_registration_handler(dr)) {
+ // The registration handler needs to be delivered to the target queue.
+ tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+ } else if (deferred_delete && !(dqf & DSF_ARMED)) {
+ // Pending source kevent unregistration has been completed
+ // or EV_ONESHOT event can be acknowledged
+ tq = dkq;
+ } else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
+ os_atomic_load2o(ds, ds_pending_data, relaxed)) {
+ // The source has pending data to deliver to the target queue.
+ tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+ } else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
+ // The source needs to be uninstalled from the kevent queue, or the
+ // cancellation handler needs to be delivered to the target queue.
+ // Note: cancellation assumes installation.
+ if (!(dqf & DSF_DELETED)) {
+ if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
+ // timers can cheat if not armed because there's nothing left
+ // to do on the manager queue and unregistration can happen
+ // on the regular target queue
+ tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+ } else {
+ tq = dkq;
+ }
+ } else if (_dispatch_source_get_event_handler(dr) ||
+ _dispatch_source_get_cancel_handler(dr) ||
+ _dispatch_source_get_registration_handler(dr)) {
+ tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+ }
+ } else if (_dispatch_unote_needs_rearm(dr) &&
+ !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
+ // The source needs to be rearmed on the kevent queue.
+ tq = dkq;
+ }
+ if (!tq && _dispatch_queue_class_probe(ds)) {
+ tq = DISPATCH_QUEUE_WAKEUP_TARGET;
+ }
-static void
-_dispatch_source_set_event_handler_f(void *context)
-{
- dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
- dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
-
- if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
- Block_release(ds->ds_handler_ctxt);
+ if ((tq == DISPATCH_QUEUE_WAKEUP_TARGET) &&
+ ds->do_targetq == &_dispatch_mgr_q) {
+ tq = DISPATCH_QUEUE_WAKEUP_MGR;
}
- ds->ds_handler_func = context;
- ds->ds_handler_ctxt = ds->do_ctxt;
- ds->ds_handler_is_block = false;
+
+ return _dispatch_queue_class_wakeup(ds->_as_dq, qos, flags, tq);
}
void
-dispatch_source_set_event_handler_f(dispatch_source_t ds,
- dispatch_function_t handler)
+dispatch_source_cancel(dispatch_source_t ds)
{
- dispatch_assert(!ds->ds_is_legacy);
- dispatch_barrier_async_f((dispatch_queue_t)ds,
- handler, _dispatch_source_set_event_handler_f);
-}
+ _dispatch_object_debug(ds, "%s", __func__);
+ // Right after we set the cancel flag, someone else
+ // could potentially invoke the source, do the cancellation,
+ // unregister the source, and deallocate it. We would
+ // need to therefore retain/release before setting the bit
+ _dispatch_retain_2(ds);
-// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
-static void
-_dispatch_source_set_cancel_handler2(void *context)
-{
- dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
- dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
-
- if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
- Block_release(ds->ds_cancel_handler);
+ dispatch_queue_t q = ds->_as_dq;
+ if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
+ _dispatch_release_2_tailcall(ds);
+ } else {
+ dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
}
- ds->ds_cancel_handler = context;
- ds->ds_cancel_is_block = true;
}
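+// Illustrative usage sketch (not part of this file): a read source whose
+// cancel handler owns closing the fd; client code cancels it when done.
+//
+//   dispatch_source_t s = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
+//           (uintptr_t)fd, 0, q);
+//   dispatch_source_set_event_handler(s, ^{ /* read(fd, ...) */ });
+//   dispatch_source_set_cancel_handler(s, ^{ close(fd); });
+//   dispatch_resume(s);
+//   ...
+//   dispatch_source_cancel(s);  // no further event handler invocations;
+//   dispatch_release(s);        // the cancel handler runs once on `q`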
void
-dispatch_source_set_cancel_handler(dispatch_source_t ds,
- dispatch_block_t handler)
+dispatch_source_cancel_and_wait(dispatch_source_t ds)
{
- dispatch_assert(!ds->ds_is_legacy);
- handler = _dispatch_Block_copy(handler);
- dispatch_barrier_async_f((dispatch_queue_t)ds,
- handler, _dispatch_source_set_cancel_handler2);
-}
+ dispatch_queue_flags_t old_dqf, dqf, new_dqf;
+ dispatch_source_refs_t dr = ds->ds_refs;
-static void
-_dispatch_source_set_cancel_handler_f(void *context)
-{
- dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
- dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
-
- if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
- Block_release(ds->ds_cancel_handler);
+ if (unlikely(_dispatch_source_get_cancel_handler(dr))) {
+ DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
}
- ds->ds_cancel_handler = context;
- ds->ds_cancel_is_block = false;
-}
-void
-dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
- dispatch_function_t handler)
-{
- dispatch_assert(!ds->ds_is_legacy);
- dispatch_barrier_async_f((dispatch_queue_t)ds,
- handler, _dispatch_source_set_cancel_handler_f);
-}
+ _dispatch_object_debug(ds, "%s", __func__);
+ os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
+ new_dqf = old_dqf | DSF_CANCELED;
+ if (old_dqf & DSF_CANCEL_WAITER) {
+ os_atomic_rmw_loop_give_up(break);
+ }
+ if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
+ // just add DSF_CANCELED
+ } else if ((old_dqf & DSF_DEFERRED_DELETE) || !dr->du_is_direct) {
+ new_dqf |= DSF_CANCEL_WAITER;
+ }
+ });
+ dqf = new_dqf;
-#ifndef DISPATCH_NO_LEGACY
-// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
-dispatch_source_t
-_dispatch_source_create2(dispatch_source_t ds,
- dispatch_source_attr_t attr,
- void *context,
- dispatch_source_handler_function_t handler)
-{
- if (ds == NULL || handler == NULL) {
- return NULL;
+ if (old_dqf & DQF_RELEASED) {
+ DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
+ }
+ if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
+ return;
+ }
+ if (dqf & DSF_CANCEL_WAITER) {
+ goto wakeup;
}
- ds->ds_is_legacy = true;
+ // simplified version of _dispatch_queue_drain_try_lock
+ // that also sets the DIRTY bit on failure to lock
+ uint64_t set_owner_and_set_full_width = _dispatch_lock_value_for_self() |
+ DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
+ uint64_t old_state, new_state;
- ds->ds_handler_func = handler;
- ds->ds_handler_ctxt = context;
-
- if (attr && attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
- ds->dq_finalizer_ctxt = attr->finalizer_ctxt;
- ds->dq_finalizer_func = (typeof(ds->dq_finalizer_func))attr->finalizer_func;
- ds->do_ctxt = attr->context;
- }
-#ifdef __BLOCKS__
- if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
- ds->dq_finalizer_ctxt = Block_copy(ds->dq_finalizer_ctxt);
- if (!ds->dq_finalizer_ctxt) {
- goto out_bad;
+ os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
+ new_state = old_state;
+ if (likely(_dq_state_is_runnable(old_state) &&
+ !_dq_state_drain_locked(old_state))) {
+ new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
+ new_state |= set_owner_and_set_full_width;
+ } else if (old_dqf & DSF_CANCELED) {
+ os_atomic_rmw_loop_give_up(break);
+ } else {
+ // this case needs a release barrier, hence the seq_cst above
+ new_state |= DISPATCH_QUEUE_DIRTY;
}
- }
- if (handler == _dispatch_source_call_block) {
- struct Block_layout *bl = ds->ds_handler_ctxt = Block_copy(context);
- if (!ds->ds_handler_ctxt) {
- if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
- Block_release(ds->dq_finalizer_ctxt);
+ });
+
+ if (unlikely(_dq_state_is_suspended(old_state))) {
+ if (unlikely(_dq_state_suspend_cnt(old_state))) {
+ DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
+ }
+ // inactive sources have never been registered and there is no need
+ // to wait here because activation will notice and mark the source
+ // as deleted without ever trying to use the fd or mach port.
+ return dispatch_activate(ds);
+ }
+
+ if (likely(_dq_state_is_runnable(old_state) &&
+ !_dq_state_drain_locked(old_state))) {
+ // same thing _dispatch_source_invoke2() does when handling cancellation
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
+ _dispatch_source_refs_unregister(ds, 0);
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
+ _dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
}
- goto out_bad;
- }
- ds->ds_handler_func = (void *)bl->invoke;
- ds->ds_handler_is_block = true;
- }
-
- // all legacy sources get a cancellation event on the normal event handler.
- dispatch_source_handler_function_t func = ds->ds_handler_func;
- dispatch_source_handler_t block = ds->ds_handler_ctxt;
- void *ctxt = ds->ds_handler_ctxt;
- bool handler_is_block = ds->ds_handler_is_block;
-
- ds->ds_cancel_is_block = true;
- if (handler_is_block) {
- ds->ds_cancel_handler = _dispatch_Block_copy(^{
- block(ds);
- });
+ }
+ dx_wakeup(ds, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
+ } else if (unlikely(_dq_state_drain_locked_by_self(old_state))) {
+ DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
+ "called from a source handler");
} else {
- ds->ds_cancel_handler = _dispatch_Block_copy(^{
- func(ctxt, ds);
- });
- }
-#endif
- if (attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
- dispatch_resume(ds);
+ dispatch_qos_t qos;
+wakeup:
+ qos = _dispatch_qos_from_pp(_dispatch_get_priority());
+ dx_wakeup(ds, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
+ dispatch_activate(ds);
+ }
+
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
+ if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
+ if (!os_atomic_cmpxchgv2o(ds, dq_atomic_flags,
+ dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
+ continue;
+ }
+ dqf |= DSF_CANCEL_WAITER;
+ }
+ _dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
}
-
- return ds;
-
-out_bad:
- free(ds);
- return NULL;
}
-long
-dispatch_source_get_error(dispatch_source_t ds, long *err_out)
-{
- // 6863892 don't report ECANCELED until kevent is unregistered
- if ((ds->ds_atomic_flags & DSF_CANCELED) && !ds->ds_dkev) {
- if (err_out) {
- *err_out = ECANCELED;
+void
+_dispatch_source_merge_evt(dispatch_unote_t du, uint32_t flags, uintptr_t data,
+ uintptr_t status, pthread_priority_t pp)
+{
+ dispatch_source_refs_t dr = du._dr;
+ dispatch_source_t ds = _dispatch_source_from_refs(dr);
+ dispatch_wakeup_flags_t wflags = 0;
+ dispatch_queue_flags_t dqf;
+
+ if (_dispatch_unote_needs_rearm(dr) || (flags & (EV_DELETE | EV_ONESHOT))) {
+ // once we modify the queue atomic flags below, it will allow concurrent
+ // threads running _dispatch_source_invoke2 to dispose of the source,
+ // so we can't safely borrow the reference we get from the muxnote udata
+ // anymore, and need our own
+ wflags = DISPATCH_WAKEUP_CONSUME_2;
+ _dispatch_retain_2(ds); // rdar://20382435
+ }
+
+ if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
+ !(flags & EV_DELETE)) {
+ dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
+ DSF_DEFERRED_DELETE, DSF_ARMED);
+ if (flags & EV_VANISHED) {
+ _dispatch_bug_kevent_client("kevent", dr->du_type->dst_kind,
+ "monitored resource vanished before the source "
+ "cancel handler was invoked", 0);
}
- return DISPATCH_ERROR_DOMAIN_POSIX;
+ _dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
+ (flags & EV_VANISHED) ? "vanished" :
+ "deferred delete oneshot", dr);
+ } else if (flags & (EV_DELETE | EV_ONESHOT)) {
+ _dispatch_source_refs_unregister(ds, DU_UNREGISTER_ALREADY_DELETED);
+ _dispatch_debug("kevent-source[%p]: deleted kevent[%p]", ds, dr);
+ if (flags & EV_DELETE) goto done;
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ } else if (_dispatch_unote_needs_rearm(dr)) {
+ dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
+ _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
} else {
- return DISPATCH_ERROR_DOMAIN_NO_ERROR;
+ dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
+ }
+
+ if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
+ goto done; // rdar://20204025
+ }
+
+ dispatch_unote_action_t action = dr->du_data_action;
+ if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
+ (flags & EV_VANISHED)) {
+ // if the resource behind the ident vanished, the event handler can't
+ // do anything useful anymore, so do not try to call it at all
+ //
+ // Note: if the kernel doesn't support EV_VANISHED we always get it
+ // back unchanged from the flags passed at EV_ADD (registration) time
+ // Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
+ // if we get both bits it was a real EV_VANISHED delivery
+ os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
+#if HAVE_MACH
+ } else if (dr->du_filter == EVFILT_MACHPORT) {
+ os_atomic_store2o(ds, ds_pending_data, data, relaxed);
+#endif
+ } else if (action == DISPATCH_UNOTE_ACTION_DATA_SET) {
+ os_atomic_store2o(ds, ds_pending_data, data, relaxed);
+ } else if (action == DISPATCH_UNOTE_ACTION_DATA_ADD) {
+ os_atomic_add2o(ds, ds_pending_data, data, relaxed);
+ } else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR) {
+ os_atomic_or2o(ds, ds_pending_data, data, relaxed);
+ } else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
+ // We combine the data and status into a single 64-bit value.
+ uint64_t odata, ndata;
+ uint64_t value = DISPATCH_SOURCE_COMBINE_DATA_AND_STATUS(data, status);
+ os_atomic_rmw_loop2o(ds, ds_pending_data, odata, ndata, relaxed, {
+ ndata = DISPATCH_SOURCE_GET_DATA(odata) | value;
+ });
+ } else if (data) {
+ DISPATCH_INTERNAL_CRASH(action, "Unexpected source action value");
}
+ _dispatch_debug("kevent-source[%p]: merged kevent[%p]", ds, dr);
+
+done:
+ _dispatch_object_debug(ds, "%s", __func__);
+ dx_wakeup(ds, _dispatch_qos_from_pp(pp), wflags | DISPATCH_WAKEUP_MAKE_DIRTY);
}
-#endif /* DISPATCH_NO_LEGACY */
-// Updates the ordered list of timers based on next fire date for changes to ds.
-// Should only be called from the context of _dispatch_mgr_q.
-void
-_dispatch_timer_list_update(dispatch_source_t ds)
+#pragma mark -
+#pragma mark dispatch_source_timer
+
+#if DISPATCH_USE_DTRACE
+static dispatch_timer_source_refs_t
+ _dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
+#define _dispatch_trace_next_timer_set(x, q) \
+ _dispatch_trace_next_timer[(q)] = (x)
+#define _dispatch_trace_next_timer_program(d, q) \
+ _dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_mgr_trace_timers_wakes(void)
{
- dispatch_source_t dsi = NULL;
- int idx;
-
- dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);
+ uint32_t qos;
- // do not reschedule timers unregistered with _dispatch_kevent_release()
- if (!ds->ds_dkev) {
- return;
+ if (_dispatch_timers_will_wake) {
+ if (slowpath(DISPATCH_TIMER_WAKE_ENABLED())) {
+ for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
+ if (_dispatch_timers_will_wake & (1 << qos)) {
+ _dispatch_trace_timer_wake(_dispatch_trace_next_timer[qos]);
+ }
+ }
+ }
+ _dispatch_timers_will_wake = 0;
}
+}
+#else
+#define _dispatch_trace_next_timer_set(x, q)
+#define _dispatch_trace_next_timer_program(d, q)
+#define _dispatch_mgr_trace_timers_wakes()
+#endif
- // Ensure the source is on the global kevent lists before it is removed and
- // readded below.
- _dispatch_kevent_merge(ds);
-
- TAILQ_REMOVE(&ds->ds_dkev->dk_sources, ds, ds_list);
+#define _dispatch_source_timer_telemetry_enabled() false
- // change the list if the clock type has changed
- if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
- idx = DISPATCH_TIMER_INDEX_WALL;
- } else {
- idx = DISPATCH_TIMER_INDEX_MACH;
+DISPATCH_NOINLINE
+static void
+_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
+ dispatch_clock_t clock, struct dispatch_timer_source_s *values)
+{
+ if (_dispatch_trace_timer_configure_enabled()) {
+ _dispatch_trace_timer_configure(ds, clock, values);
}
- ds->ds_dkev = &_dispatch_kevent_timer[idx];
+}
- if (ds->ds_timer.target) {
- TAILQ_FOREACH(dsi, &ds->ds_dkev->dk_sources, ds_list) {
- if (dsi->ds_timer.target == 0 || ds->ds_timer.target < dsi->ds_timer.target) {
- break;
- }
- }
- }
-
- if (dsi) {
- TAILQ_INSERT_BEFORE(dsi, ds, ds_list);
- } else {
- TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds, ds_list);
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock,
+ struct dispatch_timer_source_s *values)
+{
+ if (_dispatch_trace_timer_configure_enabled() ||
+ _dispatch_source_timer_telemetry_enabled()) {
+ _dispatch_source_timer_telemetry_slow(ds, clock, values);
+ asm(""); // prevent tailcall
}
}
+DISPATCH_NOINLINE
static void
-_dispatch_run_timers2(unsigned int timer)
+_dispatch_source_timer_configure(dispatch_source_t ds)
{
- dispatch_source_t ds;
- uint64_t now, missed;
+ dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
+ dispatch_timer_config_t dtc;
- if (timer == DISPATCH_TIMER_INDEX_MACH) {
- now = mach_absolute_time();
+ dtc = os_atomic_xchg2o(dt, dt_pending_config, NULL, dependency);
+ if (dtc->dtc_clock == DISPATCH_CLOCK_MACH) {
+ dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
} else {
- now = _dispatch_get_nanoseconds();
+ dt->du_fflags &= ~(uint32_t)DISPATCH_TIMER_CLOCK_MACH;
+ }
+ dt->dt_timer = dtc->dtc_timer;
+ free(dtc);
+ if (ds->ds_is_installed) {
+ // Clear any pending data that might have accumulated on
+ // older timer params <rdar://problem/8574886>
+ os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
+ _dispatch_timers_update(dt, 0);
}
+}
- while ((ds = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
- // We may find timers on the wrong list due to a pending update from
- // dispatch_source_set_timer. Force an update of the list in that case.
- if (timer != ds->ds_ident_hack) {
- _dispatch_timer_list_update(ds);
- continue;
- }
- if (!ds->ds_timer.target) {
- // no configured timers on the list
- break;
- }
- if (ds->ds_timer.target > now) {
- // Done running timers for now.
- break;
+static dispatch_timer_config_t
+_dispatch_source_timer_config_create(dispatch_time_t start,
+ uint64_t interval, uint64_t leeway)
+{
+ dispatch_timer_config_t dtc;
+ dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s));
+ if (unlikely(interval == 0)) {
+ if (start != DISPATCH_TIME_FOREVER) {
+ _dispatch_bug_deprecated("Setting timer interval to 0 requests "
+ "a 1ns timer, did you mean FOREVER (a one-shot timer)?");
}
+ interval = 1;
+ } else if ((int64_t)interval < 0) {
+ // 6866347 - make sure nanoseconds won't overflow
+ interval = INT64_MAX;
+ }
+ if ((int64_t)leeway < 0) {
+ leeway = INT64_MAX;
+ }
+ if (start == DISPATCH_TIME_NOW) {
+ start = _dispatch_absolute_time();
+ } else if (start == DISPATCH_TIME_FOREVER) {
+ start = INT64_MAX;
+ }
- if (ds->ds_timer.flags & (DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE)) {
- dispatch_atomic_inc(&ds->ds_pending_data);
- ds->ds_timer.target = 0;
- } else {
- // Calculate number of missed intervals.
- missed = (now - ds->ds_timer.target) / ds->ds_timer.interval;
- dispatch_atomic_add(&ds->ds_pending_data, missed + 1);
- ds->ds_timer.target += (missed + 1) * ds->ds_timer.interval;
+ if ((int64_t)start < 0) {
+ // wall clock
+ start = (dispatch_time_t)-((int64_t)start);
+ dtc->dtc_clock = DISPATCH_CLOCK_WALL;
+ } else {
+ // absolute clock
+ interval = _dispatch_time_nano2mach(interval);
+ if (interval < 1) {
+			// rdar://problem/7287561 interval must be at least one
+			// in order to avoid later division by zero when calculating
+ // the missed interval count. (NOTE: the wall clock's
+ // interval is already "fixed" to be 1 or more)
+ interval = 1;
}
+ leeway = _dispatch_time_nano2mach(leeway);
+ dtc->dtc_clock = DISPATCH_CLOCK_MACH;
+ }
+ if (interval < INT64_MAX && leeway > interval / 2) {
+ leeway = interval / 2;
+ }
- _dispatch_timer_list_update(ds);
- _dispatch_wakeup(ds);
+ dtc->dtc_timer.target = start;
+ dtc->dtc_timer.interval = interval;
+ if (start + leeway < INT64_MAX) {
+ dtc->dtc_timer.deadline = start + leeway;
+ } else {
+ dtc->dtc_timer.deadline = INT64_MAX;
}
+ return dtc;
}
+DISPATCH_NOINLINE
void
-_dispatch_run_timers(void)
+dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
+ uint64_t interval, uint64_t leeway)
{
- unsigned int i;
- for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
- _dispatch_run_timers2(i);
+ dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
+ dispatch_timer_config_t dtc;
+
+ if (unlikely(!dt->du_is_timer || (dt->du_fflags&DISPATCH_TIMER_INTERVAL))) {
+ DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
}
-}
-#if defined(__i386__) || defined(__x86_64__)
-// these architectures always return mach_absolute_time() in nanoseconds
-#define _dispatch_convert_mach2nano(x) (x)
-#define _dispatch_convert_nano2mach(x) (x)
-#else
-static mach_timebase_info_data_t tbi;
-static dispatch_once_t tbi_pred;
+ dtc = _dispatch_source_timer_config_create(start, interval, leeway);
+ _dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
+ dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
+ if (dtc) free(dtc);
+ dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
+}
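+// Illustrative usage sketch (not part of this file): a repeating one-second
+// timer with 100ms leeway; note that the config code above caps leeway at
+// half the interval.
+//
+//   dispatch_source_t t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
+//           0, 0, q);
+//   dispatch_source_set_timer(t,
+//           dispatch_time(DISPATCH_TIME_NOW, NSEC_PER_SEC),
+//           NSEC_PER_SEC, 100 * NSEC_PER_MSEC);
+//   dispatch_source_set_event_handler(t, ^{ /* fires roughly every second */ });
+//   dispatch_activate(t);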
static void
-_dispatch_convert_init(void *context __attribute__((unused)))
+_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
- dispatch_assume_zero(mach_timebase_info(&tbi));
+#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
+// approx 1 year (60s * 60m * 24h * 365d)
+#define FOREVER_NSEC 31536000000000000ull
+
+ dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
+ const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION;
+ if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
+ FOREVER_NSEC/NSEC_PER_MSEC))) {
+ interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
+ } else {
+ interval = FOREVER_NSEC;
+ }
+ interval = _dispatch_time_nano2mach(interval);
+ uint64_t target = _dispatch_absolute_time() + interval;
+ target -= (target % interval);
+ const uint64_t leeway = animation ?
+ _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
+ dr->dt_timer.target = target;
+ dr->dt_timer.deadline = target + leeway;
+ dr->dt_timer.interval = interval;
+ _dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer);
}
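+// Worked example of the computation above (illustrative only): for a
+// non-animation source, _dispatch_source_set_interval(ds, 16) yields a 16ms
+// interval converted to mach ticks, a target of "now + interval" rounded down
+// to an interval boundary, and a deadline trailing the target by interval / 2.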
-static uint64_t
-_dispatch_convert_mach2nano(uint64_t val)
+#pragma mark -
+#pragma mark dispatch_after
+
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
+ void *ctxt, void *handler, bool block)
{
-#ifdef __LP64__
- __uint128_t tmp;
-#else
- long double tmp;
+ dispatch_timer_source_refs_t dt;
+ dispatch_source_t ds;
+ uint64_t leeway, delta;
+
+ if (when == DISPATCH_TIME_FOREVER) {
+#if DISPATCH_DEBUG
+ DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
+ return;
+ }
- dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);
+ delta = _dispatch_timeout(when);
+ if (delta == 0) {
+ if (block) {
+ return dispatch_async(queue, handler);
+ }
+ return dispatch_async_f(queue, ctxt, handler);
+ }
+ leeway = delta / 10; // <rdar://problem/13447496>
- tmp = val;
- tmp *= tbi.numer;
- tmp /= tbi.denom;
+ if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
+ if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
- return tmp;
-}
+ // this function can and should be optimized to not use a dispatch source
+ ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
+ dt = ds->ds_timer_refs;
-static uint64_t
-_dispatch_convert_nano2mach(uint64_t val)
-{
-#ifdef __LP64__
- __uint128_t tmp;
-#else
- long double tmp;
-#endif
+ dispatch_continuation_t dc = _dispatch_continuation_alloc();
+ if (block) {
+ _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
+ } else {
+ _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
+ }
+ // reference `ds` so that it doesn't show up as a leak
+ dc->dc_data = ds;
+ _dispatch_trace_continuation_push(ds->_as_dq, dc);
+ os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
- dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);
+ if ((int64_t)when < 0) {
+ // wall clock
+ when = (dispatch_time_t)-((int64_t)when);
+ } else {
+ // absolute clock
+ dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
+ leeway = _dispatch_time_nano2mach(leeway);
+ }
+ dt->dt_timer.target = when;
+ dt->dt_timer.interval = UINT64_MAX;
+ dt->dt_timer.deadline = when + leeway;
+ dispatch_activate(ds);
+}
- tmp = val;
- tmp *= tbi.denom;
- tmp /= tbi.numer;
+DISPATCH_NOINLINE
+void
+dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
+ dispatch_function_t func)
+{
+ _dispatch_after(when, queue, ctxt, func, false);
+}
- return tmp;
+#ifdef __BLOCKS__
+void
+dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
+ dispatch_block_t work)
+{
+ _dispatch_after(when, queue, NULL, work, true);
}
#endif
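+// Illustrative usage sketch (not part of this file): run a block roughly
+// three seconds from now; per _dispatch_after() above, the implicit leeway is
+// delta / 10, clamped to the [1ms, 60s] range.
+//
+//   dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 3 * NSEC_PER_SEC), q, ^{
+//       /* runs on `q` about 3s later */
+//   });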
-// approx 1 year (60s * 60m * 24h * 365d)
-#define FOREVER_SEC 3153600l
-#define FOREVER_NSEC 31536000000000000ull
+#pragma mark -
+#pragma mark dispatch_timers
-struct timespec *
-_dispatch_get_next_timer_fire(struct timespec *howsoon)
-{
- // <rdar://problem/6459649>
- // kevent(2) does not allow large timeouts, so we use a long timeout
- // instead (approximately 1 year).
- dispatch_source_t ds = NULL;
- unsigned int timer;
- uint64_t now, delta_tmp, delta = UINT64_MAX;
-
- // We are looking for the first unsuspended timer which has its target
- // time set. Given timers are kept in order, if we hit an timer that's
-	// time set. Given timers are kept in order, if we hit a timer that's
- for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
- TAILQ_FOREACH(ds, &_dispatch_kevent_timer[timer].dk_sources, ds_list) {
- if (!ds->ds_timer.target) {
- break;
- }
- if (DISPATCH_OBJECT_SUSPENDED(ds)) {
- ds->ds_is_armed = false;
- } else {
- break;
- }
- }
+/*
+ * The dispatch_timer_heap_t structure is a double min-heap of timers,
+ * interleaving the by-target min-heap in the even slots, and the by-deadline
+ * in the odd ones.
+ *
+ * The min element of these is held inline in the dispatch_timer_heap_t
+ * structure, and further entries are held in segments.
+ *
+ * dth_segments is the number of allocated segments.
+ *
+ * Segment 0 has a size of `DISPATCH_HEAP_INIT_SEGMENT_CAPACITY` pointers
+ * Segment k has a size of (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (k - 1))
+ *
+ * Segment n (dth_segments - 1) is the last segment and points its final n
+ * entries to previous segments. Its address is held in the `dth_heap` field.
+ *
+ * segment n [ regular timer pointers | n-1 | k | 0 ]
+ * | | |
+ * segment n-1 <---------------------------' | |
+ * segment k <--------------------------------' |
+ * segment 0 <------------------------------------'
+ */
+#define DISPATCH_HEAP_INIT_SEGMENT_CAPACITY 8u
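+// Worked example of the layout above (illustrative, with C == 8): segment 0
+// holds 8 slots, segment 1 holds 8, segment 2 holds 16 and segment 3 holds 32.
+// Counting the 2 inline min slots and the back-pointers the last segment gives
+// up, the usable capacity is 10 slots with 1 segment, 17 with 2 and 32 with 3,
+// matching _dispatch_timer_heap_capacity() below: 2 + (C << (n - 1)) - (n - 1)
+// for n allocated segments.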
- if (!ds || !ds->ds_timer.target) {
- continue;
- }
-
- if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
- now = _dispatch_get_nanoseconds();
- } else {
- now = mach_absolute_time();
- }
- if (ds->ds_timer.target <= now) {
- howsoon->tv_sec = 0;
- howsoon->tv_nsec = 0;
- return howsoon;
- }
+/*
+ * There are two min-heaps stored interleaved in a single array,
+ * even indices are for the by-target min-heap, and odd indices for
+ * the by-deadline one.
+ */
+#define DTH_HEAP_ID_MASK (DTH_ID_COUNT - 1)
+#define DTH_HEAP_ID(idx) ((idx) & DTH_HEAP_ID_MASK)
+#define DTH_IDX_FOR_HEAP_ID(idx, heap_id) \
+ (((idx) & ~DTH_HEAP_ID_MASK) | (heap_id))
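+// For example (assuming DTH_ID_COUNT == 2 and DTH_TARGET_ID == 0): even
+// indices 0, 2, 4, ... belong to the by-target heap and odd indices 1, 3,
+// 5, ... to the by-deadline heap, so DTH_HEAP_ID(5) == 1 and
+// DTH_IDX_FOR_HEAP_ID(5, DTH_TARGET_ID) == 4.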
- // the subtraction cannot go negative because the previous "if"
- // verified that the target is greater than now.
- delta_tmp = ds->ds_timer.target - now;
- if (!(ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK)) {
- delta_tmp = _dispatch_convert_mach2nano(delta_tmp);
- }
- if (delta_tmp < delta) {
- delta = delta_tmp;
- }
- }
- if (slowpath(delta > FOREVER_NSEC)) {
- return NULL;
- } else {
- howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
- howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
- }
- return howsoon;
+DISPATCH_ALWAYS_INLINE
+static inline uint32_t
+_dispatch_timer_heap_capacity(uint32_t segments)
+{
+ if (segments == 0) return 2;
+ uint32_t seg_no = segments - 1;
+ // for C = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY,
+ // 2 + C + SUM(C << (i-1), i = 1..seg_no) - seg_no
+ return 2 + (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << seg_no) - seg_no;
}
-struct dispatch_set_timer_params {
- dispatch_source_t ds;
- uintptr_t ident;
- struct dispatch_timer_source_s values;
-};
-
-// To be called from the context of the _dispatch_mgr_q
+DISPATCH_NOINLINE
static void
-_dispatch_source_set_timer2(void *context)
+_dispatch_timer_heap_grow(dispatch_timer_heap_t dth)
{
- struct dispatch_set_timer_params *params = context;
- dispatch_source_t ds = params->ds;
- ds->ds_ident_hack = params->ident;
- ds->ds_timer = params->values;
- _dispatch_timer_list_update(ds);
- dispatch_resume(ds);
- dispatch_release(ds);
- free(params);
-}
+ uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
+ uint32_t seg_no = dth->dth_segments++;
+ void **heap, **heap_prev = dth->dth_heap;
-void
-dispatch_source_set_timer(dispatch_source_t ds,
- dispatch_time_t start,
- uint64_t interval,
- uint64_t leeway)
-{
- struct dispatch_set_timer_params *params;
-
- // we use zero internally to mean disabled
- if (interval == 0) {
- interval = 1;
- } else if ((int64_t)interval < 0) {
- // 6866347 - make sure nanoseconds won't overflow
- interval = INT64_MAX;
+ if (seg_no > 0) {
+ seg_capacity <<= (seg_no - 1);
}
-
- // Suspend the source so that it doesn't fire with pending changes
- // The use of suspend/resume requires the external retain/release
- dispatch_retain(ds);
- dispatch_suspend(ds);
-
- if (start == DISPATCH_TIME_NOW) {
- start = mach_absolute_time();
- } else if (start == DISPATCH_TIME_FOREVER) {
- start = INT64_MAX;
+ heap = _dispatch_calloc(seg_capacity, sizeof(void *));
+ if (seg_no > 1) {
+ uint32_t prev_seg_no = seg_no - 1;
+ uint32_t prev_seg_capacity = seg_capacity >> 1;
+ memcpy(&heap[seg_capacity - prev_seg_no],
+ &heap_prev[prev_seg_capacity - prev_seg_no],
+ prev_seg_no * sizeof(void *));
}
-
- while (!(params = malloc(sizeof(struct dispatch_set_timer_params)))) {
- sleep(1);
+ if (seg_no > 0) {
+ heap[seg_capacity - seg_no] = heap_prev;
}
+ dth->dth_heap = heap;
+}
- params->ds = ds;
- params->values.flags = ds->ds_timer.flags;
+DISPATCH_NOINLINE
+static void
+_dispatch_timer_heap_shrink(dispatch_timer_heap_t dth)
+{
+ uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
+ uint32_t seg_no = --dth->dth_segments;
+ void **heap = dth->dth_heap, **heap_prev = NULL;
- if ((int64_t)start < 0) {
- // wall clock
- params->ident = DISPATCH_TIMER_INDEX_WALL;
- params->values.start = -((int64_t)start);
- params->values.target = -((int64_t)start);
- params->values.interval = interval;
- params->values.leeway = leeway;
- params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
- } else {
- // mach clock
- params->ident = DISPATCH_TIMER_INDEX_MACH;
- params->values.start = start;
- params->values.target = start;
- params->values.interval = _dispatch_convert_nano2mach(interval);
- params->values.leeway = _dispatch_convert_nano2mach(leeway);
- params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
+ if (seg_no > 0) {
+ seg_capacity <<= (seg_no - 1);
+ heap_prev = heap[seg_capacity - seg_no];
}
-
- dispatch_barrier_async_f(&_dispatch_mgr_q, params, _dispatch_source_set_timer2);
+ if (seg_no > 1) {
+ uint32_t prev_seg_no = seg_no - 1;
+ uint32_t prev_seg_capacity = seg_capacity >> 1;
+ memcpy(&heap_prev[prev_seg_capacity - prev_seg_no],
+ &heap[seg_capacity - prev_seg_no],
+ prev_seg_no * sizeof(void *));
+ }
+ dth->dth_heap = heap_prev;
+ free(heap);
}
-#ifndef DISPATCH_NO_LEGACY
-// LEGACY
-long
-dispatch_source_timer_set_time(dispatch_source_t ds, uint64_t nanoseconds, uint64_t leeway)
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_timer_source_refs_t *
+_dispatch_timer_heap_get_slot(dispatch_timer_heap_t dth, uint32_t idx)
{
- dispatch_time_t start;
- if (nanoseconds == 0) {
- nanoseconds = 1;
+ uint32_t seg_no, segments = dth->dth_segments;
+ void **segment;
+
+ if (idx < DTH_ID_COUNT) {
+ return &dth->dth_min[idx];
}
- if (ds->ds_timer.flags == (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK)) {
- static const struct timespec t0;
- start = dispatch_walltime(&t0, nanoseconds);
- } else if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
- start = dispatch_walltime(DISPATCH_TIME_NOW, nanoseconds);
+ idx -= DTH_ID_COUNT;
+
+ // Derive the segment number from the index. Naming
+ // DISPATCH_HEAP_INIT_SEGMENT_CAPACITY `C`, the segments index ranges are:
+ // 0: 0 .. (C - 1)
+ // 1: C .. 2 * C - 1
+ // k: 2^(k-1) * C .. 2^k * C - 1
+ // so `k` can be derived from the first bit set in `idx`
+ seg_no = (uint32_t)(__builtin_clz(DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1) -
+ __builtin_clz(idx | (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1)));
+ if (seg_no + 1 == segments) {
+ segment = dth->dth_heap;
} else {
- start = dispatch_time(DISPATCH_TIME_NOW, nanoseconds);
+ uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
+ seg_capacity <<= (segments - 2);
+ segment = dth->dth_heap[seg_capacity - seg_no - 1];
+ }
+ if (seg_no) {
+ idx -= DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (seg_no - 1);
+ }
+ return (dispatch_timer_source_refs_t *)(segment + idx);
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_timer_heap_set(dispatch_timer_source_refs_t *slot,
+ dispatch_timer_source_refs_t dt, uint32_t idx)
+{
+ *slot = dt;
+ dt->dt_heap_entry[DTH_HEAP_ID(idx)] = idx;
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline uint32_t
+_dispatch_timer_heap_parent(uint32_t idx)
+{
+ uint32_t heap_id = DTH_HEAP_ID(idx);
+ idx = (idx - DTH_ID_COUNT) / 2; // go to the parent
+ return DTH_IDX_FOR_HEAP_ID(idx, heap_id);
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline uint32_t
+_dispatch_timer_heap_left_child(uint32_t idx)
+{
+ uint32_t heap_id = DTH_HEAP_ID(idx);
+ // 2 * (idx - heap_id) + DTH_ID_COUNT + heap_id
+ return 2 * idx + DTH_ID_COUNT - heap_id;
+}
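+// Illustrative arithmetic derived from the two helpers above (assuming
+// DTH_ID_COUNT == 2): the timer pair stored at slots {8, 9} has its parent
+// pair at slots {2, 3} and its left-child pair at slots {18, 19}, e.g.
+// _dispatch_timer_heap_parent(8) == 2 and
+// _dispatch_timer_heap_left_child(8) == 2 * 8 + 2 - 0 == 18.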
+
+#if DISPATCH_HAVE_TIMER_COALESCING
+DISPATCH_ALWAYS_INLINE
+static inline uint32_t
+_dispatch_timer_heap_walk_skip(uint32_t idx, uint32_t count)
+{
+ uint32_t heap_id = DTH_HEAP_ID(idx);
+
+ idx -= heap_id;
+ if (unlikely(idx + DTH_ID_COUNT == count)) {
+ // reaching `count` doesn't mean we're done, but there is a weird
+ // corner case if the last item of the heap is a left child:
+ //
+ // /\
+ // / \
+ // / __\
+ // /__/
+ // ^
+ //
+ // The formula below would return the sibling of `idx` which is
+ // out of bounds. Fortunately, the correct answer is the same
+ // as for idx's parent
+ idx = _dispatch_timer_heap_parent(idx);
+ }
+
+ //
+ // When considering the index in a non interleaved, 1-based array
+ // representation of a heap, hence looking at (idx / DTH_ID_COUNT + 1)
+ // for a given idx in our dual-heaps, that index is in one of two forms:
+ //
+ // (a) 1xxxx011111 or (b) 111111111
+ // d i 0 d 0
+ //
+ // The first bit set is the row of the binary tree node (0-based).
+ // The following digits from most to least significant represent the path
+ // to that node, where `0` is a left turn and `1` a right turn.
+ //
+ // For example 0b0101 (5) is a node on row 2 accessed going left then right:
+ //
+ // row 0 1
+ // / .
+ // row 1 2 3
+ // . \ . .
+ // row 2 4 5 6 7
+ // : : : : : : : :
+ //
+ // Skipping a sub-tree in walk order means going to the sibling of the last
+ // node reached after we turned left. If the node was of the form (a),
+ // this node is 1xxxx1, which for the above example is 0b0011 (3).
+ // If the node was of the form (b) then we never took a left, meaning
+ // we reached the last element in traversal order.
+ //
+
+ //
+ // we want to find
+ // - the least significant bit set to 0 in (idx / DTH_ID_COUNT + 1)
+ // - which is offset by log_2(DTH_ID_COUNT) from the position of the least
+ // significant 0 in (idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
+ // since idx is a multiple of DTH_ID_COUNT and DTH_ID_COUNT a power of 2.
+ // - which in turn is the same as the position of the least significant 1 in
+ // ~(idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
+ //
+ dispatch_static_assert(powerof2(DTH_ID_COUNT));
+ idx += DTH_ID_COUNT + DTH_ID_COUNT - 1;
+ idx >>= __builtin_ctz(~idx);
+
+ //
+ // `idx` is now either:
+ // - 0 if it was the (b) case above, in which case the walk is done
+ // - 1xxxx0 as the position in a 0 based array representation of a non
+ // interleaved heap, so we just have to compute the interleaved index.
+ //
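+	// Worked example (illustrative, assuming DTH_ID_COUNT == 2): skipping the
+	// subtree under slot 8 (1-based node 5, 0b101): idx becomes 8 + 3 == 11
+	// (0b1011), __builtin_ctz(~11) == 2, 11 >> 2 == 2, and the result is
+	// 2 * 2 + heap_id == 4, i.e. 1-based node 3, the sibling of the last left
+	// turn taken on the path down to node 5.
+	//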
+ return likely(idx) ? DTH_ID_COUNT * idx + heap_id : UINT32_MAX;
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline uint32_t
+_dispatch_timer_heap_walk_next(uint32_t idx, uint32_t count)
+{
+ //
+ // Goes to the next element in heap walk order, which is the prefix ordered
+ // walk of the tree.
+ //
+ // From a given node, the next item to return is the left child if it
+ // exists, else the first right sibling we find by walking our parent chain,
+ // which is exactly what _dispatch_timer_heap_walk_skip() returns.
+ //
+ uint32_t lchild = _dispatch_timer_heap_left_child(idx);
+ if (lchild < count) {
+ return lchild;
+ }
+ return _dispatch_timer_heap_walk_skip(idx, count);
+}
+
+DISPATCH_NOINLINE
+static uint64_t
+_dispatch_timer_heap_max_target_before(dispatch_timer_heap_t dth, uint64_t limit)
+{
+ dispatch_timer_source_refs_t dri;
+ uint32_t idx = _dispatch_timer_heap_left_child(DTH_TARGET_ID);
+ uint32_t count = dth->dth_count;
+ uint64_t tmp, target = dth->dth_min[DTH_TARGET_ID]->dt_timer.target;
+
+ while (idx < count) {
+ dri = *_dispatch_timer_heap_get_slot(dth, idx);
+ tmp = dri->dt_timer.target;
+ if (tmp > limit) {
+ // skip subtree since none of the targets below can be before limit
+ idx = _dispatch_timer_heap_walk_skip(idx, count);
+ } else {
+ target = tmp;
+ idx = _dispatch_timer_heap_walk_next(idx, count);
+ }
}
- if (ds->ds_timer.flags & (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_ONESHOT)) {
- // 6866347 - make sure nanoseconds won't overflow
- nanoseconds = INT64_MAX; // non-repeating (~292 years)
+ return target;
+}
+#endif // DISPATCH_HAVE_TIMER_COALESCING
+
+DISPATCH_NOINLINE
+static void
+_dispatch_timer_heap_resift(dispatch_timer_heap_t dth,
+ dispatch_timer_source_refs_t dt, uint32_t idx)
+{
+ dispatch_static_assert(offsetof(struct dispatch_timer_source_s, target) ==
+ offsetof(struct dispatch_timer_source_s, heap_key[DTH_TARGET_ID]));
+ dispatch_static_assert(offsetof(struct dispatch_timer_source_s, deadline) ==
+ offsetof(struct dispatch_timer_source_s, heap_key[DTH_DEADLINE_ID]));
+#define dth_cmp(hid, dt1, op, dt2) \
+ (((dt1)->dt_timer.heap_key)[hid] op ((dt2)->dt_timer.heap_key)[hid])
+
+ dispatch_timer_source_refs_t *pslot, pdt;
+ dispatch_timer_source_refs_t *cslot, cdt;
+ dispatch_timer_source_refs_t *rslot, rdt;
+ uint32_t cidx, dth_count = dth->dth_count;
+ dispatch_timer_source_refs_t *slot;
+ int heap_id = DTH_HEAP_ID(idx);
+ bool sifted_up = false;
+
+ // try to sift up
+
+ slot = _dispatch_timer_heap_get_slot(dth, idx);
+ while (idx >= DTH_ID_COUNT) {
+ uint32_t pidx = _dispatch_timer_heap_parent(idx);
+ pslot = _dispatch_timer_heap_get_slot(dth, pidx);
+ pdt = *pslot;
+ if (dth_cmp(heap_id, pdt, <=, dt)) {
+ break;
+ }
+ _dispatch_timer_heap_set(slot, pdt, idx);
+ slot = pslot;
+ idx = pidx;
+ sifted_up = true;
+ }
+ if (sifted_up) {
+ goto done;
+ }
+
+ // try to sift down
+
+ while ((cidx = _dispatch_timer_heap_left_child(idx)) < dth_count) {
+ uint32_t ridx = cidx + DTH_ID_COUNT;
+ cslot = _dispatch_timer_heap_get_slot(dth, cidx);
+ cdt = *cslot;
+ if (ridx < dth_count) {
+ rslot = _dispatch_timer_heap_get_slot(dth, ridx);
+ rdt = *rslot;
+ if (dth_cmp(heap_id, cdt, >, rdt)) {
+ cidx = ridx;
+ cdt = rdt;
+ cslot = rslot;
+ }
+ }
+ if (dth_cmp(heap_id, dt, <=, cdt)) {
+ break;
+ }
+ _dispatch_timer_heap_set(slot, cdt, idx);
+ slot = cslot;
+ idx = cidx;
}
- dispatch_source_set_timer(ds, start, nanoseconds, leeway);
- return 0;
+
+done:
+ _dispatch_timer_heap_set(slot, dt, idx);
+#undef dth_cmp
}
-// LEGACY
-uint64_t
-dispatch_event_get_nanoseconds(dispatch_source_t ds)
+DISPATCH_ALWAYS_INLINE
+static void
+_dispatch_timer_heap_insert(dispatch_timer_heap_t dth,
+ dispatch_timer_source_refs_t dt)
{
- if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
- return ds->ds_timer.interval;
- } else {
- return _dispatch_convert_mach2nano(ds->ds_timer.interval);
+ uint32_t idx = (dth->dth_count += DTH_ID_COUNT) - DTH_ID_COUNT;
+
+ DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], ==,
+ DTH_INVALID_ID, "target idx");
+ DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], ==,
+ DTH_INVALID_ID, "deadline idx");
+
+ if (idx == 0) {
+ dt->dt_heap_entry[DTH_TARGET_ID] = DTH_TARGET_ID;
+ dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_DEADLINE_ID;
+ dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = dt;
+ return;
+ }
+
+ if (unlikely(idx + DTH_ID_COUNT >
+ _dispatch_timer_heap_capacity(dth->dth_segments))) {
+ _dispatch_timer_heap_grow(dth);
}
+ _dispatch_timer_heap_resift(dth, dt, idx + DTH_TARGET_ID);
+ _dispatch_timer_heap_resift(dth, dt, idx + DTH_DEADLINE_ID);
}
-#endif /* DISPATCH_NO_LEGACY */
-static dispatch_source_t _dispatch_mach_notify_source;
-static mach_port_t _dispatch_port_set;
-static mach_port_t _dispatch_event_port;
+DISPATCH_NOINLINE
+static void
+_dispatch_timer_heap_remove(dispatch_timer_heap_t dth,
+ dispatch_timer_source_refs_t dt)
+{
+ uint32_t idx = (dth->dth_count -= DTH_ID_COUNT);
+
+ DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
+ DTH_INVALID_ID, "target idx");
+ DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
+ DTH_INVALID_ID, "deadline idx");
+
+ if (idx == 0) {
+ DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_TARGET_ID], ==, dt,
+ "target slot");
+ DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_DEADLINE_ID], ==, dt,
+ "deadline slot");
+ dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = NULL;
+ goto clear_heap_entry;
+ }
+
+ for (uint32_t heap_id = 0; heap_id < DTH_ID_COUNT; heap_id++) {
+ dispatch_timer_source_refs_t *slot, last_dt;
+ slot = _dispatch_timer_heap_get_slot(dth, idx + heap_id);
+ last_dt = *slot; *slot = NULL;
+ if (last_dt != dt) {
+ uint32_t removed_idx = dt->dt_heap_entry[heap_id];
+ _dispatch_timer_heap_resift(dth, last_dt, removed_idx);
+ }
+ }
+ if (unlikely(idx <= _dispatch_timer_heap_capacity(dth->dth_segments - 1))) {
+ _dispatch_timer_heap_shrink(dth);
+ }
-#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
-#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
+clear_heap_entry:
+ dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
+ dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
+}
-#define _DISPATCH_MACHPORT_HASH_SIZE 32
-#define _DISPATCH_MACHPORT_HASH(x) _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_timer_heap_update(dispatch_timer_heap_t dth,
+ dispatch_timer_source_refs_t dt)
+{
+ DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
+ DTH_INVALID_ID, "target idx");
+ DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
+ DTH_INVALID_ID, "deadline idx");
-static void _dispatch_port_set_init(void *);
-static mach_port_t _dispatch_get_port_set(void);
-void
-_dispatch_drain_mach_messages(struct kevent *ke)
-{
- dispatch_source_t dsi;
- dispatch_kevent_t dk;
- struct kevent ke2;
+ _dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_TARGET_ID]);
+ _dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_DEADLINE_ID]);
+}
- if (!dispatch_assume(ke->data)) {
- return;
- }
- dk = _dispatch_kevent_find(ke->data, EVFILT_MACHPORT);
- if (!dispatch_assume(dk)) {
- return;
- }
- _dispatch_kevent_machport_disable(dk); // emulate EV_DISPATCH
+DISPATCH_ALWAYS_INLINE
+static bool
+_dispatch_timer_heap_has_new_min(dispatch_timer_heap_t dth,
+ uint32_t count, uint32_t mask)
+{
+ dispatch_timer_source_refs_t dt;
+ bool changed = false;
+ uint64_t tmp;
+ uint32_t tidx;
- EV_SET(&ke2, ke->data, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, DISPATCH_MACHPORT_RECV, 0, dk);
+ for (tidx = 0; tidx < count; tidx++) {
+ if (!(mask & (1u << tidx))) {
+ continue;
+ }
- TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
- _dispatch_source_merge_kevent(dsi, &ke2);
+ dt = dth[tidx].dth_min[DTH_TARGET_ID];
+ tmp = dt ? dt->dt_timer.target : UINT64_MAX;
+ if (dth[tidx].dth_target != tmp) {
+ dth[tidx].dth_target = tmp;
+ changed = true;
+ }
+ dt = dth[tidx].dth_min[DTH_DEADLINE_ID];
+ tmp = dt ? dt->dt_timer.deadline : UINT64_MAX;
+ if (dth[tidx].dth_deadline != tmp) {
+ dth[tidx].dth_deadline = tmp;
+ changed = true;
+ }
}
+ return changed;
}
-void
-_dispatch_port_set_init(void *context __attribute__((unused)))
+static inline void
+_dispatch_timers_unregister(dispatch_timer_source_refs_t dt)
{
- struct kevent kev = {
- .filter = EVFILT_MACHPORT,
- .flags = EV_ADD,
- };
- kern_return_t kr;
+ uint32_t tidx = dt->du_ident;
+ dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
- kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &_dispatch_port_set);
- DISPATCH_VERIFY_MIG(kr);
- dispatch_assume_zero(kr);
- kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &_dispatch_event_port);
- DISPATCH_VERIFY_MIG(kr);
- dispatch_assume_zero(kr);
- kr = mach_port_move_member(mach_task_self(), _dispatch_event_port, _dispatch_port_set);
- DISPATCH_VERIFY_MIG(kr);
- dispatch_assume_zero(kr);
+ _dispatch_timer_heap_remove(heap, dt);
+ _dispatch_timers_reconfigure = true;
+ _dispatch_timers_processing_mask |= 1 << tidx;
+ dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
+ dt->du_wlh = NULL;
+}
- kev.ident = _dispatch_port_set;
+static inline void
+_dispatch_timers_register(dispatch_timer_source_refs_t dt, uint32_t tidx)
+{
+ dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
+ if (_dispatch_unote_registered(dt)) {
+ DISPATCH_TIMER_ASSERT(dt->du_ident, ==, tidx, "tidx");
+ _dispatch_timer_heap_update(heap, dt);
+ } else {
+ dt->du_ident = tidx;
+ _dispatch_timer_heap_insert(heap, dt);
+ }
+ _dispatch_timers_reconfigure = true;
+ _dispatch_timers_processing_mask |= 1 << tidx;
+ dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
+ dt->du_wlh = DISPATCH_WLH_ANON;
+}
- _dispatch_update_kq(&kev);
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_source_timer_tryarm(dispatch_source_t ds)
+{
+ dispatch_queue_flags_t oqf, nqf;
+ return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
+ if (oqf & (DSF_CANCELED | DQF_RELEASED)) {
+ // do not install a cancelled timer
+ os_atomic_rmw_loop_give_up(break);
+ }
+ nqf = oqf | DSF_ARMED;
+ });
}
-mach_port_t
-_dispatch_get_port_set(void)
+// Updates the timer heaps when the next fire date of ds changes.
+// Should only be called from the context of _dispatch_mgr_q.
+static void
+_dispatch_timers_update(dispatch_unote_t du, uint32_t flags)
{
- static dispatch_once_t pred;
+ dispatch_timer_source_refs_t dr = du._dt;
+ dispatch_source_t ds = _dispatch_source_from_refs(dr);
+ const char *verb = "updated";
+ bool will_register, disarm = false;
- dispatch_once_f(&pred, NULL, _dispatch_port_set_init);
+ DISPATCH_ASSERT_ON_MANAGER_QUEUE();
- return _dispatch_port_set;
-}
+ if (unlikely(dr->du_ident == DISPATCH_TIMER_IDENT_CANCELED)) {
+ dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
+ return;
+ }
-void
-_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
-{
- mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
- kern_return_t kr;
-
- if ((new_flags & DISPATCH_MACHPORT_RECV) || (!new_flags && !del_flags && dk->dk_kevent.fflags & DISPATCH_MACHPORT_RECV)) {
- _dispatch_kevent_machport_enable(dk);
- }
- if (new_flags & DISPATCH_MACHPORT_DEAD) {
- kr = mach_port_request_notification(mach_task_self(), port, MACH_NOTIFY_DEAD_NAME, 1,
- _dispatch_event_port, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
- DISPATCH_VERIFY_MIG(kr);
-
-
- switch(kr) {
- case KERN_INVALID_NAME:
- case KERN_INVALID_RIGHT:
-			// Suppress errors
- break;
- default:
-			// Else, we don't expect any errors from mach. Log any errors if we do
- if (dispatch_assume_zero(kr)) {
- // log the error
- } else if (dispatch_assume_zero(previous)) {
- // Another subsystem has beat libdispatch to requesting the Mach
- // dead-name notification on this port. We should technically cache the
- // previous port and message it when the kernel messages our port. Or
- // we can just say screw those subsystems and drop the previous port.
- // They should adopt libdispatch :-P
- kr = mach_port_deallocate(mach_task_self(), previous);
- DISPATCH_VERIFY_MIG(kr);
- dispatch_assume_zero(kr);
- }
+	// Unregister timers that are unconfigured, disabled, suspended, or have
+	// missed intervals. A later dispatch_source_set_timer(), resume, or
+	// source invoke will rearm them.
+ will_register = !(flags & DISPATCH_TIMERS_UNREGISTER) &&
+ dr->dt_timer.target < INT64_MAX &&
+ !os_atomic_load2o(ds, ds_pending_data, relaxed) &&
+ !DISPATCH_QUEUE_IS_SUSPENDED(ds) &&
+ !os_atomic_load2o(dr, dt_pending_config, relaxed);
+ if (likely(!_dispatch_unote_registered(dr))) {
+ dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
+ if (unlikely(!will_register || !_dispatch_source_timer_tryarm(ds))) {
+ return;
}
+ verb = "armed";
+ } else if (unlikely(!will_register)) {
+ disarm = true;
+ verb = "disarmed";
}
- if (del_flags & DISPATCH_MACHPORT_RECV) {
- _dispatch_kevent_machport_disable(dk);
+ // The heap owns a +2 on dispatch sources it references
+ //
+ // _dispatch_timers_run2() also sometimes passes DISPATCH_TIMERS_RETAIN_2
+ // when it wants to take over this +2 at the same time we are unregistering
+ // the timer from the heap.
+ //
+	// Compute our refcount balance according to these rules: if the balance
+	// would become negative, we retain the source up front; if it is positive,
+	// we get rid of the extraneous refcounts after we're done touching the
+	// source.
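+	//
+	// Concretely (an illustrative reading of the code below):
+	// - first registration (timer not in the heap yet): refs == -2, so the
+	//   source is retained up front;
+	// - plain unregistration without DISPATCH_TIMERS_RETAIN_2: refs == +2,
+	//   released once we are done;
+	// - unregistration where DISPATCH_TIMERS_RETAIN_2 hands the heap's +2 to
+	//   the caller, or an update of an already registered timer: refs == 0.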
+ int refs = will_register ? -2 : 0;
+ if (_dispatch_unote_registered(dr) && !(flags & DISPATCH_TIMERS_RETAIN_2)) {
+ refs += 2;
+ }
+ if (refs < 0) {
+ dispatch_assert(refs == -2);
+ _dispatch_retain_2(ds);
}
- if (del_flags & DISPATCH_MACHPORT_DEAD) {
- kr = mach_port_request_notification(mach_task_self(), (mach_port_t)dk->dk_kevent.ident,
- MACH_NOTIFY_DEAD_NAME, 1, MACH_PORT_NULL, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
- DISPATCH_VERIFY_MIG(kr);
- switch (kr) {
- case KERN_INVALID_NAME:
- case KERN_INVALID_RIGHT:
- case KERN_INVALID_ARGUMENT:
- break;
- default:
- if (dispatch_assume_zero(kr)) {
- // log the error
- } else if (previous) {
- // the kernel has not consumed the right yet
- dispatch_assume_zero(_dispatch_send_consume_send_once_right(previous));
- }
- }
+ uint32_t tidx = _dispatch_source_timer_idx(dr);
+ if (unlikely(_dispatch_unote_registered(dr) &&
+ (!will_register || dr->du_ident != tidx))) {
+ _dispatch_timers_unregister(dr);
+ }
+ if (likely(will_register)) {
+ _dispatch_timers_register(dr, tidx);
+ }
+
+ if (disarm) {
+ _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
+ }
+ _dispatch_debug("kevent-source[%p]: %s timer[%p]", ds, verb, dr);
+ _dispatch_object_debug(ds, "%s", __func__);
+ if (refs > 0) {
+ dispatch_assert(refs == 2);
+ _dispatch_release_2_tailcall(ds);
}
}
-void
-_dispatch_kevent_machport_enable(dispatch_kevent_t dk)
-{
- mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
- kern_return_t kr;
+#define DISPATCH_TIMER_MISSED_MARKER 1ul
- kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
- DISPATCH_VERIFY_MIG(kr);
- switch (kr) {
- case KERN_INVALID_NAME:
-#if DISPATCH_DEBUG
- _dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
-#endif
- break;
- default:
- dispatch_assume_zero(kr);
+DISPATCH_ALWAYS_INLINE
+static inline unsigned long
+_dispatch_source_timer_compute_missed(dispatch_timer_source_refs_t dt,
+ uint64_t now, unsigned long prev)
+{
+ uint64_t missed = (now - dt->dt_timer.target) / dt->dt_timer.interval;
+ if (++missed + prev > LONG_MAX) {
+ missed = LONG_MAX - prev;
+ }
+ if (dt->dt_timer.interval < INT64_MAX) {
+ uint64_t push_by = missed * dt->dt_timer.interval;
+ dt->dt_timer.target += push_by;
+ dt->dt_timer.deadline += push_by;
+ } else {
+ dt->dt_timer.target = UINT64_MAX;
+ dt->dt_timer.deadline = UINT64_MAX;
}
+ prev += missed;
+ return prev;
}
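+// Worked example (illustrative): with target == 100, interval == 10 and
+// now == 137, _dispatch_source_timer_compute_missed() reports 4 missed
+// intervals ((137 - 100) / 10 == 3, plus one for the fire that is due) and
+// pushes target and deadline forward by 40 ticks, so the new target of 140 is
+// the first interval boundary after `now`.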
-void
-_dispatch_kevent_machport_disable(dispatch_kevent_t dk)
+DISPATCH_ALWAYS_INLINE
+static inline unsigned long
+_dispatch_source_timer_data(dispatch_source_t ds, dispatch_unote_t du)
{
- mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
- kern_return_t kr;
+ dispatch_timer_source_refs_t dr = du._dt;
+ unsigned long data, prev, clear_prev = 0;
- kr = mach_port_move_member(mach_task_self(), mp, 0);
- DISPATCH_VERIFY_MIG(kr);
- switch (kr) {
- case KERN_INVALID_RIGHT:
- case KERN_INVALID_NAME:
-#if DISPATCH_DEBUG
- _dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
-#endif
- break;
- case 0:
- break;
- default:
- dispatch_assume_zero(kr);
- break;
- }
-}
+ os_atomic_rmw_loop2o(ds, ds_pending_data, prev, clear_prev, relaxed, {
+ data = prev >> 1;
+ if (unlikely(prev & DISPATCH_TIMER_MISSED_MARKER)) {
+ os_atomic_rmw_loop_give_up(goto handle_missed_intervals);
+ }
+ });
+ return data;
+
+handle_missed_intervals:
+ // The timer may be in _dispatch_source_invoke2() already for other
+ // reasons such as running the registration handler when ds_pending_data
+ // is changed by _dispatch_timers_run2() without holding the drain lock.
+ //
+ // We hence need dependency ordering to pair with the release barrier
+ // done by _dispatch_timers_run2() when setting the MISSED_MARKER bit.
+ os_atomic_thread_fence(dependency);
+ dr = os_atomic_force_dependency_on(dr, data);
+
+ uint64_t now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(dr->du_ident));
+ if (now >= dr->dt_timer.target) {
+ OS_COMPILER_CAN_ASSUME(dr->dt_timer.interval < INT64_MAX);
+ data = _dispatch_source_timer_compute_missed(dr, now, data);
+ }
+
+ // When we see the MISSED_MARKER the manager has given up on this timer
+ // and expects the handler to call "resume".
+ //
+ // However, it may not have reflected this into the atomic flags yet
+ // so make sure _dispatch_source_invoke2() sees the timer is disarmed
+ //
+ // The subsequent _dispatch_source_refs_resume() will enqueue the source
+ // on the manager and make the changes to `ds_timer` above visible.
+ _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
+ os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
+ return data;
+}
+
+static inline void
+_dispatch_timers_run2(dispatch_clock_now_cache_t nows, uint32_t tidx)
+{
+ dispatch_timer_source_refs_t dr;
+ dispatch_source_t ds;
+ uint64_t data, pending_data;
+ uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
+
+ while ((dr = _dispatch_timers_heap[tidx].dth_min[DTH_TARGET_ID])) {
+ DISPATCH_TIMER_ASSERT(dr->du_filter, ==, DISPATCH_EVFILT_TIMER,
+ "invalid filter");
+ DISPATCH_TIMER_ASSERT(dr->du_ident, ==, tidx, "tidx");
+ DISPATCH_TIMER_ASSERT(dr->dt_timer.target, !=, 0, "missing target");
+ ds = _dispatch_source_from_refs(dr);
+ if (dr->dt_timer.target > now) {
+ // Done running timers for now.
+ break;
+ }
+ if (dr->du_fflags & DISPATCH_TIMER_AFTER) {
+ _dispatch_trace_timer_fire(dr, 1, 1);
+ _dispatch_source_merge_evt(dr, EV_ONESHOT, 1, 0, 0);
+ _dispatch_debug("kevent-source[%p]: fired after timer[%p]", ds, dr);
+ _dispatch_object_debug(ds, "%s", __func__);
+ continue;
+ }
-#define _DISPATCH_MIN_MSG_SZ (8ul * 1024ul - MAX_TRAILER_SIZE)
-#ifndef DISPATCH_NO_LEGACY
-dispatch_source_t
-dispatch_source_mig_create(mach_port_t mport, size_t max_msg_size, dispatch_source_attr_t attr,
- dispatch_queue_t dq, dispatch_mig_callback_t mig_callback)
-{
- if (max_msg_size < _DISPATCH_MIN_MSG_SZ) {
- max_msg_size = _DISPATCH_MIN_MSG_SZ;
- }
- return dispatch_source_machport_create(mport, DISPATCH_MACHPORT_RECV, attr, dq,
- ^(dispatch_source_t ds) {
- if (!dispatch_source_get_error(ds, NULL)) {
- if (dq->dq_width != 1) {
- dispatch_retain(ds); // this is a shim -- use the external retain
- dispatch_async(dq, ^{
- dispatch_mig_server(ds, max_msg_size, mig_callback);
- dispatch_release(ds); // this is a shim -- use the external release
- });
- } else {
- dispatch_mig_server(ds, max_msg_size, mig_callback);
+ data = os_atomic_load2o(ds, ds_pending_data, relaxed);
+ if (unlikely(data)) {
+ // the release barrier is required to make the changes
+ // to `ds_timer` visible to _dispatch_source_timer_data()
+ if (os_atomic_cmpxchg2o(ds, ds_pending_data, data,
+ data | DISPATCH_TIMER_MISSED_MARKER, release)) {
+ _dispatch_timers_update(dr, DISPATCH_TIMERS_UNREGISTER);
+ continue;
}
}
- });
+
+ data = _dispatch_source_timer_compute_missed(dr, now, 0);
+ _dispatch_timers_update(dr, DISPATCH_TIMERS_RETAIN_2);
+ pending_data = data << 1;
+		if (!_dispatch_unote_registered(dr) && dr->dt_timer.target < INT64_MAX) {
+			// if we unregistered because of suspension, we have to pretend we
+			// missed events.
+ pending_data |= DISPATCH_TIMER_MISSED_MARKER;
+ os_atomic_store2o(ds, ds_pending_data, pending_data, release);
+ } else {
+ os_atomic_store2o(ds, ds_pending_data, pending_data, relaxed);
+ }
+ _dispatch_trace_timer_fire(dr, data, data);
+ _dispatch_debug("kevent-source[%p]: fired timer[%p]", ds, dr);
+ _dispatch_object_debug(ds, "%s", __func__);
+ dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
+ }
}
-#endif /* DISPATCH_NO_LEGACY */
+DISPATCH_NOINLINE
static void
-_dispatch_mach_notify_source_init(void *context __attribute__((unused)))
+_dispatch_timers_run(dispatch_clock_now_cache_t nows)
{
- size_t maxsz = sizeof(union __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem);
-
- if (sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem) > maxsz) {
- maxsz = sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem);
+ uint32_t tidx;
+ for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
+ if (_dispatch_timers_heap[tidx].dth_count) {
+ _dispatch_timers_run2(nows, tidx);
+ }
}
-
- _dispatch_get_port_set();
-
- _dispatch_mach_notify_source = dispatch_source_mig_create(_dispatch_event_port,
- maxsz, NULL, &_dispatch_mgr_q, libdispatch_internal_protocol_server);
-
- dispatch_assert(_dispatch_mach_notify_source);
}
-kern_return_t
-_dispatch_mach_notify_port_deleted(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
-{
- dispatch_source_t dsi;
- dispatch_kevent_t dk;
- struct kevent kev;
+#if DISPATCH_HAVE_TIMER_COALESCING
+#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
+ [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
-#if DISPATCH_DEBUG
- _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x deleted prematurely", name);
+static const uint64_t _dispatch_kevent_coalescing_window[] = {
+ DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
+#if DISPATCH_HAVE_TIMER_QOS
+ DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
+ DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
+#endif
+};
+#endif // DISPATCH_HAVE_TIMER_COALESCING
+
+static inline dispatch_timer_delay_s
+_dispatch_timers_get_delay(dispatch_timer_heap_t dth, dispatch_clock_t clock,
+ uint32_t qos, dispatch_clock_now_cache_t nows)
+{
+ uint64_t target = dth->dth_target, deadline = dth->dth_deadline;
+ uint64_t delta = INT64_MAX, dldelta = INT64_MAX;
+ dispatch_timer_delay_s rc;
+
+ dispatch_assert(target <= deadline);
+ if (delta == 0 || target >= INT64_MAX) {
+ goto done;
+ }
+
+ if (qos < DISPATCH_TIMER_QOS_COUNT && dth->dth_count > 2) {
+#if DISPATCH_HAVE_TIMER_COALESCING
+ // Timer pre-coalescing <rdar://problem/13222034>
+ // When we have several timers with this target/deadline bracket:
+ //
+ // Target window Deadline
+ // V <-------V
+ // t1: [...........|.................]
+ // t2: [......|.......]
+ // t3: [..|..........]
+ // t4: | [.............]
+ // ^
+ // Optimal Target
+ //
+ // Coalescing works better if the Target is delayed to "Optimal", by
+ // picking the latest target that isn't too close to the deadline.
+ uint64_t window = _dispatch_kevent_coalescing_window[qos];
+ if (target + window < deadline) {
+ uint64_t latest = deadline - window;
+ target = _dispatch_timer_heap_max_target_before(dth, latest);
+ }
#endif
-
- dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
- if (!dk) {
- goto out;
}
- EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DELETED, 0, dk);
+ uint64_t now = _dispatch_time_now_cached(clock, nows);
+ if (target <= now) {
+ delta = 0;
+ dldelta = 0;
+ goto done;
+ }
- TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
- _dispatch_source_merge_kevent(dsi, &kev);
- // this can never happen again
- // this must happen after the merge
- // this may be racy in the future, but we don't provide a 'setter' API for the mask yet
- dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DELETED;
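+	// Convert the remaining time to nanoseconds for non-wall clocks; wall
+	// clock values are already nanosecond-based.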
+ uint64_t tmp = target - now;
+ if (clock != DISPATCH_CLOCK_WALL) {
+ tmp = _dispatch_time_mach2nano(tmp);
+ }
+ if (tmp < delta) {
+ delta = tmp;
}
- // no more sources have this flag
- dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DELETED;
+ tmp = deadline - now;
+ if (clock != DISPATCH_CLOCK_WALL) {
+ tmp = _dispatch_time_mach2nano(tmp);
+ }
+ if (tmp < dldelta) {
+ dldelta = tmp;
+ }
-out:
- return KERN_SUCCESS;
+done:
+ rc.delay = delta;
+ rc.leeway = delta < INT64_MAX ? dldelta - delta : INT64_MAX;
+ return rc;
}
-kern_return_t
-_dispatch_mach_notify_port_destroyed(mach_port_t notify __attribute__((unused)), mach_port_t name)
+static bool
+_dispatch_timers_program2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
- kern_return_t kr;
- // this function should never be called
- dispatch_assume_zero(name);
- kr = mach_port_mod_refs(mach_task_self(), name, MACH_PORT_RIGHT_RECEIVE, -1);
- DISPATCH_VERIFY_MIG(kr);
- dispatch_assume_zero(kr);
- return KERN_SUCCESS;
+ uint32_t qos = DISPATCH_TIMER_QOS(tidx);
+ dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
+ dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
+ dispatch_timer_delay_s range;
+
+ range = _dispatch_timers_get_delay(heap, clock, qos, nows);
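+	// A delay of 0 means a timer is already due (the caller should poll),
+	// INT64_MAX means there is nothing left to arm; either way disarm any
+	// previously armed event loop timer.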
+ if (range.delay == 0 || range.delay >= INT64_MAX) {
+ _dispatch_trace_next_timer_set(NULL, qos);
+ if (heap->dth_flags & DTH_ARMED) {
+ _dispatch_event_loop_timer_delete(tidx);
+ }
+ return range.delay == 0;
+ }
+
+ _dispatch_trace_next_timer_set(heap->dth_min[DTH_TARGET_ID], qos);
+ _dispatch_trace_next_timer_program(range.delay, qos);
+ _dispatch_event_loop_timer_arm(tidx, range, nows);
+ return false;
}
-kern_return_t
-_dispatch_mach_notify_no_senders(mach_port_t notify, mach_port_mscount_t mscnt __attribute__((unused)))
+DISPATCH_NOINLINE
+static bool
+_dispatch_timers_program(dispatch_clock_now_cache_t nows)
{
- // this function should never be called
- dispatch_assume_zero(notify);
- return KERN_SUCCESS;
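+	// Only re-arm the buckets flagged in the processing mask; the rest are
+	// unchanged since the last pass.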
+ bool poll = false;
+ uint32_t tidx, timerm = _dispatch_timers_processing_mask;
+
+ for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
+ if (timerm & (1 << tidx)) {
+ poll |= _dispatch_timers_program2(nows, tidx);
+ }
+ }
+ return poll;
}
-kern_return_t
-_dispatch_mach_notify_send_once(mach_port_t notify __attribute__((unused)))
+DISPATCH_NOINLINE
+static bool
+_dispatch_timers_configure(void)
{
- // we only register for dead-name notifications
- // some code deallocated our send-once right without consuming it
-#if DISPATCH_DEBUG
- _dispatch_log("Corruption: An app/library deleted a libdispatch dead-name notification");
-#endif
- return KERN_SUCCESS;
+ // Find out if there is a new target/deadline on the timer lists
+ return _dispatch_timer_heap_has_new_min(_dispatch_timers_heap,
+ countof(_dispatch_timers_heap), _dispatch_timers_processing_mask);
}
-kern_return_t
-_dispatch_mach_notify_dead_name(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
+static inline bool
+_dispatch_mgr_timers(void)
{
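+	// Run expired timers and, if the timer lists changed, re-arm the event
+	// loop timers; returns true when a timer is already due so the caller
+	// should poll the event loop without blocking.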
- dispatch_source_t dsi;
- dispatch_kevent_t dk;
- struct kevent kev;
- kern_return_t kr;
-
- dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
- if (!dk) {
- goto out;
+ dispatch_clock_now_cache_s nows = { };
+ bool expired = _dispatch_timers_expired;
+ if (unlikely(expired)) {
+ _dispatch_timers_run(&nows);
}
-
- EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DEAD, 0, dk);
-
- TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
- _dispatch_source_merge_kevent(dsi, &kev);
- // this can never happen again
- // this must happen after the merge
- // this may be racy in the future, but we don't provide a 'setter' API for the mask yet
- dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DEAD;
+ _dispatch_mgr_trace_timers_wakes();
+ bool reconfigure = _dispatch_timers_reconfigure;
+ if (unlikely(reconfigure || expired)) {
+ if (reconfigure) {
+ reconfigure = _dispatch_timers_configure();
+ _dispatch_timers_reconfigure = false;
+ }
+ if (reconfigure || expired) {
+ expired = _dispatch_timers_expired = _dispatch_timers_program(&nows);
+ }
+ _dispatch_timers_processing_mask = 0;
}
+ return expired;
+}
- // no more sources have this flag
- dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DEAD;
-
-out:
- // the act of receiving a dead name notification allocates a dead-name right that must be deallocated
- kr = mach_port_deallocate(mach_task_self(), name);
- DISPATCH_VERIFY_MIG(kr);
- //dispatch_assume_zero(kr);
+#pragma mark -
+#pragma mark dispatch_mgr
- return KERN_SUCCESS;
+void
+_dispatch_mgr_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
+ DISPATCH_UNUSED dispatch_qos_t qos)
+{
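+	// Publish the pushed item, mark the manager queue dirty with a release
+	// barrier, and poke the manager's event loop unless the manager itself
+	// (holding its own drain lock) is doing the push.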
+ uint64_t dq_state;
+ _dispatch_trace_continuation_push(dq, dou._do);
+ if (unlikely(_dispatch_queue_push_update_tail(dq, dou._do))) {
+ _dispatch_queue_push_update_head(dq, dou._do);
+ dq_state = os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
+ if (!_dq_state_drain_locked_by_self(dq_state)) {
+ _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
+ }
+ }
}
-kern_return_t
-_dispatch_wakeup_main_thread(mach_port_t mp __attribute__((unused)))
+DISPATCH_NORETURN
+void
+_dispatch_mgr_queue_wakeup(DISPATCH_UNUSED dispatch_queue_t dq,
+ DISPATCH_UNUSED dispatch_qos_t qos,
+ DISPATCH_UNUSED dispatch_wakeup_flags_t flags)
{
- // dummy function just to pop out the main thread out of mach_msg()
- return 0;
+ DISPATCH_INTERNAL_CRASH(0, "Don't try to wake up or override the manager");
}
-kern_return_t
-_dispatch_consume_send_once_right(mach_port_t mp __attribute__((unused)))
+#if DISPATCH_USE_MGR_THREAD
+DISPATCH_NOINLINE DISPATCH_NORETURN
+static void
+_dispatch_mgr_invoke(void)
{
- // dummy function to consume a send-once right
- return 0;
+#if DISPATCH_EVENT_BACKEND_KEVENT
+ dispatch_kevent_s evbuf[DISPATCH_DEFERRED_ITEMS_EVENT_COUNT];
+#endif
+ dispatch_deferred_items_s ddi = {
+#if DISPATCH_EVENT_BACKEND_KEVENT
+ .ddi_maxevents = DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
+ .ddi_eventlist = evbuf,
+#endif
+ };
+ bool poll;
+
+ _dispatch_deferred_items_set(&ddi);
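+	// Manager loop: drain the manager queue, service timers, then drain the
+	// event loop, using KEVENT_FLAG_IMMEDIATE when more work is already
+	// pending so that the drain does not block.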
+ for (;;) {
+ _dispatch_mgr_queue_drain();
+ poll = _dispatch_mgr_timers();
+ poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
+ _dispatch_event_loop_drain(poll ? KEVENT_FLAG_IMMEDIATE : 0);
+ }
}
+#endif // DISPATCH_USE_MGR_THREAD
-mach_msg_return_t
-dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, dispatch_mig_callback_t callback)
+DISPATCH_NORETURN
+void
+_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
+ dispatch_invoke_context_t dic DISPATCH_UNUSED,
+ dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
- mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
- | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
- | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
- mach_msg_options_t tmp_options = options;
- mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
- mach_msg_return_t kr = 0;
- unsigned int cnt = 1000; // do not stall out serial queues
- int demux_success;
+#if DISPATCH_USE_KEVENT_WORKQUEUE
+ if (_dispatch_kevent_workqueue_enabled) {
+ DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
+ "kevent workqueue enabled");
+ }
+#endif
+#if DISPATCH_USE_MGR_THREAD
+ _dispatch_queue_set_current(&_dispatch_mgr_q);
+ _dispatch_mgr_priority_init();
+ _dispatch_queue_mgr_lock(&_dispatch_mgr_q);
+ // never returns, so burn bridges behind us & clear stack 2k ahead
+ _dispatch_clear_stack(2048);
+ _dispatch_mgr_invoke();
+#endif
+}
- maxmsgsz += MAX_TRAILER_SIZE;
+#if DISPATCH_USE_KEVENT_WORKQUEUE
- // XXX FIXME -- allocate these elsewhere
- bufRequest = alloca(maxmsgsz);
- bufReply = alloca(maxmsgsz);
- bufReply->Head.msgh_size = 0; // make CLANG happy
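+// Sentinel returned by _dispatch_wlh_worker_thread_init() when the calling
+// thread is a regular event delivery thread rather than the manager.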
+#define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((dispatch_priority_t)~0u)
- // XXX FIXME -- change this to not starve out the target queue
- for (;;) {
- if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
- options &= ~MACH_RCV_MSG;
- tmp_options &= ~MACH_RCV_MSG;
+_Static_assert(WORKQ_KEVENT_EVENT_BUFFER_LEN >=
+ DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
+ "our list should not be longer than the kernel's");
- if (!(tmp_options & MACH_SEND_MSG)) {
- break;
- }
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_priority_t
+_dispatch_wlh_worker_thread_init(dispatch_wlh_t wlh,
+ dispatch_deferred_items_t ddi)
+{
+ dispatch_assert(wlh);
+ dispatch_priority_t old_dbp;
+
+ pthread_priority_t pp = _dispatch_get_priority();
+ if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
+		// If this thread does not have the event manager flag set, don't set up
+ // as the dispatch manager and let the caller know to only process
+ // the delivered events.
+ //
+ // Also add the NEEDS_UNBIND flag so that
+ // _dispatch_priority_compute_update knows it has to unbind
+ pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
+ if (wlh == DISPATCH_WLH_ANON) {
+ pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
+ } else {
+ // pthread sets the flag when it is an event delivery thread
+ // so we need to explicitly clear it
+ pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
+ }
+ _dispatch_thread_setspecific(dispatch_priority_key,
+ (void *)(uintptr_t)pp);
+ if (wlh != DISPATCH_WLH_ANON) {
+ _dispatch_debug("wlh[%p]: handling events", wlh);
+ } else {
+ ddi->ddi_can_stash = true;
}
+ return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
+ }
+
+ if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
+ !(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
+		// When the pthread kext is delivering kevents to us, and pthread
+ // root queues are in use, then the pthread priority TSD is set
+ // to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
+ //
+		// Given that this isn't a valid QoS, we need to fix up the TSD;
+		// the best option is to clear the qos/priority bits, which tells
+		// us not to make any QoS-related calls on this thread.
+ //
+ // However, in that case the manager thread is opted out of QoS,
+ // as far as pthread is concerned, and can't be turned into
+ // something else, so we can't stash.
+ pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
+ }
+	// Managers always park without mutating to a regular worker thread, and
+	// hence never need to unbind from userland. When draining a manager,
+	// the NEEDS_UNBIND flag would cause that mutation to happen,
+	// so we need to strip this flag.
+ pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
+ _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
+
+ // ensure kevents registered from this thread are registered at manager QoS
+ old_dbp = _dispatch_set_basepri(DISPATCH_PRIORITY_FLAG_MANAGER);
+ _dispatch_queue_set_current(&_dispatch_mgr_q);
+ _dispatch_queue_mgr_lock(&_dispatch_mgr_q);
+ return old_dbp;
+}
+
+DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
+static inline bool
+_dispatch_wlh_worker_thread_reset(dispatch_priority_t old_dbp)
+{
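+	// Undo the manager setup from _dispatch_wlh_worker_thread_init() and
+	// report whether more manager work arrived while the queue was locked.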
+ bool needs_poll = _dispatch_queue_mgr_unlock(&_dispatch_mgr_q);
+ _dispatch_reset_basepri(old_dbp);
+ _dispatch_reset_basepri_override();
+ _dispatch_queue_set_current(NULL);
+ return needs_poll;
+}
+
+DISPATCH_ALWAYS_INLINE
+static void
+_dispatch_wlh_worker_thread(dispatch_wlh_t wlh, dispatch_kevent_t events,
+ int *nevents)
+{
+ _dispatch_introspection_thread_add();
+ DISPATCH_PERF_MON_VAR_INIT
- kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
- (mach_msg_size_t)maxmsgsz, (mach_port_t)ds->ds_ident_hack, 0, 0);
+ dispatch_deferred_items_s ddi = {
+ .ddi_eventlist = events,
+ };
+ dispatch_priority_t old_dbp;
- tmp_options = options;
+ old_dbp = _dispatch_wlh_worker_thread_init(wlh, &ddi);
+ if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
+ _dispatch_perfmon_start_impl(true);
+ } else {
+ dispatch_assert(wlh == DISPATCH_WLH_ANON);
+ wlh = DISPATCH_WLH_ANON;
+ }
+ _dispatch_deferred_items_set(&ddi);
+ _dispatch_event_loop_merge(events, *nevents);
- if (slowpath(kr)) {
- switch (kr) {
- case MACH_SEND_INVALID_DEST:
- case MACH_SEND_TIMED_OUT:
- if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
- mach_msg_destroy(&bufReply->Head);
- }
- break;
- case MACH_RCV_TIMED_OUT:
- case MACH_RCV_INVALID_NAME:
- break;
- default:
- dispatch_assume_zero(kr);
- break;
- }
- break;
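+	// If this thread was set up as the manager, drain the manager queue and
+	// timers and poke the manager again if needed; otherwise drain any
+	// continuation that was stashed while merging events.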
+ if (old_dbp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
+ _dispatch_mgr_queue_drain();
+ bool poll = _dispatch_mgr_timers();
+ if (_dispatch_wlh_worker_thread_reset(old_dbp)) {
+ poll = true;
}
-
- if (!(tmp_options & MACH_RCV_MSG)) {
- break;
+ if (poll) _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
+ } else if (ddi.ddi_stashed_dou._do) {
+ _dispatch_debug("wlh[%p]: draining deferred item %p", wlh,
+ ddi.ddi_stashed_dou._do);
+ if (wlh == DISPATCH_WLH_ANON) {
+ dispatch_assert(ddi.ddi_nevents == 0);
+ _dispatch_deferred_items_set(NULL);
+ _dispatch_root_queue_drain_deferred_item(&ddi
+ DISPATCH_PERF_MON_ARGS);
+ } else {
+ _dispatch_root_queue_drain_deferred_wlh(&ddi
+ DISPATCH_PERF_MON_ARGS);
}
+ }
- bufTemp = bufRequest;
- bufRequest = bufReply;
- bufReply = bufTemp;
+ _dispatch_deferred_items_set(NULL);
+ if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER &&
+ !ddi.ddi_stashed_dou._do) {
+ _dispatch_perfmon_end(perfmon_thread_event_no_steal);
+ }
+ _dispatch_debug("returning %d deferred kevents", ddi.ddi_nevents);
+ *nevents = ddi.ddi_nevents;
+}
- demux_success = callback(&bufRequest->Head, &bufReply->Head);
+DISPATCH_NOINLINE
+void
+_dispatch_kevent_worker_thread(dispatch_kevent_t *events, int *nevents)
+{
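+	// Entry point for kevent delivery from the pthread workqueue: adopt the
+	// anonymous workloop, process the delivered events, and report back how
+	// many kevents were deferred.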
+ if (!events && !nevents) {
+		// the events for this worker thread request were already delivered earlier
+ return;
+ }
+ if (!dispatch_assume(*nevents && *events)) return;
+ _dispatch_adopt_wlh_anon();
+ _dispatch_wlh_worker_thread(DISPATCH_WLH_ANON, *events, nevents);
+ _dispatch_reset_wlh();
+}
- if (!demux_success) {
- // destroy the request - but not the reply port
- bufRequest->Head.msgh_remote_port = 0;
- mach_msg_destroy(&bufRequest->Head);
- } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
- // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode is present
- if (slowpath(bufReply->RetCode)) {
- if (bufReply->RetCode == MIG_NO_REPLY) {
- continue;
- }
- // destroy the request - but not the reply port
- bufRequest->Head.msgh_remote_port = 0;
- mach_msg_destroy(&bufRequest->Head);
- }
- }
+#endif // DISPATCH_USE_KEVENT_WORKQUEUE
+#pragma mark -
+#pragma mark dispatch_source_debug
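+
+// The formatters below build the textual debug description of a source, as
+// used by _dispatch_object_debug() and dispatch_debug().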
- if (bufReply->Head.msgh_remote_port) {
- tmp_options |= MACH_SEND_MSG;
- if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != MACH_MSG_TYPE_MOVE_SEND_ONCE) {
- tmp_options |= MACH_SEND_TIMEOUT;
- }
- }
- }
+static size_t
+_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
+{
+ dispatch_queue_t target = ds->do_targetq;
+ dispatch_source_refs_t dr = ds->ds_refs;
+ return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%x, "
+ "mask = 0x%x, pending_data = 0x%llx, registered = %d, "
+ "armed = %d, deleted = %d%s, canceled = %d, ",
+ target && target->dq_label ? target->dq_label : "", target,
+ dr->du_ident, dr->du_fflags, (unsigned long long)ds->ds_pending_data,
+ ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
+ (bool)(ds->dq_atomic_flags & DSF_DELETED),
+ (ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
+ (bool)(ds->dq_atomic_flags & DSF_CANCELED));
+}
- return kr;
+static size_t
+_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
+{
+ dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
+ return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
+ ", interval = 0x%llx, flags = 0x%x }, ",
+ (unsigned long long)dr->dt_timer.target,
+ (unsigned long long)dr->dt_timer.deadline,
+ (unsigned long long)dr->dt_timer.interval, dr->du_fflags);
+}
+
+size_t
+_dispatch_source_debug(dispatch_source_t ds, char *buf, size_t bufsiz)
+{
+ dispatch_source_refs_t dr = ds->ds_refs;
+ size_t offset = 0;
+ offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
+ dx_kind(ds), ds);
+ offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
+ offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
+ if (dr->du_is_timer) {
+ offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
+ }
+ offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
+ "filter = %s }", dr, dr->du_is_direct ? " (direct)" : "",
+ dr->du_type->dst_kind);
+ return offset;
}