+ return kr;
+}
+
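+// Demux a kevent fired by one of the two global portsets: direct message
+// receives come in on _dispatch_mach_recv_portset, plain machport events on
+// _dispatch_mach_portset; anything else is an error.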
+static inline void
+_dispatch_kevent_mach_portset(struct kevent64_s *ke)
+{
+ if (ke->ident == _dispatch_mach_recv_portset) {
+ return _dispatch_kevent_mach_msg_drain(ke);
+ } else if (ke->ident == _dispatch_mach_portset) {
+ return _dispatch_kevent_machport_drain(ke);
+ } else {
+ return _dispatch_kevent_error(ke);
+ }
+}
+
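+// A message arrived on a port in the plain machport portset: take the port
+// out of the portset (emulating EV_DISPATCH), then merge a synthesized
+// EVFILT_MACHPORT kevent for the registered dispatch kevent.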
+DISPATCH_NOINLINE
+static void
+_dispatch_kevent_machport_drain(struct kevent64_s *ke)
+{
+ mach_port_t name = (mach_port_name_t)ke->data;
+ dispatch_kevent_t dk;
+ struct kevent64_s kev;
+
+ _dispatch_debug_machport(name);
+ dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
+ if (!dispatch_assume(dk)) {
+ return;
+ }
+ _dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
+
+ EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
+ DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
+ _dispatch_kevent_debug(&kev, __func__);
+ _dispatch_kevent_merge(&kev);
+}
+
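+// Handle a message received through the direct-receive portset. On
+// MACH_RCV_TOO_LARGE, re-receive with a buffer sized from ke->ext[1]; if the
+// kernel did not identify the port (ke->data == 0), drain the whole portset
+// until it times out, since the large message could be on any member port.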
+DISPATCH_NOINLINE
+static void
+_dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
+{
+ mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
+ mach_msg_size_t siz, msgsiz;
+ mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
+
+ _dispatch_kevent_mach_recv_reenable(ke);
+ if (!dispatch_assume(hdr)) {
+ DISPATCH_CRASH("EVFILT_MACHPORT with no message");
+ }
+ if (fastpath(!kr)) {
+ return _dispatch_kevent_mach_msg_recv(hdr);
+ } else if (kr != MACH_RCV_TOO_LARGE) {
+ goto out;
+ }
+ if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
+ dispatch_mach_trailer_size)) {
+ DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
+ }
+ siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
+ hdr = malloc(siz);
+ if (ke->data) {
+ if (!dispatch_assume(hdr)) {
+			// malloc failed: fall back to the static receive buffer; since
+			// MACH_RCV_LARGE is cleared from the options below, the kernel
+			// will discard a message too large to fit
+ hdr = _dispatch_get_mach_recv_msg_buf();
+ siz = _dispatch_mach_recv_msg_buf_size;
+ }
+ mach_port_t name = (mach_port_name_t)ke->data;
+ const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
+ MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
+ kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
+ MACH_PORT_NULL);
+ if (fastpath(!kr)) {
+ return _dispatch_kevent_mach_msg_recv(hdr);
+ } else if (kr == MACH_RCV_TOO_LARGE) {
+ _dispatch_log("BUG in libdispatch client: "
+ "_dispatch_kevent_mach_msg_drain: dropped message too "
+ "large to fit in memory: id = 0x%x, size = %lld",
+ hdr->msgh_id, ke->ext[1]);
+ kr = MACH_MSG_SUCCESS;
+ }
+ } else {
+ // We don't know which port in the portset contains the large message,
+		// so we need to receive all messages pending on the portset to
+		// ensure the large message is drained. <rdar://problem/13950432>
+ bool received = false;
+ for (;;) {
+ if (!dispatch_assume(hdr)) {
+ DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
+ }
+ const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
+ MACH_RCV_TIMEOUT);
+ kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
+ MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
+ if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
+ hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
+ DISPATCH_CRASH("Overlarge message");
+ }
+ if (fastpath(!kr)) {
+ msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
+ if (msgsiz < siz) {
+ void *shrink = realloc(hdr, msgsiz);
+ if (shrink) hdr = shrink;
+ }
+ _dispatch_kevent_mach_msg_recv(hdr);
+ hdr = NULL;
+ received = true;
+ } else if (kr == MACH_RCV_TOO_LARGE) {
+ siz = hdr->msgh_size + dispatch_mach_trailer_size;
+ } else {
+ if (kr == MACH_RCV_TIMED_OUT && received) {
+ kr = MACH_MSG_SUCCESS;
+ }
+ break;
+ }
+ hdr = reallocf(hdr, siz);
+ }
+ }
+ if (hdr != _dispatch_get_mach_recv_msg_buf()) {
+ free(hdr);
+ }
+out:
+ if (slowpath(kr)) {
+ _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
+ "message reception failed", kr);
+ }
+}
+
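+// Validate a received message and route it to the first source registered
+// for direct message reception on its local port; messages that cannot be
+// delivered are destroyed.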
+static void
+_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
+{
+ dispatch_source_refs_t dri;
+ dispatch_kevent_t dk;
+ mach_port_t name = hdr->msgh_local_port;
+ mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
+
+ if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
+ dispatch_mach_trailer_size)) {
+ _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
+ "received overlarge message");
+ return _dispatch_kevent_mach_msg_destroy(hdr);
+ }
+ if (!dispatch_assume(name)) {
+ _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
+ "received message with MACH_PORT_NULL port");
+ return _dispatch_kevent_mach_msg_destroy(hdr);
+ }
+ _dispatch_debug_machport(name);
+ dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
+ if (!dispatch_assume(dk)) {
+ _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
+ "received message with unknown kevent");
+ return _dispatch_kevent_mach_msg_destroy(hdr);
+ }
+ _dispatch_kevent_debug(&dk->dk_kevent, __func__);
+ TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
+ dispatch_source_t dsi = _dispatch_source_from_refs(dri);
+ if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
+ return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
+ }
+ }
+ _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
+ "received message with no listeners");
+ return _dispatch_kevent_mach_msg_destroy(hdr);
+}
+
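+// Destroy a received message, freeing its buffer unless it is the static
+// receive buffer.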
+static void
+_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
+{
+ if (hdr) {
+ mach_msg_destroy(hdr);
+ if (hdr != _dispatch_get_mach_recv_msg_buf()) {
+ free(hdr);
+ }
+ }
+}
+
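+// Deliver a received message either to the internal notification source or
+// to a mach channel; a one-shot reply registration is unregistered before
+// delivery.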
+static void
+_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
+ dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
+{
+ if (ds == _dispatch_mach_notify_source) {
+ _dispatch_mach_notify_source_invoke(hdr);
+ return _dispatch_kevent_mach_msg_destroy(hdr);
+ }
+ if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
+ _dispatch_mach_reply_kevent_unregister((dispatch_mach_t)ds,
+ (dispatch_mach_reply_refs_t)dr, false);
+ }
+ return _dispatch_mach_msg_recv((dispatch_mach_t)ds, hdr, siz);
+}
+
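+// Merge a send-possible/dead-name/port-deleted event into every source and
+// mach channel registered on the port, unregistering them when the
+// notification can never fire again.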
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
+{
+ dispatch_source_refs_t dri, dr_next;
+ dispatch_kevent_t dk;
+ struct kevent64_s kev;
+ bool unreg;
+
+ dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
+ if (!dk) {
+ return;
+ }
+
+ // Update notification registration state.
+ dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
+ EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
+ flag, 0, (uintptr_t)dk, 0, 0);
+ if (final) {
+ // This can never happen again
+ unreg = true;
+ } else {
+ // Re-register for notification before delivery
+ unreg = _dispatch_kevent_resume(dk, flag, 0);
+ }
+ DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
+ TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
+ dispatch_source_t dsi = _dispatch_source_from_refs(dri);
+ if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
+ dispatch_mach_t dm = (dispatch_mach_t)dsi;
+ _dispatch_mach_merge_kevent(dm, &kev);
+ if (unreg && dm->dm_dkev) {
+ _dispatch_mach_kevent_unregister(dm);
+ }
+ } else {
+ _dispatch_source_merge_kevent(dsi, &kev);
+ if (unreg) {
+ _dispatch_source_kevent_unregister(dsi);
+ }
+ }
+ if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
+ // current merge is last in list (dk might have been freed)
+ // or it re-armed the notification
+ return;
+ }
+ }
+}
+
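+// Request or cancel a Mach port notification with the kernel (e.g.
+// send-possible or dead-name), tracking the registration state in
+// dk_kevent.data.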
+static kern_return_t
+_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
+ uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
+ mach_port_mscount_t notify_sync)
+{
+ mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
+ typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
+ kern_return_t kr, krr = 0;
+
+ // Update notification registration state.
+ dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
+ dk->dk_kevent.data &= ~(del_flags & mask);
+
+ _dispatch_debug_machport(port);
+ if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
+ // initialize _dispatch_mach_notify_port:
+ (void)_dispatch_get_mach_recv_portset();
+ _dispatch_debug("machport[0x%08x]: registering for send-possible "
+ "notification", port);
+ previous = MACH_PORT_NULL;
+ krr = mach_port_request_notification(mach_task_self(), port,
+ notify_msgid, notify_sync, _dispatch_mach_notify_port,
+ MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
+ DISPATCH_VERIFY_MIG(krr);
+
+ switch(krr) {
+ case KERN_INVALID_NAME:
+ case KERN_INVALID_RIGHT:
+		// Suppress errors & clear registration state
+ dk->dk_kevent.data &= ~mask;
+ break;
+ default:
+			// Otherwise, we don't expect any errors from Mach; log any that occur
+ if (dispatch_assume_zero(krr)) {
+ // log the error & clear registration state
+ dk->dk_kevent.data &= ~mask;
+ } else if (dispatch_assume_zero(previous)) {
+				// Another subsystem has beaten libdispatch to requesting the
+ // specified Mach notification on this port. We should
+ // technically cache the previous port and message it when the
+ // kernel messages our port. Or we can just say screw those
+ // subsystems and deallocate the previous port.
+ // They should adopt libdispatch :-P
+ kr = mach_port_deallocate(mach_task_self(), previous);
+ DISPATCH_VERIFY_MIG(kr);
+ (void)dispatch_assume_zero(kr);
+ previous = MACH_PORT_NULL;
+ }
+ }
+ } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
+ _dispatch_debug("machport[0x%08x]: unregistering for send-possible "
+ "notification", port);
+ previous = MACH_PORT_NULL;
+ kr = mach_port_request_notification(mach_task_self(), port,
+ notify_msgid, notify_sync, MACH_PORT_NULL,
+ MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
+ DISPATCH_VERIFY_MIG(kr);
+
+ switch (kr) {
+ case KERN_INVALID_NAME:
+ case KERN_INVALID_RIGHT:
+ case KERN_INVALID_ARGUMENT:
+ break;
+ default:
+ if (dispatch_assume_zero(kr)) {
+ // log the error
+ }
+ }
+ } else {
+ return 0;
+ }
+ if (slowpath(previous)) {
+ // the kernel has not consumed the send-once right yet
+ (void)dispatch_assume_zero(
+ _dispatch_send_consume_send_once_right(previous));
+ }
+ return krr;
+}
+
+static void
+_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
+{
+ (void)_dispatch_get_mach_recv_portset();
+ _dispatch_debug("registering for calendar-change notification");
+ kern_return_t kr = host_request_notification(mach_host_self(),
+ HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
+ DISPATCH_VERIFY_MIG(kr);
+ (void)dispatch_assume_zero(kr);
+}
+
+static void
+_dispatch_mach_host_calendar_change_register(void)
+{
+ static dispatch_once_t pred;
+ dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
+}
+
+static void
+_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
+{
+ mig_reply_error_t reply;
+ dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
+ __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
+ dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
+ boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
+ if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
+ // host_notify_reply.defs: host_calendar_changed
+ _dispatch_debug("calendar-change notification");
+ _dispatch_timers_calendar_change();
+ _dispatch_mach_host_notify_update(NULL);
+ success = TRUE;
+ reply.RetCode = KERN_SUCCESS;
+ }
+ if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
+ (void)dispatch_assume_zero(reply.RetCode);
+ }
+}
+
+kern_return_t
+_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
+ mach_port_name_t name)
+{
+#if DISPATCH_DEBUG
+ _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
+ "deleted prematurely", name);
+#endif
+
+ _dispatch_debug_machport(name);
+ _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
+
+ return KERN_SUCCESS;
+}
+
+kern_return_t
+_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
+ mach_port_name_t name)
+{
+ kern_return_t kr;
+
+ _dispatch_debug("machport[0x%08x]: dead-name notification", name);
+ _dispatch_debug_machport(name);
+ _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
+
+ // the act of receiving a dead name notification allocates a dead-name
+ // right that must be deallocated
+ kr = mach_port_deallocate(mach_task_self(), name);
+ DISPATCH_VERIFY_MIG(kr);
+ //(void)dispatch_assume_zero(kr);
+
+ return KERN_SUCCESS;
+}
+
+kern_return_t
+_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
+ mach_port_name_t name)
+{
+ _dispatch_debug("machport[0x%08x]: send-possible notification", name);
+ _dispatch_debug_machport(name);
+ _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
+
+ return KERN_SUCCESS;
+}
+
+#pragma mark -
+#pragma mark dispatch_mach_t
+
+#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
+#define DISPATCH_MACH_PSEUDO_RECEIVED 0x1
+#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
+#define DISPATCH_MACH_OPTIONS_MASK 0xffff
+
+static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
+static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
+ mach_port_t local_port, mach_port_t remote_port);
+static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
+ dispatch_object_t dou);
+static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
+ dispatch_mach_msg_t dmsg);
+
+static dispatch_mach_t
+_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
+ dispatch_mach_handler_function_t handler, bool handler_is_block)
+{
+ dispatch_mach_t dm;
+ dispatch_mach_refs_t dr;
+
+ dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
+ sizeof(struct dispatch_mach_s));
+ _dispatch_queue_init((dispatch_queue_t)dm);
+ dm->dq_label = label;
+
+ dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
+ dm->do_ref_cnt++; // since channel is created suspended
+ dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
+ dm->do_targetq = &_dispatch_mgr_q;
+
+ dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
+ dr->dr_source_wref = _dispatch_ptr2wref(dm);
+ dr->dm_handler_func = handler;
+ dr->dm_handler_ctxt = context;
+ dm->ds_refs = dr;
+ dm->ds_handler_is_block = handler_is_block;
+
+ dm->dm_refs = _dispatch_calloc(1ul,
+ sizeof(struct dispatch_mach_send_refs_s));
+ dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
+ dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
+ TAILQ_INIT(&dm->dm_refs->dm_replies);
+
+ // First item on the channel sets the user-specified target queue
+ dispatch_set_target_queue(dm, q);
+ _dispatch_object_debug(dm, "%s", __func__);
+ return dm;
+}
+
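+// Sketch of typical client usage (hypothetical queue/port names, assuming
+// the dispatch_mach_handler_t block signature from the private header):
+//
+//	dispatch_mach_t dm = dispatch_mach_create("com.example.channel", q,
+//			^(dispatch_mach_reason_t reason, dispatch_mach_msg_t msg,
+//			mach_error_t error) {
+//		// e.g. handle DISPATCH_MACH_MESSAGE_RECEIVED
+//	});
+//	dispatch_mach_connect(dm, recv_port, send_port, NULL);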
+dispatch_mach_t
+dispatch_mach_create(const char *label, dispatch_queue_t q,
+ dispatch_mach_handler_t handler)
+{
+ dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
+ return _dispatch_mach_create(label, q, bb,
+ (dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
+}
+
+dispatch_mach_t
+dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
+ dispatch_mach_handler_function_t handler)
+{
+ return _dispatch_mach_create(label, q, context, handler, false);
+}
+
+void
+_dispatch_mach_dispose(dispatch_mach_t dm)
+{
+ _dispatch_object_debug(dm, "%s", __func__);
+ dispatch_mach_refs_t dr = dm->ds_refs;
+ if (dm->ds_handler_is_block && dr->dm_handler_ctxt) {
+ Block_release(dr->dm_handler_ctxt);
+ }
+ free(dr);
+ free(dm->dm_refs);
+ _dispatch_queue_destroy(dm);
+}
+
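+// Connect a channel to its receive and send rights and resume it; may only
+// be called once per channel. The optional checkin message is sent over the
+// send right before any other message.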
+void
+dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
+ mach_port_t send, dispatch_mach_msg_t checkin)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ dispatch_kevent_t dk;
+
+ if (MACH_PORT_VALID(receive)) {
+ dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
+ dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
+ dk->dk_kevent.ident = receive;
+ dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
+ dk->dk_kevent.udata = (uintptr_t)dk;
+ TAILQ_INIT(&dk->dk_sources);
+ dm->ds_dkev = dk;
+ dm->ds_pending_data_mask = dk->dk_kevent.fflags;
+ _dispatch_retain(dm); // the reference the manager queue holds
+ }
+ dr->dm_send = send;
+ if (MACH_PORT_VALID(send)) {
+ if (checkin) {
+ dispatch_retain(checkin);
+ dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
+ }
+ dr->dm_checkin = checkin;
+ }
+ // monitor message reply ports
+ dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
+ if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
+ DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
+ DISPATCH_CLIENT_CRASH("Channel already connected");
+ }
+ _dispatch_object_debug(dm, "%s", __func__);
+ return dispatch_resume(dm);
+}
+
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
+ dispatch_mach_reply_refs_t dmr, bool disconnected)
+{
+ dispatch_kevent_t dk = dmr->dm_dkev;
+ mach_port_t local_port = (mach_port_t)dk->dk_kevent.ident;
+ TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
+ _dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
+ TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dm_list);
+ free(dmr);
+ if (disconnected) {
+ _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
+ }
+}
+
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
+ void *ctxt)
+{
+ dispatch_kevent_t dk;
+ dispatch_mach_reply_refs_t dmr;
+
+ dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
+ dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
+ dk->dk_kevent.ident = reply;
+ dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
+ dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
+ dk->dk_kevent.udata = (uintptr_t)dk;
+	// make the reply context visible to the leaks tool <rdar://problem/11777199>
+ dk->dk_kevent.ext[1] = (uintptr_t)ctxt;
+ TAILQ_INIT(&dk->dk_sources);
+
+ dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
+ dmr->dr_source_wref = _dispatch_ptr2wref(dm);
+ dmr->dm_dkev = dk;
+
+ _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
+ ctxt);
+ uint32_t flags;
+ bool do_resume = _dispatch_kevent_register(&dmr->dm_dkev, &flags);
+ TAILQ_INSERT_TAIL(&dmr->dm_dkev->dk_sources, (dispatch_source_refs_t)dmr,
+ dr_list);
+ TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dm_list);
+ if (do_resume && _dispatch_kevent_resume(dmr->dm_dkev, flags, 0)) {
+ _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
+ }
+}
+
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_kevent_unregister(dispatch_mach_t dm)
+{
+ dispatch_kevent_t dk = dm->dm_dkev;
+ dm->dm_dkev = NULL;
+ TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
+ dr_list);
+ dm->ds_pending_data_mask &= ~(unsigned long)
+ (DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
+ _dispatch_kevent_unregister(dk,
+ DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
+}
+
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
+{
+ dispatch_kevent_t dk;
+
+ dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
+ dk->dk_kevent = _dispatch_source_type_mach_send.ke;
+ dk->dk_kevent.ident = send;
+ dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
+ dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
+ dk->dk_kevent.udata = (uintptr_t)dk;
+ TAILQ_INIT(&dk->dk_sources);
+
+ dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
+
+ uint32_t flags;
+ bool do_resume = _dispatch_kevent_register(&dk, &flags);
+ TAILQ_INSERT_TAIL(&dk->dk_sources,
+ (dispatch_source_refs_t)dm->dm_refs, dr_list);
+ dm->dm_dkev = dk;
+ if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
+ _dispatch_mach_kevent_unregister(dm);
+ }
+}
+
+static inline void
+_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou)
+{
+ return _dispatch_queue_push(dm._dq, dou);
+}
+
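+// While a message is enqueued on a channel, its send options are stashed in
+// the (otherwise unused) do_suspend_cnt field of the message object.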
+static inline void
+_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
+{
+ dou._do->do_suspend_cnt = (unsigned int)options;
+}
+
+static inline mach_msg_option_t
+_dispatch_mach_msg_get_options(dispatch_object_t dou)
+{
+ mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
+ return options;
+}
+
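+// The delivery reason is packed into the same field, encoded as a local Mach
+// error with sub-code 0x3e0 so it can be told apart from a real send error.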
+static inline void
+_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
+ unsigned long reason)
+{
+ dispatch_assert_zero(reason & ~(unsigned long)code_emask);
+ dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err :
+ err_local|err_sub(0x3e0)|(mach_error_t)reason);
+}
+
+static inline unsigned long
+_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
+{
+ mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
+ dou._do->do_suspend_cnt = 0;
+ if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
+ *err_ptr = 0;
+ return err_get_code(err);
+ }
+ *err_ptr = err;
+ return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
+}
+
+static void
+_dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr,
+ mach_msg_size_t siz)
+{
+ _dispatch_debug_machport(hdr->msgh_remote_port);
+ _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
+ hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
+ if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
+ return _dispatch_kevent_mach_msg_destroy(hdr);
+ }
+ dispatch_mach_msg_t dmsg;
+ dispatch_mach_msg_destructor_t destructor;
+ destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
+ DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
+ DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
+ dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
+ _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
+ return _dispatch_mach_push(dm, dmsg);
+}
+
+static inline mach_port_t
+_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
+{
+ mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
+ mach_port_t remote = hdr->msgh_remote_port;
+ return remote;
+}
+
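+// Extract the reply port for a message: either the original reply port
+// stashed in msgh_reserved by a pseudo-receive, or a make-send-once local
+// port that is not the channel's own receive port.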
+static inline mach_port_t
+_dispatch_mach_msg_get_reply_port(dispatch_mach_t dm, dispatch_object_t dou)
+{
+ mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
+ mach_port_t reply = MACH_PORT_NULL;
+ mach_msg_option_t msg_opts = _dispatch_mach_msg_get_options(dou);
+ if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
+ reply = hdr->msgh_reserved;
+ hdr->msgh_reserved = 0;
+ } else if (MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) ==
+ MACH_MSG_TYPE_MAKE_SEND_ONCE &&
+ MACH_PORT_VALID(hdr->msgh_local_port) && (!dm->ds_dkev ||
+ dm->ds_dkev->dk_kevent.ident != hdr->msgh_local_port)) {
+ reply = hdr->msgh_local_port;
+ }
+ return reply;
+}
+
+static inline void
+_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
+ mach_port_t remote_port)
+{
+ mach_msg_header_t *hdr;
+ dispatch_mach_msg_t dmsg;
+ dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
+ DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
+ if (local_port) hdr->msgh_local_port = local_port;
+ if (remote_port) hdr->msgh_remote_port = remote_port;
+ _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
+ return _dispatch_mach_push(dm, dmsg);
+}
+
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
+{
+ mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dou);
+ _dispatch_mach_msg_set_reason(dou, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
+ _dispatch_mach_push(dm, dou);
+ if (reply) {
+ _dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
+ }
+}
+
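+// Try to send a message (or the pending checkin message) inline. Returns the
+// message if the send could not complete here (e.g. pseudo-receive after a
+// send timeout, or work that must move to the manager queue) so the caller
+// can requeue it.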
+DISPATCH_NOINLINE
+static dispatch_object_t
+_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ dispatch_mach_msg_t dmsg = dou._dmsg;
+ dr->dm_needs_mgr = 0;
+ if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
+ // send initial checkin message
+ if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
+ &_dispatch_mgr_q)) {
+ // send kevent must be uninstalled on the manager queue
+ dr->dm_needs_mgr = 1;
+ goto out;
+ }
+ dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
+ if (slowpath(dr->dm_checkin)) {
+ goto out;
+ }
+ }
+ mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
+ mach_msg_return_t kr = 0;
+ mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dmsg);
+ mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
+ if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
+ opts = MACH_SEND_MSG | (msg_opts & DISPATCH_MACH_OPTIONS_MASK);
+ if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
+ MACH_MSG_TYPE_MOVE_SEND_ONCE) {
+ if (dmsg != dr->dm_checkin) {
+ msg->msgh_remote_port = dr->dm_send;
+ }
+ if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
+ if (slowpath(!dm->dm_dkev)) {
+ _dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
+ }
+ if (fastpath(dm->dm_dkev)) {
+ if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
+ goto out;
+ }
+ opts |= MACH_SEND_NOTIFY;
+ }
+ }
+ opts |= MACH_SEND_TIMEOUT;
+ }
+ _dispatch_debug_machport(msg->msgh_remote_port);
+ if (reply) _dispatch_debug_machport(reply);
+ kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
+ MACH_PORT_NULL);
+ }
+ _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, opts 0x%x, "
+ "msg_opts 0x%x, reply on 0x%08x: %s - 0x%x", msg->msgh_remote_port,
+ msg->msgh_id, dmsg->do_ctxt, opts, msg_opts, reply,
+ mach_error_string(kr), kr);
+ if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
+ if (opts & MACH_SEND_NOTIFY) {
+ _dispatch_debug("machport[0x%08x]: send-possible notification "
+ "armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
+ DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
+ } else {
+ // send kevent must be installed on the manager queue
+ dr->dm_needs_mgr = 1;
+ }
+ if (reply) {
+ _dispatch_mach_msg_set_options(dmsg, msg_opts |
+ DISPATCH_MACH_PSEUDO_RECEIVED);
+ msg->msgh_reserved = reply; // Remember the original reply port
+ }
+ goto out;
+ }
+ if (fastpath(!kr) && reply) {
+ if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
+ // reply receive kevent must be installed on the manager queue
+ dr->dm_needs_mgr = 1;
+ _dispatch_mach_msg_set_options(dmsg, msg_opts |
+ DISPATCH_MACH_REGISTER_FOR_REPLY);
+ if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
+ msg->msgh_reserved = reply; // Remember the original reply port
+ }
+ goto out;
+ }
+ _dispatch_mach_reply_kevent_register(dm, reply, dmsg->do_ctxt);
+ }
+ if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
+ _dispatch_mach_kevent_unregister(dm);
+ }
+ _dispatch_mach_msg_set_reason(dmsg, kr, 0);
+ _dispatch_mach_push(dm, dmsg);
+ dmsg = NULL;
+ if (slowpath(kr) && reply) {
+ // Send failed, so reply was never connected <rdar://problem/14309159>
+ _dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
+ }
+out:
+ return (dispatch_object_t)dmsg;
+}
+
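+// Enqueue onto the channel's MPSC send queue via dm_tail/dm_head; only the
+// transition from empty needs to wake up the channel.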
+static void
+_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ struct dispatch_object_s *prev, *dc = dou._do;
+ dc->do_next = NULL;
+
+ prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
+ if (fastpath(prev)) {
+ prev->do_next = dc;
+ return;
+ }
+ dr->dm_head = dc;
+ _dispatch_wakeup(dm);
+}
+
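+// Drain the send queue until it is empty or an item cannot be processed here
+// (a barrier, a failed reconnect, or a send that must be retried); in that
+// case the remainder is pushed back onto the queue head before unlocking.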
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_send_drain(dispatch_mach_t dm)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ struct dispatch_object_s *dc = NULL, *next_dc = NULL;
+ while (dr->dm_tail) {
+ while (!(dc = fastpath(dr->dm_head))) {
+ dispatch_hardware_pause();
+ }
+ do {
+ next_dc = fastpath(dc->do_next);
+ dr->dm_head = next_dc;
+ if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
+ relaxed)) {
+				// Enqueue is TIGHTLY controlled; we won't wait long.
+ while (!(next_dc = fastpath(dc->do_next))) {
+ dispatch_hardware_pause();
+ }
+ dr->dm_head = next_dc;
+ }
+ if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
+ if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
+ // send barrier
+ // leave send queue locked until barrier has completed
+ return _dispatch_mach_push(dm, dc);
+ }
+#if DISPATCH_MACH_SEND_SYNC
+ if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
+ _dispatch_thread_semaphore_signal(
+ (_dispatch_thread_semaphore_t)dc->do_ctxt);
+ continue;
+ }
+#endif // DISPATCH_MACH_SEND_SYNC
+ if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
+ goto out;
+ }
+ continue;
+ }
+ if (slowpath(dr->dm_disconnect_cnt) ||
+ slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
+ _dispatch_mach_msg_not_sent(dm, dc);
+ continue;
+ }
+ if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
+ goto out;
+ }
+ } while ((dc = next_dc));
+ }
+out:
+ // if this is not a complete drain, we must undo some things
+ if (slowpath(dc)) {
+ if (!next_dc &&
+ !dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
+ // wait for enqueue slow path to finish
+ while (!(next_dc = fastpath(dr->dm_head))) {
+ dispatch_hardware_pause();
+ }
+ dc->do_next = next_dc;
+ }
+ dr->dm_head = dc;
+ }
+ (void)dispatch_atomic_dec2o(dr, dm_sending, release);
+ _dispatch_wakeup(dm);
+}
+
+static inline void
+_dispatch_mach_send(dispatch_mach_t dm)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
+ dm_sending, 0, 1, acquire))) {
+ return;
+ }
+ _dispatch_object_debug(dm, "%s", __func__);
+ _dispatch_mach_send_drain(dm);
+}
+
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
+{
+ if (!(ke->fflags & dm->ds_pending_data_mask)) {
+ return;
+ }
+ _dispatch_mach_send(dm);
+}
+
+DISPATCH_NOINLINE
+void
+dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
+ mach_msg_option_t options)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
+ DISPATCH_CLIENT_CRASH("Message already enqueued");
+ }
+ dispatch_retain(dmsg);
+ dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
+ _dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
+ if (slowpath(dr->dm_tail) || slowpath(dr->dm_disconnect_cnt) ||
+ slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
+ slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
+ acquire))) {
+ return _dispatch_mach_send_push(dm, dmsg);
+ }
+ if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
+ (void)dispatch_atomic_dec2o(dr, dm_sending, release);
+ return _dispatch_mach_send_push(dm, dmsg);
+ }
+ if (slowpath(dr->dm_tail)) {
+ return _dispatch_mach_send_drain(dm);
+ }
+ (void)dispatch_atomic_dec2o(dr, dm_sending, release);
+ _dispatch_wakeup(dm);
+}
+
+static void
+_dispatch_mach_disconnect(dispatch_mach_t dm)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ if (dm->dm_dkev) {
+ _dispatch_mach_kevent_unregister(dm);
+ }
+ if (MACH_PORT_VALID(dr->dm_send)) {
+ _dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
+ }
+ dr->dm_send = MACH_PORT_NULL;
+ if (dr->dm_checkin) {
+ _dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
+ dr->dm_checkin = NULL;
+ }
+ if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
+ dispatch_mach_reply_refs_t dmr, tmp;
+ TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dm_list, tmp){
+ _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
+ }
+ }
+}
+
+DISPATCH_NOINLINE
+static bool
+_dispatch_mach_cancel(dispatch_mach_t dm)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
+ return false;
+ }
+ _dispatch_object_debug(dm, "%s", __func__);
+ _dispatch_mach_disconnect(dm);
+ if (dm->ds_dkev) {
+ mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
+ _dispatch_source_kevent_unregister((dispatch_source_t)dm);
+ _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
+ }
+ (void)dispatch_atomic_dec2o(dr, dm_sending, release);
+ return true;
+}
+
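+// Invoked from the send queue: tear down the current connection and adopt
+// the new send right and checkin message captured by
+// dispatch_mach_reconnect(); returns false if it must be retried on the
+// manager queue, where send/reply kevents can be uninstalled.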
+DISPATCH_NOINLINE
+static bool
+_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
+{
+ if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
+ if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
+ // send/reply kevents must be uninstalled on the manager queue
+ return false;
+ }
+ }
+ _dispatch_mach_disconnect(dm);
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ dr->dm_checkin = dou._dc->dc_data;
+ dr->dm_send = (mach_port_t)dou._dc->dc_other;
+ _dispatch_continuation_free(dou._dc);
+ (void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
+ _dispatch_object_debug(dm, "%s", __func__);
+ return true;
+}
+
+DISPATCH_NOINLINE
+void
+dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
+ dispatch_mach_msg_t checkin)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ (void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
+ if (MACH_PORT_VALID(send) && checkin) {
+ dispatch_retain(checkin);
+ dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
+ } else {
+ checkin = NULL;
+ dr->dm_checkin_port = MACH_PORT_NULL;
+ }
+ dispatch_continuation_t dc = _dispatch_continuation_alloc();
+ dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
+ dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
+ dc->dc_ctxt = dc;
+ dc->dc_data = checkin;
+ dc->dc_other = (void*)(uintptr_t)send;
+ return _dispatch_mach_send_push(dm, dc);
+}
+
+#if DISPATCH_MACH_SEND_SYNC
+DISPATCH_NOINLINE
+static void
+_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
+{
+ _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
+ struct dispatch_object_s dc = {
+ .do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
+ .do_ctxt = (void*)sema,
+ };
+ _dispatch_mach_send_push(dm, &dc);
+ _dispatch_thread_semaphore_wait(sema);
+ _dispatch_put_thread_semaphore(sema);
+}
+#endif // DISPATCH_MACH_SEND_SYNC
+
+DISPATCH_NOINLINE
+mach_port_t
+dispatch_mach_get_checkin_port(dispatch_mach_t dm)
+{
+ dispatch_mach_send_refs_t dr = dm->dm_refs;
+ if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
+ return MACH_PORT_DEAD;
+ }
+ return dr->dm_checkin_port;
+}